
大家好,我是蟹老板~
在Linux内核的世界里,“并发”是常态,但“混乱”是禁忌。Linux内核作为一个抢占式的操作系统 ,多核CPU同时执行内核代码,中断突然打断正在运行的进程,内核抢占机制随时可能切换任务——这些场景下,多个执行流若同时访问同一份共享资源(比如内核变量、硬件寄存器),没有适当的同步机制的话,会导致数据竞争、死锁、优先级反转等严重问题。
举个通俗的例子:内核中有一个记录系统在线用户数的变量online_users,当两个CPU核心同时执行“用户登录”逻辑,都读取到当前值为100,随后各自加1并写回,最终结果会是101而非102——这就是典型的“竞争条件”。而内核同步机制,就是为了避免这种混乱,确保多个执行流“有序访问”共享资源,保障系统稳定性和数据一致性,这也是内核能够高效、可靠运行的核心基石。
很多人第一次接触并发,是用户态线程,通过 pthread_create() 创建多个线程,然后多个线程抢一个变量,于是大家开始学 mutex。可 Linux 内核里的并发,比用户态野蛮太多。并发的来源并非只有线程,还可能来自:
甚至还有:
“你以为代码不会并发,但其实会。”
举个经典场景。
static int counter;void update_counter(void){ counter++;}很多新人会认为这仅是简单的自增操作,但在 SMP(对称多处理)系统中,counter++ 并非原子操作!它会被拆分为三条指令:
load counteradd 1store counter两个 CPU 同时执行:
CPU0:
load 5add 1CPU1:
load 5add 1store 6CPU0:
store 6最终 counter 的值仍是 6,而非预期的 7。这种多个执行流同时访问共享资源并导致数据损坏的现象,就是竞争条件(Race Condition)。竞争条件具有不确定性,说白了,两个 CPU 都觉得自己没问题,最后一起把数据写坏了。
所谓临界区(Critical Section),是指“在同一时刻不能由多个执行流同时进入的代码区域”。例如,对 Linux 链表的操作通常属于临界区:
list_add(&node->list, &global_list);由于 Linux 链表基于裸指针实现,没有任何内置保护机制。若并发修改 next 指针,链表极易被篡改甚至形成无限循环。我曾遇到过驱动因未保护链表操作,导致内核陷入死循环,CPU 占用飙升到 100%,连 top 命令都无法打开,最终只能通过 SysRq 强制重启系统。
很多人刚开始学同步机制时,会陷入一种“锁崇拜”。
仿佛所有问题都要 spin_lock,其实不能这样。
Linux 内核里最轻量的同步方式,往往是原子操作。简单说就是“一句话干到底,谁也别想打断”——就像你用打印机,一次只能打一份文件,别人想打只能等你打完,不能中途插纸。
为什么原子操作能实现“不可中断”?
核心在于CPU会提供专门的原子指令(如x86的LOCK前缀指令),执行这些指令时,CPU会锁住系统总线(或缓存),禁止其他核心访问相关内存地址,直到原子操作完成。也就是说,原子操作的“原子性”是由硬件保证的,内核层面只是对这些硬件指令进行了封装。
原子操作主要用于“简单的数值操作”,比如计数器增减、标志位设置等,无法用于保护复杂的临界区(如多句代码、复杂数据结构访问)。
Linux内核提供很多封装好了API,不用自己造轮子(造也造不出来,CPU层面的东西,没这个实力 ^_^),基于Linux 5.10,常用的就这两类:
先划重点:别用int定义原子变量,必须用atomic_t,不然编译器优化会搞砸你的操作——过来人的血泪教训!
#include <linux/atomic.h>// 初始化原子变量为0,相当于给计数器归零atomic_t online_users = ATOMIC_INIT(0);常用API:
用于对变量的某一位进行原子操作,适用于标志位场景(如设备状态标志):
// 用户登录:计数器自增atomic_inc(&online_users);// 打印当前在线用户数printk("当前在线用户数:%d\n", atomic_read(&online_users));// 用户退出:计数器自减,若为0则打印提示if (atomic_dec_and_test(&online_users)) { printk("所有用户已退出\n");}自旋锁,顾名思义就是“自旋等待”是 Linux 内核最经典的同步机制。
当原子操作无法满足需求(比如临界区包含多句代码),就需要用到锁机制,自旋锁就是内核中最常用的“短临界区保护工具”。
自旋锁的核心工作原理:当一个执行流(进程、中断)试图获取自旋锁时,如果锁是空闲的,就直接获取锁并进入临界区;如果锁已经被其他执行流持有,当前执行流不会休眠,而是“自旋等待”(不断循环检测锁的状态),直到锁被释放。
我开始接触自旋锁的时候觉得很傻——拿不到锁你就去睡觉嘛,在那死循环转什么圈?后来才明白,这里面的逻辑是这样的:上下文切换的代价远比自旋等待要大。
所以自旋锁只适用于临界区执行时间极短(通常是几行代码),且不允许休眠(如中断上下文)的场景。什么叫极短?几十到几百个CPU周期,最好不要超过一微秒。如果临界区执行时间长,自旋等待会浪费大量CPU资源,此时应该用信号量。
对了,再次强调一下:持有自旋锁期间绝对不能睡眠。这是铁律。如果你持着自旋锁调了schedule()或者任何可能引起调度的函数,那别的CPU上等着拿这把锁的人就惨了——他们在自旋等待你释放锁,而你跑去睡觉了,他们死活等不到,最后就是死锁。而且自旋锁会禁用当前CPU的抢占,你本来就不可能被调度走。
自旋锁的核心结构体是spinlock_t,定义在<linux/spinlock.h>,不同CPU架构(x86、ARM)实现不一样,但原理都差不多——就是用CPU的原子指令,抢锁和放锁。
初始化别搞混了,静态和动态都行,看场景选:
// 静态初始化:直接定义,适合全局锁,一步到位spinlock_t my_lock = __SPIN_LOCK_UNLOCKED(my_lock);// 动态初始化:运行时初始化,适合局部锁,比如函数里的锁spinlock_t my_lock;spin_lock_init(&my_lock);以x86架构为例,说白了就是“抢锁用原子指令,放锁重置状态”,不用深究其底层,知道流程就行:
另外,还有“中断安全的自旋锁”(spinlock_irq、spinlock_bh),比如中断和进程共享资源时用,后面实战会细说,现在先记着有这么个东西。
#include <linux/spinlock.h>spinlock_t my_lock;int shared_data = 0; // 共享资源// 初始化自旋锁(通常在模块加载时)static int __init my_module_init(void) { spin_lock_init(&my_lock); return 0;}// 访问共享资源的函数void update_shared_data(int value) { spin_lock(&my_lock); // 加锁 // 临界区:操作共享资源 shared_data += value; printk("共享数据更新为:%d\n", shared_data); spin_unlock(&my_lock); // 解锁}典型写法:
spinlock_t lock;spin_lock_init(&lock);spin_lock(&lock);critical_section();spin_unlock(&lock);但真正容易出事的,是中断。
比如,进程上下文拿了锁,这时候中断来了。
中断处理函数也拿同一把锁。直接死锁。
因为中断会打断当前 CPU,当前 CPU 已经持锁,中断还在等锁。
等个锤子。
于是 Linux 提供:
spin_lock_irqsave()spin_unlock_irqrestore()进入临界区前关闭本地中断,这个场景我见过太多人踩过。
还有一个非常重要的规则:
spinlock 里不能睡眠,这个规则真的真的非常重要,重要到什么程度呢?
重要到很多老内核开发者看到别人“持有 spinlock 后 sleep”,血压会瞬间上来。
spinlock 里不能睡眠,绝对不能!!!!
因为自旋锁期间,调度可能被禁止。
你 sleep 以后,系统直接卡死。
所以:
spin_lock(&lock);msleep(100);spin_unlock(&lock);这种代码,属于“看见就想砸电脑”的级别。
个人使用自旋锁的几个小心得:
如果临界区执行时间较长,或者需要允许执行流休眠(如等待资源可用),自旋锁就不再合适,此时需要使用信号量——信号量是一种“睡眠等待”型同步机制,获取不到锁时,执行流会主动休眠,释放CPU资源,直到锁被释放后被唤醒。
信号量和自旋锁最大的区别是信号量能睡眠——spinlock 是忙等,semaphore 是阻塞。
信号量带个计数器,用于表示“可用资源的数量”。在内核里,我们更多把它当成“互斥信号量”用,其工作原理如下:
计数信号量适用于“多个执行流可以同时访问共享资源”的场景,比如系统中有3个可用的串口设备,计数器初始值设为3,最多允许3个进程同时使用串口。
常用API(定义在<linux/semaphore.h>):
// 静态初始化:sem_init(name, count)struct semaphore my_sem = __SEMAPHORE_INITIALIZER(my_sem, 3);// 动态初始化struct semaphore my_sem;sema_init(&my_sem, 3);// 获取信号量(可休眠,不可中断)down(&my_sem);// 获取信号量(可休眠,可被信号中断)down_interruptible(&my_sem);// 尝试获取信号量(不休眠,获取失败立即返回非0)down_trylock(&my_sem);// 释放信号量up(&my_sem);互斥信号量是计数信号量的一种特殊情况——计数器初始值为1,意味着“最多允许一个执行流访问共享资源”,本质是实现“独占访问”,与自旋锁的功能类似,但核心差异是“可休眠”。
Mutex与自旋锁的核心区别:
Mutex的常用API(定义在<linux/mutex.h>):
// 静态初始化struct mutex my_mutex = __MUTEX_INITIALIZER(my_mutex);// 动态初始化struct mutex my_mutex;mutex_init(&my_mutex);// 获取Mutex(可休眠,不可中断)mutex_lock(&my_mutex);// 获取Mutex(可休眠,可被信号中断)mutex_lock_interruptible(&my_mutex);// 尝试获取Mutex(不休眠,失败返回非0)mutex_trylock(&my_mutex);// 释放Mutexmutex_unlock(&my_mutex);示例:用Mutex保护一个需要长时间处理的共享资源(如文件读写):
struct mutex file_mutex;struct file *shared_file;static int __init my_init(void) { mutex_init(&file_mutex); shared_file = filp_open("/tmp/shared.txt", O_RDWR, 0644); return 0;}void write_shared_file(const char *data) { mutex_lock(&file_mutex); // 临界区:长时间文件写入(可休眠) kernel_write(shared_file, data, strlen(data), 0); msleep(100); // 模拟长时间操作 mutex_unlock(&file_mutex);}在实际场景中,很多共享资源的访问模式是“读多写少”——比如内核中的路由表、系统配置信息,大部分时间是被读取,很少被修改。如果用自旋锁或Mutex,会导致所有读操作排队等待,严重影响性能。
读写锁就是为“读多写少”场景设计的同步机制,其核心规则是:多个读操作可以同时进行(读共享),但写操作必须独占(写独占)——即读操作之间不互斥,读操作与写操作互斥,写操作之间互斥。
Linux内核提供两种读写锁:读写自旋锁(适用于短临界区、不可休眠)和读写信号量(适用于长临界区、可休眠)。
读写自旋锁是自旋锁的扩展,底层基于自旋锁实现,适用于“读多写少、临界区短、不可休眠”的场景(如中断上下文、内核态快速访问)。
核心特性:
常用API(定义在<linux/spinlock.h>):
// 静态初始化rwlock_t my_rwlock = __RW_LOCK_UNLOCKED(my_rwlock);// 动态初始化rwlock_t my_rwlock;rwlock_init(&my_rwlock);// 获取读锁read_lock(&my_rwlock);// 释放读锁read_unlock(&my_rwlock);// 获取写锁write_lock(&my_rwlock);// 释放写锁write_unlock(&my_rwlock);// 中断安全的读写锁(关闭本地中断)read_lock_irq(&my_rwlock);read_unlock_irq(&my_rwlock);write_lock_irq(&my_rwlock);write_unlock_irq(&my_rwlock);示例:用读写自旋锁保护路由表(读多写少):
rwlock_t route_table_lock;struct route_table *route_table; // 路由表(共享资源)// 读取路由表(读操作,可并发)struct route *get_route(ip_addr_t dest) {struct route *rt; read_lock(&route_table_lock); rt = find_route(route_table, dest); // 读临界区(短) read_unlock(&route_table_lock); return rt;}// 更新路由表(写操作,独占)int update_route(ip_addr_t dest, struct route *new_rt) { write_lock(&route_table_lock); replace_route(route_table, dest, new_rt); // 写临界区(短) write_unlock(&route_table_lock); return 0;}读写信号量是信号量的扩展,底层基于信号量实现,适用于“读多写少、临界区长、可休眠”的场景(如进程上下文的文件系统访问、配置修改)。
核心特性与读写自旋锁一致(读共享、写独占),区别在于:获取不到锁时,执行流会休眠(而非自旋),适合临界区较长的场景。
常用API(定义在<linux/rwsem.h>):
// 静态初始化struct rw_semaphore my_rwsem = __RWSEM_INITIALIZER(my_rwsem);// 动态初始化struct rw_semaphore my_rwsem;init_rwsem(&my_rwsem);// 获取读锁(可休眠)down_read(&my_rwsem);// 释放读锁up_read(&my_rwsem);// 获取写锁(可休眠)down_write(&my_rwsem);// 释放写锁up_write(&my_rwsem);// 尝试获取读锁(不休眠)down_read_trylock(&my_rwsem);// 尝试获取写锁(不休眠)down_write_trylock(&my_rwsem);读写信号量与读写自旋锁的选型原则:
RCU(Read-Copy-Update,读-复制-更新)是一种“无锁同步机制”,其核心优势是“读操作完全无锁、无阻塞”,适用于“读极多、写极少”的场景(如内核链表、路由表、文件系统inode管理),也是Linux内核中性能最优的同步机制之一。
第一次看到 RCU 的人, 十个有九个会懵, 因为它和传统锁思维完全不同。与读写锁相比,RCU的读操作不需要加锁、不需要等待,几乎没有开销;写操作虽然复杂,但频率极低,因此整体性能更优。
RCU的核心思想是“读操作不阻塞写操作,写操作不阻塞读操作”,通过“复制-更新-延迟释放”三个步骤实现数据一致性:
举个通俗的例子:图书馆有一本唯一的书(共享资源),读者(读操作)可以直接拿书阅读,不需要排队;如果管理员(写操作)要修改书的内容,不会直接在原书上改,而是先复制一本副本,在副本上修改,修改完成后,告诉新的读者去读副本,等所有正在读原书的读者都读完,再把原书扔掉(延迟释放)。
RCU的核心难点的是“判断宽限期是否结束”(即所有旧读操作是否完成),内核通过“RCU调度器”跟踪所有读端的状态,确保旧副本在合适的时机被释放。
RCU的API主要分为“读端API”“写端API”和“内存回收API”,以下是最常用的核心API(定义在<linux/rcupdate.h>):
// 进入RCU读临界区(无锁,轻量级)rcu_read_lock();// 访问共享资源(如RCU保护的链表)struct node *p = rcu_dereference(shared_list);// 退出RCU读临界区rcu_read_unlock();注意:rcu_dereference()用于安全访问RCU保护的指针,避免编译器优化导致的指针乱序问题。
// 1. 复制旧副本(假设shared_list是RCU保护的链表头)struct node *old_list = shared_list;struct node *new_list = kmalloc(sizeof(struct node), GFP_KERNEL);memcpy(new_list, old_list, sizeof(struct node)); // 复制旧数据// 2. 修改新副本new_list->data = new_data;// 3. 原子替换指针(将共享资源指针指向新副本)rcu_assign_pointer(shared_list, new_list);// 4. 延迟释放旧副本(等待宽限期结束后释放)synchronize_rcu(); // 等待宽限期,阻塞当前进程kfree(old_list); // 释放旧副本其中,synchronize_rcu()会阻塞写进程,直到所有读端都退出读临界区(宽限期结束);如果写操作不能阻塞,可以使用call_rcu()异步释放旧副本(通过回调函数)。
内核中常用RCU保护链表(struct list_head),结合rcu_list API实现高效的读写访问:
#include <linux/rcupdate.h>#include <linux/list.h>// RCU保护的链表头LIST_HEAD(rcu_list);DEFINE_SPINLOCK(list_lock); // 写操作需要自旋锁保护(避免多个写操作并发)// 读操作:遍历链表(无锁)void rcu_list_read(void) {struct list_head *pos;struct my_node *node; rcu_read_lock(); // 用rcu_list_for_each_entry遍历RCU链表 rcu_list_for_each_entry(node, &rcu_list, list) { printk("节点数据:%d\n", node->data); } rcu_read_unlock();}// 写操作:添加节点(复制-更新)int rcu_list_add(int data) {struct my_node *new_node = kmalloc(sizeof(struct my_node), GFP_KERNEL); new_node->data = data; INIT_LIST_HEAD(&new_node->list); spin_lock(&list_lock); // 写操作互斥 // 复制链表头,添加新节点(简化,实际需复制整个链表或相关节点) list_add_rcu(&new_node->list, &rcu_list); spin_unlock(&list_lock); return 0;}// 写操作:删除节点(延迟释放)void rcu_list_del(struct my_node *node) { spin_lock(&list_lock); list_del_rcu(&node->list); // 从链表中移除节点(不立即释放) spin_unlock(&list_lock); synchronize_rcu(); // 等待宽限期 kfree(node); // 释放节点}完成量(Completion)是一种“简单的事件同步机制”,用于实现“一个执行流等待另一个执行流完成某个特定事件”——比如进程A等待进程B完成初始化、中断处理程序等待DMA传输完成、内核线程等待设备就绪等。
完成量的核心逻辑:一个执行流(等待者)调用等待函数,进入休眠状态,等待事件完成;另一个执行流(完成者)完成事件后,调用完成函数,唤醒等待者。
与信号量相比,完成量更简单、更轻量级,是 Linux 里很容易被忽视的同步机制,但它特别好用, ”专门用于“一对一”或“一对多”的事件同步(信号量更适合资源计数)。
// 静态初始化struct completion my_completion = COMPLETION_INITIALIZER(my_completion);// 动态初始化struct completion my_completion;init_completion(&my_completion);// 等待事件完成(可休眠,不可中断)wait_for_completion(&my_completion);// 等待事件完成(可休眠,可被信号中断)wait_for_completion_interruptible(&my_completion);// 等待事件完成(带超时,超时返回-ETIMEDOUT)wait_for_completion_timeout(&my_completion, msecs_to_jiffies(1000));// 标记事件完成,唤醒一个等待者complete(&my_completion);// 标记事件完成,唤醒所有等待者complete_all(&my_completion);#include <linux/completion.h>#include <linux/kthread.h>struct completion dev_init_completion;struct device *my_dev;// 设备初始化线程(完成者)static int dev_init_thread(void *data) { // 模拟设备初始化(耗时操作) msleep(2000); my_dev = device_create(...); // 初始化设备 printk("设备初始化完成\n"); complete(&my_dev_init_completion); // 标记事件完成,唤醒等待者 return 0;}// 业务线程(等待者)static int business_thread(void *data) { printk("等待设备初始化...\n"); wait_for_completion(&dev_init_completion); // 休眠等待 // 设备初始化完成,开始业务逻辑 printk("设备已就绪,开始执行业务\n"); return 0;}static int __init my_init(void) { init_completion(&dev_init_completion); // 创建设备初始化线程和业务线程 kthread_run(dev_init_thread, NULL, "dev_init"); kthread_run(business_thread, NULL, "business"); return 0;}顺序锁(Seqlock)是一种特殊的同步机制,适用于“写操作频繁、读操作不允许阻塞、数据一致性要求不严格(或可通过版本号校验)”的场景——比如内核中的时间戳、计数器、传感器数据等。
Seqlock的核心原理是“版本号机制”,通过一个递增的版本号(偶数表示当前无写操作,奇数表示正在进行写操作),读操作通过校验版本号,判断读过程中是否发生了写操作,若发生则重新读取,确保数据一致性。
注意:Seqlock仅适用于“简单数据类型”(如int、long、double)或“可原子复制的数据结构”,不适用于复杂数据结构(如链表、结构体,因为读过程中数据可能被部分修改)。
// 静态初始化seqlock_t my_seqlock = __SEQLOCK_UNLOCKED(my_seqlock);// 动态初始化seqlock_t my_seqlock;seqlock_init(&my_seqlock);// 读操作(循环读取,直到获取有效数据)unsigned int seq;int shared_data;do { seq = read_seqbegin(&my_seqlock); // 读取开始版本号 shared_data = global_data; // 读取共享数据} while (read_seqretry(&my_seqlock, seq)); // 校验版本号,不一致则重试// 写操作(原子修改数据,更新版本号)write_seqlock(&my_seqlock);global_data += 1; // 修改共享数据write_sequnlock(&my_seqlock);// 中断安全的写操作(关闭本地中断)write_seqlock_irq(&my_seqlock);global_data += 1;write_sequnlock_irq(&my_seqlock);seqlock_t time_seqlock;struct timeval system_time; // 共享时间戳static int __init my_init(void) { seqlock_init(&time_seqlock); system_time.tv_sec = 0; system_time.tv_usec = 0; return 0;}// 写操作:更新时间戳(频繁执行)void update_time(void) { write_seqlock(&time_seqlock); system_time.tv_usec += 100; if (system_time.tv_usec >= 1000000) { system_time.tv_sec += 1; system_time.tv_usec = 0; } write_sequnlock(&time_seqlock);}// 读操作:获取时间戳(不阻塞,确保数据有效)struct timeval get_time(void) {struct timeval tv; unsigned int seq; do { seq = read_seqbegin(&time_seqlock); tv = system_time; } while (read_seqretry(&time_seqlock, seq)); return tv;}前面我们聊的都是基础同步机制,但Linux 是可抢占内核,这意味着: 当前任务运行时,调度器可能随时切走,所以有时候,仅仅加锁还不够,你还得防抢占。
内核抢占是并发的重要来源之一——即使在单核心CPU上,高优先级进程也能打断低优先级进程的内核态执行,导致临界区竞态。禁止抢占,就是通过关闭内核抢占机制,避免这种竞态,适用于“临时保护短临界区”的场景。
内核抢占的关闭与启用
Linux内核提供了专门的API用于关闭和启用抢占(定义在<linux/preempt.h>):
// 关闭内核抢占(保存当前抢占状态)preempt_disable();// 临界区:短时间操作,无需加锁shared_data += 1;// 启用内核抢占(恢复之前的抢占状态)preempt_enable();// 嵌套关闭/启用(需成对使用)preempt_disable();preempt_disable(); // 嵌套一次shared_data += 1;preempt_enable(); // 对应第二次关闭preempt_enable(); // 对应第一次关闭// 关闭抢占并禁止中断(更严格的保护)preempt_disable();local_irq_disable();// 临界区preempt_enable();local_irq_enable();禁止抢占的适用场景非常有限,主要用于:
注意:禁止抢占不能替代锁机制——如果有多核并发、中断访问,仅禁止抢占无法保护临界区,必须结合自旋锁等同步机制。
在多核CPU上,自旋锁本身会关闭本地CPU的抢占(避免持有锁的进程被抢占,导致其他CPU自旋等待),但在某些场景下,需要手动禁止抢占,进一步优化性能:
spinlock_t my_lock;void update_data(int value) { preempt_disable(); // 关闭抢占 spin_lock(&my_lock); // 加自旋锁 shared_data += value; // 临界区 spin_unlock(&my_lock); // 解锁 preempt_enable(); // 启用抢占}这样做的好处是减少了自旋锁的开销(避免自旋锁内部重复关闭/启用抢占),适用于高频访问的临界区。
内存屏障(Memory Barrier)的作用就是“禁止指令重排序”,确保内存操作的顺序性,保障多核心场景下的数据一致性。
这里开始进入 Linux 内核同步最阴间的区域了。也是很多程序员职业生涯第一次真正意识到: “CPU 不一定按你写的顺序执行代码,编译器也会重排” 那种世界观崩塌感。 真的很刺激。
这是很多程序员真正的噩梦区——内存屏障。
比如:
x = 1;y = 1;CPU 可能先写 y,再写 x。
为什么?
因为性能,CPU 为了榨干流水线,会疯狂优化。
但并发程序就容易出事。
Linux 提供了很多 barrier API:
smp_mb();smp_rmb();smp_wmb();例如:
data = 123;smp_wmb();flag = 1;reader:
while (!flag) ;smp_rmb();printf("%d\n", data);没有 barrier。
reader 可能看到 flag=1。
但 data 还是旧值。
是不是很离谱?
但 ARM、PowerPC 上真会发生。
所以很多“在 x86 正常”的代码。
换 ARM 后直接炸。
这也是为什么很多老内核工程师特别怕:
“只在 ARM 复现的问题。”
那种问题。
经常查到怀疑人生。
示例:用内存屏障解决多核心重排序问题:
int a = 0, b = 0;// 核心A执行void core_a(void) { a = 1; smp_wmb(); // 写屏障:确保a=1执行完成后,再执行读b int x = b;}// 核心B执行void core_b(void) { b = 1; smp_wmb(); // 写屏障:确保b=1执行完成后,再执行读a int y = a;}通过smp_wmb()写屏障,确保核心A的a=1先执行,核心B的b=1先执行,避免读-写重排,确保x和y不会同时为0。
锁的粒度(Lock Granularity)是指锁保护的临界区大小——粒度越粗,保护的范围越大,并发性能越差;粒度越细,保护的范围越小,并发性能越好,但锁的开销(加锁、解锁)会增加。锁的优化,本质是“在锁开销和并发性能之间找平衡”。
比如一个内核模块中有两个独立的共享资源(A和B),原本用一个锁保护,所有访问A或B的执行流都需要排队,并发性能较差。通过锁拆分,为A和B分别设置锁(lock_A和lock_B),访问A的执行流和访问B的执行流可以同时进行,无需相互等待,大幅提升并发效率。
后来拆锁也不能拆分的过细,锁的数量过多,会增加加锁、解锁的开销,还可能增加死锁的风险(比如多个执行流同时获取多个细粒度锁,顺序不当就会死锁)。
锁升级(Lock Escalation)与锁拆分相反,当多个细粒度锁被频繁同时获取时,会将这些细粒度锁升级为一个粗粒度锁,减少锁的开销。适用于“多个细粒度锁被频繁并发访问,导致锁开销超过并发收益”的场景。
比如内核中的哈希表,每个哈希桶都有一个细粒度锁,用于保护桶内数据。当某个时刻,多个执行流同时访问不同哈希桶,锁开销较小;但如果大部分执行流都集中访问少数几个哈希桶,频繁加解锁会产生大量开销,此时会将这几个哈希桶的锁升级为一个全局锁,减少锁操作次数。
有时候反而会“故意扩大锁范围”,为什么?因为频繁加锁解锁也有成本。
锁粗化(Lock Coarsening)是将多个连续的加锁、解锁操作,合并为一次加锁、解锁,减少锁的操作次数,降低锁开销。适用于“多个短临界区连续访问同一共享资源”的场景。
例如原本代码中,多次对同一共享资源进行短时间操作,每次操作都单独加锁、解锁:
// 优化前:多次加解锁,开销大spin_lock(&my_lock);shared_data += 1;spin_unlock(&my_lock);spin_lock(&my_lock);shared_data *= 2;spin_unlock(&my_lock);spin_lock(&my_lock);printk("%d", shared_data);spin_unlock(&my_lock);优化后,将三次操作合并为一个临界区,只进行一次加锁、解锁:
// 优化后:一次加解锁,减少开销spin_lock(&my_lock);shared_data += 1;shared_data *= 2;printk("%d", shared_data);spin_unlock(&my_lock);无锁编程(Lock-Free)是一种不依赖锁机制,通过硬件原子指令(如CAS、原子交换)实现共享资源同步的技术,其核心目标是“避免锁带来的开销(如自旋、休眠、上下文切换)”,提升高并发场景下的性能。
无锁编程的核心原理:基于“比较并交换(CAS,Compare And Swap)”原子指令,通过不断重试,实现对共享资源的原子修改,无需加锁阻塞其他执行流。
Linux内核中常用的无锁API(基于CAS):
// CAS操作:比较*ptr的值是否等于old,如果等于则替换为new,返回true;否则返回falsebool cmpxchg(void *ptr, unsigned long old, unsigned long new);// 示例:用CAS实现无锁计数器自增unsigned long counter = 0;void lock_free_inc(void) { unsigned long old, new; do { old = counter; // 读取当前值 new = old + 1; // 计算新值 } while (!cmpxchg(&counter, old, new)); // CAS重试,直到成功}现在很多人特别喜欢用 Lock-Free ,觉得牛逼。 但我说句实话,大部分业务根本不需要。
无锁编程最大的问题是: 难 debug、ABA 问题、 内存回收、 cache contention、memory ordering,一个比一个阴间。很多 Lock-Free 算法论文看着优雅,真正线上维护时,想骂街。 所以 Linux 内核里虽然有 lock-free。 但并没有“全面无锁化”。 因为工程世界要考虑维护成本,而复杂场景仍需依赖锁机制。
PREEMPT_RT 补丁出现后,Linux 同步机制很多行为变了。
Linux内核默认是“非实时内核”,其调度机制、同步机制主要优化吞吐量,而非实时性。为了满足实时需求,内核社区提供了“实时补丁(PREEMPT_RT)”,对内核同步机制进行了重大修改,核心优化点:
注意:启用PREEMPT_RT补丁后,内核的吞吐量会略有下降,因为可休眠锁的开销比自旋锁高,因此仅适用于实时性要求高于吞吐量的场景。
优先级反转(Priority Inversion):高优先级任务等待低优先级任务释放锁,而低优先级任务又被中优先级任务抢占,导致高优先级任务无法及时执行,严重影响实时性。
示例:任务A(高优先级)需要获取锁L,而锁L被任务B(低优先级)持有;此时任务C(中优先级)抢占任务B,导致任务B无法释放锁L,任务A只能一直等待,原本高优先级的任务A,响应时间反而比中优先级的任务C更差。
优先级继承(Priority Inheritance):解决优先级反转的核心机制,其原理是“当低优先级任务持有高优先级任务需要的锁时,将低优先级任务的优先级临时提升到高优先级任务的级别”,直到低优先级任务释放锁,再恢复其原始优先级。
Linux内核中,支持优先级继承的同步机制主要是“互斥锁(Mutex)”,通过设置Mutex的优先级继承属性实现:
#include <linux/mutex.h>// 初始化支持优先级继承的Mutexstruct mutex my_mutex;mutex_init(&my_mutex);// 设置优先级继承属性mutex_set_pi(&my_mutex);死锁避免(Deadlock Avoidance):除了优先级反转,死锁也是影响实时性和系统稳定性的重要问题。死锁的产生需要满足四个条件(资源互斥、持有并等待、不可剥夺、循环等待),内核中常用的死锁避免策略:
前面我们说了一堆锁,写代码时, “该用哪个?”
其实可以先问几个问题。
临界区长不长?
很短?→spinlock。
很长?→mutex/semaphore。
因为 spin 太久会烧 CPU。
当前上下文能不能睡眠?
中断上下文不能睡眠。
所以不能 mutex,只能 spinlock。
是不是读多写少?
读特别多?
考虑rwlock、rwsem、RCU(RCU 在极端读多场景性能非常强)、seqlock。
是否需要实时性?
RT 系统里。
锁持有时间要特别敏感。
否则 latency 会飙升。
很多时候需要:
简单概括就是:简单操作原子性,中断上下文自旋锁,进程长临界用Mutex,读多写少读写锁,读极多写极少用RCU,事件同步用完成量,写繁读不堵用Seqlock。
中断处理程序(硬中断、软中断)是内核并发的重要来源,其同步需求的核心是“不可休眠、快速执行”,因此同步机制的选择受到严格限制。
(1)软中断与硬中断的同步需求
硬中断优先级高于软中断,软中断运行在进程上下文(但不可休眠),硬中断与软中断之间、多个软中断之间,可能会访问同一共享资源,需要同步保护:
(2)自旋锁 vs 禁止抢占的选择
中断上下文的同步,优先选择自旋锁,禁止抢占仅适用于特殊场景:
示例:硬中断处理程序与进程上下文共享一个计数器,使用spinlock_irq保护:
spinlock_t irq_lock;atomic_t irq_counter;// 硬中断处理程序irqreturn_t my_irq_handler(int irq, void *dev_id) { spin_lock_irq(&irq_lock); // 加锁,关闭本地中断 atomic_inc(&irq_counter); // 临界区 spin_unlock_irq(&irq_lock); // 解锁,启用本地中断 return IRQ_HANDLED;}// 进程上下文函数void process_func(void) { spin_lock_irq(&irq_lock); printk("计数器值:%d", atomic_read(&irq_counter)); spin_unlock_irq(&irq_lock);}设备驱动是内核并发问题的重灾区——多个进程可能同时调用驱动接口,中断处理程序可能与进程上下文交互,DMA传输与中断、进程之间也需要同步,核心需求是“保护硬件资源、确保操作的原子性”。
(1)并发访问硬件资源的锁机制
硬件资源(如寄存器、IO端口)是独占资源,多个执行流同时访问会导致硬件异常,因此必须用同步机制保护,选择原则:
(2)DMA传输与中断的同步
DMA(直接内存访问)传输过程中,CPU与DMA控制器并行工作,DMA传输完成后会触发中断,通知CPU处理数据。此时需要同步的场景:
示例:DMA传输与中断的同步(用完成量等待传输完成):
struct completion dma_completion;struct dma_chan *dma_chan;char *dma_buf;// DMA传输完成中断处理程序irqreturn_t dma_irq_handler(int irq, void *dev_id) { complete(&dma_completion); // 标记DMA传输完成 return IRQ_HANDLED;}// 启动DMA传输int dma_start_transfer(void) { init_completion(&dma_completion); // 配置DMA传输(缓冲区、长度等) dmaengine_submit(dma_chan, ...); dma_async_issue_pending(dma_chan); // 启动DMA // 等待DMA传输完成(可休眠) wait_for_completion(&dma_completion); printk("DMA传输完成"); return 0;}内核中常用的数据结构(如链表、哈希表、队列),往往被多个执行流共享,需要根据访问模式选择同步机制:
(1)链表保护
(2)哈希表保护
哈希表的同步通常采用“桶级锁”(锁拆分的一种),为每个哈希桶设置一个锁,保护桶内的链表/红黑树,这样不同桶的访问可以并发进行,提升性能:
// 哈希表结构(每个桶一个自旋锁)struct hash_table { spinlock_t bucket_locks[16]; // 16个桶,每个桶一个锁struct list_head buckets[16]; // 每个桶的链表};// 初始化哈希表void hash_table_init(struct hash_table *ht) { int i; for (i = 0; i < 16; i++) { spin_lock_init(&ht->bucket_locks[i]); INIT_LIST_HEAD(&ht->buckets[i]); }}// 插入数据(桶级锁保护)void hash_table_insert(struct hash_table *ht, struct hash_node *node) { unsigned int bucket = hash_func(node->key) % 16; // 计算桶序号 spin_lock(&ht->bucket_locks[bucket]); // 仅锁定当前桶 list_add(&node->list, &ht->buckets[bucket]); spin_unlock(&ht->bucket_locks[bucket]);}文件系统的同步需求复杂,涉及元数据(inode、dentry)、数据缓冲区、磁盘IO等,不同部分的同步机制不同:
核心原则:文件系统的同步既要保证数据一致性,也要兼顾IO性能,因此多采用“读写分离、锁拆分”的策略。
同步问题最恶心的点在于:复现困难。尤其线上,可能几周才撞一次,所以调试工具非常重要。
1)死锁预防
死锁的产生必须满足四个条件,预防死锁的核心是“破坏其中一个或多个条件”:
2)死锁定位
Linux lockdep 真的是神器。
打开:
CONFIG_PROVE_LOCKING它会跟踪锁依赖,检测循环等待。
例如:
A -> BB -> A直接报警,以前没有 lockdep 时。
很多死锁只能靠 dump stack 猜。
现在已经幸福太多了。
3)锁粒度优化
工具:
perflockstatftracebpftrace特别 lockstat。可以直接看到锁竞争热点。
例如:
contention countwait timehold time很多性能问题,其实最后都是锁竞争,CPU 没干活,全在等锁。
4)调试技巧
这一段算是过来人的血泪经验,不少都是线上事故换来的。
我自己这些年有几个经验。
第一,别迷信日志
并发 bug 很多时候加日志会改变时序,然后 bug 消失,你会怀疑人生。
这叫 Heisenbug,特别经典。
第二,先怀疑锁顺序
死锁问题,90% 是锁顺序,一定先画依赖图,别上来就猜。
第三,ARM 问题优先怀疑 barrier
真的,x86 太强一致性了,很多 bug 被“掩盖”,ARM 才是真实世界。
第四,别轻易自己写 lock-free
除非你非常懂 memory model,否则后面维护的人会想把你挂路灯,包括未来的你自己。