
正文
你可能像我一样,手搓过无数个环形队列,其实在Linux内核里有一个叫kfifo的无锁环形队列,不到100行核心代码,设计非常的巧妙,几乎能够解决我上面提到的所有问题,所以我被它的惊艳所折服,所以必须写一篇文章来剥一下kfifo的神秘面纱了,看看它是如何用几个简单的数学技巧和内存屏障,在单生产者单消费者场景下实现真正的无锁并发。
1
为什么需要kfifo
在嵌入式开发中,生产者-消费者模型无处不在:
不管是在RTOS还是嵌入式linux中,通常我们会定义一个环形缓冲区,加一个互斥锁保护读写操作。但这种方式有三个比较麻烦的问题:
1、首先是锁竞争的开销,每次读写都要加锁解锁,在高频数据场景下,锁的开销甚至超过了数据处理本身。
2、然后是死锁的风险,如果在中断上下文里尝试获取已经被持有的锁,直接导致系统崩溃。
3、最好是缓存伪共享,这个主要是在带cache缓存时,锁变量和队列变量放在同一个缓存行,导致频繁的缓存失效。
那么kfifo的出现几乎非常完美解决了这些问题。它在单生产者单消费者(SPSC)场景下,不需要任何锁,就能保证线程安全,性能比加锁队列高出一个数量级。
2
核心设计技巧
这里就要聊聊kfifo的设计哲学:通过限制使用场景来获得极致性能。这也是我为什么非常喜欢kfifo的原因,它也是嵌入式软件设计中一个非常重要的思想。
它只支持单生产者单消费者,缓冲区大小必须是2的幂次方。正是这两个看似苛刻的限制,换来了无与伦比的性能优势。
传统环形队列最让人头疼的就是索引回绕问题。我们通常会这样写:
in = (in + 1) % size;out = (out + 1) % size;但取模运算(%)在CPU上是一个非常昂贵的操作,需要几十个时钟周期,kfifo是怎么做的呢?
它把in和out索引定义为unsigned int类型,并且永远不让它们回绕!让它们一直单调递增,直到溢出。
structkfifo {unsignedchar *buffer;unsignedint size;unsignedint in; // 写入索引,一直递增unsignedint out; // 读取索引,一直递增};那索引溢出了怎么办?
利用C语言中无符号整数的溢出特性:当unsigned int超过最大值时,会自动从0开始重新计数。这是C语言标准明确规定的行为,不是未定义行为!这正是kfifo最精妙的地方!
更神奇的是,无论in和out怎么溢出,in - out永远等于队列中已有的数据长度!
// 已用空间:无论in和out怎么溢出,结果永远正确unsignedint used = fifo->in - fifo->out;// 空闲空间unsignedintfree = fifo->size - used;// 判断空满bool is_empty = (fifo->in == fifo->out);bool is_full = (used >= fifo->size);一个简单的减法,就解决了传统环形队列中判断空满的复杂逻辑,nice~
那实际访问缓冲区时怎么办呢?因为缓冲区大小是2的幂次方,我们可以用位运算来代替取模,前面bug菌也写过类似的文章,当缓冲区大小必须是2的幂次方时候,编译器都会帮我们优化,不过需要注意前提条件:
// 等价于 in % size,但速度快10倍以上unsignedint pos = fifo->in & (fifo->size - 1);位运算在CPU上是单周期指令,比取模运算快得多。这就是为什么kfifo要求缓冲区大小必须是2的幂次方的原因。
你应该bug菌开始的时候一样,会问:为什么两个线程同时修改in和out不会有竞态条件?主要的原因是生产者只修改in,消费者只修改out。
out,只写inin,只写out这就像两个人用同一张环形便签纸留言:
只要两个人不交叉操作同一个位置,就不会有任何冲突。

这两个区间永远不会重叠,因为in - out <= size。
很多人以为kfifo不需要任何同步机制,这是大错特错的。kfifo不需要锁,但它需要内存屏障。现代CPU为了提高性能,会对指令进行乱序执行;编译器为了优化,也会对代码进行重排。如果没有内存屏障,可能会出现这样的情况:
// 生产者代码memcpy(fifo->buffer + pos, data, len); // 1. 写数据fifo->in += len; // 2. 更新索引CPU可能会把这两条指令的顺序颠倒过来:先更新in索引,再写数据。
这会导致什么后果?消费者看到in索引更新了,以为有新数据可读,结果读到的还是旧数据——这就是所谓的"幽灵数据"。
那么kfifo是如何解决这个问题的呢?在写数据和更新索引之间,插入一个写内存屏障(smp_wmb())。
memcpy(fifo->buffer + pos, data, len);smp_wmb(); // 写内存屏障:保证前面的写操作全部完成fifo->in += len;同样,在消费者这边,也需要一个读内存屏障(smp_rmb()):
memcpy(data, fifo->buffer + pos, len);smp_rmb(); // 读内存屏障:保证前面的读操作全部完成fifo->out += len;内存屏障就像一堵墙,它不禁止所有乱序,只禁止墙两边的内存操作互相穿越。这是一种非常轻量级的同步机制,开销比锁小得多。
3
解读内核源码
让我们来看看Linux内核中kfifo的核心实现,说实在C真的优雅。我会去掉一些无关的宏定义,保留最核心的逻辑。
struct __kfifo {unsignedint in; // 写入索引unsignedint out; // 读取索引unsignedint mask; // size - 1,用于位运算unsignedint esize; // 每个元素的大小void *data; // 缓冲区指针};注意这里用了mask而不是size,因为mask = size - 1,这样每次计算位置时就不用再减1了,又是一个微小的性能优化。
staticinlineunsignedintkfifo_unused(struct __kfifo *fifo){return (fifo->mask + 1) - (fifo->in - fifo->out);}fifo->mask + 1就是缓冲区大小,fifo->in - fifo->out就是已用空间,相减就是空闲空间,够简洁吧。
unsignedint __kfifo_in(struct __kfifo *fifo, constvoid *buf, unsignedint len){unsignedint l;// 最多只能写入空闲空间大小的数据 len = min(len, kfifo_unused(fifo));// 第一步:计算从当前in位置到缓冲区末尾的长度 l = min(len, fifo->mask + 1 - (fifo->in & fifo->mask));// 先拷贝尾部数据memcpy(fifo->data + (fifo->in & fifo->mask), buf, l);// 如果还有剩余数据,从缓冲区头部开始拷贝memcpy(fifo->data, buf + l, len - l);// 写内存屏障:确保数据全部写入后再更新in索引 smp_wmb();// 更新写入索引 fifo->in += len;return len;}即使不需要回绕,len - l等于0,memcpy也什么都不做。这就是kfifo代码的优雅之处:用统一的逻辑处理所有情况,避免分支预测失败带来的性能损失,所以你几乎看到if else。
unsignedint __kfifo_out(struct __kfifo *fifo, void *buf, unsignedint len){unsignedint l;// 最多只能读取已用空间大小的数据 len = min(len, fifo->in - fifo->out);// 第一步:计算从当前out位置到缓冲区末尾的长度 l = min(len, fifo->mask + 1 - (fifo->out & fifo->mask));// 先拷贝尾部数据memcpy(buf, fifo->data + (fifo->out & fifo->mask), l);// 如果还有剩余数据,从缓冲区头部开始拷贝memcpy(buf + l, fifo->data, len - l);// 读内存屏障:确保数据全部读取后再更新out索引 smp_rmb();// 更新读取索引 fifo->out += len;return len;}和写入函数几乎对称,同样几乎看不到if语句。
下面是一个简化版的示例供参考:
#include<stdint.h>#include<string.h>// 内存屏障定义:根据你的平台修改#if defined(__ARM_ARCH)// ARM Cortex-M系列使用__DMB()数据内存屏障#define smp_wmb() __DMB()#define smp_rmb() __DMB()#elif defined(__GNUC__)// GCC编译器使用内置函数#define smp_wmb() __sync_synchronize()#define smp_rmb() __sync_synchronize()#else// 其他平台:至少需要一个编译器屏障#define smp_wmb() asm volatile("" ::: "memory")#define smp_rmb() asm volatile("" ::: "memory")#endif// kfifo数据结构typedefstruct {uint8_t *buffer; // 缓冲区指针uint32_t size; // 缓冲区大小(必须是2的幂次)uint32_t mask; // size - 1uint32_t in; // 写入索引uint32_t out; // 读取索引} kfifo_t;// 初始化kfifo// 注意:size必须是2的幂次!voidkfifo_init(kfifo_t *fifo, uint8_t *buffer, uint32_t size){ fifo->buffer = buffer; fifo->size = size; fifo->mask = size - 1; fifo->in = 0; fifo->out = 0;}// 写入数据uint32_tkfifo_put(kfifo_t *fifo, constuint8_t *data, uint32_t len){uint32_t l;// 计算可写入的最大长度 len = len < (fifo->size - (fifo->in - fifo->out)) ? len : (fifo->size - (fifo->in - fifo->out));// 计算从当前位置到缓冲区末尾的长度 l = len < (fifo->size - (fifo->in & fifo->mask)) ? len : (fifo->size - (fifo->in & fifo->mask));// 拷贝第一部分数据memcpy(fifo->buffer + (fifo->in & fifo->mask), data, l);// 拷贝第二部分数据(如果需要回绕)memcpy(fifo->buffer, data + l, len - l);// 写内存屏障:确保数据全部写入后再更新索引 smp_wmb();// 更新写入索引 fifo->in += len;return len;}// 读取数据uint32_tkfifo_get(kfifo_t *fifo, uint8_t *data, uint32_t len){uint32_t l;// 计算可读取的最大长度 len = len < (fifo->in - fifo->out) ? len : (fifo->in - fifo->out);// 计算从当前位置到缓冲区末尾的长度 l = len < (fifo->size - (fifo->out & fifo->mask)) ? len : (fifo->size - (fifo->out & fifo->mask));// 拷贝第一部分数据memcpy(data, fifo->buffer + (fifo->out & fifo->mask), l);// 拷贝第二部分数据(如果需要回绕)memcpy(data + l, fifo->buffer, len - l);// 读内存屏障:确保数据全部读取后再更新索引 smp_rmb();// 更新读取索引 fifo->out += len;return len;}// 使用示例intmain(void){// 定义一个大小为128字节的静态缓冲区static uint8_t buffer[128];static kfifo_t fifo;// 初始化 kfifo_init(&fifo, buffer, sizeof(buffer));// 生产者线程/中断中写入数据uint8_t tx_data[] = "Hello, kfifo!"; kfifo_put(&fifo, tx_data, sizeof(tx_data));// 消费者线程中读取数据uint8_t rx_data[128];uint32_t len = kfifo_get(&fifo, rx_data, sizeof(rx_data));return0;}最后总结下使用注意事项:
5、多生产者多消费者场景怎么办?可以给kfifo加一把自旋锁,变成"轻量级加锁队列";或者使用多个kfifo,每个生产者对应一个队列。
最后分享一段话:
最好的代码不是最复杂的代码,而是用最简单的方法解决最复杂的问题。
最后
好了,今天就跟大家分享这么多了,如果你觉得有所收获,一定记得点个赞~
bug菌唯一、永久、免费分享嵌入式技术知识平台~
推荐专辑点击蓝色字体即可跳转
☞ MCU进阶专辑

☞ 专辑|手撕C语言
☞ 专辑|经验分享
