日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)

發(fā)布時(shí)間:2024/1/17 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

上一篇介紹了AQS的基本設(shè)計(jì)思路以及兩個(gè)內(nèi)部類Node和ConditionObject的實(shí)現(xiàn)?聊聊高并發(fā)(二十一)解析java.util.concurrent各個(gè)組件(三) 深入理解AQS(一)?這篇說(shuō)一說(shuō)AQS的主要方法的實(shí)現(xiàn)。AQS和CLHLock的最大區(qū)別是,CLHLock是自旋鎖,而AQS使用Unsafe的park操作讓線程進(jìn)入等待(阻塞)。

?

線程加入同步隊(duì)列,和CLHLock一樣,從隊(duì)尾入隊(duì)列,使用CAS+輪詢的方式實(shí)現(xiàn)無(wú)鎖化。入隊(duì)列后設(shè)置節(jié)點(diǎn)的prev和next引用,形成雙向鏈表的結(jié)構(gòu)

?

?
  • private Node enq(final Node node) {

  • for (;;) {

  • Node t = tail;

  • if (t == null) { // Must initialize

  • if (compareAndSetHead(new Node()))

  • tail = head;

  • } else {

  • node.prev = t;

  • if (compareAndSetTail(t, node)) {

  • t.next = node;

  • return t;

  • }

  • }

  • }

  • }


  • 線程指定獨(dú)享還是共享方式加入隊(duì)列,先嘗試加入一次,如果失敗再用enq()輪詢地嘗試,比如addWaiter(Node.EXCLUSIVE), addWaiter(Node.SHARED)

    ?

    ?

    ?
  • private Node addWaiter(Node mode) {

  • Node node = new Node(Thread.currentThread(), mode);

  • // Try the fast path of enq; backup to full enq on failure

  • Node pred = tail;

  • if (pred != null) {

  • node.prev = pred;

  • if (compareAndSetTail(pred, node)) {

  • pred.next = node;

  • return node;

  • }

  • }

  • enq(node);

  • return node;

  • }

  • ?

    喚醒后繼節(jié)點(diǎn),最典型的情況就是在線程釋放鎖后,會(huì)喚醒后繼節(jié)點(diǎn)。會(huì)從節(jié)點(diǎn)的next開始,找到一個(gè)后繼節(jié)點(diǎn),如果next是null,就從隊(duì)尾開始往head找,直到找到最靠近當(dāng)前節(jié)點(diǎn)的后續(xù)節(jié)點(diǎn)。 waitStatus <= 0的隱含意思是線程沒有被取消。 然后用LockSupport喚醒這個(gè)找到的后繼節(jié)點(diǎn)的線程。

    這個(gè)方法類似于CLHLock里面釋放鎖時(shí),通知后續(xù)節(jié)點(diǎn)來(lái)獲取鎖。AQS使用了阻塞的方式,所以這個(gè)方法的后續(xù)方法是acquireXXX方法,它負(fù)責(zé)將后續(xù)節(jié)點(diǎn)喚醒,后續(xù)節(jié)點(diǎn)再根據(jù)狀態(tài)去判斷是否獲得鎖

    ?

    ?

    ?
  • private void unparkSuccessor(Node node) {

  • /*

  • * If status is negative (i.e., possibly needing signal) try

  • * to clear in anticipation of signalling. It is OK if this

  • * fails or if status is changed by waiting thread.

  • */

  • int ws = node.waitStatus;

  • if (ws < 0)

  • compareAndSetWaitStatus(node, ws, 0);

  • ?
  • /*

  • * Thread to unpark is held in successor, which is normally

  • * just the next node. But if cancelled or apparently null,

  • * traverse backwards from tail to find the actual

  • * non-cancelled successor.

  • */

  • Node s = node.next;

  • if (s == null || s.waitStatus > 0) {

  • s = null;

  • for (Node t = tail; t != null && t != node; t = t.prev)

  • if (t.waitStatus <= 0)

  • s = t;

  • }

  • if (s != null)

  • LockSupport.unpark(s.thread);

  • }


  • 共享模式下的釋放操作,從隊(duì)首開始向隊(duì)尾擴(kuò)散,如果節(jié)點(diǎn)的waitStatu是SIGNAL,就喚醒后繼節(jié)點(diǎn),如果waitStatus是0,就設(shè)置標(biāo)記成PROPAGATE

    ?

    ?

    ?
  • private void doReleaseShared() {

  • for (;;) {

  • Node h = head;

  • if (h != null && h != tail) {

  • int ws = h.waitStatus;

  • if (ws == Node.SIGNAL) {

  • if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

  • continue; // loop to recheck cases

  • unparkSuccessor(h);

  • }

  • else if (ws == 0 &&

  • !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  • continue; // loop on failed CAS

  • }

  • if (h == head) // loop if head changed

  • break;

  • }

  • }


  • 取消獲取操作,要把節(jié)點(diǎn)從同步隊(duì)列中去除,通過(guò)鏈表操作將它的前置節(jié)點(diǎn)的next指向它的后繼節(jié)點(diǎn)集合。如果該節(jié)點(diǎn)是在隊(duì)尾,直接刪除即可,否則要通知后繼節(jié)點(diǎn)去獲取鎖

    ?

    ?

    ?
  • private void cancelAcquire(Node node) {

  • // Ignore if node doesn't exist

  • if (node == null)

  • return;

  • ?
  • node.thread = null;

  • ?
  • // Skip cancelled predecessors

  • Node pred = node.prev;

  • while (pred.waitStatus > 0)

  • node.prev = pred = pred.prev;

  • ?
  • // predNext is the apparent node to unsplice. CASes below will

  • // fail if not, in which case, we lost race vs another cancel

  • // or signal, so no further action is necessary.

  • Node predNext = pred.next;

  • ?
  • // Can use unconditional write instead of CAS here.

  • // After this atomic step, other Nodes can skip past us.

  • // Before, we are free of interference from other threads.

  • node.waitStatus = Node.CANCELLED;

  • ?
  • // If we are the tail, remove ourselves.

  • if (node == tail && compareAndSetTail(node, pred)) {

  • compareAndSetNext(pred, predNext, null);

  • } else {

  • // If successor needs signal, try to set pred's next-link

  • // so it will get one. Otherwise wake it up to propagate.

  • int ws;

  • if (pred != head &&

  • ((ws = pred.waitStatus) == Node.SIGNAL ||

  • (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

  • pred.thread != null) {

  • Node next = node.next;

  • if (next != null && next.waitStatus <= 0)

  • compareAndSetNext(pred, predNext, next);

  • } else {

  • unparkSuccessor(node);

  • }

  • ?
  • node.next = node; // help GC

  • }

  • }

  • ?

    獨(dú)占模式并且不可中斷地獲取隊(duì)列鎖的操作,這個(gè)方法在ConditionObject.await()中被使用,當(dāng)線程被Unsafe.unpark喚醒后,需要調(diào)用acquireQueued來(lái)獲取鎖,從而結(jié)束await(). accquireQueued()方法要么獲得鎖,要么被tryAcquire方法拋出的異常打斷,如果拋出異常,最后在finally里面取消獲取

    值得注意的是只有節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是head的時(shí)候,才能獲得鎖。這里隱含了一個(gè)意思,就是head指向當(dāng)前獲得鎖的節(jié)點(diǎn)。當(dāng)程序進(jìn)入if(p == head and tryAcquire(arg))這個(gè)分支時(shí),表示線程獲得了鎖或者被中斷,將自己設(shè)置為head,將next設(shè)置為null.

    shouldParkAfterFailedAcquired()方法的目的是將節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的waitStatus設(shè)置為SIGNAL,表示會(huì)通知后續(xù)節(jié)點(diǎn),這樣后續(xù)節(jié)點(diǎn)才能放心去park,而不用擔(dān)心被丟失喚醒的通知。

    parkAndCheckInterupt()方法會(huì)真正執(zhí)行阻塞,并返回中斷狀態(tài),這個(gè)方法有兩種情況返回,一種是park被unpark喚醒,這時(shí)候中斷狀態(tài)為false。另一種情況是park被中斷了,由于這個(gè)accquireQueued方法是不可中斷的版本,所以即使線程被中斷了,也只是設(shè)置了中斷標(biāo)志為true,沒有跑出中斷異常。在支持中斷的獲取版本里,這時(shí)會(huì)拋出中斷異常。

    這個(gè)方法可以理解為L(zhǎng)ock的lock里沒有獲取鎖的分支,在CLHLock自旋鎖的實(shí)現(xiàn)里,是對(duì)前驅(qū)節(jié)點(diǎn)的狀態(tài)自旋,而AQS是阻塞,所以這里是在同步隊(duì)列里面進(jìn)入了阻塞狀態(tài),等待被前驅(qū)節(jié)點(diǎn)釋放鎖時(shí)喚醒。

    釋放鎖時(shí)會(huì)根據(jù)狀態(tài)調(diào)用unparkSuccessor()方法來(lái)喚醒后續(xù)節(jié)點(diǎn),這樣就會(huì)在這個(gè)方法里面把阻塞的線程喚醒并獲得鎖。

    隊(duì)列鎖的好處是線程都在多個(gè)共享狀態(tài)上自旋或阻塞,所以u(píng)nparkSuccessor()方法只會(huì)喚醒它后繼沒有取消的節(jié)點(diǎn)。

    而取消只有兩種情況,中斷或者超時(shí)

    ?

    ?

    ?
  • final boolean acquireQueued(final Node node, int arg) {

  • boolean failed = true;

  • try {

  • boolean interrupted = false;

  • for (;;) {

  • final Node p = node.predecessor();

  • if (p == head && tryAcquire(arg)) {

  • setHead(node);

  • p.next = null; // help GC

  • failed = false;

  • return interrupted;

  • }

  • if (shouldParkAfterFailedAcquire(p, node) &&

  • parkAndCheckInterrupt())

  • interrupted = true;

  • }

  • } finally {

  • if (failed)

  • cancelAcquire(node);

  • }

  • }


  • 獨(dú)占模式支持中斷的獲取隊(duì)列鎖操作,可以看到和不支持中斷版本的區(qū)別,這里如果parkAndCheckInterrupt()方法返回時(shí)顯示被中斷了,就拋出中斷異常

    ?

    ?

    ?
  • private void doAcquireInterruptibly(int arg)

  • throws InterruptedException {

  • final Node node = addWaiter(Node.EXCLUSIVE);

  • boolean failed = true;

  • try {

  • for (;;) {

  • final Node p = node.predecessor();

  • if (p == head && tryAcquire(arg)) {

  • setHead(node);

  • p.next = null; // help GC

  • failed = false;

  • return;

  • }

  • if (shouldParkAfterFailedAcquire(p, node) &&

  • parkAndCheckInterrupt())

  • throw new InterruptedException();

  • }

  • } finally {

  • if (failed)

  • cancelAcquire(node);

  • }

  • }

  • ?

    獨(dú)占模式限時(shí)獲取隊(duì)列鎖操作, 這個(gè)獲取的整體邏輯和前面的類似,區(qū)別是它支持限時(shí)操作,如果等待時(shí)間大于spinForTimeoutThreshold,就使用阻塞的方式等待,否則用自旋等待。使用了LockSupport.parkNanos()方法來(lái)實(shí)現(xiàn)限時(shí)地等待,并支持中斷

    這里隱含的一個(gè)含義是parkNanos方法退出有3種方式,

    1. 限時(shí)到了自動(dòng)退出,這時(shí)候會(huì)超時(shí)

    2. 沒有到限時(shí)被喚醒了,這時(shí)候是不超時(shí)的

    3. 被中斷

    ?

    ?
  • private boolean doAcquireNanos(int arg, long nanosTimeout)

  • throws InterruptedException {

  • long lastTime = System.nanoTime();

  • final Node node = addWaiter(Node.EXCLUSIVE);

  • boolean failed = true;

  • try {

  • for (;;) {

  • final Node p = node.predecessor();

  • if (p == head && tryAcquire(arg)) {

  • setHead(node);

  • p.next = null; // help GC

  • failed = false;

  • return true;

  • }

  • if (nanosTimeout <= 0)

  • return false;

  • if (shouldParkAfterFailedAcquire(p, node) &&

  • nanosTimeout > spinForTimeoutThreshold)

  • LockSupport.parkNanos(this, nanosTimeout);

  • long now = System.nanoTime();

  • nanosTimeout -= now - lastTime;

  • lastTime = now;

  • if (Thread.interrupted())

  • throw new InterruptedException();

  • }

  • } finally {

  • if (failed)

  • cancelAcquire(node);

  • }

  • }

  • ?

    ?

    共享模式獲得隊(duì)列鎖操作,獲得操作也是從head的下一個(gè)節(jié)點(diǎn)開始,和獨(dú)占模式只unparkSuccessor一個(gè)節(jié)點(diǎn)不同,共享模式下,等head的后續(xù)節(jié)點(diǎn)被喚醒了,它要擴(kuò)散這種共享的獲取,使用setHeadAndPropagate操作,把自己設(shè)置為head,并且把釋放的狀態(tài)往下傳遞,這里采用了鏈?zhǔn)絾拘训姆椒?#xff0c;1個(gè)節(jié)點(diǎn)負(fù)責(zé)喚醒1個(gè)后續(xù)節(jié)點(diǎn),直到不能喚醒。當(dāng)后繼節(jié)點(diǎn)是共享模式isShared,就調(diào)用doReleaseShared來(lái)喚醒后繼節(jié)點(diǎn)

    doReleaseShared會(huì)從head開始往后檢查狀態(tài),如果節(jié)點(diǎn)是SIGNAL狀態(tài),就喚醒它的后繼節(jié)點(diǎn)。如果是0就標(biāo)記為PROPAGATE, 等它釋放鎖的時(shí)候會(huì)再次喚醒后繼節(jié)點(diǎn)。

    這里有個(gè)隱含的意思:

    1. 加入同步隊(duì)列并阻塞的節(jié)點(diǎn),它的前驅(qū)節(jié)點(diǎn)只會(huì)是SIGNAL,表示前驅(qū)節(jié)點(diǎn)釋放鎖時(shí),后繼節(jié)點(diǎn)會(huì)被喚醒。shouldParkAfterFailedAcquire()方法保證了這點(diǎn),如果前驅(qū)節(jié)點(diǎn)不是SIGNAL,它會(huì)把它修改成SIGNAL。這里不是SIGNAL就有可能是PROPAGATE

    2. 造成前驅(qū)節(jié)點(diǎn)是PROPAGATE的情況是前驅(qū)節(jié)點(diǎn)獲得鎖時(shí),會(huì)喚醒一次后繼節(jié)點(diǎn),但這時(shí)候后繼節(jié)點(diǎn)還沒有加入到同步隊(duì)列,所以暫時(shí)把節(jié)點(diǎn)狀態(tài)設(shè)置為PROPAGATE,當(dāng)后繼節(jié)點(diǎn)加入同步隊(duì)列后,會(huì)把PROPAGATE設(shè)置為SIGNAL,這樣前驅(qū)節(jié)點(diǎn)釋放鎖時(shí)會(huì)再次doReleaseShared,這時(shí)候它的狀態(tài)已經(jīng)是SIGNAL了,就可以喚醒后續(xù)節(jié)點(diǎn)了

    ?

    ?

    ?
  • private void doAcquireShared(int arg) {

  • final Node node = addWaiter(Node.SHARED);

  • boolean failed = true;

  • try {

  • boolean interrupted = false;

  • for (;;) {

  • final Node p = node.predecessor();

  • if (p == head) {

  • int r = tryAcquireShared(arg);

  • if (r >= 0) {

  • setHeadAndPropagate(node, r);

  • p.next = null; // help GC

  • if (interrupted)

  • selfInterrupt();

  • failed = false;

  • return;

  • }

  • }

  • if (shouldParkAfterFailedAcquire(p, node) &&

  • parkAndCheckInterrupt())

  • interrupted = true;

  • }

  • } finally {

  • if (failed)

  • cancelAcquire(node);

  • }

  • }

  • ?
  • private void setHeadAndPropagate(Node node, int propagate) {

  • ??????? Node h = head; // Record old head for check below

  • ??????? setHead(node);

  • ??????? /*

  • ???????? * Try to signal next queued node if:

  • ???????? *?? Propagation was indicated by caller,

  • ???????? *???? or was recorded (as h.waitStatus) by a previous operation

  • ???????? *???? (note: this uses sign-check of waitStatus because

  • ???????? *????? PROPAGATE status may transition to SIGNAL.)

  • ???????? * and

  • ???????? *?? The next node is waiting in shared mode,

  • ???????? *???? or we don't know, because it appears null

  • ???????? *

  • ???????? * The conservatism in both of these checks may cause

  • ???????? * unnecessary wake-ups, but only when there are multiple

  • ???????? * racing acquires/releases, so most need signals now or soon

  • ???????? * anyway.

  • ???????? */

  • ??????? if (propagate > 0 || h == null || h.waitStatus < 0) {

  • ??????????? Node s = node.next;

  • ??????????? if (s == null || s.isShared())

  • ??????????????? doReleaseShared();

  • ??????? }

  • ??? }

  • ?
  • private void doReleaseShared() {

  • ??????? for (;;) {

  • ??????????? Node h = head;

  • ??????????? if (h != null && h != tail) {

  • ??????????????? int ws = h.waitStatus;

  • ??????????????? if (ws == Node.SIGNAL) {

  • ??????????????????? if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

  • ??????????????????????? continue;??????????? // loop to recheck cases

  • ??????????????????? unparkSuccessor(h);

  • ??????????????? }

  • ??????????????? else if (ws == 0 &&

  • ???????????????????????? !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  • ??????????????????? continue;??????????????? // loop on failed CAS

  • ??????????? }

  • ??????????? if (h == head)?????????????????? // loop if head changed

  • ??????????????? break;

  • ??????? }

  • ??? }


  • tryXXXX 方法,這幾個(gè)方法是給子類重寫的,用來(lái)擴(kuò)展響應(yīng)的同步器操作

    ?

    ?

    ?
  • protected boolean tryAcquire(int arg) {

  • throw new UnsupportedOperationException();

  • }

  • ?
  • protected boolean tryRelease(int arg) {

  • ??????? throw new UnsupportedOperationException();

  • ??? }

  • ?
  • protected int tryAcquireShared(int arg) {

  • ??????? throw new UnsupportedOperationException();

  • ??? }

  • ?
  • protected boolean tryReleaseShared(int arg) {

  • ??????? throw new UnsupportedOperationException();

  • ??? }


  • 獨(dú)占模式獲取操作的頂層方法,如果沒有tryAcquired,或者沒有獲得隊(duì)列鎖,就中斷

    ?
  • public final void acquire(int arg) {

  • if (!tryAcquire(arg) &&

  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

  • selfInterrupt();

  • }


  • 獨(dú)占模式釋放操作的頂層方法,如果tryRelease()成功,那么就喚醒后繼節(jié)點(diǎn)去獲取鎖

    ?
  • public final boolean release(int arg) {

  • if (tryRelease(arg)) {

  • Node h = head;

  • if (h != null && h.waitStatus != 0)

  • unparkSuccessor(h);

  • return true;

  • }

  • return false;

  • }

  • 總結(jié)

    以上是生活随笔為你收集整理的聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。