日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

并发编程之 Semaphore 源码分析

發(fā)布時間:2024/1/17 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发编程之 Semaphore 源码分析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

并發(fā) JUC 包提供了很多工具類,比如之前說的 CountDownLatch,CyclicBarrier ,今天說說這個 Semaphore——信號量,關(guān)于他的使用請查看往期文章并發(fā)編程之 線程協(xié)作工具類,今天的任務就是從源碼層面分析一下他的原理。

源碼分析

如果先不看源碼,根據(jù)以往我們看過的 CountDownLatch CyclicBarrier 的源碼經(jīng)驗來看,Semaphore 會怎么設計呢?

首先,他要實現(xiàn)多個線程線程同時訪問一個資源,類似于共享鎖,并且,要控制進入資源的線程的數(shù)量。

如果根據(jù) JDK 現(xiàn)有的資源,我們是否可以使用 AQS 的 state 變量來控制呢?類似 CountDownLatch 一樣,有幾個線程我們就為這個 state 變量設置為幾,當 state 達到了閾值,其他線程就不能獲取鎖了,就需要等待。當 Semaphore 調(diào)用 release 方法的時候,就釋放鎖,將 state 減一,并喚醒 AQS 上的線程。

以上,就是我們的猜想,那我們看看 JDK 是不是和我們想的一樣。

首先看看 Semaphore 的 UML 結(jié)構(gòu):

內(nèi)部有 3 個類,繼承了 AQS。一個公平鎖,一個非公平鎖,這點和 ReentrantLock 一摸一樣。

看看他的構(gòu)造器:

public Semaphore(int permits) {sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

兩個構(gòu)造器,兩個參數(shù),一個是許可線程數(shù)量,一個是是否公平鎖,默認非公平。

而 Semaphore 有 2 個重要的方法,也是我們經(jīng)常使用的 2 個方法:

semaphore.acquire(); // doSomeing..... semaphore.release();

acquire 和 release 方法,我們今天重點看這兩個方法的源碼,一窺 Semaphore 的全貌。

acquire 方法源碼分析

代碼如下:

public void acquire() throws InterruptedException {// 嘗試獲取一個鎖sync.acquireSharedInterruptibly(1); }// 這是抽象類 AQS 的方法 public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果小于0,就獲取鎖失敗了。加入到AQS 等待隊列中。// 如果大于0,就直接執(zhí)行下面的邏輯了。不用進行阻塞等待。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); } // 這是抽象父類 Sync 的方法,默認是非公平的 protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); } // 非公平鎖的釋放鎖的方法 final int nonfairTryAcquireShared(int acquires) {// 死循環(huán)for (;;) {// 獲取鎖的狀態(tài)int available = getState();int remaining = available - acquires;// state 變量是否還足夠當前獲取的// 如果小于 0,獲取鎖就失敗了。// 如果大于 0,就循環(huán)嘗試使用 CAS 將 state 變量更新成減去輸入?yún)?shù)之后的。if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }

這里的釋放就是對 state 變量減一(或者更多)的。

返回了剩余的 state 大小。

當返回值小于 0 的時候,說明獲取鎖失敗了,那么就需要進入 AQS 的等待隊列了。代碼入下:

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 添加一個節(jié)點 AQS 隊列尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 死循環(huán)for (;;) {// 找到新節(jié)點的上一個節(jié)點final Node p = node.predecessor();// 如果這個節(jié)點是 head,就嘗試獲取鎖if (p == head) {// 繼續(xù)嘗試獲取鎖,這個方法是子類實現(xiàn)的int r = tryAcquireShared(arg);// 如果大于0,說明拿到鎖了。if (r >= 0) {// 將 node 設置為 head 節(jié)點// 如果大于0,就說明還有機會獲取鎖,那就喚醒后面的線程,稱之為傳播setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 如果他的上一個節(jié)點不是 head,就不能獲取鎖// 對節(jié)點進行檢查和更新狀態(tài),如果線程應該阻塞,返回 true。if (shouldParkAfterFailedAcquire(p, node) &&// 阻塞 park,并返回是否中斷,中斷則拋出異常parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)// 取消節(jié)點cancelAcquire(node);} }

總的邏輯就是:

  • 創(chuàng)建一個分享類型的 node 節(jié)點包裝當前線程追加到 AQS 隊列的尾部。

  • 如果這個節(jié)點的上一個節(jié)點是 head ,就是嘗試獲取鎖,獲取鎖的方法就是子類重寫的方法。如果獲取成功了,就將剛剛的那個節(jié)點設置成 head。

  • 如果沒搶到鎖,就阻塞等待。

  • release 方法源碼分析

    該方法用于釋放鎖,代碼如下:

    public void release() {sync.releaseShared(1); }public final boolean releaseShared(int arg) {// 死循環(huán)釋放成功if (tryReleaseShared(arg)) {// 喚醒 AQS 等待對列中的節(jié)點,從 head 開始 doReleaseShared();return true;}return false; } // Sync extends AbstractQueuedSynchronizer protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();// 對 state 變量 + 1int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;} }

    這里釋放鎖的邏輯寫在了抽象類 Sync 中。邏輯簡單,就是對 state 變量做加法。

    在加法成功后,執(zhí)行 doReleaseShared方法,這個方法是 AQS 的。

    private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {// 設置 head 的等待狀態(tài)為 0 ,并喚醒 head 上的線程if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}// 成功設置成 0 之后,將 head 狀態(tài)設置成傳播狀態(tài)else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} }

    該方法的主要作用就是從 AQS 的 head 節(jié)點開始喚醒線程,注意,這里喚醒是 head 節(jié)點的下一個節(jié)點,需要和 doAcquireSharedInterruptibly方法對應,因為 doAcquireSharedInterruptibly 方法喚醒的當前節(jié)點的上一個節(jié)點,也就是 head 節(jié)點。

    至此,釋放 state 變量,喚醒 AQS 頭節(jié)點結(jié)束。

    總結(jié)

    總結(jié)一下 Semaphore 的原理吧。

    總的來說,Semaphore 就是一個共享鎖,通過設置 state 變量來實現(xiàn)對這個變量的共享。當調(diào)用 acquire 方法的時候,state 變量就減去一,當調(diào)用 release 方法的時候,state 變量就加一。當 state 變量為 0 的時候,別的線程就不能進入代碼塊了,就會在 AQS 中阻塞等待。

    轉(zhuǎn)載于:https://www.cnblogs.com/stateis0/p/9062042.html

    總結(jié)

    以上是生活随笔為你收集整理的并发编程之 Semaphore 源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。