日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

AbstractQueuedSynchronizer源码解析

發(fā)布時間:2024/4/13 编程问答 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 AbstractQueuedSynchronizer源码解析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

?

關(guān)于AbstractQueuedSynchronizer

基本數(shù)據(jù)結(jié)構(gòu)

節(jié)點結(jié)構(gòu)

同步隊列結(jié)構(gòu)

實現(xiàn)

子類需要實現(xiàn)的方法

獨占模式實現(xiàn)

獨占模式同步隊列示意

共享模式

共享模式同步隊列示意:


關(guān)于AbstractQueuedSynchronizer

JDK1.5之后引入了并發(fā)包java.util.concurrent,里面包含了很多并發(fā)控制鎖類,其核心是:AbstractQueuedSynchronizer,其數(shù)據(jù)結(jié)構(gòu)為鏈表方式的雙向隊列。

基本數(shù)據(jù)結(jié)構(gòu)

節(jié)點結(jié)構(gòu)

鏈表節(jié)點的字段含義如下:

Class:Node
字段類型初始值意義
SHAREDfinal Node任意一個Node對象一個指示器,用于標識Node處于共享模式。
EXCLUSIVEfinal Nodenull一個指示器,用于標識Node處于獨占模式。
CANCELLEDfinal int1waitStatus值,表示Node處于取消狀態(tài)。一般當Node處于超時或者中斷,設置此值。取消節(jié)點關(guān)聯(lián)的線程不會重新阻塞。
SIGNALfinal int-1waitStatus值,表示Node的后續(xù)Node 已經(jīng)或者即將通過park 阻塞。當此節(jié)點取消或者release時,后續(xù)節(jié)點需要unpark。為了避免競爭。acquire方式必須指示需要SIGNAL,重試acquire,在失敗的情況下阻塞。
CONDITIONfinal int-2waitStatus值,表示節(jié)點處于等待隊列。當在某個時間點set to 0 ,用于同步隊列
PROPAGATEfinal int-3waitStatus值,用于共享模式,表示下一次acquire無條件的傳播。
waitStatusvolatile int0節(jié)點狀態(tài)
prevvolatile Nodenull前置節(jié)點
nextvolatile Nodenull后續(xù)節(jié)點
threadvolatile Threadnull關(guān)聯(lián)線程
nextWaiterNodenull指向下一個Node is waiting on condition。或者指向SHARED,EXCLUSIVE表示模式。

同步隊列結(jié)構(gòu)

同步隊列
字段類型初始值意義
headvolatile Nodenull同步隊列頭節(jié)點,延遲初始化,必須通過setHead方法設置值。
tailvolatile Nodenull同步隊列尾節(jié)點,延遲初始化,通過enq方法設置。
statevolatile int0狀態(tài)。用于追蹤同步狀態(tài),具體由各子類處理。

?

  • 每次加入節(jié)點到同步隊列,都加在尾部,釋放節(jié)點從頭節(jié)點的后續(xù)節(jié)點釋放。
  • 一個節(jié)點位于隊列頭,并不保證acquire成功,只是盡量嘗試acquire。
  • ?

    ?

    ?

    實現(xiàn)

    AbstractQueuedSynchronizer僅實現(xiàn)抽象方法控制并發(fā),由子類實現(xiàn)具體的資源控制。

    子類需要實現(xiàn)的方法

    方法意義
    tryAcquire嘗試獨占模式下acquire,失敗則進入同步隊列。
    tryRelease嘗試獨占模式下release。
    tryAcquireShared嘗試共享模式下acquire,失敗則進入同步隊列。
    tryReleaseShared嘗試共享模式下release。
    isHeldExclusively指示同步器是否在獨占模式下被當前線程占用

    獨占模式實現(xiàn)

    /**/public final void acquire(int arg) {if (!tryAcquire(arg) && --acquire失敗,/* 第一嘗試acquire失敗后,加入到同步隊列,會繼續(xù)嘗試*/acquireQueued(/* acquire失敗,則加入一個節(jié)點到同步隊列,節(jié)點為獨占模式。*/addWaiter(Node.EXCLUSIVE), arg))/*acquireQueued 返回的是中斷標識(中斷標識以清除),如果為true ,重新設置中斷標識*/selfInterrupt();}

    ???

    /* 節(jié)點加入同步隊列之后,會嘗試acquire。如果失敗會通過park阻塞。 */ final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {//中斷標志,標識此線程是否設置過中斷標識。boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //如果前一個節(jié)點是head節(jié)點,(head節(jié)點不關(guān)聯(lián)線程),即本節(jié)點是第一個acquire失敗的節(jié)點,則嘗試acquire。if (p == head && tryAcquire(arg)) {//acquire成功,則把此節(jié)點設置為head節(jié)點(thread關(guān)聯(lián)解除),返回中斷標識。//【10】setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//不是第一個accquire失敗的節(jié)點,則判斷是否通過park阻塞。if (//判斷是否需要park。shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//此線程 曾經(jīng)被中斷過。(中斷標識被清除了)。interrupted = true;}} finally {//僅當出現(xiàn)異常時,才會進入此代碼塊,一般timeout或者中斷,此時需取消節(jié)點。if (failed)cancelAcquire(node);}} /* 判斷是否需要park */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//前一個節(jié)點的status為SIGNAL,表示后續(xù)節(jié)點需要parkif (ws == Node.SIGNAL) //【4】return true;if (ws > 0) {//ws > 0 ,僅當status=CANCELLED。則把取消節(jié)點剔除同步隊列。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/*狀態(tài)為0或者PROPAGATE,獨占模式為0,共享模式為PROPAGATE。則需要設置為SIGNAL,表示即將park(是否park由下一次acquire決定)。*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //【3】}return false;} private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //【5】/**********此處時重點,以上代碼執(zhí)行后,線程會立即掛起。當線程unpark后,后續(xù)代碼接著運行。*///返回線程的中斷標識。同時清除中斷標識。return Thread.interrupted();}

    ?

    /*取消acquire*/private void cancelAcquire(Node node) {// 節(jié)點不存在,返回。if (node == null)return;//取消線程關(guān)聯(lián)。node.thread = null;// 跳過前置節(jié)點中的CANCELLED節(jié)點Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;//設置狀態(tài)為CANCELLEDnode.waitStatus = Node.CANCELLED;// 如果是尾節(jié)點,僅釋放自身。if (node == tail && compareAndSetTail(node, pred)) {//設置前置節(jié)點的next為null。compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head && //前置節(jié)點不為head。((ws = pred.waitStatus) == Node.SIGNAL //前置節(jié)點為CANCELLED||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) //前置節(jié)點為0,并且設置為CANCELLED成功) &&pred.thread != null //關(guān)聯(lián)線程) {//以上判斷條件,表示前置節(jié)點不為head節(jié)點,并且為SIGNALNode next = node.next;if (next != null && next.waitStatus <= 0)//如果本節(jié)點之后仍有后續(xù)節(jié)點,則剔除本節(jié)點,修改指針。//此處未處理node.next.prev,node節(jié)點通過next是剔除的,通過prev是可以訪問得到的。compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node; // help GC}}

    ?

    private Node addWaiter(Node mode) {/* 構(gòu)造一個節(jié)點,關(guān)聯(lián)到當前線程*/Node node = new Node(Thread.currentThread(), mode);// 為了提高性能,先嘗試一次加入同步隊列。失敗再嘗試enq方式。enq方式時自旋方式(即死循環(huán))Node pred = tail;if (pred != null) { node.prev = pred;//CASif (compareAndSetTail(pred, node)) { //【6】pred.next = node;return node;}}enq(node);return node;} /* 節(jié)點加入同步隊列,并返回此節(jié)點。 采用自旋方式加入, */private Node enq(final Node node) {//自旋for (;;) {Node t = tail;//tail為null,則head必定為null,生成一個node作為head。if (t == null) { // Must initializeif (compareAndSetHead(new Node())) //【1】tail = head;} else {//加入節(jié)點成尾節(jié)點,并返回。node.prev = t;if (compareAndSetTail(t, node)) { //【2】t.next = node;return t;}}}} public final boolean release(int arg) {if (tryRelease(arg)) { //【7】//嘗試成功Node h = head;if (h != null && h.waitStatus != 0) //head節(jié)點不為null,并且head狀態(tài)不為0(在獨占模式下,不為0,也不可能為CANCELLED,則為SIGNAL),則unpark后續(xù)節(jié)點。unparkSuccessor(h); //【8】return true;}return false;} private void unparkSuccessor(Node node) {/*status < 0 ,則把status設置為0*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {//后續(xù)節(jié)點為null,或者狀態(tài)為CANCELLED。s = null;//從tail往前查找到第一個status<0的節(jié)點,選中作為要unpark的節(jié)點for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//unpark節(jié)點對應的線程。LockSupport.unpark(s.thread); //【9】}

    獨占模式同步隊列示意

    public class AbstractQueuedSynchronizerTest {@Testpublic void testAbstractQueuedSynchronizer() {Lock lock = new ReentrantLock();Runnable runnable0 = new ReentrantLockThread(lock);Thread thread0 = new Thread(runnable0);thread0.setName("t-0");Runnable runnable1 = new ReentrantLockThread(lock);Thread thread1 = new Thread(runnable1);thread1.setName("t-1");Runnable runnable2 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable2);thread2.setName("t-2");Runnable runnable3 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable3);thread3.setName("t-3");Runnable runnable4 = new ReentrantLockThread(lock);Thread thread4 = new Thread(runnable4);thread4.setName("t-4");thread0.start();thread1.start();thread2.start();thread3.start();thread4.start();thread2.interrupt();for (;;);}private class ReentrantLockThread implements Runnable {private Lock lock;public ReentrantLockThread(Lock lock) {this.lock = lock;}@Overridepublic void run() {try {lock.lock();for (int i=0;i<1000000;i++);} finally {lock.unlock();}}}}

    以前假設各線程按順序啟動

    隊列變化如下:

    1、同步隊列初始化。thread:t-0 acquire成功。

    2、thread:t-1 啟動,執(zhí)行代碼【1】,【2】,【3】,【4】處后狀態(tài)如下圖,然后執(zhí)行【5】阻塞。

    3、thread:t-2 啟動,執(zhí)行代碼【6】,【3】,【4】處后狀態(tài)如下圖,然后執(zhí)行【5】阻塞。

    3、thread:t-3 啟動,執(zhí)行代碼【6】,【3】,【4】處后狀態(tài)如下圖,然后執(zhí)行【5】阻塞。

    4、thread:t-0,釋放,執(zhí)行代碼【7】,【8】,經(jīng)過【9】后,t-1 線程在代碼【5】繼續(xù)執(zhí)行。經(jīng)過【10】后狀態(tài)如下圖

    5、thread,t-2,t-3,t-4類似

    共享模式

    public final void acquireShared(int arg) {//獨占模式:tryAcquire,返回boolean表示是否成功。//共享模式:tryAcquireShared,返回int,小于0,表示acquire失敗。if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);// nextWaiter為SHARED,表示共享模式。【11】boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg); //【14】if (r >= 0) {//如果r>=0表示,表示許可有剩余。設置head,并繼續(xù)傳播setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted) selfInterrupt(); //與acquire最后2行代碼一樣,如果中斷過,仍設置中斷標識。failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}} private void setHeadAndPropagate(Node node, int propagate) {Node h = head; //指向原來的head,后面使用。setHead(node);//設置head,【15】if (propagate > 0 // 許可有剩余 【16】|| h == null // 原有head為null,表示節(jié)點已釋放|| h.waitStatus < 0 // 狀態(tài)為PROPAGATE或SIGNAL||(h = head) == null // 新的head為null|| h.waitStatus < 0 //或者新的head的狀態(tài)<0) {Node s = node.next; //當前節(jié)點的后續(xù)節(jié)點if (s == null || s.isShared()) //當前節(jié)點為共享模式或者后續(xù)節(jié)點為nulldoReleaseShared(); //【17】}} public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) { //頭結(jié)點本身的waitStatus是SIGNAL且能通過CAS算法將頭結(jié)點的waitStatus從SIGNAL設置為0,喚醒頭結(jié)點的后繼節(jié)點if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //【12】continue; // loop to recheck casesunparkSuccessor(h);//CAS成功,則...【13】}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //頭結(jié)點本身的waitStatus是0的話,嘗試將其設置為PROPAGATE狀態(tài)的,意味著共享狀態(tài)可以向后傳播continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}

    共享模式同步隊列示意:

    package main.java.study;import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {public class MapOper implements Runnable {CountDownLatch latch ;public MapOper(CountDownLatch latch) {this.latch = latch;}public void run() {try {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(Thread.currentThread().getName() + "start:" + df.format(new Date()));latch.await();System.out.println(Thread.currentThread().getName() + "work:" + df.format(new Date()));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println(Thread.currentThread().getName()+" Sync Started!");}}public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubCountDownLatchTest test = new CountDownLatchTest();CountDownLatch latch = new CountDownLatch(1);Thread t1 = new Thread(test.new MapOper(latch));Thread t2 = new Thread(test.new MapOper(latch));Thread t3 = new Thread(test.new MapOper(latch));Thread t4 = new Thread(test.new MapOper(latch));t1.setName("Thread1");t2.setName("Thread2");t3.setName("Thread3");t4.setName("Thread4");t1.start();Thread.sleep(1500);t2.start();Thread.sleep(1500);t3.start();Thread.sleep(1500);t4.start();System.out.println("thread already start, sleep for a while...");Thread.sleep(1000);latch.countDown();}}

    隊列變化:

    1、同步隊列初始化:前3個工作線程調(diào)用await()方法,經(jīng)過【11】,……,【5】線程掛起,狀態(tài)如下:

    2、主線程調(diào)用countDown()方法,經(jīng)過【12】,【13】,【9】喚醒線程t-1,t-1繼續(xù)執(zhí)行,經(jīng)過【14】,【15】狀態(tài)如下:

    3、t-1繼續(xù)執(zhí)行【16】,【17】喚醒下一個線程(node-2)

    4、t-2,t-3依次,都喚醒下一個。(每個節(jié)點都由前置節(jié)點對應的線程喚醒,喚醒立即返回)

    5、結(jié)束

    ?

    ?

    ?

    總結(jié)

    以上是生活随笔為你收集整理的AbstractQueuedSynchronizer源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。