要点
- 线程池,功能是管理线程资源,管理任务队列。空闲线程从任务队列中取出任务完成,需要线程时将任务挂在任务对列。
- 由于对任务队列的操作要实现同步,引入了互斥信号量和条件变量。
- 当线程池生命周期结束时,不允许再挂载任务,引入一个原子变量stop_flag
- 实现线程池ThreadPool类,成员变量有:
- vector : workers
- queue<function<void()>> : tasks
- mtx : mutex
- cv :conditon_vatiable
- stop_flag : atomic
- 实现线程池ThreadPool类,成员函数有:
- ThreadPool(int size) : 开辟大小为size的线程池,创建size个线程worker存在workers里,安排worker从tasks中取出任务执行
- void puts(Function<void()>&& F) : 把任务f挂在任务队列tasks下
- ~ThreadPool() : stop并join结束所有线程
代码实现
#include <iostream>#include <vector>#include <queue>#include <thread>#include <memory>#include <mutex>#include <atomic>#include <functional>using namespace std;class ThreadPool {private:vector<thread> workers;queue<function<void()>> tasks;mutex mtx_for_que;condition_variable cv;atomic<bool> stop_flag;public:ThreadPool(int size);void make_Task(function<void()>&& f);~ThreadPool();};ThreadPool::ThreadPool(int size) : stop_flag(false) {for (int i = 0; i < size; ++i) {workers.emplace_back([this]() {for (;;) {unique_lock<mutex> ulock(this->mtx_for_que);while (!this->stop_flag && this->tasks.empty()) {this->cv.wait(ulock);}if (this->stop_flag && this->tasks.empty()) {return;}function<void()> task = move(this->tasks.front());this->tasks.pop();// 执行task();}});}}void ThreadPool::make_Task(function<void()>&& f) {function<void()> task = f;unique_lock<mutex> ulock(mtx_for_que);if (stop_flag) {cout << "This ThreadPool has been stopped!" << endl;return;}tasks.emplace([task]() {(task)(); });cv.notify_one();return;}ThreadPool::~ThreadPool() {stop_flag = true;cv.notify_all();for (auto& worker : workers) {worker.join();}return;}int main() {ThreadPool pool(4);for (int i = 0; i < 20; ++i) {pool.make_Task([i]() {cout << i << endl;});}return 0;}