AQS实现原理分析
AQS
隊(duì)列同步器(AbstractQueuedSynchronizer)簡(jiǎn)稱AQS,是J.U.C同步構(gòu)件的基礎(chǔ),包括ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphor都是基于AQS實(shí)現(xiàn)的。
理解AQS是理解這些同步工具的基礎(chǔ),基于AQS提供的同步語義可以定制各種功能的同步工具。
AQS原理
-
用一個(gè)int類型的狀態(tài)變量(volatile)state記錄同步狀態(tài),默認(rèn)值是0
用一個(gè)雙向鏈表實(shí)現(xiàn)的隊(duì)列對(duì)線程進(jìn)行排隊(duì)和調(diào)度 -
A線程使用compareAndSet(state,0,1)原子設(shè)置state的值,設(shè)置成功說明state當(dāng)前無其他線程爭(zhēng)用,A線程取鎖的使用權(quán)。
-
設(shè)置不成功,說明B線程對(duì)state的值進(jìn)行了設(shè)置,并且沒有復(fù)位(state!=0),B線程持有鎖的使用權(quán)(B線程還沒有釋放鎖)。A線程會(huì)構(gòu)造成一個(gè)Node節(jié)點(diǎn)加入隊(duì)列尾部并掛起。
-
當(dāng)B線程執(zhí)行完同步操作后,對(duì)state進(jìn)行復(fù)位(state==0),即釋放鎖,然后從隊(duì)列頭開始尋找,發(fā)現(xiàn)正在沉睡的A線程,將其喚醒。
AQS同步隊(duì)列
static final class Node {/**共享模式 */static final Node SHARED = new Node();/**獨(dú)占模式 */static final Node EXCLUSIVE = null;/**取消狀態(tài),由于在同步隊(duì)列中等待的線程等待超時(shí)或被中斷,* 需要從同步隊(duì)列中取消等待。 */static final int CANCELLED = 1;/**通知狀態(tài),當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行(unpark)* 當(dāng)前節(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者被取消,將通知后續(xù)節(jié)點(diǎn)。*/static final int SIGNAL = -1;/**條件阻塞狀態(tài),節(jié)點(diǎn)線程等待在Condition上,* 當(dāng)其他線程對(duì)Condition調(diào)用了signal()方法后,* 該節(jié)點(diǎn)將會(huì)從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,* 加入到對(duì)同步狀態(tài)的獲取中。 */static final int CONDITION = -2;/**傳播狀態(tài),表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行。*/static final int PROPAGATE = -3;/**節(jié)點(diǎn)的的狀態(tài) * 初始狀態(tài)為0 表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取狀態(tài)。*/volatile int waitStatus;/** 前驅(qū)節(jié)點(diǎn) */volatile Node prev;/** 后繼節(jié)點(diǎn) */volatile Node next;/**節(jié)點(diǎn)對(duì)應(yīng)的線程,等待獲取同步狀態(tài)的線程。 */volatile Thread thread;/**下一等待節(jié)點(diǎn)*/Node nextWaiter;/**是否共享模式 */final boolean isShared() {return nextWaiter == SHARED;}/**獲取前驅(qū)節(jié)點(diǎn) */final Node predecessor() throws NullPointerException {}Node() {}Node(Thread thread, Node mode) {}Node(Thread thread, int waitStatus) {} }AQS基于一個(gè)FIFO雙向隊(duì)列實(shí)現(xiàn),被設(shè)計(jì)給那些依賴一個(gè)代表狀態(tài)的原子int值的同步器使用。我們都知道,既然叫同步器,那個(gè)肯定有個(gè)代表同步狀態(tài)(臨界資源)的東西,在AQS中即為一個(gè)叫state的int值,該值通過CAS進(jìn)行原子修改。
在AQS中存在一個(gè)FIFO隊(duì)列,隊(duì)列中的節(jié)點(diǎn)表示被阻塞的線程,隊(duì)列節(jié)點(diǎn)元素有4種類型, 每種類型表示線程被阻塞的原因,這四種類型分別是:
- CANCELLED : 表示該線程是因?yàn)槌瑫r(shí)或者中斷原因而被放到隊(duì)列中
- CONDITION : 表示該線程是因?yàn)槟硞€(gè)條件不滿足而被放到隊(duì)列中,需要等待一個(gè)條件,直到條件成立后才會(huì)出隊(duì)
- SIGNAL : 表示該線程需要被喚醒
- PROPAGATE : 表示在共享模式下,當(dāng)前節(jié)點(diǎn)執(zhí)行釋放release操作后,當(dāng)前結(jié)點(diǎn)需要傳播通知給后面所有節(jié)點(diǎn)
由于一個(gè)共享資源同一時(shí)間只能由一條線程持有,也可以被多個(gè)線程持有,因此AQS中存在兩種模式,如下:
-
1、獨(dú)占模式
獨(dú)占模式表示共享狀態(tài)值state每次能由一條線程持有,其他線程如果需要獲取,則需要阻塞,如JUC中的ReentrantLock
-
2、共享模式
共享模式表示共享狀態(tài)值state每次可以由多個(gè)線程持有,如JUC中的CountDownLatch
AQS中的共享狀態(tài)值
之前提到,AQS是基于一個(gè)共享的int類型的state值來實(shí)現(xiàn)同步器同步的,其聲明如下:
/*** 同步狀態(tài)值*/ private volatile int state;/*** 獲取同步狀態(tài)值*/ protected final int getState() {return state; }/*** 修改同步狀態(tài)值*/ protected final void setState(int newState) {state = newState; }由源碼我們可以看出,AQS聲明了一個(gè)int類型的state值,為了達(dá)到多線程同步的功能,必然對(duì)該值的修改必須多線程可見,因此,state采用volatile修飾,而且getState()和setState()方法采用final進(jìn)行修飾,目的是限制AQS的子類只能調(diào)用這兩個(gè)方法對(duì)state的值進(jìn)行設(shè)置和獲取,而不能對(duì)其進(jìn)行重寫自定義設(shè)置/獲取邏輯。
AQS中提供對(duì)state值修改的方法不僅僅只有setState()和getState(),還有諸如采用CAS機(jī)制進(jìn)行設(shè)置的compareAndSetState()方法,同樣,該方法也是采用final修飾的,不允許子類重寫,只能調(diào)用。
AQS中的tryXXX方法
一般基于AQS實(shí)現(xiàn)的同步器,如ReentrantLock,CountDownLatch等,對(duì)于state的獲取操作,子類只需重寫其tryAcquire()和tryAcquireShared()方法即可,這兩個(gè)方法分別對(duì)應(yīng)獨(dú)占模式和共享模式下對(duì)state的獲取操作;而對(duì)于釋放操作,子類只需重寫tryRelease()和tryReleaseShared()方法即可。
至于如何維護(hù)隊(duì)列的出隊(duì)、入隊(duì)操作,子類不用管,AQS已經(jīng)幫你做好了。
AQS 設(shè)計(jì)妙處
CAS自旋鎖
當(dāng)我們執(zhí)行一個(gè)有確定結(jié)果的操作,同時(shí)又需要并發(fā)正確執(zhí)行,通??梢圆捎米孕i實(shí)現(xiàn)。在AQS中,自旋鎖采用 死循環(huán) + CAS 實(shí)現(xiàn)。針對(duì)AQS中的enq()進(jìn)行講解:
private Node enq(final Node node) {// 死循環(huán) + CAS ,解決入隊(duì)并發(fā)問題/*** 假設(shè)有三個(gè)線程同時(shí)都需要入隊(duì)操作,那么使用死循環(huán)和CAS可保證并發(fā)安全,同一時(shí)間只有一個(gè)節(jié)點(diǎn)安全入隊(duì),入隊(duì)失敗的線程則循環(huán)重試* * 1、如果不要死循環(huán)可以嗎?只用CAS.* 不可以,因?yàn)槿绻渌€程修改了tail的值,導(dǎo)致1處代碼返回false,那么方法enq方法將退出,導(dǎo)致該入隊(duì)的節(jié)點(diǎn)卻沒能入隊(duì)* * 2、如果只用死循環(huán),不需要CAS可以嗎?* 不可以,首先不需要使用CAS,那就沒必要再使用死循環(huán)了,再者,如果不使用CAS,那么當(dāng)執(zhí)行1處代碼時(shí),將會(huì)改變隊(duì)列的結(jié)構(gòu)*/for (;;) {// 獲取尾部節(jié)點(diǎn)Node t = tail;// 如果還沒有初始化,那么就初始化if (t == null) { // Must initializeif (compareAndSetHead(new Node()))// 剛開始肯定是頭指針和尾指針相等tail = head;} else {// 當(dāng)前結(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)等于尾部節(jié)點(diǎn)node.prev = t;// 如果當(dāng)前尾結(jié)點(diǎn)仍然是t,那么執(zhí)行入隊(duì)并返回true,否則返回false,然后重試if (compareAndSetTail(t, node)) { // 1t.next = node;return t;}}} }首先入隊(duì)操作要求的最終結(jié)果必須是一個(gè)節(jié)點(diǎn)插入到隊(duì)列中去,只能成功,不能失敗!然而這個(gè)入隊(duì)的操作是需要并發(fā)執(zhí)行的,有可能同時(shí)有很多的線程需要執(zhí)行入隊(duì)操作,因此我們需要采取相關(guān)的線程同步機(jī)制。自旋鎖采取樂觀策略,即使用了CAS中的compareAndSet()操作,如果某次執(zhí)行返回fasle,那么當(dāng)前操作必須重試,因此,采用for死循環(huán)直到成功為止,成功,則break跳出for循環(huán)或者直接return操作退出方法。
模板方法
在AQS中,模板方法設(shè)計(jì)模式體現(xiàn)在其acquire()、release()方法上,我們先來看下源碼:
public final void acquire(int arg) {// 首先嘗試獲取共享狀態(tài),如果獲取成功,則tryAcquire()返回trueif (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}其中調(diào)用tryAcquire()方法的默認(rèn)實(shí)現(xiàn)是拋出一個(gè)異常,也就是說tryAcquire()方法留給子類去實(shí)現(xiàn),acquire()方法定義了一個(gè)模板,一套處理邏輯,相關(guān)具體執(zhí)行方法留給子類去實(shí)現(xiàn)。
自定義AQS并發(fā)同步器
下邊以JDK文檔的一個(gè)實(shí)例進(jìn)行介紹:
class Mutex implements Lock, java.io.Serializable {// 自定義同步器private static class Sync extends AbstractQueuedSynchronizer {// 判斷是否鎖定狀態(tài)protected boolean isHeldExclusively() {return getState() == 1;}// 嘗試獲取資源,立即返回。成功則返回true,否則false。public boolean tryAcquire(int acquires) {assert acquires == 1; // 這里限定只能為1個(gè)量if (compareAndSetState(0, 1)) {//state為0才設(shè)置為1,不可重入!setExclusiveOwnerThread(Thread.currentThread());//設(shè)置為當(dāng)前線程獨(dú)占資源return true;}return false;}// 嘗試釋放資源,立即返回。成功則為true,否則false。protected boolean tryRelease(int releases) {assert releases == 1; // 限定為1個(gè)量if (getState() == 0)//既然來釋放,那肯定就是已占有狀態(tài)了。只是為了保險(xiǎn),多層判斷!throw new IllegalMonitorStateException();setExclusiveOwnerThread(null);setState(0);//釋放資源,放棄占有狀態(tài)return true;}}// 真正同步類的實(shí)現(xiàn)都依賴?yán)^承于AQS的自定義同步器!private final Sync sync = new Sync();//lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。public void lock() {sync.acquire(1);}//tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。public boolean tryLock() {return sync.tryAcquire(1);}//unlock<-->release。兩者語文一樣:釋放資源。public void unlock() {sync.release(1);}//鎖是否占有狀態(tài)public boolean isLocked() {return sync.isHeldExclusively();} }實(shí)現(xiàn)自己的同步類一般都會(huì)自定義同步器(sync),并且將該類定義為內(nèi)部類,供自己使用;而同步類自己(Mutex)則實(shí)現(xiàn)某個(gè)接口,對(duì)外服務(wù)。當(dāng)然,接口的實(shí)現(xiàn)要直接依賴sync,它們?cè)谡Z義上也存在某種對(duì)應(yīng)關(guān)系!!而sync只用實(shí)現(xiàn)資源state的獲取-釋放方式tryAcquire-tryRelelase,至于線程的排隊(duì)、等待、喚醒等,上層的AQS都已經(jīng)實(shí)現(xiàn)好了,我們不用關(guān)心。
除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實(shí)現(xiàn)方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點(diǎn),AQS的核心便被攻破了!
總結(jié)
- 上一篇: 【完整代码】使用Semaphore实现线
- 下一篇: AQS源码