AQS的原理及应用
前言
Java中的大部分同步類(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(簡稱為AQS)實(shí)現(xiàn)的。AQS是一種提供了原子式管理同步狀態(tài)、阻塞和喚醒線程功能以及隊(duì)列模型的簡單框架。本文會(huì)從應(yīng)用層逐漸深入到原理層,并通過ReentrantLock的基本特性和ReentrantLock與AQS的關(guān)聯(lián),來深入解讀AQS相關(guān)獨(dú)占鎖的知識(shí)點(diǎn),同時(shí)采取問答的模式來幫助大家理解AQS。由于篇幅原因,本篇文章主要闡述AQS中獨(dú)占鎖的邏輯和Sync Queue,不講述包含共享鎖和Condition Queue的部分(本篇文章核心為AQS原理剖析,只是簡單介紹了ReentrantLock,感興趣同學(xué)可以閱讀一下ReentrantLock的源碼)。
下面列出本篇文章的大綱和思路,以便于大家更好地理解:
1 ReentrantLock
1.1 ReentrantLock特性概覽
ReentrantLock意思為可重入鎖,指的是一個(gè)線程能夠?qū)σ粋€(gè)臨界資源重復(fù)加鎖。為了幫助大家更好地理解ReentrantLock的特性,我們先將ReentrantLock跟常用的Synchronized進(jìn)行比較,其特性如下(藍(lán)色部分為本篇文章主要剖析的點(diǎn)):
下面通過偽代碼,進(jìn)行更加直觀的比較:
//?**************************Synchronized的使用方式************************** //?1.用于代碼塊 synchronized?(this)?{} //?2.用于對(duì)象 synchronized?(object)?{} //?3.用于方法 public?synchronized?void?test?()?{} //?4.可重入 for?(int?i?=?0;?i?<?100;?i++)?{synchronized?(this)?{} } //?**************************ReentrantLock的使用方式************************** public?void?test?()?throw?Exception?{//?1.初始化選擇公平鎖、非公平鎖ReentrantLock?lock?=?new?ReentrantLock(true);//?2.可用于代碼塊lock.lock();try?{try?{//?3.支持多種加鎖方式,比較靈活;?具有可重入特性if(lock.tryLock(100,?TimeUnit.MILLISECONDS)){?}}?finally?{//?4.手動(dòng)釋放鎖lock.unlock()}}?finally?{lock.unlock();} }1.2 ReentrantLock與AQS的關(guān)聯(lián)
通過上文我們已經(jīng)了解,ReentrantLock支持公平鎖和非公平鎖(關(guān)于公平鎖和非公平鎖的原理分析,可參考《不可不說的Java“鎖”事》),并且ReentrantLock的底層就是由AQS來實(shí)現(xiàn)的。那么ReentrantLock是如何通過公平鎖和非公平鎖與AQS關(guān)聯(lián)起來呢?我們著重從這兩者的加鎖過程來理解一下它們與AQS之間的關(guān)系(加鎖過程中與AQS的關(guān)聯(lián)比較明顯,解鎖流程后續(xù)會(huì)介紹)。
非公平鎖源碼中的加鎖流程如下:
//?java.util.concurrent.locks.ReentrantLock#NonfairSync//?非公平鎖 static?final?class?NonfairSync?extends?Sync?{...final?void?lock()?{if?(compareAndSetState(0,?1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}... }這塊代碼的含義為:
-
若通過CAS設(shè)置變量State(同步狀態(tài))成功,也就是獲取鎖成功,則將當(dāng)前線程設(shè)置為獨(dú)占線程。
-
若通過CAS設(shè)置變量State(同步狀態(tài))失敗,也就是獲取鎖失敗,則進(jìn)入Acquire方法進(jìn)行后續(xù)處理。
第一步很好理解,但第二步獲取鎖失敗后,后續(xù)的處理策略是怎么樣的呢?這塊可能會(huì)有以下思考:
-
某個(gè)線程獲取鎖失敗的后續(xù)流程是什么呢?有以下兩種可能:
-
將當(dāng)前線程獲鎖結(jié)果設(shè)置為失敗,獲取鎖流程結(jié)束。這種設(shè)計(jì)會(huì)極大降低系統(tǒng)的并發(fā)度,并不滿足我們實(shí)際的需求。所以就需要下面這種流程,也就是AQS框架的處理流程。
-
存在某種排隊(duì)等候機(jī)制,線程繼續(xù)等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續(xù)。
-
-
對(duì)于問題1的第二種情況,既然說到了排隊(duì)等候機(jī)制,那么就一定會(huì)有某種隊(duì)列形成,這樣的隊(duì)列是什么數(shù)據(jù)結(jié)構(gòu)呢?
-
處于排隊(duì)等候機(jī)制中的線程,什么時(shí)候可以有機(jī)會(huì)獲取鎖呢?
-
如果處于排隊(duì)等候機(jī)制中的線程一直無法獲取鎖,還是需要一直等待嗎,還是有別的策略來解決這一問題?
帶著非公平鎖的這些問題,再看下公平鎖源碼中獲鎖的方式:
//?java.util.concurrent.locks.ReentrantLock#FairSyncstatic?final?class?FairSync?extends?Sync?{...??final?void?lock()?{acquire(1);}... }看到這塊代碼,我們可能會(huì)存在這種疑問:Lock函數(shù)通過Acquire方法進(jìn)行加鎖,但是具體是如何加鎖的呢?
結(jié)合公平鎖和非公平鎖的加鎖流程,雖然流程上有一定的不同,但是都調(diào)用了Acquire方法,而Acquire方法是FairSync和UnfairSync的父類AQS中的核心方法。
對(duì)于上邊提到的問題,其實(shí)在ReentrantLock類源碼中都無法解答,而這些問題的答案,都是位于Acquire方法所在的類AbstractQueuedSynchronizer中,也就是本文的核心——AQS。下面我們會(huì)對(duì)AQS以及ReentrantLock和AQS的關(guān)聯(lián)做詳細(xì)介紹(相關(guān)問題答案會(huì)在2.3.5小節(jié)中解答)。
2 AQS
首先,我們通過下面的架構(gòu)圖來整體了解一下AQS框架:
-
上圖中有顏色的為Method,無顏色的為Attribution。
-
總的來說,AQS框架共分為五層,自上而下由淺入深,從AQS對(duì)外暴露的API到底層基礎(chǔ)數(shù)據(jù)。
-
當(dāng)有自定義同步器接入時(shí),只需重寫第一層所需要的部分方法即可,不需要關(guān)注底層具體的實(shí)現(xiàn)流程。當(dāng)自定義同步器進(jìn)行加鎖或者解鎖操作時(shí),先經(jīng)過第一層的API進(jìn)入AQS內(nèi)部方法,然后經(jīng)過第二層進(jìn)行鎖的獲取,接著對(duì)于獲取鎖失敗的流程,進(jìn)入第三層和第四層的等待隊(duì)列處理,而這些處理方式均依賴于第五層的基礎(chǔ)數(shù)據(jù)提供層。
下面我們會(huì)從整體到細(xì)節(jié),從流程到方法逐一剖析AQS框架,主要分析過程如下:
2.1 原理概覽
AQS核心思想是,如果被請(qǐng)求的共享資源空閑,那么就將當(dāng)前請(qǐng)求資源的線程設(shè)置為有效的工作線程,將共享資源設(shè)置為鎖定狀態(tài);如果共享資源被占用,就需要一定的阻塞等待喚醒機(jī)制來保證鎖分配。這個(gè)機(jī)制主要用的是CLH隊(duì)列的變體實(shí)現(xiàn)的,將暫時(shí)獲取不到鎖的線程加入到隊(duì)列中。
CLH:Craig、Landin and Hagersten隊(duì)列,是單向鏈表,AQS中的隊(duì)列是CLH變體的虛擬雙向隊(duì)列(FIFO),AQS是通過將每條請(qǐng)求共享資源的線程封裝成一個(gè)節(jié)點(diǎn)來實(shí)現(xiàn)鎖的分配。
主要原理圖如下:
AQS使用一個(gè)Volatile的int類型的成員變量來表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取的排隊(duì)工作,通過CAS完成對(duì)State值的修改。
2.1.1 AQS數(shù)據(jù)結(jié)構(gòu)
先來看下AQS中最基本的數(shù)據(jù)結(jié)構(gòu)——Node,Node即為上面CLH變體隊(duì)列中的節(jié)點(diǎn)。
解釋一下幾個(gè)方法和屬性值的含義:
| waitStatus | 當(dāng)前節(jié)點(diǎn)在隊(duì)列中的狀態(tài) |
| prev | 前驅(qū)指針 |
| next | 后繼指針 |
| thread | 表示處于該節(jié)點(diǎn)的線程 |
| nextWaiter | 指向下一個(gè)處于CONDITION狀態(tài)的節(jié)點(diǎn)(由于本篇文章不講述Condition Queue隊(duì)列,這個(gè)指針不多介紹) |
| predecessor | 返回前驅(qū)節(jié)點(diǎn),沒有的話拋出npe |
線程兩種鎖的模式:
| SHARED | 表示線程以共享的模式等待鎖 |
| EXCLUSIVE | 表示線程正在以獨(dú)占的方式等待鎖 |
waitStatus有下面幾個(gè)枚舉值:
| CANCELLED | 為1,表示線程獲取鎖的請(qǐng)求已經(jīng)取消了 |
| SIGNAL | 為-1,表示線程已經(jīng)準(zhǔn)備好了,就等資源釋放了 |
| CONDITION | 為-2,表示節(jié)點(diǎn)在等待隊(duì)列中,節(jié)點(diǎn)線程等待喚醒 |
| PROPAGATE | 為-3,當(dāng)前線程處在SHARED情況下,該字段才會(huì)使用 |
| 0 | 當(dāng)一個(gè)Node被初始化的時(shí)候的默認(rèn)值 |
2.1.2 同步狀態(tài)State
在了解數(shù)據(jù)結(jié)構(gòu)后,接下來了解一下AQS的同步狀態(tài)——State。AQS中維護(hù)了一個(gè)名為state的字段,意為同步狀態(tài),是由Volatile修飾的,用于展示當(dāng)前臨界資源的獲鎖情況。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?volatile?int?state;下面提供了幾個(gè)訪問這個(gè)字段的方法:
| protected final int getState() | 獲取State的值 |
| protected final void setState(int newState) | 設(shè)置State的值 |
| protected final boolean compareAndSetState(int expect, int update) | 使用CAS方式更新State |
這幾個(gè)方法都是Final修飾的,說明子類中無法重寫它們。我們可以通過修改State字段表示的同步狀態(tài)來實(shí)現(xiàn)多線程的獨(dú)占模式和共享模式(加鎖過程)。
對(duì)于我們自定義的同步工具,需要自定義獲取同步狀態(tài)和釋放狀態(tài)的方式,也就是AQS架構(gòu)圖中的第一層:API層。
2.2 AQS重要方法與ReentrantLock的關(guān)聯(lián)
從架構(gòu)圖中可以得知,AQS提供了大量用于自定義同步器實(shí)現(xiàn)的Protected方法。自定義同步器實(shí)現(xiàn)的相關(guān)方法也只是為了通過修改State字段來實(shí)現(xiàn)多線程的獨(dú)占模式或者共享模式。自定義同步器需要實(shí)現(xiàn)以下方法(ReentrantLock需要實(shí)現(xiàn)的方法如下,并不是全部):
| protected boolean isHeldExclusively() | 該線程是否正在獨(dú)占資源。只有用到Condition才需要去實(shí)現(xiàn)它。 |
| protected boolean tryAcquire(int arg) | 獨(dú)占方式。arg為獲取鎖的次數(shù),嘗試獲取資源,成功則返回True,失敗則返回False。 |
| protected boolean tryRelease(int arg) | 獨(dú)占方式。arg為釋放鎖的次數(shù),嘗試釋放資源,成功則返回True,失敗則返回False。 |
| protected int tryAcquireShared(int arg) | 共享方式。arg為獲取鎖的次數(shù),嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。 |
| protected boolean tryReleaseShared(int arg) | 共享方式。arg為釋放鎖的次數(shù),嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回True,否則返回False。 |
一般來說,自定義同步器要么是獨(dú)占方式,要么是共享方式,它們也只需實(shí)現(xiàn)tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。AQS也支持自定義同步器同時(shí)實(shí)現(xiàn)獨(dú)占和共享兩種方式,如ReentrantReadWriteLock。ReentrantLock是獨(dú)占鎖,所以實(shí)現(xiàn)了tryAcquire-tryRelease。
以非公平鎖為例,這里主要闡述一下非公平鎖與AQS之間方法的關(guān)聯(lián)之處,具體每一處核心方法的作用會(huì)在文章后面詳細(xì)進(jìn)行闡述。
為了幫助大家理解ReentrantLock和AQS之間方法的交互過程,以非公平鎖為例,我們將加鎖和解鎖的交互流程單獨(dú)拎出來強(qiáng)調(diào)一下,以便于對(duì)后續(xù)內(nèi)容的理解。
加鎖:
-
通過ReentrantLock的加鎖方法Lock進(jìn)行加鎖操作。
-
會(huì)調(diào)用到內(nèi)部類Sync的Lock方法,由于Sync#lock是抽象方法,根據(jù)ReentrantLock初始化選擇的公平鎖和非公平鎖,執(zhí)行相關(guān)內(nèi)部類的Lock方法,本質(zhì)上都會(huì)執(zhí)行AQS的Acquire方法。
-
AQS的Acquire方法會(huì)執(zhí)行tryAcquire方法,但是由于tryAcquire需要自定義同步器實(shí)現(xiàn),因此執(zhí)行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通過公平鎖和非公平鎖內(nèi)部類實(shí)現(xiàn)的tryAcquire方法,因此會(huì)根據(jù)鎖類型不同,執(zhí)行不同的tryAcquire。
-
tryAcquire是獲取鎖邏輯,獲取失敗后,會(huì)執(zhí)行框架AQS的后續(xù)邏輯,跟ReentrantLock自定義同步器無關(guān)。
?
解鎖:
-
通過ReentrantLock的解鎖方法Unlock進(jìn)行解鎖。
-
Unlock會(huì)調(diào)用內(nèi)部類Sync的Release方法,該方法繼承于AQS。
-
Release中會(huì)調(diào)用tryRelease方法,tryRelease需要自定義同步器實(shí)現(xiàn),tryRelease只在ReentrantLock中的Sync實(shí)現(xiàn),因此可以看出,釋放鎖的過程,并不區(qū)分是否為公平鎖。
-
釋放成功后,所有處理由AQS框架完成,與自定義同步器無關(guān)。
通過上面的描述,大概可以總結(jié)出ReentrantLock加鎖解鎖時(shí)API層核心方法的映射關(guān)系。
2.3 通過ReentrantLock理解AQS
ReentrantLock中公平鎖和非公平鎖在底層是相同的,這里以非公平鎖為例進(jìn)行分析。
在非公平鎖中,有一段這樣的代碼:
//?java.util.concurrent.locks.ReentrantLockstatic?final?class?NonfairSync?extends?Sync?{...final?void?lock()?{if?(compareAndSetState(0,?1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}... }看一下這個(gè)Acquire是怎么寫的:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?void?acquire(int?arg)?{if?(!tryAcquire(arg)?&&?acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))selfInterrupt(); }再看一下tryAcquire方法:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprotected?boolean?tryAcquire(int?arg)?{throw?new?UnsupportedOperationException(); }可以看出,這里只是AQS的簡單實(shí)現(xiàn),具體獲取鎖的實(shí)現(xiàn)方法是由各自的公平鎖和非公平鎖單獨(dú)實(shí)現(xiàn)的(以ReentrantLock為例)。如果該方法返回了True,則說明當(dāng)前線程獲取鎖成功,就不用往后執(zhí)行了;如果獲取失敗,就需要加入到等待隊(duì)列中。下面會(huì)詳細(xì)解釋線程是何時(shí)以及怎樣被加入進(jìn)等待隊(duì)列中的。
2.3.1 線程加入等待隊(duì)列
2.3.1.1 加入隊(duì)列的時(shí)機(jī)
當(dāng)執(zhí)行Acquire(1)時(shí),會(huì)通過tryAcquire獲取鎖。在這種情況下,如果獲取鎖失敗,就會(huì)調(diào)用addWaiter加入到等待隊(duì)列中去。
2.3.1.2 如何加入隊(duì)列
獲取鎖失敗后,會(huì)執(zhí)行addWaiter(Node.EXCLUSIVE)加入等待隊(duì)列,具體實(shí)現(xiàn)方法如下:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?addWaiter(Node?mode)?{Node?node?=?new?Node(Thread.currentThread(),?mode);//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failureNode?pred?=?tail;if?(pred?!=?null)?{node.prev?=?pred;if?(compareAndSetTail(pred,?node))?{pred.next?=?node;return?node;}}enq(node);return?node; } private?final?boolean?compareAndSetTail(Node?expect,?Node?update)?{return?unsafe.compareAndSwapObject(this,?tailOffset,?expect,?update); }主要的流程如下:
(1)通過當(dāng)前的線程和鎖模式新建一個(gè)節(jié)點(diǎn)。
(2)Pred指針指向尾節(jié)點(diǎn)Tail。
(3)將New中Node的Prev指針指向Pred。
(4)通過compareAndSetTail方法,完成尾節(jié)點(diǎn)的設(shè)置。這個(gè)方法主要是對(duì)tailOffset和Expect進(jìn)行比較,如果tailOffset的Node和Expect的Node地址是相同的,那么設(shè)置Tail的值為Update的值。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerstatic?{try?{stateOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset?=?unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset?=?unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset?=?unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));}?catch?(Exception?ex)?{?throw?new?Error(ex);?} }從AQS的靜態(tài)代碼塊可以看出,都是獲取一個(gè)對(duì)象的屬性相對(duì)于該對(duì)象在內(nèi)存當(dāng)中的偏移量,這樣我們就可以根據(jù)這個(gè)偏移量在對(duì)象內(nèi)存當(dāng)中找到這個(gè)屬性。tailOffset指的是tail對(duì)應(yīng)的偏移量,所以這個(gè)時(shí)候會(huì)將new出來的Node置為當(dāng)前隊(duì)列的尾節(jié)點(diǎn)。同時(shí),由于是雙向鏈表,也需要將前一個(gè)節(jié)點(diǎn)指向尾節(jié)點(diǎn)。
(5)?如果Pred指針是Null(說明等待隊(duì)列中沒有元素),或者當(dāng)前Pred指針和Tail指向的位置不同(說明被別的線程已經(jīng)修改),就需要看一下Enq的方法。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?enq(final?Node?node)?{for?(;;)?{Node?t?=?tail;if?(t?==?null)?{?//?Must?initializeif?(compareAndSetHead(new?Node()))tail?=?head;}?else?{node.prev?=?t;if?(compareAndSetTail(t,?node))?{t.next?=?node;return?t;}}} }如果沒有被初始化,需要進(jìn)行初始化一個(gè)頭結(jié)點(diǎn)出來。但請(qǐng)注意,初始化的頭結(jié)點(diǎn)并不是當(dāng)前線程節(jié)點(diǎn),而是調(diào)用了無參構(gòu)造函數(shù)的節(jié)點(diǎn)。如果經(jīng)歷了初始化或者并發(fā)導(dǎo)致隊(duì)列中有元素,則與之前的方法相同。其實(shí),addWaiter就是一個(gè)在雙端鏈表添加尾節(jié)點(diǎn)的操作,需要注意的是,雙端鏈表的頭結(jié)點(diǎn)是一個(gè)無參構(gòu)造函數(shù)的頭結(jié)點(diǎn)。
總結(jié)一下,線程獲取鎖的時(shí)候,過程大體如下:
a. 當(dāng)沒有線程獲取到鎖時(shí),線程1獲取鎖成功。
b. 線程2申請(qǐng)鎖,但是鎖被線程1占有。
c. 如果再有線程要獲取鎖,依次在隊(duì)列中往后排隊(duì)即可。
回到上邊的代碼,hasQueuedPredecessors是公平鎖加鎖時(shí)判斷等待隊(duì)列中是否存在有效節(jié)點(diǎn)的方法。如果返回False,說明當(dāng)前線程可以爭取共享資源;如果返回True,說明隊(duì)列中存在有效節(jié)點(diǎn),當(dāng)前線程必須加入到等待隊(duì)列中。
//?java.util.concurrent.locks.ReentrantLockpublic?final?boolean?hasQueuedPredecessors()?{//?The?correctness?of?this?depends?on?head?being?initialized//?before?tail?and?on?head.next?being?accurate?if?the?current//?thread?is?first?in?queue.Node?t?=?tail;?//?Read?fields?in?reverse?initialization?orderNode?h?=?head;Node?s;return?h?!=?t?&&?((s?=?h.next)?==?null?||?s.thread?!=?Thread.currentThread()); }看到這里,我們理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什么要判斷的頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)?第一個(gè)節(jié)點(diǎn)儲(chǔ)存的數(shù)據(jù)是什么?
雙向鏈表中,第一個(gè)節(jié)點(diǎn)為虛節(jié)點(diǎn),其實(shí)并不存儲(chǔ)任何信息,只是占位。真正的第一個(gè)有數(shù)據(jù)的節(jié)點(diǎn),是在第二個(gè)節(jié)點(diǎn)開始的。當(dāng)h != t時(shí):
?
a. 如果(s = h.next) == null,等待隊(duì)列正在有線程進(jìn)行初始化,但只是進(jìn)行到了Tail指向Head,沒有將Head指向Tail,此時(shí)隊(duì)列中有元素,需要返回True(這塊具體見下邊代碼分析)。
b. 如果(s = h.next) != null,說明此時(shí)隊(duì)列中至少有一個(gè)有效節(jié)點(diǎn)。如果此時(shí)s.thread == Thread.currentThread(),說明等待隊(duì)列的第一個(gè)有效節(jié)點(diǎn)中的線程與當(dāng)前線程相同,那么當(dāng)前線程是可以獲取資源的;如果s.thread != Thread.currentThread(),說明等待隊(duì)列的第一個(gè)有效節(jié)點(diǎn)線程與當(dāng)前線程不同,當(dāng)前線程必須加入進(jìn)等待隊(duì)列。
?
//?java.util.concurrent.locks.AbstractQueuedSynchronizer#enqif?(t?==?null)?{?//?Must?initializeif?(compareAndSetHead(new?Node()))tail?=?head; }?else?{node.prev?=?t;if?(compareAndSetTail(t,?node))?{t.next?=?node;return?t;} }?
節(jié)點(diǎn)入隊(duì)不是原子操作,所以會(huì)出現(xiàn)短暫的head != tail,此時(shí)Tail指向最后一個(gè)節(jié)點(diǎn),而且Tail指向Head。如果Head沒有指向Tail(可見5、6、7行),這種情況下也需要將相關(guān)線程加入隊(duì)列中。所以這塊代碼是為了解決極端情況下的并發(fā)問題。
2.3.1.3 等待隊(duì)列中線程出隊(duì)列時(shí)機(jī)
回到最初的源碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?void?acquire(int?arg)?{if?(!tryAcquire(arg)?&&?acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))selfInterrupt(); }上文解釋了addWaiter方法,這個(gè)方法其實(shí)就是把對(duì)應(yīng)的線程以Node的數(shù)據(jù)結(jié)構(gòu)形式加入到雙端隊(duì)列里,返回的是一個(gè)包含該線程的Node。而這個(gè)Node會(huì)作為參數(shù),進(jìn)入到acquireQueued方法中。acquireQueued方法可以對(duì)排隊(duì)中的線程進(jìn)行“獲鎖”操作。
總的來說,一個(gè)線程獲取鎖失敗了,被放入等待隊(duì)列,acquireQueued會(huì)把放入隊(duì)列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲取(中斷)。
下面我們從“何時(shí)出隊(duì)列?”和“如何出隊(duì)列?”兩個(gè)方向來分析一下acquireQueued源碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{//?標(biāo)記是否成功拿到資源boolean?failed?=?true;try?{//?標(biāo)記等待過程中是否中斷過boolean?interrupted?=?false;//?開始自旋,要么獲取鎖,要么中斷for?(;;)?{//?獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)final?Node?p?=?node.predecessor();//?如果p是頭結(jié)點(diǎn),說明當(dāng)前節(jié)點(diǎn)在真實(shí)數(shù)據(jù)隊(duì)列的首部,就嘗試獲取鎖(別忘了頭結(jié)點(diǎn)是虛節(jié)點(diǎn))if?(p?==?head?&&?tryAcquire(arg))?{//?獲取鎖成功,頭指針移動(dòng)到當(dāng)前nodesetHead(node);p.next?=?null;?//?help?GCfailed?=?false;return?interrupted;}//?說明p為頭節(jié)點(diǎn)且當(dāng)前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結(jié)點(diǎn),這個(gè)時(shí)候就要判斷當(dāng)前node是否要被阻塞(被阻塞條件:前驅(qū)節(jié)點(diǎn)的waitStatus為-1),防止無限循環(huán)浪費(fèi)資源。具體兩個(gè)方法下面細(xì)細(xì)分析if?(shouldParkAfterFailedAcquire(p,?node)?&&?parkAndCheckInterrupt())interrupted?=?true;}}?finally?{if?(failed)cancelAcquire(node);} }注:setHead方法是把當(dāng)前節(jié)點(diǎn)置為虛節(jié)點(diǎn),但并沒有修改waitStatus,因?yàn)樗且恢毙枰玫臄?shù)據(jù)。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?setHead(Node?node)?{head?=?node;node.thread?=?null;node.prev?=?null; }?
//?java.util.concurrent.locks.AbstractQueuedSynchronizer//?靠前驅(qū)節(jié)點(diǎn)判斷當(dāng)前線程是否應(yīng)該被阻塞 private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{//?獲取頭結(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)int?ws?=?pred.waitStatus;//?說明頭結(jié)點(diǎn)處于喚醒狀態(tài)if?(ws?==?Node.SIGNAL)return?true;?//?通過枚舉值我們知道waitStatus>0是取消狀態(tài)if?(ws?>?0)?{do?{//?循環(huán)向前查找取消節(jié)點(diǎn),把取消節(jié)點(diǎn)從隊(duì)列中剔除node.prev?=?pred?=?pred.prev;}?while?(pred.waitStatus?>?0);pred.next?=?node;}?else?{//?設(shè)置前任節(jié)點(diǎn)等待狀態(tài)為SIGNALcompareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);}return?false; }parkAndCheckInterrupt主要用于掛起當(dāng)前線程,阻塞調(diào)用棧,返回當(dāng)前線程的中斷狀態(tài)。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?final?boolean?parkAndCheckInterrupt()?{LockSupport.park(this);return?Thread.interrupted(); }上述方法的流程圖如下:
從上圖可以看出,跳出當(dāng)前循環(huán)的條件是當(dāng)“前置節(jié)點(diǎn)是頭結(jié)點(diǎn),且當(dāng)前線程獲取鎖成功”。為了防止因死循環(huán)導(dǎo)致CPU資源被浪費(fèi),我們會(huì)判斷前置節(jié)點(diǎn)的狀態(tài)來決定是否要將當(dāng)前線程掛起,具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):
從隊(duì)列中釋放節(jié)點(diǎn)的疑慮打消了,那么又有新問題了:
-
shouldParkAfterFailedAcquire中取消節(jié)點(diǎn)是怎么生成的呢?什么時(shí)候會(huì)把一個(gè)節(jié)點(diǎn)的waitStatus設(shè)置為-1?
-
是在什么時(shí)間釋放節(jié)點(diǎn)通知到被掛起的線程呢?
2.3.2 CANCELLED狀態(tài)節(jié)點(diǎn)生成
acquireQueued方法中的Finally代碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{boolean?failed?=?true;try?{...for?(;;)?{final?Node?p?=?node.predecessor();if?(p?==?head?&&?tryAcquire(arg))?{...failed?=?false;...}...}?finally?{if?(failed)cancelAcquire(node);} }通過cancelAcquire方法,將Node的狀態(tài)標(biāo)記為CANCELLED。接下來,我們逐行來分析這個(gè)方法的原理:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?cancelAcquire(Node?node)?{//?將無效節(jié)點(diǎn)過濾if?(node?==?null)return;//?設(shè)置該節(jié)點(diǎn)不關(guān)聯(lián)任何線程,也就是虛節(jié)點(diǎn)node.thread?=?null;Node?pred?=?node.prev;//?通過前驅(qū)節(jié)點(diǎn),跳過取消狀態(tài)的nodewhile?(pred.waitStatus?>?0)node.prev?=?pred?=?pred.prev;//?獲取過濾后的前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn)Node?predNext?=?pred.next;//?把當(dāng)前node的狀態(tài)設(shè)置為CANCELLEDnode.waitStatus?=?Node.CANCELLED;//?如果當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn),將從后往前的第一個(gè)非取消狀態(tài)的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)//?更新失敗的話,則進(jìn)入else,如果更新成功,將tail的后繼節(jié)點(diǎn)設(shè)置為nullif?(node?==?tail?&&?compareAndSetTail(node,?pred))?{compareAndSetNext(pred,?predNext,?null);}?else?{int?ws;//?如果當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn),1:判斷當(dāng)前節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)的是否為SIGNAL,2:如果不是,則把前驅(qū)節(jié)點(diǎn)設(shè)置為SINGAL看是否成功//?如果1和2中有一個(gè)為true,再判斷當(dāng)前節(jié)點(diǎn)的線程是否為null//?如果上述條件都滿足,把當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的后繼指針指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)if?(pred?!=?head?&&?((ws?=?pred.waitStatus)?==?Node.SIGNAL?||?(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&?pred.thread?!=?null)?{Node?next?=?node.next;if?(next?!=?null?&&?next.waitStatus?<=?0)compareAndSetNext(pred,?predNext,?next);}?else?{//?如果當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn),或者上述條件不滿足,那就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)unparkSuccessor(node);}node.next?=?node;?//?help?GC} }當(dāng)前的流程:
獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果前驅(qū)節(jié)點(diǎn)的狀態(tài)是CANCELLED,那就一直往前遍歷,找到第一個(gè)waitStatus <= 0的節(jié)點(diǎn),將找到的Pred節(jié)點(diǎn)和當(dāng)前Node關(guān)聯(lián),將當(dāng)前Node設(shè)置為CANCELLED。
根據(jù)當(dāng)前節(jié)點(diǎn)的位置,考慮以下三種情況:
當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)。
當(dāng)前節(jié)點(diǎn)是Head的后繼節(jié)點(diǎn)。
當(dāng)前節(jié)點(diǎn)不是Head的后繼節(jié)點(diǎn),也不是尾節(jié)點(diǎn)。
根據(jù)上述第二條,我們來分析每一種情況的流程。
當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)。
當(dāng)前節(jié)點(diǎn)是Head的后繼節(jié)點(diǎn)。
當(dāng)前節(jié)點(diǎn)不是Head的后繼節(jié)點(diǎn),也不是尾節(jié)點(diǎn)。
通過上面的流程,我們對(duì)于CANCELLED節(jié)點(diǎn)狀態(tài)的產(chǎn)生和變化已經(jīng)有了大致的了解,但是為什么所有的變化都是對(duì)Next指針進(jìn)行了操作,而沒有對(duì)Prev指針進(jìn)行操作呢?什么情況下會(huì)對(duì)Prev指針進(jìn)行操作?
(1)執(zhí)行cancelAcquire的時(shí)候,當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)可能已經(jīng)從隊(duì)列中出去了(已經(jīng)執(zhí)行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時(shí)修改Prev指針,有可能會(huì)導(dǎo)致Prev指向另一個(gè)已經(jīng)移除隊(duì)列的Node,因此這塊變化Prev指針不安全。
(2)shouldParkAfterFailedAcquire方法中,會(huì)執(zhí)行下面的代碼,其實(shí)就是在處理Prev指針。shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會(huì)執(zhí)行,進(jìn)入該方法后,說明共享資源已被獲取,當(dāng)前節(jié)點(diǎn)之前的節(jié)點(diǎn)都不會(huì)出現(xiàn)變化,因此這個(gè)時(shí)候變更Prev指針比較安全。
?
do?{node.prev?=?pred?=?pred.prev; }?while?(pred.waitStatus?>?0);2.3.3 如何解鎖
我們已經(jīng)剖析了加鎖過程中的基本流程,接下來再對(duì)解鎖的基本流程進(jìn)行分析。由于ReentrantLock在解鎖的時(shí)候,并不區(qū)分公平鎖和非公平鎖,所以我們直接看解鎖的源碼:
//?java.util.concurrent.locks.ReentrantLockpublic?void?unlock()?{sync.release(1); }可以看到,本質(zhì)釋放鎖的地方,是通過框架來完成的。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?boolean?release(int?arg)?{if?(tryRelease(arg))?{Node?h?=?head;if?(h?!=?null?&&?h.waitStatus?!=?0)unparkSuccessor(h);return?true;}return?false; }在ReentrantLock里面的公平鎖和非公平鎖的父類Sync定義了可重入鎖的釋放鎖機(jī)制。
//?java.util.concurrent.locks.ReentrantLock.Sync//?方法返回當(dāng)前鎖是不是沒有被線程持有 protected?final?boolean?tryRelease(int?releases)?{//?減少可重入次數(shù)int?c?=?getState()?-?releases;//?當(dāng)前線程不是持有鎖的線程,拋出異常if?(Thread.currentThread()?!=?getExclusiveOwnerThread())throw?new?IllegalMonitorStateException();boolean?free?=?false;//?如果持有線程全部釋放,將當(dāng)前獨(dú)占鎖所有線程設(shè)置為null,并更新stateif?(c?==?0)?{free?=?true;setExclusiveOwnerThread(null);}setState(c);return?free; }我們來解釋下述源碼:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerpublic?final?boolean?release(int?arg)?{//?上邊自定義的tryRelease如果返回true,說明該鎖沒有被任何線程持有if?(tryRelease(arg))?{//?獲取頭結(jié)點(diǎn)Node?h?=?head;//?頭結(jié)點(diǎn)不為空并且頭結(jié)點(diǎn)的waitStatus不是初始化節(jié)點(diǎn)情況,解除線程掛起狀態(tài)if?(h?!=?null?&&?h.waitStatus?!=?0)unparkSuccessor(h);return?true;}return?false; }這里的判斷條件為什么是h != null && h.waitStatus != 0?
(1)h == null ? Head還沒初始化。初始情況下,head == null,第一個(gè)節(jié)點(diǎn)入隊(duì),Head會(huì)被初始化一個(gè)虛擬節(jié)點(diǎn)。所以說,這里如果還沒來得及入隊(duì),就會(huì)出現(xiàn)head == null 的情況。
(2)h != null && waitStatus == 0 ? 表明后繼節(jié)點(diǎn)對(duì)應(yīng)的線程仍在運(yùn)行中,不需要喚醒。
(3)h != null && waitStatus < 0 ?表明后繼節(jié)點(diǎn)可能被阻塞了,需要喚醒。
在看一下unparkSuccessor方法:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?void?unparkSuccessor(Node?node)?{//?獲取頭結(jié)點(diǎn)waitStatusint?ws?=?node.waitStatus;if?(ws?<?0)compareAndSetWaitStatus(node,?ws,?0);//?獲取當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)Node?s?=?node.next;//?如果下個(gè)節(jié)點(diǎn)是null或者下個(gè)節(jié)點(diǎn)被cancelled,就找到隊(duì)列最開始的非cancelled的節(jié)點(diǎn)if?(s?==?null?||?s.waitStatus?>?0)?{s?=?null;//?就從尾部節(jié)點(diǎn)開始找,到隊(duì)首,找到隊(duì)列第一個(gè)waitStatus<0的節(jié)點(diǎn)。for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)if?(t.waitStatus?<=?0)s?=?t;}//?如果當(dāng)前節(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)不為空,而且狀態(tài)<=0,就把當(dāng)前節(jié)點(diǎn)unparkif?(s?!=?null)LockSupport.unpark(s.thread); }為什么要從后往前找第一個(gè)非Cancelled的節(jié)點(diǎn)呢?原因如下。
之前的addWaiter方法:
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?Node?addWaiter(Node?mode)?{Node?node?=?new?Node(Thread.currentThread(),?mode);//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failureNode?pred?=?tail;if?(pred?!=?null)?{node.prev?=?pred;if?(compareAndSetTail(pred,?node))?{pred.next?=?node;return?node;}}enq(node);return?node; }我們從這里可以看到,節(jié)點(diǎn)入隊(duì)并不是原子操作,也就是說,node.prev = pred; ?compareAndSetTail(pred, node) 這兩個(gè)地方可以看作Tail入隊(duì)的原子操作,但是此時(shí)pred.next = node;還沒執(zhí)行,如果這個(gè)時(shí)候執(zhí)行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找。還有一點(diǎn)原因,在產(chǎn)生CANCELLED狀態(tài)節(jié)點(diǎn)的時(shí)候,先斷開的是Next指針,Prev指針并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node。
綜上所述,如果是從前往后找,由于極端情況下入隊(duì)的非原子操作和CANCELLED節(jié)點(diǎn)產(chǎn)生過程中斷開Next指針的操作,可能會(huì)導(dǎo)致無法遍歷所有的節(jié)點(diǎn)。所以,喚醒對(duì)應(yīng)的線程后,對(duì)應(yīng)的線程就會(huì)繼續(xù)往下執(zhí)行。繼續(xù)執(zhí)行acquireQueued方法以后,中斷如何處理?
2.3.4 中斷恢復(fù)后的執(zhí)行流程
喚醒后,會(huì)執(zhí)行return Thread.interrupted();,這個(gè)函數(shù)返回的是當(dāng)前執(zhí)行線程的中斷狀態(tài),并清除。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?final?boolean?parkAndCheckInterrupt()?{LockSupport.park(this);return?Thread.interrupted(); }再回到acquireQueued代碼,當(dāng)parkAndCheckInterrupt返回True或者False的時(shí)候,interrupted的值不同,但都會(huì)執(zhí)行下次循環(huán)。如果這個(gè)時(shí)候獲取鎖成功,就會(huì)把當(dāng)前interrupted返回。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerfinal?boolean?acquireQueued(final?Node?node,?int?arg)?{boolean?failed?=?true;try?{boolean?interrupted?=?false;for?(;;)?{final?Node?p?=?node.predecessor();if?(p?==?head?&&?tryAcquire(arg))?{setHead(node);p.next?=?null;?//?help?GCfailed?=?false;return?interrupted;}if?(shouldParkAfterFailedAcquire(p,?node)?&&?parkAndCheckInterrupt())interrupted?=?true;}}?finally?{if?(failed)cancelAcquire(node);} }如果acquireQueued為True,就會(huì)執(zhí)行selfInterrupt方法。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerstatic?void?selfInterrupt()?{Thread.currentThread().interrupt(); }該方法其實(shí)是為了中斷線程。但為什么獲取了鎖以后還要中斷線程呢?這部分屬于Java提供的協(xié)作式中斷知識(shí)內(nèi)容,感興趣同學(xué)可以查閱一下。這里簡單介紹一下:
(1)?當(dāng)中斷線程被喚醒時(shí),并不知道被喚醒的原因,可能是當(dāng)前線程在等待中被中斷,也可能是釋放了鎖以后被喚醒。因此我們通過Thread.interrupted()方法檢查中斷標(biāo)記(該方法返回了當(dāng)前線程的中斷狀態(tài),并將當(dāng)前線程的中斷標(biāo)識(shí)設(shè)置為False),并記錄下來,如果發(fā)現(xiàn)該線程被中斷過,就再中斷一次。
(2)?線程在等待資源的過程中被喚醒,喚醒后還是會(huì)不斷地去嘗試獲取鎖,直到搶到鎖為止。也就是說,在整個(gè)流程中,并不響應(yīng)中斷,只是記錄中斷記錄。最后搶到鎖返回了,那么如果被中斷過的話,就需要補(bǔ)充一次中斷。
這里的處理方式主要是運(yùn)用線程池中基本運(yùn)作單元Worder中的runWorker,通過Thread.interrupted()進(jìn)行額外的判斷處理,感興趣的同學(xué)可以看下ThreadPoolExecutor源碼。
2.3.5 小結(jié)
我們?cè)?.3小節(jié)中提出了一些問題,現(xiàn)在來回答一下。
Q:某個(gè)線程獲取鎖失敗的后續(xù)流程是什么呢?
A:存在某種排隊(duì)等候機(jī)制,線程繼續(xù)等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續(xù)。
Q:既然說到了排隊(duì)等候機(jī)制,那么就一定會(huì)有某種隊(duì)列形成,這樣的隊(duì)列是什么數(shù)據(jù)結(jié)構(gòu)呢?
A:是CLH變體的FIFO雙端隊(duì)列。
Q:處于排隊(duì)等候機(jī)制中的線程,什么時(shí)候可以有機(jī)會(huì)獲取鎖呢?
A:可以詳細(xì)看下2.3.1.3小節(jié)。
Q:如果處于排隊(duì)等候機(jī)制中的線程一直無法獲取鎖,需要一直等待么?還是有別的策略來解決這一問題?
A:線程所在節(jié)點(diǎn)的狀態(tài)會(huì)變成取消狀態(tài),取消狀態(tài)的節(jié)點(diǎn)會(huì)從隊(duì)列中釋放,具體可見2.3.2小節(jié)。
Q:Lock函數(shù)通過Acquire方法進(jìn)行加鎖,但是具體是如何加鎖的呢?
A:AQS的Acquire會(huì)調(diào)用tryAcquire方法,tryAcquire由各個(gè)自定義同步器實(shí)現(xiàn),通過tryAcquire完成加鎖過程。
3 AQS應(yīng)用
3.1 ReentrantLock的可重入應(yīng)用
ReentrantLock的可重入性是AQS很好的應(yīng)用之一,在了解完上述知識(shí)點(diǎn)以后,我們很容易得知ReentrantLock實(shí)現(xiàn)可重入的方法。在ReentrantLock里面,不管是公平鎖還是非公平鎖,都有一段邏輯。
公平鎖:
//?java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquireif?(c?==?0)?{if?(!hasQueuedPredecessors()?&&?compareAndSetState(0,?acquires))?{setExclusiveOwnerThread(current);return?true;} } else?if?(current?==?getExclusiveOwnerThread())?{int?nextc?=?c?+?acquires;if?(nextc?<?0)throw?new?Error("Maximum?lock?count?exceeded");setState(nextc);return?true; }非公平鎖:
//?java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquireif?(c?==?0)?{if?(compareAndSetState(0,?acquires)){setExclusiveOwnerThread(current);return?true;} } else?if?(current?==?getExclusiveOwnerThread())?{int?nextc?=?c?+?acquires;if?(nextc?<?0)?//?overflowthrow?new?Error("Maximum?lock?count?exceeded");setState(nextc);return?true; }從上面這兩段都可以看到,有一個(gè)同步狀態(tài)State來控制整體可重入的情況。State是Volatile修飾的,用于保證一定的可見性和有序性。
//?java.util.concurrent.locks.AbstractQueuedSynchronizerprivate?volatile?int?state;接下來看State這個(gè)字段主要的過程:
(1) State初始化的時(shí)候?yàn)?,表示沒有任何線程持有鎖。
(2)?當(dāng)有線程持有該鎖時(shí),值就會(huì)在原來的基礎(chǔ)上+1,同一個(gè)線程多次獲得鎖是,就會(huì)多次+1,這里就是可重入的概念。
(3)?解鎖也是對(duì)這個(gè)字段-1,一直到0,此線程對(duì)鎖釋放。
3.2 JUC中的應(yīng)用場景
除了上邊ReentrantLock的可重入性的應(yīng)用,AQS作為并發(fā)編程的框架,為很多其他同步工具提供了良好的解決方案。下面列出了JUC中的幾種同步工具,大體介紹一下AQS的應(yīng)用場景:
| ReentrantLock | 使用AQS保存鎖重復(fù)持有的次數(shù)。當(dāng)一個(gè)線程獲取鎖時(shí),ReentrantLock記錄當(dāng)前獲得鎖的線程標(biāo)識(shí),用于檢測是否重復(fù)獲取,以及錯(cuò)誤線程試圖解鎖操作時(shí)異常情況的處理。 |
| ReentrantReadWriteLock | 使用AQS同步狀態(tài)中的16位保存寫鎖持有的次數(shù),剩下的16位用于保存讀鎖的持有次數(shù)。 |
| Semaphore | 使用AQS同步狀態(tài)來保存信號(hào)量的當(dāng)前計(jì)數(shù)。tryRelease會(huì)增加計(jì)數(shù),acquireShared會(huì)減少計(jì)數(shù)。 |
| CountDownLatch | 使用AQS同步狀態(tài)來表示計(jì)數(shù)。計(jì)數(shù)為0時(shí),所有的Acquire操作(CountDownLatch的await方法)才可以通過。 |
| ThreadPoolExecutor | Worker利用AQS同步狀態(tài)實(shí)現(xiàn)對(duì)獨(dú)占線程變量的設(shè)置(tryAcquire和tryRelease)。 |
3.3 自定義同步工具
了解AQS基本原理以后,按照上面所說的AQS知識(shí)點(diǎn),自己實(shí)現(xiàn)一個(gè)同步工具。
public?class?LeeLock??{private?static?class?Sync?extends?AbstractQueuedSynchronizer?{@Overrideprotected?boolean?tryAcquire?(int?arg)?{return?compareAndSetState(0,?1);}@Overrideprotected?boolean?tryRelease?(int?arg)?{setState(0);return?true;}@Overrideprotected?boolean?isHeldExclusively?()?{return?getState()?==?1;}}private?Sync?sync?=?new?Sync();public?void?lock?()?{sync.acquire(1);}public?void?unlock?()?{sync.release(1);} }通過我們自己定義的Lock完成一定的同步功能。
public?class?LeeMain?{static?int?count?=?0;static?LeeLock?leeLock?=?new?LeeLock();public?static?void?main?(String[]?args)?throws?InterruptedException?{Runnable?runnable?=?new?Runnable()?{@Overridepublic?void?run?()?{try?{leeLock.lock();for?(int?i?=?0;?i?<?10000;?i++)?{count++;}}?catch?(Exception?e)?{e.printStackTrace();}?finally?{leeLock.unlock();}}};Thread?thread1?=?new?Thread(runnable);Thread?thread2?=?new?Thread(runnable);thread1.start();thread2.start();thread1.join();thread2.join();System.out.println(count);} }上述代碼每次運(yùn)行結(jié)果都會(huì)是20000。通過簡單的幾行代碼就能實(shí)現(xiàn)同步功能,這就是AQS的強(qiáng)大之處。
總結(jié)
我們?nèi)粘i_發(fā)中使用并發(fā)的場景太多,但是對(duì)并發(fā)內(nèi)部的基本框架原理了解的人卻不多。由于篇幅原因,本文僅介紹了可重入鎖ReentrantLock的原理和AQS原理,希望能夠成為大家了解AQS和ReentrantLock等同步器的“敲門磚”。
參考資料
-
Lea D. The java. util. concurrent synchronizer framework[J]. Science of Computer Programming, 2005, 58(3): 293-309.
-
《Java并發(fā)編程實(shí)戰(zhàn)》
-
不可不說的Java“鎖”事
總結(jié)
- 上一篇: 复方枸杞子颗粒_功效作用注意事项用药禁忌
- 下一篇: Learning to rank基本算法