java并发编程之AbstractQueuedSynchronizer
引言
AbstractQueuedSynchronizer,隊列同步器,簡稱AQS,它是java并發用來構建鎖或者其他同步組件的基礎框架。
一般使用AQS的主要方式是繼承,子類通過實現它提供的抽象方法來管理同步狀態,主要管理的方式是通過tryAcquire和tryRelease類似的方法來操作狀態,同時,AQS提供以下線程安全的方法來對狀態進行操作:
protected final int getState(); protected final void setState(int newState); protected final boolean compareAndSetState(int expect, int update);AQS本身是沒有實現任何同步接口的,它僅僅只是定義了同步狀態的獲取和釋放的方法來供自定義的同步組件的使用。
注:AQS主要是怎么使用的呢?
在java的同步組件中,AQS的子類一般是同步組件的靜態內部類。
AQS是實現同步組件的關鍵,它倆的關系可以這樣描述:同步組件是面向使用者的,它定義了使用者與組件交互的接口,隱藏了具體的實現細節;而AQS面向的是同步組件的實現者,它簡化了具體的實現方式,屏蔽了線程切換相關底層操作,它們倆一起很好的對使用者和實現者所關注的領域做了一個隔離。
AQS實現分析
接下來將從實現的角度來具體分析AQS是如何來完成線程同步的。
同步隊列分析
AQS的實現依賴內部的同步隊列(FIFO雙向隊列)來完成同步狀態的管理,假如當前線程獲取同步狀態失敗,AQS會將該線程以及等待狀態等信息構造成一個Node,并將其加入同步隊列,同時阻塞當前線程。當同步狀態釋放時,喚醒隊列的首節點。
- Node
Node主要包含以下成員變量:
- prev:前驅節點;
- next:后繼節點;
- thread:進入隊列的當前線程;
- nextWaiter:存儲condition隊列中的后繼節點。
Node是sync隊列和condition隊列構建的基礎,AQS擁有三個成員變量:
AQS成員變量對于鎖的獲取,請求形成節點將其掛在隊列尾部,至于資源的轉移,是從頭到尾進行,隊列的基本結構就出來了:
AQS同步隊列結構同步隊列插入/刪除節點
節點插入
AQS提供基于CAS的設置尾節點的方法:
CAS設置尾節點
需要傳遞當前線程認為的尾節點和當前節點,設置成功后,當前節點與尾節點建立關聯。
同步隊列插入節點節點刪除
同步隊列刪除節點
同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態之后將會喚醒后繼節點,后繼節點將會在獲取同步狀態成功的時候將自己設置為首節點。
注:設置首節點是由獲取同步狀態成功的線程來完成,因為每次只會有一個線程能夠成功的獲取到同步狀態,所以,設置首節點并不需要CAS來保證。
AQS源碼解析
AQS提供以下接口以供實現自定義同步器:
獨占式獲取同步狀態,該方法的實現需要先查詢當前的同步狀態是否可以獲取,如果可以獲取再進行獲取;
釋放狀態;
共享式獲取同步狀態;
共享式釋放狀態;
獨占模式下,判斷同步狀態是否已經被占用。
使用者可以根據實際情況使用這些接口自定義同步組件。
AQS提供兩種方式來操作同步狀態,獨占式與共享式,下面就針對性做一下源碼分析。
獨占式同步狀態獲取 - acquire實現
獨占式同步狀態獲取具體執行流程如下:
下面我們具體來看一下節點的構造以及加入同步隊列部分的代碼實現。
- addWaiter實現
- enq實現
enq的邏輯可以確保Node可以有順序的添加到同步隊列中,具體的加入隊列的邏輯如下:
可以看出,整個enq方法通過“死循環”來保證節點的正確插入。
進入同步隊列之后接下來就是同步狀態的獲取了,或者說是訪問控制acquireQueued。對于同步隊列中的線程,在同一時刻只能由隊列首節點獲取同步狀態,其他的線程進入等待,直到符合條件才能繼續進行。
- acquireQueued實現
在整個方法中,當前線程一直都在“死循環”中嘗試獲取同步狀態:
節點自旋獲取同步狀態從代碼的邏輯也可以看出,其實在節點與節點之間在循環檢查的過程中是不會相互通信的,僅僅只是判斷自己當前的前驅是不是頭結點,這樣設計使得節點的釋放符合FIFO,同時也避免了過早通知。
注:過早通知是指前驅節點不是頭結點的線程由于中斷被喚醒。
acquire實現總結
- 同步狀態維護:
對同步狀態的操作是原子、非阻塞的,通過AQS提供的對狀態訪問的方法來對同步狀態進行操作,并且利用CAS來確保原子操作; - 狀態獲取:
一旦線程成功的修改了同步狀態,那么該線程會被設置為同步隊列的頭節點; - 同步隊列維護:
不符合獲取同步狀態的線程會進入等待狀態,直到符合條件被喚醒再開始執行。
整個執行流程如下:
acquire流程圖
當前線程獲取同步狀態并執行了相應的邏輯之后,就需要釋放同步狀態,讓后續節點可以獲取到同步狀態,調用方法release(int arg)方法可以釋放同步狀態。
獨占式同步狀態釋放 - release實現
- unparkSuccessor實現
取出當前節點的next節點,將該節點線程喚醒,被喚醒的線程獲取同步狀態。這里主要通過LockSupport的unpark方法喚醒線程。
共享式同步狀態獲取
共享式獲取與獨占式獲取最主要的區別就是在同一時刻能否有多個線程可以同時獲取到同步狀態。這兩種不同的方式在獲取資源區別如下圖所示:
AQS提供acquireShared方法來支持共享式獲取同步狀態。
- acquireShared實現
tryAcquireShared方法返回值 > 0時,表示能夠獲取到同步狀態;
- doAcquireShared實現
與獨占式獲取同步狀態一樣,共享式獲取也是需要釋放同步狀態的,AQS提供releaseShared(int arg)方法可以釋放同步狀態。
共享式同步狀態釋放 - releaseShared實現
releaseShared實現獨占式超時獲取 - doAcquireNanos
該方法提供了超時獲取同步狀態調用,假如在指定的時間段內可以獲取到同步狀態返回true,否則返回false。它是acquireInterruptibly(int arg)的增強版。
acquireInterruptibly實現
acquireInterruptibly實現
該方法提供了獲取同步狀態的能力,同樣,在無法獲取同步狀態時會進入同步隊列,這類似于acquire的功能,但是它和acquire還是區別的:acquireInterruptibly可以在外界對當前線程進行中斷的時候可以提前獲取到同步狀態的操作,換個通俗易懂的解釋吧:類似于synchronized獲取鎖時,這時候外界對當前線程中斷了,線程獲取鎖的這個操作能夠及時響應中斷并且提前返回。- 判斷當前線程是否被中斷,如果已經被中斷,拋出InterruptedException異常并將中斷標志位置為false;
- 獲取同步狀態,獲取成功并返回,獲取不成功調用doAcquireInterruptibly(int arg)排隊等待。
doAcquireInterruptibly實現
doAcquireInterruptibly實現- 構造節點Node,加入同步隊列;
- 假如當前節點是首節點并且可以獲取到同步狀態,將當前節點設置為頭結點,其他節點自旋等待;
- 節點每次被喚醒的時候,需要進行中斷檢測,假如當前線程被中斷,拋出異常InterruptedException,退出循環。
doAcquireNanos實現
doAcquireNanos實現
該方法在支持中斷響應的基礎上,增加了超時獲取的特性。針對超時獲取,主要在于計算出需要睡眠的時間間隔nanosTimeout,如果nanosTimeout > 0表示當前線程還需要睡眠,反之返回false。- nanosTimeout <= 0,表明當前線程不需要睡眠,返回false,不能獲取到同步狀態;
- 不滿足條件的線程加入同步隊列;
- 假如當前節點是首節點,并且可以獲取到同步狀態,將當前節點設置為頭結點并退出,返回true,表明在指定的時間內可以獲取到同步狀態;
- 不滿足條件3的線程,計算出當前休眠時間,nanosTimeout = 原有nanosTimeout + deadline(睡眠之前記錄的時間)- now(System.nanoTime():當前時間):
- 如果nanosTimeout <= 0,返回超時未獲取到同步狀態;
- 如果nanosTimeout > 0 && nanosTimeout <= 1000L,線程快速自旋
注:為什么不直接進入超時等待呢?原因在于非常短的超時等待是無法做到十分精確的,如果這時候再進入超時等待會讓nanosTimeout的超時從整體上表現的不精確,所以,在超時非常短的情況下,AQS都會無條件進入快速自旋;
- 如果nanosTimeout > 1000L,線程通過LockSupport.parkNanos進入超時等待。
整個流程可以總結如下圖所示:
doAcquireNanos流程圖
后記
在上述對AQS進行了實現層面的分析之后,我們就一個案例來加深對AQS的理解。
案例:設計一個AQS同步器,該工具在同一時刻,只能有兩個線程能夠訪問,其他的線程阻塞。
設計分析:針對上述案例,我們可以這樣定義AQS,設定一個初始狀態為2,每一個線程獲取一次就-1,正確的狀態為:0,1,2,0表示新的線程獲取同步狀態時阻塞。由于資源數量大與1,需要實現tryAcquireShared和tryReleaseShared方法。
代碼實現:
public class LockInstance implements Lock {private final Sync sync = new Sync(2);private static final class Sync extends AbstractQueuedSynchronizer { Sync(int state) {if (state <= 0) {throw new IllegalArgumentException("count must large than 0");}setState(state);}public int tryAcquireShared(int arg) {for (;;) {System.out.println("try acquire....");int current = getState();int now = current - arg;if (now < 0 || compareAndSetState(current, now)) {return now;}}}public boolean tryReleaseShared(int arg) {for(;;) {System.out.println("try release....");int current = getState();int now = current + arg;if (compareAndSetState(current, now)) {return true;}}}}public void lock() {sync.acquireShared(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock() {return sync.tryAcquireShared(1) >= 0;}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(time));}public void unlock() {sync.tryReleaseShared(1);}public Condition newCondition() {return null;} }作者:miaoLoveCode
鏈接:https://www.jianshu.com/p/df0d7d6571de
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
總結
以上是生活随笔為你收集整理的java并发编程之AbstractQueuedSynchronizer的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AbstractQueuedSynchr
- 下一篇: 深入分析AbstractQueuedSy