Java并发编程,Condition的await和signal等待通知机制
Condition簡(jiǎn)介
Object類(lèi)是Java中所有類(lèi)的父類(lèi), 在線(xiàn)程間實(shí)現(xiàn)通信的往往會(huì)應(yīng)用到Object的幾個(gè)方法: wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll() 實(shí)現(xiàn)等待/通知機(jī)制,同樣的, 在Java Lock體系下依然會(huì)有同樣的方法實(shí)現(xiàn)等待/通知機(jī)制。 從整體上來(lái)看Object的wait和notify/notify是與對(duì)象監(jiān)視器配合完成線(xiàn)程間的等待/通知機(jī)制,Condition與Lock配合完成等待/通知機(jī)制, 前者是Java底層級(jí)別的,后者是語(yǔ)言級(jí)別的,具有更高的可控制性和擴(kuò)展性。 兩者除了在使用方式上不同外,在功能特性上還是有很多的不同:
Condition能夠支持不響應(yīng)中斷,而通過(guò)使用Object方式不支持
Condition能夠支持多個(gè)等待隊(duì)列(new 多個(gè)Condition對(duì)象),而Object方式只能支持一個(gè)
Condition能夠支持超時(shí)時(shí)間的設(shè)置,而Object不支持
參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:
針對(duì)Object的wait方法
針對(duì)Object的notify/notifyAll方法
Condition實(shí)現(xiàn)原理分析
等待隊(duì)列
創(chuàng)建一個(gè)Condition對(duì)象是通過(guò)lock.newCondition(), 而這個(gè)方法實(shí)際上是會(huì)創(chuàng)建ConditionObject對(duì)象,該類(lèi)是AQS的一個(gè)內(nèi)部類(lèi)。 Condition是要和Lock配合使用的也就是Condition和Lock是綁定在一起的,而lock的實(shí)現(xiàn)原理又依賴(lài)于AQS, 自然而然ConditionObject作為AQS的一個(gè)內(nèi)部類(lèi)無(wú)可厚非。 我們知道在鎖機(jī)制的實(shí)現(xiàn)上,AQS內(nèi)部維護(hù)了一個(gè)同步隊(duì)列,如果是獨(dú)占式鎖的話(huà), 所有獲取鎖失敗的線(xiàn)程的尾插入到同步隊(duì)列, 同樣的,Condition內(nèi)部也是使用同樣的方式,內(nèi)部維護(hù)了一個(gè)等待隊(duì)列, 所有調(diào)用condition.await方法的線(xiàn)程會(huì)加入到等待隊(duì)列中,并且線(xiàn)程狀態(tài)轉(zhuǎn)換為等待狀態(tài)。 另外注意到ConditionObject中有兩個(gè)成員變量:
/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;復(fù)制代碼ConditionObject通過(guò)持有等待隊(duì)列的頭尾指針來(lái)管理等待隊(duì)列。 注意Node類(lèi)復(fù)用了在AQS中的Node類(lèi),Node類(lèi)有這樣一個(gè)屬性:
//后繼節(jié)點(diǎn)Node nextWaiter;復(fù)制代碼等待隊(duì)列是一個(gè)單向隊(duì)列,而在之前說(shuō)AQS時(shí)知道同步隊(duì)列是一個(gè)雙向隊(duì)列。
等待隊(duì)列示意圖:
注意: 我們可以多次調(diào)用lock.newCondition()方法創(chuàng)建多個(gè)Condition對(duì)象,也就是一個(gè)lLock可以持有多個(gè)等待隊(duì)列。 利用Object的方式實(shí)際上是指在對(duì)象Object對(duì)象監(jiān)視器上只能擁有一個(gè)同步隊(duì)列和一個(gè)等待隊(duì)列; 并發(fā)包中的Lock擁有一個(gè)同步隊(duì)列和多個(gè)等待隊(duì)列。示意圖如下:
ConditionObject是AQS的內(nèi)部類(lèi), 因此每個(gè)ConditionObject能夠訪(fǎng)問(wèn)到AQS提供的方法,相當(dāng)于每個(gè)Condition都擁有所屬同步器的引用。
await實(shí)現(xiàn)原理
當(dāng)調(diào)用condition.await()方法后會(huì)使得當(dāng)前獲取lock的線(xiàn)程進(jìn)入到等待隊(duì)列, 如果該線(xiàn)程能夠從await()方法返回的話(huà)一定是該線(xiàn)程獲取了與condition相關(guān)聯(lián)的lock。 await()方法源碼如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 1. 將當(dāng)前線(xiàn)程包裝成Node,尾插法插入到等待隊(duì)列中Node node = addConditionWaiter(); // 2. 釋放當(dāng)前線(xiàn)程所占用的lock,在釋放的過(guò)程中會(huì)喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 3. 當(dāng)前線(xiàn)程進(jìn)入到等待狀態(tài)LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;} // 4. 自旋等待獲取到同步狀態(tài)(即獲取到lock)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters(); // 5. 處理被中斷的情況if (interruptMode != 0)reportInterruptAfterWait(interruptMode); }復(fù)制代碼當(dāng)前線(xiàn)程調(diào)用condition.await()方法后,會(huì)使得當(dāng)前線(xiàn)程釋放lock然后加入到等待隊(duì)列中, 直至被signal/signalAll后會(huì)使得當(dāng)前線(xiàn)程從等待隊(duì)列中移至到同步隊(duì)列中去, 直到獲得了lock后才會(huì)從await方法返回,或者在等待時(shí)被中斷會(huì)做中斷處理。
addConditionWaiter()將當(dāng)前線(xiàn)程添加到等待隊(duì)列中,其源碼如下:
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;} //將當(dāng)前線(xiàn)程包裝成NodeNode node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //t==null,同步隊(duì)列為空的情況firstWaiter = node; else //尾插法t.nextWaiter = node; //更新lastWaiterlastWaiter = node; return node; }復(fù)制代碼這里通過(guò)尾插法將當(dāng)前線(xiàn)程封裝的Node插入到等待隊(duì)列中, 同時(shí)可以看出等待隊(duì)列是一個(gè)不帶頭結(jié)點(diǎn)的鏈?zhǔn)疥?duì)列,之前我們學(xué)習(xí)AQS時(shí)知道同步隊(duì)列是一個(gè)帶頭結(jié)點(diǎn)的鏈?zhǔn)疥?duì)列。
將當(dāng)前節(jié)點(diǎn)插入到等待對(duì)列之后,使用fullyRelease(0)方法釋放當(dāng)前線(xiàn)程釋放lock,源碼如下:
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { //成功釋放同步狀態(tài)failed = false; return savedState;} else { //不成功釋放同步狀態(tài)拋出異常throw new IllegalMonitorStateException();}} finally { if (failed)node.waitStatus = Node.CANCELLED;} }復(fù)制代碼調(diào)用AQS的模板方法release()方法釋放AQS的同步狀態(tài)并且喚醒在同步隊(duì)列中頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)引用的線(xiàn)程, 如果釋放成功則正常返回,若失敗的話(huà)就拋出異常。
如何從await()方法中退出?再看await()方法有這樣一段代碼:
while (!isOnSyncQueue(node)) { // 3. 當(dāng)前線(xiàn)程進(jìn)入到等待狀態(tài)LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }復(fù)制代碼當(dāng)線(xiàn)程第一次調(diào)用condition.await()方法時(shí), 會(huì)進(jìn)入到這個(gè)while()循環(huán)中,然后通過(guò)LockSupport.park(this)方法使得當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài), 那么要想退出這個(gè)await方法就要先退出這個(gè)while循環(huán),退出while循環(huán)的出口有2個(gè):
break退出while循環(huán)
while循環(huán)中的邏輯判斷為false
第1種情況的條件是當(dāng)前等待的線(xiàn)程被中斷后會(huì)走到break退出,
第2種情況是當(dāng)前節(jié)點(diǎn)被移動(dòng)到了同步隊(duì)列中,(即另外線(xiàn)程調(diào)用的condition的signal或者signalAll方法), while中邏輯判斷為false后結(jié)束while循環(huán)。
當(dāng)退出while循環(huán)后就會(huì)調(diào)用acquireQueued(node, savedState),該方法的作用是 在自旋過(guò)程中線(xiàn)程不斷嘗試獲取同步狀態(tài),直至成功(線(xiàn)程獲取到lock)。
這樣就說(shuō)明了退出await方法必須是已經(jīng)獲得了Condition引用(關(guān)聯(lián))的Lock。
await方法示意圖如下:
調(diào)用condition.await方法的線(xiàn)程必須是已經(jīng)獲得了lock,也就是當(dāng)前線(xiàn)程是同步隊(duì)列中的頭結(jié)點(diǎn)。 調(diào)用該方法后會(huì)使得當(dāng)前線(xiàn)程所封裝的Node尾插入到等待隊(duì)列中。
超時(shí)機(jī)制的支持
condition還額外支持了超時(shí)機(jī)制,使用者可調(diào)用方法awaitNanos,awaitUtil。 這兩個(gè)方法的實(shí)現(xiàn)原理,基本上與AQS中的tryAcquire方法如出一轍。
不響應(yīng)中斷的支持
調(diào)用condition.awaitUninterruptibly()方法,該方法的源碼為:
public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted())interrupted = true;} if (acquireQueued(node, savedState) || interrupted)selfInterrupt(); }復(fù)制代碼與上面的await方法基本一致,只不過(guò)減少了對(duì)中斷的處理, 并省略了reportInterruptAfterWait方法拋被中斷的異常。
signal和signalAll實(shí)現(xiàn)原理
調(diào)用Condition的signal或者signalAll方法可以將 等待隊(duì)列中等待時(shí)間最長(zhǎng)的節(jié)點(diǎn)移動(dòng)到同步隊(duì)列中,使得該節(jié)點(diǎn)能夠有機(jī)會(huì)獲得lock。 按照等待隊(duì)列是先進(jìn)先出(FIFO)的, 所以等待隊(duì)列的頭節(jié)點(diǎn)必然會(huì)是等待時(shí)間最長(zhǎng)的節(jié)點(diǎn), 也就是每次調(diào)用condition的signal方法是將頭節(jié)點(diǎn)移動(dòng)到同步隊(duì)列中。 signal()源碼如下:
public final void signal() { //1. 先檢測(cè)當(dāng)前線(xiàn)程是否已經(jīng)獲取lockif (!isHeldExclusively()) throw new IllegalMonitorStateException(); //2. 獲取等待隊(duì)列中第一個(gè)節(jié)點(diǎn),之后的操作都是針對(duì)這個(gè)節(jié)點(diǎn) Node first = firstWaiter; if (first != null)doSignal(first); }復(fù)制代碼signal方法首先會(huì)檢測(cè)當(dāng)前線(xiàn)程是否已經(jīng)獲取lock, 如果沒(méi)有獲取lock會(huì)直接拋出異常,如果獲取的話(huà)再得到等待隊(duì)列的頭指針引用的節(jié)點(diǎn),doSignal方法也是基于該節(jié)點(diǎn)。 doSignal方法源碼如下:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null; //1. 將頭結(jié)點(diǎn)從等待隊(duì)列中移除first.nextWaiter = null; //2. while中transferForSignal方法對(duì)頭結(jié)點(diǎn)做真正的處理} while (!transferForSignal(first) &&(first = firstWaiter) != null); }復(fù)制代碼真正對(duì)頭節(jié)點(diǎn)做處理的是transferForSignal(),該方法源碼如下:
final boolean transferForSignal(Node node) { //1. 更新?tīng)顟B(tài)為0if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //2.將該節(jié)點(diǎn)移入到同步隊(duì)列中去Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }復(fù)制代碼這段代碼主要做了兩件事情:
1.將頭結(jié)點(diǎn)的狀態(tài)更改為CONDITION
2.調(diào)用enq方法,將該節(jié)點(diǎn)尾插入到同步隊(duì)列中
調(diào)用condition的signal的前提條件是 當(dāng)前線(xiàn)程已經(jīng)獲取了lock,該方法會(huì)使得等待隊(duì)列中的頭節(jié)點(diǎn)(等待時(shí)間最長(zhǎng)的那個(gè)節(jié)點(diǎn))移入到同步隊(duì)列, 而移入到同步隊(duì)列后才有機(jī)會(huì)使得等待線(xiàn)程被喚醒, 即從await方法中的LockSupport.park(this)方法中返回,從而才有機(jī)會(huì)使得調(diào)用await方法的線(xiàn)程成功退出。
signal方法示意圖如下:
signalAll
sigllAll與sigal方法的區(qū)別體現(xiàn)在doSignalAll方法上。doSignalAll()的源碼如下:
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null); }復(fù)制代碼doSignal方法只會(huì)對(duì)等待隊(duì)列的頭節(jié)點(diǎn)進(jìn)行操作,而doSignalAll方法將等待隊(duì)列中的每一個(gè)節(jié)點(diǎn)都移入到同步隊(duì)列中, 即“通知”當(dāng)前調(diào)用condition.await()方法的每一個(gè)線(xiàn)程。
await與signal和signalAll的結(jié)合
await和signal和signalAll方法就像一個(gè)開(kāi)關(guān)控制著線(xiàn)程A(等待方)和線(xiàn)程B(通知方)。 它們之間的關(guān)系可以用下面一個(gè)圖來(lái)表現(xiàn)得更加貼切:
線(xiàn)程awaitThread先通過(guò)lock.lock()方法獲取鎖成功后調(diào)用了condition.await方法進(jìn)入等待隊(duì)列, 而另一個(gè)線(xiàn)程signalThread通過(guò)lock.lock()方法獲取鎖成功后調(diào)用了condition.signal或者signalAll方法, 使得線(xiàn)程awaitThread能夠有機(jī)會(huì)移入到同步隊(duì)列中, 當(dāng)其他線(xiàn)程釋放lock后使得線(xiàn)程awaitThread能夠有機(jī)會(huì)獲取lock, 從而使得線(xiàn)程awaitThread能夠從await方法中退出,然后執(zhí)行后續(xù)操作。 如果awaitThread獲取lock失敗會(huì)直接進(jìn)入到同步隊(duì)列。
轉(zhuǎn)載于:https://juejin.im/post/5ce63cc4f265da1bb564d0f8
總結(jié)
以上是生活随笔為你收集整理的Java并发编程,Condition的await和signal等待通知机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 日志单例log4cpp简述
- 下一篇: 想做大牛,Java开发的必备技术点你了解