j.u.c系列(08)---之并发工具类:CountDownLatch
寫在前面
CountDownLatch所描述的是”在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待“:用給定的計數 初始化 CountDownLatch。由于調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。CountDownLatch的本質也是一個"共享鎖"
\
CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。// 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。 void await() // 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 boolean await(long timeout, TimeUnit unit) // 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。 void countDown() // 返回當前計數。 long getCount() // 返回標識此鎖存器及其狀態的字符串。 String toString()CountDownLatch是通過一個計數器來實現的,當我們在new 一個CountDownLatch對象的時候需要帶入該計數器值,該值就表示了線程的數量。每當一個線程完成自己的任務后,計數器的值就會減1。當計數器的值變為0時,就表示所有的線程均已經完成了任務,然后就可以恢復等待的線程繼續執行了。
雖然,CountDownlatch與CyclicBarrier(后續會接受。另外一并發工具類)區別:
?
實現分析
通過上面的結構圖我們可以看到,CountDownLatch內部依賴Sync實現,而Sync繼承AQS。CountDownLatch僅提供了一個構造方法:
CountDownLatch(int count) : 構造一個用給定計數初始化的 CountDownLatch
?
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}sync為CountDownLatch的一個內部類,其定義如下:
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}//獲取同步狀態int getCount() {return getState();}//獲取同步狀態protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}//釋放同步狀態protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}?
? 通過這個內部類Sync我們可以清楚地看到CountDownLatch是采用共享鎖來實現的。
CountDownLatch提供await()方法來使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷,定義如下:
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}?
? await其內部使用AQS的acquireSharedInterruptibly(int arg):
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}?
? 在內部類Sync中重寫了tryAcquireShared(int arg)方法:
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}getState()獲取同步狀態,其值等于計數器的值,從這里我們可以看到如果計數器值不等于0,則會調用doAcquireSharedInterruptibly(int arg),該方法為一個自旋方法會嘗試一直去獲取同步狀態:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {/*** 對于CountDownLatch而言,如果計數器值不等于0,那么r 會一直小于0*/int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//等待if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}?
? CountDownLatch提供countDown() 方法遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。
public void countDown() {sync.releaseShared(1);}?
?內部調用AQS的releaseShared(int arg)方法來釋放共享鎖同步狀態:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}?
? tryReleaseShared(int arg)方法被CountDownLatch的內部類Sync重寫:
protected boolean tryReleaseShared(int releases) {for (;;) {//獲取鎖狀態int c = getState();//c == 0 直接返回,釋放鎖成功if (c == 0)return false;//計算新“鎖計數器”int nextc = c-1;//更新鎖狀態(計數器)if (compareAndSetState(c, nextc))return nextc == 0;}}?
?
總結
CountDownLatch內部通過共享鎖實現。在創建CountDownLatch實例時,需要傳遞一個int型的參數:count,該參數為計數器的初始值,也可以理解為該共享鎖可以獲取的總次數。當某個線程調用await()方法,程序首先判斷count的值是否為0,如果不會0的話則會一直等待直到為0為止。當其他線程調用countDown()方法時,則執行釋放共享鎖狀態,使count值 - 1。當在創建CountDownLatch時初始化的count參數,必須要有count線程調用countDown方法才會使計數器count等于0,鎖才會釋放,前面等待的線程才會繼續運行。注意CountDownLatch不能回滾重置。
應用示例
示例仍然使用開會案例。老板進入會議室等待5個人全部到達會議室才會開會。所以這里有兩個線程老板等待開會線程、員工到達會議室:
public class CountDownLatchTest {private volatile static CountDownLatch countDownLatch = new CountDownLatch(5);/*** Boss線程,等待員工到達開會*/static class BossThread extends Thread{BossThread(String name){super(name);}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":Boss在會議室等待,總共有" + countDownLatch.getCount() + "個人開會...");try {//Boss等待 countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + ":所有人都已經到齊了,開會吧...");}}//員工到達會議室static class EmpleoyeeThread extends Thread{@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ",到達會議室....");//員工到達會議室 count - 1 countDownLatch.countDown();}}public static void main(String[] args) throws InterruptedException{//Boss線程啟動new BossThread("張總").start();new BossThread("李總").start();new BossThread("王總").start();Thread.sleep(1000);for(int i = 0 ; i < 5 ; i++){new EmpleoyeeThread().start();}} } 張總:Boss在會議室等待,總共有5個人開會... 李總:Boss在會議室等待,總共有5個人開會... 王總:Boss在會議室等待,總共有5個人開會... Thread-0,到達會議室.... Thread-1,到達會議室.... Thread-2,到達會議室.... Thread-3,到達會議室.... Thread-4,到達會議室.... 張總:所有人都已經到齊了,開會吧... 王總:所有人都已經到齊了,開會吧... 李總:所有人都已經到齊了,開會吧...?
轉載于:https://www.cnblogs.com/chihirotan/p/8526692.html
總結
以上是生活随笔為你收集整理的j.u.c系列(08)---之并发工具类:CountDownLatch的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php怎么设置浏览器禁止打开新窗口,JS
- 下一篇: web存储机制localStorage和