单任务队列线程池

C++ 11引入了五个头文件用于支持多线程编程,但是在很多场合下,工作的子任务往往是循环往复的,因此产生了线程池的应用,线程池一个重要的初衷就是希望将线程成池管理起来,由我们去控制它的分配和使用,需要时取出线程分配使用,当线程结束时它能够被重新回到池中复用。在搜寻相关资料时看到一篇通俗易懂的文章,这部分的代码参考自原文,对语法糖不熟悉的可以移步学习。

其基本工作流如下: 单任务队列线程池

积木一:安全队列

安全队列是一个带锁的FIFO结构,用于管理各种任务的出入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
template<typename T>
class SafeQueue{
private:
mutex qlock;
queue<T> m_queue;
public:
SafeQueue(){}
SafeQueue(SafeQueue&& otherQueue){}
~SafeQueue(){}

bool DeQueue(T& t){
unique_lock<mutex> lock(qlock);
if(m_queue.empty())
return false;
//代表资源所有权转到t,如果T不支持移动语义,退化为拷贝赋值操作
t = move(m_queue.front());
m_queue.pop();
return true;
}

void EnQueue(T& t){
unique_lock<mutex> lock(qlock);
m_queue.emplace(t);
}

bool empty(){
unique_lock<mutex> lock(qlock);
return m_queue.empty();
}

int size(){
unique_lock<mutex> lock(qlock);
return m_queue.size();
}
};

积木二:提交函数

提交函数应该能够做到:

    1. 接收任何类型和数量参数的任何函数(普通函数、匿名函数、成员函数等);
    1. 能返回线程执行的结果;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//函数指针类、参数类
template <typename F,typename... Args >
auto submit(F&& f, Args&&... args) -> future<decltype(f(args...))>{ //返回类型是future<根据函数f推导的返回类型>

//封装一个函数指针,该函数指针接收任意类型函数和任意类型/数量参数,且能保留其左右值特性(前万能引用,这里完美转发)
//这里的参数全部使用bind来指定,因此function的参数列表为空
function<decltype(f(args...))()> func = bind(forward<F>(f), forward<Args>(args)...);

//封装一个异步调用,这个指针可能被多个线程调用,使用共享指针再封装
auto task_ptr = make_shared<packaged_task<decltype(f(args...))()>>(func);

//这个函数指针需要手动调用,此处再使用一层封装
function<void()> wrapper_func = [task_ptr](){
(*task_ptr)();
};

//入队安全队列
safeQueue.EnQueue(wrapper_func);

//唤醒一个等待中的线程
cond.notify_one();

//返回future对象,当线程执行完结果,该对象调用get函数即可获取结果;
return task_ptr->get_future();
}

此处使用的packaged_task是没有见过的语法糖,其可以将任何可调用对象封装成异步调用任务,与std::future和thread配合使用异步执行,使用get同步获取结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <iostream>
#include <future>
#include <thread>
#include <functional> //for bind&&ref

class Person{
public:
static int sub(int a,int b){
return a-b;
}

int operator()(int c,int d){
return c*d;
}

int div(int a,int b){
if(b==0)
return 0;
return a/b;
}
};

int add(int a, int b){
return a+b;
}

int main(){
std::packaged_task<int(int,int)> task1(add); //普通函数
std::packaged_task<int(int,int)> task2(&Person::sub); //静态成员
std::packaged_task<int(int,int)> task3([](int a,int b){return a*b;}); //匿名函数
std::packaged_task<int(int,int)> task4{Person()}; //仿函数法一

Person p;
std::packaged_task<int(int,int)> task5(std::ref(p)); //仿函数法二
//普通成员函数
std::packaged_task<int(int,int)> task6(std::bind(&Person::div,&p,std::placeholders::_1,std::placeholders::_2));

//future结果对象
std::future<int> ret1 = task1.get_future();std::future<int> ret2 = task2.get_future();
std::future<int> ret3 = task3.get_future();std::future<int> ret4 = task4.get_future();
std::future<int> ret5 = task5.get_future();std::future<int> ret6 = task6.get_future();

//optional:异步执行
std::thread t1(std::move(task1),1,2); std::thread t2(std::move(task2),1,2);
std::thread t3(std::move(task3),1,2); std::thread t4(std::move(task4),1,2);
//同步执行
task5(1,2); task6(1,2);

//结果依次应为3、-1、2、2、2、0
std::cout<<ret1.get()<<std::endl; std::cout<<ret2.get()<<std::endl;
std::cout<<ret3.get()<<std::endl;std::cout<<ret4.get()<<std::endl;
std::cout<<ret5.get()<<std::endl; std::cout<<ret6.get()<<std::endl;

t1.join();t2.join();
t3.join();t4.join();
return 0;
}
其中如果仿函数直接写成std::packaged_task<int(int,int)> task4(Person())是不正确的,而且这个错误比较反直觉,因为其他函数均是接收函数指针,这里却要求写入实例化类,所以使用中括号代表Person()是一个实例化类(调用无参构造),此处也可以使用有参构造,更直观的是引入std::ref(p)代表获取类实例的引用;

除了这个语法糖,make_shared制作的共享指针使得packaged_task对象成为可拷贝的,因此能够被值引用拷贝并且调用,也保证了其在调用过程不会被析构产生未定义错误;

积木三:内置工作类

该内置类负责从任务队列取出任务并且执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
mutex m_lock;
condition_variable cond;
SafeQueue<function<void()>> safeQueue; //任务队列
bool isClosed = false;

//内置工作类
class ThreadWorker{
private:
int t_id;
ThreadPool *m_pool;

public:
ThreadWorker(ThreadPool *mpool, int tid):t_id(tid), m_pool(mpool){}

void operator()(){
function<void()> func; //出队元素,注意提交函数入队就是该类型
bool dequeued = false;
while(!m_pool->isClosed){
unique_lock<mutex>lock(m_pool->m_lock);
while(m_pool->safeQueue.empty()&&!m_pool->isClosed){
m_pool->cond.wait(lock);
}
dequeued = m_pool->safeQueue.DeQueue(func); //从任务队列取任务
if(dequeued) //取到,则执行
func();
}
}
};

积木四:线程池

积木二和积木三都是线程池的一部分,除了提交函数和内置出队执行函数,线程池需要定义初始化(为分配线程)和关闭线程池(回收线程资源):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ThreadPool{
private:
std::vector<std::thread> m_threads; //线程队列
public:
ThreadPool(const int nthreads = 4):m_threads(std::vector<std::thread>(nthreads)){}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;

void init(){
for(int i=0; i<m_threads.size(); i++){
//线程执行工作类operator()函数:取任务并执行任务,初始时无任务处于cond.wait状态
m_threads[i] = std::thread(ThreadWorker(this,i));
}
}

void close(){
isClosed = true;
cond.notify_all();
for(int i=0; i<m_threads.size(); i++){
if(m_threads[i].joinable())
m_threads[i].join();
}
}

//内置工作类......
//提交函数......
};

完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#include <iostream>
#include <mutex>
#include <queue>
#include <future>
#include <condition_variable>
#include <functional>

using namespace std;

//安全队列
template<typename T>
class SafeQueue{
private:
mutex qlock;
queue<T> m_queue;
public:
SafeQueue(){}
SafeQueue(SafeQueue&& otherQueue){}
~SafeQueue(){}

bool DeQueue(T& t){
unique_lock<mutex> lock(qlock);
if(m_queue.empty())
return false;
t = move(m_queue.front());
m_queue.pop();
return true;
}

void EnQueue(T& t){
unique_lock<mutex> lock(qlock);
m_queue.emplace(t);
}

bool empty(){
unique_lock<mutex> lock(qlock);
return m_queue.empty();
}

int size(){
unique_lock<mutex> lock(qlock);
return m_queue.size();
}
};

class ThreadPool{
private:
mutex m_lock;
condition_variable cond;
SafeQueue<function<void()>> safeQueue; //任务队列
bool isClosed = false;

//内置工作类
class ThreadWorker{
private:
int t_id;
ThreadPool *m_pool;

public:
ThreadWorker(ThreadPool *mpool, int tid):t_id(tid), m_pool(mpool){}

void operator()(){
function<void()> func; //出队元素,注意提交函数入队就是该类型
bool dequeued = false;
while(!m_pool->isClosed){
unique_lock<mutex>lock(m_pool->m_lock);
while(m_pool->safeQueue.empty()&&!m_pool->isClosed){
m_pool->cond.wait(lock);
}
dequeued = m_pool->safeQueue.DeQueue(func); //从任务队列取任务
if(dequeued) //取到,则执行
func();
}
}
};

private:
vector<thread> m_threads; //线程队列
public:
ThreadPool(const int nthreads = 4):m_threads(vector<thread>(nthreads)){}

ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;

void init(){
for(int i=0; i<m_threads.size(); i++){
m_threads[i] = thread(ThreadWorker(this,i)); //线程执行工作类operator()函数:取任务并执行任务
}
}

void close(){
isClosed = true;
cond.notify_all();
for(int i=0; i<m_threads.size(); i++){
if(m_threads[i].joinable())
m_threads[i].join();
}
}

//函数指针类、参数类
template <typename F,typename... Args >
auto submit(F&& f, Args&&... args) -> future<decltype(f(args...))>{ //返回类型是future<根据函数f推导的返回类型>

//封装一个函数指针,该函数指针接收任意类型函数和任意类型/数量参数,且能保留其左右值特性(前万能引用,这里完美转发)
//这里的参数全部使用bind来指定,因此function的参数列表为空
function<decltype(f(args...))()> func = bind(forward<F>(f), forward<Args>(args)...);

//封装一个异步调用,这个指针可能被多个线程调用,使用共享指针再封装
auto task_ptr = make_shared<packaged_task<decltype(f(args...))()>>(func);

//这个函数指针需要手动调用,此处再使用一层封装
function<void()> wrapper_func = [task_ptr](){
(*task_ptr)();
};

//入队安全队列
safeQueue.EnQueue(wrapper_func);

//唤醒一个等待中的线程
cond.notify_one();

//返回future对象,当线程执行完结果,该对象调用get函数即可获取结果;
return task_ptr->get_future();
}
};

////////////////////////////////////////////测试函数////////////////////////////

#include <thread>

void sleepAWhile(){
this_thread::sleep_for(chrono::milliseconds(2000));
}

void add(int a,int b){
sleepAWhile();
cout<<"Add result:"<< a+b<<endl;
}

int sub(int a,int b){
sleepAWhile();
return a-b;
}

int main(){
ThreadPool pool(5);

pool.init();

for(int i=0; i<4; i++){
for(int j=0; j<5; j++){
pool.submit(add,i,j); //无返回
}
}

//带返回
future<int> ret = pool.submit(sub,3,5);

cout<<"Sub result:"<<ret.get()<<endl;

pool.close();
return 0;
}

加延时更方便查看线程执行顺序,其中cout<<"Sub result:"<<ret.get()<<endl虽然是一个语句,但是分成两步执行,主线程首先打印字符,等待get阻塞完成再单独打印结果,加法则依次输出结果,简单线程池测试通过。

参考链接:

  1. 当我谈线程池时我谈些什么——线程池学习笔记

  2. 异步线程池(基于C++11实现)