日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

多线程:AQS源码分析

發布時間:2025/3/21 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程:AQS源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

AQS 源碼分析

?

概述

Java的內置鎖一直都是備受爭議的,在JDK 1.6之前,synchronized這個重量級鎖其性能一直都是較為低下,雖然在1.6后,進行大量的鎖優化策略,但是與Lock相比synchronized還是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基于JVM機制),但是它卻缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時獲取鎖,且它為獨占式在高并發場景下性能大打折扣。

AQS,AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。它是JUC并發包中的核心基礎組件。

AQS解決了子類實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基于AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。

AQS的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態。

AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。

AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構造成一個節點(Node)并將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。

AQS可以實現獨占鎖和共享鎖,RenntrantLock實現的是獨占鎖,ReentrantReadWriteLock實現的是獨占鎖和共享鎖,CountDownLatch實現的是共享鎖。

  • 獨占式exclusive。保證一次只有一個線程可以經過阻塞點,只有一個線程可以獲取到鎖。
  • 共享式shared。可以允許多個線程阻塞點,可以多個線程同時獲取到鎖。
  • 下面我們通過源碼來分析下AQS的實現原理

    AbstractQueuedSynchronizer類結構

    public?abstract?class AbstractQueuedSynchronizer

    ????extends AbstractOwnableSynchronizer

    ????implements java.io.Serializable {

    ????protected AbstractQueuedSynchronizer() { }

    ????//同步器隊列頭結點

    ????private?transient?volatile?Node head;

    ????//同步器隊列尾結點

    ????private?transient?volatile?Node tail;

    ????//同步狀態(打的那個state為0時,無鎖,當state>0時說明有鎖。)

    ????private?volatile?int?state;

    ????//獲取鎖狀態

    ????protected final int getState() {

    ????????return?state;

    ????}

    ????//設置鎖狀態

    ????protected final void setState(int newState) {

    ????????state = newState;

    ????}

    ????......

    通過AQS的類結構我們可以看到它內部有一個隊列和一個state的int變量。
    隊列:通過一個雙向鏈表實現的隊列來存儲等待獲取鎖的線程。
    state:鎖的狀態。
    head、tail和state 都是volatile類型的變量,volatile可以保證多線程的內存可見性。

    同步隊列的基本結構如下:

    ?

    同步隊列

    同步器隊列Node元素的類結構如下:

    static?final?class Node {

    ????static?final?Node SHARED = new?Node();

    ????static?final?Node EXCLUSIVE = null;

    ????//表示當前的線程被取消;

    ????static?final?int?CANCELLED = ?1;

    ????//表示當前節點的后繼節點包含的線程需要運行,也就是unpark;

    ????static?final?int?SIGNAL ???= -1;

    ????//表示當前節點在等待condition,也就是在condition隊列中;

    ????static?final?int?CONDITION = -2;

    ????//表示當前場景下后續的acquireShared能夠得以執行;

    ????static?final?int?PROPAGATE = -3;

    ????//表示節點的狀態。默認為0,表示當前節點在sync隊列中,等待著獲取鎖。

    ????//其它幾個狀態為:CANCELLED、SIGNAL、CONDITION、PROPAGATE

    ????volatile?int?waitStatus;

    ????//前驅節點

    ????volatile?Node prev;

    ????//后繼節點

    ????volatile?Node next;

    ????//獲取鎖的線程

    ????volatile?Thread thread;

    ????//存儲condition隊列中的后繼節點。

    ????Node nextWaiter;

    ????......

    }

    從Node結構prev和next節點可以看出它是一個雙向鏈表,waitStatus存儲了當前線程的狀態信息

    waitStatus

  • CANCELLED,值為1,表示當前的線程被取消;
  • SIGNAL,值為-1,表示當前節點的后繼節點包含的線程需要運行,也就是unpark;
  • CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
  • PROPAGATE,值為-3,表示當前場景下后續的acquireShared能夠得以執行;
  • 值為0,表示當前節點在sync隊列中,等待著獲取鎖。
  • 下面我們通過以下五個方面來介紹AQS是怎么實現的鎖的獲取和釋放的

  • 獨占式獲得鎖
  • 獨占式釋放鎖
  • 共享式獲得鎖
  • 共享式釋放鎖
    5.獨占超時獲得鎖
  • 1.獨占式獲得鎖

    acquire方法代碼如下:

    public final void acquire(int arg) {

    ????????//嘗試獲得鎖,獲取不到則加入到隊列中等待獲取

    ????????if?(!tryAcquire(arg) &&

    ????????????acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    ????????????selfInterrupt();

    ????}

  • 首先執行tryAcquire方法,嘗試獲得鎖。
  • 如果獲取失敗則進入addWaiter方法,構造同步節點(獨占式Node.EXCLUSIVE),將該節點添加到同步隊列尾部,并返回此節點,進入acquireQueued方法。
  • acquireQueued方法,這個新節點死是循環的方式獲取同步狀態,如果獲取不到則阻塞節點中的線程,阻塞后的節點等待前驅節點來喚醒或阻塞線程被中斷。
  • addWaiter方法代碼如下:

    private Node addWaiter(Node mode) {

    ????Node node = new?Node(Thread.currentThread(), mode);

    ????// Try the fast path of enq; backup to full enq on failure

    ????Node pred = tail;

    ????if?(pred != null) {

    ????????node.prev = pred;

    ????????//將該節點添加到隊列尾部

    ????????if?(compareAndSetTail(pred, node)) {

    ????????????pred.next = node;

    ????????????return?node;

    ????????}

    ????}

    ????//如果前驅節點為null,則進入enq方法通過自旋方式入隊列

    ????enq(node);

    ????return?node;

    }

    將構造的同步節點加入到同步隊列中

  • 使用鏈表的方式把該Node節點添加到隊列尾部,如果tail的前驅節點不為空(隊列不為空),則進行CAS添加到隊列尾部。
  • 如果更新失敗(存在并發競爭更新),則進入enq方法進行添加
  • enq方法代碼如下:

    ????private Node enq(final Node node) {

    ????????for?(;;) {

    ????????????Node t = tail;

    ????????????if?(t == null) { // Must initialize

    ????????????????//如果隊列為空,則通過CAS把當前Node設置成頭節點

    ????????????????if?(compareAndSetHead(new?Node()))

    ????????????????????tail = head;

    ????????????} else?{

    ????????????????node.prev = t;

    ????????????????//如果隊列不為空,則向隊列尾部添加Node

    ????????????????if?(compareAndSetTail(t, node)) {

    ????????????????????t.next = node;

    ????????????????????return?t;

    ????????????????}

    ????????????}

    ????????}

    ????}

    該方法使用CAS自旋的方式來保證向隊列中添加Node(同步節點簡寫Node)

  • 如果隊列為空,則把當前Node設置成頭節點
  • 如果隊列不為空,則向隊列尾部添加Node
  • acquireQueued方法代碼如下:

    final boolean acquireQueued(final Node node, int arg) { ?

    ????boolean?failed = true; ?

    ????try?{ ?

    ????????boolean?interrupted = false; ?

    ????????for?(;;) { ?

    ????????????//找到當前節點的前驅節點

    ????????????final?Node p = node.predecessor(); ?

    ????????????//檢測p是否為頭節點,如果是,再次調用tryAcquire方法

    ????????????if?(p == head && tryAcquire(arg)) { ?

    ????????????????//如果p節點是頭節點且tryAcquire方法返回true。那么將當前節點設置為頭節點。

    ????????????????setHead(node); ?

    ????????????????p.next = null; // help GC ?

    ????????????????failed = false; ?

    ????????????????return?interrupted; ?

    ????????????} ?

    ????????????//如果p節點不是頭節點,或者tryAcquire返回false,說明請求失敗。 ?

    ????????????//那么首先需要判斷請求失敗后node節點是否應該被阻塞,如果應該 ?

    ????????????//被阻塞,那么阻塞node節點,并檢測中斷狀態。 ?

    ????????????if?(shouldParkAfterFailedAcquire(p, node) && ?

    ????????????????parkAndCheckInterrupt()) ?

    ????????????????//如果有中斷,設置中斷狀態。 ?

    ????????????????interrupted = true; ?

    ????????} ?

    ????} finally?{ ?

    ????????if?(failed) //最后檢測一下如果請求失敗(異常退出),取消請求。 ?

    ????????????cancelAcquire(node); ?

    ????} ?

    }

    在acquireQueued方法中,當前線程通過自旋的方式來嘗試獲取同步狀態,

  • 如果當前節點的前驅節點頭節點才能嘗試獲得鎖,如果獲得成功,則把當前線程設置成頭結點,把之前的頭結點從隊列中移除,等待垃圾回收(沒有對象引用)
  • 如果獲取鎖失敗則進入shouldParkAfterFailedAcquire方法中檢測當前節點是否可以被安全的掛起(阻塞),如果可以安全掛起則進入parkAndCheckInterrupt方法,把當前線程掛起,并檢查剛線程是否執行了interrupted方法。
  • 通過上面的代碼我們可以發現AQS內部的同步隊列是FIFO的方式存取的。節點自旋獲取同步狀態的行為如下圖所示

    ?

    節點自旋獲取同步狀態

    shouldParkAfterFailedAcquire方法代碼如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

    ????????//獲得前驅節點狀態

    ????????int?ws = pred.waitStatus;

    ????????if?(ws == Node.SIGNAL)

    ???????????//如果前驅節點狀態為SIGNAL,當前線程則可以阻塞。

    ???????????return?true;

    ????????if?(ws > 0) {

    ????????????do?{

    ????????????????//判斷如果前驅節點狀態為CANCELLED,那就一直往前找,直到找到最近一個正常等待的狀態

    ????????????????node.prev = pred = pred.prev;

    ????????????} while?(pred.waitStatus > 0);

    ????????????//并將當前Node排在它的后邊。

    ????????????pred.next = node;

    ????????} else?{

    ????????????//如果前驅節點正常,則修改前驅節點狀態為SIGNAL

    ????????????compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    ????????}

    ????????return?false;

    ????}

    節點的狀態如下表:

    狀態

    說明

    CANCELLED

    1

    等待超時或者中斷,需要從同步隊列中取消

    SIGNAL

    -1

    后繼節點出于等待狀態,當前節點釋放鎖后將會喚醒后繼節點

    CONDITION

    -2

    節點在等待隊列中,節點線程等待在Condition上,其它線程對Condition調用signal()方法后,該節點將會從等待同步隊列中移到同步隊列中,然后等待獲取鎖。

    PROPAGATE

    -3

    表示下一次共享式同步狀態獲取將會無條件地傳播下去

    INITIAL

    0

    初始狀態

  • 首先獲取前驅節點的狀態ws
  • 如果ws為SIGNAL則表示可以被前驅節點喚醒,當前線程就可以掛起,等待前驅節點喚醒,返回true(可以掛起)
  • 如果ws>0說明,前驅節點取消了,并循環查找此前驅節點之前所有連續取消的節點。并返回false(不能掛起)。
  • 嘗試將當前節點的前驅節點的等待狀態設為SIGNAL
  • parkAndCheckInterrupt方法代碼如下:

    private final boolean parkAndCheckInterrupt() {

    ????//阻塞當前線程

    ????LockSupport.park(this);

    ????//判斷是否中斷來喚醒的

    ????return?Thread.interrupted();

    }

  • 調用LockSupport.park(this);進行阻塞當前線程
  • 如果被喚醒判斷是不是被中斷的(喚醒有兩種可能性,一種是unpark,一種是interrupter)
  • 2. 獨占式釋放鎖

    release方法代碼如下:

    ????public final boolean release(int arg) {

    ????????//嘗試釋放鎖

    ????????if?(tryRelease(arg)) {

    ????????????Node h = head;

    ????????????if?(h != null?&& h.waitStatus != 0)

    ????????????????//喚醒后繼節點

    ????????????????unparkSuccessor(h);

    ????????????return?true;

    ????????}

    ????????return?false;

    ????}

    tryRelease(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。

  • 首先通過tryRelease方法釋放鎖如果釋放鎖成功,執行第2步。
  • 通過調用unparkSuccessor() 方法來喚醒頭結點的后繼節點。該方法內部是通過LockSupport.unpark(s.thread);來喚醒后繼節點的。
  • 3. 共享式獲得鎖

    acquireShared方法代碼如下:

    public final void acquireShared(int arg) {

    ????//嘗試獲取的鎖,如果獲取失敗執行doAcquireShared方法。

    ????if?(tryAcquireShared(arg) < 0)

    ????????doAcquireShared(arg);

    }

    tryAcquireShared()嘗試獲取鎖,如果獲取失敗則通過doAcquireShared()進入等待隊列,直到獲取到資源為止才返回。

    這里tryAcquireShared()需要自定義同步器去實現。
    AQS中規定:負值代表獲取失敗,非負數標識獲取成功。

    doAcquireShared方法代碼如下:

    private void doAcquireShared(int arg) {

    ????//構建共享Node

    ????final?Node node = addWaiter(Node.SHARED);

    ????boolean?failed = true;

    ????try?{

    ????????boolean?interrupted = false;

    ????????for?(;;) {

    ????????????//獲取前驅節點

    ????????????final?Node p = node.predecessor();

    ????????????//如果是頭節點進行嘗試獲得鎖

    ????????????if?(p == head) {

    ????????????????//如果返回值大于等于0,則說明獲得鎖

    ????????????????int?r = tryAcquireShared(arg);

    ????????????????if?(r >= 0) {

    ????????????????????//當前節點設置為隊列頭,并

    ????????????????????setHeadAndPropagate(node, r);

    ????????????????????p.next = null; // help GC

    ????????????????????if?(interrupted)

    ????????????????????????selfInterrupt();

    ????????????????????failed = false;

    ????????????????????return;

    ????????????????}

    ????????????}

    ????????????if?(shouldParkAfterFailedAcquire(p, node) &&

    ????????????????parkAndCheckInterrupt())

    ????????????????interrupted = true;

    ????????}

    ????} finally?{

    ????????if?(failed)

    ????????????cancelAcquire(node);

    ????}

    }

    在acquireQueued方法中,當前線程也通過自旋的方式來嘗試獲取同步狀態,同獨享式獲得鎖一樣

  • 如果當前節點的前驅節點頭節點才能嘗試獲得鎖,如果獲得成功,則把當前線程設置成頭結點,把之前的頭結點從隊列中移除,等待垃圾回收(沒有對象引用)
  • 如果獲取鎖失敗則進入shouldParkAfterFailedAcquire方法中檢測當前節點是否可以被安全的掛起(阻塞),如果可以安全掛起則進入parkAndCheckInterrupt方法,把當前線程掛起,并檢查剛線程是否執行了interrupted方法。
  • setHeadAndPropagate方法代碼如下:

    private void setHeadAndPropagate(Node node, int propagate) {

    ????????Node h = head; // Record old head for check below

    ????????setHead(node);

    ????????//如果propagate >0,說明共享鎖還有可以進行獲得鎖,繼續喚醒下一個節點

    ????????if?(propagate > 0?|| h == null?|| h.waitStatus < 0?||

    ????????????(h = head) == null?|| h.waitStatus < 0) {

    ????????????Node s = node.next;

    ????????????if?(s == null?|| s.isShared())

    ????????????????doReleaseShared();

    ????????}

    ????}

    設置當前節點為頭結點,并調用了doReleaseShared()方法,acquireShared方法最終調用了release方法,得看下為什么。原因其實也很簡單,shared模式下是允許多個線程持有一把鎖的,其中tryAcquire的返回值標志了是否允許其他線程繼續進入。如果允許的話,需要喚醒隊列中等待的線程。其中doReleaseShared方法的邏輯很簡單,就是喚醒后繼線程。

    因此acquireShared的主要邏輯就是嘗試加鎖,如果允許其他線程繼續加鎖,那么喚醒后繼線程,如果失敗,那么入隊阻塞等待。

    4. 共享式釋放鎖

    releaseShared方法代碼如下:

    public final boolean releaseShared(int arg) {

    ????if?(tryReleaseShared(arg)) {

    ????????doReleaseShared();

    ????????return?true;

    ????}

    ????return?false;

    }

    tryReleaseShared(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。

  • 首先通過tryReleaseShared方法釋放鎖如果釋放鎖成功,執行第2步。
  • 通過調用unparkSuccessor() 方法來喚醒頭結點的后繼節點。該方法內部是通過LockSupport.unpark(s.thread);來喚醒后繼節點的。
  • doReleaseShared方法代碼如下:

    private void doReleaseShared() {

    ????for?(;;) {

    ????????// 獲取隊列的頭節點

    ????????Node h = head;

    ????????// 如果頭節點不為null,并且頭節點不等于tail節點。

    ????????if?(h != null?&& h != tail) {

    ????????????// 獲取頭節點對應的線程的狀態

    ????????????int?ws = h.waitStatus;

    ????????????// 如果頭節點對應的線程是SIGNAL狀態,則意味著“頭節點的下一個節點所對應的線程”需要被unpark喚醒。

    ????????????if?(ws == Node.SIGNAL) {

    ????????????????// 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。

    ????????????????if?(!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

    ????????????????????continue;

    ????????????????// 喚醒“頭節點的下一個節點所對應的線程”。

    ????????????????unparkSuccessor(h);

    ????????????}

    ????????????// 如果頭節點對應的線程是空狀態,則設置“尾節點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。

    ????????????else?if?(ws == 0?&&

    ?????????????????????!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

    ????????????????continue; ???????????????// loop on failed CAS

    ????????}

    ????????// 如果頭節點發生變化,則繼續循環。否則,退出循環。

    ????????if?(h == head) ??????????????????// loop if head changed

    ????????????break;

    ????}

    }

    該方法主要是喚醒后繼節點。對于能夠支持多個線程同時訪問的并發組件(比如Semaphore),它和獨占式主要區別在于tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)線程安全釋放,一般是通過循環和CAS來保證的,因為釋放同步狀態的操作會同時來自多個線程。

    5. 獨占超時獲得鎖

    doAcquireNanos方法代碼如下:

    private boolean doAcquireNanos(int arg, long nanosTimeout)

    ????????throws InterruptedException {

    ????if?(nanosTimeout <= 0L)

    ????????return?false;

    ????//計算出超時時間點

    ????final?long?deadline = System.nanoTime() + nanosTimeout;

    ????final?Node node = addWaiter(Node.EXCLUSIVE);

    ????boolean?failed = true;

    ????try?{

    ????????for?(;;) {

    ????????????final?Node p = node.predecessor();

    ????????????if?(p == head && tryAcquire(arg)) {

    ????????????????setHead(node);

    ????????????????p.next = null; // help GC

    ????????????????failed = false;

    ????????????????return?true;

    ????????????}

    ????????????//計算剩余超時時間,超時時間點deadline減去當前時間點System.nanoTime()得到還應該睡眠的時間

    ????????????nanosTimeout = deadline - System.nanoTime();

    ????????????//如果超時,返回false,獲取鎖失敗

    ????????????if?(nanosTimeout <= 0L)

    ????????????????return?false;

    ????????????//判斷是否需要阻塞當前線程

    ????????????//如果需要,在判斷當前剩余納秒數是否大于1000

    ????????????if?(shouldParkAfterFailedAcquire(p, node) &&

    ????????????????nanosTimeout > spinForTimeoutThreshold)

    ????????????????//阻塞 nanosTimeout納秒數

    ????????????????LockSupport.parkNanos(this, nanosTimeout);

    ????????????if?(Thread.interrupted())

    ????????????????throw?new?InterruptedException();

    ????????}

    ????} finally?{

    ????????if?(failed)

    ????????????cancelAcquire(node);

    ????}

    }

    該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試獲取同步狀態,如果獲取成功則從該方法返回,這個過程和獨占式同步獲取的過程類似,但是在同步狀態獲取失敗的處理上有所不同。如果當前線程獲取同步狀態失敗,則首先重新計算超時間隔nanosTimeout,則判斷是否超時(nanosTimeout小于等于0表示已經超時),如果沒有超時,則使當前線程等待nanosTimeout納秒(當已到設置的超時時間,該線程會從LockSupport.parkNanos(Object blocker,long nanos)方法返回)。

    如果nanosTimeout小于等于spinForTimeoutThreshold(1000納秒)時,將不會使該線程進行
    超時等待,而是進入快速的自旋過程。原因在于,非常短的超時等待無法做到十分精確,如果
    這時再進行超時等待,相反會讓nanosTimeout的超時從整體上表現得反而不精確。因此,在超
    時非常短的場景下,同步器會進入無條件的快速自旋。

    總結

    以上是生活随笔為你收集整理的多线程:AQS源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。