????? Linux內核中的循環緩沖區(circular buffer)為解決某些特殊情況下的競爭問題提供了一種免鎖的方法。這種特殊的情況就是當生產者和消費者都只有一個,而在其它情況下使用它也是必須要加鎖的。
循環緩沖區定義在include/linux/kfifo.h中,如下:
struct kfifo {
unsigned char *buffer; // buffer指向存放數據的緩沖區
unsigned int size;??????? // size是緩沖區的大小
unsigned int in;?????????? // in是寫指針下標
unsigned int out;??????? // out是讀指針下標
spinlock_t *lock;???????? // lock是加到struct kfifo上的自旋鎖
};?
????? (上面說的免鎖不是免這里的鎖,這個鎖是必須的),防止多個進程并發訪問此數據結構。當in==out時,說明緩沖區為空;當(in-out)==size時,說明緩沖區已滿。
?????? 為kfifo提供的接口可以分為兩類:
?????? 一類是滿足上述情況下使用的,以雙下劃線開頭,沒有加鎖的;
?????? 另一類是在不滿足的條件下,即需要額外加鎖的情況下使用的。
?????? 其實后一類只是在前一類的基礎上進行加鎖后的包裝(也有一處進行了小的改進),實現中所加的鎖是spin_lock_irqsave。
清空緩沖區的函數:
static inline void __kfifo_reset(struct kfifo *fifo);
static inline void kfifo_reset(struct kfifo *fifo);
這很簡單,直接把讀寫指針都置為0即可。
向緩沖區里放入數據的接口是:
static inline unsigned int kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len);
unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len);
后者是在kernel/kfifo.c中定義的。這個接口是經過精心構造的,可以小心地避免一些邊界情況。我們有必要一起來看一下它的具體實現。
1: /**
2: * __kfifo_put - puts some data into the FIFO, no locking version
3: * @fifo: the fifo to be used.
4: * @buffer: the data to be added.
5: * @len: the length of the data to be added.
6: *
7: * This function copies at most @len bytes from the @buffer into
8: * the FIFO depending on the free space, and returns the number of
9: * bytes copied.
10: *
11: * Note that with only one concurrent reader and one concurrent
12: * writer, you don't need extra locking to use these functions.
13: */
14: unsigned int __kfifo_put(struct kfifo *fifo,
15: const unsigned char *buffer, unsigned int len)
16: {
17: unsigned int l;
18: ?
19: len = min(len, fifo->size - fifo->in + fifo->out);
20: ?
21: /*
22: * Ensure that we sample the fifo->out index -before- we
23: * start putting bytes into the kfifo.
24: */
25: ?
26: smp_mb();
27: ?
28: /* first put the data starting from fifo->in to buffer end */
29: l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
30: memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
31: ?
32: /* then put the rest (if any) at the beginning of the buffer */
33: memcpy(fifo->buffer, buffer + l, len - l);
34: ?
35: /*
36: * Ensure that we add the bytes to the kfifo -before-
37: * we update the fifo->in index.
38: */
39: ?
40: smp_wmb();
41: ?
42: fifo->in += len;
43: ?
44: return len;
45: }
46: EXPORT_SYMBOL(__kfifo_put);
1: /**
2: * kfifo_put - puts some data into the FIFO
3: * @fifo: the fifo to be used.
4: * @buffer: the data to be added.
5: * @len: the length of the data to be added.
6: *
7: * This function copies at most @len bytes from the @buffer into
8: * the FIFO depending on the free space, and returns the number of
9: * bytes copied.
10: */
11: static inline unsigned int kfifo_put(struct kfifo *fifo,
12: const unsigned char *buffer, unsigned int len)
13: {
14: unsigned long
15: unsigned int ret;
16: ?
17: spin_lock_irqsave(fifo->lock, flags);
18: ?
19: ret = __kfifo_put(fifo, buffer, len);
20: ?
21: spin_unlock_irqrestore(fifo->lock, flags);
22: ?
23: return ret;
24: }
len = min(len, fifo->size - fifo->in + fifo->out);
????? 在 len 和 (fifo->size - fifo->in + fifo->out) 之間取一個較小的值賦給len。注意,當 (fifo->in == fifo->out+fifo->size) 時,表示緩沖區已滿,此時得到的較小值一定是0,后面實際寫入的字節數也全為0。
????? 另一種邊界情況是當 len 很大時(因為len是無符號的,負數對它來說也是一個很大的正數),這一句也能保證len取到一個較小的值,因為????fifo->in 總是大于等于 fifo->out ,所以后面的那個表達式 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); 的值不會超過fifo->size的大小。
????? smp_mb();? smp_wmb(); 是加內存屏障,這里不是我們討論的范圍,你可以忽略它。
?? ?? l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));??? 是把上一步決定的要寫入的字節數len “切開”,這里又使用了一個技巧。注意:實際分配給 fifo->buffer 的字節數 fifo->size,必須是2的冪,否則這里就會出錯。既然 fifo->size 是2的冪,那么 (fifo->size-1) 也就是一個后面幾位全為1的數,也就能保證(fifo->in & (fifo->size - 1)) 總為不超過 (fifo->size - 1) 的那一部分,和 (fifo->in)% (fifo->size - 1) 的效果一樣。
????? 這樣后面的代碼就不難理解了,它先向? fifo->in? 到緩沖區末端這一塊寫數據,如果還沒寫完,在從緩沖區頭開始寫入剩下的,從而實現了循環緩沖。最后,把寫指針后移 len 個字節,并返回len。
?????? 從上面可以看出,fifo->in的值可以從0變化到超過fifo->size的數值,fifo->out也如此,但它們的差不會超過fifo->size。
從kfifo向外讀數據的函數是:
static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len);
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len);
1: ?
2: /**
3: * __kfifo_get - gets some data from the FIFO, no locking version
4: * @fifo: the fifo to be used.
5: * @buffer: where the data must be copied.
6: * @len: the size of the destination buffer.
7: *
8: * This function copies at most @len bytes from the FIFO into the
9: * @buffer and returns the number of copied bytes.
10: *
11: * Note that with only one concurrent reader and one concurrent
12: * writer, you don't need extra locking to use these functions.
13: */
14: unsigned int __kfifo_get(struct kfifo *fifo,
15: unsigned char *buffer, unsigned int len)
16: {
17: unsigned int l;
18: ?
19: len = min(len, fifo->in - fifo->out);
20: ?
21: /*
22: * Ensure that we sample the fifo->in index -before- we
23: * start removing bytes from the kfifo.
24: */
25: ?
26: smp_rmb();
27: ?
28: /* first get the data from fifo->out until the end of the buffer */
29: l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
30: memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
31: ?
32: /* then get the rest (if any) from the beginning of the buffer */
33: memcpy(buffer + l, fifo->buffer, len - l);
34: ?
35: /*
36: * Ensure that we remove the bytes from the kfifo -before-
37: * we update the fifo->out index.
38: */
39: ?
40: smp_mb();
41: ?
42: fifo->out += len;
43: ?
44: return len;
45: }
46: EXPORT_SYMBOL(__kfifo_get);
1: ?
2: /**
3: * kfifo_get - gets some data from the FIFO
4: * @fifo: the fifo to be used.
5: * @buffer: where the data must be copied.
6: * @len: the size of the destination buffer.
7: *
8: * This function copies at most @len bytes from the FIFO into the
9: * @buffer and returns the number of copied bytes.
10: */
11: static inline unsigned int kfifo_get(struct kfifo *fifo,
12: unsigned char *buffer, unsigned int len)
13: {
14: unsigned long flags;
15: unsigned int ret;
16: ?
17: spin_lock_irqsave(fifo->lock, flags);
18: ?
19: ret = __kfifo_get(fifo, buffer, len);
20: ?
21: /*
22: * optimization: if the FIFO is empty, set the indices to 0
23: * so we don't wrap the next time
24: */
25: if (fifo->in == fifo->out)
26: fifo->in = fifo->out = 0;
27: ?
28: spin_unlock_irqrestore(fifo->lock, flags);
29: ?
30: return ret;
31: }
和上面的__kfifo_put類似,不難分析。
static inline unsigned int __kfifo_len(struct kfifo *fifo);
static inline unsigned int kfifo_len(struct kfifo *fifo);
1: ?
2: /**
3: * __kfifo_len - returns the number of bytes available in the FIFO, no locking version
4: * @fifo: the fifo to be used.
5: */
6: static inline unsigned int __kfifo_len(struct kfifo *fifo)
7: {
8: return fifo->in - fifo->out;
9: }
10: ?
11: /**
12: * kfifo_len - returns the number of bytes available in the FIFO
13: * @fifo: the fifo to be used.
14: */
15: static inline unsigned int kfifo_len(struct kfifo *fifo)
16: {
17: unsigned long flags;
18: unsigned int ret;
19: ?
20: spin_lock_irqsave(fifo->lock, flags);
21: ?
22: ret = __kfifo_len(fifo);
23: ?
24: spin_unlock_irqrestore(fifo->lock, flags);
25: ?
26: return ret;
27: }
這兩個函數返回緩沖區中實際的字節數,只要用fifo->in減去fifo->out即可。
kernel/kfifo.c中還提供了初始化kfifo,分配和釋放kfifo的接口:
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock);
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock);
void kfifo_free(struct kfifo *fifo);
再一次強調,調用kfifo_init必須保證size是2的冪,而kfifo_alloc不必,它內部會把size向上圓到2的冪。kfifo_alloc和kfifo_free搭配使用,因為這兩個函數會為fifo->buffer分配/釋放內存空間。而kfifo_init只會接受一個已分配好空間的fifo->buffer,不能和kfifo->free搭配,用kfifo_init分配的kfifo只能用kfree釋放。
循環緩沖區在驅動程序中使用較多,尤其是網絡適配器。但這種免鎖的方式在內核互斥中使用較少,取而代之的是另一種高級的互斥機制──RCU。