Memory allocator实验

实验目的

目前的页帧分配共用一个大锁kmem.spinlock和一个共用空闲页帧链表kmem.freelist,也即无论哪个CPU想要通过kalloc获取页帧,都需要首先从这个共享的数据结构获取锁和页帧,理论和实验表明这个设计是导致race condition最严重的事件之一,导致性能瓶颈;本节的实验旨在通过为每个CPU都维护一个独享的锁和页帧空闲分配链表,以降低进程内存分配时的严重竞争条件。此外还需要考虑当当前CPU耗光页帧时,如何从其他的CPU安全地“窃取”空闲页帧。

具体实现

kernel/param.h获知了系统最大CPU核心数定义,这也是CPU遍历的基本原理;

1
#define NCPU          8  // maximum number of CPUs
通过kernel/proc.c函数可以获取当前CPU编号:
1
2
3
4
5
6
int
cpuid()
{
int id = r_tp();
return id;
}
全部代码仅在kernel/kalloc.c修改即可:

  1. 将kmem设计成数组形式,每个CPU独享一个kmem结构:

    1
    2
    3
    4
    struct {
    struct spinlock lock;
    struct run *freelist;
    } kmem[NCPU];

  2. 修改初始化逻辑:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    char lock_name[NCPU][9];  //定义锁名称,必须是静态或者全局变量
    void
    kinit()
    {
    //initlock(&kmem.lock, "kmem");
    for(int i = 0;i<NCPU;i++){
    snprintf(lock_name[i],9,"kmem_cpu%d",i); //xv6仅支持snprintf,将9字节数据写入lock_name
    initlock(&kmem[i].lock,lock_name[i]); //初始化锁
    }
    freerange(end, (void*)PHYSTOP); //同原来,初始化内存,清理垃圾内存
    }

  3. 修改分配逻辑:假设当前CPU存在空闲页帧,直接按原来的逻辑分配即可;如果没有空闲页帧,则需要窃取其他CPU的页帧,这里我采取的策略是每次仅窃取一个页帧,因此逻辑也比较简单:窃取其他页帧,如果没有剩下的就是给freelist赋值0,有剩下的就是更新freelist->next;这里需要注意两个死锁问题:

  • 必须先释放当前的锁再尝试获取其他CPU锁:因为获取锁的顺序是不确定的,假设我们现在持有cpu0的锁去获取cpu1的锁同时,cpu1锁也在通过cpu2锁获取cpu0锁,就会导致死锁,我们定义的逻辑CPU有8个,很多逻辑可能发生这样的问题。

  • 新增了关闭CPU中断代码:因为这里我们放弃了锁去其他CPU窃取页表,这个过程可能有其他线程通过中断获取CPU的控制权,因此需要关闭中断响应。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    void *
    kalloc(void)
    {
    push_off();//新增:关闭CPU中断
    int cpu_id=cpuid();
    acquire(&kmem[cpu_id].lock); //获取当前进程锁
    //acquire(&kmem.lock);
    struct run *r;
    r = kmem[cpu_id].freelist;
    if(r){ //如果当前CPU还有空闲页帧,直接分配即可
    kmem[cpu_id].freelist = r->next;
    release(&kmem[cpu_id].lock);
    }
    else{ //没有空闲,以下代码去窃取
    release(&kmem[cpu_id].lock); //释放锁防止死锁
    int i;
    struct run *steal_page;
    for(i=0;i<NCPU;i++){ //遍历其他CPU
    if(i==cpu_id) //不要和当前CPU一致
    continue;
    acquire(&kmem[i].lock); //获取其他CPU锁
    steal_page=kmem[i].freelist; //窃取
    if(steal_page&&steal_page->next){ //这里是只窃取一个页帧逻辑
    kmem[i].freelist=steal_page->next;
    release(&kmem[i].lock);
    break;
    }
    else if(steal_page){
    kmem[i].freelist=0;
    release(&kmem[i].lock);
    break;
    }
    else{
    steal_page=0;
    release(&kmem[i].lock);
    }
    }
    if(steal_page){
    r=steal_page;
    }
    else{
    r=0;
    }
    }
    if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
    pop_off(); //重启CPU中断
    return (void*)r;
    }
  1. 最后修改回收逻辑即可:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void
    kfree(void *pa)
    {
    struct run *r;
    if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");
    // Fill with junk to catch dangling refs.
    memset(pa, 1, PGSIZE);

    r = (struct run*)pa;

    //acquire(&kmem.lock);
    //r->next = kmem.freelist;
    //kmem.freelist = r;
    //release(&kmem.lock);
    int cpu_id=cpuid();
    acquire(&kmem[cpu_id].lock);
    r->next = kmem[cpu_id].freelist;
    kmem[cpu_id].freelist = r;
    release(&kmem[cpu_id].lock);
    }

完整Commit:Memory allocator Done

Buffer cache实验

实验目的

看完kernel/bio.c的代码基本都能理解,缓存块的自旋锁只用于一个场景:更新缓存块引用计数refcnt。意味着所有CPU、所有进程进行磁盘操作,都需要竞争这个双向循环链表的自旋锁,虽然这样很安全,但是对性能的影响是比较大的。因此这个实验希望我们改善这种结构,以降低锁的竞争。

这个实验被设定成Hard,是因为几乎要求我们实现一种新的数据结构,以降低锁的竞争;从Memory allocator实验我们可以找到一点思路,我们将内存分配器的共享链表修改成每个CPU独享一个链表,每个桶去竞争一个锁。但是这个实验的磁盘是所有CPU共享的,缓存块计数也是,因此不能将它分散到每个CPU中去;这里我们通过软件的结构去实现目的,根据tips也知道,希望我们使用哈希表的方法,建立一个长度13(建议值)的哈希表即可。只有多个CPU同时尝试访问一个桶的缓存块链表,才会发生race condition

将块号映射到13个Blocks实际上只是为了哈希表能够分布比较平均,当桶中所有的缓存块耗尽,我们仍需要LRU策略将某个桶的缓存块置换出去(eviction),分配给新缓存块并且映射到新的位置。

为了履行LRU策略,因为链表已经分散到各种桶了,不能再通过以前双向链表遍历的方法来索引最近未使用的缓存块,因此我们要维护新的字段用于记录这个。哈希表使用首次适应算法的话,直接循环链表就可以找到缓存块,但是这个实验仍然坚持LRU策略,在放弃了双向循环链表的情况下,哈希表如何标记LRU缓存块呢?系统在kernel/trap.c维护了一个ticks字段:

1
2
3
4
5
6
7
8
void
clockintr()
{
acquire(&tickslock);
ticks++;
wakeup(&ticks);
release(&tickslock);
}
ticks字段由时钟中断控制,触发时间中断,ticks字段不断递增,如果系统长时间运行,一般也有对应的溢出处理机制,使其重新计数,xv6如何处理的我并不能确定,这个也不是该实验的考虑范围。我们在缓存块引入这个字段,在适当的时候我们将ticks更新到缓存块字段,系统的ticks越大,代表越接近现在时刻,意味着这个缓存块是最近被访问的,从而进行LRU筛选(这个让我纳闷了好久,一开始我误以为ticks是每个进程独有的字段,不能理解为什么直接通过大小比较就能得出LRU)。

此外,新的问题是如何保持驱逐和插入是原子的?这个问题的实现是自由的。但是一定要避免环路死锁的问题,在xv6中没有维护一定的锁顺序,所以大部分情况下我们都需要先行放弃自身的锁,再尝试获取其他的锁。所以这里的操作就是先放弃当前桶的锁,去获取目标桶的锁。在目标桶使用LRU驱逐后获取到新的缓存块,这时再返回原来的桶获取锁,注意这段失去锁的时间,完全可能有其他线程为原来的桶增加了新的缓存块,现在我们又偷来一个,就造成了重复问题,这里解决也是自由的,我们可以重新遍历一次原桶,如果已经存在相同的块号映射,就增加引用数即可。

具体实现

  1. 弃用原来的双向循环链表结构,实现哈希表结构,意味着我们的binitbget等函数都要对应修改,获取bcache自旋锁逻辑也要修改。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //新增桶结构,每个桶维护一个自旋锁、头结点
    struct Hash_bcache{
    struct spinlock hash_lock;
    struct buf head;
    };

    struct {
    struct buf buf[NBUF]; //缓存块单链表
    struct Hash_bcache Hash_bcache[MAX_CBlock]; //13个桶
    } bcache;
    因为使用哈希表,双向循环原来用于索引LRU缓存块的目的就没用了,完全可以放弃prev指针,桶只使用单链表即可。这里还为每个缓存块新增了buf_tick字段,后面会用到。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    struct buf {
    int valid; // has data been read from disk?
    int disk; // does disk "own" buf?
    uint dev;
    uint blockno;
    struct sleeplock lock;
    uint refcnt;
    //struct buf *prev; // 注释掉prev
    struct buf *next;
    uchar data[BSIZE];
    uint buf_tick; //新增
    };

  2. 修改binit函数,将初始化大锁改成初始化桶锁,双向链表插入改为哈希表插入。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    char hashlock_name[MAX_CBlock][18]; //锁名称
    char sleeplock_name[NBUF][18];
    void
    binit(void){
    for(int i =0;i<MAX_CBlock;i++){
    snprintf(hashlock_name[i],11,"bcache_spinlock%d",i); //按要求,锁需要以bcache开头
    initlock(&bcache.Hash_bcache[i].hash_lock, hashlock_name[i]);
    bcache.Hash_bcache[i].head.next=0;//头结点初始化
    }

    for(int i = 0; i < NBUF; i++){ //修改成哈希表插入
    struct buf *b=&(bcache.buf[i]);
    b->next = bcache.Hash_bcache[0].head.next; //首次插入将所有缓存块暂存0桶,后面会按照块号从此处驱逐并且插入其他桶
    bcache.Hash_bcache[0].head.next=b;
    snprintf(sleeplock_name[i],12,"bcache_sleeplock%d",i); //按要求,锁需要以bcache开头
    initsleeplock(&b->lock, sleeplock_name[i]); //同原来
    b->buf_tick=ticks; //初始化ticks字段,对所有缓存块平等,也可以不做
    }
    }

  3. 修改bget函数,这是重点的函数:主要逻辑是获取某个桶的自旋锁,查找是否存在相同设备号、块号的缓存块,存在则引用数++,不存在则需要执行eviction逻辑。首先放弃本桶的锁,从头遍历13个桶并获取对应锁,寻找ticks字段最小的块的前一个块pre_leastbuf(链表操作需要),在某个桶内如果没找到,循环完链表就应该放弃锁;如果找到,则应该持有锁直至从其他桶找到ticks更小的缓存块。最后我们会持有LRU对应的锁,将其结点孤立出来就可以放弃。这里拿着锁重复获取锁不会带来死锁,因为我们最多持有这个锁去遍历的是桶值更高的桶锁,另一个线程不会出现桶值高锁等桶值低锁情况(尽管key大于0,也是先放弃key),因此没有构成环路等待,也不会产生死锁(Maybe 应该 可能)。

    这里要担心的只是其他线程也在竞争key这个原桶的锁,原桶可能被操作(例如已经加入了相同设备号相同块号),所以重新需要遍历一次原桶,如果有则引用数++;

    这里一个致命的缺点是如果真的存在两个线程同时竞争key原桶并且尝试增加相同块号的缓存块,那么其中一个线程的LRU是必然被无故驱逐的,也即如果这种情况严峻,缓存块将越用越少,尽管bcachetest没有考虑或者没有设置足够的强度测试这个。所以这里新增了放回去的尝试,也能顺利通过test。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    static struct buf*
    bget(uint dev, uint blockno)
    {
    struct buf *b;

    int key=blockno%MAX_CBlock;

    acquire(&bcache.Hash_bcache[key].hash_lock);

    //遍历桶内看是否存在缓存块
    for(b = bcache.Hash_bcache[key].head.next; b != 0; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
    b->refcnt++;
    release(&bcache.Hash_bcache[key].hash_lock);
    acquiresleep(&b->lock);
    return b;
    }
    }

    uint min_ticks=0xffffffff; //设置ticks起始比较值
    int find_least; //单次桶遍历是否存在LRU
    release(&bcache.Hash_bcache[key].hash_lock);
    //没有缓存块就要遍历其他桶,标记LRU
    int least_i=-1; //上次找到LRU对应的桶值
    struct buf* pre_leastbuf=0; //LRU的前一个结点
    for(int i=0;i<MAX_CBlock;i++){ //遍历13个桶
    acquire(&bcache.Hash_bcache[i].hash_lock);
    find_least=0;
    for(b= &(bcache.Hash_bcache[i].head);b->next!=0;b=b->next){ //遍历桶内是否存在LRU
    if(b->next->refcnt==0&&b->next->buf_tick<min_ticks){ //找到LRU块
    min_ticks=b->next->buf_tick;
    pre_leastbuf=b; //b为pre_leastbuf,b->next为疑似LRU
    find_least = 1;
    }
    }
    if(!find_least) //该桶没有满足条件
    release(&bcache.Hash_bcache[i].hash_lock);
    else{ //该桶存在满足条件块
    if(least_i!=-1)
    release(&bcache.Hash_bcache[least_i].hash_lock); //释放上一次ticks比较小的LRU
    least_i=i; //记录桶值
    }
    }
    if(pre_leastbuf==0){ //说明整个哈希表都没有
    panic("No Buffer");
    }
    //存在LRU
    struct buf* leastbuf=pre_leastbuf->next;
    pre_leastbuf->next=leastbuf->next; //孤立出来
    release(&bcache.Hash_bcache[least_i].hash_lock); //释放目标桶锁
    acquire(&bcache.Hash_bcache[key].hash_lock); //拿原桶锁

    //重新检查原桶是否仍需要为块号分配LRU
    for(b = bcache.Hash_bcache[key].head.next; b != 0; b = b->next){
    if(b->dev == dev && b->blockno == blockno){
    b->refcnt++;
    release(&bcache.Hash_bcache[key].hash_lock);
    acquire(&bcache.Hash_bcache[least_i].hash_lock); //尝试将块还回去
    leastbuf->next=bcache.Hash_bcache[least_i].head.next;
    bcache.Hash_bcache[least_i].head.next=leastbuf;
    release(&bcache.Hash_bcache[least_i].hash_lock);
    acquiresleep(&b->lock);
    return b; //如果无需说明其他线程已经分配了
    }
    }
    //如果原桶仍然需要分配,就填充信息并且插入key桶的链表
    leastbuf->dev = dev;
    leastbuf->blockno = blockno;
    leastbuf->valid = 0;
    leastbuf->refcnt = 1;
    leastbuf->next=bcache.Hash_bcache[key].head.next;
    bcache.Hash_bcache[key].head.next=leastbuf;
    release(&bcache.Hash_bcache[key].hash_lock);
    acquiresleep(&leastbuf->lock); //返回
    return leastbuf;
    }

  4. 修改brelse函数,这里双向链表brelse将空闲块放回头结点后面的代码可以去除了,我们做的只是这里主要是更新ticks到缓存块的ticks字段,结构上无需改变,因为哈希表的插入可以在驱逐时候完成。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    void
    brelse(struct buf *b)
    {
    if(!holdingsleep(&b->lock))
    panic("brelse");

    releasesleep(&b->lock);

    int key=b->blockno%MAX_CBlock;
    acquire(&bcache.Hash_bcache[key].hash_lock);
    b->refcnt--;
    if (b->refcnt == 0) {
    // no one is waiting for it.
    //b->next->prev = b->pre;
    //b->prev->next = b->next;
    //b->next = bcache.head.next;
    //b->prev = &bcache.head;
    //bcache.head.next->prev = b;
    //bcache.head.next = b;
    b->buf_tick=ticks;
    }
    release(&bcache.Hash_bcache[key].hash_lock);
    }

  5. bpinbunpin主要是获取锁的逻辑变化,比较简单,如果需要参考Commit不赘述。

最后验证:

1
2
bcachetest
usertests

完整Commit:Buffer Cache Done