操作系统MIT 6.S081 xv6内核(九):Lock实验
Memory allocator实验
实验目的
目前的页帧分配共用一个大锁kmem.spinlock
和一个共用空闲页帧链表kmem.freelist
,也即无论哪个CPU想要通过kalloc
获取页帧,都需要首先从这个共享的数据结构获取锁和页帧,理论和实验表明这个设计是导致race condition
最严重的事件之一,导致性能瓶颈;本节的实验旨在通过为每个CPU都维护一个独享的锁和页帧空闲分配链表,以降低进程内存分配时的严重竞争条件。此外还需要考虑当当前CPU耗光页帧时,如何从其他的CPU安全地“窃取”空闲页帧。
具体实现
从kernel/param.h
获知了系统最大CPU核心数定义,这也是CPU遍历的基本原理;
1
kernel/proc.c
函数可以获取当前CPU编号:
1
2
3
4
5
6int
cpuid()
{
int id = r_tp();
return id;
}kernel/kalloc.c
修改即可:
将kmem设计成数组形式,每个CPU独享一个kmem结构:
1
2
3
4struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];修改初始化逻辑:
1
2
3
4
5
6
7
8
9
10
11char lock_name[NCPU][9]; //定义锁名称,必须是静态或者全局变量
void
kinit()
{
//initlock(&kmem.lock, "kmem");
for(int i = 0;i<NCPU;i++){
snprintf(lock_name[i],9,"kmem_cpu%d",i); //xv6仅支持snprintf,将9字节数据写入lock_name
initlock(&kmem[i].lock,lock_name[i]); //初始化锁
}
freerange(end, (void*)PHYSTOP); //同原来,初始化内存,清理垃圾内存
}修改分配逻辑:假设当前CPU存在空闲页帧,直接按原来的逻辑分配即可;如果没有空闲页帧,则需要窃取其他CPU的页帧,这里我采取的策略是每次仅窃取一个页帧,因此逻辑也比较简单:窃取其他页帧,如果没有剩下的就是给freelist赋值0,有剩下的就是更新
freelist->next
;这里需要注意两个死锁问题:
必须先释放当前的锁再尝试获取其他CPU锁:因为获取锁的顺序是不确定的,假设我们现在持有cpu0的锁去获取cpu1的锁同时,cpu1锁也在通过cpu2锁获取cpu0锁,就会导致死锁,我们定义的逻辑CPU有8个,很多逻辑可能发生这样的问题。
新增了关闭CPU中断代码:因为这里我们放弃了锁去其他CPU窃取页表,这个过程可能有其他线程通过中断获取CPU的控制权,因此需要关闭中断响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49void *
kalloc(void)
{
push_off();//新增:关闭CPU中断
int cpu_id=cpuid();
acquire(&kmem[cpu_id].lock); //获取当前进程锁
//acquire(&kmem.lock);
struct run *r;
r = kmem[cpu_id].freelist;
if(r){ //如果当前CPU还有空闲页帧,直接分配即可
kmem[cpu_id].freelist = r->next;
release(&kmem[cpu_id].lock);
}
else{ //没有空闲,以下代码去窃取
release(&kmem[cpu_id].lock); //释放锁防止死锁
int i;
struct run *steal_page;
for(i=0;i<NCPU;i++){ //遍历其他CPU
if(i==cpu_id) //不要和当前CPU一致
continue;
acquire(&kmem[i].lock); //获取其他CPU锁
steal_page=kmem[i].freelist; //窃取
if(steal_page&&steal_page->next){ //这里是只窃取一个页帧逻辑
kmem[i].freelist=steal_page->next;
release(&kmem[i].lock);
break;
}
else if(steal_page){
kmem[i].freelist=0;
release(&kmem[i].lock);
break;
}
else{
steal_page=0;
release(&kmem[i].lock);
}
}
if(steal_page){
r=steal_page;
}
else{
r=0;
}
}
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
pop_off(); //重启CPU中断
return (void*)r;
}
- 最后修改回收逻辑即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
//acquire(&kmem.lock);
//r->next = kmem.freelist;
//kmem.freelist = r;
//release(&kmem.lock);
int cpu_id=cpuid();
acquire(&kmem[cpu_id].lock);
r->next = kmem[cpu_id].freelist;
kmem[cpu_id].freelist = r;
release(&kmem[cpu_id].lock);
}
完整Commit:Memory allocator Done
Buffer cache实验
实验目的
看完kernel/bio.c
的代码基本都能理解,缓存块的自旋锁只用于一个场景:更新缓存块引用计数refcnt。意味着所有CPU、所有进程进行磁盘操作,都需要竞争这个双向循环链表的自旋锁,虽然这样很安全,但是对性能的影响是比较大的。因此这个实验希望我们改善这种结构,以降低锁的竞争。
这个实验被设定成Hard
,是因为几乎要求我们实现一种新的数据结构,以降低锁的竞争;从Memory allocator
实验我们可以找到一点思路,我们将内存分配器的共享链表修改成每个CPU独享一个链表,每个桶去竞争一个锁。但是这个实验的磁盘是所有CPU共享的,缓存块计数也是,因此不能将它分散到每个CPU中去;这里我们通过软件的结构去实现目的,根据tips
也知道,希望我们使用哈希表的方法,建立一个长度13(建议值)的哈希表即可。只有多个CPU同时尝试访问一个桶的缓存块链表,才会发生race condition
。
将块号映射到13个Blocks实际上只是为了哈希表能够分布比较平均,当桶中所有的缓存块耗尽,我们仍需要LRU策略将某个桶的缓存块置换出去(eviction),分配给新缓存块并且映射到新的位置。
为了履行LRU策略,因为链表已经分散到各种桶了,不能再通过以前双向链表遍历的方法来索引最近未使用的缓存块,因此我们要维护新的字段用于记录这个。哈希表使用首次适应算法的话,直接循环链表就可以找到缓存块,但是这个实验仍然坚持LRU策略,在放弃了双向循环链表的情况下,哈希表如何标记LRU缓存块呢?系统在kernel/trap.c
维护了一个ticks
字段:
1
2
3
4
5
6
7
8void
clockintr()
{
acquire(&tickslock);
ticks++;
wakeup(&ticks);
release(&tickslock);
}ticks
字段由时钟中断控制,触发时间中断,ticks
字段不断递增,如果系统长时间运行,一般也有对应的溢出处理机制,使其重新计数,xv6如何处理的我并不能确定,这个也不是该实验的考虑范围。我们在缓存块引入这个字段,在适当的时候我们将ticks
更新到缓存块字段,系统的ticks
越大,代表越接近现在时刻,意味着这个缓存块是最近被访问的,从而进行LRU筛选(这个让我纳闷了好久,一开始我误以为ticks
是每个进程独有的字段,不能理解为什么直接通过大小比较就能得出LRU)。
此外,新的问题是如何保持驱逐和插入是原子的?这个问题的实现是自由的。但是一定要避免环路死锁的问题,在xv6中没有维护一定的锁顺序,所以大部分情况下我们都需要先行放弃自身的锁,再尝试获取其他的锁。所以这里的操作就是先放弃当前桶的锁,去获取目标桶的锁。在目标桶使用LRU驱逐后获取到新的缓存块,这时再返回原来的桶获取锁,注意这段失去锁的时间,完全可能有其他线程为原来的桶增加了新的缓存块,现在我们又偷来一个,就造成了重复问题,这里解决也是自由的,我们可以重新遍历一次原桶,如果已经存在相同的块号映射,就增加引用数即可。
具体实现
弃用原来的双向循环链表结构,实现哈希表结构,意味着我们的
binit
、bget
等函数都要对应修改,获取bcache
自旋锁逻辑也要修改。因为使用哈希表,双向循环原来用于索引LRU缓存块的目的就没用了,完全可以放弃1
2
3
4
5
6
7
8
9
10//新增桶结构,每个桶维护一个自旋锁、头结点
struct Hash_bcache{
struct spinlock hash_lock;
struct buf head;
};
struct {
struct buf buf[NBUF]; //缓存块单链表
struct Hash_bcache Hash_bcache[MAX_CBlock]; //13个桶
} bcache;prev
指针,桶只使用单链表即可。这里还为每个缓存块新增了buf_tick
字段,后面会用到。1
2
3
4
5
6
7
8
9
10
11
12struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
//struct buf *prev; // 注释掉prev
struct buf *next;
uchar data[BSIZE];
uint buf_tick; //新增
};修改
binit
函数,将初始化大锁改成初始化桶锁,双向链表插入改为哈希表插入。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19char hashlock_name[MAX_CBlock][18]; //锁名称
char sleeplock_name[NBUF][18];
void
binit(void){
for(int i =0;i<MAX_CBlock;i++){
snprintf(hashlock_name[i],11,"bcache_spinlock%d",i); //按要求,锁需要以bcache开头
initlock(&bcache.Hash_bcache[i].hash_lock, hashlock_name[i]);
bcache.Hash_bcache[i].head.next=0;//头结点初始化
}
for(int i = 0; i < NBUF; i++){ //修改成哈希表插入
struct buf *b=&(bcache.buf[i]);
b->next = bcache.Hash_bcache[0].head.next; //首次插入将所有缓存块暂存0桶,后面会按照块号从此处驱逐并且插入其他桶
bcache.Hash_bcache[0].head.next=b;
snprintf(sleeplock_name[i],12,"bcache_sleeplock%d",i); //按要求,锁需要以bcache开头
initsleeplock(&b->lock, sleeplock_name[i]); //同原来
b->buf_tick=ticks; //初始化ticks字段,对所有缓存块平等,也可以不做
}
}修改
bget
函数,这是重点的函数:主要逻辑是获取某个桶的自旋锁,查找是否存在相同设备号、块号的缓存块,存在则引用数++,不存在则需要执行eviction逻辑。首先放弃本桶的锁,从头遍历13个桶并获取对应锁,寻找ticks
字段最小的块的前一个块pre_leastbuf
(链表操作需要),在某个桶内如果没找到,循环完链表就应该放弃锁;如果找到,则应该持有锁直至从其他桶找到ticks
更小的缓存块。最后我们会持有LRU对应的锁,将其结点孤立出来就可以放弃。这里拿着锁重复获取锁不会带来死锁,因为我们最多持有这个锁去遍历的是桶值更高的桶锁,另一个线程不会出现桶值高锁等桶值低锁情况(尽管key大于0,也是先放弃key),因此没有构成环路等待,也不会产生死锁(Maybe 应该 可能)。这里要担心的只是其他线程也在竞争
key
这个原桶的锁,原桶可能被操作(例如已经加入了相同设备号相同块号),所以重新需要遍历一次原桶,如果有则引用数++;这里一个致命的缺点是如果真的存在两个线程同时竞争key原桶并且尝试增加相同块号的缓存块,那么其中一个线程的LRU是必然被无故驱逐的,也即如果这种情况严峻,缓存块将越用越少,尽管
bcachetest
没有考虑或者没有设置足够的强度测试这个。所以这里新增了放回去的尝试,也能顺利通过test。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;
int key=blockno%MAX_CBlock;
acquire(&bcache.Hash_bcache[key].hash_lock);
//遍历桶内看是否存在缓存块
for(b = bcache.Hash_bcache[key].head.next; b != 0; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.Hash_bcache[key].hash_lock);
acquiresleep(&b->lock);
return b;
}
}
uint min_ticks=0xffffffff; //设置ticks起始比较值
int find_least; //单次桶遍历是否存在LRU
release(&bcache.Hash_bcache[key].hash_lock);
//没有缓存块就要遍历其他桶,标记LRU
int least_i=-1; //上次找到LRU对应的桶值
struct buf* pre_leastbuf=0; //LRU的前一个结点
for(int i=0;i<MAX_CBlock;i++){ //遍历13个桶
acquire(&bcache.Hash_bcache[i].hash_lock);
find_least=0;
for(b= &(bcache.Hash_bcache[i].head);b->next!=0;b=b->next){ //遍历桶内是否存在LRU
if(b->next->refcnt==0&&b->next->buf_tick<min_ticks){ //找到LRU块
min_ticks=b->next->buf_tick;
pre_leastbuf=b; //b为pre_leastbuf,b->next为疑似LRU
find_least = 1;
}
}
if(!find_least) //该桶没有满足条件
release(&bcache.Hash_bcache[i].hash_lock);
else{ //该桶存在满足条件块
if(least_i!=-1)
release(&bcache.Hash_bcache[least_i].hash_lock); //释放上一次ticks比较小的LRU
least_i=i; //记录桶值
}
}
if(pre_leastbuf==0){ //说明整个哈希表都没有
panic("No Buffer");
}
//存在LRU
struct buf* leastbuf=pre_leastbuf->next;
pre_leastbuf->next=leastbuf->next; //孤立出来
release(&bcache.Hash_bcache[least_i].hash_lock); //释放目标桶锁
acquire(&bcache.Hash_bcache[key].hash_lock); //拿原桶锁
//重新检查原桶是否仍需要为块号分配LRU
for(b = bcache.Hash_bcache[key].head.next; b != 0; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.Hash_bcache[key].hash_lock);
acquire(&bcache.Hash_bcache[least_i].hash_lock); //尝试将块还回去
leastbuf->next=bcache.Hash_bcache[least_i].head.next;
bcache.Hash_bcache[least_i].head.next=leastbuf;
release(&bcache.Hash_bcache[least_i].hash_lock);
acquiresleep(&b->lock);
return b; //如果无需说明其他线程已经分配了
}
}
//如果原桶仍然需要分配,就填充信息并且插入key桶的链表
leastbuf->dev = dev;
leastbuf->blockno = blockno;
leastbuf->valid = 0;
leastbuf->refcnt = 1;
leastbuf->next=bcache.Hash_bcache[key].head.next;
bcache.Hash_bcache[key].head.next=leastbuf;
release(&bcache.Hash_bcache[key].hash_lock);
acquiresleep(&leastbuf->lock); //返回
return leastbuf;
}修改
brelse
函数,这里双向链表brelse将空闲块放回头结点后面的代码可以去除了,我们做的只是这里主要是更新ticks到缓存块的ticks字段,结构上无需改变,因为哈希表的插入可以在驱逐时候完成。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
int key=b->blockno%MAX_CBlock;
acquire(&bcache.Hash_bcache[key].hash_lock);
b->refcnt--;
if (b->refcnt == 0) {
// no one is waiting for it.
//b->next->prev = b->pre;
//b->prev->next = b->next;
//b->next = bcache.head.next;
//b->prev = &bcache.head;
//bcache.head.next->prev = b;
//bcache.head.next = b;
b->buf_tick=ticks;
}
release(&bcache.Hash_bcache[key].hash_lock);
}bpin
和bunpin
主要是获取锁的逻辑变化,比较简单,如果需要参考Commit不赘述。
最后验证: 1
2bcachetest
usertests
完整Commit:Buffer Cache Done