日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > linux >内容正文

linux

Linux同步原语系列-spinlock及其演进优化

發布時間:2023/12/20 linux 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Linux同步原语系列-spinlock及其演进优化 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 引言

通常我們的說的同步其實有兩個層面的意思:

  • 一個是線程間的同步,主要是為了按照編程者指定的特定順序執行;

  • 另外一個是數據的同步,主要是為了保存數據。

  • 為了高效解決同步問題,前人抽象出同步原語供開發者使用。不僅多核,單核也需要同步原語,核心的問題要保證共享資源訪問的正確性。如果共享資源劃分不合理,同步原語的開銷會制約多核性能。

    常見的同步原語有:互斥鎖、條件變量、信號量、讀寫鎖、RCU等。

    本文主要聚焦于互斥鎖,對應linux的spinlock,我們試圖沿著時間的脈絡去梳理spinlock的不斷改進的進程,如果涉及CPU體系結構,我們主要關注ARM體系結構的實現。如果后續有時間我們會繼續分析其他同步原語的演進和優化歷程。

    2. 演化進程

    linux內核spinlock是互斥機制的最底層基礎設施,它的性能直接關系到內核的性能,主要分為這么幾個階段:

  • linux-2.6.25之前,我們稱之為原始spinlock。

    對鎖的實現,是使用原子操作去無序競爭全局的鎖資源。

    這個階段對鎖的爭用處于無序競爭的狀態。如果CPU核心數不多,資源相對充裕,好比我們去銀行柜臺辦理業務,一共就1-2個人,無非你在前還是我在前的問題,公平性的問題并不突出,性能也沒什么大的影響,但是一旦cpu核心數和鎖的競爭者相對較多時,就會出現有些人因為某些優勢(如CPU算力強,鎖正好落在當前CPU的cacheline中等)總是能搶到鎖,其他人總是搶不到的情況出現。

  • linux-2.6.25,在x86下實現了ticket spinlock,替換原始spinlock。

    隨著CPU核數的增多以及對共享資源的爭用越發激烈,公平性問題就顯現出來了。保證公平一個很自然的思路就是大家都來排隊。如果對鎖的爭用比較激烈再加上如果此時核比較多,此時一旦釋放鎖,其實只有1個CPU能搶到鎖,但是因為大家觀察都是全局的一個鎖,那其他CPU的cacheline會因此失效,會有相當程度的性能損耗。還是就以去銀行柜臺辦理業務為例,它的實現相當于去銀行取號、排隊、等叫號這么一個過程,問題在于叫號相當是一個廣播機制,所有人都要偵聽,還是有點浪費時間精力。

  • linux-3.5,ARM體系結構用ticket spinlock替換了原始spinlock。

    過了幾個版本,ARM才替換,原因作者沒有去考證,不得而知。

  • linux-3.15,實現了mcs_spinlock,但未替換ticket spinlock。

    它把對全局鎖轉換成per-cpu的鎖,減少爭用的損耗,然后各個鎖之間通過鏈表串起來排隊,解決了公平性和性能損耗的問題。然后它卻沒有替代ticket spinlock成為內核默認實現,因為spinlock太底層了,已經嵌入了內核的各種關鍵數據結構中,它的數據結構要比spinlock大,這是內核鎖不能接受的,但是最終它還是合入了內核,只是沒有人去用它。但是它的存在為后一步的優化,仍然起到了非常重要的作用。它的實現思路是把ticket spinlock的廣播機制轉變為擊鼓傳花,也就是實際上可以我并需要偵聽廣播,主要在我前面排隊的人在使用完鎖以后告訴我可以了。

  • linux-4.2,實現了queued spinlock(簡稱qspinlock),替換了ticket spinlock。

    它首先肯定要解決mcs_spinlock占用大小,實際上它結合了ticket spinlock和mcs_spinlock的優點,大小保持一致,如果只有1個或2個CPU試圖獲取鎖,只用這個新的spinlock,當有3個以上的CPU試圖獲取鎖,才需要mcs_spinlock。它的數據結構有表示當前鎖的持有情況、是否有還有一個競爭者,已經需要快速找到對應CPU的per-cpu結構的mcs_spinlock節點,這3個大的域被塞在ticket spinlock同樣大小數據結構中。這種遵守原先架構約束的情況而做出的改進,非常值得我們學習。

  • linux-4.19,ARM體系結構將queued spinlock替換成默認實現。

    原因是什么,作者同樣沒去考證。

  • 3. 原始spinlock實現

    3.1 關鍵數據結構和公共代碼

    typedef struct {volatile unsigned int lock; } raw_spinlock_t;typedef struct {raw_spinlock_t raw_lock; } spinlock_t;#define spin_lock(lock) _spin_lock(lock)void __lockfunc _spin_lock(spinlock_t *lock) {preempt_disable();_raw_spin_lock(lock); } # define _raw_spin_lock(lock) __raw_spin_lock(&(lock)->raw_lock)

    3.2 ARM體系結構的加鎖實現

    //arm32(那時候還沒arm64)的實現,這個時期的內核大體對應ARMV6 static inline void __raw_spin_lock(raw_spinlock_t *lock) {unsigned long tmp;__asm__ __volatile__( "1: ldrex %0, [%1]\n" //1. " teq %0, #0\n" //2. " strexeq %0, %2, [%1]\n" //3. " teqeq %0, #0\n" //4. #ifdef CONFIG_CPU_32v6K " wfene\n" //5. #endif " bne 1b" //6.: "=&r" (tmp): "r" (&lock->lock), "r" (1) : "cc");smp_mb(); //7. }

    通過數據結構,可以看出,此時的lock還是一個unsigned int類型的數據,加鎖的時候,首先會關閉搶占,然后會轉到各個體系結構的實現,我們關注ARM的實現,__raw_spin_lock的分析如下:

  • 讀取lock的狀態值給tmp,并將&lock->lock標記為獨占。

  • 判斷lock的狀態是否為0。如果是0說明可以繼續往下走(跳到第3步);如果不為0,說明自旋鎖處于上鎖狀態,不能訪問,跳到第5步(如果不支持WFE則直接跳到第6步)自旋,最后回到第1步自旋。teq執行會影響標志寄存器中Z標志位,后面帶eq或者ne后綴的執行都受該標志位影響。

  • 執行strex執行,只有從上一次ldrex執行到本次strex這個被標記為獨占的地址(&lock->lock)沒有改變,才會執行成功(lock的狀態改寫為1)。通過strex執和ldrex實現原子性操作。

  • 繼續判斷lock的狀態是否為0,為0說明獲得鎖;不為0說明沒有獲得鎖,跳到第5步(如果支持WFE的話)。

  • 執行WFE指令(如果支持的話),CPU進低功耗狀態,省點功耗。

  • 如果收到SEV指令(如果有第五步的話),繼續判斷lock的狀態是否為0,不為0跳到第1步,繼續循環;如果lock為0,繼續跳到第7步

  • 執行barrier(多核情況下為)DMB指令,保證訪存順序按我們的編程順序執行(即后面的load/store絕不允許越過smp_mb()屏障亂序到前面執行)。

  • 3.3 ARM體系結構的解鎖實現

    static inline void __raw_spin_unlock(raw_spinlock_t *lock) {smp_mb();__asm__ __volatile__( " str %1, [%0]\n" #ifdef CONFIG_CPU_32v6K " mcr p15, 0, %1, c7, c10, 4\n" /* DSB */ " sev" #endif:: "r" (&lock->lock), "r" (0): "cc"); }

    解鎖的操作相對簡單,str將lock->lock賦值,然后使用DSB保序,使用sev通知持鎖cpu得到鎖。

    4. ticket spinlock

    4.1 關鍵數據結構和公共代碼

    typedef struct {union {u32 slock;struct __raw_tickets { //只考慮小端u16 owner;u16 next;} tickets;}; } arch_spinlock_t;typedef struct raw_spinlock {arch_spinlock_t raw_lock; } raw_spinlock_t;typedef struct spinlock {union {struct raw_spinlock rlock;}; } spinlock_t;static inline void spin_lock(spinlock_t *lock) {raw_spin_lock(&lock->rlock); } #define raw_spin_lock(lock) _raw_spin_lock(lock) void __lockfunc _raw_spin_lock(raw_spinlock_t *lock) {__raw_spin_lock(lock); } static inline void __raw_spin_lock(raw_spinlock_t *lock) {preempt_disable();do_raw_spin_lock(); } static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock) {arch_spin_lock(&lock->raw_lock); }

    4.2 AArch32的加鎖實現

    static inline void arch_spin_lock(arch_spinlock_t *lock) {unsigned long tmp;u32 newval;arch_spinlock_t lockval;__asm__ __volatile__( "1: ldrex %0, [%3]\n" //1. " add %1, %0, %4\n" //2. " strex %2, %1, [%3]\n" //3. " teq %2, #0\n" //4. " bne 1b" //5.: "=&r" (lockval), "=&r" (newval), "=&r" (tmp): "r" (&lock->slock), "I" (1 << TICKET_SHIFT): "cc");while (lockval.tickets.next != lockval.tickets.owner) { //6.wfe(); //7.lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);//8.}smp_mb(); //9. }
  • 把lock->slock值保存到lock_val

  • newval = lockval + (1 << TICKET_SHIFT)= lockval + (1 << 16),等價于newval =lockval.tickets.next++,相當于從銀行取號機取號。

  • strex tmp, newval, [&lock->slock],將新的值newval 存在lock中,strex將是否成功結果存入在tmp中

  • 檢查是否寫入成功lockval.tickets.next

  • 成功則跳到第6步,否則返回第1步重試,同上文類似也是實現原子操作

  • lockval.tickets.next和owner成員是否相等,相等跳到第9步,成功獲得鎖;沒有則跳到第7步。成功的話,相當于銀行柜臺已經叫我的號了,我就可以去辦理業務了,沒有的話,我還要繼續等。

  • 執行WFE指令,CPU進低功耗狀態,省點功耗。

  • 如果收到SEV指令,從低功耗狀態恢復,重新獲得新的owner值,因為一般是別人釋放了鎖才會發送SEV指令,這時owner的值已經發生了變化,需要重新從內存中獲取(ACCESS_ONCE本身的實現就是增加了volatile這個關鍵字,它確保編譯器每次訪問的變量都是從內存中獲取,防止編譯器優化)。

  • 執行barrier,同上文描述,不再贅述。

  • 4.3 AArch32的解鎖實現

    static inline void arch_spin_unlock(arch_spinlock_t *lock) {unsigned long tmp;u32 slock;smp_mb();__asm__ __volatile__( " mov %1, #1\n" //1. "1: ldrex %0, [%2]\n" //2. " uadd16 %0, %0, %1\n" //3. " strex %1, %0, [%2]\n" //4. " teq %1, #0\n" //5. " bne 1b" //6.: "=&r" (slock), "=&r" (tmp): "r" (&lock->slock): "cc");dsb_sev(); //7. }
  • 將tmp賦值為1

  • 將lock->slock的值賦值給slock。

  • 將slock的低16bit,也就是owner成員的值加1。

  • 將新的值新的ower,使用strex寫入中lock->slock,將是否成功結果存入在tmp中

  • 檢查是否寫入成功,成功,跳到第7步,實現了原子操作;不成功跳到第6步;

  • tmp不等于0(不成功),繼續返回label 1重試,即跳回第2步

  • 4.4 AArch64的加鎖實現

    static inline void arch_spin_lock(arch_spinlock_t *lock) {unsigned int tmp;arch_spinlock_t lockval, newval;asm volatile( " prfm pstl1strm, %3\n" //1. "1: ldaxr %w0, %3\n" //2. " add %w1, %w0, %w5\n" //3. " stxr %w2, %w1, %3\n" //4. " cbnz %w2, 1b\n" //5./* Did we get the lock? */ " eor %w1, %w0, %w0, ror #16\n" //6. " cbz %w1, 3f\n" //7. " sevl\n" //8. "2: wfe\n" //9. " ldaxrh %w2, %4\n" //10. " eor %w1, %w2, %w0, lsr #16\n" //11. " cbnz %w1, 2b\n" //12. "3:" //13.: "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock): "Q" (lock->owner), "I" (1 << TICKET_SHIFT): "memory"); }

    核心邏輯與AArch32類似,匯編實現會有不一樣,這里不再展開。

  • 從lock(memory)預取數據到L1cache中,加速執行。

  • 使用ldaxr指令(Load-acquire exclusive register,帶Exclusive和Acquire-Release 兩種語義),將lock的值賦值給lockval。

  • newval = lockval + (1 << TICKET_SHIFT)= lockval + (1 << 16),等價于newval =lockval.tickets.next++,相當于從銀行取號機取號。

  • 使用stxr指令,將newval賦值給lock,并將exclusive是否設置成功結果存放到tmp中。

  • 如果tmp != 0,說明exclusive失敗,需要重新跳到開始處(第2步)重試,因為這時候其他CPU核心可能有是執行流插入,搶在我們前面執行。否則繼續。

  • 用eor 異或運算實現lockval.tickets.next和owner成員是否相等的判斷

  • 如果相等,跳到label3(對應13步),獲得鎖進入臨界區。否則往下,執行自旋。

  • 使用SEVL指令(發送本地事件,Send Event Locally),喚醒本CPU核心,防止有丟失unlock的情況出現。

  • 執行WFE指令,CPU進低功耗狀態,省點功耗。

  • 獲取當前的Owner值存放在tmp中

  • 判斷lockval.tickets.next和owner成員的值是否相等

  • 如果不相等就回跳到label2(對應第9步)。相等繼續往下。

  • 結束退出。

  • 4.5 AArch64的解鎖實現

    static inline void arch_spin_unlock(arch_spinlock_t *lock) {asm volatile( " stlrh %w1, %0\n": "=Q" (lock->owner): "r" (lock->owner + 1): "memory"); }

    解鎖的操作相對簡單,stlrh除了將lock->owner++(相當于銀行柜臺叫下一個排隊者的號碼),將會兼有SEV和DSB的功能。

    5. mcs spinlock

    5.1 關鍵數據結構和變量

    struct mcs_spinlock {
    ????struct mcs_spinlock *next; //1.
    ????int nt locked; //2.
    };

  • 當一個CPU試圖獲取一個spinlock時,它就會將自己的MCS lock加到這個spinlock的等待隊列的隊尾,然后next指向這個新的MCS lock。

  • locked的值為1表示已經獲得spinlock,為0則表示還沒有持有該鎖。

  • 5.2 加鎖的實現

    static inline void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node) {struct mcs_spinlock *prev;node->locked = 0; //1.node->next = NULL;prev = xchg(lock, node); //2if (likely(prev == NULL)) { //3return;}ACCESS_ONCE(prev->next) = node; //4arch_mcs_spin_lock_contended(&node->locked); //5. }

    先看下兩個參數:

    • 第1個參數lock:是指向指針的指針(二級指針),是因為它指向的是末尾節點里的next域,而next本身是一個指向struct mcs_spinlock的指針。

    • 第2個參數node:試圖加鎖的CPU對應的MCS lock節點。

    接下來看代碼邏輯

  • 初始化node節點

  • 找隊列末尾的那個mcs lock。xchg完成兩件事,一是給一個指針賦值,二是獲取了這個指針在賦值前的值,相當于下面兩句:

    prev = *lock; //隊尾的lock *lock = node; //將lock指向新的node
  • 如果隊列為空,CPU可以立即獲得鎖,直接返回;否則繼續往下。不需要基于"locked"的值進行spin,所以此時locked的值不需要關心。

  • 等價于prev->next = node,把自己這個node加入等待隊列的末尾。

  • 調用arch_mcs_spin_lock_contended,等待當前鎖的持有者將鎖釋放給我。

  • #define arch_mcs_spin_lock_contended(l) \ do { \while (!(smp_load_acquire(l))) \ //1. arch_mutex_cpu_relax(); \ //2. } while (0)
  • 上文中的node->locked==0,說明沒有獲得鎖,需要繼續往下執行;說明已經獲得鎖,直接退出

  • ARM64中arch_mutex_cpu_relax調用cpu_relax函數的,有一個內存屏障指令防止編譯器優化。從4.1開始還存一個yield指令。該指令,為了提高性能,占用cpu的線程可以使用該給其他線程。

  • 5.3 解鎖的實現

    static inline void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node) {struct mcs_spinlock *next = ACCESS_ONCE(node->next); //1.if (likely(!next)) { //2.if (likely(cmpxchg(lock, node, NULL) == node)) //3return;while (!(next = ACCESS_ONCE(node->next))) //4.arch_mutex_cpu_relax(); //5.}arch_mcs_spin_unlock_contended(&next->locked); //6. }
  • 找到等待隊列中的下一個節點

  • 當前沒有其他CPU試圖獲得鎖

  • 直接釋放鎖。如果*lock == node,將*lock = NULL,然后直接返回;反之,說明當前隊列中有等待獲取鎖的CPU。繼續往下。cmpxchg作用翻譯大致如下所示:

    cmpxchg(lock, node, NULL) {ret = *lock;if (*lock == node)*lock = NULL;return ret; }
  • 距離函數開頭獲得"next"指針的值已經過去一段時間了,在這個時間間隔里,可能又有CPU把自己添加到隊列里來了。需要重新獲得next指針的值。于是,待新的node添加成功后,才可以通過arch_mcs_spin_unlock_contended()將spinlock傳給下一個CPU

  • 如果next為空,調用arch_mutex_cpu_relax,作用同上文。

  • arch_mcs_spin_unlock_contended(l),實際上是調用smp_store_release((l), 1),將next->locked設置為1。將spinlock傳給下一個CPU

  • 6. queued spinlock

    6.1 概述

    它首先肯定要解決mcs_spinlock的占用空間問題,否則設計再好,也無法合入主線。它是這樣的:大部分情況用的鎖大小控制在跟以前ticket spinlock一樣的水平,設計了兩個域:分別是locked和pending,分別表示鎖當前是否被持有,已經在持有時,是否又來了一個申請者競爭。爭鎖好比搶皇位,皇位永遠只有1個(對應locked域),除此之外還有1個太子位(對應pending域),防止皇帝出現意外能隨時候補上,不至于出現群龍無首的狀態,他們可以住在紫禁城內(使用qspinlock)。這兩個位置被占后,其他人還想來競爭皇位,只有等皇帝和太子都移交各自的位子以后才可以,在等待的時候你需要在紫禁城外待在自己的府邸里(使用mcs_spinlock),減小紫禁城的擁擠(減少系統損耗)。

    即當鎖的申請者小于或等于2時,只需要1個qspinlock就可以了,其所占內存的大小和ticket spinlock一樣。當鎖的競爭者大于2個時候,才需要(N-2)個mcs_spinlock。qspinlock還是全局的,為降低鎖的競爭,使用退化到per-cpu的mcs_spinlock鎖,所有的mcs_spinlock鎖串行構成一個等待隊列,這樣cacheline invalide帶來的損耗減輕了很多。這是它的基本設計思想。

    在大多數情況下,per-cpu的mcs lock只有1個,除非發生了嵌套task上下文被中斷搶占,因為中斷上下文只有3種類(softirq、hardirq和nmi),所有每個CPU核心至多有4個mcs_spinlock鎖競爭。而且,所有mcs_spinlock會串聯到一個等待隊列里的。

    上圖展示的是:qspinlock的locked和pending位都被占,需要進入mcs_spinlock等待隊列,而CPU(2)是第1個進入等待隊列的,qspinlock的tail.cpu則被賦值成2,CPU(2)的mcs_spinlock數組的第3個成員空閑,則qspinlock的tail.index被賦值成3。入隊的是CPU(3)、CPU(1)和CPU(0),通過mcs_spinlock的next域將大家連成隊列。

    假如等到競爭qspinlock的2個鎖的持有者,都釋放了,則CPU(2)的第3個空閑成員則獲得鎖,完成它的臨界區訪問后,通過qspinlock的tail.cpu(類似于頁表基址)和tail.index(類似于頁表內的偏移),快速找到下個mcs_spinlock的node節點。

    下面要進入細節分析了,本文只考慮小端模式、NR_CPUS < 16K的情況,不考慮虛擬化這塊,去掉qstats統計,力圖聚焦在該鎖實現的核心邏輯上。

    6.2 關鍵數據結構和變量

    struct qspinlock

    數據簡化如下:

    typedef struct qspinlock {union {atomic_t val;struct {u8 locked;u8 pending;};struct {u16 locked_pending;u16 tail;};}; } arch_spinlock_t;

    struct qspinlock包含了三個主要部分:

  • locked(0- 7bit):表示是否持有鎖,只有1和0兩個值,1表示某個CPU已經持有鎖,0則表示沒有持有鎖。

  • pending(8bit):作為競爭鎖的第一候補,第1個等待自旋鎖的CPU只需要簡單地設置它為1,則表示它成為第一候補,后面再有CPU來競爭鎖,則需要創建mcs lock節點了;為0則表示該候補位置是空閑的。

  • tail(16-31bit): 通過這個域可以找到自旋鎖隊列的最后一個節點。又細分為:

    • tail cpu(18-31bit):來記錄和快速索引需要訪問的mcs_spinlock位于哪個CPU上,作用類似于頁表基址。

    • tail index(16-17bit):用來標識當前處在哪種context中。Linux中一共有4種context,分別是task, softirq, hardirq和nmi,1個CPU在1種context下,至多試圖獲取一個spinlock,1個CPU至多同時試圖獲取4個spinlock。當然也表示嵌套的層數。對應per-cpu的mcs_spinlock的數組(對應下文的struct mcs_spinlock mcs_nodes[4])的下標,作用類似于類似于頁表內的偏移。

    struct mcs_spinlock

    mcs_spinlock的數據結構如下:

    struct mcs_spinlock {struct mcs_spinlock *next;int locked; /* 1 if lock acquired */int count; /* nesting count, see qspinlock.c */ };struct qnode {struct mcs_spinlock mcs; };static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[4]);

    使用per-cpu的struct mcs_spinlock mcs_nodes[4],可以用來減少對cacheline的競爭。數組數量為4前文已經解釋過了。struct mcs_spinlock具體含義如下:

  • locked:用來通知這個CPU你可以持鎖了,通過該域完成擊鼓傳花,當然這個動作是上一個申請者釋放的時候通知的。

  • count:嵌套的計數。只有第0個節點這個域才有用,用來索引空閑節點的。

  • next:指向下一個鎖的申請者,構成串行的等待隊列的鏈表。

  • 6.3 加鎖實現

    核心邏輯概述

    我們把一個qspinlock對應的**( tail, pending, locked)稱為一個三元組(x,y,z)**,以此描述鎖狀態及其遷移,有2中特殊狀態:

  • 初始狀態(無申請者競爭):(0, 0, 0)

  • 過渡狀態(類似于皇帝正在傳位給太子,處于交接期):(0, 1, 0)

  • 按加鎖原有代碼只有慢速路徑和非慢速路徑,我們為了行文方便,將源代碼的慢速路徑又分為中速路徑和慢速路徑,這樣就有以下三組狀態:

    不同路徑出現情況核心功能所在的函數和使用鎖的種類
    快速路徑鎖沒有持有者,locked位(皇位)空缺queued_spin_lock,使用qspinlock
    中速路徑鎖已經被持有,但pending位(太子位)空缺queued_spin_lock_slowpath,使用qspinlock
    慢速路徑鎖已經被持有,locked位和pending位都沒空缺queued_spin_lock_slowpath,使用mcs_spinlock

    核心邏輯就變成:

  • 如果沒有人持有鎖,那我們進入快速路徑,直接拿到鎖走人,走之前把locked位(皇位)標記成已搶占狀態。期間,只需要使用qspinlock。

  • 如果有人持有鎖(搶到了皇位成為皇帝),但pending位(太子位)空缺,那我們先搶這個位置,進入的是中速路徑,等這個人(皇帝)釋放鎖(傳位)了,我們就可以拿到鎖了。期間,只需要用到qspinlock。

  • 如果這兩個位置我們都搶不到,則進入慢速路徑,需要使用per-cpu的mcs_spinlock。

  • 總體狀態流程圖如下:

    梳理一下對應偽碼描述:

    static __always_inline void queued_spin_lock(struct qspinlock *lock) {if (初始狀態){ //沒人持鎖//進入快速路徑return;}queued_spin_lock_slowpath(lock, val); //進入中速或者慢速路徑 }void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) //lock是原始鎖,val是之前拿到的lock->val的值 {//代碼片段1:過渡狀態判斷:看是否處于過渡狀態,嘗試等這個狀態完成//代碼片段2:慢速路徑判斷:看是否有其他申請者競爭,則直接進入慢速路徑的核心片段//代碼片段3:中速路徑queue: //代碼判斷4:慢速路徑//1. 調用__this_cpu_inc(mcs_nodes[0].count)將當前cpu競爭鎖的數量+1//2. mcs node的尋址和初始化//3. 調用queued_spin_trylock(),看看準備期間spinlock是不是已經被釋放了//4. 處理等待隊列已經有人的情況,重點是arch_mcs_spin_lock_contended();//5. 處理我們加入隊列就是隊首的情況 locked:// 已經等到鎖了//6.處理我們是隊列最后一個節點的情況//7.處理我們前面還有節點的情況//8.調用arch_mcs_spin_unlock_contended()通知下一個節點 release://9.__this_cpu_dec(mcs_nodes[0].count); }

    下面我們開始分析

    queued_spin_lock和queued_spin_lock_slowpath函數的實現細節。

    實現細節分析

    static __always_inline void queued_spin_lock(struct qspinlock *lock) {u32 val;val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL); //1if (likely(val == 0)) //2return;queued_spin_lock_slowpath(lock, val); //3 }
  • 通過atomic_cmpxchg_acquire,與之前的cmpxchg類似,用原子操作實現:

    • 將val賦值為lock->val。

    • lock->val值如果為0(沒人拿到鎖),將lock->val的值設為1(即*_Q_LOCKED_VAL*),三元組狀態由**( tail, pending, locked)**= (0, 0, 0)遷移為(0, 0, 1)。

    • lock->val值如果不為0,保持lock->val的值不變。

    如果當前沒有人獲得鎖,直接拿到鎖返回。三元組的狀態遷移已經在上一步完成了。否則需要繼續往下,走中速/慢速路徑。

    進入中速/慢速路徑,調用queued_spin_lock_slowpath函數

    快速路徑

    lock->val值==0(沒人拿到鎖),三元組狀態由**( tail, pending, locked)**= (0, 0, 0)遷移為(0, 0, 1)。快速返回,不用等待。

    下面進入queued_spin_lock_slowpath函數的分析,我們先分析過渡狀態判斷和中速路徑兩個代碼片段:

    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) {/* 過渡狀態判斷 */if (val == _Q_PENDING_VAL) { //0. int cnt = _Q_PENDING_LOOPS;val = atomic_cond_read_relaxed(&lock->val,(VAL != _Q_PENDING_VAL) || !cnt--);}/*其他部分代碼*/}

    過渡狀態判斷

  • 如果三元組( tail,pending,locked)狀態如果是(0, 1, 0),則嘗試等待狀態變為 (0, 0, 1),但是只會循環等待1次。

    簡單翻譯一下atomic_cond_read_relaxed語句的意思為:如果(lock->val != _Q_PENDING_VAL) || !cnt--)則跳出循環,繼續往下。

  • 中速路徑

    進入中速路徑的前提是當前沒有其他競爭者在等待隊列中排隊以及pending位空缺,之后我們先將pending位占住。如果已經這段期間已經有人釋放了,那直接獲取鎖并將pending位重置為空閑,反正則要自旋等持鎖者釋放鎖再做其他動作。

    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) {/* ... 過渡狀態判斷及處理,代碼省略 *//* ... 判斷是否有pending和tail是否有競爭者,有競爭者直接進入慢速路徑排隊,代碼省略 *//* 中速路徑開始 */val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); //1. if (!(val & ~_Q_LOCKED_MASK)) { //2.if (val & _Q_LOCKED_MASK) { //3.atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK)); //4.}clear_pending_set_locked(lock); //5.return; //6.}if (!(val & _Q_PENDING_MASK)) //7.clear_pending(lock);/* 中速路徑結束 *//*其他部分代碼*/}

    這部分代碼的核心邏輯是在競爭qspinloc的pending,即競爭太子位。進入中速路徑的前提是三元組( tail,pending,locked)=(0, 0, *)

  • 離“是否是過渡狀態”和“除鎖的持有者者之外是否競爭者”的判斷已經過了一段時間了。需要用atomic_fetch_or_acquire函數,通過原子操作,重新看一下鎖的狀態,執行了兩個動作:

    執行完,三元組狀態會從(0, 0, *)改為(0, 1, *)。

    • val = lock->val(成為“原始的lock->val”)

    • lock->val的peding域被置位。

    如果原始的lock->val的pending和tail域都為0,則表明沒有pending位沒有其他競爭者,即太子位空閑可以去競爭;否則,則表明有其他競爭者,不滿足中速路徑的條件,需要進入慢速路徑。lock->val的和tail域就不用關心了。所以此時的三元狀態可以使(*, 1, *)

    如果locked域為1,表明位置被占著,已經有人在持鎖了,我們需要跳到第4步,等鎖被釋放;否則,沒有人持鎖,皇位是空著的,我們跳到第5步,直接去上位就好了。

    spin等待,直到lock->val的locked域變回0,也就是等皇位空出來。三元組狀態會從(*, 1, 1) 轉變為( *, 1, 0)

    拿到鎖了,可以登基上位了,但還要做些清理工作,調用clear_pending_set_locked,將lock->val的pending域清零以及將locked置1。即三元組狀態由(*, 1, 0) 轉變為( *, 0, 1)。

    結束,提前返回。

    如果沒有進入中速路徑,第1步開始時獲得的鎖的狀態是(n, 0, *) ,需要把在第一步設置的現在鎖的pending域恢復成0。

    慢速路徑

    上面流程圖有些地方不太準確,但沒有關系,可以先幫我們建立起總體流程印象,細節后續我們會展開。

    在拿到mcs lock的空閑節點之后,我們先用queued_spin_trylock函數,檢查一下在我們準備的這段時間里,是不是已經有人釋放了該鎖了,如果有人釋放了,就提前返回了。

    獲得鎖之前,要分兩種情況分別處理:

    • 如果隊列中只有我們,那只用自旋等lock的pending位空出來即可。

    • 如果隊列中還有其他競爭者在排在我們前面,則需要自旋等前序節點釋放該鎖。

    獲得鎖之后,也要分兩種情況處理:

    • 如果隊列中只有我們,將鎖的locked位置位,表明鎖已經被我們持有了,不需要再做其他處理。

    • 如果隊列中還有其他競爭者在排在我們后面,則需要將鎖傳遞這位競爭者,當然需要要等它正式加入。

    慢速路徑的簡化代碼如下所示:

    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) {struct mcs_spinlock *prev, *next, *node;u32 old, tail;int idx;/* ... 過渡狀態判斷及處理,省略 *//* 先判斷是否有pending和tail是否有競爭者,有競爭者直接進入慢速路徑排隊 */if (val & ~_Q_LOCKED_MASK)goto queue;/* 中速路徑代碼開始 */val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);if (!(val & ~_Q_LOCKED_MASK)) {// ...中速路徑處理,省略return;}/* 中速路徑代碼結束 *//* 之前代碼的收尾清理工作 */if (!(val & _Q_PENDING_MASK))clear_pending(lock); //將pending狀態恢復。//慢速路徑代碼開始位置 queue://進入mcs lock隊列node = this_cpu_ptr(&mcs_nodes[0]); //1.idx = node->count++;tail = encode_tail(smp_processor_id(), idx);node += idx; //2.barrier();node->locked = 0;node->next = NULL;if (queued_spin_trylock(lock)) //3.goto release;smp_wmb();old = xchg_tail(lock, tail); //4.next = NULL;if (old & _Q_TAIL_MASK) { //5.prev = decode_tail(old); //6.WRITE_ONCE(prev->next, node); //7.arch_mcs_spin_lock_contended(&node->locked); //8.next = READ_ONCE(node->next); //9.if (next)prefetchw(next);}val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK)); //10.locked: if (((val & _Q_TAIL_MASK) == tail) && //11.atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))goto release; /* No contention */set_locked(lock); //12.if (!next) //13.next = smp_cond_load_relaxed(&node->next, (VAL));arch_mcs_spin_unlock_contended(&next->locked); //14.release:__this_cpu_dec(mcs_nodes[0].count); //15. }

    之前是進入queued_spin_lock_slowpath函數,有“是否過渡狀態的判斷”及處理,“是否要進入慢速路徑的判斷”及處理,如果進了中速路徑的處理。下面正式進入慢速路徑的處理。因為系統可能有多個cpu并發,即使在同一個cpu上也可能切換了4種context上下文的中一種,說不定大家競爭的鎖已經都被釋放了也說不準,所以此時三元組的狀態是未知的。

  • 獲得local CPU上的第0個節點mcs_spinlock類型的node節點,并將節點的count值+1,count值記錄了local CPU上空閑節點的起始下標,該值也只在第0個有意義。encode_tail函數,節點編號編碼成tail(2bit的tail index和14bit的tail cpu),同時還完成:

    • 區分到底是tail域沒有指向任何節點,還是指向了第0個CPU的第0個節點。

    • 檢查當前CPU上自旋鎖嵌套的層數是否超過4層。

    根據idx取到取到空閑節點,相當于node = mcs_nodes[idx]。這里有barrier保序,防止編譯器優化。然后對空閑節點進行初始化。

    調用queued_spin_trylock函數,檢查一下在我們準備的這段時間里,是不是已經有人釋放了該鎖了。如果成功獲得鎖,則直接跳到lable release處;否則,繼續往下。queued_spin_trylock函數的作用具體來講是這樣:

    連續兩次使用原子操作檢查鎖的狀態:如果val為0(對應三元組(0, 0, 0))并且再次讀取val的locked域也為0,則表示可以獲得鎖。上文說過,此時鎖的狀態是未知的。

    在調用xchg_tail函數之前,有smp_wmb內存屏障,保證tail值獲得的正確性。該函數將lock->tail設置為新生成的tail值,并將舊的值存在old中。

    如果舊的tail為0,說明隊列里只有我們這個新生成的節點,直接跳到第10步;否則繼續往下執行

    通過舊的tail拿到等待隊列的尾結點prev。

    將當前節點插入等待隊列,作為新的尾節點。

    自旋等待本節點的locked域變為1。在local CPU上自旋等待自己的locked成員被置位。arch_mcs_spin_lock_contended在arm64上最終會調用__cmpwait_case_##name宏,展開后同arm64的tick spinlock的arch_spin_lock的實現類似。核心功能都是在自旋時執行WFE節省部分能耗。

    后面3行代碼,是看下在我們后面是否還有其他人在排隊,如果有的話,使用prfm指令,提前將相關值預取到local CPU的cache中,加速后續的執行。

    使用原子操作重新獲取lock->val的值,并且循環等到直到pending和locked都為0。三元組( tail, pending, locked)的值為(*, 0, 0),也就是說需要等到皇位和太子位都是空閑的狀態才是我們真正獲得了鎖的條件。

    如果tail還是我們設置的,說明我們同時是等待隊列的最后一個節點或者說是唯一節點,后面沒人在排隊了。通過atomic_try_cmpxchg_relaxed原子的將locked域設置為1,至此才真正完成了鎖的合法擁有,直接跳到最后1步。三元組的狀態遷移是(n, 0, 0) --> (0, 0, 1)。否則,如果tail發生了改變,說明后面還有節點或者pending位被占,則繼續往下處理。

    先將locked設置為1。三元組的狀態遷移是(*, *, 0) --> (*, 0, 1)。

    等待的下一個節點還有正式加入進來,則需要等next變成非空(非空才真正了完成加入)。

    調用arch_mcs_spin_unlock_contended,將下一個節點的locked值設置成1 ,完成了鎖的傳遞。也就是完成了擊鼓傳花。

    釋放本節點。

    6.4 解鎖實現

    static __always_inline void queued_spin_unlock(struct qspinlock *lock) {smp_store_release(&lock->locked, 0); }

    將qspinlock的locked清零即可。

    【參考資料】

    1. 【原創】linux spinlock/rwlock/seqlock原理剖析(基于ARM64)

    2.? 宋寶華:幾個人一起搶spinlock,到底誰先搶到?

    3.?Linux內核同步機制之(四):spin lock,蝸窩科技

    4.?Linux中的spinlock機制[一] - CAS和ticket spinlock,蘭新宇,知乎


    推薦閱讀:

    專輯|Linux文章匯總

    專輯|程序人生

    專輯|C語言

    我的知識小密圈

    關注公眾號,后臺回復「1024」獲取學習資料網盤鏈接。

    歡迎點贊,關注,轉發,在看,您的每一次鼓勵,我都將銘記于心~

    總結

    以上是生活随笔為你收集整理的Linux同步原语系列-spinlock及其演进优化的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 欧美午夜精品一区二区 | 最新在线观看av | 丁香六月婷婷 | 希岛婚前侵犯中文字幕在线 | 麻豆久久久久久 | 五月av| 国产aaa毛片| 欧美××××黑人××性爽 | 人人妻人人澡人人爽精品 | 四川一级毛毛片 | 久草久热 | 男女靠逼视频 | 午夜伦伦 | 中文字幕亚洲在线 | 亚洲女同二女同志 | 欧美黄色小视频 | 亚洲精品免费电影 | yw在线观看| 日本一区二区三区四区视频 | 国产一区二区三区四区五区在线 | 一本一道av无码中文字幕 | 在线播放波多野结衣 | 91亚洲精华 | wwww欧美| 日韩精品区 | 91精产品一区观看 | 中文字幕精品三级久久久 | 欧美日韩在线视频免费播放 | 欧美三区视频 | 怡红院成人在线 | 国产又粗又猛又色又 | 欧美另类videosbestsex日本 | 原神女裸体看个够无遮挡 | 色偷偷网 | 高清不卡一区二区 | 日韩精品国产一区二区 | 韩国伦理电影免费在线 | 婷婷五月综合激情 | 边啃奶头边躁狠狠躁 | 在线看成人| 成人免费做受小说 | 亚洲aaaa级特黄毛片 | 国产拍拍拍拍拍拍拍拍拍拍拍拍拍 | 精品免费一区 | brazzers欧美大波霸 | 少妇被躁爽到高潮无码文 | 亚洲成人黄色 | 偷偷操不一样 | 2025国产精品 | 偷操 | 一区二区三区视频免费看 | 一本一道久久综合狠狠老精东影业 | 成人福利网址 | 欧美日本韩国在线 | 特级西西人体 | 有机z中国电影免费观看 | 国产精品海角社区 | 三级特黄视频 | 91麻豆精品国产91久久久更新时间 | 日本大胆裸体做爰视频 | 亚洲精品粉嫩小泬 | 国产一卡二卡三卡 | 国产一区二区三区乱码 | 亚洲人交配 | 三级性生活视频 | 欧美日韩加勒比 | 日本成人激情视频 | 亚洲成人一级片 | 精品欧美一区二区三区在线观看 | 中文字幕23| 亚洲精品国产精品乱码不66 | 噼里啪啦高清 | 蝌蚪久久| 中文字幕一二三区 | 国产乱淫av片 | 久久久噜噜噜久久中文字幕色伊伊 | 日本亚洲一区二区 | 色图社区 | 成人xxx视频 | 亚洲免费观看在线 | 精品久久久久久久久中文字幕 | 亚洲乱子伦 | 免费在线播放 | 日本a在线天堂 | 国产精品中文在线 | 下面一进一出好爽视频 | 麻豆免费观看网站 | 91精品国产综合久久精品图片 | 日韩黄色精品视频 | 中文字幕一区二区三区在线播放 | 人妻少妇精品久久 | 久久中文字幕人妻熟av女蜜柚m | www,久久久 | 欧美日韩电影一区二区三区 | 老司机深夜福利影院 | 国产粉嫩呻吟一区二区三区 | 91精品国产入口 | 国产一区二区四区 | 中文字幕av久久爽一区 |