JUC并发工具的使用和原理
Condition
Condition原理
Condition 是一個多線程協調通信的工具類,可以讓某些線程一起等待某個條件(condition),只有滿足條件時,線程才會被喚醒。
在AQS中存在兩個FIFO隊列:同步隊列 和 等待隊列。本篇文章主要是講condition實現原理(即等待隊里),同步隊列實現原理看這篇文章:深入剖析AQS。等待隊列是由Condition內部實現的,是一個虛擬的FIFO單向隊列,在AQS中同步隊列、等待隊列組成關系如下圖:
?
?
-
(1)AQS中tail 和 head主要構成了一個FIFO雙向的同步隊列。
-
(2)AQS中condition構成了一個FIFO單向等待隊列。condition是AQS內部類,每個Condition對象中保存了firstWaiter和lastWaiter作為隊列首節點和尾節點,每個節點使用Node.nextWaiter保存下一個節點的引用,因此等待隊列是一個單向隊列。
在Object的監視器(monitor)模型上,一個對象擁有一個同步隊列和一個等待隊列;而并發包中的AQS上擁有一個同步隊列和多個等待隊列。兩者的具體實現原理的有所不同,但在多線程下等待/喚醒 操作的思路有相同之處,Object的監視器模型 和 AQS對同步隊列、等待隊列對應關系如下圖
(1)Object的監視器模型同步、等待隊列對應關系圖
?
?
多個線程并發訪問某個對象監視器(Monitor對象)的時候,即多線程執行Synchonized處的代碼時,monitor處理過程包括:
-
(1)thread進入Synchonized代碼時,會執行Monitor.Enter命令來獲取monitor對象。如果命令執行成功獲取Monitor對象成功,執行失敗線程會進入synchronized同步隊列中,線程處于BLOCKED,直到monitor對象被釋放。
-
(2)thread執行完Synchonized同步代碼塊后,會執行Monitor.exit命令來釋放monitor對象,并通知同步隊列會獲取monitor對象。
-
(3)如果線程執行object.wait(),線程會進入synchronized等待隊列進行WAITING,直到其他線程線程執行notify()或notifyAll()方法,將等待隊列中的一個或多個等待線程從等待隊列中移到同步隊列中,被移動的線程狀態由WAITING變為BLOCKED。
(2)AQS中同步、等待隊列對應關系圖
?
?
當多線程并發訪問AQS的lock()、await()、single()方法時,同步隊列和等待隊列變化處理過程包括:
-
(1)多個形成執行lock()方法時,線程會競爭獲取同步鎖state,獲取成功的線程占有鎖state、獲取失敗的線程會封裝成node加入到AQS的同步隊列中,等待鎖state的釋放。
-
(2)等獲取了state鎖的線程(同步隊列中head節點)執行await()方法時,condition會將當前線程封裝成一個新的node添加到condition等待隊列的尾部,同時阻塞(waiting),直到被喚醒。
-
(3)等獲取了state鎖的線程(同步隊列中head節點)single()方法時,condition會將等待隊列首節點移動到同步隊列的尾部,直到獲取同步鎖state才被喚醒。
Condition 的基本使用
public class ConditionDemoWait implements Runnable {private Lock lock;private Condition condition; ?public ConditionDemoWait(Lock lock,Condition condition) {this.lock = lock;this.condition = condition;} ?@Overridepublic void run() {System.out.println("begin - ConditionDemoWait");try {lock.lock();condition.await();System.out.println("end - ConditionDemoWait");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}} ? } ? ? public class ConditionDemoSignal implementsRunnable {private Lock lock;private Condition condition; ?public ConditionDemoSignal(Lock lock,Condition condition) {this.lock = lock;this.condition = condition;} ?@Overridepublic void run() {System.out.println("begin - ConditionDemoSignal");try {lock.lock();condition.signal();System.out.println("end - ConditionDemoSignal");} finally {lock.unlock();}} ? }通過這個案例簡單實現了 wait 和 notify 的功能,當調用await 方法后,當前線程會釋放鎖并等待,而其他線程調用condition 對象的 signal 或者 signalall 方法通知并被阻塞的線程,然后自己執行 unlock 釋放鎖,被喚醒的線程獲得之前的鎖繼續執行,最后釋放鎖。所以,condition 中兩個最重要的方法,一個是 await,一個是 signal 方法
await:把當前線程阻塞掛起
signal:喚醒阻塞的線程
Condition 實現源碼分析
1. 等待的實現
當線程調用Condition.await()方法時,將會把前線程封裝成node節點,并將節點加入等待隊列的尾部,然后釋放同步state狀態,喚醒同步隊列中的后繼節點,然后當前線程會進入等待狀態。當前線程加入Condition的等待隊列邏輯如下圖:
?
?
-
能夠調用Condition.await()方法的節點是獲取了同步state鎖的node,即同步隊列中的head節點;調用Condition的await()方法(或者以await開頭的方法)會使當前線程進入等待隊列并釋放鎖、喚醒同步隊列中的后繼節點,最后線程狀態變為等待狀態。
-
Condition擁有首尾節點的引用,而新增節點只需要將原有的尾節點nextWaiter指向它,并且更新尾節點即可。
-
調用Condition.await()節點引用更新的過程并沒有使用CAS保證,原因在于調用await()方法的線程必定是獲取了state鎖的線程,也就是說該過程是由鎖來保證線程安全的。
await()方法源碼
整個await()的執行的過程可以總結如下幾步:
-
將當前線程封裝成node加入Condition等待隊列尾部。
-
釋放state鎖:不管重入幾次,都把state釋放為0,同時喚醒同步隊列的后繼節點。
-
自旋:直到node節點在等待隊列上的節點移動到了同步隊列(通過其他線程調用signal())或被中斷。
-
阻塞當前節點,直到node獲取到了鎖,也就是node在同步隊列上的節點排隊排到了隊首。
addConditionWaiter()方法源碼
//addConditionWaiter()方法主要是將線程封裝成節點,添加到等待隊列尾部 private Node addConditionWaiter() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node t = lastWaiter;//Condition里面的節點狀態不是等待狀態CONDITION時,會清除節點// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;} //將當前線程分裝成一個node,加到等待多了尾部Node node = new Node(Node.CONDITION); ?if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}fullyRelease()源碼
fullyRelease()方法中會調用release()釋放掉state鎖,不管重入幾次,都把state釋放為0,同時喚醒同步隊列的后繼節點。
final int fullyRelease(Node node) {try {//獲取持有state鎖的次數int savedState = getState();//把state釋放為0if (release(savedState))return savedState;throw new IllegalMonitorStateException();} catch (Throwable t) {//釋放鎖失敗,再將node設置為CANCELLED狀態node.waitStatus = Node.CANCELLED;throw t;}}isOnSyncQueue()源碼
final boolean isOnSyncQueue(Node node) {//如果當前節點狀態是CONDITION或node.prev是null,則證明當前節點在等待隊列上而不是同步隊列上。用node.prev來判斷,是因為一個節點如果要加入同步隊列,在加入前就會設置好prev字段。if (node.waitStatus == Node.CONDITION || node.prev == null)return false;//如果node.next不為null,則一定在同步隊列上,因為node.next是在節點加入同步隊列后設置的if (node.next != null) // If has successor, it must be on queuereturn true;//從等待隊列的尾部遍歷判斷node是否在等待隊列return findNodeFromTail(node);}reportInterruptAfterWait()方法源碼
reportInterruptAfterWait()方法會根據中斷狀態來判斷是拋出異常,還是執行中斷。即判斷線程是在被signal前中斷,還是在被signal后中斷;如果是被signal前就被中斷則拋出 InterruptedException,否則執行 Thread.currentThread().interrupt()。
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}到此condition的wait()方法分析就完了,可以看出,await()的操作過程和Object.wait()方法是一樣,只不過await()采用了Condition等待隊列的方式實現了Object.wait()的功能。
2.通知的實現
調用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將等待隊列中節點移到同步隊列中。Condition的signal()方法將節點從等待隊列移動到同步隊列邏輯如下圖:
?
?
整個signal()的過程可以總結如下:
-
(1)執行signal()喚醒線程時,先判斷當前線程是否是同步鎖state持有線程,所以能夠調用signal()方法的線程一定持有了同步鎖state。
-
(2)自旋喚醒等待隊列的firstWaiter(首節點),在喚醒firstWaiter節點之前,會將等待隊列首節點移到同步隊列中。
總結:
-
(1)Condition等待通知的本質就是等待隊列 和 同步隊列的交互的過程,跟object的wait()/notify()機制一樣;Condition是基于同步鎖state實現的,而objec是基于monitor模式實現的。
-
(2)一個lock(AQS)可以有多個Condition,即多個等待隊列,只有一個同步隊列。
-
(3)Condition.await()方法執行時,會將同步隊列里的head鎖釋放掉,把線程封裝成新node添加到等待隊列中;Condition.signal()方法執行時,會把等待隊列中的首節點移到同步隊列中去,直到鎖state被獲取才被喚醒。
CountDownLatch
countdownlatch 是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程的操作執行完畢再執行。從命名可以解讀到 countdown 是倒數的意思,類似于我們倒計時的概念。countdownlatch 提供了兩個方法,一個是 countDown,一個是 await,countdownlatch 初始化的時候需要傳入一個整數,在這個整數倒數到 0 之前,調用了 await 方法的程序都必須要等待,然后通過 countDown 來倒數。
使用案例
在應急項目中的首頁查詢接口中用到了countDownLatch,目的是阻塞主線程防止子線程還未查詢完成就直接返回不完整的結果給前端
CountDownLatch endGate = new CountDownLatch(9); LoginUserDetails loginUserDetails = UserUtil.getUser(); ? try {//1.化工危化new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"905265f501ab74ed2a968436d08ecde0"},loginUserDetails.getDistrict()),"化工危化企業(個)",1,null));endGate.countDown();}).start(); ?//2.規上企業new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"6850908220874f05ca5cfdf87ceda4dd"},loginUserDetails.getDistrict()),"規上企業(個)",2,null));endGate.countDown();}).start(); ?//3.非煤礦山new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"506a689d0f42df11cfaa63af3eec90e4"},loginUserDetails.getDistrict()),"非煤礦山(個)",3,null));endGate.countDown();}).start(); ?//4.煙花爆竹new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"e8a2668a0d7b79a66a197a7a14925201"},loginUserDetails.getDistrict()),"煙花爆竹(個)",4,null));endGate.countDown();}).start(); ?//5.自然災害new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"1ff895900f7c7a708ef103168597b1c1"},loginUserDetails.getDistrict()),"自然災害",5,null));endGate.countDown();}).start(); ?//6.道路運輸new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String []{"5357f32abc4dda610c622a89ee88d006"},loginUserDetails.getDistrict()),"道路運輸",6,null));endGate.countDown();}).start(); ?//7.建設工程new Thread(() -> { listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,getJSGCWJFW(loginUserDetails.getDistrict()),"建設工程及危舊房屋",7,null));endGate.countDown();}).start(); ?//8.密集場所new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String[]{"f0422a6fd0a15f5d99602bca4f8e0446"},loginUserDetails.getDistrict()),"人員密集場所",8,null));endGate.countDown();}).start(); ?//9.山塘易澇積水new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"47c95f2c234549f54c5213d0ae0d6ba9"},loginUserDetails.getDistrict()),"水利工程",9,null));endGate.countDown();}).start(); ?endGate.await(); ? } catch (InterruptedException e) {result.error500("查詢失敗");e.printStackTrace(); }模擬高并發場景
static CountDownLatch countDownLatch=new CountDownLatch(1); @Override public void run() {try {countDownLatch.await();//TODO} catch (InterruptedException e) {e.printStackTrace();}System.out.println("ThreadName:"+Thread.currentThread().getName());} public static void main(String[] args) throws InterruptedException {for(int i=0;i<1000;i++){new Demo().start();}countDownLatch.countDown(); }總的來說,凡事涉及到需要指定某個人物在執行之前,要等到前置人物執行完畢之后才執行的場景,都可以使用CountDownLatch
源碼分析
CountDownLatch的UML類圖如下:
?
?
CountDownLatch的數據結構很簡單,它是通過共享鎖實現的。它包含了sync對象,sync是Sync類型。Sync是實例類,它繼承于AQS。對于 CountDownLatch,我們僅僅需要關心兩個方法,一個是 countDown() 方法,另一個是 await() 方法。countDown() 方法每次調用都會將 state 減 1,直到state 的值為 0;而 await 是一個阻塞方法,當 state 減 為 0 的時候,await 方法才會返回。await 可以被多個線程調用,大家在這個時候腦子里要有個圖:所有調用了await 方法的線程阻塞在 AQS 的阻塞隊列中,等待條件滿足(state == 0),將線程從隊列中一個個喚醒過來。
首先我們來看一下await() 的源碼:
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1); }//判斷當前線程是否獲取到了共享鎖( 在CountDownLatch 中,使用的是共享鎖機制,因為CountDownLatch并不需要實現互斥的特性) public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//state 如果不等于0,說明當前線程需要加入到共享鎖隊列中if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {創建一個共享模式的節點添加到隊列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {//就判斷嘗試獲取鎖int r = tryAcquireShared(arg);if (r >= 0) {r>=0 表示獲取到了執行權限,這個時候因為 state!=0,所以不會執行這段代碼setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//阻塞線程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}加入這個時候有 3 個線程調用了 await 方法,由于這個時候 state 的值還不為 0,所以這三個線程都會加入到 AQS隊列中。并且三個線程都處于阻塞狀態
?
?
CountDownLatch.countDown
由于線程被 await 方法阻塞了,所以只有等到countdown 方法使得 state=0 的時候才會被喚醒,我們來看看 countdown 做了什么
只有當 state 減為 0 的時候,tryReleaseShared 才返回 true, 否則只是簡單的 state = state - 1
如果 state=0, 則調用 doReleaseShared喚醒處于 await 狀態下的線程
AQS. doReleaseShared
共享鎖的釋放和獨占鎖的釋放有一定的差別前面喚醒鎖的邏輯和獨占鎖是一樣,先判斷頭結點是不是SIGNAL 狀態,如果是,則修改為 0,并且喚醒頭結點的下一個節點PROPAGATE: 標識為PROPAGATE狀態的節點,是共享鎖模式下的節點狀態,處于這個狀態下的節點,會對線程的喚醒進行傳播
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}//這個 CAS 失敗的場景是:執行到這里的時候,剛好有一個節點入隊,入隊會將這個 ws 設置為 -1else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果到這里的時候,前面喚醒的線程已經占領了 head,那么再循環// 通過檢查頭節點是否改變了,如果改變了就繼續循環if (h == head) // loop if head changedbreak;}}h == head:說明頭節點還沒有被剛剛用unparkSuccessor 喚醒的線程(這里可以理解為ThreadB)占有,此時 break 退出循環。h != head:頭節點被剛剛喚醒的線程(這里可以理解為ThreadB)占有,那么這里重新進入下一輪循環,喚醒下一個節點(這里是 ThreadB )。我們知道,等到ThreadB 被喚醒后,其實是會主動喚醒 ThreadC...
doAcquireSharedInterruptibly
一旦 ThreadA 被喚醒,代碼又會繼續回到doAcquireSharedInterruptibly 中來執行。如果當前 state滿足=0 的條件,則會執行 setHeadAndPropagate 方法
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}setHeadAndPropagate
這個方法的主要作用是把被喚醒的節點,設置成 head 節 點。 然后繼續喚醒隊列中的其他線程。由于現在隊列中有 3 個線程處于阻塞狀態,一旦 ThreadA被喚醒,并且設置為 head 之后,會繼續喚醒后續的ThreadB
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}?
?
Semaphore
semaphore 也就是我們常說的信號燈,semaphore 可以控制同時訪問的線程個數,通過 acquire 獲取一個許可,如果沒有就等待,通過 release 釋放一個許可。有點類似限流的作用。叫信號燈的原因也和他的用處有關,比如某商場就 5 個停車位,每個停車位只能停一輛車,如果這個時候來了 10 輛車,必須要等前面有空的車位才能進入。
使用案例
public class SemaphoreDemo {//限流(AQS)//permits; 令牌(5)//公平和非公平static class Car extends Thread{private int num;private Semaphore semaphore;public Car(int num, Semaphore semaphore) {this.num = num;this.semaphore = semaphore;}public void run(){try {semaphore.acquire(); //獲得一個令牌, 如果拿不到令牌,就會阻塞System.out.println("第"+num+" 搶占一個車位");Thread.sleep(2000);System.out.println("第"+num+" 開走嘍");semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {Semaphore semaphore=new Semaphore(5);for(int i=0;i<10;i++){new Car(i,semaphore).start();}} }使用場景
Semaphore 比較常見的就是用來做限流操作了
源碼分析
從 Semaphore 的功能來看,我們基本能猜測到它的底層實現一定是基于 AQS 的共享鎖,因為需要實現多個線程共享一個領排池創建 Semaphore 實例的時候,需要一個參數 permits,這個基本上可以確定是設置給 AQS 的 state 的,然后每個線程調用 acquire 的時候,執行 state = state - 1,release 的時候執行 state = state + 1,當然,acquire 的時候,如果 state = 0,說明沒有資源了,需要等待其他線程 release。Semaphore 分公平策略和非公平策略
FairSync
?
?
NonfairSync
?
?
兩者區別在于是不是會先判斷是否有線程在排隊,然后才進行 CAS 減操作
CyclicBarrier
CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續工作。CyclicBarrier 默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 當前線程已經到達了屏障,然后當前線程被阻塞
使用場景
當存在需要所有的子任務都完成時,才執行主任務,這個時候就可以選擇使用 CyclicBarrier(跟countDownLatch很像,不過countDownLatch不可重用)
案例略......
注意點
1)對于指定計數值 parties,若由于某種原因,沒有足夠的線程調用 CyclicBarrier 的 await,則所有調用 await 的線程都會被阻塞;
2)同樣的 CyclicBarrier 也可以調用 await(timeout, unit),設置超時時間,在設定時間內,如果沒有足夠線程到達,則解除阻塞狀態,繼續工作;
3)通過 reset 重置計數,會使得進入 await 的線程出現BrokenBarrierException;
4 )如果采用是 CyclicBarrier(int parties, Runnable barrierAction) 構造方法,執行 barrierAction 操作的是最后一個到達的線程
Condition
Condition原理
Condition 是一個多線程協調通信的工具類,可以讓某些線程一起等待某個條件(condition),只有滿足條件時,線程才會被喚醒。
在AQS中存在兩個FIFO隊列:同步隊列 和 等待隊列。本篇文章主要是講condition實現原理(即等待隊里),同步隊列實現原理看這篇文章:深入剖析AQS。等待隊列是由Condition內部實現的,是一個虛擬的FIFO單向隊列,在AQS中同步隊列、等待隊列組成關系如下圖:
?
?
- (1)AQS中tail 和 head主要構成了一個FIFO雙向的同步隊列。
- (2)AQS中condition構成了一個FIFO單向等待隊列。condition是AQS內部類,每個Condition對象中保存了firstWaiter和lastWaiter作為隊列首節點和尾節點,每個節點使用Node.nextWaiter保存下一個節點的引用,因此等待隊列是一個單向隊列。
在Object的監視器(monitor)模型上,一個對象擁有一個同步隊列和一個等待隊列;而并發包中的AQS上擁有一個同步隊列和多個等待隊列。兩者的具體實現原理的有所不同,但在多線程下等待/喚醒 操作的思路有相同之處,Object的監視器模型 和 AQS對同步隊列、等待隊列對應關系如下圖
(1)Object的監視器模型同步、等待隊列對應關系圖
?
?
多個線程并發訪問某個對象監視器(Monitor對象)的時候,即多線程執行Synchonized處的代碼時,monitor處理過程包括:
- (1)thread進入Synchonized代碼時,會執行Monitor.Enter命令來獲取monitor對象。如果命令執行成功獲取Monitor對象成功,執行失敗線程會進入synchronized同步隊列中,線程處于BLOCKED,直到monitor對象被釋放。
- (2)thread執行完Synchonized同步代碼塊后,會執行Monitor.exit命令來釋放monitor對象,并通知同步隊列會獲取monitor對象。
- (3)如果線程執行object.wait(),線程會進入synchronized等待隊列進行WAITING,直到其他線程線程執行notify()或notifyAll()方法,將等待隊列中的一個或多個等待線程從等待隊列中移到同步隊列中,被移動的線程狀態由WAITING變為BLOCKED。
(2)AQS中同步、等待隊列對應關系圖
?
?
當多線程并發訪問AQS的lock()、await()、single()方法時,同步隊列和等待隊列變化處理過程包括:
- (1)多個形成執行lock()方法時,線程會競爭獲取同步鎖state,獲取成功的線程占有鎖state、獲取失敗的線程會封裝成node加入到AQS的同步隊列中,等待鎖state的釋放。
- (2)等獲取了state鎖的線程(同步隊列中head節點)執行await()方法時,condition會將當前線程封裝成一個新的node添加到condition等待隊列的尾部,同時阻塞(waiting),直到被喚醒。
- (3)等獲取了state鎖的線程(同步隊列中head節點)single()方法時,condition會將等待隊列首節點移動到同步隊列的尾部,直到獲取同步鎖state才被喚醒。
Condition 的基本使用
public class ConditionDemoWait implements Runnable {private Lock lock;private Condition condition;public ConditionDemoWait(Lock lock,Condition condition) {this.lock = lock;this.condition = condition;}@Overridepublic void run() {System.out.println("begin - ConditionDemoWait");try {lock.lock();condition.await();System.out.println("end - ConditionDemoWait");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}} } public class ConditionDemoSignal implementsRunnable {private Lock lock;private Condition condition;public ConditionDemoSignal(Lock lock,Condition condition) {this.lock = lock;this.condition = condition;}@Overridepublic void run() {System.out.println("begin - ConditionDemoSignal");try {lock.lock();condition.signal();System.out.println("end - ConditionDemoSignal");} finally {lock.unlock();}} }通過這個案例簡單實現了 wait 和 notify 的功能,當調用await 方法后,當前線程會釋放鎖并等待,而其他線程調用condition 對象的 signal 或者 signalall 方法通知并被阻塞的線程,然后自己執行 unlock 釋放鎖,被喚醒的線程獲得之前的鎖繼續執行,最后釋放鎖。所以,condition 中兩個最重要的方法,一個是 await,一個是 signal 方法
await:把當前線程阻塞掛起
signal:喚醒阻塞的線程
Condition 實現源碼分析
1. 等待的實現
當線程調用Condition.await()方法時,將會把前線程封裝成node節點,并將節點加入等待隊列的尾部,然后釋放同步state狀態,喚醒同步隊列中的后繼節點,然后當前線程會進入等待狀態。當前線程加入Condition的等待隊列邏輯如下圖:
?
?
- 能夠調用Condition.await()方法的節點是獲取了同步state鎖的node,即同步隊列中的head節點;調用Condition的await()方法(或者以await開頭的方法)會使當前線程進入等待隊列并釋放鎖、喚醒同步隊列中的后繼節點,最后線程狀態變為等待狀態。
- Condition擁有首尾節點的引用,而新增節點只需要將原有的尾節點nextWaiter指向它,并且更新尾節點即可。
- 調用Condition.await()節點引用更新的過程并沒有使用CAS保證,原因在于調用await()方法的線程必定是獲取了state鎖的線程,也就是說該過程是由鎖來保證線程安全的。
await()方法源碼
整個await()的執行的過程可以總結如下幾步:
- 將當前線程封裝成node加入Condition等待隊列尾部。
- 釋放state鎖:不管重入幾次,都把state釋放為0,同時喚醒同步隊列的后繼節點。
- 自旋:直到node節點在等待隊列上的節點移動到了同步隊列(通過其他線程調用signal())或被中斷。
- 阻塞當前節點,直到node獲取到了鎖,也就是node在同步隊列上的節點排隊排到了隊首。
addConditionWaiter()方法源碼
//addConditionWaiter()方法主要是將線程封裝成節點,添加到等待隊列尾部 private Node addConditionWaiter() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node t = lastWaiter;//Condition里面的節點狀態不是等待狀態CONDITION時,會清除節點// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}//將當前線程分裝成一個node,加到等待多了尾部Node node = new Node(Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}fullyRelease()源碼
fullyRelease()方法中會調用release()釋放掉state鎖,不管重入幾次,都把state釋放為0,同時喚醒同步隊列的后繼節點。
final int fullyRelease(Node node) {try {//獲取持有state鎖的次數int savedState = getState();//把state釋放為0if (release(savedState))return savedState;throw new IllegalMonitorStateException();} catch (Throwable t) {//釋放鎖失敗,再將node設置為CANCELLED狀態node.waitStatus = Node.CANCELLED;throw t;}}isOnSyncQueue()源碼
final boolean isOnSyncQueue(Node node) {//如果當前節點狀態是CONDITION或node.prev是null,則證明當前節點在等待隊列上而不是同步隊列上。用node.prev來判斷,是因為一個節點如果要加入同步隊列,在加入前就會設置好prev字段。if (node.waitStatus == Node.CONDITION || node.prev == null)return false;//如果node.next不為null,則一定在同步隊列上,因為node.next是在節點加入同步隊列后設置的if (node.next != null) // If has successor, it must be on queuereturn true;//從等待隊列的尾部遍歷判斷node是否在等待隊列return findNodeFromTail(node);}reportInterruptAfterWait()方法源碼
reportInterruptAfterWait()方法會根據中斷狀態來判斷是拋出異常,還是執行中斷。即判斷線程是在被signal前中斷,還是在被signal后中斷;如果是被signal前就被中斷則拋出 InterruptedException,否則執行 Thread.currentThread().interrupt()。
private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode == THROW_IE)throw new InterruptedException();else if (interruptMode == REINTERRUPT)selfInterrupt();}到此condition的wait()方法分析就完了,可以看出,await()的操作過程和Object.wait()方法是一樣,只不過await()采用了Condition等待隊列的方式實現了Object.wait()的功能。
2.通知的實現
調用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將等待隊列中節點移到同步隊列中。Condition的signal()方法將節點從等待隊列移動到同步隊列邏輯如下圖:
?
?
整個signal()的過程可以總結如下:
- (1)執行signal()喚醒線程時,先判斷當前線程是否是同步鎖state持有線程,所以能夠調用signal()方法的線程一定持有了同步鎖state。
- (2)自旋喚醒等待隊列的firstWaiter(首節點),在喚醒firstWaiter節點之前,會將等待隊列首節點移到同步隊列中。
總結:
- (1)Condition等待通知的本質就是等待隊列 和 同步隊列的交互的過程,跟object的wait()/notify()機制一樣;Condition是基于同步鎖state實現的,而objec是基于monitor模式實現的。
- (2)一個lock(AQS)可以有多個Condition,即多個等待隊列,只有一個同步隊列。
- (3)Condition.await()方法執行時,會將同步隊列里的head鎖釋放掉,把線程封裝成新node添加到等待隊列中;Condition.signal()方法執行時,會把等待隊列中的首節點移到同步隊列中去,直到鎖state被獲取才被喚醒。
CountDownLatch
countdownlatch 是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程的操作執行完畢再執行。從命名可以解讀到 countdown 是倒數的意思,類似于我們倒計時的概念。countdownlatch 提供了兩個方法,一個是 countDown,一個是 await,countdownlatch 初始化的時候需要傳入一個整數,在這個整數倒數到 0 之前,調用了 await 方法的程序都必須要等待,然后通過 countDown 來倒數。
使用案例
在應急項目中的首頁查詢接口中用到了countDownLatch,目的是阻塞主線程防止子線程還未查詢完成就直接返回不完整的結果給前端
CountDownLatch endGate = new CountDownLatch(9); LoginUserDetails loginUserDetails = UserUtil.getUser(); try {//1.化工危化new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"905265f501ab74ed2a968436d08ecde0"},loginUserDetails.getDistrict()),"化工危化企業(個)",1,null));endGate.countDown();}).start();//2.規上企業new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"6850908220874f05ca5cfdf87ceda4dd"},loginUserDetails.getDistrict()),"規上企業(個)",2,null));endGate.countDown();}).start();//3.非煤礦山new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"506a689d0f42df11cfaa63af3eec90e4"},loginUserDetails.getDistrict()),"非煤礦山(個)",3,null));endGate.countDown();}).start();//4.煙花爆竹new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"e8a2668a0d7b79a66a197a7a14925201"},loginUserDetails.getDistrict()),"煙花爆竹(個)",4,null));endGate.countDown();}).start();//5.自然災害new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"1ff895900f7c7a708ef103168597b1c1"},loginUserDetails.getDistrict()),"自然災害",5,null));endGate.countDown();}).start();//6.道路運輸new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String []{"5357f32abc4dda610c622a89ee88d006"},loginUserDetails.getDistrict()),"道路運輸",6,null));endGate.countDown();}).start();//7.建設工程new Thread(() -> { listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,getJSGCWJFW(loginUserDetails.getDistrict()),"建設工程及危舊房屋",7,null));endGate.countDown();}).start();//8.密集場所new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String[]{"f0422a6fd0a15f5d99602bca4f8e0446"},loginUserDetails.getDistrict()),"人員密集場所",8,null));endGate.countDown();}).start();//9.山塘易澇積水new Thread(() -> {listVos.add(safetyRiskSourceService.getNaturalDataAuto(loginUserDetails,appendSufix(new String [] {"47c95f2c234549f54c5213d0ae0d6ba9"},loginUserDetails.getDistrict()),"水利工程",9,null));endGate.countDown();}).start();endGate.await(); } catch (InterruptedException e) {result.error500("查詢失敗");e.printStackTrace(); }模擬高并發場景
static CountDownLatch countDownLatch=new CountDownLatch(1); @Override public void run() {try {countDownLatch.await();//TODO} catch (InterruptedException e) {e.printStackTrace();}System.out.println("ThreadName:"+Thread.currentThread().getName());} public static void main(String[] args) throws InterruptedException {for(int i=0;i<1000;i++){new Demo().start();}countDownLatch.countDown(); }總的來說,凡事涉及到需要指定某個人物在執行之前,要等到前置人物執行完畢之后才執行的場景,都可以使用CountDownLatch
源碼分析
CountDownLatch的UML類圖如下:
?
?
CountDownLatch的數據結構很簡單,它是通過共享鎖實現的。它包含了sync對象,sync是Sync類型。Sync是實例類,它繼承于AQS。對于 CountDownLatch,我們僅僅需要關心兩個方法,一個是 countDown() 方法,另一個是 await() 方法。countDown() 方法每次調用都會將 state 減 1,直到state 的值為 0;而 await 是一個阻塞方法,當 state 減 為 0 的時候,await 方法才會返回。await 可以被多個線程調用,大家在這個時候腦子里要有個圖:所有調用了await 方法的線程阻塞在 AQS 的阻塞隊列中,等待條件滿足(state == 0),將線程從隊列中一個個喚醒過來。
首先我們來看一下await() 的源碼:
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1); } //判斷當前線程是否獲取到了共享鎖( 在CountDownLatch 中,使用的是共享鎖機制,因為CountDownLatch并不需要實現互斥的特性) public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//state 如果不等于0,說明當前線程需要加入到共享鎖隊列中if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {創建一個共享模式的節點添加到隊列中final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {//就判斷嘗試獲取鎖int r = tryAcquireShared(arg);if (r >= 0) {r>=0 表示獲取到了執行權限,這個時候因為 state!=0,所以不會執行這段代碼setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//阻塞線程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}加入這個時候有 3 個線程調用了 await 方法,由于這個時候 state 的值還不為 0,所以這三個線程都會加入到 AQS隊列中。并且三個線程都處于阻塞狀態
?
?
CountDownLatch.countDown
由于線程被 await 方法阻塞了,所以只有等到countdown 方法使得 state=0 的時候才會被喚醒,我們來看看 countdown 做了什么
AQS. doReleaseShared
共享鎖的釋放和獨占鎖的釋放有一定的差別前面喚醒鎖的邏輯和獨占鎖是一樣,先判斷頭結點是不是SIGNAL 狀態,如果是,則修改為 0,并且喚醒頭結點的下一個節點PROPAGATE: 標識為PROPAGATE狀態的節點,是共享鎖模式下的節點狀態,處于這個狀態下的節點,會對線程的喚醒進行傳播
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}//這個 CAS 失敗的場景是:執行到這里的時候,剛好有一個節點入隊,入隊會將這個 ws 設置為 -1else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果到這里的時候,前面喚醒的線程已經占領了 head,那么再循環// 通過檢查頭節點是否改變了,如果改變了就繼續循環if (h == head) // loop if head changedbreak;}}h == head:說明頭節點還沒有被剛剛用unparkSuccessor 喚醒的線程(這里可以理解為ThreadB)占有,此時 break 退出循環。h != head:頭節點被剛剛喚醒的線程(這里可以理解為ThreadB)占有,那么這里重新進入下一輪循環,喚醒下一個節點(這里是 ThreadB )。我們知道,等到ThreadB 被喚醒后,其實是會主動喚醒 ThreadC...
doAcquireSharedInterruptibly
一旦 ThreadA 被喚醒,代碼又會繼續回到doAcquireSharedInterruptibly 中來執行。如果當前 state滿足=0 的條件,則會執行 setHeadAndPropagate 方法
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}setHeadAndPropagate
這個方法的主要作用是把被喚醒的節點,設置成 head 節 點。 然后繼續喚醒隊列中的其他線程。由于現在隊列中有 3 個線程處于阻塞狀態,一旦 ThreadA被喚醒,并且設置為 head 之后,會繼續喚醒后續的ThreadB
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}?
?
Semaphore
semaphore 也就是我們常說的信號燈,semaphore 可以控制同時訪問的線程個數,通過 acquire 獲取一個許可,如果沒有就等待,通過 release 釋放一個許可。有點類似限流的作用。叫信號燈的原因也和他的用處有關,比如某商場就 5 個停車位,每個停車位只能停一輛車,如果這個時候來了 10 輛車,必須要等前面有空的車位才能進入。
使用案例
public class SemaphoreDemo {//限流(AQS)//permits; 令牌(5)//公平和非公平static class Car extends Thread{private int num;private Semaphore semaphore;public Car(int num, Semaphore semaphore) {this.num = num;this.semaphore = semaphore;}public void run(){try {semaphore.acquire(); //獲得一個令牌, 如果拿不到令牌,就會阻塞System.out.println("第"+num+" 搶占一個車位");Thread.sleep(2000);System.out.println("第"+num+" 開走嘍");semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {Semaphore semaphore=new Semaphore(5);for(int i=0;i<10;i++){new Car(i,semaphore).start();}} }使用場景
Semaphore 比較常見的就是用來做限流操作了
源碼分析
從 Semaphore 的功能來看,我們基本能猜測到它的底層實現一定是基于 AQS 的共享鎖,因為需要實現多個線程共享一個領排池創建 Semaphore 實例的時候,需要一個參數 permits,這個基本上可以確定是設置給 AQS 的 state 的,然后每個線程調用 acquire 的時候,執行 state = state - 1,release 的時候執行 state = state + 1,當然,acquire 的時候,如果 state = 0,說明沒有資源了,需要等待其他線程 release。Semaphore 分公平策略和非公平策略
FairSync
?
?
NonfairSync
?
?
兩者區別在于是不是會先判斷是否有線程在排隊,然后才進行 CAS 減操作
CyclicBarrier
CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續工作。CyclicBarrier 默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用 await 方法告訴 CyclicBarrier 當前線程已經到達了屏障,然后當前線程被阻塞
使用場景
當存在需要所有的子任務都完成時,才執行主任務,這個時候就可以選擇使用 CyclicBarrier(跟countDownLatch很像,不過countDownLatch不可重用)
案例略......
注意點
1)對于指定計數值 parties,若由于某種原因,沒有足夠的線程調用 CyclicBarrier 的 await,則所有調用 await 的線程都會被阻塞;
2)同樣的 CyclicBarrier 也可以調用 await(timeout, unit),設置超時時間,在設定時間內,如果沒有足夠線程到達,則解除阻塞狀態,繼續工作;
3)通過 reset 重置計數,會使得進入 await 的線程出現BrokenBarrierException;
4 )如果采用是 CyclicBarrier(int parties, Runnable barrierAction) 構造方法,執行 barrierAction 操作的是最后一個到達的線程
總結
以上是生活随笔為你收集整理的JUC并发工具的使用和原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 5G高级知识点
- 下一篇: 什么是半导体三大封装?