java await signal_【Java并发008】原理层面:ReentrantLock中 await()、signal()/signalAll()全解析...
一、前言
上篇的文章中我們介紹了AQS源碼中lock方法和unlock方法,這兩個方法主要是用來解決并發中互斥的問題,這篇文章我們主要介紹AQS中用來解決線程同步問題的await方法、signal方法和signalAll方法,這幾個方法主要對應的是synchronized中的wait方法、notify方法和notifAll方法。
二、使用層面:await()與signal()/signalAll()(了解即可)
2.1 使用層面:await()與signal()/signalAll()(了解即可)
我們實現一個阻塞的隊列。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyBlockedQueue {
final Lock lock = new ReentrantLock();
// 條件變量:隊列不滿
final Condition notFull = lock.newCondition();
// 條件變量:隊列不空
final Condition notEmpty = lock.newCondition();
private volatile List list = new ArrayList<>();
// 入隊
void enq(T x) {
lock.lock();
try {
while (list.size() == 10) {
// 等待隊列不滿
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 省略入隊操作
list.add(x);
// 入隊后, 通知可出隊
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 出隊
void deq() {
lock.lock();
try {
while (list.isEmpty()) {
// 等待隊列不空
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove(0);
// 出隊后,通知可入隊
notFull.signal();
} finally {
lock.unlock();
}
}
public List getList() {
return list;
}
public static void main(String[] args) throws InterruptedException {
MyBlockedQueue myBlockedQueue = new MyBlockedQueue<>();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 20; i++) {
myBlockedQueue.enq(i);
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
myBlockedQueue.deq();
}
}
});
thread1.start();
thread2.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Arrays.toString(myBlockedQueue.getList().toArray()));
}
}
運行的結果如下(輸出的是后面的10位):
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
我們可以看到condition在多線程的中使用,類似于實現了線程之前的通信:
當某個條件滿足的時候,執行某個線程中操作;
當某個條件不滿足的時候,將當前的線程掛起,等待這個條件滿足的時候,其他的線程喚醒當前線程。
2.2 ConditionObject類和Node類
2.2.1 ConditionObject類(屬性+方法)
ConditionObject類的屬性
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L; // 實現Serializable接口,顯式指定序列化字段
private transient Node firstWaiter; // 作為Node類型,指向等待隊列中第一個節點
private transient Node lastWaiter; // 作為Node類型,指向等待隊列中最后一個節點
private static final int REINTERRUPT = 1; // reinterrupt 設置為final變量,命名易懂
private static final int THROW_IE = -1; // throw InterruptedException
}
ConditionObject類的方法
2.2.2 Node類
對于Node節點,屬性包括七個(重點是前面五個)
volatile int waitStatus; //當前節點等待狀態
volatile Node prev; //上一個節點
volatile Node next; //下一個節點
volatile Thread thread; //節點中的值
Node nextWaiter; //下一個等待節點
static final Node SHARED = new Node(); //指示節點共享還是獨占,默認初始是共享
static final Node EXCLUSIVE = null;
處在同步隊列中使用到的屬性(加鎖、解鎖)包括:next prev thread waitStatus。
所以同步隊列是雙向非循環鏈表,涉及的類變量AbstractQueuedSynchronizer類中的head和tail,分別指向同步隊列中的頭結點和尾節點。
處在等待隊列中使用到的屬性(阻塞、喚醒)包括:nextWaiter thread waitStatus。
所以等待隊列是單向非循環鏈表,涉及的類變量ConditionObject類中的firstWaiter和lastWaiter,分別指向等待隊列中的頭結點和尾節點。
AQS隊列是工作隊列、同步隊列,是非循環雙向隊列:
當使用到head tail的時候,就說AQS隊列建立起來了,單個線程不使用到head tail,所以AQS隊列沒有建立起來;
等待隊列,是非循環單向隊列:
當使用firstWaiter lastWaiter的時候,就說等待隊列建立起來了。
lock()和unlock()就是操作同步隊列:
lock()將線程封裝到節點里面(此時,節點使用到的屬性是thread nextWaiter waitStatus),放到同步隊列,即AQS隊列中,unlock()將存放線程的節點從同步隊列中拿出來,表示這個線程工作完成。
await()和signal()就是操作等待隊列:
await()將線程封裝到節點里面(此時,節點使用到的屬性是thread prev next waitStatus),放到等待隊列里面,signal()從等待隊列中拿出元素。
問題:為什么負責同步隊列的head和tail在AbstractQueuedSynchronizer類中,但是負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中?
回答:
對于線程同步互斥,是直接通過ReentrantLock類對象 lock.lock() lock.unlock()實現的,而ReentrantLock類對象是調用AQS類實現加鎖解鎖的,所以負責同步隊列的head和tail在AbstractQueuedSynchronizer類中;
對于線程阻塞和喚醒,是通過ReentrantLock類對象lock.newCondition()得到一個對應,condition引用指向這個對象,然后condition.await() condition.signal()實現的,所以負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中。
三、await()源碼
3.1 Condition.await()執行圖
我們再來看看await的源碼,具體如下圖:
對于上圖的解釋:
第一個方法插入到等待隊列中,第二個方法釋放同步鎖,第三個方法阻塞當前線程,三個一體,不能分開。如第二個方法先于第三個方法,先釋放同步鎖,再掛起線程,目的:為了避免當前線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況。
整體流程詳細:
第一步,如果某個線程的調用了await的方法,走來會將這個線程通過CAS和尾插法的方式將這個等待的線程添加到AQS的等待隊列中去(解釋:通過CAS和尾插法的方式是指:在cas保證線程安全的情況下,使用尾插法三步將這個線程放到一個Node結點中,插入到AQS的等待隊列),對應代碼 Node node = addConditionWaiter();
第二步,然后將當前的線程進行解鎖(解釋:對當前線程解鎖的目的是為了避免這個線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況),對應代碼 int savedState = fullyRelease(node);
第三步,然后將當前的線程進行park(解釋:park之后這個線程只能被動地等待其他的線程調用signal方法將當前的線程unpark),對應代碼 LockSupport.park(this);
while (!isOnSyncQueue(node)) { // addConditionWaiter()返回的node,不在同步隊列中,就park
LockSupport.park(this); // 將當前的線程進行park,this表示AbstractQueuedSynchronizer對象,表示當前線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
第一個方法使用數據結構插入到等待隊列中;
第二個方法使用unpark釋放同步鎖:unparkSuccessor(h);
第三個方法使用park阻塞當前線程:LockSupport.park(this);
3.2 condition.await()源碼解析
3.2.1 第一步,addConditionWaiter()添加節點到等待隊列中
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 新建一個節點,節點存放當前線程,狀態設置為正在等待條件CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
addConditionWaiter()三種情況:
第一種情況,當前等待隊列中沒有節點(此時firstWaiter和tailWaiter都為null)
第二種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點)
第三種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點),但是尾指針所指向節點不是在等待隊列中等待( t.waitStatus != Node.CONDITION)
3.2.1.1 第一種情況,當前等待隊列中沒有節點(此時firstWaiter和tailWaiter都為null)
程序執行如下:
Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來
Node node = new Node(Thread.currentThread(), Node.CONDITION);
firstWaiter = node; // 因為現在等待隊列中就只有這一個剛剛新建的Node節點,所以,將負責等待隊列的首尾指針都指向這個節點
lastWaiter = node;
return node; // 返回當前線程新建好的這個節點
程序執行如上,先將lastWaiter記錄下來(因為要采用尾插法,先將尾指針記錄下來)
使用當前線程新建一個節點,這個新節點Node的thread屬性為當前線程(表示這個節點存放的就是當前線程,將當前線程放到一個節點中,然后這個節點放入等待隊列中),waitStatus=Condition(-2),
尾插法經典兩步:第一次進入的時候lastWaiter==null(表示當前等待隊列中沒有節點),因為現在等待隊列中就只有這一個剛剛新建的Node節點,所以,將負責等待隊列的首尾指針都指向這個節點( firstWaiter = node; lastWaiter = node;),
最后返回當前線程新建好的這個節點。
3.2.1.2 第二種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點)
Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來
Node node = new Node(Thread.currentThread(), Node.CONDITION);
t.nextWaiter = node; // 尾插法經典兩步:(1)當前節點下一個節點為新建節點;(2)等待隊列尾指針指向新建節點
lastWaiter = node;
return node; // 返回新加入等待隊列的節點
程序執行為如上,先將lastWaiter記錄下來(因為要采用尾插法,先將尾指針記錄下來)
使用當前線程新建一個節點
尾插法經典兩步:(1)當前節點下一個節點為新建節點;(2)等待隊列尾指針指向新建節點
最后返回新加入等待隊列的節點(里面存放當前線程)。
3.2.1.3 第三種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點),但是尾指針所指向節點不是在等待隊列中等待( t.waitStatus != Node.CONDITION)
執行程序如下:
Node t = lastWaiter; // 記錄尾指針所指向節點,為使用尾插法準備
unlinkCancelledWaiters(); //相對于第二種的特殊情況,這里需要處理
t = lastWaiter; //相對于第二種的特殊情況,這里需要處理
Node node = new Node(Thread.currentThread(), Node.CONDITION);
t.nextWaiter = node; // 尾插法經典兩步
lastWaiter = node;
return node;
執行程序如上,沒什么問題,看新增的兩句
unlinkCancelledWaiters();
// 解釋:解綁所有的處于取消狀態的等待者,
// 這個使用canceled,表示已取消狀態,這里使用watiers,表示不止一個
t = lastWaiter; // 解釋:重置一下t,繼續記錄新的尾巴指針指向的節點,為下面尾插法準備
解釋unlinkCancelledWaiters()程序
private void unlinkCancelledWaiters() {
Node t = firstWaiter; // 1、記錄等待隊列中頭指針所指向節點
// 為什么這里記錄頭指針指向,因為等待隊列是非循環單鏈表,所以while循環刪除已取消結點,只能從頭結點開始遍歷
Node trail = null; // 2、局部變量trail,下面不斷移動t,用t來記錄當前節點,但是因為等待列表是單鏈表,所以無法記錄當前節點t的上一個節點,所有要在t還沒有移動時候,將當前t記錄下來放到trail中,然后t再移動
while (t != null) {
Node next = t.nextWaiter; // 3、準備移動,trail記錄t,單鏈表基本操作
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null; //
if (trail == null) // 4、這是執行 trail=t 之前執行的,trail=t 執行之前,不斷向后移動,同時不斷修改頭指針firstWaiter
// 4.1 為什么trail=t 執行之前要不斷修改頭指針firstWaiter?
//因為t.waitStatus != Node.CONDITION,當前隊列不行,所以要不斷修改頭指針firstWaiter
firstWaiter = next; //唯一一個設置頭指針的地方,
// 4.2 為什么執行了trail=t之后就不要修改頭指針了?
// 因為只要找到了為Node.CONDITION的t,就不會刪除了,就是保留操作了,就是可以使用的等待隊列了
else // 這是執行 trail=t 之后執行的
trail.nextWaiter = next; // 5、執行了 trail=t 之后,trail -> t -> next,因為t.waitStatus != Node.CONDITION,所以要去掉t,就執行 trail.nextWaiter = next; 變為 trail -> next
//5.1 為什么trail=t 執行之前不需要刪除t,因為這時候trail==null
//5.2 為什么trail=t 執行之后要刪除t,因為這時候firstWaiter確定了,等待隊列確定了,當然要刪去不合法的t,t.waitStatus != Node.CONDITION
if (next == null) // 6、 這是最后一次循環執行的,當next為null,表示后面后面沒有了,要跳出while循環了,就是設置這是尾指針指向了,但是此時t.waitStatus != Node.CONDITION,不能設置 lastWaiter = t;所以設置為t的前置節點 lastWaiter = trail;
lastWaiter = trail;
}
else
trail = t; // t的上一個記錄到trail中
t = next; // t移動
}
}
對于unlinkCancelledWaiters()方法的解釋:記住上面的1 2 3 4 4.1 4.2 5 5.1 5.2 6就好。
附加問題:單鏈表基礎知識:trail是如何記錄t的上一節點的
回答:
如果 t= t.nextWaiter; 那么trail無法記錄t的上一個節點
于是我們將t的移動拆分開來
Node next = t.nextWaiter;
trail = t;
t = next; // 第一句和第三句實際上就是 t= t.nextWaiter;我們將其拆分開來,在t值還沒有修改的時候,在第二句的時候,執行 trail=t ,將t記錄下來
最后三句放在同一個while循環中就實時同步了
while (t != null){
Node next = t.nextWaiter;
trail = t;
t = next;
}
3.2.2 第二步,addConditionWaiter()返回存放當前線程的新節點后,將節點作為fullyRelease()方法的參數,這個fullyRelease()方法是要將新節點中存放的當前的線程進行解鎖,并返回savedState()
final int fullyRelease(Node node) {
boolean failed = true; // 標志位,默認失敗,為什么剛開始的時候默認失敗?因為剛開始的時候沒有執行,一定要執行之后才設置為成功
try {
int savedState = getState(); // 獲取當前類變量state,這個state是用來記錄當前線程的加鎖次數的(因為synchronized和lock都是可重入鎖,所以可以加鎖多次,)
if (release(savedState)) { // 如果釋放當前線程成功了,要阻塞,就是要進入等待隊列,就要釋放同步鎖
failed = false; // 不是失敗,成功了
return savedState; // 返回當前線程加鎖次數,為0就是完全解鎖成功了
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) { // 參數就是線程加鎖次數
if (tryRelease(arg)) { // 釋放同步鎖 上一篇博客講過 參數是線程加鎖次數
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
解鎖的核心方法就是這個release()方法(fullyRelease()方法只是調用這個release()方法,由這個release()方法提供判斷,tryRelease()只是給這個release()方法提供判斷):
對于release()釋放同步鎖的邏輯總共有三種情況:
只有一個線程加鎖,沒有形成AQS隊列:
這個并發過程中,只有一個線程加鎖,所以AQS隊列沒有創建,這里if判斷不成立,就是tryRelease()判斷為false,release()方法直接返回false;
兩個線程加鎖,形成AQS隊列,當線程B解鎖的時候:
在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A解鎖完成了,AQS隊列中就只剩下線程B,然后線程B來解鎖,這個時候線程B就是AQS隊列的隊首元素,這個時候隊首線程B的waitStatus的值為0,if中的if也不會執行(tip:有了AQS隊列可以通過第一個if (tryRelease(arg)),但是 if (h != null && h.waitStatus != 0)判斷的時候 h != null h.waitStatus == 0,所以無法通過第二個if)整個方法返回true。
兩個線程加鎖,形成AQS隊列,當線程A解鎖的時候:
在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A先來解鎖,這個時候線程A就是AQS隊列的隊首元素,由于AQS隊列中有線程A和線程B兩個元素,這個時候隊首線程A的waitStatus的值不為0,if中的if執行,unparkSuccessor(h); 解鎖頭結點的下一個節點(tip:就是解鎖線程B),整個方法返回true。
這里解釋一下tryRelease(),很簡單
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 線程加鎖次數-線程加鎖次數=0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false; // 默認釋放成功為false
if (c == 0) { // 加鎖次數為0
free = true; // 標志位釋放同步鎖成功
setExclusiveOwnerThread(null); // 獨占線程為null
}
setState(c); // 重新設置類變量state 線程加鎖次數
return free;
}
3.2.3 第三步,isOnSyncQueue()提供判斷,LockSupport.park(this);將當前的線程進行park(解釋:park就是阻塞,unpark就是解鎖)
final boolean isOnSyncQueue(Node node) {
// 當前處于等待狀態,而且同步隊列中沒有前驅者
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果這個node節點在同步隊列中有后繼者,他一定在同步隊列中,
// prev和next是作用于同步隊列的指針,nextWaiter作用于等待隊列的指針
if (node.next != null)
return true;
// 上面兩個條件都不滿足,都被else了
// node.next==null node.prev!=null&&node.waitStatus != Node.CONDITION
return findNodeFromTail(node);
}
// 能夠進入這個方法的,一定是node.next==null node.prev!=null,
// 所以,在同步隊列中,從后往前遍歷,找到這個節點返回true,一直到最前面都沒找到,返回false
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
最后執行LockSupport.park(this),完成了將當前的線程進行park(解釋:park就是阻塞,unpark就是解鎖)。
四、signal()/signalAll()源碼
4.1 condition.notify()整體流程圖
我們再來看看signal方法和signalAll方法的源碼
大致的流程就是:某個線程調用signal方法或者signalAll方法,
第一步,將等待隊列中的節點放到AQS同步執行隊列中(每個節點Node里面存放著線程thread),具體地,signal方法會將當前的等待隊列中第一個等待的線程的節點加入到原來的AQS隊列中去,而signalAll方法是將等待隊列中的所有的等待線程的節點全部加入到原來的AQS的隊列中去。
第二步,獲取同步鎖:讓他們重新獲取鎖,進行unpark。
第三步,喚醒當前線程:線程被喚醒,執行對應的線程中代碼。
4.2 condition.notify()源碼解析
4.2.1 等待隊列中的節點放到AQS同步執行隊列中,每個節點Node里面存放著線程thread
4.2.1.1 等待隊列中的移除第一個節點
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 將等待隊列中頭指針指向的節點記錄下來,因為我們要刪去等待隊列中第一個節點
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) // 下一個等待元素為空,表示等待隊列中只有一個元素,因為這個元素被移除,就要置空等待隊列尾指針 lastWaiter = null;
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
對于doSignal()方法的解釋:對于等待隊列這個非循環單向鏈表的隊列,要刪除鏈表頭元素
需要修改等待隊列頭指針,指向當前等待隊列的下一個節點,即執行 firstWaiter = first.nextWaiter
需要置空當前等待隊列的第一個節點的nextWaiter指針,即執行 first.nextWaiter = null;
分為兩種情況:
第一種情況,等待隊列中只有一個節點,則執行過程為:
firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點
lastWaiter = null; // 沒有元素了,置空等待隊列尾指針
first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針
第二種情況,等待隊列中2-n個節點,則執行過程為:
firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點
first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針
transferForSignal(first) 方法,實參是first等待隊列隊首節點,表示等待隊列隊首刪除的節點,同步隊列隊尾添加的節點
transferForSignal(first) 要將等待隊列中移除的這個節點使用尾插法插入到同步隊列中,所以直接將這個first節點作為參數傳遞給transferForSignal()操作,記住,這個first節點就是被等待隊列移除的節點
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 將node節點這個被等待隊列移除的節點尾插法插入到的同步隊列中
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
4.2.1.2 同步隊列中的添加這個節點:enq()
對于enq()兩種情況,
1、同步隊列中沒有節點,所以tail == null
2、同步隊列中有節點,所以tail != null
第一種情況:同步隊列中沒有節點,所以tail==null,程序執行如下:
Node t = tail; // 同步隊列中沒有節點,tail==null
compareAndSetHead(new Node()) //
tail = head;
t=tail; // 重新更新尾節點記錄
// 插入操作三步驟
node.prev = t;
compareAndSetTail(t, node)
t.next = node;
return t; // 返回尾節點前面的一個節點,當前同步隊列中一共兩個節點 t node,現在返回t
第二種情況:同步隊列中有節點,所以tail!=null,程序執行如下:
Node t = tail; // 同步隊列中沒有節點,tail!=null
// 插入操作三步驟
node.prev = t;
compareAndSetTail(t, node)
t.next = node;
return t; // 返回尾節點前面的一個節點,當前同步隊列中一共n個節點 node1 node2 ... t node,現在返回t
問題1:為什么enq()方法返回的是尾節點的前一個節點的狀態?
回答1:因為尾節點的前一個節點,就是插入前的尾節點啊,所有說enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點。
問題2:為什么enq()方法中tail==null,一定要新建一個節點?
回答2:接上面,因為enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點,因為要返回原來的尾節點,所有如果沒有原來的尾節點,就要新建一個節點當做原來的尾節點,為返回值服務。
4.2.2 獲取同步鎖(了解即可)
enq()方法給同步隊列隊尾添加節點后,不等transferForSignal()執行完,await()方法中的循環檢測很快就檢測到了,同步隊列中的有了剛剛被阻塞的節點(就是剛剛被阻塞的節點從阻塞隊列中出來了,到同步隊列中去了,所有這個這個節點里面的線程可以競爭同步鎖了,tryAcquire)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 當同步隊列中有了這個節點
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// acquireQueued(node, savedState)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // tryAcquire獲取同步鎖成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4.2.3 喚醒當前線程(了解即可)
接上面,await()方法中被阻塞的節點放開后,可以參與同步鎖的搶占,CAS操作搶占同步鎖成功后,transferForSignal()方法里面喚醒node節點當前線程,即執行LockSupport.unpark(node.thread)語句。
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); // 將node節點這個被等待隊列移除的節點尾插法插入到的同步隊列中,返回尾節點的前一個節點
int ws = p.waitStatus; // 尾節點前一個節點狀態,上一個尾節點的狀態
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果上一個尾節點ws==1,表示上一個節點已經被取消cannceled,或者上一個尾節點cas設置等待狀態失敗
LockSupport.unpark(node.thread); // 喚醒node線程重新同步
return true;
}
五、面試金手指
面試問題:lock機制是如何實現condition的await和signal的(因為synchronized直接使用Object類的wait()和notify()方法)?
回答:就是下面“面試金手指”部分的,都答上去就好了。
5.1 ConditionObject類和Node類
核心語言組織:AQS類和Node類
AQS本質是一個非循環的雙向鏈表(也可以稱為隊列),所以它是由一個個節點構成的,就是Node,后面的lock() unlock() await() signal()/signalAll()都是以Node為基本元素操作的,那么在這個Node類中需要保存什么信息呢?
回答:Node節點屬性包括七個(重點是前面五個)
volatile int waitStatus; //當前節點等待狀態
volatile Node prev; //上一個節點
volatile Node next; //下一個節點
volatile Thread thread; //節點中的值
Node nextWaiter; //下一個等待節點
static final Node SHARED = new Node(); //指示節點共享還是獨占,默認初始是共享
static final Node EXCLUSIVE = null;
處在同步隊列中使用到的屬性(加鎖、解鎖)包括:next prev thread waitStatus。
所以同步隊列是雙向非循環鏈表,涉及的類變量AbstractQueuedSynchronizer類中的head和tail,分別指向同步隊列中的頭結點和尾節點。
處在等待隊列中使用到的屬性(阻塞、喚醒)包括:nextWaiter thread waitStatus。
所以等待隊列是單向非循環鏈表,涉及的類變量ConditionObject類中的firstWaiter和lastWaiter,分別指向等待隊列中的頭結點和尾節點。
AQS隊列是工作隊列、同步隊列,是非循環雙向隊列:
當使用到head tail的時候,就說AQS隊列建立起來了,單個線程不使用到head tail,所以AQS隊列沒有建立起來;
等待隊列,是非循環單向隊列:
當使用firstWaiter lastWaiter的時候,就說等待隊列建立起來了。
lock()和unlock()就是操作同步隊列:
lock()將線程封裝到節點里面(此時,節點使用到的屬性是thread nextWaiter waitStatus),放到同步隊列,即AQS隊列中,unlock()將存放線程的節點從同步隊列中拿出來,表示這個線程工作完成。
await()和signal()就是操作等待隊列:
await()將線程封裝到節點里面(此時,節點使用到的屬性是thread prev next waitStatus),放到等待隊列里面,signal()從等待隊列中拿出元素。
問題:為什么負責同步隊列的head和tail在AbstractQueuedSynchronizer類中,但是負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中?
回答:
對于線程同步互斥,是直接通過ReentrantLock類對象 lock.lock() lock.unlock()實現的,而ReentrantLock類對象是調用AQS類實現加鎖解鎖的,所以負責同步隊列的head和tail在AbstractQueuedSynchronizer類中;
對于線程阻塞和喚醒,是通過ReentrantLock類對象lock.newCondition()得到一個對應,condition引用指向這個對象,然后condition.await() condition.signal()實現的,所以負責等待隊列的firstWaiter和lastWaiter在ConditionObject類中。
5.2 condition.await()
5.2.1 核心-面試語言組織(等待隊列、釋放同步鎖、阻塞線程)
大致流程:第一個方法插入到等待隊列中,第二個方法釋放同步鎖,第三個方法阻塞當前線程,三個一體,不能分開,第二個方法先于第三個方法,先釋放同步鎖,再掛起線程,目的:為了避免當前線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況。
整體流程詳細:
第一步,如果某個線程的調用了await的方法,走來會將這個線程通過CAS和尾插法的方式將這個等待的線程添加到AQS的等待隊列中去(解釋:通過CAS和尾插法的方式是指:在cas保證線程安全的情況下,使用尾插法三步將這個線程放到一個Node結點中,插入到AQS的等待隊列),對應代碼 Node node = addConditionWaiter();
第二步,然后將當前的線程進行解鎖(解釋:對當前線程解鎖的目的是為了避免這個線程沒有釋放的鎖的時候,然后就被掛起,從而導致其他的線程獲取不到鎖,亦或者導致死鎖的情況),對應代碼 int savedState = fullyRelease(node);
第三步,然后將當前的線程進行park(解釋:park之后這個線程只能被動地等待其他的線程調用signal方法將當前的線程unpark),對應代碼 LockSupport.park(this);
小結: 第一個方法使用數據結構插入到等待隊列中,
第二個方法使用unpark釋放同步鎖:unparkSuccessor(h);
第三個方法使用park阻塞當前線程:LockSupport.park(this);
5.2.2 附加-應對面試官問題的解釋:等待隊列:addConditionWaiter()方法,三種情況
第一種情況,當前等待隊列中沒有節點(此時firstWaiter和tailWaiter都為null)
程序執行如下:
Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來
Node node = new Node(Thread.currentThread(), Node.CONDITION);
firstWaiter = node; // 因為現在等待隊列中就只有這一個剛剛新建的Node節點,所以,將負責等待隊列的首尾指針都指向這個節點
lastWaiter = node;
return node; // 返回當前線程新建好的這個節點
第二種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點)
執行程序如下:
Node t = lastWaiter; // 因為要采用尾插法,先將尾指針記錄下來
Node node = new Node(Thread.currentThread(), Node.CONDITION);
t.nextWaiter = node; // 尾插法經典兩步:(1)當前節點下一個節點為新建節點;(2)等待隊列尾指針指向新建節點
lastWaiter = node;
return node; // 返回新加入等待隊列的節點
第三種情況,當前等待隊列中1-n個節點(此時firstWaiter和tailWaiter都不為null,如果一個節點,則首尾指針都指向這個節點,如果大于一個節點,則首尾指針指向相應的節點),但是尾指針所指向節點不是在等待隊列中等待( t.waitStatus != Node.CONDITION)
執行程序如下:
Node t = lastWaiter; // 記錄尾指針所指向節點,為使用尾插法準備
unlinkCancelledWaiters(); //相對于第二種的特殊情況,這里需要處理
t = lastWaiter; //相對于第二種的特殊情況,這里需要處理
Node node = new Node(Thread.currentThread(), Node.CONDITION);
t.nextWaiter = node; // 尾插法經典兩步
lastWaiter = node;
return node;
執行程序如上,沒什么問題,看新增的兩句
unlinkCancelledWaiters();
// 解釋:解綁所有的處于取消狀態的等待者,
// 這個使用canceled,表示已取消狀態,這里使用watiers,表示不止一個
t = lastWaiter; // 解釋:重置一下t,繼續記錄新的尾巴指針指向的節點,為下面尾插法準備
特殊地,解釋unlinkCancelledWaiters()程序
private void unlinkCancelledWaiters() {
Node t = firstWaiter; // 1、記錄等待隊列中頭指針所指向節點
// 為什么這里記錄頭指針指向,因為等待隊列是非循環單鏈表,所以while循環刪除已取消結點,只能從頭結點開始遍歷
Node trail = null; // 2、局部變量trail,下面不斷移動t,用t來記錄當前節點,但是因為等待列表是單鏈表,所以無法記錄當前節點t的上一個節點,所有要在t還沒有移動時候,將當前t記錄下來放到trail中,然后t再移動
while (t != null) {
Node next = t.nextWaiter; // 3、準備移動,trail記錄t,單鏈表基本操作
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null; //
if (trail == null) // 4、這是執行 trail=t 之前執行的,trail=t 執行之前,不斷向后移動,同時不斷修改頭指針firstWaiter
// 4.1 為什么trail=t 執行之前要不斷修改頭指針firstWaiter?
//因為t.waitStatus != Node.CONDITION,當前隊列不行,所以要不斷修改頭指針firstWaiter
firstWaiter = next; //唯一一個設置頭指針的地方,
// 4.2 為什么執行了trail=t之后就不要修改頭指針了?
// 因為只要找到了為Node.CONDITION的t,就不會刪除了,就是保留操作了,就是可以使用的等待隊列了
else // 這是執行 trail=t 之后執行的
trail.nextWaiter = next; // 5、執行了 trail=t 之后,trail -> t -> next,因為t.waitStatus != Node.CONDITION,所以要去掉t,就執行 trail.nextWaiter = next; 變為 trail -> next
//5.1 為什么trail=t 執行之前不需要刪除t,因為這時候trail==null
//5.2 為什么trail=t 執行之后要刪除t,因為這時候firstWaiter確定了,等待隊列確定了,當然要刪去不合法的t,t.waitStatus != Node.CONDITION
if (next == null) // 6、 這是最后一次循環執行的,當next為null,表示后面后面沒有了,要跳出while循環了,就是設置這是尾指針指向了,但是此時t.waitStatus != Node.CONDITION,不能設置 lastWaiter = t;所以設置為t的前置節點 lastWaiter = trail;
lastWaiter = trail;
}
else
trail = t; // t的上一個記錄到trail中
t = next; // t移動
}
}
5.2.3 附加-應對面試官問題的解釋:釋放同步鎖:release()方法,三種情況
解鎖的核心方法就是這個release()方法(fullyRelease()方法只是調用這個release()方法,由這個release()方法提供判斷,tryRelease()只是給這個release()方法提供判斷):
對于release()釋放同步鎖的邏輯總共有三種情況:
只有一個線程加鎖,沒有形成AQS隊列:
這個并發過程中,只有一個線程加鎖,所以AQS隊列沒有創建,這里if判斷不成立,就是tryRelease()判斷為false,release()方法直接返回false;
兩個線程加鎖,形成AQS隊列,當線程B解鎖的時候:
在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A解鎖完成了,AQS隊列中就只剩下線程B,然后線程B來解鎖,這個時候線程B就是AQS隊列的隊首元素,這個時候隊首線程B的waitStatus的值為0,if中的if也不會執行(tip:有了AQS隊列可以通過第一個if (tryRelease(arg)),但是 if (h != null && h.waitStatus != 0)判斷的時候 h != null h.waitStatus == 0,所以無法通過第二個if)整個方法返回true。
兩個線程加鎖,形成AQS隊列,當線程A解鎖的時候:
在并發加鎖過程中(就是上一篇博客中的lock.lock()加鎖),線程A加鎖成功,線程B也來加鎖,但是現在線程A沒有解鎖,這時候形成一個AQS隊列,(tip:也就是一個加鎖隊列,線程A和線程B都鎖在這里,線程A在線程B前面,,就是上一篇博客中的),然后線程A先來解鎖,這個時候線程A就是AQS隊列的隊首元素,由于AQS隊列中有線程A和線程B兩個元素,這個時候隊首線程A的waitStatus的值不為0,if中的if執行,unparkSuccessor(h); 解鎖頭結點的下一個節點(tip:就是解鎖線程B),整個方法返回true。
5.2.4 附加-應對面試官問題的解釋:阻塞線程:isOnSyncQueue(),一種情況
final boolean isOnSyncQueue(Node node) {
// 當前處于等待狀態,而且同步隊列中沒有前驅者
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果這個node節點在同步隊列中有后繼者,他一定在同步隊列中,
// prev和next是作用于同步隊列的指針,nextWaiter作用于等待隊列的指針
if (node.next != null)
return true;
// 上面兩個條件都不滿足,都被else了
// node.next==null node.prev!=null&&node.waitStatus != Node.CONDITION
return findNodeFromTail(node);
}
// 能夠進入這個方法的,一定是node.next==null node.prev!=null,
// 所以,在同步隊列中,從后往前遍歷,找到這個節點返回true,一直到最前面都沒找到,返回false
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
最后執行LockSupport.park(this),完成了將當前的線程進行park(解釋:park就是阻塞,unpark就是解鎖)。
5.3 condition.notify()
5.3.1 核心-面試語言組織(等待隊列、獲取同步鎖、喚醒線程)
第一步,等待隊列中的節點放到AQS同步執行隊列中,節點Node里面存放著線程thread,其中,signal方法會將當前的等待隊列中第一個等待的線程的節點加入到原來的AQS隊列中去,而signalAll方法是將等待隊列中的所有的等待線程的節點全部加入到原來的AQS的隊列中去
第二步,獲取同步鎖
第三步,喚醒當前線程
5.3.2 附加-應對面試官問題的解釋:等待隊列中的節點放到AQS同步執行隊列中,一種情況,兩個步驟
1、等待隊列刪去節點
分為兩種情況:
如果等待隊列中只有一個節點
firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點
lastWaiter = null; // 沒有元素了,置空等待隊列尾指針
first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針
如果等待隊列中2-n個節點
firstWaiter = first.nextWaiter // 需要修改等待隊列頭指針,指向當前等待隊列的下一個節點
first.nextWaiter = null; // 需要置空當前等待隊列的第一個節點的nextWaiter指針
2、enq():同步隊列尾插法添加節點
first節點:等待隊列隊首刪除的節點,同步隊列隊尾添加的節點:transferForSignal(first) 要將等待隊列中移除的這個節點使用尾插法插入到同步隊列中,所以直接將這個first節點作為參數傳遞給transferForSignal()操作,記住,這個first節點就是被等待隊列移除的節點
同步隊列中的添加這個節點:enq()
對于enq()兩種情況,
1、同步隊列中沒有節點,所以tail==null
2、同步隊列中有節點,所以tail!=null
第一種情況:同步隊列中沒有節點,所以tail==null
程序執行如下:
Node t = tail; // 同步隊列中沒有節點,tail==null
compareAndSetHead(new Node()) //
tail = head;
t=tail; // 重新更新尾節點記錄
// 插入操作三步驟
node.prev = t;
compareAndSetTail(t, node)
t.next = node;
return t; // 返回尾節點前面的一個節點,當前同步隊列中一共兩個節點 t node,現在返回t
第二種情況:同步隊列中有節點,所以tail!=null
程序執行如下:
Node t = tail; // 同步隊列中沒有節點,tail!=null
// 插入操作三步驟
node.prev = t;
compareAndSetTail(t, node)
t.next = node;
return t; // 返回尾節點前面的一個節點,當前同步隊列中一共n個節點 node1 node2 ... t node,現在返回t
問題1:為什么enq()方法返回的是尾節點的前一個節點的狀態?
回答1:因為尾節點的前一個節點,就是插入前的尾節點啊,所有說enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點
問題2:為什么enq()方法中tail==null,一定要新建一個節點?
回答2:接上面,因為enq的意義在于兩點,插入參數指定的新節點+返回原來的尾節點,因為要返回原來的尾節點,所有如果沒有原來的尾節點,就要新建一個節點當做原來的尾節點,為返回值服務
5.3.3 附加-應對面試官問題的解釋:獲取同步鎖(簡單,略)
enq()方法為工作隊列添加節點后,不等transferForSignal()執行完,await()方法中的循環檢測很快就檢測到了,同步隊列中的有了剛剛被阻塞的節點(就是剛剛被阻塞的節點從阻塞隊列中出來了,到同步隊列中去了,所有這個這個節點里面的線程可以競爭同步鎖了,tryAcquire)
5.3.4 附加-應對面試官問題的解釋:喚醒當前線程(簡單,略)
await()搶占同步鎖后,transferForSignal()方法里面喚醒node節點當前線程
六、小結
原理層面:ReentrantLock中await()與signal()/signalAll()(核心:ConditionObject中的等待隊列),完成了。
天天打碼,天天進步!!!
總結
以上是生活随笔為你收集整理的java await signal_【Java并发008】原理层面:ReentrantLock中 await()、signal()/signalAll()全解析...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 信用卡申请平台 信用卡申请方法
- 下一篇: 终于公布了,日本三季度GDP增速超20%