Linux内核自旋锁spinlock(四)---queuedspinlock
- 游戏开发
- 2025-08-22 05:15:02

文章目录 前言一、queued spinlock1.1 简介1.2. spin_lock/spin_unlock 二、源码解析2.1 struct qspinlock2.2 struct qnode2.3 queued_spin_lock2.3.1 快速申请通道CPU0申请锁 2.3.2 慢速申请通道CPU0/1申请锁CPU0/1/2申请锁CPU0/1/2/3申请锁 queued_spin_lock_slowpath总结 2.4 queued_spin_unlock 三、总结参考资料 前言
(1)老版本(2.6.25之前)的Linux内核的自旋锁请参考:Linux 内核自旋锁spinlock(一) 其缺点:该自旋锁是无序竞争的,不保证先申请的进程先获得锁,不公平。也就是所有的thread都是在无序的争抢spin lock,谁先抢到谁先得,不管thread等了很久还是刚刚开始spin。在冲突比较少的情况下,不公平不会体现的特别明显,然而,随着硬件的发展,多核处理器的数目越来越多,多核之间的冲突越来越剧烈,无序竞争的spinlock带来性能问题。
(2)2.6.25以后的Linux内核的自旋锁 称为 ticket spinlock,基于 FIFO 算法的排队自选锁。请参考:Linux 内核自旋锁spinlock(二)— ticket spinlock ticket spinlock可以让CPU按照到达的先后顺序,去获取spinlock的所有权,形成了有序竞争。 其缺点:所有申请锁的处理器在同一个变量上自旋等待,缓存同步的开销大,不适合处理器很多的系统。所有等待同一个自旋锁的处理器在同一个变量上自旋等待,申请或者释放锁的时候会修改锁,导致其他处理器存放自旋锁的缓存行失效,在拥有几百甚至几千个处理器的大型系统中,处理器申请自旋锁时竞争可能很激烈,缓存同步的开销很大,导致系统性能大幅度下降。
(3)基于ticket spinlock存在cache-line bouncing的问题,内核开发者提出MCS锁机制,请参考:Linux 内核自旋锁spinlock(三)— MCS locks MCS锁机制让每个CPU不再是等待同一个spinlock变量,而是基于各自不同的per-CPU的变量进行等待,那么每个CPU平时只需要查询自己对应的这个变量所在的本地cache line,仅在这个变量发生变化的时候,才需要读取内存和刷新这条cache line。 MCS锁机制会导致spinlock结构体变大(相比于ticket spinlock多了一个指针,64位平台指针大小8个字节),而内核很大数据结构都内嵌了spinlock结构体,比如struct page,因此MCS锁机制没有合入内核主线。
(4)4.2.0以后的Linux内核的自旋锁 称为 queued spinlock,基于 MCS 算法的排队自选锁: queued spinlock也属于排队自选锁,但是没有增加spinlock结构体的大小,进程按照申请锁的顺序排队,先申请的进程先获得锁。MCS自旋锁的策略是为每个处理器创建一个变量副本,每个处理器在自己的本地变量上自旋等待,解决了性能问题。
本文主要介绍queued spinlock。
传统的自旋锁在高竞争场景下会导致严重的缓存行争用,因为所有等待锁的 CPU 都在不停地读取和修改同一个锁变量。排队自旋锁通过引入MCS锁(Mellor-Crummey and Scott Lock)来解决这个问题:
每个等待锁的 CPU 都有一个本地节点(MCS Node),避免多个 CPU 争用同一个缓存行。
锁的等待者按顺序排队,确保公平性。
一、queued spinlock 1.1 简介qspinlock是建立在MCS自旋锁之上的。其主要思想是让每个自旋者在自己的per-cpu变量上自旋,从而避免不同CPU之间的常量cacheline bouncing。当所有CPU在同一个锁变量上自旋等待时,会发生cacheline bouncing,导致它们反复读取这个变量。当一个CPU解锁时,这个变量被修改,使所有其他CPU的缓存行无效,它们必须重新读取这个变量。这会导致性能开销。MCS锁通过让每个CPU在自己的专用变量上自旋来缓解这个问题,从而避免对单个锁变量的争用。
(1)qspinlock 的设计背景 MCS 锁的局限性: MCS 锁通过队列化等待和本地自旋机制减少缓存行争用,但其额外的 struct mcs_spinlock 结构增加了内存开销。 许多内核数据结构(如 struct page)无法容忍锁大小的增加。
(2)qspinlock 的目标: 在保留 MCS 锁性能优势的同时,将其压缩到 32 位(4 字节),使其能够嵌入到现有数据结构中。
1.2. spin_lock/spin_unlock其使用,动态定义:
spinlock_t lock; //定义一个自旋锁变量 spin_lock_init(&lock) //初始化自旋锁静态定义:
DEFINE_SPINLOCK(lock)使用:
spin_lock(&lock); //加锁 //临界区 spin_unlock(&lock); //解锁 // include/linux/spinlock_types.h #define DEFINE_SPINLOCK(x) spinlock_t x = __SPIN_LOCK_UNLOCKED(x) // include/linux/spinlock.h # define spin_lock_init(_lock) \ do { \ spinlock_check(_lock); \ *(_lock) = __SPIN_LOCK_UNLOCKED(_lock); \ } while (0) // include/linux/spinlock.h #define raw_spin_lock(lock) _raw_spin_lock(lock) static __always_inline void spin_lock(spinlock_t *lock) { raw_spin_lock(&lock->rlock); } // kernel/locking/spinlock.c void __lockfunc _raw_spin_lock(raw_spinlock_t *lock) { __raw_spin_lock(lock); } EXPORT_SYMBOL(_raw_spin_lock); // /include/linux/spinlock_api_smp.h static inline void __raw_spin_lock(raw_spinlock_t *lock) { preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }reempt_disable(): 禁用抢占,这将增加抢占计数并阻止当前 CPU 上的任务被抢占。 spin_acquire(): 这个函数调用用于表示获取自旋锁。 LOCK_CONTENDED(): 这个宏用于处理自旋锁在被占用时的情况。根据情况,可能会调用 do_raw_spin_trylock 或者 do_raw_spin_lock 函数来尝试获取自旋锁或者直接获取自旋锁。
static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock) { __acquire(lock); arch_spin_lock(&lock->raw_lock); mmiowb_spin_lock(); } // /include/asm-generic/qspinlock.h #define arch_spin_lock(l) queued_spin_lock(l)流程图如下:
二、源码解析 2.1 struct qspinlock typedef struct qspinlock { union { atomic_t val; /* * By using the whole 2nd least significant byte for the * pending bit, we can allow better optimization of the lock * acquisition for the pending bit holder. */ #ifdef __LITTLE_ENDIAN struct { u8 locked; u8 pending; }; struct { u16 locked_pending; u16 tail; }; ...... }; } arch_spinlock_t;Qspinlock的数据结构一共4个byte,如下图所示:
自旋锁的32个二进制位被划分成4个字段: (1)locked字段,锁是否被持有(0 未持有,1 已持有),长度是一个字节,占用第0~7位。 (2) 一个pending位,占用第8~15位,第1个等待自旋锁的处理器设置pending位。是否有 CPU 正在等待锁(没有在mcs队列中,等待在locked字段上)。 (3) index字段,是数组索引,占用第16~17位,指示队列的尾部节点使用数组mcs_nodes的哪一项。编码四种上下文(task=0, softirq=1, hardirq=2, nmi=3)。 (4)cpu字段,存放队列的尾部节点的处理器编号,实际存储的值是处理器编号加上1,cpu字段减去1才是真实的处理器编号。
index字段和cpu字段合起来称为tail字段,Tail成员占2byte,包括tail index(16~17)和tail cpu(18~31)两个域。存放队列的尾部节点的信息,布局分两种情况: (1)如果处理器的数量小于2的14次方,那么第915位没有使用,第1617位是index字段,第18~31位是cpu字段。 (2) 如果处理器的数量大于或等于2的14次方,那么第910位是index字段,第1131位是cpu字段。
上图是系统CPU的个数小于16k的时候的布局,如果CPU数据太大,tail需要扩展,压缩pending域的空间。这时候pending域占一个bit,其他的7个bit用于tail。
把MCS自旋锁放进4个字节的关键是:存储处理器编号和数组索引,而不是存储尾部节点的地址。
内核对MCS自旋锁做了优化:第1个等待自旋锁的处理器直接在锁自身上面自旋等待,不是在自己的mcs_spinlock结构体上自旋等待。这个优化带来的好处是:当锁被释放的时候,不需要访问mcs_spinlock结构体的缓存行,相当于减少了一次缓存没命中。后续的处理器在自己的mcs_spinlock结构体上面自旋等待,直到它们移动到队列的首部为止。
自旋锁的pending位进一步扩展这个优化策略。第1个等待自旋锁的处理器简单地设置pending位,不需要使用自己的mcs_spinlock结构体。第2个处理器看到pending被设置,开始创建等待队列,在自己的mcs_spinlock结构体的locked字段上自旋等待。这种做法消除了两个等待者之间的缓存同步,而且第1个等待者没使用自己的mcs_spinlock结构体,减少了一次缓存行没命中。 pending 位优化:减少缓存行访问 第一个等待者仅设置 pending 位,无需操作 qnode,减少缓存行访问。 第二个等待者开始构建队列,自旋在本地 qnode 上。
2.2 struct qnode // v5.15/source/kernel/locking/qspinlock.c /* * The basic principle of a queue-based spinlock can best be understood * by studying a classic queue-based spinlock implementation called the * MCS lock. A copy of the original MCS lock paper ("Algorithms for Scalable * Synchronization on Shared-Memory Multiprocessors by Mellor-Crummey and * Scott") is available at * * bugzilla.kernel.org/show_bug.cgi?id=206115 * * This queued spinlock implementation is based on the MCS lock, however to * make it fit the 4 bytes we assume spinlock_t to be, and preserve its * existing API, we must modify it somehow. * * In particular; where the traditional MCS lock consists of a tail pointer * (8 bytes) and needs the next pointer (another 8 bytes) of its own node to * unlock the next pending (next->locked), we compress both these: {tail, * next->locked} into a single u32 value. * * Since a spinlock disables recursion of its own context and there is a limit * to the contexts that can nest; namely: task, softirq, hardirq, nmi. As there * are at most 4 nesting levels, it can be encoded by a 2-bit number. Now * we can encode the tail by combining the 2-bit nesting level with the cpu * number. With one byte for the lock value and 3 bytes for the tail, only a * 32-bit word is now needed. Even though we only need 1 bit for the lock, * we extend it to a full byte to achieve better performance for architectures * that support atomic byte write. * * We also change the first spinner to spin on the lock bit instead of its * node; whereby avoiding the need to carry a node from lock to unlock, and * preserving existing lock API. This also makes the unlock code simpler and * faster. * * N.B. The current implementation only supports architectures that allow * atomic operations on smaller 8-bit and 16-bit data types. * */ #define MAX_NODES 4 /* * On 64-bit architectures, the mcs_spinlock structure will be 16 bytes in * size and four of them will fit nicely in one 64-byte cacheline. For * pvqspinlock, however, we need more space for extra data. To accommodate * that, we insert two more long words to pad it up to 32 bytes. IOW, only * two of them can fit in a cacheline in this case. That is OK as it is rare * to have more than 2 levels of slowpath nesting in actual use. We don't * want to penalize pvqspinlocks to optimize for a rare case in native * qspinlocks. */ struct qnode { struct mcs_spinlock mcs; ...... #endif }; /* * Per-CPU queue node structures; we can never have more than 4 nested * contexts: task, softirq, hardirq, nmi. * * Exactly fits one 64-byte cacheline on a 64-bit architecture. * * PV doubles the storage and uses the second cacheline for PV state. */ static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);MCS锁的基本原理以及为什么它不能直接替换现有的票号自旋锁。MCS锁通过队列化等待减少缓存行争用,但每个锁需要额外的结构体,导致内存开销增加。这对于像struct page这样的大小敏感的结构体来说不可行。
(1)qspinlock通过将MCS锁的尾指针和状态信息压缩到32位来解决这个问题。将嵌套级别和CPU编号组合成tail字段,这需要详细解析如何将这些信息编码到有限的位数中。同时,pending位的优化策略也很关键,因为它允许第一个等待者避免使用额外的结构体,从而减少缓存访问。
MCS 锁的压缩: 将 MCS 锁的尾指针(8 字节)和 next->locked(8 字节)压缩到 32 位。 通过 tail 字段编码队列尾部 CPU 编号和上下文嵌套级别。
(2)qspinlock在不同上下文(任务、软中断、硬中断、NMI)中的处理方式,以及每个CPU的qnodes数组如何分配和管理。
上下文隔离: 每个 CPU 预分配一个 qnode 数组(qnodes[MAX_NODES]),每个元素对应一种上下文。 不同上下文的锁请求使用独立的 qnode,避免嵌套冲突。
// v5.15/source/kernel/locking/mcs_spinlock.h struct mcs_spinlock { struct mcs_spinlock *next; int locked; /* 1 if lock acquired */ int count; /* nesting count, see qspinlock.c */ }每个处理器需要4个队列节点,原因如下:
contexts: task, softirq, hardirq, nmi.(1) task:申请自旋锁的函数禁止内核抢占,所以进程在等待自旋锁的过程中不会被其他进程抢占。
(2) softirq:进程在等待自旋锁的过程中可能被软中断抢占,然后软中断等待另一个自旋锁。
(3) hardirq:软中断在等待自旋锁的过程中可能被硬中断抢占,然后硬中断等待另一个自旋锁。
(4) nmi:硬中断在等待自旋锁的过程中可能被不可屏蔽中断抢占,然后不可屏蔽中断等待另一个自旋锁。
综上所述,一个处理器最多同时等待4个自旋锁。
和入场券自旋锁相比,MCS自旋锁增加的内存开销是数组mcs_nodes。 如下图所示: 该图片来自于:http:// .wowotech.net/kernel_synchronization/queued_spinlock.html
在某个线程上下文,由于持A锁失败而进入自旋,我们需要把该CPU上的mcs锁节点挂入A spinlock的队列。在这个自旋过程中,该CPU有可能发生软中断,在软中断处理函数中,我们试图持B锁,如果失败,那么该cpu上的mcs锁节点需要挂入B spinlock的队列。在这样的场景中,我们必须区分线程上下文和软中断上下文的mcs node。这样复杂的嵌套最多有四层:线程上下文、软中断上下文、硬中断上下文和NMI上下文。因此我们每个CPU实际上定义了多个mcs node节点(目前是四个),用来解决自旋锁的嵌套问题。
2.3 queued_spin_lock /** * queued_spin_lock - acquire a queued spinlock * @lock: Pointer to queued spinlock structure */ static __always_inline void queued_spin_lock(struct qspinlock *lock) { int val = 0; //快速申请通道 if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL))) return; //慢速申请通道 queued_spin_lock_slowpath(lock, val); }快速申请通道: (1)CPU0持有锁 初始状态,没有CPU获取锁,CPU0获取锁,因为没有其它CPU竞争,所有CPU0很快获取到锁。
可以看到queued_spin_lock函数调用atomic_try_cmpxchg_acquire时用 likely 修饰,认为大部分spinlock都只有1个CPU试图获得锁。 因为我们设计spinlock时,在大多数情况下,通常都只有1个CPU试图获得锁。
慢速申请通道: (2)CPU1设置了pending字段,自旋等待CPU0释放锁 CPU0获取锁,CPU1尝试获取锁,调用queued_spin_lock_slowpath,进入慢速申请通道,发现没有设置pending字段,CPU1设置pending字段为1,自选等待,称为锁的第一顺位继承者。(如果CPU0释放锁,那么CPU1便获取到锁)
(3)CPU2加入了MCS等待队列,自旋等待所得locked字段和pending字段被清0。 CPU0获取锁,CPU1尝试获取锁,CPU2也加入尝试获取锁,调用queued_spin_lock_slowpath,进入慢速申请通道,发现pending字段为1,这时queued spinlock利用 MSC 锁机制来进行排队。首先获取当前CPU对应的 mcs_spinlock,通常会使用mcs_spinlock[0]节点。CPU2加入 MCS 等待队列。
(4)CPU3加入了MCS等待队列,在自己的MCS节点中自旋等待 locked域被置为1。 CPU0获取锁,CPU1尝试获取锁,CPU2也加入尝试获取锁,CPU3也加入尝试获取锁,调用queued_spin_lock_slowpath,进入慢速申请通道,CPU3发现已经有别的CPU(即CPU2)在MCS等待队列,CPU3把自己的节点加到MCS等待队列末尾,等待前继节点释放锁,通过设置设置 prev->next域中指针的指向来把当前节点加入到MCS等待队列,CPU3在自己的MCS节点自旋并等待 node->locked被设置为1。
CPU0持有锁,CPU1是锁的第一顺位继承者,CPU2和CPU3在MCS等待队列中等待。 对于CPU2和CPU3,以及如果有CPU4、5、6、7、8等待,请参考:Linux 内核自旋锁spinlock(三)— MCS locks
2.3.1 快速申请通道 CPU0申请锁CPU0调用queued_spin_lock获取锁,因为没有其它CPU竞争,这时锁的旧值是0,说明申请锁的时候锁处于空闲状态,那么CPU0成功地获得锁,spinlock的值被设置为_Q_LOCKED_VAL(值为1),即标记锁处于locked状态。此时,只有一个所有者,没有自旋者。如下图所示: locked_pending.locked = 1
2.3.2 慢速申请通道 CPU0/1申请锁假设在持有锁的所有者期间,CPU1试图获取锁。CPU1将不得不等待,我们称这个等待的CPU1为“pender”。pender将pending变量设置为1以指示其存在,然后自旋等待spinlock->locked变量变为0。图中的虚线箭头表示等待关系以及相应的代码。
值得注意的是,pending变量是专门针对pender的。如果pender消失,该变量将恢复为0。如下图所示: locked_pending.locked = 1,locked_pending.pending = 1。
CPU0获取锁,CPU1尝试获取锁,调用queued_spin_lock_slowpath,进入慢速申请通道,发现没有设置pending字段,CPU1设置pending字段为1,自选等待,称为锁的第一顺位继承者。(如果CPU0释放锁,那么CPU1便获取到锁)
CPU0/1/2申请锁在这种情况下,除了pender之外,另一个CPU(CPU2)尝试获取锁。这个新的CPU将自旋等待spinlock->{locked, pending}变量。我们称这个CPU为successor。
在这种情况下,spinlock->tail变量存储了successor的CPU ID。idx字段表示这个successor的上下文(即进程上下文、软中断、硬中断、NMI),tail变量“指向”successor CPU就足够了。从tail指向successor CPU的每个CPU的MCS节点结构。
在这种情况下,有两个自旋者:pender(CPU1)等待spinlock->locked变为0,而successor(CPU2)等待spinlock->{locked, pending}同时变为0。 CPU2发现了locked_pending.locked ,locked_pending.pending 都为1,那么就要使用额外的MCS节点。
CPU0获取锁,CPU1尝试获取锁,CPU2也加入尝试获取锁,调用queued_spin_lock_slowpath,进入慢速申请通道,发现pending字段为1,这时queued spinlock利用 MSC 锁机制来进行排队。首先获取当前CPU对应的 mcs_spinlock,通常会使用mcs_spinlock[0]节点。CPU2加入 MCS 等待队列。
CPU0/1/2/3申请锁在这种更复杂的情况下,另一个CPU(CPU3)尝试获取锁,而successor(CPU2)正在等待。这个CPU将等待自己的专用MCS节点结构,并被称为queuer(CPU3)。
具体而言,每个CPU都有自己专用的MCS节点结构,其中包括一个名为locked的成员,表示为mcs->locked。该变量最初的值为0,queuer会自旋等待,直到它的mcs->locked变量变为1。这样,它就不会重复读取已经被pender(CPU1)和successor(CPU2)读取的自旋锁变量。这降低了缓存争用和跳动。
这种情况的状态转换如下所示。pender和successor分别等待它们各自的变量,而queuer则等待其专用的MCS节点。请注意,现在spinlock->tail变量指向queuer,与术语“tail”对齐,该术语表示等待队列的末尾。
在这个阶段,基于它们等待的特定变量,这三个自旋线程被命名为pender(CPU1)、successor(CPU2)和queuer(CPU3)。这种命名约定用于区分它们在同步过程中的不同角色和行为。如下图所示:
queued_spin_lock_slowpath /** * queued_spin_lock_slowpath - acquire the queued spinlock * @lock: Pointer to queued spinlock structure * @val: Current value of the queued spinlock 32-bit word * * (queue tail, pending bit, lock value) * * fast : slow : unlock * : : * uncontended (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0) * : | ^--------.------. / : * : v \ \ | : * pending : (0,1,1) +--> (0,1,0) \ | : * : | ^--' | | : * : v | | : * uncontended : (n,x,y) +--> (n,0,0) --' | : * queue : | ^--' | : * : v | : * contended : (*,x,y) +--> (*,0,0) ---> (*,0,1) -' : * queue : ^--' : */ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) { struct mcs_spinlock *prev, *next, *node; u32 old, tail; int idx; BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)); if (pv_enabled()) goto pv_queue; if (virt_spin_lock(lock)) return; /* * Wait for in-progress pending->locked hand-overs with a bounded * number of spins so that we guarantee forward progress. * * 0,1,0 -> 0,0,1 */ if (val == _Q_PENDING_VAL) { int cnt = _Q_PENDING_LOOPS; val = atomic_cond_read_relaxed(&lock->val, (VAL != _Q_PENDING_VAL) || !cnt--); } /* * If we observe any contention; queue. */ if (val & ~_Q_LOCKED_MASK) goto queue; /* * trylock || pending * * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock */ val = queued_fetch_set_pending_acquire(lock); /* * If we observe contention, there is a concurrent locker. * * Undo and queue; our setting of PENDING might have made the * n,0,0 -> 0,0,0 transition fail and it will now be waiting * on @next to become !NULL. */ if (unlikely(val & ~_Q_LOCKED_MASK)) { /* Undo PENDING if we set it. */ if (!(val & _Q_PENDING_MASK)) clear_pending(lock); goto queue; } /* * We're pending, wait for the owner to go away. * * 0,1,1 -> 0,1,0 * * this wait loop must be a load-acquire such that we match the * store-release that clears the locked bit and create lock * sequentiality; this is because not all * clear_pending_set_locked() implementations imply full * barriers. */ if (val & _Q_LOCKED_MASK) atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK)); /* * take ownership and clear the pending bit. * * 0,1,0 -> 0,0,1 */ clear_pending_set_locked(lock); lockevent_inc(lock_pending); return; /* * End of pending bit optimistic spinning and beginning of MCS * queuing. */ queue: lockevent_inc(lock_slowpath); pv_queue: node = this_cpu_ptr(&qnodes[0].mcs); idx = node->count++; tail = encode_tail(smp_processor_id(), idx); /* * 4 nodes are allocated based on the assumption that there will * not be nested NMIs taking spinlocks. That may not be true in * some architectures even though the chance of needing more than * 4 nodes will still be extremely unlikely. When that happens, * we fall back to spinning on the lock directly without using * any MCS node. This is not the most elegant solution, but is * simple enough. */ if (unlikely(idx >= MAX_NODES)) { lockevent_inc(lock_no_node); while (!queued_spin_trylock(lock)) cpu_relax(); goto release; } node = grab_mcs_node(node, idx); /* * Keep counts of non-zero index values: */ lockevent_cond_inc(lock_use_node2 + idx - 1, idx); /* * Ensure that we increment the head node->count before initialising * the actual node. If the compiler is kind enough to reorder these * stores, then an IRQ could overwrite our assignments. */ barrier(); node->locked = 0; node->next = NULL; pv_init_node(node); /* * We touched a (possibly) cold cacheline in the per-cpu queue node; * attempt the trylock once more in the hope someone let go while we * weren't watching. */ if (queued_spin_trylock(lock)) goto release; /* * Ensure that the initialisation of @node is complete before we * publish the updated tail via xchg_tail() and potentially link * @node into the waitqueue via WRITE_ONCE(prev->next, node) below. */ smp_wmb(); /* * Publish the updated tail. * We have already touched the queueing cacheline; don't bother with * pending stuff. * * p,*,* -> n,*,* */ old = xchg_tail(lock, tail); next = NULL; /* * if there was a previous node; link it and wait until reaching the * head of the waitqueue. */ if (old & _Q_TAIL_MASK) { prev = decode_tail(old); /* Link @node into the waitqueue. */ WRITE_ONCE(prev->next, node); pv_wait_node(node, prev); arch_mcs_spin_lock_contended(&node->locked); /* * While waiting for the MCS lock, the next pointer may have * been set by another lock waiter. We optimistically load * the next pointer & prefetch the cacheline for writing * to reduce latency in the upcoming MCS unlock operation. */ next = READ_ONCE(node->next); if (next) prefetchw(next); } /* * we're at the head of the waitqueue, wait for the owner & pending to * go away. * * *,x,y -> *,0,0 * * this wait loop must use a load-acquire such that we match the * store-release that clears the locked bit and create lock * sequentiality; this is because the set_locked() function below * does not imply a full barrier. * * The PV pv_wait_head_or_lock function, if active, will acquire * the lock and return a non-zero value. So we have to skip the * atomic_cond_read_acquire() call. As the next PV queue head hasn't * been designated yet, there is no way for the locked value to become * _Q_SLOW_VAL. So both the set_locked() and the * atomic_cmpxchg_relaxed() calls will be safe. * * If PV isn't active, 0 will be returned instead. * */ if ((val = pv_wait_head_or_lock(lock, node))) goto locked; val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK)); locked: /* * claim the lock: * * n,0,0 -> 0,0,1 : lock, uncontended * *,*,0 -> *,*,1 : lock, contended * * If the queue head is the only one in the queue (lock value == tail) * and nobody is pending, clear the tail code and grab the lock. * Otherwise, we only need to grab the lock. */ /* * In the PV case we might already have _Q_LOCKED_VAL set, because * of lock stealing; therefore we must also allow: * * n,0,1 -> 0,0,1 * * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the * above wait condition, therefore any concurrent setting of * PENDING will make the uncontended transition fail. */ if ((val & _Q_TAIL_MASK) == tail) { if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL)) goto release; /* No contention */ } /* * Either somebody is queued behind us or _Q_PENDING_VAL got set * which will then detect the remaining tail and queue behind us * ensuring we'll see a @next. */ set_locked(lock); /* * contended path; wait for next if not observed yet, release. */ if (!next) next = smp_cond_load_relaxed(&node->next, (VAL)); arch_mcs_spin_unlock_contended(&next->locked); pv_kick_node(lock, next); release: /* * release the node */ __this_cpu_dec(qnodes[0].mcs.count); } EXPORT_SYMBOL(queued_spin_lock_slowpath);(1)处理 pending 位
if (val == _Q_PENDING_VAL) { int cnt = _Q_PENDING_LOOPS; val = atomic_cond_read_relaxed(&lock->val, (VAL != _Q_PENDING_VAL) || !cnt--); }pending 位优化: 如果锁处于 pending 状态(val == _Q_PENDING_VAL),等待 pending 位被清除。 通过 atomic_cond_read_relaxed 循环检查锁状态,直到 pending 位被清除或超时。
(2)检查锁争用
if (val & ~_Q_LOCKED_MASK) goto queue;锁争用:如果锁已被占用或有其他等待者(val & ~_Q_LOCKED_MASK),跳转到队列化处理逻辑。
(3)设置 pending 位
val = queued_fetch_set_pending_acquire(lock);设置 pending 位:通过原子操作设置 pending 位,表示当前 CPU 正在等待锁。
(4)撤销 pending 位并加入队列
if (unlikely(val & ~_Q_LOCKED_MASK)) { if (!(val & _Q_PENDING_MASK)) clear_pending(lock); goto queue; }撤销 pending 位:如果发现锁已被占用或有其他等待者,撤销 pending 位并跳转到队列化处理逻辑。
(5)等待锁持有者释放锁
if (val & _Q_LOCKED_MASK) atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));自旋等待:如果锁已被持有,等待锁持有者释放锁(_Q_LOCKED_MASK 被清除)。
(6)获取锁并清除 pending 位
clear_pending_set_locked(lock);获取锁:清除 pending 位并设置 locked 位,表示当前 CPU 已获得锁。
(7)初始化本地节点
node = this_cpu_ptr(&qnodes[0].mcs); idx = node->count++; tail = encode_tail(smp_processor_id(), idx);获取本地节点:从预分配的 qnodes 数组中获取当前 CPU 的 mcs_spinlock 节点。 编码 tail 字段:将 CPU 编号和节点索引编码到 tail 字段中。
(8)检查节点索引
if (unlikely(idx >= MAX_NODES)) { lockevent_inc(lock_no_node); while (!queued_spin_trylock(lock)) cpu_relax(); goto release; }节点索引超限:如果节点索引超过 MAX_NODES(通常为 4),直接自旋尝试获取锁。
(9)初始化节点
node->locked = 0; node->next = NULL; pv_init_node(node);初始化节点:设置 locked 为 0,next 为 NULL,并初始化 PV 相关字段。
(10)尝试获取锁
if (queued_spin_trylock(lock)) goto release;快速尝试:在加入队列前再次尝试获取锁,避免不必要的队列操作。
(11)更新 tail 字段
old = xchg_tail(lock, tail);更新 tail 字段:通过原子操作将 tail 字段更新为当前 CPU 的编码值。
(12)链接到前驱节点
if (old & _Q_TAIL_MASK) { prev = decode_tail(old); WRITE_ONCE(prev->next, node); pv_wait_node(node, prev); arch_mcs_spin_lock_contended(&node->locked); }链接到队列:如果存在前驱节点,将当前节点链接到前驱节点的 next 字段。 自旋等待:在本地节点的 locked 字段上自旋,等待前驱节点释放锁。
(13)等待锁持有者释放锁
val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));自旋等待:等待锁持有者释放锁(_Q_LOCKED_PENDING_MASK 被清除)。
(14)获取锁
set_locked(lock);设置 locked 位:通过原子操作设置 locked 位,表示当前 CPU 已获得锁。
(15)唤醒后继节点
arch_mcs_spin_unlock_contended(&next->locked); pv_kick_node(lock, next);唤醒后继节点:设置后继节点的 locked 字段为 1,唤醒等待的 CPU。
(16)释放node
/* * release the node */ __this_cpu_dec(qnodes[0].mcs.count); 总结(1)Pending Bit(挂起位):在锁竞争不激烈时,避免立即进入复杂的排队逻辑。 设置锁的 pending 位,表示当前 CPU 正在尝试获取锁。
如果锁很快被释放,当前 CPU 可以直接获取锁,而无需进入排队。
如果检测到其他竞争者(如 pending 位已被设置),则撤销 pending 位并进入排队逻辑。
(2)排队逻辑(MCS 队列): MCS 节点: 每个 CPU 有一个本地 MCS 节点,用于在队列中等待锁。 节点包含一个 locked 字段(用于本地自旋)和一个 next 指针(用于链接下一个节点)。
队操作: 将当前 CPU 的 MCS 节点加入队列尾部,通过 xchg_tail 更新锁的 tail 字段。 如果队列非空,将当前节点链接到前一个节点的 next 指针。
等待锁: 当前 CPU 在自己的 MCS 节点上自旋,等待 locked 字段被前一个节点释放。 通过 arch_mcs_spin_lock_contended 实现高效的自旋等待。
(3)获取锁: 当当前 CPU 到达队列头部时,尝试获取锁: 等待锁的 locked 和 pending 位被清除。 使用原子操作(如 atomic_try_cmpxchg_relaxed)将锁状态更新为已获取。
如果队列中还有其他等待者,当前 CPU 需要唤醒下一个等待者(通过设置下一个节点的 locked 字段)。
2.4 queued_spin_unlock /** * queued_spin_unlock - release a queued spinlock * @lock : Pointer to queued spinlock structure */ static __always_inline void queued_spin_unlock(struct qspinlock *lock) { /* * unlock() needs release semantics: */ smp_store_release(&lock->locked, 0); } #endif它将 lock->locked 的值设置为 0(表示锁已释放),并确保在此操作之前的所有内存操作对其他 CPU 可见。释放锁时,将 locked 设置为 0,表示锁已释放,其他 CPU 可以尝试获取锁。
内存顺序: 在释放锁之前的所有写操作对其他 CPU 可见。 其他 CPU 在获取锁之后,能够看到释放锁之前的所有修改。
如下图所示: CPU0释放锁将 locked_pending.locked 设置为0,CPU1获取锁。
三、总结(1) 如果只有一个CPU获取queued spinlock,那么快速获取到锁。 如果只有2个CPU获取queued spinlock,使用locked_pending的pending字段即可。
如果只有1个或2个CPU试图获取锁,那么只需要一个4字节的qspinlock就可以了,其所占内存的大小和ticket spinlock一样。
(2) 如果有三个或三个以上的CPU获取queued spinlock,那么就要额外的 MCS节点了。第三个CPU会自旋等待被释放,即locked_pending的pending字段和locked 字段被清零,而第四个CPU和后面的CPU只能在MCS节点中自旋等待mcs_spinlock节点的locked字段被前继节点设置为1,得等到前继节点把locked控制器过继给自己才能有机会自旋等待自选锁的释放。
有3个以上的CPU试图获取锁,需要一个qspinlock加上(N-2)个MCS node。
对于设计合理的spinlock,在大多数情况下,锁的争抢都不应该太激烈,最大概率是只有1个CPU试图获得锁,其次是2个,并依次递减。 queued_spin_lock函数:
/** * queued_spin_lock - acquire a queued spinlock * @lock: Pointer to queued spinlock structure */ static __always_inline void queued_spin_lock(struct qspinlock *lock) { int val = 0; if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL))) return; queued_spin_lock_slowpath(lock, val); } #endif可以看到queued_spin_lock函数调用atomic_try_cmpxchg_acquire时用 likely 修饰,认为大部分spinlock都只有1个CPU试图获得锁。
参考资料lwn.net/Articles/590243/
http:// .wowotech.net/kernel_synchronization/queued_spinlock.html http:// .wowotech.net/kernel_synchronization/460.html zhuanlan.zhihu /p/100546935 systemsresearch.io/posts/f22352cfc/ .slideshare.net/slideshow/spinlockpdf/254977958
Linux内核自旋锁spinlock(四)---queuedspinlock由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Linux内核自旋锁spinlock(四)---queuedspinlock”
上一篇
round的意思