Java Review - 并发编程_ CountDownLatch原理源码剖析
文章目錄
- Pre
- 小Demo
- CountDownLatch VS join方法
- 類圖關(guān)系
- 核心方法&源碼解析
- void await()
- boolean await(long timeout, TimeUnit unit)
- void countDown()
- long getCount()
- 小結(jié)
Pre
每日一博 - CountDownLatch使用場景分析以及源碼分析
在日常開發(fā)中經(jīng)常會遇到需要在主線程中開啟多個線程去并行執(zhí)行任務(wù),并且主線程需要等待所有子線程執(zhí)行完畢后再進(jìn)行匯總的場景。
在CountDownLatch出現(xiàn)之前一般都使用線程的join()方法來實現(xiàn)這一點,但是join方法不夠靈活,不能夠滿足不同場景的需要,所以JDK開發(fā)組提供了CountDownLatch這個類,使用CountDownLatch會更優(yōu)雅.
小Demo
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/19 10:46* @mark: show me the code , change the world*/ public class CountDownLatchTest {// 創(chuàng)建一個CountDownLatch實例private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 模擬業(yè)務(wù)運行");try {TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + " 業(yè)務(wù)運行Over");} catch (InterruptedException e) {e.printStackTrace();}finally {// 子線程執(zhí)行結(jié)束,減1countDownLatch.countDown();}});executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 模擬業(yè)務(wù)運行");try {TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + " 業(yè)務(wù)運行Over");} catch (InterruptedException e) {e.printStackTrace();}finally {// 子線程執(zhí)行結(jié)束,減1countDownLatch.countDown();}});// 等待子線程執(zhí)行執(zhí)行結(jié)束 返回countDownLatch.await();System.out.println( "子線程業(yè)務(wù)運行Over,主線程繼續(xù)工作");executorService.shutdown();} }
如上代碼中,
-
創(chuàng)建了一個CountDownLatch實例,因為有兩個子線程所以構(gòu)造函數(shù)的傳參為2。
-
主線程調(diào)用countDownLatch.await()方法后會被阻塞。
-
子線程執(zhí)行完畢后調(diào)用countDownLatch.countDown()方法讓countDownLatch內(nèi)部的計數(shù)器減1
-
所有子線程執(zhí)行完畢并調(diào)用countDown()方法后計數(shù)器會變?yōu)?,這時候主線程的await()方法才會返回。
CountDownLatch VS join方法
-
調(diào)用一個子線程的join()方法后,該線程會一直被阻塞直到子線程運行完畢
-
而CountDownLatch則使用計數(shù)器來允許子線程運行完畢或者在運行中遞減計數(shù),也就是CountDownLatch可以在子線程運行的任何時候讓await方法返回而不一定必須等到線程結(jié)束
-
另外,使用線程池來管理線程時一般都是直接添加Runable到線程池,這時候就沒有辦法再調(diào)用線程的join方法了,就是說countDownLatch相比join方法讓我們對線程同步有更靈活的控制
類圖關(guān)系
從類圖可以看出,CountDownLatch是使用AQS實現(xiàn)的。
通過下面的構(gòu)造函數(shù), 實際上是把計數(shù)器的值賦給了AQS的狀態(tài)變量state,也就是這里使用AQS的狀態(tài)值來表示計數(shù)器值。
/*** Constructs a {@code CountDownLatch} initialized with the given count.** @param count the number of times {@link #countDown} must be invoked* before threads can pass through {@link #await}* @throws IllegalArgumentException if {@code count} is negative*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);} Sync(int count) {setState(count);}int getCount() {return getState();}核心方法&源碼解析
接下來分析CountDownLatch中的幾個重要的方法,看它們是如何調(diào)用AQS來實現(xiàn)功能的。
void await()
當(dāng)線程調(diào)用CountDownLatch對象的await方法后,當(dāng)前線程會被阻塞,直到下面的情況之一發(fā)生才會返回
-
當(dāng)所有線程都調(diào)用了CountDownLatch對象的countDown方法后,也就是計數(shù)器的值為0時
-
其他線程調(diào)用了當(dāng)前線程的interrupt()方法中斷了當(dāng)前線程,當(dāng)前線程就會拋出InterruptedException異常,然后返回
await()方法委托sync調(diào)用了AQS的acquireSharedInterruptibly方法
// AQS獲取共享資源時響應(yīng)中斷的方法public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 響應(yīng)中斷 if (Thread.interrupted())throw new InterruptedException();// 查看當(dāng)前計數(shù)器是否為0 ,為0 直接返回,否則進(jìn)入AQS隊列等待 if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);} protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}-
由如上代碼可知,該方法的特點是線程獲取資源時可以被中斷,并且獲取的資源是共享資源。
-
acquireSharedInterruptibly首先判斷當(dāng)前線程是否已被中斷,若是則拋出異常,否則調(diào)用sync實現(xiàn)的tryAcquireShared方法查看當(dāng)前狀態(tài)值(計數(shù)器值)是否為0,是則當(dāng)前線程的await()方法直接返回,否則調(diào)用AQS的doAcquireSharedInterruptibly方法讓當(dāng)前線程阻塞。
-
另外可以看到,這里tryAcquireShared傳遞的arg參數(shù)沒有被用到,調(diào)用tryAcquireShared的方法僅僅是為了檢查當(dāng)前狀態(tài)值是不是為0,并沒有調(diào)用CAS讓當(dāng)前狀態(tài)值減1。
boolean await(long timeout, TimeUnit unit)
當(dāng)線程調(diào)用了CountDownLatch對象的該方法后,當(dāng)前線程會被阻塞,直到下面的情況之一發(fā)生才會返回
-
當(dāng)所有線程都調(diào)用了CountDownLatch對象的countDown方法后,也就是計數(shù)器值為0時,這時候會返回true
-
設(shè)置的timeout時間到了,因為超時而返回false
-
其他線程調(diào)用了當(dāng)前線程的interrupt()方法中斷了當(dāng)前線程,當(dāng)前線程會拋出InterruptedException異常,然后返回
void countDown()
線程調(diào)用該方法后,計數(shù)器的值遞減,遞減后如果計數(shù)器值為0則喚醒所有因調(diào)用await方法而被阻塞的線程,否則什么都不做
下面看下countDown()方法是如何調(diào)用AQS的方法的。
/*** Decrements the count of the latch, releasing all waiting threads if* the count reaches zero.** <p>If the current count is greater than zero then it is decremented.* If the new count is zero then all waiting threads are re-enabled for* thread scheduling purposes.** <p>If the current count equals zero then nothing happens.*/public void countDown() {// 委托調(diào)用AQS的releaseSharedsync.releaseShared(1);}AQS的方法
/*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument. This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {// 調(diào)用syn實現(xiàn)的tryReleaseSharedif (tryReleaseShared(arg)) {// AQS釋放資源的方法doReleaseShared();return true;}return false;}在如上代碼中,releaseShared首先調(diào)用了sync實現(xiàn)的AQS的tryReleaseShared方法,代碼如下
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zero// 循環(huán)進(jìn)行CAS, 直到當(dāng)前線程成功彎沉CAS使計數(shù)器值(狀態(tài)值state)減一 并更新statefor (;;) {int c = getState();// 1 如果狀態(tài)值為0 ,則直接返回if (c == 0)return false;// 2 使用CAS讓計數(shù)器減1 int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}如上代碼
-
首先獲取當(dāng)前狀態(tài)值(計數(shù)器值)。
-
代碼(1)判斷如果當(dāng)前狀態(tài)值為0則直接返回false,從而countDown()方法直接返回
-
否則執(zhí)行代碼(2)使用CAS將計數(shù)器值減1,CAS失敗則循環(huán)重試,否則如果當(dāng)前計數(shù)器值為0則返回true,返回true說明是最后一個線程調(diào)用的countdown方法,那么該線程除了讓計數(shù)器值減1外,還需要喚醒因調(diào)用CountDownLatch的await方法而被阻塞的線程,具體是調(diào)用AQS的doReleaseShared方法來激活阻塞的線程
-
這里代碼(1)貌似是多余的,其實不然,之所以添加代碼(1)是為了防止當(dāng)計數(shù)器值為0后,其他線程又調(diào)用了countDown方法,如果沒有代碼(1),狀態(tài)值就可能會變成負(fù)數(shù)。
long getCount()
獲取當(dāng)前計數(shù)器的值,也就是AQS的state的值,一般在測試時使用該方法
/*** Returns the current count.** <p>This method is typically used for debugging and testing purposes.** @return the current count*/public long getCount() {return sync.getCount();}在其內(nèi)部還是調(diào)用了AQS的getState方法來獲取state的值(計數(shù)器當(dāng)前值)
小結(jié)
CountDownLatch是使用AQS實現(xiàn)的。使用AQS的狀態(tài)變量state來存放計數(shù)器的值。
首先在初始化CountDownLatch時設(shè)置狀態(tài)值(計數(shù)器值),當(dāng)多個線程調(diào)用countdown方法時實際是原子性遞減AQS的狀態(tài)值。
當(dāng)線程調(diào)用await方法后當(dāng)前線程會被放入AQS的阻塞隊列等待計數(shù)器為0再返回。其他線程調(diào)用countdown方法讓計數(shù)器值遞減1,當(dāng)計數(shù)器值變?yōu)?時,當(dāng)前線程還要調(diào)用AQS的doReleaseShared方法來激活由于調(diào)用await()方法而被阻塞的線程。
總結(jié)
以上是生活随笔為你收集整理的Java Review - 并发编程_ CountDownLatch原理源码剖析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_S
- 下一篇: Java Review - 并发编程_