Java并发编程笔记之Semaphore信号量源码分析
JUC 中 Semaphore 的使用與原理分析,Semaphore 也是 Java 中的一個同步器,與 CountDownLatch 和 CycleBarrier 不同在于它內部的計數器是遞增的,那么,Semaphore 的內部實現是怎樣的呢?
Semaphore 信號量也是Java 中一個同步容器,與CountDownLatch 和 CyclicBarrier 不同之處在于它內部的計數器是遞增的。為了能夠一覽Semaphore的內部結構,我們首先要看一下Semaphore的類圖,類圖,如下所示:
?
?如上類圖可以知道Semaphoren內部還是使用AQS來實現的,Sync只是對AQS的一個修飾,并且Sync有兩個實現類,分別代表獲取信號量的時候是否采取公平策略。創建Semaphore的時候會有一個變量標示是否使用公平策略,源碼如下:
public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}Sync(int permits) {setState(permits);}如上面代碼所示,Semaphore默認使用的是非公平策略,如果你需要公平策略,則可以使用帶兩個參數的構造函數來構造Semaphore對象,另外和CountDownLatch一樣,構造函數里面傳遞的初始化信號量個數 permits 被賦值給了AQS 的state狀態變量,也就是說這里AQS的state值表示當前持有的信號量個數。
?
接下來我們主要看看Semaphore實現的主要方法的源碼,如下:
1.void acquire() 當前線程調用該方法的時候,目的是希望獲取一個信號量資源,如果當前信號量計數個數大于 0 ,并且當前線程獲取到了一個信號量則該方法直接返回,當前信號量的計數會減少 1 。否則會被放入AQS的阻塞隊列,當前線程被掛起,直到其他線程調用了release方法釋放了信號量,并且當前線程通過競爭獲取到了改信號量。當前線程被其他線程調用了 interrupte()方法中斷后,當前線程會拋出 InterruptedException異常返回。源碼如下:
public void acquire() throws InterruptedException {//傳遞參數為1,說明要獲取1個信號量資源sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//(1)如果線程被中斷,則拋出中斷異常if (Thread.interrupted())throw new InterruptedException();//(2)否者調用sync子類方法嘗試獲取,這里根據構造函數確定使用公平策略if (tryAcquireShared(arg) < 0)//如果獲取失敗則放入阻塞隊列,然后再次嘗試如果失敗則調用park方法掛起當前線程 doAcquireSharedInterruptibly(arg);}如上代碼可知,acquire()內部調用了sync的acquireSharedInterruptibly? 方法,后者是對中斷響應的(如果當前線程被中斷,則拋出中斷異常),嘗試獲取信號量資源的AQS的方法tryAcquireShared 是由 sync 的子類實現,所以這里就要分公平性了,這里先討論非公平策略 NonfairSync 類的?tryAcquireShared 方法,源碼如下:
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}final int nonfairTryAcquireShared(int acquires) {for (;;) {//獲取當前信號量值int available = getState();//計算當前剩余值int remaining = available - acquires;//如果當前剩余小于0或者CAS設置成功則返回if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }
如上代碼,先計算當前信號量值(available)減去需要獲取的值(acquires) 得到剩余的信號量個數(remaining),如果剩余值小于 0 說明當前信號量個數滿足不了需求,則直接返回負數,然后當前線程會被放入AQS的阻塞隊列,當前線程被掛起。如果剩余值大于 0 則使用CAS操作設置當前信號量值為剩余值,然后返回剩余值。另外可以知道NonFairSync是非公平性獲取的,是說先調用aquire方法獲取信號量的線程不一定比后來者先獲取鎖。
?
接下來我們要看看公平性的FairSync 類是如何保證公平性的,源碼如下:
protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}可以知道公平性還是靠 hasQueuedPredecessors 這個方法來做的,以前的隨筆已經講過公平性是看當前線程節點是否有前驅節點也在等待獲取該資源,如果是則自己放棄獲取的權力,然后當前線程會被放入AQS阻塞隊列,否則就去獲取。hasQueuedPredecessors源碼如下:
public final boolean hasQueuedPredecessors() {Node t = tail; Node h = head;Node s;return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }如上面代碼所示,如果當前線程節點有前驅節點則返回true,否則如果當前AQS隊列為空 或者 當前線程節點是AQS的第一個節點則返回 false ,其中,如果 h == t 則說明當前隊列為空則直接返回 false,如果 h !=t 并且 s == null 說明有一個元素將要作為AQS的第一個節點入隊列(回顧下 enq 函數第一個元素入隊列是兩步操作,首先創建一個哨兵頭節點,然后第一個元素插入到哨兵節點后面),那么返回 true,如果? h !=t 并且 s != null 并且??s.thread != Thread.currentThread() 則說明隊列里面的第一個元素不是當前線程則返回 true。
?
2.void acquire(int permits) 該方法與 acquire() 不同在與后者只需要獲取一個信號量值,而前者則獲取指定 permits 個,源碼如下:
public void acquire(int permits) throws InterruptedException {if (permits < 0)throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits); }
?
3.void acquireUninterruptibly() 該方法與 acquire() 類似,不同之處在于該方法對中斷不響應,也就是當當前線程調用了 acquireUninterruptibly 獲取資源過程中(包含被阻塞后)其它線程調用了當前線程的 interrupt()方法設置了當前線程的中斷標志當前線程并不會拋出 InterruptedException 異常而返回。源碼如下:
public void acquireUninterruptibly() {sync.acquireShared(1); }?
4.void acquireUninterruptibly(int permits) 該方法與 acquire(int permits) 不同在于該方法對中斷不響應。源碼如如下:
public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}?
5.void release() 該方法作用是把當前 semaphore對象的信號量值增加 1 ,如果當前有線程因為調用 acquire 方法被阻塞放入了 AQS的阻塞隊列,則會根據公平策略選擇一個線程進行激活,激活的線程會嘗試獲取剛增加的信號量,源碼如下:
public void release() {//(1)arg=1sync.releaseShared(1);}public final boolean releaseShared(int arg) {//(2)嘗試釋放資源if (tryReleaseShared(arg)) {//(3)資源釋放成功則調用park喚醒AQS隊列里面最先掛起的線程 doReleaseShared();return true;}return false;}protected final boolean tryReleaseShared(int releases) {for (;;) {//(4)獲取當前信號量值int current = getState();//(5)當前信號量值增加releases,這里為增加1int next = current + releases;if (next < current) // 移除處理throw new Error("Maximum permit count exceeded");//(6)使用cas保證更新信號量值的原子性if (compareAndSetState(current, next))return true;}}如上面代碼可以看到 release()方法中對 sync.releaseShared(1),可以知道release方法每次只會對信號量值增加 1 ,tryReleaseShared方法是無限循環,使用CAS保證了 release 方法對信號量遞增 1 的原子性操作。當tryReleaseShared 方法增加信號量成功后會執行代碼(3),調用AQS的方法來激活因為調用acquire方法而被阻塞的線程。
?
6.void release(int permits) 該方法與不帶參數的不同之處在于前者每次調用會在信號量值原來基礎上增加 permits,而后者每次增加 1。源碼如下:
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits); }另外注意到這里調用的是 sync.releaseShared 是共享方法,這說明該信號量是線程共享的,信號量沒有和固定線程綁定,多個線程可以同時使用CAS去更新信號量的值而不會阻塞。
?
到目前已經知道了其原理,接下來用一個例子來加深對Semaphore的理解,例子如下:
package com.hjc;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;/*** Created by cong on 2018/7/8.*/ public class SemaphoreTest {// 創建一個Semaphore實例private static volatile Semaphore semaphore = new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);// 加入線程A到線程池executorService.submit(new Runnable() {public void run() {try {System.out.println(Thread.currentThread() + " over");semaphore.release();} catch (Exception e) {e.printStackTrace();}}});// 加入線程B到線程池executorService.submit(new Runnable() {public void run() {try {System.out.println(Thread.currentThread() + " over");semaphore.release();} catch (Exception e) {e.printStackTrace();}}});// 等待子線程執行完畢,返回semaphore.acquire(2);System.out.println("all child thread over!");//關閉線程池 executorService.shutdown();} }運行結果如下:
類似于 CountDownLatch,上面我們的例子也是在主線程中開啟兩個子線程進行執行,等所有子線程執行完畢后主線程在繼續向下運行。
如上代碼首先首先創建了一個信號量實例,構造函數的入參為 0,說明當前信號量計數器為 0,然后 main 函數添加兩個線程任務到線程池,每個線程內部調用了信號量的 release 方法,相當于計數值遞增一,最后在 main 線程里面調用信號量的 acquire 方法,參數傳遞為 2 說明調用 acquire 方法的線程會一直阻塞,直到信號量的計數變為 2 時才會返回。
看到這里也就明白了,如果構造 Semaphore 時候傳遞的參數為 N,在 M 個線程中調用了該信號量的 release 方法,那么在調用 acquire 對 M 個線程進行同步時候傳遞的參數應該是 M+N;
?
對CountDownLatch,CyclicBarrier,Semaphored這三者之間的比較總結:
1.CountDownLatch 通過計數器提供了更靈活的控制,只要檢測到計數器為 0,而不管當前線程是否結束調用 await 的線程就可以往下執行,相比使用 jion 必須等待線程執行完畢后主線程才會繼續向下運行更靈活。
2.CyclicBarrier 也可以達到 CountDownLatch 的效果,但是后者當計數器變為 0 后,就不能在被復用,而前者則使用 reset 方法可以重置后復用,前者對同一個算法但是輸入參數不同的類似場景下比較適用。
3.而 semaphore 采用了信號量遞增的策略,一開始并不需要關心需要同步的線程個數,等調用 aquire 時候在指定需要同步個數,并且提供了獲取信號量的公平性策略。
轉載于:https://www.cnblogs.com/huangjuncong/p/9280646.html
總結
以上是生活随笔為你收集整理的Java并发编程笔记之Semaphore信号量源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据库多表查询之 where I
- 下一篇: Java_枚举