多线程锁--怎么理解Condition
在java.util.concurrent包中,有兩個很特殊的工具類,Condition和ReentrantLock,使用過的人都知道,ReentrantLock(重入鎖)是jdk的concurrent包提供的一種獨占鎖的實現。它繼承自Dong Lea的?AbstractQueuedSynchronizer(同步器),確切的說是ReentrantLock的一個內部類繼承了AbstractQueuedSynchronizer,ReentrantLock只不過是代理了該類的一些方法,可能有人會問為什么要使用內部類在包裝一層? 我想是安全的關系,因為AbstractQueuedSynchronizer中有很多方法,還實現了共享鎖,Condition(稍候再細說)等功能,如果直接使ReentrantLock繼承它,則很容易出現AbstractQueuedSynchronizer中的API被無用的情況。
言歸正傳,今天,我們討論下Condition工具類的實現。
ReentrantLock和Condition的使用方式通常是這樣的:
運行后,結果如下:
可以看到,
Condition的執行方式,是當在線程1中調用await方法后,線程1將釋放鎖,并且將自己沉睡,等待喚醒,
線程2獲取到鎖后,開始做事,完畢后,調用Condition的signal方法,喚醒線程1,線程1恢復執行。
以上說明Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶調用)時?,這些等待線程才會被喚醒,從而重新爭奪鎖。
那,它是怎么實現的呢?
首先還是要明白,reentrantLock.newCondition() 返回的是Condition的一個實現,該類在AbstractQueuedSynchronizer中被實現,叫做newCondition()
它可以訪問AbstractQueuedSynchronizer中的方法和其余內部類(?AbstractQueuedSynchronizer是個抽象類,至于他怎么能訪問,這里有個很奇妙的點,后面我專門用demo說明?)
現在,我們一起來看下Condition類的實現,還是從上面的demo入手,
為了方便書寫,我將AbstractQueuedSynchronizer縮寫為AQS
當await被調用時,代碼如下:
| public final void await() throws InterruptedException { if (Thread.interrupted()) ?throw new InterruptedException(); ?Node node = addConditionWaiter();?//將當前線程包裝下后, ???????????????????????????????????//添加到Condition自己維護的一個鏈表中。 int savedState = fullyRelease(node);//釋放當前線程占有的鎖,從demo中看到, ???????????????????????????????????????//調用await前,當前線程是占有鎖的 int interruptMode = 0; ?while (!isOnSyncQueue(node)) {//釋放完畢后,遍歷AQS的隊列,看當前節點是否在隊列中, ???????????????????????????//不在 說明它還沒有競爭鎖的資格,所以繼續將自己沉睡。 ?????????????????????????????//直到它被加入到隊列中,聰明的你可能猜到了, ????????????????????????????//沒有錯,在singal的時候加入不就可以了? ?LockSupport.park(this); ?if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) ?break; ?} //被喚醒后,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) ?interruptMode = REINTERRUPT; ?if (node.nextWaiter != null) ?unlinkCancelledWaiters(); ?if (interruptMode != 0) ?reportInterruptAfterWait(interruptMode); ?} |
回到上面的demo,鎖被釋放后,線程1開始沉睡,這個時候線程因為線程1沉睡時,會喚醒AQS隊列中的頭結點,所所以線程2會開始競爭鎖,并獲取到,等待3秒后,線程2會調用signal方法,“發出”signal信號,signal方法如下:
| public final void signal() { ?if (!isHeldExclusively()) ?throw new IllegalMonitorStateException(); ?Node first = firstWaiter; //firstWaiter為condition自己維護的一個鏈表的頭結點, ??????????????????????????//取出第一個節點后開始喚醒操作 ?if (first != null) ?doSignal(first); ?} |
說明下,其實Condition內部維護了等待隊列的頭結點和尾節點,該隊列的作用是存放等待signal信號的線程,該線程被封裝為Node節點后存放于此。
關鍵的就在于此,我們知道AQS自己維護的隊列是當前等待資源的隊列,AQS會在資源被釋放后,依次喚醒隊列中從前到后的所有節點,使他們對應的線程恢復執行。直到隊列為空。
而Condition自己也維護了一個隊列,該隊列的作用是維護一個等待signal信號的隊列,兩個隊列的作用是不同,事實上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:
1. 線程1調用reentrantLock.lock時,線程被加入到AQS的等待隊列中。
2. 線程1調用await方法被調用時,該線程從AQS中移除,對應操作是鎖的釋放。
3. 接著馬上被加入到Condition的等待隊列中,以為著該線程需要signal信號。
4. 線程2,因為線程1釋放鎖的關系,被喚醒,并判斷可以獲取鎖,于是線程2獲取鎖,并被加入到AQS的等待隊列中。
5. ?線程2調用signal方法,這個時候Condition的等待隊列中只有線程1一個節點,于是它被取出來,并被加入到AQS的等待隊列中。??注意,這個時候,線程1?并沒有被喚醒。
6. signal方法執行完畢,線程2調用reentrantLock.unLock()方法,釋放鎖。這個時候因為AQS中只有線程1,于是,AQS釋放鎖后按從頭到尾的順序喚醒線程時,線程1被喚醒,于是線程1回復執行。
7.?直到釋放所整個過程執行完畢。
可以看到,整個協作過程是靠結點在AQS的等待隊列和Condition的等待隊列中來回移動實現的,Condition作為一個條件類,很好的自己維護了一個等待信號的隊列,并在適時的時候將結點加入到AQS的等待隊列中來實現的喚醒操作。
看到這里,signal方法的代碼應該不難理解了。
取出頭結點,然后doSignal
| private void doSignal(Node first) { ?do { ?if ( (firstWaiter = first.nextWaiter) == null) //修改頭結點,完成舊頭結點的移出工作 ?lastWaiter = null; ?first.nextWaiter = null; ?} while (!transferForSignal(first) &&//將老的頭結點,加入到AQS的等待隊列中 ?(first = firstWaiter) != null); ?} final boolean transferForSignal(Node node) { ?/* ?* If cannot change waitStatus, the node has been cancelled. ?*/ ?if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) ?return false; /* ?* Splice onto queue and try to set waitStatus of predecessor to ?* indicate that thread is (probably) waiting. If cancelled or ?* attempt to set waitStatus fails, wake up to resync (in which ?* case the waitStatus can be transiently and harmlessly wrong). ?*/ ?Node p = enq(node); ?int ws = p.waitStatus; //如果該結點的狀態為cancel?或者修改waitStatus失敗,則直接喚醒。 ?if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) ?LockSupport.unpark(node.thread); ?return true; ?} |
可以看到,正常情況?ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 這個判斷是不會為true的,所以,不會在這個時候喚醒該線程。
只有到發送signal信號的線程調用reentrantLock.unlock()后因為它已經被加到AQS的等待隊列中,所以才會被喚醒。
總結:
? ? ?本文從代碼的角度說明了Condition的實現方式,其中,涉及到了AQS的很多操作,比如AQS的等待隊列實現獨占鎖功能,不過,這不是本文討論的重點,等有機會再將AQS的實現單獨分享出來。
更多
Zemanta ???? 0.00?avg. rating (0% score) -?0?votes?program language?AbstractQueuedSynchronizer,?Condition,?java,?多線程permalink
Post navigation
?使用WORDPRESS搭建自己的博客 關于CONDITION中訪問AQS方法的問題?7 thoughts on “怎么理解Condition”
向樓主請教一個問題,如最后的流程所述:
“1. 線程1調用reentrantLock.lock時,線程被加入到AQS的等待隊列中。”
此時無線程爭用鎖,線程1會先tryAcquire一次,成功則無需入隊。因此我認為此處線程1并未加入到AQS的等待隊列中。
“2. 線程1調用await方法被調用時,該線程從AQS中移除,對應操作是鎖的釋放。”
我理解lock之后unlock之前都是在臨界區內,此時調await直接釋放鎖(離開臨界區)OK,但無需從AQS移除,因為移除是即將進入臨界區那一刻的事情。
“4. 線程2,因為線程1釋放鎖的關系,被喚醒,并判斷可以獲取鎖,于是線程2獲取鎖,并被加入到AQS的等待隊列中。”
同理,我認為最后一句加入到AQS隊列有誤。
另外,樓主的代碼中變量名最好改成thread1和thread2方便對號入座,謝謝!
回復你好,根據你的描述,依次回復下你的問題:
1. AQS中維護者唯一的一個隊列,該隊列支持兩種模式:獨占模式和共享模式,本文中提到的reentrantlock使用的是其獨占模式,該隊列描述了多線程環境下對鎖資源的占用情況,其中,頭結點即是表明占有該資源的線程。
所以,如果線程1成功獲取鎖,則線程1會被包裝成一個Node(AQS中的內部數據結構)加入到AQS的隊列中,你所說的并未加入,是不準確的。
2.調用await方法后,是會從AQS的該隊列中移除該Node的,從我本文貼出的源碼中可以看到,在await方法中有fullyRelease操作,這個操作會引起結點的移除。
最后,再說明下,AQS只是維護了一個在多線程環境下對某個資源的占用情況,對外,可以理解成“臨界區” 但在AQS內部來說,不過是檢查在當前條件下是否可以獲取資源這種操作的一種封裝。所以,AQS的隊列上掛了所有對該資源請求的線程,而AQS定義了頭結點是表示占有該資源的線程(獨占模式)。在共享模式下,則隊列上的一系列結點都可以同時占有資源,對應于,喚醒的時候,這一些列線程都會被喚醒。
回復感謝樓主的回復。
我查了源碼,ReentrantLock.lock()調了內部類Sync的抽象方法lock,后者有一個公平和另一個不公平的實現。以不公平的實現NonfairSync(默認)為例,lock方法源碼為:
final void lock() {
if (compareAndSetState(0, 1))//Try immediate barge
setExclusiveOwnerThread(Thread.currentThread());
else//backing up to normal acquire on failure.
acquire(1);
}
如果cas操作成功,直接進入臨界區(執行lock后續的代碼)否則走常規流程調acquire(),
acquire源碼為:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire如果成功,返回true,if表達式短路就直接結束了。
似乎沒有源碼能對應到包裝成一個Node加入AQS隊列?
如果tryAcquire 成功了,沒有必要增加到AQS的等待隊列中了, 反之,如果增加不成功,進入到acquireQueued方法中去,則會將當先現線程增加到AQS的等待隊列中去的。
回復再看fullyRelease的源碼(似乎沒有出現結點從隊列移除的代碼):
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
if (release(savedState)) {//調用release
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
它調用了release:
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
再調用了tryRelease,如果成功,喚醒AQS隊列的頭結點讓它嘗試進入臨界區(因此我理解的AQS隊列上的每個結點都代表了一個正等待進入臨界區而被block的線程)
而tryRelease純粹是狀態值的操作,也不涉及出隊列:
protected final boolean tryRelease(int releases) {
int c = getState() – releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor 方法中有移除節點的方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
總結
以上是生活随笔為你收集整理的多线程锁--怎么理解Condition的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机课程设计
- 下一篇: PyRun_SimpleString的无