C++读写锁

C++14引入了shared_mutex,允许使用shared_lock来进行并发读取操作,大幅度提高读线程的吞吐量,对于写操作仍然使用unique_lock独占锁维护数据安全,因为读写锁始终是共享一把锁,虽然读线程间没有了竞态条件,但写线程仍然需要和读、写线程同时竞争锁,而且读写锁一般开销大于普通互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <iostream>
#include <vector>
#include <shared_mutex>
#include <mutex>
#include <thread>
#include <functional>

#include <chrono>
using namespace std;

class Person{
private:
vector<int> array;
shared_mutex rw_lock;

public:
Person(int num):array(num){}
void reader(){
shared_lock lock(rw_lock);
for(int i=0; i<array.size(); i++){
cout<< array.at(i)<<" ";
}
//打印当前时刻
auto now = chrono::system_clock::now();
std::time_t nowTime = chrono::system_clock::to_time_t(now);
cout<<"read"<<ctime(&nowTime)<<endl;
std::this_thread::sleep_for(chrono::seconds(1));
}

void writer(){
unique_lock lock(rw_lock);
for(int i=array.size()/3; i<array.size(); i++){
array.at(i) = i;
}
//打印当前时刻
auto now = chrono::system_clock::now();
std::time_t nowTime = chrono::system_clock::to_time_t(now);
cout<<"write"<<ctime(&nowTime)<<endl;
std::this_thread::sleep_for(chrono::seconds(1));
}
};

int main(){
Person p(21);

std::function<void(void)> reader = std::bind(&Person::reader,&p);
std::function<void(void)> writer = std::bind(&Person::writer,&p);

vector<unique_ptr<thread>> threadpool(10);

auto start = chrono::system_clock::now();
for(int i=0; i<5; i++){
threadpool.at(i).reset(new thread(writer));
}

for(int i=5; i<10; i++){
threadpool.at(i).reset(new thread(reader));
}

for(auto &i : threadpool){
i->join();
}
return 0;
}

如果不支持C++14以上,可以通过mutexcondition_variable去构造读写锁,以下是一种例子,通过该构造方法可以窥见读写锁的一些原理:

  • 读锁:只有写锁后(isWriter==true)一直wait阻塞,否则读锁不会阻塞;

  • 解读锁:如果无读线程,notify一个写线程;

  • 写锁:写锁会阻塞,存在读线程也会阻塞,非写无读才会真正拿到写锁;

  • 解写锁:清除所有的读写阻塞,放弃isWriter;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <iostream>
#include <vector>
#include <shared_mutex>
#include <mutex>
#include <thread>
#include <functional>
#include <chrono>
#include <condition_variable>
using namespace std;

class ReadWriteLock{ //实现读写锁
public:
mutex m_lock;
condition_variable cond;
bool isWriter = false;
uint64_t readCount;

void readLock(){
unique_lock<mutex>lock(m_lock);
cond.wait(lock,[this](){
return isWriter!=false; //当非writer,退出wait
});
//这种写法等效于:
//while(isWriter) cond.wait(lock);
readCount++;
}

void readUnLock(){
unique_lock<mutex>lock(m_lock);
if(--readCount==0)
cond.notify_one();
}

void writeLock(){
unique_lock<mutex>lock(m_lock);
cond.wait(lock,[this](){ //写锁只有非写和无读才能停止wait
return !isWriter&&readCount==0;
});
isWriter = true;
}

void writeUnLock(){
unique_lock<mutex>lock(m_lock);
isWriter = false;
cond.notify_all();
}
};

////应用实例:
class Person{
private:
vector<int> array;
std::unique_ptr<ReadWriteLock> rwlock;

public:
Person(int num):array(num){
rwlock.reset(new ReadWriteLock());
}
void reader(){
rwlock->readLock();
for(int i=0; i<array.size(); i++){
cout<< array.at(i)<<" ";
}
//打印当前时刻
auto now = chrono::system_clock::now();
std::time_t nowTime = chrono::system_clock::to_time_t(now);
cout<<"read"<<ctime(&nowTime)<<endl;
std::this_thread::sleep_for(chrono::seconds(1));
rwlock->readUnLock();
}

void writer(){
rwlock->writeLock();
for(int i=array.size()/3; i<array.size(); i++){
array.at(i) = i;
}
//打印当前时刻
auto now = chrono::system_clock::now();
std::time_t nowTime = chrono::system_clock::to_time_t(now);
cout<<"write"<<ctime(&nowTime)<<endl;
std::this_thread::sleep_for(chrono::seconds(1));
rwlock->writeUnLock();
}
};

int main(){
Person p(21);

std::function<void(void)> reader = std::bind(&Person::reader,&p);
std::function<void(void)> writer = std::bind(&Person::writer,&p);

vector<unique_ptr<thread>> threadpool(10);

for(int i=0; i<5; i++){
threadpool.at(i).reset(new thread(writer));
}

for(int i=5; i<10; i++){
threadpool.at(i).reset(new thread(reader));
}

for(auto &i : threadpool){
i->join();
}
return 0;
}

参考链接:https://mp.weixin.qq.com/s/zoF9LepxksvWAAXFEnDOSQ