并发编程之ReentrantLock--Condition
介紹
Condition是條件鎖的意思,是指在獲取鎖之后發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。
最常見的生產者消費者模式,生產的元素都被消費完,則需要阻塞消費者,不可繼續消費直到生產者生產新的后喚醒消費者;或者生產者生產過快,容器已滿,則需要阻塞生產者,直到消費者消費一定數量才喚醒生產者。
Condition是一個接口,其定義如下
public interface Condition {//阻塞當前線程,等待同一個Condition對象上的signal()/signalAll()喚醒//該方法響應中斷,如果發生中斷,該方法拋出InterruptedException異常void await() throws InterruptedException;///阻塞當前線程,等待同一個Condition對象上的signal()/signalAll()喚醒//與上面方法的區別是,該方法等待過程中不響應中斷void awaitUninterruptibly();//阻塞線程,線程被阻塞指定的時間//當線程被中斷、超時或者signal()/signalAll(),都會喚醒線程long awaitNanos(long nanosTimeout) throws InterruptedException;//同awaitNanos()boolean await(long time, TimeUnit unit) throws InterruptedException;//同awaitNanos()boolean awaitUntil(Date deadline) throws InterruptedException;//喚醒一個等待線程void signal();//喚醒所有的等待線程void signalAll(); }AbstractQueuedSynchronizer提供了一個Condition的實現類ConditionObject,可以通過調用Lock.newCondition()獲得一個Condition對象。每個Condition對象都與一個Lock對象相關,調用Condition對象的方法前必須獲得對應Lock對象的鎖。
Condition的作用與Object的wait()/notify()作用類似,調用await()可以阻塞當前線程,signal()/signalAll()可以喚醒其他阻塞線程。
基本使用
public class ConditionThread{public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock();Condition condition = lock.newCondition();new Thread(()->{try {lock.lock(); try {System.out.println("await阻塞前"); // 等待條件condition.await(); System.out.println("await阻塞后"); } finally {lock.unlock();System.out.println("await阻塞線程解鎖");}} catch (InterruptedException e) {e.printStackTrace();}}).start();//睡眠讓await線程先拿到鎖Thread.sleep(1000);lock.lock(); try {System.out.println("signal喚醒前");condition.signal();System.out.println("signal喚醒后");} finally {System.out.println("signal喚醒線程解鎖");lock.unlock(); }} }運行結果:
await阻塞前 signal喚醒前 signal喚醒后 signal喚醒線程解鎖 await阻塞后 await阻塞線程解鎖可以看出,上方線程先拿到鎖,執行到await方法后阻塞了,下方線程拿到鎖,執行signal,直到下方線程執行完unlock操作,上方線程被喚醒,繼續后續操作。
源碼解析
AQS中有兩個隊列,一個為AQS同步隊列,未拿到鎖的線程會放入此隊列中;另一個是
Condition隊列,調用await會放入到Condition隊列。兩個有幾點不同,AQS同步隊列是雙向的,而Condition隊列是單向的;AQS同步隊列的首節點是虛擬節點,而Condition隊列的首節點是真實節點,參與排隊。
Condition隊列如下圖:(AQS同步隊列結構可見前文)
await
public final void await() throws InterruptedException {//線程中斷,拋出異常if (Thread.interrupted())throw new InterruptedException();//添加節點到Condition隊列中,并返回該節點Node node = addConditionWaiter();// 完全釋放當前線程獲取的鎖// 因為鎖是可重入的,state >= 1,所以這里要把獲取的鎖全部釋放int savedState = fullyRelease(node);int interruptMode = 0;//是否在同步隊列中//注意這是aqs同步隊列,區分Condition隊列while (!isOnSyncQueue(node)) {// 阻塞當前線程LockSupport.park(this);// 上面部分是調用await()時釋放自己占有的鎖,并阻塞自己等待條件的出現// *************************分界線************************* //// 下面部分是條件已經出現,嘗試去獲取鎖//中斷線程一系列操作if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 嘗試獲取鎖// 如果沒獲取到會再次阻塞(前文對此方法有說明)if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 清除取消的節點if (node.nextWaiter != null)unlinkCancelledWaiters();// 線程中斷相關if (interruptMode != 0)//拋出InterruptedException,重新中斷當前線程//或不執行任何操作,具體取決于interruptMode值reportInterruptAfterWait(interruptMode);}addConditionWaiter新建Node節點添加至Condition隊列
private Node addConditionWaiter() {//獲取條件隊列的尾節點Node t = lastWaiter;// 如果條件隊列的尾節點不為初始狀態,從頭節點開始清除所有不為初始狀態節點if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}// 新建一個節點,它的等待狀態是CONDITIONNode node = new Node(Thread.currentThread(), Node.CONDITION);// 如果尾節點為空,則把新節點賦值給頭節點(相當于初始化隊列)// 否則把新節點賦值給尾節點的nextWaiter指針if (t == null)firstWaiter = node;elset.nextWaiter = node;// 尾節點指向新節點lastWaiter = node;// 返回新節點return node;}fullyRelease 完全釋放鎖
final int fullyRelease(Node node) {boolean failed = true;try {//獲取state,state >= 1,下面方法將state改為0表示釋放鎖int savedState = getState();//釋放可重入鎖,此方法上文有介紹if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {//失敗將waitStatus設置為1,表示已取消的節點if (failed)node.waitStatus = Node.CANCELLED;}}isOnSyncQueue判斷是否在AQS同步隊列。
我們先說明下AQS同步隊列和Condition隊列中waitStatus等待狀態的區別:
1、在Condition隊列中,新建節點的初始等待狀態是CONDITION(-2);
2、移到AQS的隊列中時等待狀態會更改為0(AQS同步隊列節點的初始等待狀態為0);
3、在AQS的隊列中如果需要阻塞,會把它上一個節點的等待狀態設置為SIGNAL(-1);
4、Condition隊列和AQS隊列中,已取消節點的等待狀態設置為CANCELLED(1);
5、共享鎖的時候還有另外一種等待狀態叫PROPAGATE(-3)。
findNodeFromTail 從AQS的尾節點開始往前尋找看是否可以找到當前節點
private boolean findNodeFromTail(Node node) {Node t = tail;//從tail尾節點往前循環//存在node節點返回truefor (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}unlinkCancelledWaiters 清除取消的節點
private void unlinkCancelledWaiters() {//取Condition隊列首節點Node t = firstWaiter;Node trail = null;//從首節點開始循環Condition隊列while (t != null) {Node next = t.nextWaiter;//waitStatus 不為初始狀態,則從Condition隊列中清除if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = next;elsetrail.nextWaiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}}signal
public final void signal() {// 判斷持有鎖的線程不為當前線程,拋出異常// 說明signal()要在獲取鎖之后執行if (!isHeldExclusively())throw new IllegalMonitorStateException();//取Condition隊列首元素,執行doSignal操作Node first = firstWaiter;if (first != null)doSignal(first);}doSignal執行喚醒操作,循環執行transferForSignal方法,直到transferForSignal方法返回true或執行到尾節點,意思從頭開始循環,成功喚醒一位則退出循環
private void doSignal(Node first) {do {// 移到條件隊列的頭節點往后一位if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;// 相當于把頭節點從隊列中出隊first.nextWaiter = null;// 轉移節點到AQS隊列中} while (!transferForSignal(first) &&(first = firstWaiter) != null);}transferForSignal將Node節點從Condition隊列移動到AQS同步隊列
final boolean transferForSignal(Node node) {// 把節點的狀態更改為0,也就是說即將移到AQS隊列中// 如果失敗了,說明節點已經被改成取消狀態// 返回false,通過上面的循環可知會尋找下一個可用節點if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// 調用AQS的入隊方法把節點移到AQS的隊列中// 注意,這里enq()的返回值是node的上一個節點,也就是舊尾節點Node p = enq(node);// 上一個節點的等待狀態int ws = p.waitStatus;// 如果上一個節點已取消了,或者更新狀態為SIGNAL失敗(也是說明上一個節點已經取消了)// 則直接喚醒當前節點對應的線程if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);// 如果更新上一個節點的等待狀態為SIGNAL成功了// 則返回true,這時上面的循環不成立了,退出循環,也就是只通知了一個節點// 此時當前節點還是阻塞狀態// 也就是說調用signal()的時候并不會真正喚醒一個節點// 只是把節點從條件隊列移到AQS隊列中return true;}從上述源碼可知,signal()方法并不會真正喚醒一個節點,只是把節點從Condition條件隊列移到AQS隊列中,一直等到unlock解鎖后才會真正喚醒。
signalAll
public final void signalAll() {// 判斷持有鎖的線程不為當前線程,拋出異常// 說明signal()要在獲取鎖之后執行if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}doSignalAll和doSignal很相似,doSignal從Condition隊列頭元素往后循環,成功轉移一位元素則完成,而doSignalAll將所有符合條件的元素轉移至AQS同步隊列
private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;//從頭節點開始循環轉移,至到最后一個元素do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}總結
await()方法的流程:
1、新建一個節點加入到條件隊列中去;
2、完全釋放當前線程占有的鎖;
3、阻塞當前線程,并等待條件的出現;
8、條件已出現(此時節點已經移到AQS的隊列中),嘗試獲取鎖;
signal()方法的流程為:456
4、從條件隊列的頭節點開始尋找一個非取消狀態的節點;
5、把它從條件隊列移到AQS隊列;
6、且只移動一個節點
7、signal線程unlock釋放鎖
需要注意一點:signal()方法后,最終會執行unlock()方法,此時才會真正喚醒一個節點,喚醒的這個節點如果曾經是條件節點的話又會繼續執行await()方法LockSupport.park(this)下面的代碼。
總結
以上是生活随笔為你收集整理的并发编程之ReentrantLock--Condition的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: POJ3159 Candies 差分
- 下一篇: 基于TensorFlow的SSD车辆检测