C++ 11 线程池
单任务队列线程池
C++ 11引入了五个头文件用于支持多线程编程,但是在很多场合下,工作的子任务往往是循环往复的,因此产生了线程池的应用,线程池一个重要的初衷就是希望将线程成池管理起来,由我们去控制它的分配和使用,需要时取出线程分配使用,当线程结束时它能够被重新回到池中复用。在搜寻相关资料时看到一篇通俗易懂的文章,这部分的代码参考自原文,对语法糖不熟悉的可以移步学习。
其基本工作流如下:
积木一:安全队列
安全队列是一个带锁的FIFO结构,用于管理各种任务的出入:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35template<typename T>
class SafeQueue{
private:
mutex qlock;
queue<T> m_queue;
public:
SafeQueue(){}
SafeQueue(SafeQueue&& otherQueue){}
~SafeQueue(){}
bool DeQueue(T& t){
unique_lock<mutex> lock(qlock);
if(m_queue.empty())
return false;
//代表资源所有权转到t,如果T不支持移动语义,退化为拷贝赋值操作
t = move(m_queue.front());
m_queue.pop();
return true;
}
void EnQueue(T& t){
unique_lock<mutex> lock(qlock);
m_queue.emplace(t);
}
bool empty(){
unique_lock<mutex> lock(qlock);
return m_queue.empty();
}
int size(){
unique_lock<mutex> lock(qlock);
return m_queue.size();
}
};
积木二:提交函数
提交函数应该能够做到:
- 接收任何类型和数量参数的任何函数(普通函数、匿名函数、成员函数等);
- 能返回线程执行的结果;
1 | //函数指针类、参数类 |
此处使用的packaged_task
是没有见过的语法糖,其可以将任何可调用对象封装成异步调用任务,与std::future和thread配合使用异步执行,使用get同步获取结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class Person{
public:
static int sub(int a,int b){
return a-b;
}
int operator()(int c,int d){
return c*d;
}
int div(int a,int b){
if(b==0)
return 0;
return a/b;
}
};
int add(int a, int b){
return a+b;
}
int main(){
std::packaged_task<int(int,int)> task1(add); //普通函数
std::packaged_task<int(int,int)> task2(&Person::sub); //静态成员
std::packaged_task<int(int,int)> task3([](int a,int b){return a*b;}); //匿名函数
std::packaged_task<int(int,int)> task4{Person()}; //仿函数法一
Person p;
std::packaged_task<int(int,int)> task5(std::ref(p)); //仿函数法二
//普通成员函数
std::packaged_task<int(int,int)> task6(std::bind(&Person::div,&p,std::placeholders::_1,std::placeholders::_2));
//future结果对象
std::future<int> ret1 = task1.get_future();std::future<int> ret2 = task2.get_future();
std::future<int> ret3 = task3.get_future();std::future<int> ret4 = task4.get_future();
std::future<int> ret5 = task5.get_future();std::future<int> ret6 = task6.get_future();
//optional:异步执行
std::thread t1(std::move(task1),1,2); std::thread t2(std::move(task2),1,2);
std::thread t3(std::move(task3),1,2); std::thread t4(std::move(task4),1,2);
//同步执行
task5(1,2); task6(1,2);
//结果依次应为3、-1、2、2、2、0
std::cout<<ret1.get()<<std::endl; std::cout<<ret2.get()<<std::endl;
std::cout<<ret3.get()<<std::endl;std::cout<<ret4.get()<<std::endl;
std::cout<<ret5.get()<<std::endl; std::cout<<ret6.get()<<std::endl;
t1.join();t2.join();
t3.join();t4.join();
return 0;
}std::packaged_task<int(int,int)> task4(Person())
是不正确的,而且这个错误比较反直觉,因为其他函数均是接收函数指针,这里却要求写入实例化类,所以使用中括号代表Person()是一个实例化类(调用无参构造),此处也可以使用有参构造,更直观的是引入std::ref(p)
代表获取类实例的引用;
除了这个语法糖,make_shared
制作的共享指针使得packaged_task
对象成为可拷贝的,因此能够被值引用拷贝并且调用,也保证了其在调用过程不会被析构产生未定义错误;
积木三:内置工作类
该内置类负责从任务队列取出任务并且执行: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28mutex m_lock;
condition_variable cond;
SafeQueue<function<void()>> safeQueue; //任务队列
bool isClosed = false;
//内置工作类
class ThreadWorker{
private:
int t_id;
ThreadPool *m_pool;
public:
ThreadWorker(ThreadPool *mpool, int tid):t_id(tid), m_pool(mpool){}
void operator()(){
function<void()> func; //出队元素,注意提交函数入队就是该类型
bool dequeued = false;
while(!m_pool->isClosed){
unique_lock<mutex>lock(m_pool->m_lock);
while(m_pool->safeQueue.empty()&&!m_pool->isClosed){
m_pool->cond.wait(lock);
}
dequeued = m_pool->safeQueue.DeQueue(func); //从任务队列取任务
if(dequeued) //取到,则执行
func();
}
}
};
积木四:线程池
积木二和积木三都是线程池的一部分,除了提交函数和内置出队执行函数,线程池需要定义初始化(为分配线程)和关闭线程池(回收线程资源):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29class ThreadPool{
private:
std::vector<std::thread> m_threads; //线程队列
public:
ThreadPool(const int nthreads = 4):m_threads(std::vector<std::thread>(nthreads)){}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
void init(){
for(int i=0; i<m_threads.size(); i++){
//线程执行工作类operator()函数:取任务并执行任务,初始时无任务处于cond.wait状态
m_threads[i] = std::thread(ThreadWorker(this,i));
}
}
void close(){
isClosed = true;
cond.notify_all();
for(int i=0; i<m_threads.size(); i++){
if(m_threads[i].joinable())
m_threads[i].join();
}
}
//内置工作类......
//提交函数......
};
完整示例
1 |
|
加延时更方便查看线程执行顺序,其中cout<<"Sub result:"<<ret.get()<<endl
虽然是一个语句,但是分成两步执行,主线程首先打印字符,等待get阻塞完成再单独打印结果,加法则依次输出结果,简单线程池测试通过。
参考链接: