并发编程之 Semaphore 源码分析
前言
并發(fā) JUC 包提供了很多工具類,比如之前說的 CountDownLatch,CyclicBarrier ,今天說說這個(gè) Semaphore——信號(hào)量,關(guān)于他的使用請(qǐng)查看往期文章并發(fā)編程之 線程協(xié)作工具類,今天的任務(wù)就是從源碼層面分析一下他的原理。
源碼分析
如果先不看源碼,根據(jù)以往我們看過的 CountDownLatch CyclicBarrier 的源碼經(jīng)驗(yàn)來看,Semaphore 會(huì)怎么設(shè)計(jì)呢?
首先,他要實(shí)現(xiàn)多個(gè)線程線程同時(shí)訪問一個(gè)資源,類似于共享鎖,并且,要控制進(jìn)入資源的線程的數(shù)量。
如果根據(jù) JDK 現(xiàn)有的資源,我們是否可以使用 AQS 的 state 變量來控制呢?類似 CountDownLatch 一樣,有幾個(gè)線程我們就為這個(gè) state 變量設(shè)置為幾,當(dāng) state 達(dá)到了閾值,其他線程就不能獲取鎖了,就需要等待。當(dāng) Semaphore 調(diào)用 release 方法的時(shí)候,就釋放鎖,將 state 減一,并喚醒 AQS 上的線程。
以上,就是我們的猜想,那我們看看 JDK 是不是和我們想的一樣。
首先看看 Semaphore 的 UML 結(jié)構(gòu):
內(nèi)部有 3 個(gè)類,繼承了 AQS。一個(gè)公平鎖,一個(gè)非公平鎖,這點(diǎn)和 ReentrantLock 一摸一樣。
看看他的構(gòu)造器:
public Semaphore(int permits) {sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }兩個(gè)構(gòu)造器,兩個(gè)參數(shù),一個(gè)是許可線程數(shù)量,一個(gè)是是否公平鎖,默認(rèn)非公平。
而 Semaphore 有 2 個(gè)重要的方法,也是我們經(jīng)常使用的 2 個(gè)方法:
semaphore.acquire(); // doSomeing..... semaphore.release();acquire 和 release 方法,我們今天重點(diǎn)看這兩個(gè)方法的源碼,一窺 Semaphore 的全貌。
acquire 方法源碼分析
代碼如下:
public void acquire() throws InterruptedException {// 嘗試獲取一個(gè)鎖sync.acquireSharedInterruptibly(1); }// 這是抽象類 AQS 的方法 public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果小于0,就獲取鎖失敗了。加入到AQS 等待隊(duì)列中。// 如果大于0,就直接執(zhí)行下面的邏輯了。不用進(jìn)行阻塞等待。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); } // 這是抽象父類 Sync 的方法,默認(rèn)是非公平的 protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); } // 非公平鎖的釋放鎖的方法 final int nonfairTryAcquireShared(int acquires) {// 死循環(huán)for (;;) {// 獲取鎖的狀態(tài)int available = getState();int remaining = available - acquires;// state 變量是否還足夠當(dāng)前獲取的// 如果小于 0,獲取鎖就失敗了。// 如果大于 0,就循環(huán)嘗試使用 CAS 將 state 變量更新成減去輸入?yún)?shù)之后的。if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }這里的釋放就是對(duì) state 變量減一(或者更多)的。
返回了剩余的 state 大小。
當(dāng)返回值小于 0 的時(shí)候,說明獲取鎖失敗了,那么就需要進(jìn)入 AQS 的等待隊(duì)列了。代碼入下:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 添加一個(gè)節(jié)點(diǎn) AQS 隊(duì)列尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 死循環(huán)for (;;) {// 找到新節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)final Node p = node.predecessor();// 如果這個(gè)節(jié)點(diǎn)是 head,就嘗試獲取鎖if (p == head) {// 繼續(xù)嘗試獲取鎖,這個(gè)方法是子類實(shí)現(xiàn)的int r = tryAcquireShared(arg);// 如果大于0,說明拿到鎖了。if (r >= 0) {// 將 node 設(shè)置為 head 節(jié)點(diǎn)// 如果大于0,就說明還有機(jī)會(huì)獲取鎖,那就喚醒后面的線程,稱之為傳播setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 如果他的上一個(gè)節(jié)點(diǎn)不是 head,就不能獲取鎖// 對(duì)節(jié)點(diǎn)進(jìn)行檢查和更新狀態(tài),如果線程應(yīng)該阻塞,返回 true。if (shouldParkAfterFailedAcquire(p, node) &&// 阻塞 park,并返回是否中斷,中斷則拋出異常parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)// 取消節(jié)點(diǎn)cancelAcquire(node);} }總的邏輯就是:
創(chuàng)建一個(gè)分享類型的 node 節(jié)點(diǎn)包裝當(dāng)前線程追加到 AQS 隊(duì)列的尾部。
如果這個(gè)節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)是 head ,就是嘗試獲取鎖,獲取鎖的方法就是子類重寫的方法。如果獲取成功了,就將剛剛的那個(gè)節(jié)點(diǎn)設(shè)置成 head。
如果沒搶到鎖,就阻塞等待。
release 方法源碼分析
該方法用于釋放鎖,代碼如下:
public void release() {sync.releaseShared(1); }public final boolean releaseShared(int arg) {// 死循環(huán)釋放成功if (tryReleaseShared(arg)) {// 喚醒 AQS 等待對(duì)列中的節(jié)點(diǎn),從 head 開始 doReleaseShared();return true;}return false; } // Sync extends AbstractQueuedSynchronizer protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();// 對(duì) state 變量 + 1int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;} }這里釋放鎖的邏輯寫在了抽象類 Sync 中。邏輯簡單,就是對(duì) state 變量做加法。
在加法成功后,執(zhí)行 doReleaseShared方法,這個(gè)方法是 AQS 的。
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {// 設(shè)置 head 的等待狀態(tài)為 0 ,并喚醒 head 上的線程if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}// 成功設(shè)置成 0 之后,將 head 狀態(tài)設(shè)置成傳播狀態(tài)else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} }該方法的主要作用就是從 AQS 的 head 節(jié)點(diǎn)開始喚醒線程,注意,這里喚醒是 head 節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),需要和 doAcquireSharedInterruptibly方法對(duì)應(yīng),因?yàn)?doAcquireSharedInterruptibly 方法喚醒的當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn),也就是 head 節(jié)點(diǎn)。
至此,釋放 state 變量,喚醒 AQS 頭節(jié)點(diǎn)結(jié)束。
總結(jié)
總結(jié)一下 Semaphore 的原理吧。
總的來說,Semaphore 就是一個(gè)共享鎖,通過設(shè)置 state 變量來實(shí)現(xiàn)對(duì)這個(gè)變量的共享。當(dāng)調(diào)用 acquire 方法的時(shí)候,state 變量就減去一,當(dāng)調(diào)用 release 方法的時(shí)候,state 變量就加一。當(dāng) state 變量為 0 的時(shí)候,別的線程就不能進(jìn)入代碼塊了,就會(huì)在 AQS 中阻塞等待。
轉(zhuǎn)載于:https://www.cnblogs.com/stateis0/p/9062042.html
總結(jié)
以上是生活随笔為你收集整理的并发编程之 Semaphore 源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TensorFlow Java+ecli
- 下一篇: 区块链学习之区块链思想的诞生(一)