c++ 线程池/Github 开源项目源码分析(progschj/ThreadPool)
- 前言
- [ThreadPool 项目地址](https://github.com/progschj/ThreadPool)
- 项目源码:
- 基本用法
- 类成员变量
- 类成员函数
- 构造函数的签名
- 创建线程
- 线程默认的任务
- 向任务队列中添加一个任务
- 析构函数
 
- 总结
前言
维基百科上对线程池的简要介绍:
线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
ThreadPool 项目地址
progschj/ThreadPool 是一个简易的基于 c++11 标准的线程池实现,采用了 Zlib license(相当宽松自由的开源协议,任意修改分发商用),截止当前时间点,已获得 7k+ stars。整个项目源码仅有一个头文件,代码行数不足一百行,早在多年前就已稳定不再更新。
项目源码:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>class ThreadPool {
public:ThreadPool(size_t);template<class F, class... Args>auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();
private:// need to keep track of threads so we can join themstd::vector< std::thread > workers;// the task queuestd::queue< std::function<void()> > tasks;// synchronizationstd::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)workers.emplace_back([this]{for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });}condition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread& worker : workers)worker.join();
}#endif
基本用法
// create thread pool with 4 worker threads
ThreadPool pool(4);// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);// get result from future
std::cout << result.get() << std::endl;
类成员变量
std::vector< std::thread > workers;
std::queue< std::function<void()> > tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
- workers:存储线程池中 std::thread 的容器
- tasks:任务队列
- queue_mutex:任务队列的互斥锁
- condition:任务队列的条件变量
- stop:线程池是否停止的标志位
类成员函数
构造函数的签名
inline ThreadPool::ThreadPool(size_t threads): stop(false)
- 构造函数传入一个 size_t类型的参数,初始化线程池中线程的数量
- 初始化列表将 stop 标志位初始化为 false
创建线程
for (size_t i = 0; i < threads; ++i)
{workers.emplace_back([this]{for (;;){//...}});
}
- 使用 for 循环创建 threads 个线程,将线程加入 workers 容器
- lambda 表达式用于创建线程,捕获 this,lambda 表达式中包含一个无限循环
线程默认的任务
std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();
}task();
- 首先声明了一个 std::function<void()>类型的变量 task
- 在互斥锁保护任务队列后,调用 condition.wait()等待任务队列非空或线程池停止,线程创建后,会在这里等待;如果 stop 标志位为 true 或者任务队列不为空,解除等待,继续往下执行
- 如果标志位 stop 为 true,且任务队列为空,此任务将退出
- 以上条件都通过后,将从任务队列中取出一个任务 task,移动到局部变量 task 中(吐槽下:距离 c++11 标准的发布已经过去了十几年,现在还不明白这一条的,就很难评价了)
- 执行 task(),也就是上一步从队列头部取出的任务
向任务队列中添加一个任务
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)-> std::future<typename std::result_of<F(Args...)>::type>;
- enqueue函数模板,用于任务的入队列
- F&& f,这里预期是一个任意的 callable 对象
- Args&&... args,一个可变模板参数,会在编译期展开参数包
- 返回值是一个 std::future类型的对象,用于获取任务的执行结果,std::future的模板参数使用std::result_of萃取可调用对象的返回值类型
- 注意,c++17 后 std::result_of就已经是 deprecated,可以使用std::invoke_result类型萃取
继续往下看 enqueue 函数的实现:
using return_type = typename std::result_of<F(Args...)>::type;
- 使用 std::result_of类型萃取可调用对象的返回值类型,并使用 using 为其起个别名 reture_type
auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
这一段做了好几件事,一步一步拆解:
- std::make_shared构建一个- std::shared_ptr
- std::packaged_task模板是用于包装 callable 对象,使用了前面推导出的- return_type类型来实例化模板
- 而 std::make_shared需要调用实例类型的构造函数,而std::packaged_task的构造函数需要一个可调用对象,所以这里使用std::bind将可变模板参数绑定给 f(对std::bind不熟悉的建议先行查阅资料),std::forward转发一下类型
- 简单来说,以上只是构建一个 callable 对象的包装器
std::future<return_type> res = task->get_future();
{std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the poolif (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
- 从 task 中获取 std::future对象
- 使用大括号控制代码块,在这个代码块中上锁
- 如果线程池已经停止,抛出异常
- 否则正常执行,将 task 推入到队列尾部
- 条件变量通知一个等待的线程,这个时候,构造函数中 condition.wait()会被唤醒,以执行后面的代码块,即从队列头部取出一个任务并执行
- 最后返回 std::future对象
析构函数
遵循 RAII 原则,释放所有资源
{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;
}
condition.notify_all();
for (std::thread& worker : workers)worker.join();
- 上锁,将停止标志位置为 true
- 通知所有等待的线程
- 等待所有线程终止
总结
该项目仅是一个线程池的简易实现,对学习 c++11 标准的多线程及部分特性有一定帮助,如果想要更复杂的具有各种调度策略的线程池,还需进一步细化。