日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java await signal_【Java并发008】原理层面:ReentrantLock中 await()、signal()/signalAll()全解析...

發布時間:2023/12/10 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java await signal_【Java并发008】原理层面:ReentrantLock中 await()、signal()/signalAll()全解析... 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、前言

上篇的文章中我們介紹了AQS源碼中lock方法和unlock方法,這兩個方法主要是用來解決并發中互斥的問題,這篇文章我們主要介紹AQS中用來解決線程同步問題的await方法、signal方法和signalAll方法,這幾個方法主要對應的是synchronized中的wait方法、notify方法和notifAll方法。

二、使用層面:await()與signal()/signalAll()(了解即可)

2.1 使用層面:await()與signal()/signalAll()(了解即可)

我們實現一個阻塞的隊列。

import java.util.ArrayList;

import java.util.Arrays;

import java.util.List;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

public class MyBlockedQueue {

final Lock lock = new ReentrantLock();

// 條件變量:隊列不滿

final Condition notFull = lock.newCondition();

// 條件變量:隊列不空

final Condition notEmpty = lock.newCondition();

private volatile List list = new ArrayList<>();

// 入隊

void enq(T x) {

lock.lock();

try {

while (list.size() == 10) {

// 等待隊列不滿

try {

notFull.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

// 省略入隊操作

list.add(x);

// 入隊后, 通知可出隊

notEmpty.signal();

} finally {

lock.unlock();

}

}

// 出隊

void deq() {

lock.lock();

try {

while (list.isEmpty()) {

// 等待隊列不空

try {

notEmpty.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

list.remove(0);

// 出隊后,通知可入隊

notFull.signal();

} finally {

lock.unlock();

}

}

public List getList() {

return list;

}

public static void main(String[] args) throws InterruptedException {

MyBlockedQueue myBlockedQueue = new MyBlockedQueue<>();

Thread thread1 = new Thread(new Runnable() {

@Override

public void run() {

for (int i = 0; i < 20; i++) {

myBlockedQueue.enq(i);

}

}

});

Thread thread2 = new Thread(new Runnable() {

@Override

public void run() {

for (int i = 0; i < 10; i++) {

myBlockedQueue.deq();

}

}

});

thread1.start();

thread2.start();

try {

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(Arrays.toString(myBlockedQueue.getList().toArray()));

}

}

運行的結果如下(輸出的是后面的10位):

[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

我們可以看到condition在多線程的中使用,類似于實現了線程之前的通信:

當某個條件滿足的時候,執行某個線程中操作;

當某個條件不滿足的時候,將當前的線程掛起,等待這個條件滿足的時候,其他的線程喚醒當前線程。

2.2 ConditionObject類和Node類

2.2.1 ConditionObject類(屬性+方法)

ConditionObject類的屬性

public class ConditionObject implements Condition, java.io.Serializable {

private static final long serialVersionUID = 1173984872572414699L; // 實現Serializable接口,顯式指定序列化字段

private transient Node firstWaiter; // 作為Node類型,指向等待隊列中第一個節點

private transient Node lastWaiter; // 作為Node類型,指向等待隊列中最后一個節點

private static final int REINTERRUPT = 1; // reinterrupt 設置為final變量,命名易懂

private static final int THROW_IE = -1; // throw InterruptedException

}

ConditionObject類的方法

2.2.2 Node類

對于Node節點,屬性包括七個(重點是前面五個)

volatile int waitStatus; //當前節點等待狀態

volatile Node prev; //上一個節點

volatile Node next; //下一個節點

volatile Thread thread; //節點中的值

Node nextWaiter; //下一個等待節點

static final Node SHARED = new Node(); //指示節點共享還是獨占,默認初始是共享

static final Node EXCLUSIVE = null;

處在同步隊列中使用到的屬性(加鎖、解鎖)包括:next prev thread waitStatus。

所以同步隊列是雙向非循環鏈表,涉及的類變量AbstractQueuedSynchronizer類中的head和tail,分別指向同步隊列中的頭結點和尾節點。

處在等待隊列中使用到的屬性(阻塞、喚醒)包括:nextWaiter thread waitStatus。

所以等待隊列是單向非循環鏈表,涉及的類變量ConditionObject類中的firstWaiter和lastWaiter,分別指向等待隊列中的頭結點和尾節點。

AQS隊列是工作隊列、同步隊列,是非循環雙向隊列:

當使用到head tail的時候,就說AQS隊列建立起來了,單個線程不使用到head tail,所以AQS隊列沒有建立起來;

等待隊列,是非循環單向隊列:

當使用firstWaiter lastWaiter的時候,就說等待隊列建立起來了。

lock()和unlock()就是操作同步隊列:

lock()將線程封裝到節點里面(此時,節點使用到的屬性是thread nextWaiter waitStatus),放到同步隊列,即AQS隊列中,unlock()將存放線程的節點從同步隊列中拿出來,表示這個線程工作完成。

await()和signal()就是操作等待隊列:

await()將線程封裝到節點里面(此時,節點使用到的屬性是thread prev next waitStatus),放到等待隊列里面,signal()從等待隊列中拿出元素。

問題:為什么負責同步隊列的head和tail在AbstractQueuedSynchronizer類中,但是負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中?

回答:

對于線程同步互斥,是直接通過ReentrantLock類對象 lock.lock() lock.unlock()實現的,而ReentrantLock類對象是調用AQS類實現加鎖解鎖的,所以負責同步隊列的head和tail在AbstractQueuedSynchronizer類中;

對于線程阻塞和喚醒,是通過ReentrantLock類對象lock.newCondition()得到一個對應,condition引用指向這個對象,然后condition.await() condition.signal()實現的,所以負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中。

三、await()源碼

3.1 Condition.await()執行圖

我們再來看看await的源碼,具體如下圖:

對于上圖的解釋:

第一個方法插入到等待隊列中,第二個方法釋放同步鎖,第三個方法阻塞當前線程,三個一體,不能分開。如第二個方法先于第三個方法,先釋放同步鎖,再掛起線程,目的:為了避免當前線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況。

整體流程詳細:

第一步,如果某個線程的調用了await的方法,走來會將這個線程通過CAS和尾插法的方式將這個等待的線程添加到AQS的等待隊列中去(解釋:通過CAS和尾插法的方式是指:在cas保證線程安全的情況下,使用尾插法三步將這個線程放到一個Node結點中,插入到AQS的等待隊列),對應代碼 Node node = addConditionWaiter();

第二步,然后將當前的線程進行解鎖(解釋:對當前線程解鎖的目的是為了避免這個線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況),對應代碼 int savedState = fullyRelease(node);

第三步,然后將當前的線程進行park(解釋:park之后這個線程只能被動地等待其他的線程調用signal方法將當前的線程unpark),對應代碼 LockSupport.park(this);

while (!isOnSyncQueue(node)) { // addConditionWaiter()返回的node,不在同步隊列中,就park

LockSupport.park(this); // 將當前的線程進行park,this表示AbstractQueuedSynchronizer對象,表示當前線程

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

第一個方法使用數據結構插入到等待隊列中;

第二個方法使用unpark釋放同步鎖:unparkSuccessor(h);

第三個方法使用park阻塞當前線程:LockSupport.park(this);

3.2 condition.await()源碼解析

3.2.1 第一步,addConditionWaiter()添加節點到等待隊列中

private Node addConditionWaiter() {

Node t = lastWaiter;

// If lastWaiter is cancelled, clean out.

if (t != null && t.waitStatus != Node.CONDITION) {

unlinkCancelledWaiters();

t = lastWaiter;

}

// 新建一個節點,節點存放當前線程,狀態設置為正在等待條件CONDITION

Node node = new Node(Thread.currentThread(), Node.CONDITION);

if (t == null)

firstWaiter = node;

else

t.nextWaiter = node;

lastWaiter = node;

return node;

}

addConditionWaiter()三種情況:

第一種情況,當前等待隊列中沒有節點(此時firstWaiter和tailWaiter都為null)

第二種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點)

第三種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點),但是尾指針所指向節點不是在等待隊列中等待( t.waitStatus != Node.CONDITION)

3.2.1.1 第一種情況,當前等待隊列中沒有節點(此時firstWaiter和tailWaiter都為null)

程序執行如下:

Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來

Node node = new Node(Thread.currentThread(), Node.CONDITION);

firstWaiter = node; // 因為現在等待隊列中就只有這一個剛剛新建的Node節點,所以,將負責等待隊列的首尾指針都指向這個節點

lastWaiter = node;

return node; // 返回當前線程新建好的這個節點

程序執行如上,先將lastWaiter記錄下來(因為要采用尾插法,先將尾指針記錄下來)

使用當前線程新建一個節點,這個新節點Node的thread屬性為當前線程(表示這個節點存放的就是當前線程,將當前線程放到一個節點中,然后這個節點放入等待隊列中),waitStatus=Condition(-2),

尾插法經典兩步:第一次進入的時候lastWaiter==null(表示當前等待隊列中沒有節點),因為現在等待隊列中就只有這一個剛剛新建的Node節點,所以,將負責等待隊列的首尾指針都指向這個節點( firstWaiter = node; lastWaiter = node;),

最后返回當前線程新建好的這個節點。

3.2.1.2 第二種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點)

Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來

Node node = new Node(Thread.currentThread(), Node.CONDITION);

t.nextWaiter = node; // 尾插法經典兩步:(1)當前節點下一個節點為新建節點;(2)等待隊列尾指針指向新建節點

lastWaiter = node;

return node; // 返回新加入等待隊列的節點

程序執行為如上,先將lastWaiter記錄下來(因為要采用尾插法,先將尾指針記錄下來)

使用當前線程新建一個節點

尾插法經典兩步:(1)當前節點下一個節點為新建節點;(2)等待隊列尾指針指向新建節點

最后返回新加入等待隊列的節點(里面存放當前線程)。

3.2.1.3 第三種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點),但是尾指針所指向節點不是在等待隊列中等待( t.waitStatus != Node.CONDITION)

執行程序如下:

Node t = lastWaiter; // 記錄尾指針所指向節點,為使用尾插法準備

unlinkCancelledWaiters(); //相對于第二種的特殊情況,這里需要處理

t = lastWaiter; //相對于第二種的特殊情況,這里需要處理

Node node = new Node(Thread.currentThread(), Node.CONDITION);

t.nextWaiter = node; // 尾插法經典兩步

lastWaiter = node;

return node;

執行程序如上,沒什么問題,看新增的兩句

unlinkCancelledWaiters();

// 解釋:解綁所有的處于取消狀態的等待者,

// 這個使用canceled,表示已取消狀態,這里使用watiers,表示不止一個

t = lastWaiter; // 解釋:重置一下t,繼續記錄新的尾巴指針指向的節點,為下面尾插法準備

解釋unlinkCancelledWaiters()程序

private void unlinkCancelledWaiters() {

Node t = firstWaiter; // 1、記錄等待隊列中頭指針所指向節點

// 為什么這里記錄頭指針指向,因為等待隊列是非循環單鏈表,所以while循環刪除已取消結點,只能從頭結點開始遍歷

Node trail = null; // 2、局部變量trail,下面不斷移動t,用t來記錄當前節點,但是因為等待列表是單鏈表,所以無法記錄當前節點t的上一個節點,所有要在t還沒有移動時候,將當前t記錄下來放到trail中,然后t再移動

while (t != null) {

Node next = t.nextWaiter; // 3、準備移動,trail記錄t,單鏈表基本操作

if (t.waitStatus != Node.CONDITION) {

t.nextWaiter = null; //

if (trail == null) // 4、這是執行 trail=t 之前執行的,trail=t 執行之前,不斷向后移動,同時不斷修改頭指針firstWaiter

// 4.1 為什么trail=t 執行之前要不斷修改頭指針firstWaiter?

//因為t.waitStatus != Node.CONDITION,當前隊列不行,所以要不斷修改頭指針firstWaiter

firstWaiter = next; //唯一一個設置頭指針的地方,

// 4.2 為什么執行了trail=t之后就不要修改頭指針了?

// 因為只要找到了為Node.CONDITION的t,就不會刪除了,就是保留操作了,就是可以使用的等待隊列了

else // 這是執行 trail=t 之后執行的

trail.nextWaiter = next; // 5、執行了 trail=t 之后,trail -> t -> next,因為t.waitStatus != Node.CONDITION,所以要去掉t,就執行 trail.nextWaiter = next; 變為 trail -> next

//5.1 為什么trail=t 執行之前不需要刪除t,因為這時候trail==null

//5.2 為什么trail=t 執行之后要刪除t,因為這時候firstWaiter確定了,等待隊列確定了,當然要刪去不合法的t,t.waitStatus != Node.CONDITION

if (next == null) // 6、 這是最后一次循環執行的,當next為null,表示后面后面沒有了,要跳出while循環了,就是設置這是尾指針指向了,但是此時t.waitStatus != Node.CONDITION,不能設置 lastWaiter = t;所以設置為t的前置節點 lastWaiter = trail;

lastWaiter = trail;

}

else

trail = t; // t的上一個記錄到trail中

t = next; // t移動

}

}

對于unlinkCancelledWaiters()方法的解釋:記住上面的1 2 3 4 4.1 4.2 5 5.1 5.2 6就好。

附加問題:單鏈表基礎知識:trail是如何記錄t的上一節點的

回答:

如果 t= t.nextWaiter; 那么trail無法記錄t的上一個節點

于是我們將t的移動拆分開來

Node next = t.nextWaiter;

trail = t;

t = next; // 第一句和第三句實際上就是 t= t.nextWaiter;我們將其拆分開來,在t值還沒有修改的時候,在第二句的時候,執行 trail=t ,將t記錄下來

最后三句放在同一個while循環中就實時同步了

while (t != null){

Node next = t.nextWaiter;

trail = t;

t = next;

}

3.2.2 第二步,addConditionWaiter()返回存放當前線程的新節點后,將節點作為fullyRelease()方法的參數,這個fullyRelease()方法是要將新節點中存放的當前的線程進行解鎖,并返回savedState()

final int fullyRelease(Node node) {

boolean failed = true; // 標志位,默認失敗,為什么剛開始的時候默認失敗?因為剛開始的時候沒有執行,一定要執行之后才設置為成功

try {

int savedState = getState(); // 獲取當前類變量state,這個state是用來記錄當前線程的加鎖次數的(因為synchronized和lock都是可重入鎖,所以可以加鎖多次,)

if (release(savedState)) { // 如果釋放當前線程成功了,要阻塞,就是要進入等待隊列,就要釋放同步鎖

failed = false; // 不是失敗,成功了

return savedState; // 返回當前線程加鎖次數,為0就是完全解鎖成功了

} else {

throw new IllegalMonitorStateException();

}

} finally {

if (failed)

node.waitStatus = Node.CANCELLED;

}

}

public final boolean release(int arg) { // 參數就是線程加鎖次數

if (tryRelease(arg)) { // 釋放同步鎖 上一篇博客講過 參數是線程加鎖次數

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

解鎖的核心方法就是這個release()方法(fullyRelease()方法只是調用這個release()方法,由這個release()方法提供判斷,tryRelease()只是給這個release()方法提供判斷):

對于release()釋放同步鎖的邏輯總共有三種情況:

只有一個線程加鎖,沒有形成AQS隊列:

這個并發過程中,只有一個線程加鎖,所以AQS隊列沒有創建,這里if判斷不成立,就是tryRelease()判斷為false,release()方法直接返回false;

兩個線程加鎖,形成AQS隊列,當線程B解鎖的時候:

在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A解鎖完成了,AQS隊列中就只剩下線程B,然后線程B來解鎖,這個時候線程B就是AQS隊列的隊首元素,這個時候隊首線程B的waitStatus的值為0,if中的if也不會執行(tip:有了AQS隊列可以通過第一個if (tryRelease(arg)),但是 if (h != null && h.waitStatus != 0)判斷的時候 h != null h.waitStatus == 0,所以無法通過第二個if)整個方法返回true。

兩個線程加鎖,形成AQS隊列,當線程A解鎖的時候:

在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A先來解鎖,這個時候線程A就是AQS隊列的隊首元素,由于AQS隊列中有線程A和線程B兩個元素,這個時候隊首線程A的waitStatus的值不為0,if中的if執行,unparkSuccessor(h); 解鎖頭結點的下一個節點(tip:就是解鎖線程B),整個方法返回true。

這里解釋一下tryRelease(),很簡單

protected final boolean tryRelease(int releases) {

int c = getState() - releases; // 線程加鎖次數-線程加鎖次數=0

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false; // 默認釋放成功為false

if (c == 0) { // 加鎖次數為0

free = true; // 標志位釋放同步鎖成功

setExclusiveOwnerThread(null); // 獨占線程為null

}

setState(c); // 重新設置類變量state 線程加鎖次數

return free;

}

3.2.3 第三步,isOnSyncQueue()提供判斷,LockSupport.park(this);將當前的線程進行park(解釋:park就是阻塞,unpark就是解鎖)

final boolean isOnSyncQueue(Node node) {

// 當前處于等待狀態,而且同步隊列中沒有前驅者

if (node.waitStatus == Node.CONDITION || node.prev == null)

return false;

// 如果這個node節點在同步隊列中有后繼者,他一定在同步隊列中,

// prev和next是作用于同步隊列的指針,nextWaiter作用于等待隊列的指針

if (node.next != null)

return true;

// 上面兩個條件都不滿足,都被else了

// node.next==null node.prev!=null&&node.waitStatus != Node.CONDITION

return findNodeFromTail(node);

}

// 能夠進入這個方法的,一定是node.next==null node.prev!=null,

// 所以,在同步隊列中,從后往前遍歷,找到這個節點返回true,一直到最前面都沒找到,返回false

private boolean findNodeFromTail(Node node) {

Node t = tail;

for (;;) {

if (t == node)

return true;

if (t == null)

return false;

t = t.prev;

}

}

最后執行LockSupport.park(this),完成了將當前的線程進行park(解釋:park就是阻塞,unpark就是解鎖)。

四、signal()/signalAll()源碼

4.1 condition.notify()整體流程圖

我們再來看看signal方法和signalAll方法的源碼

大致的流程就是:某個線程調用signal方法或者signalAll方法,

第一步,將等待隊列中的節點放到AQS同步執行隊列中(每個節點Node里面存放著線程thread),具體地,signal方法會將當前的等待隊列中第一個等待的線程的節點加入到原來的AQS隊列中去,而signalAll方法是將等待隊列中的所有的等待線程的節點全部加入到原來的AQS的隊列中去。

第二步,獲取同步鎖:讓他們重新獲取鎖,進行unpark。

第三步,喚醒當前線程:線程被喚醒,執行對應的線程中代碼。

4.2 condition.notify()源碼解析

4.2.1 等待隊列中的節點放到AQS同步執行隊列中,每個節點Node里面存放著線程thread

4.2.1.1 等待隊列中的移除第一個節點

public final void signal() {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

Node first = firstWaiter; // 將等待隊列中頭指針指向的節點記錄下來,因為我們要刪去等待隊列中第一個節點

if (first != null)

doSignal(first);

}

private void doSignal(Node first) {

do {

if ( (firstWaiter = first.nextWaiter) == null) // 下一個等待元素為空,表示等待隊列中只有一個元素,因為這個元素被移除,就要置空等待隊列尾指針 lastWaiter = null;

lastWaiter = null;

first.nextWaiter = null;

} while (!transferForSignal(first) && (first = firstWaiter) != null);

}

對于doSignal()方法的解釋:對于等待隊列這個非循環單向鏈表的隊列,要刪除鏈表頭元素

需要修改等待隊列頭指針,指向當前等待隊列的下一個節點,即執行 firstWaiter = first.nextWaiter

需要置空當前等待隊列的第一個節點的nextWaiter指針,即執行 first.nextWaiter = null;

分為兩種情況:

第一種情況,等待隊列中只有一個節點,則執行過程為:

firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點

lastWaiter = null; // 沒有元素了,置空等待隊列尾指針

first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針

第二種情況,等待隊列中2-n個節點,則執行過程為:

firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點

first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針

transferForSignal(first) 方法,實參是first等待隊列隊首節點,表示等待隊列隊首刪除的節點,同步隊列隊尾添加的節點

transferForSignal(first) 要將等待隊列中移除的這個節點使用尾插法插入到同步隊列中,所以直接將這個first節點作為參數傳遞給transferForSignal()操作,記住,這個first節點就是被等待隊列移除的節點

final boolean transferForSignal(Node node) {

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

Node p = enq(node); // 將node節點這個被等待隊列移除的節點尾插法插入到的同步隊列中

int ws = p.waitStatus;

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

LockSupport.unpark(node.thread);

return true;

}

4.2.1.2 同步隊列中的添加這個節點:enq()

對于enq()兩種情況,

1、同步隊列中沒有節點,所以tail == null

2、同步隊列中有節點,所以tail != null

第一種情況:同步隊列中沒有節點,所以tail==null,程序執行如下:

Node t = tail; // 同步隊列中沒有節點,tail==null

compareAndSetHead(new Node()) //

tail = head;

t=tail; // 重新更新尾節點記錄

// 插入操作三步驟

node.prev = t;

compareAndSetTail(t, node)

t.next = node;

return t; // 返回尾節點前面的一個節點,當前同步隊列中一共兩個節點 t node,現在返回t

第二種情況:同步隊列中有節點,所以tail!=null,程序執行如下:

Node t = tail; // 同步隊列中沒有節點,tail!=null

// 插入操作三步驟

node.prev = t;

compareAndSetTail(t, node)

t.next = node;

return t; // 返回尾節點前面的一個節點,當前同步隊列中一共n個節點 node1 node2 ... t node,現在返回t

問題1:為什么enq()方法返回的是尾節點的前一個節點的狀態?

回答1:因為尾節點的前一個節點,就是插入前的尾節點啊,所有說enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點。

問題2:為什么enq()方法中tail==null,一定要新建一個節點?

回答2:接上面,因為enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點,因為要返回原來的尾節點,所有如果沒有原來的尾節點,就要新建一個節點當做原來的尾節點,為返回值服務。

4.2.2 獲取同步鎖(了解即可)

enq()方法給同步隊列隊尾添加節點后,不等transferForSignal()執行完,await()方法中的循環檢測很快就檢測到了,同步隊列中的有了剛剛被阻塞的節點(就是剛剛被阻塞的節點從阻塞隊列中出來了,到同步隊列中去了,所有這個這個節點里面的線程可以競爭同步鎖了,tryAcquire)

public final void await() throws InterruptedException {

if (Thread.interrupted())

throw new InterruptedException();

Node node = addConditionWaiter();

int savedState = fullyRelease(node);

int interruptMode = 0;

while (!isOnSyncQueue(node)) { // 當同步隊列中有了這個節點

LockSupport.park(this);

if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

break;

}

// acquireQueued(node, savedState)

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled

unlinkCancelledWaiters();

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

final boolean acquireQueued(final Node node, int arg) {

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) { // tryAcquire獲取同步鎖成功

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

4.2.3 喚醒當前線程(了解即可)

接上面,await()方法中被阻塞的節點放開后,可以參與同步鎖的搶占,CAS操作搶占同步鎖成功后,transferForSignal()方法里面喚醒node節點當前線程,即執行LockSupport.unpark(node.thread)語句。

final boolean transferForSignal(Node node) {

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

Node p = enq(node); // 將node節點這個被等待隊列移除的節點尾插法插入到的同步隊列中,返回尾節點的前一個節點

int ws = p.waitStatus; // 尾節點前一個節點狀態,上一個尾節點的狀態

if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果上一個尾節點ws==1,表示上一個節點已經被取消cannceled,或者上一個尾節點cas設置等待狀態失敗

LockSupport.unpark(node.thread); // 喚醒node線程重新同步

return true;

}

五、面試金手指

面試問題:lock機制是如何實現condition的await和signal的(因為synchronized直接使用Object類的wait()和notify()方法)?

回答:就是下面“面試金手指”部分的,都答上去就好了。

5.1 ConditionObject類和Node類

核心語言組織:AQS類和Node類

AQS本質是一個非循環的雙向鏈表(也可以稱為隊列),所以它是由一個個節點構成的,就是Node,后面的lock() unlock() await() signal()/signalAll()都是以Node為基本元素操作的,那么在這個Node類中需要保存什么信息呢?

回答:Node節點屬性包括七個(重點是前面五個)

volatile int waitStatus; //當前節點等待狀態

volatile Node prev; //上一個節點

volatile Node next; //下一個節點

volatile Thread thread; //節點中的值

Node nextWaiter; //下一個等待節點

static final Node SHARED = new Node(); //指示節點共享還是獨占,默認初始是共享

static final Node EXCLUSIVE = null;

處在同步隊列中使用到的屬性(加鎖、解鎖)包括:next prev thread waitStatus。

所以同步隊列是雙向非循環鏈表,涉及的類變量AbstractQueuedSynchronizer類中的head和tail,分別指向同步隊列中的頭結點和尾節點。

處在等待隊列中使用到的屬性(阻塞、喚醒)包括:nextWaiter thread waitStatus。

所以等待隊列是單向非循環鏈表,涉及的類變量ConditionObject類中的firstWaiter和lastWaiter,分別指向等待隊列中的頭結點和尾節點。

AQS隊列是工作隊列、同步隊列,是非循環雙向隊列:

當使用到head tail的時候,就說AQS隊列建立起來了,單個線程不使用到head tail,所以AQS隊列沒有建立起來;

等待隊列,是非循環單向隊列:

當使用firstWaiter lastWaiter的時候,就說等待隊列建立起來了。

lock()和unlock()就是操作同步隊列:

lock()將線程封裝到節點里面(此時,節點使用到的屬性是thread nextWaiter waitStatus),放到同步隊列,即AQS隊列中,unlock()將存放線程的節點從同步隊列中拿出來,表示這個線程工作完成。

await()和signal()就是操作等待隊列:

await()將線程封裝到節點里面(此時,節點使用到的屬性是thread prev next waitStatus),放到等待隊列里面,signal()從等待隊列中拿出元素。

問題:為什么負責同步隊列的head和tail在AbstractQueuedSynchronizer類中,但是負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中?

回答:

對于線程同步互斥,是直接通過ReentrantLock類對象 lock.lock() lock.unlock()實現的,而ReentrantLock類對象是調用AQS類實現加鎖解鎖的,所以負責同步隊列的head和tail在AbstractQueuedSynchronizer類中;

對于線程阻塞和喚醒,是通過ReentrantLock類對象lock.newCondition()得到一個對應,condition引用指向這個對象,然后condition.await() condition.signal()實現的,所以負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中。

5.2 condition.await()

5.2.1 核心-面試語言組織(等待隊列、釋放同步鎖、阻塞線程)

大致流程:第一個方法插入到等待隊列中,第二個方法釋放同步鎖,第三個方法阻塞當前線程,三個一體,不能分開,第二個方法先于第三個方法,先釋放同步鎖,再掛起線程,目的:為了避免當前線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況。

整體流程詳細:

第一步,如果某個線程的調用了await的方法,走來會將這個線程通過CAS和尾插法的方式將這個等待的線程添加到AQS的等待隊列中去(解釋:通過CAS和尾插法的方式是指:在cas保證線程安全的情況下,使用尾插法三步將這個線程放到一個Node結點中,插入到AQS的等待隊列),對應代碼 Node node = addConditionWaiter();

第二步,然后將當前的線程進行解鎖(解釋:對當前線程解鎖的目的是為了避免這個線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況),對應代碼 int savedState = fullyRelease(node);

第三步,然后將當前的線程進行park(解釋:park之后這個線程只能被動地等待其他的線程調用signal方法將當前的線程unpark),對應代碼 LockSupport.park(this);

小結: 第一個方法使用數據結構插入到等待隊列中,

第二個方法使用unpark釋放同步鎖:unparkSuccessor(h);

第三個方法使用park阻塞當前線程:LockSupport.park(this);

5.2.2 附加-應對面試官問題的解釋:等待隊列:addConditionWaiter()方法,三種情況

第一種情況,當前等待隊列中沒有節點(此時firstWaiter和tailWaiter都為null)

程序執行如下:

Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來

Node node = new Node(Thread.currentThread(), Node.CONDITION);

firstWaiter = node; // 因為現在等待隊列中就只有這一個剛剛新建的Node節點,所以,將負責等待隊列的首尾指針都指向這個節點

lastWaiter = node;

return node; // 返回當前線程新建好的這個節點

第二種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點)

執行程序如下:

Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來

Node node = new Node(Thread.currentThread(), Node.CONDITION);

t.nextWaiter = node; // 尾插法經典兩步:(1)當前節點下一個節點為新建節點;(2)等待隊列尾指針指向新建節點

lastWaiter = node;

return node; // 返回新加入等待隊列的節點

第三種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點),但是尾指針所指向節點不是在等待隊列中等待( t.waitStatus != Node.CONDITION)

執行程序如下:

Node t = lastWaiter; // 記錄尾指針所指向節點,為使用尾插法準備

unlinkCancelledWaiters(); //相對于第二種的特殊情況,這里需要處理

t = lastWaiter; //相對于第二種的特殊情況,這里需要處理

Node node = new Node(Thread.currentThread(), Node.CONDITION);

t.nextWaiter = node; // 尾插法經典兩步

lastWaiter = node;

return node;

執行程序如上,沒什么問題,看新增的兩句

unlinkCancelledWaiters();

// 解釋:解綁所有的處于取消狀態的等待者,

// 這個使用canceled,表示已取消狀態,這里使用watiers,表示不止一個

t = lastWaiter; // 解釋:重置一下t,繼續記錄新的尾巴指針指向的節點,為下面尾插法準備

特殊地,解釋unlinkCancelledWaiters()程序

private void unlinkCancelledWaiters() {

Node t = firstWaiter; // 1、記錄等待隊列中頭指針所指向節點

// 為什么這里記錄頭指針指向,因為等待隊列是非循環單鏈表,所以while循環刪除已取消結點,只能從頭結點開始遍歷

Node trail = null; // 2、局部變量trail,下面不斷移動t,用t來記錄當前節點,但是因為等待列表是單鏈表,所以無法記錄當前節點t的上一個節點,所有要在t還沒有移動時候,將當前t記錄下來放到trail中,然后t再移動

while (t != null) {

Node next = t.nextWaiter; // 3、準備移動,trail記錄t,單鏈表基本操作

if (t.waitStatus != Node.CONDITION) {

t.nextWaiter = null; //

if (trail == null) // 4、這是執行 trail=t 之前執行的,trail=t 執行之前,不斷向后移動,同時不斷修改頭指針firstWaiter

// 4.1 為什么trail=t 執行之前要不斷修改頭指針firstWaiter?

//因為t.waitStatus != Node.CONDITION,當前隊列不行,所以要不斷修改頭指針firstWaiter

firstWaiter = next; //唯一一個設置頭指針的地方,

// 4.2 為什么執行了trail=t之后就不要修改頭指針了?

// 因為只要找到了為Node.CONDITION的t,就不會刪除了,就是保留操作了,就是可以使用的等待隊列了

else // 這是執行 trail=t 之后執行的

trail.nextWaiter = next; // 5、執行了 trail=t 之后,trail -> t -> next,因為t.waitStatus != Node.CONDITION,所以要去掉t,就執行 trail.nextWaiter = next; 變為 trail -> next

//5.1 為什么trail=t 執行之前不需要刪除t,因為這時候trail==null

//5.2 為什么trail=t 執行之后要刪除t,因為這時候firstWaiter確定了,等待隊列確定了,當然要刪去不合法的t,t.waitStatus != Node.CONDITION

if (next == null) // 6、 這是最后一次循環執行的,當next為null,表示后面后面沒有了,要跳出while循環了,就是設置這是尾指針指向了,但是此時t.waitStatus != Node.CONDITION,不能設置 lastWaiter = t;所以設置為t的前置節點 lastWaiter = trail;

lastWaiter = trail;

}

else

trail = t; // t的上一個記錄到trail中

t = next; // t移動

}

}

5.2.3 附加-應對面試官問題的解釋:釋放同步鎖:release()方法,三種情況

解鎖的核心方法就是這個release()方法(fullyRelease()方法只是調用這個release()方法,由這個release()方法提供判斷,tryRelease()只是給這個release()方法提供判斷):

對于release()釋放同步鎖的邏輯總共有三種情況:

只有一個線程加鎖,沒有形成AQS隊列:

這個并發過程中,只有一個線程加鎖,所以AQS隊列沒有創建,這里if判斷不成立,就是tryRelease()判斷為false,release()方法直接返回false;

兩個線程加鎖,形成AQS隊列,當線程B解鎖的時候:

在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A解鎖完成了,AQS隊列中就只剩下線程B,然后線程B來解鎖,這個時候線程B就是AQS隊列的隊首元素,這個時候隊首線程B的waitStatus的值為0,if中的if也不會執行(tip:有了AQS隊列可以通過第一個if (tryRelease(arg)),但是 if (h != null && h.waitStatus != 0)判斷的時候 h != null h.waitStatus == 0,所以無法通過第二個if)整個方法返回true。

兩個線程加鎖,形成AQS隊列,當線程A解鎖的時候:

在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A先來解鎖,這個時候線程A就是AQS隊列的隊首元素,由于AQS隊列中有線程A和線程B兩個元素,這個時候隊首線程A的waitStatus的值不為0,if中的if執行,unparkSuccessor(h); 解鎖頭結點的下一個節點(tip:就是解鎖線程B),整個方法返回true。

5.2.4 附加-應對面試官問題的解釋:阻塞線程:isOnSyncQueue(),一種情況

final boolean isOnSyncQueue(Node node) {

// 當前處于等待狀態,而且同步隊列中沒有前驅者

if (node.waitStatus == Node.CONDITION || node.prev == null)

return false;

// 如果這個node節點在同步隊列中有后繼者,他一定在同步隊列中,

// prev和next是作用于同步隊列的指針,nextWaiter作用于等待隊列的指針

if (node.next != null)

return true;

// 上面兩個條件都不滿足,都被else了

// node.next==null node.prev!=null&&node.waitStatus != Node.CONDITION

return findNodeFromTail(node);

}

// 能夠進入這個方法的,一定是node.next==null node.prev!=null,

// 所以,在同步隊列中,從后往前遍歷,找到這個節點返回true,一直到最前面都沒找到,返回false

private boolean findNodeFromTail(Node node) {

Node t = tail;

for (;;) {

if (t == node)

return true;

if (t == null)

return false;

t = t.prev;

}

}

最后執行LockSupport.park(this),完成了將當前的線程進行park(解釋:park就是阻塞,unpark就是解鎖)。

5.3 condition.notify()

5.3.1 核心-面試語言組織(等待隊列、獲取同步鎖、喚醒線程)

第一步,等待隊列中的節點放到AQS同步執行隊列中,節點Node里面存放著線程thread,其中,signal方法會將當前的等待隊列中第一個等待的線程的節點加入到原來的AQS隊列中去,而signalAll方法是將等待隊列中的所有的等待線程的節點全部加入到原來的AQS的隊列中去

第二步,獲取同步鎖

第三步,喚醒當前線程

5.3.2 附加-應對面試官問題的解釋:等待隊列中的節點放到AQS同步執行隊列中,一種情況,兩個步驟

1、等待隊列刪去節點

分為兩種情況:

如果等待隊列中只有一個節點

firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點

lastWaiter = null; // 沒有元素了,置空等待隊列尾指針

first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針

如果等待隊列中2-n個節點

firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點

first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針

2、enq():同步隊列尾插法添加節點

first節點:等待隊列隊首刪除的節點,同步隊列隊尾添加的節點:transferForSignal(first) 要將等待隊列中移除的這個節點使用尾插法插入到同步隊列中,所以直接將這個first節點作為參數傳遞給transferForSignal()操作,記住,這個first節點就是被等待隊列移除的節點

同步隊列中的添加這個節點:enq()

對于enq()兩種情況,

1、同步隊列中沒有節點,所以tail==null

2、同步隊列中有節點,所以tail!=null

第一種情況:同步隊列中沒有節點,所以tail==null

程序執行如下:

Node t = tail; // 同步隊列中沒有節點,tail==null

compareAndSetHead(new Node()) //

tail = head;

t=tail; // 重新更新尾節點記錄

// 插入操作三步驟

node.prev = t;

compareAndSetTail(t, node)

t.next = node;

return t; // 返回尾節點前面的一個節點,當前同步隊列中一共兩個節點 t node,現在返回t

第二種情況:同步隊列中有節點,所以tail!=null

程序執行如下:

Node t = tail; // 同步隊列中沒有節點,tail!=null

// 插入操作三步驟

node.prev = t;

compareAndSetTail(t, node)

t.next = node;

return t; // 返回尾節點前面的一個節點,當前同步隊列中一共n個節點 node1 node2 ... t node,現在返回t

問題1:為什么enq()方法返回的是尾節點的前一個節點的狀態?

回答1:因為尾節點的前一個節點,就是插入前的尾節點啊,所有說enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點

問題2:為什么enq()方法中tail==null,一定要新建一個節點?

回答2:接上面,因為enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點,因為要返回原來的尾節點,所有如果沒有原來的尾節點,就要新建一個節點當做原來的尾節點,為返回值服務

5.3.3 附加-應對面試官問題的解釋:獲取同步鎖(簡單,略)

enq()方法為工作隊列添加節點后,不等transferForSignal()執行完,await()方法中的循環檢測很快就檢測到了,同步隊列中的有了剛剛被阻塞的節點(就是剛剛被阻塞的節點從阻塞隊列中出來了,到同步隊列中去了,所有這個這個節點里面的線程可以競爭同步鎖了,tryAcquire)

5.3.4 附加-應對面試官問題的解釋:喚醒當前線程(簡單,略)

await()搶占同步鎖后,transferForSignal()方法里面喚醒node節點當前線程

六、小結

原理層面:ReentrantLock中await()與signal()/signalAll()(核心:ConditionObject中的等待隊列),完成了。

天天打碼,天天進步!!!

總結

以上是生活随笔為你收集整理的java await signal_【Java并发008】原理层面:ReentrantLock中 await()、signal()/signalAll()全解析...的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。