日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java Review - 并发编程_ 信号量Semaphore原理源码剖析

發布時間:2025/3/21 java 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - 并发编程_ 信号量Semaphore原理源码剖析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • 小Demo
  • 類關系概述
  • 核心方法源碼解讀
    • void acquire()
      • 非公平策略NonfairSync類的`tryAcquireShared`方法
      • 公平策略`FairSync`類的`tryAcquireShared`方法
    • void acquire(int permits)
    • void acquireUninterruptibly()
    • void acquireUninterruptibly(int permits)
    • void release()
    • void release(int permits)
  • 小結


概述

Semaphore信號量也是Java中的一個同步器,與CountDownLatch和CycleBarrier不同的是,它內部的計數器是遞增的,并且在一開始初始化Semaphore時可以指定一個初始值,但是并不需要知道需要同步的線程個數,而是在需要同步的地方調用acquire方法時指定需要同步的線程個數。


小Demo

同樣下面的例子也是在主線程中開啟兩個子線程讓它們執行,等所有子線程執行完畢后主線程再繼續向下運行。

import java.time.LocalTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/14 23:59* @mark: show me the code , change the world*/ public class SemphoreTest {// 1 創建Sempaphore實例 當前信號量計數器的值為0private static Semaphore semaphore = new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);// 線程1 提交到線程池executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();});// 線程2 提交到線程池executorService.submit(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}});// 1 等待子線程執行任務完成后返回semaphore.acquire(2);System.out.println(Thread.currentThread().getName() + "任務執行結束 " + LocalTime.now()) ;// 關閉線程池executorService.shutdown();} }

  • 首先創建了一個信號量實例,構造函數的入參為0,說明當前信號量計數器的值為0

  • 然后main函數向線程池添加兩個線程任務,在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1

  • 最后在main線程里面調用信號量的acquire方法,傳參為2說明調用acquire方法的線程會一直阻塞,直到信號量的計數變為2才會返回

看到這里也就明白了,如果構造Semaphore時傳遞的參數為N,并在M個線程中調用了該信號量的release方法,那么在調用acquire使M個線程同步時傳遞的參數應該是M+N。

下面舉個例子來模擬【CyclicBarrier復用】的功能,代碼如下

import java.time.LocalTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/14 23:59* @mark: show me the code , change the world*/ public class SemphoreTest2 {// 1 創建Sempaphore實例private static Semaphore semaphore = new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);// 線程1 提交到線程池executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();});// 線程2 提交到線程池executorService.submit(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}});// 1 等待子線程執行任務完成后返回semaphore.acquire(2);// 線程3 提交到線程池executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();});// 線程4 提交到線程池executorService.submit(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}});// 2等待子線程執行任務完成后返回semaphore.acquire(2);System.out.println(Thread.currentThread().getName() + "任務執行結束 " + LocalTime.now()) ;// 關閉線程池executorService.shutdown();} }
  • 首先將線程1和線程2加入到線程池。主線程執行代碼(1)后被阻塞。線程1和線程2調用release方法后信號量的值變為了2,這時候主線程的aquire方法會在獲取到2個信號量后返回(返回后當前信號量值為0)。

  • 然后主線程添加線程3和線程4到線程池,之后主線程執行代碼(2)后被阻塞(因為主線程要獲取2個信號量,而當前信號量個數為0)。當線程3和線程4執行完release方法后,主線程才返回。

從本例子可以看出,Semaphore在某種程度上實現了CyclicBarrier的復用功能。


類關系概述

由該類圖可知,Semaphore還是使用AQS實現的。Sync只是對AQS的一個修飾,并且Sync有兩個實現類,用來指定獲取信號量時是否采用公平策略。

例如,下面的代碼在創建Semaphore時會使用一個變量指定是否使用公平策略。

public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}Sync(int permits) {setState(permits);}

Semaphore默認采用非公平策略,如果需要使用公平策略則可以使用帶兩個參數的構造函數來構造Semaphore對象。

另外,如CountDownLatch構造函數傳遞的初始化信號量個數permits被賦給了AQS的state狀態變量一樣,這里AQS的state值也表示當前持有的信號量個數


核心方法源碼解讀

void acquire()

public void acquire() throws InterruptedException {// 傳遞參數為1 ,說明要獲取一個信號量資源sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 1 . 如果線程被中斷,拋出被中斷異常 if (Thread.interrupted())throw new InterruptedException();// 2 否則調用Syn子類方法嘗試重新獲取 if (tryAcquireShared(arg) < 0)// 如果獲取失敗,則放入阻塞隊列,然后再次嘗試,如果失敗則調用park方法掛起當前線程doAcquireSharedInterruptibly(arg);}

acquire()在內部調用了Sync的acquireSharedInterruptibly方法,后者會對中斷進行響應(如果當前線程被中斷,則拋出中斷異常)。

嘗試獲取信號量資源的AQS的方法tryAcquireShared是由Sync的子類實現的,所以這里分別從兩方面來討論。

非公平策略NonfairSync類的tryAcquireShared方法

繼續看下 nonfairTryAcquireShared

final int nonfairTryAcquireShared(int acquires) {for (;;) {// 獲取當前信號量的值int available = getState();// 計算當前剩余值int remaining = available - acquires;// 如果當前值<0 或者 CAS設置成功則返回if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
  • 先獲取當前信號量值(available),然后減去需要獲取的值(acquires),得到剩余的信號量個數(remaining)

  • 如果剩余值小于0則說明當前信號量個數滿足不了需求,那么直接返回負數,這時當前線程會被放入AQS的阻塞隊列而被掛起。

  • 如果剩余值大于0,則使用CAS操作設置當前信號量值為剩余值,然后返回剩余值。

另外,由于NonFairSync是非公平獲取的,也就是說先調用aquire方法獲取信號量的線程不一定比后來者先獲取到信號量。

舉個例子:

  • 線程A先調用了aquire()方法獲取信號量,但是當前信號量個數為0,那么線程A會被放入AQS的阻塞隊列
  • 過一段時間后線程C調用了release()方法釋放了一個信號量,如果當前沒有其他線程獲取信號量,那么線程A就會被激活,然后獲取該信號量
  • 但是假如線程C釋放信號量后,線程C調用了aquire方法,那么線程C就會和線程A去競爭這個信號量資源。
  • 如果采用非公平策略,由nonfairTryAcquireShared的代碼可知,線程C完全可以在線程A被激活前,或者激活后先于線程A獲取到該信號量,也就是在這種模式下阻塞線程和當前請求的線程是競爭關系,而不遵循先來先得的策略。


    公平策略FairSync類的tryAcquireShared方法

    /*** Fair version*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}

    可見公平性還是靠hasQueuedPredecessors這個函數來保證的。前幾篇博文里重點介紹了hasQueuedPredecessors。 公平策略是看當前線程節點的前驅節點是否也在等待獲取該資源,如果是則自己放棄獲取的權限,然后當前線程會被放入AQS阻塞隊列,否則就去獲取。


    void acquire(int permits)

    該方法與acquire()方法不同,后者只需要獲取一個信號量值,而前者則獲取permits個。

    public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}


    void acquireUninterruptibly()

    該方法與acquire()類似,不同之處在于該方法對中斷不響應,也就是當當前線程調用了acquireUninterruptibly獲取資源時(包含被阻塞后),其他線程調用了當前線程的interrupt()方法設置了當前線程的中斷標志,此時當前線程并不會拋出InterruptedException異常而返回。

    public void acquireUninterruptibly() {sync.acquireShared(1);}

    看看響應中斷的


    void acquireUninterruptibly(int permits)

    該方法與acquire(int permits)方法的不同之處在于,該方法對中斷不響應。

    public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}

    void release()

    該方法的作用是把當前Semaphore對象的信號量值增加1,如果當前有線程因為調用aquire方法被阻塞而被放入了AQS的阻塞隊列,則會根據公平策略選擇一個信號量個數能被滿足的線程進行激活,激活的線程會嘗試獲取剛增加的信號量。

    public void release() {// 默認釋放1個信號量 sync.releaseShared(1);} /*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument. This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {// 2嘗試釋放資源if (tryReleaseShared(arg)) {// 3 資源釋放成功,則調用park方法喚醒AQS 隊列里最先掛起的線程 doReleaseShared();return true;}return false;}

    protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) // cas return true;}

    由代碼release()->sync.releaseShared(1)可知,release方法每次只會對信號量值增加1,tryReleaseShared方法是無限循環,使用CAS保證了release方法對信號量遞增1的原子性操作。tryReleaseShared方法增加信號量值成功后會執行代碼(3)doReleaseShared();,即調用AQS的方法來激活因為調用aquire方法而被阻塞的線程。

    private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}

    void release(int permits)

    該方法與不帶參數的release方法的不同之處在于,前者每次調用會在信號量值原來的基礎上增加permits,而后者每次增加1。

    public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}

    另外可以看到,這里的sync.releaseShared是共享方法,這說明該信號量是線程共享的,信號量沒有和固定線程綁定,多個線程可以同時使用CAS去更新信號量的值而不會被阻塞。


    小結

    Semaphore也是使用AQS實現的,并且獲取信號量時有公平策略和非公平策略之分。

    總結

    以上是生活随笔為你收集整理的Java Review - 并发编程_ 信号量Semaphore原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。