Java高并发系列 — AQS
只懂volatile和CAS是不是可以無視concurrent包了呢,發現一個好鏈接,繼續死磕,第一日;
首先,我承認很多時候要去看源碼才能更好搞懂一些事,但如果站在巨人肩膀上呢?有了大概思想源碼看還是挺容易的,關鍵是要調試一次,特別是類似CountDownLatch這樣的。
另外,沒有完全理解我是不會記錄一個字的,寫這個目的就是為了快速復習而已;不開玩笑,這個AQS我多年前也看過一次,不用也全忘了。
最后,文本是在自己理解下做的另一番解析,和原文不同。
介紹一副肩膀
https://www.cnblogs.com/waterystone/p/4920797.html
首先是AQS的整個構造,我喜歡圖,上圖:
就是上圖的構造,結合volatile(上圖的volatile int state),CAS(各種如compareAndSetTail()方法),for(;;)自旋,整個concurrent包的靈魂就出來了。
初認識AQS只需要知道用它分獲取和釋放兩種操作,每個操作分為獨占和共享,也就是acqurie、release、acquireShared、releaseshare這四個方法,可以多個子類同步器,如CyclicBarrire
記錄文:
獨占鎖acquire(int)
還有得到個好東西,一圖明確acquire()方法,如下圖:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}?
附帶acquire()的總結:
這里有個有趣的問題就是在加入等待隊列之后,線程是否應該進入wait狀態【圖中的park(),其實是用Unsafe.unpark實現的。unpark使得線程不需要獲取鎖進入wait狀態】,加入隊尾后,利用acquireQueued休息:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;//標記是否成功拿到資源try {boolean interrupted = false;//標記等待過程中是否被中斷過//又是一個“自旋”!這里自旋什么時候會跳出呢?看return可知,只有在獲取到了鎖會退出,如果獲取不到,那么就會被unpark把線程帶到wait狀態。//當unpark喚醒后會繼續自旋看自己是否是老二,一般情況下就是了,然后退出循環,返回中斷狀態,用作補償處理,比較wai是不響應中斷的for (;;) {final Node p = node.predecessor();//拿到前驅//如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。if (p == head && tryAcquire(arg)) {setHead(node);//拿到資源后,將head指向該結點。所以head所指的標桿結點,就是當前獲取到資源的那個結點或null。p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味著之前拿完資源的結點出隊了!failed = false;return interrupted;//返回等待過程中是否被中斷過 }//如果自己可以休息了,就進入waiting狀態,直到被unpark()if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標記為true }} finally {if (failed)cancelAcquire(node);} }
? acquireQueued方法是如何找到安全點的呢,找安全點其實就是找到一個前驅節點,該節點的waitStatus == Node.SIGNAL,而這個整型值為-1,這個標志的Node遵守一個協議就是該節點在釋放鎖的時候會喚醒下一個需要喚醒的節點(不一定是后繼節點);如果找不到,就會在自旋中變為直接爭取鎖,結合上面和下面代碼:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//拿到前驅的狀態if (ws == Node.SIGNAL)//如果已經告訴前驅拿完號后通知自己一下,那就可以安心休息了return true;if (ws > 0) {/** 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,并排在它的后邊。* 注意:那些放棄的結點,由于被自己“加塞”到它們前邊,它們相當于形成一個無引用鏈,稍后就會被保安大叔趕走了(GC回收)!*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢! compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }?
? 介紹下waitStatus的狀態取:
1、如果pred的waitStatus == 0,則通過CAS指令修改waitStatus為Node.SIGNAL。2、如果pred的waitStatus > 0,表明pred的線程狀態CANCELLED,需從隊列中刪除。
3、如果pred的waitStatus為Node.SIGNAL,則通過LockSupport.park()方法把線程A掛起,并等待被喚醒。 /** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;
?
我們得知,如果waitStatus為-1那么就可以讓線程upark進入等待狀態(同thread.wait()),那么CLH是如何辦到喚醒的呢?答案在realease()方法上。
獨占鎖釋放release(int):
該方法的目的是用unpark()喚醒等待隊列中最前邊的那個未放棄線程,問題是所有的線程要么被中斷放棄,要么在等待被喚醒,那么該如何喚醒?就是用到waitStatus,首先嘗試釋放首節點,如果釋放失敗,那么就從tail開始,一直往前找,如果找到就Node s變量用記下來,找到多個就把最后的覆蓋前面的,那么遍歷到頭節點后,自然s記錄的就是最前的需要喚醒的節點了。結合代碼就很好看懂:
private void unparkSuccessor(Node node) {//這里,node一般為當前線程所在的結點。int ws = node.waitStatus;if (ws < 0)//置零當前線程所在的結點狀態,允許失敗,使得找節點只需找<0的,可以跳過頭節點。compareAndSetWaitStatus(node, ws, 0);Node s = node.next;//找到下一個需要喚醒的結點s,先從頭結點開始,不對就從尾部開始往前找到最前的if (s == null || s.waitStatus > 0) {//如果為空或已取消s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)//從這里可以看出,<=0的結點,都是還有效的結點。s = t;}if (s != null)LockSupport.unpark(s.thread);//喚醒 }
?
從以上代碼可以看出,喚醒這個操作是當前線程做的,由獲取鎖的線程去喚醒下一個線程;這里只需要LocakSupport.unpark,當被喚醒的線程成功后,就會設置自己為head,然后退出代碼塊,具體見acquireQueue方法注釋。
共享獲取acquireShared()
這個其實和acquire差不多,只不過獲取的status是多個,而且獲取失敗后直接繼續喚醒后調用doAcquireShare
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}而doAcquireShare方法和acquireQueued方法是差不多的,具體看如下代碼注釋:
/*** Acquires in shared uninterruptible mode.* @param arg the acquire argument*/private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);//首先保證入隊boolean failed = true;try {boolean interrupted = false; //追蹤中斷狀態for (;;) {final Node p = node.predecessor();if (p == head) { //如果是頭結點那么久嘗試獲取資源,然后根據資源情況決定是否喚醒后面線程int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r); //r>0,喚醒后面線程p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}//這里也是找安全點if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}再看看該方法是如何繼續喚醒后面線程的,這里注意,喚醒線程有兩種情況,第一種是頭結點獲取到鎖后還有資源所以喚醒后面的一起共享,第二種情況是獲得鎖的線程釋放資源的時候去喚醒后面的節點,如下代碼所示,doReleaseShared方法就是用作喚醒線程的。
另外,在眾多代碼中我們都看到interrupted的標記位,可以看出,等待過程是不響應中斷的,只會在后期補上中斷響應,而中斷響應是線程自身決定的。
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; setHead(node);//head指向自己//如果還有剩余量,繼續喚醒下一個鄰居線程if (propagate > 0 || h == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();} }釋放共享releaseShare()
該方法同樣簡單,嘗試釋放資源,否則喚醒后面線程,這里我們同樣看到了doReleaseShared方法
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//嘗試釋放資源doReleaseShared();//喚醒后繼結點return true;}return false; }代碼:
這里又用到了unparkSuccessor方法,首先判斷頭結點的waitStates是否為SIGNAL標志,是就設置為0,代表已經不再需要資源,然后自旋(for ;;)調用unparkSusseor,直到后繼線程獲取成功,后繼線程獲取成功會被喚醒,喚醒后后繼線程第一件事就是在acquireQueue中自旋內部把自己設置為頭結點,從而導致head引用(注意,head引用是各個線程共用的)發生變化,這樣一來,這個釋放的線程節點就能退出循環,代表資源釋放完畢。
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;unparkSuccessor(h);//喚醒后繼 }else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head)// head發生變化,當喚醒成功后,老二結點會自動替換head為自己,所以這里就不再相等,釋放成功break;} }head發生變化,當喚醒成功后,老二結點會自動替換head為自己,所以這里就不再相等,釋放成功,請參看appendQueue代碼注釋、
?
Thread狀態轉換
另外因為復習該知識點通常要知道一下線程的狀態和之間的關系,所以再貼一張圖,由此特別說明一下,unpark方法是不加鎖進入waiting的唯一方法。轉載于:https://www.cnblogs.com/iCanhua/p/8965604.html
總結
以上是生活随笔為你收集整理的Java高并发系列 — AQS的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CAN总线冲突裁决
- 下一篇: Java中的三目运算符可能出现的问题