日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

深入理解AbstractQueuedSynchronizer(AQS)

發(fā)布時間:2025/3/15 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入理解AbstractQueuedSynchronizer(AQS) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、AQS簡介

在同步組件的實現(xiàn)中,AQS是核心部分,同步組件的實現(xiàn)者通過使用AQS提供的模板方法實現(xiàn)同步組件語義,AQS則實現(xiàn)了對同步狀態(tài)的管理,以及對阻塞線程進行排隊,等待通知等等一些底層的實現(xiàn)處理。AQS的核心也包括了這些方面:同步隊列,獨占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時等待鎖獲取這些特性的實現(xiàn),而這些實際上則是AQS提供出來的模板方法,歸納整理如下:

  • AQS里有一個專門描述同步狀態(tài)的變量:private volatile int state;
  • state是一個狀態(tài)值,不同實現(xiàn)AbstractQueuedSynchronizer的鎖所代表的意思都不同,如果我們使用過CountDownLatch,那么它的方法countDown() 就是執(zhí)行的就是-1操作,操作的其實就是這個state值,就相當于一個計數(shù)器,對于Semaphore,CyclicBarrier,他們也是通過操作state來進行計數(shù),ReentrantLock中,也是通過操作state來實現(xiàn)線程是否獲取到鎖,當state為0時,代表當前鎖時空閑的,沒有被線程持有,如果state為1,則當前鎖被線程持有,如果大于1,則證明線程重入了,state+1,而對state的操作又分為獨占和共享。

    對于state的操作就是整個AQS的核心,操作包括:

    • 獲取、更新:getState()、setState()、compareAndSet()
    • 暴露同步狀態(tài)的值:acquire、acquireInterruptibly、tryAcquireNanos、release、acquireShared、acquireSharedInterruptibly、tryAcquireSharedNanos、releaseShared

    細分如下:

    • 獨占式鎖:獨占的意思就是同一時間只有一個線程能操作,其他線程過來都會被阻塞,只有當前線程完成任務(wù)后釋放了資源,其他線程才能繼續(xù)獲取資源,每一個線程都與資源進行綁定, 上面我們說的ReentrantLock就是獨占鎖。同一時間只有一個線程能拿到鎖執(zhí)行,鎖的狀態(tài)只有0和1兩種情況。
    //獨占式獲取同步狀態(tài),如果獲取失敗則插入同步隊列進行等待; void acquire(int arg) //與acquire方法相同,但在同步隊列中進行等待的時候可以檢測中斷; void acquireInterruptibly(int arg) //在acquireInterruptibly基礎(chǔ)上增加了超時等待功能,在超時時間內(nèi)沒有獲得同步狀態(tài)返回false; boolean tryAcquireNanos(int arg, long nanosTimeout) //釋放同步狀態(tài),該方法會喚醒在同步隊列中的下一個節(jié)點 boolean release(int arg)
    • 共享式鎖:同一時間有多個線程可以拿到鎖協(xié)同工作,鎖的狀態(tài)大于或等于0。
    //共享式獲取同步狀態(tài),與獨占式的區(qū)別在于同一時刻有多個線程獲取同步狀態(tài); void acquireShared(int arg) //在acquireShared方法基礎(chǔ)上增加了能響應(yīng)中斷的功能; void acquireSharedInterruptibly(int arg) //在acquireSharedInterruptibly基礎(chǔ)上增加了超時等待的功能; boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 共享式釋放同步狀態(tài) boolean releaseShared(int arg)

  • 有一個幫助AQS將線程在阻塞狀態(tài)和喚醒狀態(tài)中轉(zhuǎn)換的工具類LockSupport
  • 隊列
    • FIFO隊列(CLH):用于競爭鎖失敗時的排隊

    整個框架的關(guān)鍵就是如何管理被阻塞的線程的隊列,該隊列是嚴格的FIFO隊列,因此,框架不支持基于優(yōu)先級的同步。同步隊列的最佳選擇是自身沒有使用底層鎖來構(gòu)造的非阻塞數(shù)據(jù)結(jié)構(gòu),一直以來,CLH鎖僅被用于自旋鎖。因為CLH鎖可以更容易地去實現(xiàn)“取消(cancellation)”和“超時”功能,因此選擇了CLH鎖作為實現(xiàn)的基礎(chǔ)。CLH隊列實際上并不那么像隊列,因為它的入隊和出隊操作都與它的用途(即用作鎖)緊密相關(guān)。它是一個鏈表隊列,通過兩個字段head和tail來存取,這兩個字段是可原子更新的,兩者在初始化時都指向了一個空節(jié)點。

    • 條件隊列:給維護獨占同步的類以及實現(xiàn)Lock接口的類使用

    AQS框架提供了一個ConditionObject類,給維護獨占同步的類以及實現(xiàn)Lock接口的類使用。一個鎖對象可以關(guān)聯(lián)任意數(shù)目的條件對象,可以提供典型的管程風(fēng)格的await、signal和signalAll操作,包括帶有超時的,以及一些檢測、監(jiān)控的方法。通過修正一些設(shè)計決策,ConditionObject類有效地將條件(conditions)與其它同步操作結(jié)合到了一起。該類只支持Java風(fēng)格的管程訪問規(guī)則,這些規(guī)則中,僅當當前線程持有鎖且要操作的條件(condition)屬于該鎖時,條件操作才是合法的。這樣,一個ConditionObject關(guān)聯(lián)到一個ReentrantLock上就表現(xiàn)的跟內(nèi)置的管程(通過Object.wait等)一樣了。兩者的不同僅僅在于方法的名稱、額外的功能以及用戶可以為每個鎖聲明多個條件。ConditionObject使用了與同步器一樣的內(nèi)部隊列節(jié)點。但是,是在一個單獨的條件隊列中維護這些節(jié)點的。signal操作是通過將節(jié)點從條件隊列轉(zhuǎn)移到鎖隊列中來實現(xiàn)的,而沒有必要在需要喚醒的線程重新獲取到鎖之前將其喚醒。

    • 這兩個隊列的最基本結(jié)構(gòu)就是Node了
    abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {private static final long serialVersionUID = 7373984972572414691L;protected AbstractQueuedSynchronizer() {}static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED = 1;//節(jié)點從同步隊列中取消static final int SIGNAL = -1;//后繼節(jié)點的線程處于等待狀態(tài),如果當前節(jié)點釋放同步狀態(tài)會通知后繼節(jié)點,使得后繼節(jié)點的線程能夠運行;static final int CONDITION = -2;//當前節(jié)點進入等待隊列中static final int PROPAGATE = -3;//表示下一次共享式同步狀態(tài)獲取將會無條件傳播下去volatile int waitStatus //節(jié)點狀態(tài)volatile Node prev //當前節(jié)點/線程的前驅(qū)節(jié)點volatile Node next; //當前節(jié)點/線程的后繼節(jié)點volatile Thread thread;//加入同步隊列的線程引用Node nextWaiter;//等待隊列中的下一個節(jié)點}//隊列的頭指針private transient volatile Node head;//隊列的尾指針private transient volatile Node tail;private volatile int state; }

    二、同步隊列(CLH)

    • CLH鎖是一個自旋鎖。能確保無饑餓性。提供先來先服務(wù)的公平性。

    當共享資源被某個線程占有,其他請求該資源的線程將會阻塞,從而進入同步隊列。就數(shù)據(jù)結(jié)構(gòu)而言,隊列的實現(xiàn)方式無外乎兩者一是通過數(shù)組的形式,另外一種則是鏈表的形式。AQS中的同步隊列則是通過鏈式方式進行實現(xiàn)。接下來,很顯然我們至少會抱有這樣的疑問:

  • 節(jié)點的數(shù)據(jù)結(jié)構(gòu)是什么樣的?
  • 是單向還是雙向?
  • 是帶頭結(jié)點的還是不帶頭節(jié)點的?
  • 我們知道在AQS有一個靜態(tài)內(nèi)部類Node,而且我們知道了節(jié)點的數(shù)據(jù)結(jié)構(gòu)類型,每個節(jié)點擁有其前驅(qū)和后繼節(jié)點,很顯然這是一個雙向隊列。同樣的我們可以用一段demo看一下。

    public class LockDemo {private static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) {for (int i = 0; i < 5; i++) {Thread thread = new Thread(() -> {lock.lock();try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}});thread.start();}} }

    實例代碼中開啟了5個線程,先獲取鎖之后再睡眠10S中,實際上這里讓線程睡眠是想模擬出當線程無法獲取鎖時進入同步隊列的情況。通過debug,當Thread-4(在本例中最后一個線程)獲取鎖失敗后進入同步時,AQS時現(xiàn)在的同步隊列如圖所示:

    Thread-0先獲得鎖后進行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進入同步隊列,同時也可以很清楚的看出來每個節(jié)點有兩個域:prev(前驅(qū))和next(后繼),并且每個節(jié)點用來保存獲取同步狀態(tài)失敗的線程引用以及等待狀態(tài)等信息。另外AQS中有兩個重要的成員變量:

    private transient volatile Node head; private transient volatile Node tail;

    也就是說AQS實際上通過頭尾指針來管理同步隊列,同時實現(xiàn)包括獲取鎖失敗的線程進行入隊,釋放鎖時對同步隊列中的線程進行通知等核心方法。其示意圖如下:

    通過對源碼的理解以及做實驗的方式,現(xiàn)在我們可以清楚的知道2點:

    1.節(jié)點的數(shù)據(jù)結(jié)構(gòu),即AQS的靜態(tài)內(nèi)部類Node,節(jié)點的等待狀態(tài)等信息;
    2.同步隊列是一個雙向隊列,AQS通過持有頭尾指針管理同步隊列;

    那么,節(jié)點如何進行入隊和出隊是怎樣做的了?實際上這對應(yīng)著鎖的獲取和釋放兩個操作:獲取鎖失敗進行入隊操作,獲取鎖成功進行出隊操作。

    三、ConditionObject條件變量及條件隊列

    ConditionObject是AbstractQueuedSynchronizer的內(nèi)部類,他是實現(xiàn)線程間的同步的基礎(chǔ)設(shè)施,它是與鎖結(jié)合使用的(如ReentrantLock),ConditionObject實現(xiàn)了Condition接口,Condition接口提供了await(),signal()等方法,實現(xiàn)線程的掛起和喚醒,ConditionObject是一個條件變量,每個條件變量對應(yīng)一個條件隊列,當調(diào)用Condition的await()被掛起的線程將會存放在條件隊列中,調(diào)用signal()時將從條件隊列中移除并放入AQS隊列中。

    同步隊列和條件隊列的關(guān)系:


    四、獨占鎖

  • 獨占鎖的獲取(acquire方法)

    我們繼續(xù)通過看源碼和debug的方式來看,還是以上面的demo為例,調(diào)用lock()方法是獲取獨占式鎖,獲取失敗就將當前線程加入同步隊列,成功則線程執(zhí)行。而lock()方法實際上會調(diào)用AQS的acquire()方法,源碼如下

  • public final void acquire(int arg) {//先看同步狀態(tài)是否獲取成功,如果成功則方法結(jié)束返回//若失敗則先調(diào)用addWaiter()方法再調(diào)用acquireQueued()方法if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }

    關(guān)鍵信息請看注釋,acquire根據(jù)當前獲得同步狀態(tài)成功與否做了兩件事情:

  • 成功,則方法結(jié)束返回。
  • 失敗,則先調(diào)用addWaiter()然后在調(diào)用acquireQueued()方法。
  • 獲取同步狀態(tài)失敗,進行入隊操作。當線程獲取獨占式鎖失敗后就會將當前線程加入同步隊列,那么加入隊列的方式是怎樣的?我們接下來就應(yīng)該去研究一下addWaiter()和acquireQueued()。addWaiter()源碼如下:

    private Node addWaiter(Node mode) {// 1. 將當前線程構(gòu)建成Node類型Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure// 2. 當前尾節(jié)點是否為null?Node pred = tail;if (pred != null) {// 2.2 將當前節(jié)點尾插入的方式插入同步隊列中node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 2.1. 當前同步隊列尾節(jié)點為null,說明當前線程是第一個加入同步隊列進行等待的線程enq(node);return node; }

    分析可以看上面的注釋。程序的邏輯主要分為兩個部分:

  • 當前同步隊列的尾節(jié)點為null,調(diào)用方法enq()插入;
  • 當前隊列的尾節(jié)點不為null,則采用尾插入(compareAndSetTail()方法)的方式入隊。
  • 另外還會有另外一個問題:如果 if (compareAndSetTail(pred, node))為false怎么辦?會繼續(xù)執(zhí)行到enq()方法,同時很明顯compareAndSetTail是一個CAS操作,通常來說如果CAS操作失敗會繼續(xù)自旋(死循環(huán))進行重試。因此,經(jīng)過我們這樣的分析,enq()方法可能承擔(dān)兩個任務(wù):

  • 處理當前同步隊列尾節(jié)點為null時進行入隊操作;
  • 如果CAS尾插入節(jié)點失敗后負責(zé)自旋進行嘗試。
  • 那么是不是真的就像我們分析的一樣了?只有源碼會告訴我們答案,enq()源碼如下:

    private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initialize//1. 構(gòu)造頭結(jié)點if (compareAndSetHead(new Node()))tail = head;} else {// 2. 尾插入,CAS操作失敗自旋嘗試node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }

    在上面的分析中我們可以看出在第1步中會先創(chuàng)建頭結(jié)點,說明同步隊列是帶頭結(jié)點的鏈式存儲結(jié)構(gòu)。帶頭結(jié)點與不帶頭結(jié)點相比,會在入隊和出隊的操作中獲得更大的便捷性,因此同步隊列選擇了帶頭結(jié)點的鏈式存儲結(jié)構(gòu)。那么帶頭節(jié)點的隊列初始化時機是什么?自然而然是在tail為null時,即當前線程是第一次插入同步隊列。compareAndSetTail(t, node)方法會利用CAS操作設(shè)置尾節(jié)點,如果CAS操作失敗會在for (;;)死循環(huán)中不斷嘗試,直至成功return返回為止。因此,對enq()方法可以做這樣的總結(jié):

    在當前線程是第一個加入同步隊列時,調(diào)用compareAndSetHead(new Node())方法,完成鏈式隊列的頭結(jié)點的初始化;自旋不斷嘗試CAS尾插入節(jié)點直至成功為止。

    現(xiàn)在我們已經(jīng)很清楚獲取獨占式鎖失敗的線程包裝成Node然后插入同步隊列的過程了?那么緊接著會有下一個問題?在同步隊列中的節(jié)點(線程)會做什么事情了來保證自己能夠有機會獲得獨占式鎖了?帶著這樣的問題我們就來看看acquireQueued()方法,從方法名就可以很清楚,這個方法的作用就是排隊獲取鎖的過程,源碼如下:

    final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 1. 獲得當前節(jié)點的先驅(qū)節(jié)點final Node p = node.predecessor();// 2. 當前節(jié)點能否獲取獨占式鎖 // 2.1 如果當前節(jié)點的先驅(qū)節(jié)點是頭結(jié)點并且成功獲取同步狀態(tài),即可以獲得獨占式鎖if (p == head && tryAcquire(arg)) {//隊列頭指針用指向當前節(jié)點setHead(node);//釋放前驅(qū)節(jié)點p.next = null; // help GCfailed = false;return interrupted;}// 2.2 獲取鎖失敗,線程進入等待狀態(tài)等待獲取獨占式鎖if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }

    程序邏輯通過注釋已經(jīng)標出,整體來看這是一個這又是一個自旋的過程(for (;😉),代碼首先獲取當前節(jié)點的先驅(qū)節(jié)點,如果先驅(qū)節(jié)點是頭結(jié)點的并且成功獲得同步狀態(tài)的時候(if (p == head && tryAcquire(arg))),當前節(jié)點所指向的線程能夠獲取鎖。反之,獲取鎖失敗進入等待狀態(tài)。整體示意圖為下圖:

    獲取鎖成功,出隊操作

    獲取鎖的節(jié)點出隊的邏輯是:

    //隊列頭結(jié)點引用指向當前節(jié)點 setHead(node); //釋放前驅(qū)節(jié)點 p.next = null; // help GC failed = false; return interrupted;

    setHead()方法為:

    private void setHead(Node node) {head = node;node.thread = null;node.prev = null; }

    將當前節(jié)點通過setHead()方法設(shè)置為隊列的頭結(jié)點,然后將之前的頭結(jié)點的next域設(shè)置為null并且pre域也為null,即與隊列斷開,無任何引用方便GC時能夠?qū)?nèi)存進行回收。示意圖如下:

    經(jīng)過上面的分析,獨占式鎖的獲取過程也就是acquire()方法的執(zhí)行流程如下圖所示:

  • 獨占鎖的釋放(release()方法)
  • 獨占鎖的釋放就相對來說比較容易理解了,廢話不多說先來看下源碼:

    public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }

    這段代碼邏輯就比較容易理解了,如果同步狀態(tài)釋放成功(tryRelease返回true)則會執(zhí)行if塊中的代碼,當head指向的頭結(jié)點不為null,并且該節(jié)點的狀態(tài)值不為0的話才會執(zhí)行unparkSuccessor()方法。unparkSuccessor方法源碼:

    private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//頭節(jié)點的后繼節(jié)點Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//后繼節(jié)點不為null時喚醒該線程LockSupport.unpark(s.thread); }

    源碼的關(guān)鍵信息請看注釋,首先獲取頭節(jié)點的后繼節(jié)點,當后繼節(jié)點的時候會調(diào)用LookSupport.unpark()方法,該方法會喚醒該節(jié)點的后繼節(jié)點所包裝的線程。因此,每一次鎖釋放后就會喚醒隊列中該節(jié)點的后繼節(jié)點所引用的線程,從而進一步可以佐證獲得鎖的過程是一個FIFO(先進先出)的過程。
    到現(xiàn)在我們終于啃下了一塊硬骨頭了,通過學(xué)習(xí)源碼的方式非常深刻的學(xué)習(xí)到了獨占式鎖的獲取和釋放的過程以及同步隊列??梢宰鲆幌驴偨Y(jié):

  • 線程獲取鎖失敗,線程被封裝成Node進行入隊操作,核心方法在于addWaiter()和enq(),同時enq()完成對同步隊列的頭結(jié)點初始化工作以及CAS操作失敗的重試;
  • 線程獲取鎖是一個自旋的過程,當且僅當 當前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且成功獲得同步狀態(tài)時,節(jié)點出隊即該節(jié)點引用的線程獲得鎖,否則,當不滿足條件時就會調(diào)用LookSupport.park()方法使得線程阻塞;
  • 釋放鎖的時候會喚醒后繼節(jié)點;
  • 總體來說:在獲取同步狀態(tài)時,AQS維護一個同步隊列,獲取同步狀態(tài)失敗的線程會加入到隊列中進行自旋;移除隊列(或停止自旋)的條件是前驅(qū)節(jié)點是頭結(jié)點并且成功獲得了同步狀態(tài)。在釋放同步狀態(tài)時,同步器會調(diào)用unparkSuccessor()方法喚醒后繼節(jié)點。

  • 可中斷式獲取鎖(acquireInterruptibly方法)
  • 我們知道lock相較于synchronized有一些更方便的特性,比如能響應(yīng)中斷以及超時等待等特性,現(xiàn)在我們依舊采用通過學(xué)習(xí)源碼的方式來看看能夠響應(yīng)中斷是怎么實現(xiàn)的。可響應(yīng)中斷式鎖可調(diào)用方法lock.lockInterruptibly();而該方法其底層會調(diào)用AQS的acquireInterruptibly方法,源碼為:

    public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))//線程獲取鎖失敗doAcquireInterruptibly(arg); }

    在獲取同步狀態(tài)失敗后就會調(diào)用doAcquireInterruptibly方法:

    private void doAcquireInterruptibly(int arg)throws InterruptedException {//將節(jié)點插入到同步隊列中final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();//獲取鎖出隊if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//線程中斷拋異常throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }

    關(guān)鍵信息請看注釋,現(xiàn)在看這段代碼就很輕松了吧,與acquire方法邏輯幾乎一致,唯一的區(qū)別是當parkAndCheckInterrupt返回true時即線程阻塞時該線程被中斷,代碼拋出被中斷異常。

  • 超時等待式獲取鎖(tryAcquireNanos()方法)
  • 通過調(diào)用lock.tryLock(timeout,TimeUnit)方式達到超時等待獲取鎖的效果,該方法會在三種情況下才會返回:

    1.在超時時間內(nèi),當前線程成功獲取了鎖;
    2.當前線程在超時時間內(nèi)被中斷;
    3.超時時間結(jié)束,仍未獲得鎖返回false。

    我們?nèi)匀煌ㄟ^采取閱讀源碼的方式來學(xué)習(xí)底層具體是怎么實現(xiàn)的,該方法會調(diào)用AQS的方法tryAcquireNanos(),源碼為:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||//實現(xiàn)超時等待的效果doAcquireNanos(arg, nanosTimeout); }

    很顯然這段源碼最終是靠doAcquireNanos方法實現(xiàn)超時等待的效果,該方法源碼如下:

    private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;//1. 根據(jù)超時時間和當前時間計算出截止時間final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();//2. 當前線程獲得鎖出隊列if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}// 3.1 重新計算超時時間nanosTimeout = deadline - System.nanoTime();// 3.2 已經(jīng)超時返回falseif (nanosTimeout <= 0L)return false;// 3.3 線程阻塞等待 if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);// 3.4 線程被中斷拋出被中斷異常if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }

    程序邏輯如圖所示:

    程序邏輯同獨占鎖可響應(yīng)中斷式獲取基本一致,唯一的不同在于獲取鎖失敗后,對超時時間的處理上,在第1步會先計算出按照現(xiàn)在時間和超時時間計算出理論上的截止時間,比如當前時間是8h10min,超時時間是10min,那么根據(jù)deadline = System.nanoTime() + nanosTimeout計算出剛好達到超時時間時的系統(tǒng)時間就是8h 10min+10min = 8h 20min。然后根據(jù)deadline - System.nanoTime()就可以判斷是否已經(jīng)超時了,比如,當前系統(tǒng)時間是8h 30min很明顯已經(jīng)超過了理論上的系統(tǒng)時間8h 20min,deadline - System.nanoTime()計算出來就是一個負數(shù),自然而然會在3.2步中的If判斷之間返回false。如果還沒有超時即3.2步中的if判斷為true時就會繼續(xù)執(zhí)行3.3步通過LockSupport.parkNanos使得當前線程阻塞,同時在3.4步增加了對中斷的檢測,若檢測出被中斷直接拋出被中斷異常。

    五、共享鎖

  • 共享鎖的獲取(acquireShared()方法)
  • 在聊完AQS對獨占鎖的實現(xiàn)后,我們繼續(xù)一鼓作氣的來看看共享鎖是怎樣實現(xiàn)的?共享鎖的獲取方法為acquireShared,源碼為:

    public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg); }

    這段源碼的邏輯很容易理解,在該方法中會首先調(diào)用tryAcquireShared方法,tryAcquireShared返回值是一個int類型,當返回值為大于等于0的時候方法結(jié)束說明獲得成功獲取鎖,否則,表明獲取同步狀態(tài)失敗即所引用的線程獲取鎖失敗,會執(zhí)行doAcquireShared方法,該方法的源碼為:

    private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {// 當該節(jié)點的前驅(qū)節(jié)點是頭結(jié)點且成功獲取同步狀態(tài)setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }

    現(xiàn)在來看這段代碼會不會很容易了?邏輯幾乎和獨占式鎖的獲取一模一樣,這里的自旋過程中能夠退出的條件是當前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且tryAcquireShared(arg)返回值大于等于0即能成功獲得同步狀態(tài)。

  • 共享鎖的釋放(releaseShared()方法)
  • 共享鎖的釋放在AQS中會調(diào)用方法

    releaseShared: public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }

    當成功釋放同步狀態(tài)之后即tryReleaseShared會繼續(xù)執(zhí)行doReleaseShared方法:

    private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} }

    這段方法跟獨占式鎖釋放過程有點點不同,在共享式鎖的釋放過程中,對于能夠支持多個線程同時訪問的并發(fā)組件,必須保證多個線程能夠安全的釋放同步狀態(tài),這里采用的CAS保證,當CAS操作失敗continue,在下一次循環(huán)中進行重試。

  • 可中斷(acquireSharedInterruptibly()方法),超時等待(tryAcquireSharedNanos()方法)
  • 關(guān)于可中斷鎖以及超時等待的特性其實現(xiàn)和獨占式鎖可中斷獲取鎖以及超時等待的實現(xiàn)幾乎一致,具體的就不再說了,如果理解了上面的內(nèi)容對這部分的理解也是水到渠成的。

    六、線程獲取獨占資源分析(以ReentrantLock為例)

  • acquire(arg)
  • //首先進入acquire(arg)方法,然后做下一步的判斷。 public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }
  • tryAcquire(arg)
  • tryAcquire(arg)判斷資源是否被占用,在tryAcquire()中主要通過獲取state的值判斷資源是否被占用,getState()獲取state的值,如果為0,則證明資源沒被 占用,就使用CAS操作更新值,然后state的值為1,然后使用setExclusiveOwnerThread()設(shè)置獨占資源的線程(獨占資源和每一個線程進行綁定),然后返回,下面有一個else if判斷,如果當前線程等于獨占資源的線程,證明線程重入了,那么更新state的值為2(state原本為1,重入后+1),最后返回。

    protected final boolean tryAcquire(int acquires) {//獲取當前線程final Thread current = Thread.currentThread();//獲取stateint c = getState();//資源沒被獨占if (c == 0) {//使用CAS對state進行更新if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {//設(shè)置獨占資源的線程setExclusiveOwnerThread(current);return true;}}//如果線程等于服戰(zhàn)資源的線程(重入)else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");//state+1setState(nextc);return true;}return false; }
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
  • 如果資源已經(jīng)被獨占了,那么就需要將當前線程插入AQS隊列中,然后將當前線程掛起,我們看一下addWaiter(Node.EXCLUSIVE),可以看出將Node節(jié)點插入了AQS的隊尾。

    private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}
  • acquireQueued(final Node node, int arg)
  • acquireQueued就是將當前線程掛起,我們看里面的一個方法parkAndCheckInterrupt(), 可以看出使用 LockSupport.park(this)來將當前線程掛起。

    private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }
  • release(int arg)
  • 當一個線程調(diào)用了release(arg)方法釋放資源時,它會調(diào)用tryRelease(arg)嘗試釋放資源。

    public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }
  • tryRelease(int releases)
  • 嘗試釋放資源的邏輯很簡單,就是用AQS中的state-1,如果為0,則能證明可以釋放,就會通過setState(arg)方法設(shè)置state的值,注意,bash 如果c=0,則證明此線程不是重入的線程,那么就直接釋放資源,并且解除與資源綁定的線程(setExclusiveOwnerThread(null)),如果c != 0 ,則證明資源被重入, 則將state - 1 ,不解除與獨占資源綁定的線程。

    protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free

    釋放完資源后,就會喚醒使用LockSupport.unpark(s.thread)喚醒AQS中的線程,然后又重復(fù)上面的過程。

    private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}

    七、共享操作(以CountDownLatch為例)

    當線程進入countDownLatch.await()方式時,會調(diào)用AQS的acquireSharedInterruptibly方法,然后進入tryAcquireShared(arg)方法。

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }
  • tryAcquireShared(arg)
  • tryAcquireShared(arg)的作用就是判斷AQS狀態(tài)值state,如果state = 0 , 返回1,否則返回-1,如果返回1,則證明線程對資源的操作已經(jīng)結(jié)束,如果為-1,則證明對資源的操作
    尚未結(jié)束,此時就會調(diào)用doAcquireSharedInterruptibly(arg)方法。

    protected int tryAcquireShared(int acquires) { //如果state = 0 , 返回1,否則返回-1,如果返回1,則證明線程對資源的操作已經(jīng)結(jié)束,如果為-1,則證明對資源的操作尚未結(jié)束return (getState() == 0) ? 1 : -1; }
  • doAcquireSharedInterruptibly(arg)
  • 調(diào)用doAcquireSharedInterruptibly(arg)會通過addWaiter(Node.SHARED)將當前線程封裝成SHARED類型的Node,然后插入AQS隊列的尾部,然后調(diào)用
    parkAndCheckInterrupt()使用LockSupport.park(this)將當前線程掛起。

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }
  • releaseShared(int arg)
  • 當使用countDownLatch.countDown()時,其實就是對資源的釋放,最終會調(diào)用releaseShared(int arg)方法,然后調(diào)用tryReleaseShared(int releases)嘗試釋放資源

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }
  • tryReleaseShared(int releases)
  • 調(diào)用tryReleaseShared(int releases)嘗試釋放資源,釋放資源的邏輯時根據(jù)state來判斷,如果state = 0,證明當前資源沒有任何線程持有,所以就直接返回,如果state != 0 , 證明資源還被線程持有,所以當前線程就將state - 1,然后繼續(xù)向下執(zhí)行doReleaseShared()。

    protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}} }
  • doReleaseShared()
  • doReleaseShared()會調(diào)用unparkSuccessor(h)來喚醒線程(LockSupport.unpark(s.thread)),因為傳遞過去的參數(shù)h = head , 所以可知喚醒的線程是隊頭節(jié)點的線程。

    private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} }

    八、關(guān)于公平鎖非公平鎖的疑問

    公平鎖與非公平鎖的含義都很明白,公平鎖必須排隊獲取鎖,鎖的獲取順序完全根據(jù)排隊順序而來,而非公平就是誰搶到是誰的。我們都知道AQS中的鎖獲取,如果首次獲取失敗會進入到內(nèi)部的同步隊列中阻塞等待,只有前面的節(jié)點喚醒當前節(jié)點才能去嘗試獲取鎖。而鎖獲取的核心,實際上是在tryAcquire方法中定義的,在節(jié)點從阻塞中醒來時,都會用tryAcquire方法來獲取鎖,而公平與否在tryAcquire方法中體現(xiàn),即在公平鎖中,tryAcquire獲取的前提是需要沒有存在前驅(qū)節(jié)點的,而非公平鎖就沒有這個限制。

  • 疑問:同步隊列不是FIFO的嗎?它們?nèi)腙犈藕庙樞虿⑶野凑枕樞蛞粋€一個的醒來,只有醒來的才有機會去獲取鎖,才能去執(zhí)行tryAcquire方法,這不還是公平的嗎?
  • 解答:線程在do方法中獲取鎖時,會先加入同步隊列,之后根據(jù)情況再陷入阻塞。當阻塞后的節(jié)點一段時間后醒來時,這時候來了新的更多的線程來搶鎖,這些新線程還沒有加入到同步隊列中去,也就是在tryAcquire方法中獲取鎖。在公平鎖下,這些新線程會發(fā)現(xiàn)同步隊列中存在節(jié)點等待,那么這些新線程將無法獲取到鎖,乖乖去排隊;而在非公平鎖下,這些新線程會跟排隊蘇醒的線程進行鎖爭搶,失敗的去同步隊列中排隊。因此這里的公平與否,針對的其實是蘇醒線程與還未加入同步隊列的線程而對于已經(jīng)在同步隊列中阻塞的線程而言,它們內(nèi)部自身其實是公平的,因為它們是按順序被喚醒的,這是根據(jù)AQS節(jié)點喚醒機制和同步隊列的FIFO特性決定的。
  • 例子:以ReentrantLock為例

    可以看到,對沒公平和公平兩種方式都有實現(xiàn),直接看源碼:

  • 總結(jié)

    以上是生活随笔為你收集整理的深入理解AbstractQueuedSynchronizer(AQS)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 日韩欧美亚洲一区二区三区 | 在线观看黄色片网站 | 就操网| 一级女人毛片 | 亚洲一区观看 | 欧美熟妇7777一区二区 | 性欧美大战久久久久久久免费观看 | 96精品在线 | 黄色av网站免费在线观看 | 亚洲一级片| 综合色区 | 老妇裸体性激交老太视频 | 人妻少妇一区二区三区 | 久久免费视频3 | 潘金莲一级淫片aaaaaaa | 成人欧美一区二区三区在线观看 | 福利网站在线观看 | www在线视频 | 亚洲免费高清视频 | 欧美日韩国产精品一区二区三区 | 久青草国产在线 | 日韩毛片儿| 噼里啪啦国语版在线观看 | 欧美性生活精品 | 日韩簧片在线观看 | 高清国产午夜精品久久久久久 | 在线欧美一区 | 国产精品传媒麻豆hd | 怡红院久久 | 色姐| 亚洲gay视频 | 亚洲第一视频在线观看 | 成人在线你懂的 | 午夜免费看视频 | 少女情窦初开的第4集在线观看 | 老司机av导航 | 国产九色在线 | 午夜久久久久久噜噜噜噜 | 免费无遮挡无码永久视频 | 三级黄毛片 | 中文自拍| 性爱视频免费 | 日本黄色一区二区三区 | 国产精品福利视频 | 一级欧美在线 | 青青草原综合网 | 午夜精品一区二区三区三上悠亚 | 香港日本韩国三级网站 | 无码人妻精品一区二区三区99日韩 | 欧美性猛交xxxx乱大交蜜桃 | 久久久久久久久久久久国产精品 | 日本电影一区 | 大陆一级片 | 一本色道av| 欧美在线色视频 | 午夜在线免费观看 | 韩国一级淫片 | 97超碰人人澡人人爱学生 | 亚洲免费福利视频 | 欧美色吊丝 | 国产精品成人3p一区二区三区 | 狠狠躁日日躁夜夜躁av | 国产日韩欧美亚洲 | 成人小视频在线观看 | 玖玖爱资源站 | 亚洲欧美日韩精品久久亚洲区 | 亚州春色| 亚洲少妇自拍 | 亚洲图色在线 | 88福利视频| 日韩短视频 | 亚洲一区二区三区影院 | 中文字幕在线观看国产 | 丝瓜av| 一级片黑人 | 欧美精品日韩在线观看 | 欧美一区二区影院 | 免费成人av | 中国黄色一级大片 | 97人人爽人人爽人人爽 | 国产精品美女自拍视频 | 国产麻豆成人 | 色丁香婷婷 | 法国空姐在线观看视频 | 神马久久精品 | 国产大学生视频 | 2018自拍偷拍 | 丁香婷婷社区 | 亚洲国产精品一区 | 久久艹久久 | 国产第56页 | 亚洲50p| 中文字幕1区 | 亚洲第一成肉网 | 国产福利视频网站 | 人乳videos巨大吃奶 | 美女xx00 | 精品国语对白 | 糖心视频在线 |