日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)

發布時間:2024/1/17 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

AQS是AbstractQueuedSynchronizer的縮寫,AQS是Java并包里大部分同步器的基礎構件,利用AQS可以很方便的創建鎖和同步器。它封裝了一個狀態,提供了一系列的獲取和釋放操作,這些獲取和釋放操作都是基于狀態的。它的基本思想是由AQS負責管理同步器類中的狀態,其他的同步器比如可重入鎖ReentrantLock, 信號量Semaphore基于各自的特點來調用AQS提供了基礎能力進行狀態的同步。

?

在AQS的Javadoc里面提到它是CLHLock的變種,在聊聊高并發(八)實現幾種自旋鎖(三)?這篇文章中我們說了如何利用CLH鎖來構件自旋鎖,回顧一下CLHLock的一些基本特點:

1. CLHLock是一種隊列自旋鎖的實現,提供了FIFO先來先服務的公平性

2. 利用一個原子變量AtomicReference tail的CAS操作來構件一個虛擬的鏈式結構

3. 節點Node維護一個volatile狀態,維護一個prev指針指向前一個節點,獲取鎖時每個線程在prev節點的狀態上自旋

4. 當線程釋放鎖時,只需要修改自身狀態即可,后續節點會觀察到volatile狀態的改動而獲取鎖

?

AQS既然是CLHLock的一種變種,那么

1. 也維護以了一個基本的隊列結構

2. 也是提供了一個Tail指針從隊尾通過CAS操作入隊列。

3. 提供了一個volatile類型的int值來維護狀態

?

?
  • public abstract class AbstractQueuedSynchronizer

  • extends AbstractOwnableSynchronizer

  • implements java.io.Serializable {

  • ?private transient volatile Node head;

  • ?
  • ??? private transient volatile Node tail;

  • ?
  • ??? private volatile int state;

  • ?
  • ?
  • ??? protected final int getState() {

  • ??????? return state;

  • ??? }

  • ?
  • ??

  • ??? protected final void setState(int newState) {

  • ??????? state = newState;

  • ??? }

  • ?
  • ?
  • ??? protected final boolean compareAndSetState(int expect, int update) {

  • ??????? // See below for intrinsics setup to support this

  • ??????? return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

  • ??? }

  • ?
  • ..................

  • }

  • ?

    與標準CLHLock實現不同的是,AQS不是一個自旋鎖,它提供了更加豐富的語意:

    1. 提供了獨享(exclusive)方式和共享(share)方式來獲取/釋放,比如鎖是獨占方式的,信號量semaphore是共享方式的,可以有多個線程進入臨界區?

    2. 支持可中斷和不可中斷的獲取/釋放

    3. 支持普通的和具有時間限制的獲取/釋放

    4. 提供了自旋和阻塞的切換,可以先自旋,如果等待時間長,可以阻塞

    ?

    ?
  • /**

  • * The number of nanoseconds for which it is faster to spin

  • * rather than to use timed park. A rough estimate suffices

  • * to improve responsiveness with very short timeouts.

  • */

  • static final long spinForTimeoutThreshold = 1000L;


  • AQS定義了兩個內部類來輔助它的實現,一個是Node定義了隊列中的節點,另一個是ConditionObject,是Condition接口的實現類,負責管理條件隊列。關于條件隊列更多內容可以看這篇?聊聊高并發(十四)理解Java中的管程,條件隊列,Condition以及實現一個阻塞隊列

    ?

    先看下Node類,它比CLHLock中的Node有更多屬性,除了完成基本的隊列功能,還維護了是獨享還是共享的模式信息

    1. 維護了一個Node SHARED引用表示共享模式

    2. 維護了一個Node EXCLUSIVE引用表示獨占模式

    3. 維護了幾種節點等待的狀態 waitStatus, 其中CANCELLED = 1是正數,表示取消狀態,SIGNAL = -1,CONDITION = -2, PROPAGATE = -3都是負數,表示節點在條件隊列的某個狀態,SIGNAL表示后續節點需要被喚醒

    4. 維護了Node prev引用,指向隊列中的前一個節點,通過Tail的CAS操作來創建

    5. 維護了Node next引用,指向隊列中的下一個節點,也是在通過Tail入隊列的時候設置的,這樣就維護了一個雙向隊列

    6. 維護了一個volatile的Thread引用,把一個節點關聯到一個線程

    7. 維護了Node nextWaiter引用,指向在條件隊列中的下一個正在等待的節點,是給條件隊列使用的。值得注意的是條件隊列只有在獨享狀態下才使用

    ?

    ?
  • static final class Node {

  • /** Marker to indicate a node is waiting in shared mode */

  • static final Node SHARED = new Node();

  • /** Marker to indicate a node is waiting in exclusive mode */

  • static final Node EXCLUSIVE = null;

  • ?
  • /** waitStatus value to indicate thread has cancelled */

  • static final int CANCELLED = 1;

  • /** waitStatus value to indicate successor's thread needs unparking */

  • static final int SIGNAL = -1;

  • /** waitStatus value to indicate thread is waiting on condition */

  • static final int CONDITION = -2;

  • /**

  • * waitStatus value to indicate the next acquireShared should

  • * unconditionally propagate

  • */

  • static final int PROPAGATE = -3;

  • ?
  • volatile int waitStatus;

  • ?
  • volatile Node prev;

  • ?
  • volatile Node next;

  • ?
  • volatile Thread thread;

  • ?
  • Node nextWaiter;

  • ?
  • final boolean isShared() {

  • return nextWaiter == SHARED;

  • }

  • ?
  • final Node predecessor() throws NullPointerException {

  • Node p = prev;

  • if (p == null)

  • throw new NullPointerException();

  • else

  • return p;

  • }

  • ?
  • Node() { // Used to establish initial head or SHARED marker

  • }

  • ?
  • Node(Thread thread, Node mode) { // Used by addWaiter

  • this.nextWaiter = mode;

  • this.thread = thread;

  • }

  • ?
  • Node(Thread thread, int waitStatus) { // Used by Condition

  • this.waitStatus = waitStatus;

  • this.thread = thread;

  • }

  • }

  • ?

    再看一下ConditionObject,它是條件Condition接口的具體實現,維護了一個條件隊列,條件隊列是通過Node來構件的一個單向鏈表結構。底層的條件操作(等待和喚醒)使用LockSupport類來實現,在這篇中我們說了LockSupport底層使用sun.misc.Unsafe來提供條件隊列的park和unpark操作。聊聊高并發(十七)解析java.util.concurrent各個組件(一) 了解sun.misc.Unsafe類

    1. 維護了一個Node firstWaiter引用指向條件隊列的隊首節點

    2. 維護了一個Node lastWaiter引用指向條件隊列的隊尾節點

    3. 條件隊列支持節點的取消退出機制,CANCELLED節點來表示這種取消狀態

    4. 支持限時等待機制

    5. 支持可中斷和不可中斷的等待

    我們來看幾個典型的條件隊列的操作實現

    往條件隊列里面加入一個等待節點,這個是await()方法的基本操作

    1. 判斷尾節點的狀態是不是等待某個條件的狀態(CONDITION),如果不是,就把CANCELLED節點從隊列中踢出,然后把自己標記為尾節點

    ?

    ?
  • public class ConditionObject implements Condition, java.io.Serializable {

  • /** First node of condition queue. */

  • private transient Node firstWaiter;

  • /** Last node of condition queue. */

  • private transient Node lastWaiter;

  • ?
  • /**

  • ???????? * Adds a new waiter to wait queue.

  • ???????? * @return its new wait node

  • ???????? */

  • ??????? private Node addConditionWaiter() {

  • ??????????? Node t = lastWaiter;

  • ??????????? // If lastWaiter is cancelled, clean out.

  • ??????????? if (t != null && t.waitStatus != Node.CONDITION) {

  • ??????????????? unlinkCancelledWaiters();

  • ??????????????? t = lastWaiter;

  • ??????????? }

  • ??????????? Node node = new Node(Thread.currentThread(), Node.CONDITION);

  • ??????????? if (t == null)

  • ??????????????? firstWaiter = node;

  • ??????????? else

  • ??????????????? t.nextWaiter = node;

  • ??????????? lastWaiter = node;

  • ??????????? return node;

  • ??????? }

  • ?
  • private void unlinkCancelledWaiters() {

  • ??????????? Node t = firstWaiter;

  • ??????????? Node trail = null;

  • ??????????? while (t != null) {

  • ??????????????? Node next = t.nextWaiter;

  • ??????????????? if (t.waitStatus != Node.CONDITION) {

  • ??????????????????? t.nextWaiter = null;

  • ??????????????????? if (trail == null)

  • ??????????????????????? firstWaiter = next;

  • ??????????????????? else

  • ??????????????????????? trail.nextWaiter = next;

  • ??????????????????? if (next == null)

  • ??????????????????????? lastWaiter = trail;

  • ??????????????? }

  • ??????????????? else

  • ??????????????????? trail = t;

  • ??????????????? t = next;

  • ??????????? }

  • ??????? }

  • .................

  • }

  • 從條件隊列中喚醒一個節點,實際上doSignal只是把一個節點從條件隊列中移除,然后加入到同步隊列,并設置它在同步隊列的前置節點的waitStatus = SIGNAL, 如果設置失敗或者取消在條件隊列等待,直接把這個節點的線程unpark喚醒,需要注意的是unpark操作只是把線程從等待狀態轉化為可運行狀態,并不直接獲得鎖。

    ?

    ?
  • ?public final void signal() {

  • ??????????? if (!isHeldExclusively())

  • ??????????????? throw new IllegalMonitorStateException();

  • ??????????? Node first = firstWaiter;

  • ??????????? if (first != null)

  • ??????????????? doSignal(first);

  • ??????? }

  • ?
  • ?
  • /**

  • * Removes and transfers nodes until hit non-cancelled one or

  • * null. Split out from signal in part to encourage compilers

  • * to inline the case of no waiters.

  • * @param first (non-null) the first node on condition queue

  • */

  • private void doSignal(Node first) {

  • do {

  • if ( (firstWaiter = first.nextWaiter) == null)

  • lastWaiter = null;

  • first.nextWaiter = null;

  • } while (!transferForSignal(first) &&

  • (first = firstWaiter) != null);

  • }

  • ?
  • ? ?final boolean transferForSignal(Node node) {

  • ??????? /*

  • ???????? * If cannot change waitStatus, the node has been cancelled.

  • ???????? */

  • ??????? if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

  • ??????????? return false;

  • ?
  • ??????? /*

  • ???????? * Splice onto queue and try to set waitStatus of predecessor to

  • ???????? * indicate that thread is (probably) waiting. If cancelled or

  • ???????? * attempt to set waitStatus fails, wake up to resync (in which

  • ???????? * case the waitStatus can be transiently and harmlessly wrong).

  • ???????? */

  • ??????? Node p = enq(node);

  • ??????? int ws = p.waitStatus;

  • ??????? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

  • ??????????? LockSupport.unpark(node.thread);

  • ??????? return true;

  • ??? }


  • Java線程的幾種狀態如下

    ?

    支持中斷的等待操作,?主要做了兩個事情:新建一個Node進入條件隊列等待被喚醒;從同步隊列中移除并釋放鎖。它會相應線程的中斷拋出中斷異常,并且記錄中斷狀態

    ?

    ?
  • public final void await() throws InterruptedException {

  • if (Thread.interrupted())

  • throw new InterruptedException();

  • Node node = addConditionWaiter();

  • int savedState = fullyRelease(node);

  • int interruptMode = 0;

  • while (!isOnSyncQueue(node)) {

  • LockSupport.park(this);

  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  • break;

  • }

  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  • interruptMode = REINTERRUPT;

  • if (node.nextWaiter != null) // clean up if cancelled

  • unlinkCancelledWaiters();

  • if (interruptMode != 0)

  • reportInterruptAfterWait(interruptMode);

  • }


  • 不可中斷的等待,也是先進入條件隊列等待,并從同步隊列出隊列,釋放鎖。但是它不相應線程中斷狀態

    ?

    ?
  • public final void awaitUninterruptibly() {

  • Node node = addConditionWaiter();

  • int savedState = fullyRelease(node);

  • boolean interrupted = false;

  • while (!isOnSyncQueue(node)) {

  • LockSupport.park(this);

  • if (Thread.interrupted())

  • interrupted = true;

  • }

  • if (acquireQueued(node, savedState) || interrupted)

  • selfInterrupt();

  • }


  • 限時等待,也是先進入條件隊列等待,然后釋放鎖。輪詢等待時間,當超時后再次進入同步隊列,等待獲得鎖。如果獲得了鎖,就返回false. 如果在等待時被喚醒,就進入同步隊列,等待獲得鎖,如果獲得鎖就返回true

    ?

    ?
  • public final boolean await(long time, TimeUnit unit)

  • throws InterruptedException {

  • if (unit == null)

  • throw new NullPointerException();

  • long nanosTimeout = unit.toNanos(time);

  • if (Thread.interrupted())

  • throw new InterruptedException();

  • Node node = addConditionWaiter();

  • int savedState = fullyRelease(node);

  • long lastTime = System.nanoTime();

  • boolean timedout = false;

  • int interruptMode = 0;

  • while (!isOnSyncQueue(node)) {

  • if (nanosTimeout <= 0L) {

  • timedout = transferAfterCancelledWait(node);

  • break;

  • }

  • if (nanosTimeout >= spinForTimeoutThreshold)

  • LockSupport.parkNanos(this, nanosTimeout);

  • if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  • break;

  • long now = System.nanoTime();

  • nanosTimeout -= now - lastTime;

  • lastTime = now;

  • }

  • if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  • interruptMode = REINTERRUPT;

  • if (node.nextWaiter != null)

  • unlinkCancelledWaiters();

  • if (interruptMode != 0)

  • reportInterruptAfterWait(interruptMode);

  • return !timedout;

  • }


  • AQS使用了Unsafe直接操作內存來對字段進行CAS操作和設置值。

    ?

    ?
  • private static final Unsafe unsafe = Unsafe.getUnsafe();

  • private static final long stateOffset;

  • private static final long headOffset;

  • private static final long tailOffset;

  • private static final long waitStatusOffset;

  • private static final long nextOffset;

  • ?
  • static {

  • try {

  • stateOffset = unsafe.objectFieldOffset

  • (AbstractQueuedSynchronizer.class.getDeclaredField("state"));

  • headOffset = unsafe.objectFieldOffset

  • (AbstractQueuedSynchronizer.class.getDeclaredField("head"));

  • tailOffset = unsafe.objectFieldOffset

  • (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));

  • waitStatusOffset = unsafe.objectFieldOffset

  • (Node.class.getDeclaredField("waitStatus"));

  • nextOffset = unsafe.objectFieldOffset

  • (Node.class.getDeclaredField("next"));

  • ?
  • } catch (Exception ex) { throw new Error(ex); }

  • }

  • 總結

    以上是生活随笔為你收集整理的聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。