C++ 11 新特性总结(三):多线程编程
C++11 多线程相关
在Linux操作系统:进程与线程中介绍了Linux系pthread创建和管理线程基本方法,在C++11中,又引入了五个头文件支持多线程编程,它们分别是<thread>
、<future>
、<atomic>
、<mutex>
和<condition_variable>
,带来不少实用的新特性,值得学习。其函数基本使用标准命名空间std,不赘述。
thread
thread用于创建一个线程,其定义了相关构造函数,具体如下:
1
2
3
4
5
6
7
8thread() noexcept; //无参构造函数,一个空线程
template <class Fn, class... Args>
explicit thread(Fn&& fn, Args&&... args); //有参构造,线程+参数
thread(const thread&) = delete; //禁用拷贝构造
thread(thread&& x) noexcept; //移动构造函数
thread的普通构造
thread支持广泛函数类型的线程创建,包括普通函数、匿名函数、成员函数、仿函数等,
thread管理普通函数
因为时间片、CPU调度影响,输出应该是完全乱序的,之所以得到比较有序的输出因为此例是比较简单的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
using namespace std;
void func(){
cout<<"Hello_World!"<<endl;
}
void func1(int age){
cout<<"I am "<<age<<" years old"<<endl;
}
int main(){
thread t(func); //子线程1
thread t1(func1,18); //子线程2,带参数
cout<<"OH My God"<<endl; //主线程
t.join();
t1.join();
return 0;
}
thread管理匿名函数
匿名函数:注意,这里thread_vp[i]= thread(...)
不会触发拷贝构造,而遍历时必须使用引用auto &i
,否则会触发拷贝构造无法编译。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
using namespace std;
int main(){
int num = 18;
thread t([&num](){ //单个匿名函数
cout<<num<<endl;
});
vector<thread> thread_vp(4); //多个匿名函数
for(int i=0; i<4; i++){
thread_vp[i] = thread([i](string name){ //匿名函数捕获变量、参数
cout<<"I am "<<name<<"\t";
cout<<"I am "<<i<<" years old"<<endl;
},"Eden"); //thread传递参数
}
t.join();
for(auto &i:thread_vp){ //注意必须使用引用
i.join();
}
return 0;
}
thread管理成员函数、仿函数
1 |
|
thread的移动构造
通过移动构造转移thread的控制权。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
using namespace std;
void func(){
cout<<"Hello_World!"<<endl;
}
int main(){
thread t(func); //子线程1
thread t1(move(t)); //移动语义
cout<<(t.joinable()==false)<<(t1.joinable()==true)<<endl; //t已经join掉了,返回false
t1.join();
return 0;
}
detach()、join()、joinable()
特性和Linux下接口一致:当thread被join或者detach,thread对应的资源会在线程结束时立刻被回收,因此调用joinable均会返回false。
future
future&promise
在linux多线程中,通常使用pthread_exit
、pthread_join
方式接受线程返回值,为了在线程之中更灵活地进行值传递,C++11引入了promise
和future
,能够进行值的填充和获取。
如下创建主线程、计算线程、打印线程,计算线程填充计算结果,主线程绑定promise
和future
对象,打印进程接受future
对象,打印结果;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
using namespace std;
void Calculate(int a,int b,promise<xint>&& value){
value.set_value(a+b); //生产者线程:set_value()
}
void Print(future<int>&& fvalue){
int result = fvalue.get(); //消费者线程:get()
cout<<"value="<<result<<endl;
}
int main(){
promise<int>value;
future<int>fvalue = value.get_future(); //绑定唯一对象
thread t(Calculate,1,2,move(value)); //子线程1
thread t1(Print,move(fvalue));
t.join();
t1.join();
return 0;
}
future
、promise
的使用要注意以下细节:
future
、promise
是天然同步的,promise
通过set_value
填充值,future
才会get
并且返回,否则future
会阻塞等待。
future
、promise
都不支持复制,确保每个promise
只有唯一的future
对应,因此使用时应该使用move
;
- 对象的
set_value
、get
只使用一次,调用后就不应该再使用了。
- 对象的
future&async
使用promise时,需要设计值填充的策略和时机,具有比较大的灵活性,而有时候我们只关心执行异步线程,并且获取返回结果,这时候可以使用异步程度更高的async
模板:
1
std::future<R> std::async(std::launch policy, Function&& f, Args&&... args);
async
接受策略policy
、函数、参数,返回一个future
对象,通过get
等函数可以获取返回结果。policy
有两种,分别是std::launch::async
和std::launch::deferred
,前者是一种完全异步对象,调用后会立刻在新线程执行函数任务;后者则是一种延迟执行策略,调用后在使用get
或者wait
获取对象时,会在本线程下运行该函数并且获取结果。
例子: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
using namespace std;
int Calculate(int a,int b){
return a+b;
}
int main(){
future<int> fvalue = async(launch::async,Calculate,1,1); //新线程异步执行
cout<<"result:"<<fvalue.get()<<endl; //执行并且接收结果
future<void>fvalue1 = async(launch::deferred,[](string slogan){ //延迟执行
cout<<slogan<<endl;
},"We Are The Champion~马飞");
fvalue1.wait(); //只执行,不获取返回结果
return 0;
}
atomic
实现共享资源的同步、安全访问有多种方式,例如互斥锁mutex、PV操作、条件变量等,但是其开销一般很大,而且需要谨慎地设计资源数、粒度、时机等,避免死锁局面,往往适用于对代码段的保护,C++11 引入了原子类型的概念,允许对变量实现原子操作,从一些锁的底层原理中知道,例如自旋锁的底层实际上也是原子的硬件指令,这表明了一些场合可以通过原子类型实现简洁高效的互斥机制。
atomic_flag
atomic_flag
是最简单的原子类型,为原子布尔类型。其特点:
天然原子的,无需互斥量保证其读写,这种特性称为免锁/无锁(lock-free);
禁用拷贝构造、移动构造、赋值函数,只有默认构造函数,但没有定义初始化为set还是clear,默认初始化使用宏
ATOMIC_FLAG_INIT
;操作接口只有
test_and_set
和clear
,它们也是免锁的,只有一个线程能够成功不支持值的读取或者写入。
调用test_and_set:无论atomic_flag是true还是false,都会尝试将其置为true,但返回时返回的是置位前的标志。换句话说,如果返回原来是true,说明已经被其他线程占用,本线程无法抢占;而如果返回原来是false,则执行本线程逻辑,如下轮询+原子标志位实现自旋锁逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
using namespace std;
atomic_flag aflag = ATOMIC_FLAG_INIT;
void func(string name){
while(aflag.test_and_set()); //返回true说明其他线程占用,自旋等待
cout<<this_thread::get_id<<" :I am "<<name<<endl; //返回false继续执行逻辑
aflag.clear(); //清除true让其他线程获得
}
int main(){
thread t1(func,"Eden");
thread t2(func,"Mike");
t1.join();
t2.join();
return 0;
}
拓展
更高阶的原子操作可能需要考虑内存同步问题,test_and_set()
和clear()
均支持三个参数:
1
2
3std::memory_order_relaxed:松散内存序,提供了最弱的保证,允许最大程度的优化,但不提供同步;
std::memory_order_release:与std::memory_order_acquire配对使用,用于同步的写操作,保证释放操作之前的写入对其他线程可见;
std::memory_order_seq_cst(默认):顺序一致性,提供了最强的保证,确保所有线程观察到的内存操作顺序一致,但可能会限制优化;
atomic<T>
atomic接受的类条件
C++ 11定义了几种类类型,用于描述其在内存中的不同布局特性,分别是平凡类型(Trivial Type)、平凡可复制类型(TrivialCopyable)、标准布局类型(Standard-layout Type);
平凡类型:构造函数、析构函数、拷贝构造、赋值运算符等都没有自定义实现、无虚函数、虚基类、指针类型成员等堆空间分配策略,完全按照默认编译器行为;
平凡可复制类型:无虚函数、虚基类,可自定义部分函数,需要保证拷贝对象时内存按bit拷贝(memmove、memcpy等底层都是按bit)(一说C++17起可允许按byte拷贝),可以通过函数
is_trivially_copyable
判断;标准布局类型:不同编译平台布局是一致的,对跨平台友好,表现在无虚函数、虚基类、动态分配对象,非静态成员都是public,数据成员访问权限一致等。
并不是所有的类模板T都能够作为atomic,例如string就不可以,必须满足平凡可复制条件,需要遵循五个条件检验:
std::is_trivially_copyable<T>::value
:平凡可复制std::is_copy_constructible<T>::value
:拷贝构造可访问;std::is_move_constructible<T>::value
:移动构造可访问;std::is_copy_assignable<T>::value
:拷贝赋值可访问;std::is_move_assignable<T>::value
:移动赋值可访问;
atomic接口函数
原子操作要么通过原子硬件指令,要么通过加锁,前者性能更高、开销更低,与atomic_flag
不同,atomic<T>
的原子性不一定是免锁的,可能通过互斥量的方法,因为编译器、硬件支持等原因,因此C++
11提供了一种运行时判断的方法is_lock_free()
判断原子类型是否免锁,C++
17引入了is_always_lock_free
用于编译时判断原子类型是否免锁。
atomic常用操作接口如下,记录了如何对值进行原子存取操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26atomic() noexcept = default; //默认构造,atomic<T>();
//constexpr代表可用常量初始化、在编译时执行
constexpr atomic (T val) noexcept; //有参构造,atomic<int>(5)
//赋值操作符
T operator= (T val) noexcept; //atomic<int>a = 5;
T operator= (T val) volatile noexcept;
//重载括号,相当于调用了load获取值
operator T() const volatile noexcept; //atomic<int>a = 5;cout<<a;
operator T() const noexcept;
//被禁用的
atomic (const atomic&) = delete; //禁用拷贝构造
atomic& operator= (const atomic&) = delete; //禁用赋值构造
//函数接口,默认内存序是一致序
void store(T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
void store(T val, memory_order sync = memory_order_seq_cst) noexcept; //a.store(5),存入5
T load (memory_order sync = memory_order_seq_cst) const volatile noexcept;
T load (memory_order sync = memory_order_seq_cst) const noexcept; //int ret = a.load(),返回值
T exchange (T val, memory_order sync = memory_order_seq_cst) volatile noexcept;
T exchange (T val, memory_order sync = memory_order_seq_cst) noexcept; //int ret = a.exchange(6),返回旧值5,存入6;
此外,还有两种接口实现compare_exchange_strong
和compare_exchange_weak
判断、修改,见例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
using namespace std;
int main(){
atomic<int>a(5);
int expect = 5;
//判断a和expect是否相等(本质是memcmp,空间位位相等),如果相等a改为7
bool is_change = a.compare_exchange_strong(expect,7); //expect不能传入常量
cout << is_change<<'\t'<<a.load()<<endl;
//a是否等于expect,相等时a改为8;
while(!a.compare_exchange_weak(expect,8));
cout <<a.load()<<endl;
return 0;
}
compare_exchange_strong
和compare_exchange_weak
的区别是:weak判断相等后,也可能返回false,称伪返回,因此一般放在循环中,确保相等时发生赋值;strong则没有这个问题,返回false时一定是因为值不相等,一般而言weak失败概率是可接受的,因此使用weak效率更高;atomic<int>
可以对应替代成其他类类型,第一个参数不能常量直接写入。
mutex、condition_variable
基本特性还是同pthread系的一致,wait仍然是wait,signal变成了notify、lock和unlock仍然不变,此外还多了一些更高阶对象,权当了解。看一个老例子,交换线程和打印线程同步执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
using namespace std;
vector<int>shareNum={1,2,3,4,5,6,7,8};
void Print(){
do{
for(int i=0; i<shareNum.size();i++){
cout<<shareNum[i]<<"\t";
}
cout<<endl;
}while(1);
}
int main(){
thread t(Print);
//主线程执行交换
do{
for(int i=0; i<4; i++){
swap(shareNum[i],shareNum[7-i]);
}
}while(1);
t.join();
return 0;
}
为了实现交换一次、打印一次的逻辑,加上互斥锁和条件变量,互斥锁由于保证交换、打印都是原子的操作,条件变量保证了线程能够切换执行,此时仍然不能保证二者均执行一次,因为主线程获取锁的优先级没有限制,能够反复交换多次再在某次执行打印,这也是老问题了,C++11没有信号量机制,因此这里可以通过标志位解决,再复杂一点只能引入另一个条件变量了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
using namespace std;
vector<int>shareNum={1,2,3,4,5,6,7,8};
mutex mlock;
condition_variable cond;
atomic<bool> aflag = 0; //确保子线程先拿到锁
void Print(){
do{
unique_lock<mutex>umlock(mlock); //unique_lock构造即加锁
aflag.store(1); //允许父进程持锁
cond.wait(umlock); //交出锁等待notify
for(int i=0; i<shareNum.size();i++){ //打印
cout<<shareNum[i]<<"\t";
}
cout<<endl; //作用域结束自动解锁
}while(1);
}
int main(){
thread t(Print);
//主线程执行交换
do{
if(aflag==1){
aflag.store(0);
unique_lock<mutex>umlock(mlock); //
for(int i=0; i<4; i++){ //交换
swap(shareNum[i],shareNum[7-i]);
} //作用域结束自动解锁
cond.notify_one(); //唤醒一个线程
}
}while(1);
t.join();
return 0;
}
因为这里的wait
函数只接受unique_lock
类型,因此互斥同步只能结合这样写了,而实际上C++
11的mutex有另外几种类型,也存在传统的lock和unlock写法:
锁类型:
mutex:普通互斥锁,可
lock()
、unlock()
、try_lock()
,try_lock
和lock
的区别是try_lock
获取锁失败会返回false,而lock
会一直阻塞等待直到拿到锁;recursive_mutex:递归锁,同一线程内可以多次上锁,常常和
lock_guard
一起使用,自动管理上锁和析构,当嵌套加锁、解锁次数相当时锁才会被真正释放,否则其他线程仍然无法获得锁。1
2
3
4
5
6
7void recur_lock(int depth){ //嵌套depth深度的锁
recursive_mutex rmtx;
lock_guard<recursive_mutex> rlock(rmtx); //上锁
if(depth>0){
recur_lock(depth-1);
}
}time_mutex:定时互斥锁,只会在某段时间、某个时间点前尝试获取互斥锁,避免无限期的互斥等待或者直接返回,配合成员函数
try_lock_for
和try_lock_util
使用:1
2
3
4
5
6
7time_mutex tmtx;
if(tmtx.try_lock_for(std::chrono::seconds(1))){ //1s内获得锁返回true,否则false
...
}
else{
...
}recursive_timed_mutex:既可递归、也可定时的锁。
除了lock、unlock、try_lock这种传统手动上锁管理,以上提到了两种RAII资源锁管理unique_lock
和lock_guard
,都能接受四种锁类型,它们区别是:
unique_lock<lock_T> : 构造自动上锁、作用域结束自动解锁和析构,且支持手动加锁和解锁(构造了手写lock、unlock),不可拷贝但支持移动语义,配合条件变量、wait等使用,灵活性比较高;
lock_guard<lock_T>:构造自动上锁、作用域结束自动解锁和析构,不支持手动管理,不可拷贝也不支持移动拷贝;简单实现RAII;
参考链接: