Java Review - 并发编程_抽象同步队列AQS
文章目錄
- 概述 AQS——鎖的底層支持
- state 的作用
- ConditionObject
- 獨(dú)占 VS 共享
- 獨(dú)占方式下,獲取與釋放資源的流程
- 共享方式下,獲取與釋放資源的流程
- Interruptibly
- 維護(hù)AQS提供的隊(duì)列 - 入隊(duì)操作
- AQS——條件變量的支持
- 基于AQS實(shí)現(xiàn)自定義同步器
概述 AQS——鎖的底層支持
AbstractQueuedSynchronizer抽象同步隊(duì)列簡(jiǎn)稱AQS,它是實(shí)現(xiàn)同步器的基礎(chǔ)組件,并發(fā)包中鎖的底層就是使用AQS實(shí)現(xiàn)的。
另外,我們基本上直接使用AQS框架開發(fā)的機(jī)會(huì)很少,但是知道其原理對(duì)于架構(gòu)設(shè)計(jì)還是很有幫助的。
-
AQS是一個(gè)FIFO的雙向隊(duì)列,其內(nèi)部通過節(jié)點(diǎn)head和tail記錄隊(duì)首和隊(duì)尾元素,隊(duì)列元素的類型為Node。
-
其中Node中的thread變量用來存放進(jìn)入AQS隊(duì)列里面的線程
-
Node節(jié)點(diǎn)內(nèi)部的SHARED用來標(biāo)記該線程是獲取共享資源時(shí)被阻塞掛起后放入AQS隊(duì)列的,EXCLUSIVE用來標(biāo)記線程是獲取獨(dú)占資源時(shí)被掛起后放入AQS隊(duì)列的
-
waitStatus記錄當(dāng)前線程等待狀態(tài),可以為CANCELLED(線程被取消了)、SIGNAL(線程需要被喚醒)、CONDITION(線程在條件隊(duì)列里面等待)、PROPAGATE(釋放共享資源時(shí)需要通知其他節(jié)點(diǎn))
-
prev記錄當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),next記錄當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
state 的作用
在AQS中維持了一個(gè)單一的狀態(tài)信息 state,可以通過getState、setState、compareAndSetState方法修改其值。
-
對(duì)于ReentrantLock的實(shí)現(xiàn)來說,state可以用來表示當(dāng)前線程獲取鎖的可重入次數(shù);
-
對(duì)于讀寫鎖ReentrantReadWriteLock來說,state的高16位表示讀狀態(tài),也就是獲取該讀鎖的次數(shù),低16位表示獲取到寫鎖的線程的可重入次數(shù);
-
對(duì)于semaphore來說,state用來表示當(dāng)前可用信號(hào)的個(gè)數(shù)
-
對(duì)于CountDownlatch來說,state用來表示計(jì)數(shù)器當(dāng)前的值。
ConditionObject
AQS有個(gè)內(nèi)部類ConditionObject,用來結(jié)合鎖實(shí)現(xiàn)線程同步。
ConditionObject可以直接訪問AQS對(duì)象內(nèi)部的變量,比如state狀態(tài)值和AQS隊(duì)列。
ConditionObject是條件變量,每個(gè)條件變量對(duì)應(yīng)一個(gè)條件隊(duì)列(單向鏈表隊(duì)列),其用來存放調(diào)用條件變量的await方法后被阻塞的線程, 這個(gè)條件隊(duì)列的頭、尾元素分別為firstWaiter和lastWaiter。
獨(dú)占 VS 共享
對(duì)于AQS來說,線程同步的關(guān)鍵是對(duì)狀態(tài)值state進(jìn)行操作。根據(jù)state是否屬于一個(gè)線程,操作state的方式分為獨(dú)占方式和共享方式。
在獨(dú)占方式下獲取和釋放資源使用的方法為
- void acquire(int arg)
- void acquireInterruptibly(int arg)
- boolean release(int arg)
在共享方式下獲取和釋放資源的方法為
- void acquireShared(int arg)
- void acquireSharedInterruptibly(int arg)
- boolean releaseShared(int arg)
使用獨(dú)占方式獲取的資源是與具體線程綁定的,就是說如果一個(gè)線程獲取到了資源,就會(huì)標(biāo)記是這個(gè)線程獲取到了,其他線程再嘗試操作state獲取資源時(shí)會(huì)發(fā)現(xiàn)當(dāng)前該資源不是自己持有的,就會(huì)在獲取失敗后被阻塞。
比如獨(dú)占鎖ReentrantLock的實(shí)現(xiàn),當(dāng)一個(gè)線程獲取了ReentrantLock的鎖后,在AQS內(nèi)部會(huì)首先使用CAS操作把state狀態(tài)值從0變?yōu)?,然后設(shè)置當(dāng)前鎖的持有者為當(dāng)前線程,當(dāng)該線程再次獲取鎖時(shí)發(fā)現(xiàn)它就是鎖的持有者,則會(huì)把狀態(tài)值從1變?yōu)?,也就是設(shè)置可重入次數(shù),而當(dāng)另外一個(gè)線程獲取鎖時(shí)發(fā)現(xiàn)自己并不是該鎖的持有者就會(huì)被放入AQS阻塞隊(duì)列后掛起。
共享方式的資源與具體線程是不相關(guān)的,當(dāng)多個(gè)線程去請(qǐng)求資源時(shí)通過CAS方式競(jìng)爭(zhēng)獲取資源,當(dāng)一個(gè)線程獲取到了資源后,另外一個(gè)線程再次去獲取時(shí)如果當(dāng)前資源還能滿足它的需要,則當(dāng)前線程只需要使用CAS方式進(jìn)行獲取即可。
比如Semaphore信號(hào)量,當(dāng)一個(gè)線程通過acquire()方法獲取信號(hào)量時(shí),會(huì)首先看當(dāng)前信號(hào)量個(gè)數(shù)是否滿足需要,不滿足則把當(dāng)前線程放入阻塞隊(duì)列,如果滿足則通過自旋CAS獲取信號(hào)量。
獨(dú)占方式下,獲取與釋放資源的流程
在獨(dú)占方式下,獲取與釋放資源的流程如下
- (1)當(dāng)一個(gè)線程調(diào)用acquire(int arg)方法獲取獨(dú)占資源時(shí),會(huì)首先使用tryAcquire方法嘗試獲取資源,具體是設(shè)置狀態(tài)變量state的值,成功則直接返回,失敗則將當(dāng)前線程封裝為類型為Node.EXCLUSIVE的Node節(jié)點(diǎn)后插入到AQS阻塞隊(duì)列的尾部,并調(diào)用LockSupport.park(this)方法掛起自己。
- (2)當(dāng)一個(gè)線程調(diào)用release(int arg)方法時(shí)會(huì)嘗試使用tryRelease操作釋放資源,這里是設(shè)置狀態(tài)變量state的值,然后調(diào)用LockSupport.unpark(thread)方法激活A(yù)QS隊(duì)列里面被阻塞的一個(gè)線程(thread)。被激活的線程則使用tryAcquire嘗試,看當(dāng)前狀態(tài)變量state的值是否能滿足自己的需要,滿足則該線程被激活,然后繼續(xù)向下運(yùn)行,否則還是會(huì)被放入AQS隊(duì)列并被掛起。
需要注意的是,AQS類并沒有提供可用的tryAcquire和tryRelease方法,正如AQS是鎖阻塞和同步器的基礎(chǔ)框架一樣,tryAcquire和tryRelease需要由具體的子類來實(shí)現(xiàn)。
子類在實(shí)現(xiàn)tryAcquire和tryRelease時(shí)要根據(jù)具體場(chǎng)景使用CAS算法嘗試修改state狀態(tài)值,成功則返回true,否則返回false。
子類還需要定義,在調(diào)用acquire和release方法時(shí)state狀態(tài)值的增減代表什么含義。
比如繼承自AQS實(shí)現(xiàn)的獨(dú)占鎖ReentrantLock,定義當(dāng)status為0時(shí)表示鎖空閑,為1時(shí)表示鎖已經(jīng)被占用。在重寫tryAcquire時(shí),在內(nèi)部需要使用CAS算法查看當(dāng)前state是否為0,如果為0則使用CAS設(shè)置為1,并設(shè)置當(dāng)前鎖的持有者為當(dāng)前線程,而后返回true,如果CAS失敗則返回false。
比如繼承自AQS實(shí)現(xiàn)的獨(dú)占鎖在實(shí)現(xiàn)tryRelease時(shí),在內(nèi)部需要使用CAS算法把當(dāng)前state的值從1修改為0,并設(shè)置當(dāng)前鎖的持有者為null,然后返回true,如果CAS失敗則返回false。
共享方式下,獲取與釋放資源的流程
在共享方式下,獲取與釋放資源的流程如下:
- (1)當(dāng)線程調(diào)用acquireShared(int arg)獲取共享資源時(shí),會(huì)首先使用tryAcquireShared嘗試獲取資源,具體是設(shè)置狀態(tài)變量state的值,成功則直接返回,失敗則將當(dāng)前線程封裝為類型為Node.SHARED的Node節(jié)點(diǎn)后插入到AQS阻塞隊(duì)列的尾部,并使用LockSupport.park(this)方法掛起自己。
- (2)當(dāng)一個(gè)線程調(diào)用releaseShared(int arg) 時(shí)會(huì)嘗試使用tryReleaseShared操作釋放資源,這里是設(shè)置狀態(tài)變量state的值,然后使用LockSupport.unpark(thread)激活A(yù)QS隊(duì)列里面被阻塞的一個(gè)線程(thread)。被激活的線程則使用tryReleaseShared查看當(dāng)前狀態(tài)變量state的值是否能滿足自己的需要,滿足則該線程被激活,然后繼續(xù)向下運(yùn)行,否則還是會(huì)被放入AQS隊(duì)列并被掛起。
同樣需要注意的是,AQS類并沒有提供可用的tryAcquireShared和tryReleaseShared方法,正如AQS是鎖阻塞和同步器的基礎(chǔ)框架一樣,tryAcquireShared和tryReleaseShared需要由具體的子類來實(shí)現(xiàn)。
子類在實(shí)現(xiàn)tryAcquireShared和tryReleaseShared時(shí)要根據(jù)具體場(chǎng)景使用CAS算法嘗試修改state狀態(tài)值,成功則返回true,否則返回false。
比如繼承自AQS實(shí)現(xiàn)的讀寫鎖ReentrantReadWriteLock里面的讀鎖在重寫tryAcquireShared時(shí),首先查看寫鎖是否被其他線程持有,如果是則直接返回false,否則使用CAS遞增state的高16位(在ReentrantReadWriteLock中,state的高16位為獲取讀鎖的次數(shù))。
比如繼承自AQS實(shí)現(xiàn)的讀寫鎖ReentrantReadWriteLock里面的讀鎖在重寫tryReleaseShared時(shí),在內(nèi)部需要使用CAS算法把當(dāng)前state值的高16位減1,然后返回true,如果CAS失敗則返回false。
基于AQS實(shí)現(xiàn)的鎖除了需要重寫上面介紹的方法外 ,還需要重寫isHeldExclusively方法,來判斷鎖是被當(dāng)前線程獨(dú)占還是被共享。
Interruptibly
獨(dú)占方式下的 void acquire(int arg) 和void acquireInterruptibly(int arg),與共享方式下的 void acquireShared(int arg)和void acquireSharedInterruptibly(int arg),這兩套函數(shù)中都有一個(gè)帶有Interruptibly關(guān)鍵字的函數(shù),那么帶這個(gè)關(guān)鍵字和不帶有什么區(qū)別呢?
-
不帶Interruptibly關(guān)鍵字的方法的意思是不對(duì)中斷進(jìn)行響應(yīng),也就是線程在調(diào)用不帶Interruptibly關(guān)鍵字的方法獲取資源時(shí)或者獲取資源失敗被掛起時(shí),其他線程中斷了該線程,那么該線程不會(huì)因?yàn)楸恢袛喽鴴伋霎惓?#xff0c;它還是繼續(xù)獲取資源或者被掛起,也就是說不對(duì)中斷進(jìn)行響應(yīng),忽略中斷。
-
帶Interruptibly關(guān)鍵字的方法要對(duì)中斷進(jìn)行響應(yīng),也就是線程在調(diào)用帶Interruptibly關(guān)鍵字的方法獲取資源時(shí)或者獲取資源失敗被掛起時(shí),其他線程中斷了該線程,那么該線程會(huì)拋出InterruptedException異常而返回。
維護(hù)AQS提供的隊(duì)列 - 入隊(duì)操作
最后,我們來看看如何維護(hù)AQS提供的隊(duì)列,主要看入隊(duì)操作。
入隊(duì)操作: 當(dāng)一個(gè)線程獲取鎖失敗后該線程會(huì)被轉(zhuǎn)換為Node節(jié)點(diǎn),然后就會(huì)使用enq(final Node node)方法將該節(jié)點(diǎn)插入到AQS的阻塞隊(duì)列.
/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail; // 1 if (t == null) { // Must initializeif (compareAndSetHead(new Node()))// 2tail = head;} else {node.prev = t; // 3 if (compareAndSetTail(t, node)) { // 4 t.next = node;return t;}}}}下面結(jié)合代碼和節(jié)點(diǎn)圖來講解入隊(duì)的過程。
【第一次循環(huán)】
- 如上代碼在第一次循環(huán)中,當(dāng)要在AQS隊(duì)列尾部插入元素時(shí),AQS隊(duì)列狀態(tài)如下所示
也就是隊(duì)列頭、尾節(jié)點(diǎn)都指向null;
-
當(dāng)執(zhí)行代碼(1)后節(jié)點(diǎn)t指向了尾部節(jié)點(diǎn),這時(shí)候隊(duì)列狀態(tài)如下圖所示。
-
這時(shí)候t為null,故執(zhí)行代碼(2),使用CAS算法設(shè)置一個(gè)哨兵節(jié)點(diǎn)為頭節(jié)點(diǎn),如果CAS設(shè)置成功,則讓尾部節(jié)點(diǎn)也指向哨兵節(jié)點(diǎn),這時(shí)候隊(duì)列狀態(tài)如下圖所示
【第二次循環(huán)】
-
到現(xiàn)在為止只插入了一個(gè)哨兵節(jié)點(diǎn),還需要插入node節(jié)點(diǎn),所以在第二次循環(huán)后執(zhí)行到代碼(1),這時(shí)候隊(duì)列狀態(tài)如下圖所示
-
然后執(zhí)行代碼(3)設(shè)置node(入?yún)?的前驅(qū)節(jié)點(diǎn)為尾部節(jié)點(diǎn),這時(shí)候隊(duì)列狀態(tài)如下圖所示
- 然后通過CAS算法設(shè)置node節(jié)點(diǎn)為尾部節(jié)點(diǎn),CAS成功后隊(duì)列狀態(tài)如下圖所示
- CAS成功后再設(shè)置原來的尾部節(jié)點(diǎn)的后驅(qū)節(jié)點(diǎn)為node,這時(shí)候就完成了雙向鏈表的插入,此時(shí)隊(duì)列狀態(tài)如下圖所示。
AQS——條件變量的支持
我們知道notify和wait,是配合synchronized內(nèi)置鎖實(shí)現(xiàn)線程間同步的基礎(chǔ)設(shè)施一樣,條件變量的signal和await方法也是用來配合鎖(使用AQS實(shí)現(xiàn)的鎖)實(shí)現(xiàn)線程間同步的基礎(chǔ)設(shè)施。
它們的不同在于,synchronized同時(shí)只能與一個(gè)共享變量的notify或wait方法實(shí)現(xiàn)同步,而AQS的一個(gè)鎖可以對(duì)應(yīng)多個(gè)條件變量。
在調(diào)用共享變量的notify和wait方法前必須先獲取該共享變量的內(nèi)置鎖,同理,在調(diào)用條件變量的signal和await方法前也必須先獲取條件變量對(duì)應(yīng)的鎖。
那么,到底什么是條件變量呢?如何使用呢?不急,下面看一個(gè)例子。
// 1ReentrantLock lock = new ReentrantLock();// 2Condition condition = lock.newCondition();// 3lock.lock();try {System.out.println("begin wait");// 4condition.await();System.out.println("end wait");} catch (Exception e) {e.printStackTrace();} finally {// 5lock.unlock();}// 6lock.lock();try {System.out.println("begin single");// 7condition.signal();System.out.println("end single");} catch (Exception e) {e.printStackTrace();} finally {// 8lock.unlock();}-
代碼(1)創(chuàng)建了一個(gè)獨(dú)占鎖ReentrantLock對(duì)象,ReentrantLock是基于AQS實(shí)現(xiàn)的鎖。
-
代碼(2)使用創(chuàng)建的Lock對(duì)象的newCondition()方法創(chuàng)建了一個(gè)ConditionObject變量,這個(gè)變量就是Lock鎖對(duì)應(yīng)的一個(gè)條件變量。需要注意的是,一個(gè)Lock對(duì)象可以創(chuàng)建多個(gè)條件變量。
-
代碼(3)首先獲取了獨(dú)占鎖
-
代碼(4)則調(diào)用了條件變量的await()方法阻塞掛起了當(dāng)前線程。 當(dāng)其他線程調(diào)用條件變量的signal方法時(shí),被阻塞的線程才會(huì)從await處返回。需要注意的是,和調(diào)用Object的wait方法一樣,如果在沒有獲取到鎖前調(diào)用了條件變量的await方法則會(huì)拋出 java.lang.IllegalMonitorStateException異常。
-
代碼(5)則釋放了獲取的鎖。
其實(shí)這里的Lock對(duì)象等價(jià)于synchronized加上共享變量,調(diào)用lock.lock()方法就相當(dāng)于進(jìn)入了synchronized塊(獲取了共享變量的內(nèi)置鎖),調(diào)用lock.unLock()方法就相當(dāng)于退出synchronized塊。 調(diào)用條件變量的await()方法就相當(dāng)于調(diào)用共享變量的wait()方法,調(diào)用條件變量的signal方法就相當(dāng)于調(diào)用共享變量的notify()方法。調(diào)用條件變量的signalAll()方法就相當(dāng)于調(diào)用共享變量的notifyAll()方法。
經(jīng)過上面解釋,知道條件變量是什么,它是用來做什么的了。
在上面代碼中,lock.newCondition()的作用其實(shí)是new了一個(gè)在AQS內(nèi)部聲明的ConditionObject對(duì)象,ConditionObject是AQS的內(nèi)部類,可以訪問AQS內(nèi)部的變量(例如狀態(tài)變量state)和方法。在每個(gè)條件變量?jī)?nèi)部都維護(hù)了一個(gè)條件隊(duì)列,用來存放調(diào)用條件變量的await()方法時(shí)被阻塞的線程。注意這個(gè)條件隊(duì)列和AQS隊(duì)列不是一回事。
在如下代碼中,當(dāng)線程調(diào)用條件變量的await()方法時(shí)(必須先調(diào)用鎖的lock()方法獲取鎖),在內(nèi)部會(huì)構(gòu)造一個(gè)類型為Node.CONDITION的node節(jié)點(diǎn),然后將該節(jié)點(diǎn)插入條件隊(duì)列末尾,之后當(dāng)前線程會(huì)釋放獲取的鎖(也就是會(huì)操作鎖對(duì)應(yīng)的state變量的值),并被阻塞掛起。
這時(shí)候如果有其他線程調(diào)用lock.lock()嘗試獲取鎖,就會(huì)有一個(gè)線程獲取到鎖,如果獲取到鎖的線程調(diào)用了條件變量的await()方法,則該線程也會(huì)被放入條件變量的阻塞隊(duì)列,然后釋放獲取到的鎖,在await()方法處阻塞。
/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 9 創(chuàng)建新的node節(jié)點(diǎn),并插入到條件隊(duì)列的對(duì)尾 Node node = addConditionWaiter();// 10 釋放當(dāng)前線程獲取的鎖 int savedState = fullyRelease(node);int interruptMode = 0;// 11 調(diào)用park方法阻塞掛起當(dāng)前線程 while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}在如下代碼中,當(dāng)另外一個(gè)線程調(diào)用條件變量的signal方法時(shí)(必須先調(diào)用鎖的lock()方法獲取鎖),在內(nèi)部會(huì)把條件隊(duì)列里面隊(duì)頭的一個(gè)線程節(jié)點(diǎn)從條件隊(duì)列里面移除并放入AQS的阻塞隊(duì)列里面,然后激活這個(gè)線程。
/*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)// 將條件隊(duì)列的隊(duì)首移動(dòng)到AQS隊(duì)列doSignal(first);}需要注意的是,AQS只提供了ConditionObject的實(shí)現(xiàn),并沒有提供newCondition函數(shù),該函數(shù)用來new一個(gè)ConditionObject對(duì)象。需要由AQS的子類來提供newCondition函數(shù)。
下面來看當(dāng)一個(gè)線程調(diào)用條件變量的await()方法而被阻塞后,如何將其放入條件隊(duì)列。
/*** Adds a new waiter to wait queue.* @return its new wait node*/private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 1 Node node = new Node(Thread.currentThread(), Node.CONDITION);// 2 if (t == null)firstWaiter = node;else// 3 t.nextWaiter = node;// 4 lastWaiter = node;return node;}代碼(1)首先根據(jù)當(dāng)前線程創(chuàng)建一個(gè)類型為Node.CONDITION的節(jié)點(diǎn),然后通過代碼(2)(3)(4)在單向條件隊(duì)列尾部插入一個(gè)元素。
注意:當(dāng)多個(gè)線程同時(shí)調(diào)用lock.lock()方法獲取鎖時(shí),只有一個(gè)線程獲取到了鎖,其他線程會(huì)被轉(zhuǎn)換為Node節(jié)點(diǎn)插入到lock鎖對(duì)應(yīng)的AQS阻塞隊(duì)列里面,并做自旋CAS嘗試獲取鎖。
如果獲取到鎖的線程又調(diào)用了對(duì)應(yīng)的條件變量的await()方法,則該線程會(huì)釋放獲取到的鎖,并被轉(zhuǎn)換為Node節(jié)點(diǎn)插入到條件變量對(duì)應(yīng)的條件隊(duì)列里面。
這時(shí)候因?yàn)檎{(diào)用lock.lock()方法被阻塞到AQS隊(duì)列里面的一個(gè)線程會(huì)獲取到被釋放的鎖,如果該線程也調(diào)用了條件變量的await()方法則該線程也會(huì)被放入條件變量的條件隊(duì)列里面。
當(dāng)另外一個(gè)線程調(diào)用條件變量的signal()或者signalAll()方法時(shí),會(huì)把條件隊(duì)列里面的一個(gè)或者全部Node節(jié)點(diǎn)移動(dòng)到AQS的阻塞隊(duì)列里面,等待時(shí)機(jī)獲取鎖。
最后使用一個(gè)圖總結(jié)如下:一個(gè)鎖對(duì)應(yīng)一個(gè)AQS阻塞隊(duì)列,對(duì)應(yīng)多個(gè)條件變量,每個(gè)條件變量有自己的一個(gè)條件隊(duì)列。
基于AQS實(shí)現(xiàn)自定義同步器
我們基于AQS實(shí)現(xiàn)一個(gè)不可重入的獨(dú)占鎖,。
自定義AQS需要重寫一系列函數(shù),還需要定義原子變量state的含義。這里我們定義,state為0表示目前鎖沒有被線程持有,state為1表示鎖已經(jīng)被某一個(gè)線程持有,由于是不可重入鎖,所以不需要記錄持有鎖的線程獲取鎖的次數(shù)。另外,我們自定義的鎖支持條件變量。
【基于AQS實(shí)現(xiàn)的不可重入的獨(dú)占鎖】
import java.io.Serializable; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/5 22:35* @mark: show me the code , change the world*/ public class NonReentrantLock implements Lock, Serializable {//靜態(tài)內(nèi)部類,用于輔助private static class Sync extends AbstractQueuedSynchronizer{@Overrideprotected boolean tryAcquire(int arg) {assert arg == 1;//如果state為0,則嘗試獲取鎖if (compareAndSetState(0,1)){setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {assert arg == 1;//如果state為0,則嘗試獲取鎖if (getState()==0){throw new IllegalMonitorStateException();}setExclusiveOwnerThread(null);setState(0);return true;}@Overrideprotected boolean isHeldExclusively() {// 是否鎖已經(jīng)被持有return getState()==1;}//提供條件變量接口public Condition newCondition(){return new ConditionObject();}}Sync sync = new Sync();@Overridepublic void lock() {sync.acquire(1);}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {sync.release(1);}@Overridepublic Condition newCondition() {return sync.newCondition();} }在如上代碼中,NonReentrantLock定義了一個(gè)內(nèi)部類Sync用來實(shí)現(xiàn)具體的鎖的操作,Sync則繼承了AQS。由于我們實(shí)現(xiàn)的是獨(dú)占模式的鎖,所以Sync重寫了tryAcquire、tryRelease和isHeldExclusively 3個(gè)方法。另外,Sync提供了newCondition這個(gè)方法用來支持條件變量。
【使用自定義鎖實(shí)現(xiàn)生產(chǎn)—消費(fèi)模型】
import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.Condition; /*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/5 22:59* @mark: show me the code , change the world*/ public class NonReentrantLockTest {static NonReentrantLock lock = new NonReentrantLock();static Condition notFull = lock.newCondition();static Condition notEmpty = lock.newCondition();static Queue<String> queue = new LinkedBlockingQueue<>();static int queueSize = 10;public static void main(String[] args) {Thread producer = new Thread(() -> {lock.lock();try {//如果隊(duì)列滿了,則等待while (queue.size() == queueSize) {notEmpty.await();}//添加隊(duì)列元素queue.add("element ");//喚醒消費(fèi)線程notFull.signalAll();} catch (InterruptedException e) {e.printStackTrace();}finally {//釋放鎖lock.unlock();}});Thread consumer = new Thread(() -> {lock.lock();try {//隊(duì)列為空,則等待while (queue.size()==0){notFull.await();}//消費(fèi)元素queue.poll();//喚醒生產(chǎn)線程notEmpty.signalAll();}catch (InterruptedException e){e.printStackTrace();} finally {lock.unlock();}});producer.start();consumer.start();} }如上代碼首先創(chuàng)建了NonReentrantLock的一個(gè)對(duì)象lock,然后調(diào)用lock.newCondition創(chuàng)建了兩個(gè)條件變量,用來進(jìn)行生產(chǎn)者和消費(fèi)者線程之間的同步。
在main 函數(shù)里面,首先創(chuàng)建了producer生產(chǎn)線程,在線程內(nèi)部首先調(diào)用lock.lock()獲取獨(dú)占鎖,然后判斷當(dāng)前隊(duì)列是否已經(jīng)滿了,如果滿了則調(diào)用notEmpty.await()阻塞掛起當(dāng)前線程。需要注意的是,這里使用while 而不是if是為了避免虛假喚醒。如果隊(duì)列不滿則直接向隊(duì)列里面添加元素,然后調(diào)用notFull.signalAll()喚醒所有因?yàn)橄M(fèi)元素而被阻塞的消費(fèi)線程,最后釋放獲取的鎖。
然后在main函數(shù)里面創(chuàng)建了consumer生產(chǎn)線程,在線程內(nèi)部首先調(diào)用lock.lock()獲取獨(dú)占鎖,然后判斷當(dāng)前隊(duì)列里面是不是有元素,如果隊(duì)列為空則調(diào)用notFull.await()阻塞掛起當(dāng)前線程。需要注意的是,這里使用while 而不是if是為了避免虛假喚醒。如果隊(duì)列不為空則直接從隊(duì)列里面獲取并移除元素,然后喚醒因?yàn)殛?duì)列滿而被阻塞的生產(chǎn)線程,最后釋放獲取的鎖。
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的Java Review - 并发编程_抽象同步队列AQS的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_读
- 下一篇: Java Review - 并发编程_C