MIT 6.S081 - Lab Lock - Buffer cache (3) 哈希单项优化
代码传送门:OS_Learn/bio_hash.c at main · xuanhao44/OS_Learn (github.com)
【仅包含部分代码,不能运行请自行查看文章补充相关内容】
之前总算是把 timestamp
单项优化写出来了,那么现在就来看看哈希的单项优化吧。
1 哈希桶、散列函数
// given number to hash
#define NBUCKETS 13
struct bucket
{
struct spinlock lock;
// Linked list of **a bucket of** buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head;
};
struct
{
// physical buf
struct buf buf[NBUF];
struct bucket bucket[NBUCKETS];
} bcache;
int hash(uint blockno)
{
return blockno % NBUCKETS;
}
bcache 由 bucket 数组和 buf 数组构成,其中 buf[NBUF]
是 bucket[NBUCKETS]
的物理载体。
bucket 由一个自旋锁和一个双向循环链表的头结点构成。
哈希函数是简单的取模。本来应该需要根据 key 的数字进行判断(也就是 blockno
),但是指导书直接给了 % 13 的提示,那就直接用咯。当然一般是选较大的素数的,所以 13 应该没啥问题。
2 思路
字段 blockno
是缓存的磁盘块号。
在 bread 时,bget 从 bread 那里拿来 blockno
,然后要开始找对应 blockno
的缓存块。把 blockno
放在 hash(blockno)
的哈希桶(简称 hash(blockno)
桶)中,这样在找的时候,我们会优先到 hash(blockno)
桶中寻找;如果未命中,就需要找一个空闲块来替换了。我们的选择是 refcnt == 0
的 LRU 块,即 least recent used unused 块,我们先在当前的桶内寻找,如果连 unused 块都找不到,我们就去其他 bucket 找,找到了,那么我们就替换,并且把这个缓存块移动到 hash(blockno)
桶中,放在 head.next 处。
在 brelse 中,我们会把上层文件系统释放的缓存块,取得这个块的 blockno
,如果 refcnt - 1 后为 0,则将其又放入 hash(blockno)
桶。
思考一个过程:一开始的时候并没有任何磁盘块的缓存块,上层文件系统调用 bread,然后调用 bget,先在 hash(blockno)
桶中寻找——当然无法命中;未命中,于是我们先在 hash(blockno)
桶中找 LRU 块,如果还是找不到,就去其他桶去找 LRU,找到了就替换,同时把这个缓存块移动到 hash(blockno)
桶中,放在 head.next 处。
通过上面的思考还能知道几点:
- 我们总是在尽力维护这些哈希桶,使得
blockno
的哈希函数值相同的缓存块都被放在hash(blockno)
桶中。这样在查找命中的时候,我们就可以认为:如果我们需要查找blockno
的缓存块,那么我应该先去hash(blockno)
桶中找。至于维护的操作就是:brelse 的时候我们把blockno
的缓存块又放回到hash(blockno)
桶中,以及在未命中时,如果在其他桶中找到 LRU 块并替换之后,我们是会把这个块移动到hash(blockno)
桶中的。 - 我们知道当 $f(key1) = f(key2)$ 的时候,我们称这个散列函数发生了冲突。处理冲突的方式某一种方式是:链地址法。
将所有关键字为同义词的记录存储在一个单链表中,我们称这种表为同义词子表,在散列表中只存储所有同义词子表的头指针。对于关键字集合{12,67,56,16,25,37,22,29,15,47,48,34},我们用前面同样的 12 为除数,进行除留余数法,可得到如右图结构,此时,已经不存在什么冲突换址的问题,无论有多少个冲突,都只是在当前位置给单链表增加结点的问题。
链地址法对于可能会造成很多冲突的散列函数来说,提供了绝不会出现找不到地址的保障。当然,这也就带来了查找时需要遍历单链表的性能损耗。
道理差不多,当然因为要找 LRU,所以我们现在用的是双向循环链表,头节点而不是头指针。同时这也能说明为什么桶的结构不用数组。
另外,这个思路还有两个漏洞,在下面会提到,参见 6 思路漏洞。
3 数据结构的一些操作
// 初始化双向循环链表 (头结点)
void list_init(struct buf *head)
{
head->next = head;
head->prev = head;
}
// 头插法
// eg: add_head(&buk->head, b);
void add_head(struct buf *head, struct buf *b)
{
b->next = head->next;
b->prev = head;
head->next->prev = b;
head->next = b;
}
// 删除元素
void remove(struct buf *b)
{
b->next->prev = b->prev;
b->prev->next = b->next;
}
看过原始的设计就知道双向循环链表的处理有多难看,还是把这些从函数中抽离出来吧。
4 优化实现
4.1 binit
void binit(void)
{
struct bucket *buk;
struct buf *b;
for (buk = bcache.bucket; buk < bcache.bucket + NBUCKETS; buk++)
{
// init bucket lock
initlock(&buk->lock, "bcache.bucket");
// Create linked list of buffers
list_init(&buk->head);
}
// init 的时候就全部放到 bucket0 中
buk = &bcache.bucket[0];
for (b = bcache.buf; b < bcache.buf + NBUF; b++)
{
add_head(&buk->head, b);
initsleeplock(&b->lock, "buffer");
}
}
和原始的设计类似,我们把锁初始化,同时也把所有链表初始化。把所有的 buf 先放到 bucket0 中,当然也可以选择把 buf 均分到所有 bucket 中,写起来比这个复杂一点点。
void binit(void)
{
struct bucket *buk;
struct buf *b;
for (buk = bcache.bucket; buk < bcache.bucket + NBUCKETS; buk++)
{
// init bucket lock
initlock(&buk->lock, "bcache.bucket");
// Create linked list of buffers
list_init(&buk->head);
}
int i = 0;
for (b = bcache.buf; b < bcache.buf + NBUF; b++)
{
buk = &bcache.bucket[i];
i = (i + 1) % NBUCKETS;
add_head(&buk->head, b);
initsleeplock(&b->lock, "buffer");
}
}
个人认为把 buf 尽量均分到所有 bucket 中会比较好。
4.2 brelse
思路中提到过,我们把 unused 的缓存块又放回到 hash(blockno)
桶中。
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void brelse(struct buf *b)
{
if (!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
struct bucket *buk = &bcache.bucket[hash(b->blockno)];
acquire(&buk->lock);
b->refcnt--;
if (b->refcnt == 0)
{
// no one is waiting for it.
remove(b);
add_head(&buk->head, b);
}
release(&buk->lock);
}
4.3 bpin
和 bunpin
没有之前的 bcache.lock
,现在就锁上 bucket 的锁就行了。
void bpin(struct buf *b)
{
struct bucket *buk = &bcache.bucket[hash(b->blockno)];
acquire(&buk->lock);
b->refcnt++;
release(&buk->lock);
}
void bunpin(struct buf *b)
{
struct bucket *buk = &bcache.bucket[hash(b->blockno)];
acquire(&buk->lock);
b->refcnt--;
release(&buk->lock);
}
4.4 bget
逻辑参考 2 思路部分。
static struct buf *
bget(uint dev, uint blockno)
{
struct buf *b;
struct bucket *buk_key;
struct bucket *buk;
buk_key = &bcache.bucket[hash(blockno)];
acquire(&buk_key->lock);
for (b = buk_key->head.next; b != &buk_key->head; b = b->next)
{
if (b->dev == dev && b->blockno == blockno)
{
b->refcnt++;
release(&buk_key->lock);
acquiresleep(&b->lock);
return b;
}
}
// Not cached.
// find LRU in the hash(blockno) bucket
// remind that still own buk_key->lock
for (b = buk_key->head.prev; b != &buk_key->head; b = b->prev)
{
if (b->refcnt == 0)
{
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&buk_key->lock);
acquiresleep(&b->lock);
return b;
}
}
// we cannot find unused buf in the hash(blockno) bucket
// search in other bucket
for (buk = bcache.bucket; buk < bcache.bucket + NBUCKETS; buk++)
{
// do not search hash(blockno) bucket again
if (buk == buk_key)
continue;
acquire(&buk->lock);
for (b = buk->head.prev; b != &buk->head; b = b->prev)
{
if (b->refcnt == 0)
{
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
remove(b);
add_head(&buk_key->head, b);
release(&buk->lock);
release(&buk_key->lock);
acquiresleep(&b->lock);
return b;
}
}
release(&buk->lock);
}
release(&buk_key->lock);
panic("bget: no buffers");
}
5 测试
能通过所有测试。
下面只放出 bcachetest
,usertests
略。
发现 usertests
会卡在 test manywrites
很久,不知道有没有问题。
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmems0: #fetch-and-add 0 #acquire() 32950
lock: kmems1: #fetch-and-add 0 #acquire() 41
lock: kmems3: #fetch-and-add 0 #acquire() 37
lock: kmems4: #fetch-and-add 0 #acquire() 34
lock: kmems5: #fetch-and-add 0 #acquire() 7
lock: kmems6: #fetch-and-add 0 #acquire() 10
lock: kmems7: #fetch-and-add 0 #acquire() 30
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4120
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4120
lock: bcache.bucket: #fetch-and-add 0 #acquire() 2270
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4276
lock: bcache.bucket: #fetch-and-add 0 #acquire() 2264
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4264
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4738
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6680
lock: bcache.bucket: #fetch-and-add 0 #acquire() 8650
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6174
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6174
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6180
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6176
--- top 5 contended locks:
lock: proc: #fetch-and-add 786587 #acquire() 217625
lock: proc: #fetch-and-add 624688 #acquire() 217580
lock: virtio_disk: #fetch-and-add 545181 #acquire() 1205
lock: proc: #fetch-and-add 399190 #acquire() 217584
lock: proc: #fetch-and-add 370029 #acquire() 217584
tot= 0
test0: OK
start test1
test1 OK
6 思路漏洞
- 死锁
需要注意的是,当我们去其他 bucket 中找 LRU 块的时候,我们是带着 hash(blockno)
桶的锁的。
我们必须带着这个锁去找,这是正确性的要求。如果不带着这个锁找,极有可能让一个桶内出现两个相同的缓存块,这是不可以的。
我们不能再把自己找一遍,或者说,我们为什么要先去 hash(blockno)
桶找,就是为了和到其他 bukcet 找分开处理。在已经 acquire 一个锁之后,不能再 acquire 一次同一个锁。
我们发现一个隐藏的问题——死锁。若我们持有 hash(blockno)
桶的锁,然后给其他桶上锁来查找;此时,如果另一个进程正好持有之前的某个其他桶的锁,然后向之前的 hash(blockno)
桶找,也就是尝试上锁的话,就有发生死锁的可能。
很遗憾的是,对于这个死锁问题,本篇没有提供改进的方法。
- 并非真正的 LRU
我们所说的 LRU,是 least recent used,在原本的设计中,单个链表所维护的 LRU,一定是所有 buf 中的 LRU,那么这个时候就直接找 unused 就可以了。
但是现在我们使用了很多哈希桶,并在每个桶内部有一个 LRU 的有序结构。这还是所有 buf 中的 LRU 吗?并不是。那么有办法通过改进做到这一点吗,也不能。
我们现在的实现没能做到 LRU,只是在寻找 unused 的块,这只能说是不会出错的。LRU 仅使用哈希桶的优化是做不到的。
7 最终代码
// Buffer cache.
//
// The buffer cache is a linked list of buf structures holding
// cached copies of disk block contents. Caching disk blocks
// in memory reduces the number of disk reads and also provides
// a synchronization point for disk blocks used by multiple processes.
//
// Interface:
// * To get a buffer for a particular disk block, call bread.
// * After changing buffer data, call bwrite to write it to disk.
// * When done with the buffer, call brelse.
// * Do not use the buffer after calling brelse.
// * Only one process at a time can use a buffer,
// so do not keep them longer than necessary.
#include "types.h"
#include "param.h"
#include "spinlock.h"
#include "sleeplock.h"
#include "riscv.h"
#include "defs.h"
#include "fs.h"
#include "buf.h"
// given number to hash
#define NBUCKETS 13
struct bucket
{
struct spinlock lock;
// Linked list of **a bucket of** buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head;
};
struct
{
// physical buf
struct buf buf[NBUF];
struct bucket bucket[NBUCKETS];
} bcache;
int hash(uint blockno)
{
return blockno % NBUCKETS;
}
// 初始化双向循环链表 (头结点)
void list_init(struct buf *head)
{
head->next = head;
head->prev = head;
}
// 头插法
// eg: add_head(&buk->head, b);
void add_head(struct buf *head, struct buf *b)
{
b->next = head->next;
b->prev = head;
head->next->prev = b;
head->next = b;
}
// 删除元素
void remove(struct buf *b)
{
b->next->prev = b->prev;
b->prev->next = b->next;
}
void binit(void)
{
struct bucket *buk;
struct buf *b;
for (buk = bcache.bucket; buk < bcache.bucket + NBUCKETS; buk++)
{
// init bucket lock
initlock(&buk->lock, "bcache.bucket");
// Create linked list of buffers
list_init(&buk->head);
}
int i = 0;
for (b = bcache.buf; b < bcache.buf + NBUF; b++)
{
buk = &bcache.bucket[i];
i = (i + 1) % NBUCKETS;
add_head(&buk->head, b);
initsleeplock(&b->lock, "buffer");
}
}
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf *
bget(uint dev, uint blockno)
{
struct buf *b;
struct bucket *buk_key;
struct bucket *buk;
buk_key = &bcache.bucket[hash(blockno)];
acquire(&buk_key->lock);
for (b = buk_key->head.next; b != &buk_key->head; b = b->next)
{
if (b->dev == dev && b->blockno == blockno)
{
b->refcnt++;
release(&buk_key->lock);
acquiresleep(&b->lock);
return b;
}
}
// Not cached.
// find LRU in the hash(blockno) bucket
// remind that still own buk_key->lock
for (b = buk_key->head.prev; b != &buk_key->head; b = b->prev)
{
if (b->refcnt == 0)
{
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&buk_key->lock);
acquiresleep(&b->lock);
return b;
}
}
// we cannot find unused buf in the hash(blockno) bucket
// search in other bucket
for (buk = bcache.bucket; buk < bcache.bucket + NBUCKETS; buk++)
{
// do not search hash(blockno) bucket again
if (buk == buk_key)
continue;
acquire(&buk->lock);
for (b = buk->head.prev; b != &buk->head; b = b->prev)
{
if (b->refcnt == 0)
{
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
remove(b);
add_head(&buk_key->head, b);
release(&buk->lock);
release(&buk_key->lock);
acquiresleep(&b->lock);
return b;
}
}
release(&buk->lock);
}
release(&buk_key->lock);
panic("bget: no buffers");
}
// Return a locked buf with the contents of the indicated block.
struct buf *
bread(uint dev, uint blockno)
{
struct buf *b;
b = bget(dev, blockno);
if (!b->valid)
{
virtio_disk_rw(b, 0);
b->valid = 1;
}
return b;
}
// Write b's contents to disk. Must be locked.
void bwrite(struct buf *b)
{
if (!holdingsleep(&b->lock))
panic("bwrite");
virtio_disk_rw(b, 1);
}
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void brelse(struct buf *b)
{
if (!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
struct bucket *buk = &bcache.bucket[hash(b->blockno)];
acquire(&buk->lock);
b->refcnt--;
if (b->refcnt == 0)
{
// no one is waiting for it.
remove(b);
add_head(&buk->head, b);
}
release(&buk->lock);
}
void bpin(struct buf *b)
{
struct bucket *buk = &bcache.bucket[hash(b->blockno)];
acquire(&buk->lock);
b->refcnt++;
release(&buk->lock);
}
void bunpin(struct buf *b)
{
struct bucket *buk = &bcache.bucket[hash(b->blockno)];
acquire(&buk->lock);
b->refcnt--;
release(&buk->lock);
}
8 总结
哈希单项优化可以通过测试,但是哈希单项优化从理论上说并不存在真正的 LRU,且本篇中的实现没有解决潜在的死锁问题。
我们把进一步优化的希望寄托于同时使用两项优化。