深入剖析线程同步工具CountDownLatch原理
0 CountDownLatch的作用
CountDownLatch作為一個多線程間的同步工具,它允許一個或多個線程等待其他線程(可以是多個)完成工作后,再恢復執行。
就像下面這樣:
1 從一個Demo說起
我們直接拿源碼中給出的Demo看一下,源碼中的這個demo可以看做模擬一個賽跑的場景。 賽跑肯定有跑得快的運動員也有跑的慢的運動員,每個運動員就表示一個線程。 運動員聽到槍聲后開始起跑,而最后一個運動員到達終點后,標志的比賽的結束。 整個過程如下圖所示:
CountDownLatch跑步模擬
源碼如下所示
public class Race {private static final int N = 4;public static void main(String[] args) throws InterruptedException {CountDownLatch startSignal = new CountDownLatch(1); // 鳴槍開始信號CountDownLatch doneSignal = new CountDownLatch(N); // 等待N個運動員都跑完后,比賽結束(結束信號)for (int i = 0; i < N; ++i) // N個運動員準備就緒,等待槍聲new Thread(new Runner(startSignal, doneSignal, i)).start();Thread.sleep(1000); // 等待所有運動員就緒System.out.println("所有運動員就緒");startSignal.countDown(); // 鳴槍,開賽System.out.println("比賽進行中...");doneSignal.await(); // 等待N個運動員全部跑完(等待doneSignal變為0)System.out.println("比賽結束");} }class Runner implements Runnable {private final CountDownLatch startSignal;private final CountDownLatch doneSignal;private int number;Runner(CountDownLatch startSignal, CountDownLatch doneSignal, int number) {this.startSignal = startSignal;this.doneSignal = doneSignal;this.number = number;}public void run() {try {// 等待槍聲(等待開始信號startSignal變為0)System.out.println(number + "號運動員準備就緒");startSignal.await();// 賽跑System.out.println(number + "號運動員跑步中...");Thread.sleep(new Random().nextInt(10) * 1000);// 此運動員跑到終點System.out.println(number + "號運動員到達終點");doneSignal.countDown();} catch (InterruptedException ex) {} // return;} }上面代碼運行后,輸出如下:
0號運動員準備就緒 3號運動員準備就緒 2號運動員準備就緒 1號運動員準備就緒 所有運動員就緒 比賽進行中... 0號運動員跑步中... 1號運動員跑步中... 2號運動員跑步中... 3號運動員跑步中... 2號運動員到達終點 1號運動員到達終點 0號運動員到達終點 3號運動員到達終點 比賽結束下面,深入到代碼細節,看一下CountDownLatch初始化、countDown方法、await方法是如何實現的。
2 CountDownLatch類圖
通過下圖來了解一下CountDownLatch的類繼承關系
CountDownLatch類圖 3 CountDownLatch的初始化 CountDownLatch只有一個構造方法:public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count); }他會初始化一個Sync,這是他的一個內部類,類似于ReentrantLock,Sync也繼承于AbstractQueuedSynchronizer(AQS)。
AQS是個啥?可以參考筆者的另一篇文章:Java隊列同步器(AQS)到底是怎么一回事
然后看一下Sync的源碼
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 調用AQS的setState方法,將state賦值為count的值Sync(int count) {setState(count);}// 獲取AQS state的當前值int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}} }所以CountDownLatch的初始化,其實是將參數count的值賦值給AQS的state,依然是用state來控制同步狀態。
4 await方法的實現
依然用上面賽跑的例子來說明這個問題。這里我們只考慮所有運動員等待槍聲的情景。
回憶一下,賽跑的例子中,通過下面的方式創建了鳴槍信號:
CountDownLatch startSignal = new CountDownLatch(1); // 鳴槍開始信號然后創建了N個線程(表示N個運動員),并調用其start方法讓其開始執行(運動員準備就緒,等待鳴槍開跑)。
然后通過在run方法中調用startSignal.await(),來實現等待鳴槍的動作(其實就是等startSignal的值降為0)。
我們來看一下他是怎么await的。
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1); }調用了AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 判斷線程是否已經被中斷if (Thread.interrupted())throw new InterruptedException();// 調用CountDownLatch.Sync的tryAcquireShared方法// 此方法判斷count的值是否==0,如果==0,返回1,否則返回-1// 目前我們還沒有執行countDown,所以count肯定不等于0,這里肯定返回-1// 所以會執行到AQS的doAcquireSharedInterruptibly方法中if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1; }AQS.doAcquireSharedInterruptibly的實現如下
/* Acquires in shared interruptible mode.* @param arg the acquire argument*/ // 此方法會在count>0時將當前線程加入到等待隊列中 // 由于我們目前還沒有執行countDown,所以count會保持>0 // 啟動的N個線程會全部加入到隊列中 private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 將當前線程添加到等待隊列中(SHARED模式)final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 自旋獲取同步狀態for (;;) {final Node p = node.predecessor();if (p == head) {// 依然調用CountDownLatch.Sync的tryAcquireShared方法判斷// 如果count降為0,退出自旋int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 將node的waitStatus設置為-1(常量SIGNAL,表示需要喚醒),并阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }** ```**假設N=4,那么4個線程全部start后,會全部加入到隊列中自旋等待,像下面這樣:CountDownLatch.await自旋等待 5 countDown方法的實現 countDown方法實際上就是將AQS中的state的值-1。然后判斷當前state的值是否==0,如果等于0,說明所有線程都執行結束了,需要喚醒所有等待的線程。依然繼續上面的場景,鳴槍后,所有的運動員開跑。鳴槍這個動作實際上就是在主線程中執行:startSignal.countDown();
這就相當于向剛才隊列中的所有線程發了一個恢復執行的信號,所有線程會被喚醒,繼續執行await后面的代碼。countDown具體干了啥呢?public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 調用CountDownLatch.Sync的tryReleaseShared方法
// 該方法嘗試將count值-1,并判斷-1后的count是否==0,如果==0,返回true,否則false
// 該方法的源碼已經在Sync的源碼中給出,可翻閱上文查看
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/**
- Release action for shared mode -- signals successor and ensures
- propagation. (Note: For exclusive mode, release just amounts
- to calling unparkSuccessor of head if it needs signal.)
/
private void doReleaseShared() {
/- Ensure that a release propagates, even if there are other
- in-progress acquires/releases. This proceeds in the usual
- way of trying to unparkSuccessor of head if it needs
- signal. But if it does not, status is set to PROPAGATE to
- ensure that upon release, propagation continues.
- Additionally, we must loop in case a new node is added
- while we are doing this. Also, unlike other uses of
- unparkSuccessor, we need to know if CAS to reset status
- fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// Node.SIGNAL == -1
// 由上文可知,進入隊列的線程的waitStatus都等于-1
// 所以這里為true
if (ws == Node.SIGNAL) {
// 嘗試將waitStatus從-1改為0,如果修改成功,就恢復這個線程的執行狀態
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
這里,被阻塞的線程又恢復執行,恢復到哪了呢?就是剛才自旋等待的那里。
把上面的源碼直接拿下來,再說明一下(注釋部分)
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 將當前線程添加到等待隊列中(SHARED模式)final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 線程被釋放后,繼續下一次循環for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);// 獲取頭節點,從頭結點開始釋放,由于count已經降為0,所以r >= 0為true// 然后會將自己摘除當前隊列,使下一個節點成為頭節點// 等下一個節點也恢復過來后,同樣執行上面的過程// 這樣,隊列中的所有線程就被釋放了if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 將node的waitStatus設置為-1(常量SIGNAL,表示需要喚醒),并阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }6 總結
本文從源碼層面詳細說明了CountDownLatch是如何運作的。 CountDownLatch也是基于AQS實現,所以了解AQS的機制,對于理解本文至關重要。 其實,CountDownLatch最核心的就是通過控制AQS的state,來同步多個線程之間的狀態。
總結
以上是生活随笔為你收集整理的深入剖析线程同步工具CountDownLatch原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于简单的打地鼠游戏开发总结
- 下一篇: 补码求法