在深度解析 Linux 内核 Ticket Spin Lock:公平性的核心设计与实现原理中,我们深入探讨了 ticket spin lock的设计,其通过引入“票号”机制来解决传统自旋锁的公平性问题,让锁的获取变得先到先得,消除了线程饥饿的问题。
但技术的演进是永无止境的,虽然ticket锁在公平性上取得了突破,但它在高并发、多核环境下仍暴露出一些问题:所有等待线程都在同一个共享的锁变量上自旋。这种集中式的轮询会在竞争激烈时引发严重的缓存行“乒乓效应”——每个核为了检查自己是否“中票”,都需要反复读取并无效化那个包含锁状态的缓存行,产生巨大的系统总线流量和延迟。
在高并发压力下,ticket自旋锁的扩展性会急剧下降。我们可以想象一个繁忙的办事大厅,虽然每个人都按号码等待很公平,但所有人都必须挤到同一个柜台前,紧盯着同一个号码显示器——这无疑会造成拥堵和效率的集体下降。
为了解决这个问题排队自旋锁(Queue Spinlock) 出现了。它不仅继承了ticket锁的公平性思想,更是进行了一项关键关键优化:将全局的集中等待,转变为分布式的、本地化的等待。每个等待线程不再“盯着大厅的中央屏幕”,而是只关注自己手中的“呼叫器”。这彻底改变了多核争用锁时的系统行为,将无序的共享内存争用转化为有序的、低开销的信号传递。
本文将深入剖析Linux内核中排队自旋锁(特别是其现代实现 qspinlock)的设计哲学、实现细节与性能优化。我们将看到内核开发者们如何通过精妙的数据结构、原子操作和内存排序约束,构建出一个在数十甚至上百个核心的激烈竞争下,仍能保持高性能和线性扩展性的同步原语。排队自旋锁不仅是自旋锁演进中的重要里程碑,更体现了在并发编程中将全局问题局部化的思想(本文将基于linux 5.10版本进行剖析)。
1.排队自旋锁的核心设计:MCS锁
我们要理解Linux内核的qspinlock,首先需要了解其产生的理论基础:MCS锁(以其发明者Mellor-Crummey和Scott命名)。我们会从其核心结构和获取/释放锁流程来分析,为读者展现其如何完成从“全局自旋”到“本地自旋”的转变。
1.1 核心数据结构
MCS锁为每个线程引入了一个等待节点结构,这个节点是等待队列的基本单元:
// 概念上的MCS节点结构struct mcs_lock_node {struct mcs_lock_node *next; // 指向队列中下一个等待者int locked; // 本地自旋标志:1表示需要等待,0表示轮到自己// ... 可能的其他字段,如CPU ID};
而锁本身就只需要维护一个队列尾部的指针:
// 概念上的MCS锁结构struct mcs_lock {struct mcs_lock_node *tail; // 指向队列最后一个节点};
通过这种设计,我们可以让每个等待线程的locked都位于自己的本地缓存行。线程只在此本地标志上自旋,从而将全局变量共享转换为线程被前驱节点通知时才发生的、一次性的缓存行无效化。
1.2 获取锁流程分析
我们初始假设锁初始空闲,也就是说tail = NULL。
1)线程A获取锁:首先观察到tail为NULL;然后使用原子比较交换操作,试图将tail从NULL设置为指向自己的节点;由于锁空闲,操作成功;此时线程A成功获得锁,无需等待。
2)线程B尝试获取(锁已被A持有):线程B初始化自己的节点:B.next = NULL, B.locked = 1;使用原子交换操作,将锁的tail指向自己的节点B,并获取之前tail的值(即指向A节点的指针);如果之前的tail不为NULL(说明有前驱),则将前驱节点(A节点)的next指针指向自己(A.next = &B),从而完成队列链接;最后,线程B在自己的B.locked标志上自旋,等待其变为0。
3)线程C加入等待:重复类似线程B的操作,将自己链接到队列尾部(B之后),并在C.locked上自旋。
1)线程A释放锁:检查自己的next指针(A.next),如果next为NULL,说明A可能是队列中唯一的节点(即没有等待者)。它尝试用原子操作将tail从指向A改回NULL。如果成功,锁释放完毕;如果next不为NULL(即后面有等待者B),则A直接将后继节点B.locked标志设置为0。
2)线程B被唤醒:线程B一直在自旋检查B.locked。当它发现B.locked被A设置为0时,便退出自旋,成功获得锁;此时,B节点成为新的“持有者节点”,而A节点可以离开或重用。
性能优势分析:
1)无全局缓存行乒乓:释放锁的线程A只写了一次后继B的locked变量(触发一次B所在缓存行的无效化)。等待者B只在本地变量上自旋读取,不产生全局流量。
2)公平性保证:严格的FIFO队列确保了绝对的公平。
3)可预测的延迟:每个线程的等待时间只与其在队列中的位置有关,与总等待线程数无关。
qspinlock并非直接使用标准的MCS锁,而是受MCS锁思想启发,并针对内核环境进行了优化的锁。它将锁状态压缩在一个32位的字(atomic_t)中,以追求极致的空间效率和缓存友好性。typedef struct qspinlock {union {atomic_t val;/** By using the whole 2nd least significant byte for the* pending bit, we can allow better optimization of the lock* acquisition for the pending bit holder.*/#ifdef __LITTLE_ENDIAN //小端序struct {u8 locked;u8 pending;};struct {u16 locked_pending;u16 tail;};#else//大端对称结构struct {u16 tail;u16 locked_pending;};struct {u8 reserved[2];u8 pending;u8 locked;};#endif};} arch_spinlock_t;
31 16 15 0┌──────────────────────────────┬──────────────────────────────┐│ Tail (16 bits) │ Locked_Pending (16 bits) │└──────────────────────────────┴──────────────────────────────┘MSB (最高有效位) LSB (最低有效位)更详细的字节视图 (__LITTLE_ENDIAN):┌─────────────┬─────────────┬─────────────┬─────────────┐│ Byte 3 │ Byte 2 │ Byte 1 │ Byte 0 ││ (bits 24-31)│ (bits 16-23)│ (bits 8-15) │ (bits 0-7) │├─────────────┼─────────────┼─────────────┼─────────────┤│ Tail高字节 │ Tail低字节 │ pending │ locked │└─────────────┴─────────────┴─────────────┴─────────────┘↑ ↑ ↑高16位 次低字节 最低字节
static __always_inline voidqueued_spin_lock(struct qspinlock *lock){u32 val = 0;//快速路径,直接获取锁if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))return;//慢速路径queued_spin_lock_slowpath(lock, val);}
/*** queued_spin_lock_slowpath - 获取排队自旋锁的慢速路径* @lock: 指向排队自旋锁结构的指针* @val: 当前排队自旋锁32位字的值** 状态表示:(队列尾部, pending位, 锁定值)** 快速路径 : 慢速路径 : 解锁* : :* 无竞争情况 (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)* : | ^--------.------. / :* : v \ \ | :* pending状态 : (0,1,1) +--> (0,1,0) \ | :* : | ^--' | | :* : v | | :* 无竞争队列 : (n,x,y) +--> (n,0,0) --' | :* (只有尾部) : | ^--' | :* : v | :* 竞争队列 : (*,x,y) +--> (*,0,0) ---> (*,0,1) -' :* (有等待者) : ^--' :*/voidqueued_spin_lock_slowpath(struct qspinlock *lock, u32 val){struct mcs_spinlock *prev, *next, *node;u32 old, tail;int idx;// 确保CPU数量不超过tail字段的编码能力BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));// 如果启用了半虚拟化支持,跳转到半虚拟化队列处理if (pv_enabled())goto pv_queue;// 虚拟化环境下的自旋锁检查,防止过度自旋if (virt_spin_lock(lock))return;/** 等待进行中的 pending->locked 交接,使用有限次数的自旋,* 以确保前向进度。** 状态转换:0,1,0 -> 0,0,1** 如果当前状态是(0,1,0),表示有一个线程设置了pending位,* 但锁已被释放,该线程正在尝试获取锁。* 我们循环等待直到状态改变,最多_Q_PENDING_LOOPS次。*/if (val == _Q_PENDING_VAL) {int cnt = _Q_PENDING_LOOPS;// 原子条件读取:当VAL != _Q_PENDING_VAL或cnt耗尽时退出val = atomic_cond_read_relaxed(&lock->val,(VAL != _Q_PENDING_VAL) || !cnt--);}/** 如果观察到任何竞争(即有pending位或tail不为0),则进入队列。* _Q_LOCKED_MASK是锁定位的掩码,~_Q_LOCKED_MASK表示除了锁定位之外的其他位*/if (val & ~_Q_LOCKED_MASK)goto queue;/** trylock || pending 路径** 状态转换:0,0,* -> 0,1,* -> 0,0,1 (pending位优化路径)** 尝试设置pending位并获取当前值。这是一个原子获取-或操作,* 保证后续的内存操作不会被重排到它之前。*/val = queued_fetch_set_pending_acquire(lock);/** 如果设置pending位后观察到竞争(即除了locked位外还有其他位被设置),* 说明有并发加锁者。** 撤销pending位设置并进入队列;我们设置pending位可能导致* n,0,0 -> 0,0,0 转换失败(解锁操作),现在那个解锁操作正在* 等待@next变为非NULL。*/if (unlikely(val & ~_Q_LOCKED_MASK)) {/* 如果我们设置了pending位,现在需要撤销它 */if (!(val & _Q_PENDING_MASK))clear_pending(lock);goto queue;}/** 我们已经设置了pending位,等待锁持有者释放锁。** 状态转换:0,1,1 -> 0,1,0** 这个等待循环必须使用load-acquire语义,以匹配clear_pending_set_locked()* 中的store-release操作,从而创建锁的顺序性;* 这是因为并非所有的clear_pending_set_locked()实现都包含完整的内存屏障。*/if (val & _Q_LOCKED_MASK)atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));/** 获取锁所有权并清除pending位。** 状态转换:0,1,0 -> 0,0,1*/clear_pending_set_locked(lock);lockevent_inc(lock_pending); // 记录pending路径成功获取锁的事件return;/** pending位乐观自旋结束,开始MCS队列处理。*/queue:lockevent_inc(lock_slowpath); // 记录进入慢速路径的事件pv_queue:// 获取当前CPU的MCS节点,qnodes[0]是默认的节点数组node = this_cpu_ptr(&qnodes[0].mcs);// 增加节点计数并获取索引,每获取一次锁,索引递增idx = node->count++;// 将当前CPU ID和节点索引编码到tail字段tail = encode_tail(smp_processor_id(), idx);/** 基于不会发生嵌套NMI获取自旋锁的假设,分配了4个节点。* 在某些架构中,这可能不成立,但需要超过4个节点的情况仍然极不可能。* 当这种情况发生时,我们回退到直接在锁上自旋,而不使用任何MCS节点。* 这不是最优雅的解决方案,但足够简单。*/if (unlikely(idx >= MAX_NODES)) {lockevent_inc(lock_no_node); // 记录节点耗尽事件// 回退到传统自旋方式while (!queued_spin_trylock(lock))cpu_relax();goto release;}// 获取指定索引的MCS节点node = grab_mcs_node(node, idx);// 记录非零索引值的使用情况统计lockevent_cond_inc(lock_use_node2 + idx - 1, idx);/** 在初始化实际节点之前,确保增加头节点->count。* 如果编译器重新排序这些存储,那么IRQ可能会覆盖我们的赋值。*/barrier();// 初始化MCS节点node->locked = 0;node->next = NULL;pv_init_node(node); // 半虚拟化环境下的节点初始化/** 我们接触了一个(可能)冷的每CPU队列节点缓存行;* 再次尝试获取锁,希望在我们不注意的时候有人释放了锁。*/if (queued_spin_trylock(lock))goto release;/** 确保@node的初始化在我们通过xchg_tail()发布更新的tail之前完成,* 并且可能通过下面的WRITE_ONCE(prev->next, node)将@node链接到等待队列。*/smp_wmb();/** 发布更新的tail。* 我们已经接触了队列缓存行;不再处理pending相关的内容。** 状态转换:p,*,* -> n,*,* (p表示之前的tail,n表示新的tail)*/old = xchg_tail(lock, tail);next = NULL;/** 如果有前一个节点(old & _Q_TAIL_MASK为真);链接它并等待到达队列头部。*/if (old & _Q_TAIL_MASK) {prev = decode_tail(old); // 解码前一个节点的信息/* 将@node链接到等待队列中。 */WRITE_ONCE(prev->next, node);// 半虚拟化环境:等待节点pv_wait_node(node, prev);// 在节点的locked字段上自旋等待,直到前驱节点释放锁arch_mcs_spin_lock_contended(&node->locked);/** 在等待MCS锁时,下一个指针可能已被另一个锁等待者设置。* 我们乐观地加载下一个指针,并为写入预取缓存行,* 以减少即将到来的MCS解锁操作的延迟。*/next = READ_ONCE(node->next);if (next)prefetchw(next); // 预取以准备写入}/** 我们处于等待队列的头部,等待锁持有者和pending位消失。** 状态转换:*,x,y -> *,0,0** 这个等待循环必须使用load-acquire语义,以匹配清除locked位的store-release,* 并创建锁的顺序性;这是因为下面的set_locked()函数不包含完整的内存屏障。** 如果PV半虚拟化激活,pv_wait_head_or_lock函数将获取锁并返回非零值。* 所以我们必须跳过atomic_cond_read_acquire()调用。* 由于下一个PV队列头尚未指定,locked值不可能变为_Q_SLOW_VAL。* 因此set_locked()和atomic_cmpxchg_relaxed()调用都是安全的。** 如果PV未激活,将返回0。*/if ((val = pv_wait_head_or_lock(lock, node)))goto locked;// 等待lock->val的_LOCKED_PENDING_MASK位被清除(即锁被释放且没有pending)val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));locked:/** 声明锁的所有权:** n,0,0 -> 0,0,1 : 锁,无竞争(队列头是唯一等待者)* *,*,0 -> *,*,1 : 锁,有竞争** 如果队列头是队列中的唯一元素(锁值 == tail)且没有人pending,* 清除tail编码并获取锁。否则,我们只需要获取锁。*//** 在PV情况下,由于锁窃取,我们可能已经设置了_Q_LOCKED_VAL;* 因此我们也必须允许:** n,0,1 -> 0,0,1** 注意:此时(val & _Q_PENDING_MASK) == 0,因为上述等待条件,* 因此任何并发的PENDING设置都会使无竞争转换失败。*/if ((val & _Q_TAIL_MASK) == tail) {// 尝试原子比较交换:如果lock->val等于val,则设置为_Q_LOCKED_VALif (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))goto release; /* 无竞争 */}/** 要么有人在我们后面排队,要么设置了_Q_PENDING_VAL,* 然后它将检测剩余的tail并在我们后面排队,确保我们将看到@next。*/set_locked(lock);/** 竞争路径:如果尚未观察到下一个节点,则等待下一个节点,然后释放。*/if (!next)next = smp_cond_load_relaxed(&node->next, (VAL));// 解锁MCS锁,通知下一个等待者arch_mcs_spin_unlock_contended(&next->locked);pv_kick_node(lock, next); // PV环境下唤醒下一个节点release:/** 释放节点:减少当前CPU的节点计数*/__this_cpu_dec(qnodes[0].mcs.count);}
static __always_inline voidqueued_spin_unlock(struct qspinlock *lock){/** unlock() needs release semantics:*/smp_store_release(&lock->locked, 0);}
qspinlock 的核心突破在于,它通过本地化的等待队列,将高并发下的锁竞争性能损耗大大降低。这意味着无论竞争核心数量如何增长,性能开销都能被有效隔离在有限的竞争者之间,从而实现了性能提升。
这印证了高性能并发编程的一条黄金法则:化全局竞争为局部协作。从粗放的全局同步到精细的局部协调,qspinlock 的进化路径为构建下一代同步原语提供了关键范式。