日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java并发编程系列之CountDownLatch用法及详解

發布時間:2025/1/21 java 14 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java并发编程系列之CountDownLatch用法及详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

背景

前幾天一個同事問我,對這個CountDownLatch有沒有了解想問一些問題,當時我一臉懵逼,不知道如何回答。今天趕緊抽空好好補補。不得不說Doug Lea大師真的很牛,設計出如此好的類。

1、回顧舊知識

volatile關鍵字:當一個共享變量被volatile修飾時,它會保證修改的值會立即被更新到主存,當有其他線程需要讀取時,它會去內存中讀取新值。(這涉及到java內存模型了,有興趣了解java內存模型的可以先找資料看看)。

2、CountDownLatch簡介

CountDownLatch 可以理解就是個計數器,只能減不能加,同時它還有個門閂的作用,當計數器不為0時,門閂是鎖著的;當計數器減到0時,門閂就打開了。
如果還不是很理解的話,舉個簡單的例子就是,你去超市買東西,雖然已經到了關門時間但是只有顧客都走了超市才能關門,至于你買不買東西,超市不關心。只要顧客都走完了,我就可以關門了。

2、CountDownLatch具體使用場景

有A和B兩個任務,只有當A任務執行完之后,才能執行B任務。A和B都可以拆分小任務。比如下載一個大文件,可以使用多線程下載,等下載完之后,在統一處理。

2、CountDownLatch實現原理(jdk1.8)

CountDownLatch源碼

public class CountDownLatch {/*** 內部類繼承AQS* Uses AQS state to represent count.*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}private final Sync sync;/*** 構造方法一般傳線程總數*/public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}/*** 等待方法*/public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}/*** 等待重載超時等待*/public boolean await(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}/*** 計數減1*/public void countDown() {sync.releaseShared(1);}/*** 當前計數*/public long getCount() {return sync.getCount();}/****/public String toString() {return super.toString() + "[Count = " + sync.getCount() + "]";}

為什么定義一個內部類?這種結構的好處在于我們不必關心AbstractQueuedSynchronizer(以下簡稱AQS)的同步狀態管理、線程排隊、等待與喚醒等底層操作,我們只需重寫我們想要的方法。可生成特定并發工具類。
CountDownLatch主要兩個方法就是一是CountDownLatch.await()阻塞當前線程,二是CountDownLatch.countDown()當前線程把計數器減一
看完源碼,我們可以看出實現CountDownLatch主要思想就是使用volatile和同步隊列來放置這些阻塞隊列。
a、CountDownLatch.await()方法
如果讓我們自己實現一個await方法我們會怎么做
一、首先會想到是會使用線程間wait/notify,使用synchronized關鍵字,檢查計數器值不為0,然后調用Object.wait();直到計數器值0則調用notifyAll()喚醒等待線程。但是大量的
synchronized代碼塊會存在假喚醒。
我們還是看看Doug Lea是怎么實現這個類的。

CountDownLatch構造方法

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}

構造方法傳入了一個int變量,這個int變量是AQS中的state,類型是volatile的,它就是用來表示計數器值的。內存共享這個變量,只有有修改,其他線程都能讀取到。

await方法

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}

調用await()的方法后,會默認調用sync這個實例的acquireSharedInterruptibly這個方法,并且參數為1,需要注意的是,這個方法聲明了一個InterruptedException異常,表示調用該方法的線程支持打斷操作。

sync acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

acquireSharedInterruptibly這個方法是sync繼承AQS而來的,這個方法的調用是響應線程的打斷的,所以在前兩行會檢查線程是否被打斷。接著調用tryAcquireShared()方法來判斷返回值,根據值的大小決定是否執行doAcquireSharedInterruptibly()。

tryAcquireShared這個方法是在Sync中重寫方法

protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}

在子類sync的tryAcquireShared中它只驗證了計數器的值是否為0,如果為0則返回1,反之返回-1,根據上面代碼可以看出,整數就不會執行doAcquireSharedInterruptibly(),該線程就結束方法,繼續執行本身代碼了。

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);// 往同步隊列中添加節點boolean failed = true;try {for (;;) {//死循環final Node p = node.predecessor();//當前線程的前一個節點if (p == head) {//首節點int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

因為計數器值不為0需要阻塞線程,所以在進入方法時,將該線程包裝成節點并加入到同步隊列尾部(addWaiter方法),我們看到這個方法退出去的途徑直有兩個,一個是return,一個是throw InterruptedException。注意最后的finally的處理。return退出方法有必須滿足兩個條件首先是首節點,其次是計數值為0。
throw InterruptedException是響應打斷操作的,線程在阻塞期間,如果你不想在等待了,可以打斷線程讓它繼續運行后面的任務(注意異常處理)。

addWaiter添加節點

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);//包裝節點// Try the fast path of enq; backup to full enq on failureNode pred = tail; //同步隊列尾節點if (pred != null) {node.prev = pred;//同步隊列有尾節點 將我們的節點通過cas方式添加到隊列后面if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);// 兩種情況執行這個代碼 1.隊列尾節點為null 2.隊列尾節點不為null,但是我們原子添加尾節點失敗return node;}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // cas形式添加頭節點 注意 是頭節點if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;//cas形式添加尾節點if (compareAndSetTail(t, node)) {t.next = node;return t;//結束方法必須是尾節點添加成功}}}}

b、CountDownLatch.countDown()方法
當部分線程調用await()方法后,它們在同步隊列中被掛起,然后循環的檢查自己能否滿足醒來的條件(還記得那個條件嗎?1、state為0,2、該節點為頭節點),

* +------+ prev +-----+ +-----+* head | | <---- | | <---- | | tail* +------+ +-----+ +-----+

同步隊列

volatile Node prev; volatile Node next;

volatile的prev指向上一個node節點,volatile的next指向下一個node節點。當然如果是頭節點,那么它的prev為null,同理尾節點的next為null。

private transient volatile Node head; private transient volatile Node tail;

用來表示同步隊列的頭節點和尾節點

countDown方法

public void countDown() {sync.releaseShared(1);}

releaseShared方法

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

在Sync類中并沒有releaseShared()方法,所以繼承與AQS,看到AQS這個方法中,退出該方法的只有兩條路。tryReleaseShared(arg)條件為真執行一個doReleaseShared()退出;條件為假直接退出。

protected boolean tryReleaseShared(int releases) {for (;;) {//死循環int c = getState();// 獲取主存中的state值if (c == 0) //state已經為0 直接退出return false;int nextc = c-1; // 減一 準備cas更新該值if (compareAndSetState(c, nextc)) //cas更新return nextc == 0; //更新成功 判斷是否為0 退出;更新失敗則繼續for循環,直到線程并發更新成功} }

doReleaseShared方法

private void doReleaseShared() {for (;;) {//死循環Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {//如果當前節點是SIGNAL,它正在等待一個信號,或者說它在等待被喚醒,因此做兩件事,1是重置waitStatus標志位,2是重置成功后,喚醒下一個節點。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; unparkSuccessor(h);}else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//如果本身頭節點的waitStatus是出于重置狀態(waitStatus==0)的,將其設置為“傳播”狀態。意味著需要將狀態向后一個節點傳播。continue; }if (h == head) break;} }

重點來了

為啥要執行這個方法呀,因為state已經為0啦,我們該將同步隊列中的線程狀態設置為共享狀態(Node.PROPAGATE,默認狀態ws == 0),并向后傳播,實現狀態共享。

退出死循環,只有一條,那就是h==head,即該線程是頭節點,且狀態為共享狀態。

可能有人有疑問,state已經等于0了,我們也通過循環的方式把頭節點的狀態設置為共享狀態,但是它怎么醒過來的呢?看上面doAcquireSharedInterruptibly方法。

在同步隊列中掛起的線程,它們自旋的形式查看自己是否滿足條件醒來(state==0,且為頭節點),如果成立將調用setHeadAndPropagate這個方法

private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();} }

看一個例子在加深下印象

/*** @author shuliangzhao* @Title: TestCountDownLatch* @ProjectName design-parent* @Description: TODO* @date 2019/6/2 12:19*/ public class CountDownLatchExc {private static final int i = 2;static class MyRunable implements Runnable {private int num;private CountDownLatch countDownLatch;public MyRunable(int num,CountDownLatch countDownLatch) {this.num = num;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {System.out.println("第" + num + "個線程開始執行任務...");Thread.sleep(2000);System.out.println("第" + num + "個線程開始執行結束...");} catch (InterruptedException e) {e.printStackTrace();}countDownLatch.countDown();}}public static void main(String[] args) {CountDownLatch countDownLatch = new CountDownLatch(i);for (int i = 0;i < 5;i++) {Thread thread = new Thread(new MyRunable(i,countDownLatch));thread.start();}System.out.println("main thread wait.");try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("main thread end...");} }

運行結果

?

image.png

以上就是CountDownLatch兩大重要方法解釋,可能理解有偏差,歡迎指出。

總結

以上是生活随笔為你收集整理的Java并发编程系列之CountDownLatch用法及详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。