聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)
上一篇介紹了AQS的基本設(shè)計(jì)思路以及兩個(gè)內(nèi)部類Node和ConditionObject的實(shí)現(xiàn)?聊聊高并發(fā)(二十一)解析java.util.concurrent各個(gè)組件(三) 深入理解AQS(一)?這篇說(shuō)一說(shuō)AQS的主要方法的實(shí)現(xiàn)。AQS和CLHLock的最大區(qū)別是,CLHLock是自旋鎖,而AQS使用Unsafe的park操作讓線程進(jìn)入等待(阻塞)。
?
線程加入同步隊(duì)列,和CLHLock一樣,從隊(duì)尾入隊(duì)列,使用CAS+輪詢的方式實(shí)現(xiàn)無(wú)鎖化。入隊(duì)列后設(shè)置節(jié)點(diǎn)的prev和next引用,形成雙向鏈表的結(jié)構(gòu)
?
?private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
線程指定獨(dú)享還是共享方式加入隊(duì)列,先嘗試加入一次,如果失敗再用enq()輪詢地嘗試,比如addWaiter(Node.EXCLUSIVE), addWaiter(Node.SHARED)
?
?
?private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
?
喚醒后繼節(jié)點(diǎn),最典型的情況就是在線程釋放鎖后,會(huì)喚醒后繼節(jié)點(diǎn)。會(huì)從節(jié)點(diǎn)的next開始,找到一個(gè)后繼節(jié)點(diǎn),如果next是null,就從隊(duì)尾開始往head找,直到找到最靠近當(dāng)前節(jié)點(diǎn)的后續(xù)節(jié)點(diǎn)。 waitStatus <= 0的隱含意思是線程沒有被取消。 然后用LockSupport喚醒這個(gè)找到的后繼節(jié)點(diǎn)的線程。
這個(gè)方法類似于CLHLock里面釋放鎖時(shí),通知后續(xù)節(jié)點(diǎn)來(lái)獲取鎖。AQS使用了阻塞的方式,所以這個(gè)方法的后續(xù)方法是acquireXXX方法,它負(fù)責(zé)將后續(xù)節(jié)點(diǎn)喚醒,后續(xù)節(jié)點(diǎn)再根據(jù)狀態(tài)去判斷是否獲得鎖
?
?
?private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
共享模式下的釋放操作,從隊(duì)首開始向隊(duì)尾擴(kuò)散,如果節(jié)點(diǎn)的waitStatu是SIGNAL,就喚醒后繼節(jié)點(diǎn),如果waitStatus是0,就設(shè)置標(biāo)記成PROPAGATE
?
?
?private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
取消獲取操作,要把節(jié)點(diǎn)從同步隊(duì)列中去除,通過(guò)鏈表操作將它的前置節(jié)點(diǎn)的next指向它的后繼節(jié)點(diǎn)集合。如果該節(jié)點(diǎn)是在隊(duì)尾,直接刪除即可,否則要通知后繼節(jié)點(diǎn)去獲取鎖
?
?
?private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
?
獨(dú)占模式并且不可中斷地獲取隊(duì)列鎖的操作,這個(gè)方法在ConditionObject.await()中被使用,當(dāng)線程被Unsafe.unpark喚醒后,需要調(diào)用acquireQueued來(lái)獲取鎖,從而結(jié)束await(). accquireQueued()方法要么獲得鎖,要么被tryAcquire方法拋出的異常打斷,如果拋出異常,最后在finally里面取消獲取
值得注意的是只有節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是head的時(shí)候,才能獲得鎖。這里隱含了一個(gè)意思,就是head指向當(dāng)前獲得鎖的節(jié)點(diǎn)。當(dāng)程序進(jìn)入if(p == head and tryAcquire(arg))這個(gè)分支時(shí),表示線程獲得了鎖或者被中斷,將自己設(shè)置為head,將next設(shè)置為null.
shouldParkAfterFailedAcquired()方法的目的是將節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的waitStatus設(shè)置為SIGNAL,表示會(huì)通知后續(xù)節(jié)點(diǎn),這樣后續(xù)節(jié)點(diǎn)才能放心去park,而不用擔(dān)心被丟失喚醒的通知。
parkAndCheckInterupt()方法會(huì)真正執(zhí)行阻塞,并返回中斷狀態(tài),這個(gè)方法有兩種情況返回,一種是park被unpark喚醒,這時(shí)候中斷狀態(tài)為false。另一種情況是park被中斷了,由于這個(gè)accquireQueued方法是不可中斷的版本,所以即使線程被中斷了,也只是設(shè)置了中斷標(biāo)志為true,沒有跑出中斷異常。在支持中斷的獲取版本里,這時(shí)會(huì)拋出中斷異常。
這個(gè)方法可以理解為L(zhǎng)ock的lock里沒有獲取鎖的分支,在CLHLock自旋鎖的實(shí)現(xiàn)里,是對(duì)前驅(qū)節(jié)點(diǎn)的狀態(tài)自旋,而AQS是阻塞,所以這里是在同步隊(duì)列里面進(jìn)入了阻塞狀態(tài),等待被前驅(qū)節(jié)點(diǎn)釋放鎖時(shí)喚醒。
釋放鎖時(shí)會(huì)根據(jù)狀態(tài)調(diào)用unparkSuccessor()方法來(lái)喚醒后續(xù)節(jié)點(diǎn),這樣就會(huì)在這個(gè)方法里面把阻塞的線程喚醒并獲得鎖。
隊(duì)列鎖的好處是線程都在多個(gè)共享狀態(tài)上自旋或阻塞,所以u(píng)nparkSuccessor()方法只會(huì)喚醒它后繼沒有取消的節(jié)點(diǎn)。
而取消只有兩種情況,中斷或者超時(shí)
?
?
?final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
獨(dú)占模式支持中斷的獲取隊(duì)列鎖操作,可以看到和不支持中斷版本的區(qū)別,這里如果parkAndCheckInterrupt()方法返回時(shí)顯示被中斷了,就拋出中斷異常
?
?
?private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
?
獨(dú)占模式限時(shí)獲取隊(duì)列鎖操作, 這個(gè)獲取的整體邏輯和前面的類似,區(qū)別是它支持限時(shí)操作,如果等待時(shí)間大于spinForTimeoutThreshold,就使用阻塞的方式等待,否則用自旋等待。使用了LockSupport.parkNanos()方法來(lái)實(shí)現(xiàn)限時(shí)地等待,并支持中斷
這里隱含的一個(gè)含義是parkNanos方法退出有3種方式,
1. 限時(shí)到了自動(dòng)退出,這時(shí)候會(huì)超時(shí)
2. 沒有到限時(shí)被喚醒了,這時(shí)候是不超時(shí)的
3. 被中斷
?
?private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
?
?
共享模式獲得隊(duì)列鎖操作,獲得操作也是從head的下一個(gè)節(jié)點(diǎn)開始,和獨(dú)占模式只unparkSuccessor一個(gè)節(jié)點(diǎn)不同,共享模式下,等head的后續(xù)節(jié)點(diǎn)被喚醒了,它要擴(kuò)散這種共享的獲取,使用setHeadAndPropagate操作,把自己設(shè)置為head,并且把釋放的狀態(tài)往下傳遞,這里采用了鏈?zhǔn)絾拘训姆椒?#xff0c;1個(gè)節(jié)點(diǎn)負(fù)責(zé)喚醒1個(gè)后續(xù)節(jié)點(diǎn),直到不能喚醒。當(dāng)后繼節(jié)點(diǎn)是共享模式isShared,就調(diào)用doReleaseShared來(lái)喚醒后繼節(jié)點(diǎn)
doReleaseShared會(huì)從head開始往后檢查狀態(tài),如果節(jié)點(diǎn)是SIGNAL狀態(tài),就喚醒它的后繼節(jié)點(diǎn)。如果是0就標(biāo)記為PROPAGATE, 等它釋放鎖的時(shí)候會(huì)再次喚醒后繼節(jié)點(diǎn)。
這里有個(gè)隱含的意思:
1. 加入同步隊(duì)列并阻塞的節(jié)點(diǎn),它的前驅(qū)節(jié)點(diǎn)只會(huì)是SIGNAL,表示前驅(qū)節(jié)點(diǎn)釋放鎖時(shí),后繼節(jié)點(diǎn)會(huì)被喚醒。shouldParkAfterFailedAcquire()方法保證了這點(diǎn),如果前驅(qū)節(jié)點(diǎn)不是SIGNAL,它會(huì)把它修改成SIGNAL。這里不是SIGNAL就有可能是PROPAGATE
2. 造成前驅(qū)節(jié)點(diǎn)是PROPAGATE的情況是前驅(qū)節(jié)點(diǎn)獲得鎖時(shí),會(huì)喚醒一次后繼節(jié)點(diǎn),但這時(shí)候后繼節(jié)點(diǎn)還沒有加入到同步隊(duì)列,所以暫時(shí)把節(jié)點(diǎn)狀態(tài)設(shè)置為PROPAGATE,當(dāng)后繼節(jié)點(diǎn)加入同步隊(duì)列后,會(huì)把PROPAGATE設(shè)置為SIGNAL,這樣前驅(qū)節(jié)點(diǎn)釋放鎖時(shí)會(huì)再次doReleaseShared,這時(shí)候它的狀態(tài)已經(jīng)是SIGNAL了,就可以喚醒后續(xù)節(jié)點(diǎn)了
?
?
?private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
??????? Node h = head; // Record old head for check below
??????? setHead(node);
??????? /*
???????? * Try to signal next queued node if:
???????? *?? Propagation was indicated by caller,
???????? *???? or was recorded (as h.waitStatus) by a previous operation
???????? *???? (note: this uses sign-check of waitStatus because
???????? *????? PROPAGATE status may transition to SIGNAL.)
???????? * and
???????? *?? The next node is waiting in shared mode,
???????? *???? or we don't know, because it appears null
???????? *
???????? * The conservatism in both of these checks may cause
???????? * unnecessary wake-ups, but only when there are multiple
???????? * racing acquires/releases, so most need signals now or soon
???????? * anyway.
???????? */
??????? if (propagate > 0 || h == null || h.waitStatus < 0) {
??????????? Node s = node.next;
??????????? if (s == null || s.isShared())
??????????????? doReleaseShared();
??????? }
??? }
private void doReleaseShared() {
??????? for (;;) {
??????????? Node h = head;
??????????? if (h != null && h != tail) {
??????????????? int ws = h.waitStatus;
??????????????? if (ws == Node.SIGNAL) {
??????????????????? if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
??????????????????????? continue;??????????? // loop to recheck cases
??????????????????? unparkSuccessor(h);
??????????????? }
??????????????? else if (ws == 0 &&
???????????????????????? !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
??????????????????? continue;??????????????? // loop on failed CAS
??????????? }
??????????? if (h == head)?????????????????? // loop if head changed
??????????????? break;
??????? }
??? }
tryXXXX 方法,這幾個(gè)方法是給子類重寫的,用來(lái)擴(kuò)展響應(yīng)的同步器操作
?
?
?protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
??????? throw new UnsupportedOperationException();
??? }
protected int tryAcquireShared(int arg) {
??????? throw new UnsupportedOperationException();
??? }
protected boolean tryReleaseShared(int arg) {
??????? throw new UnsupportedOperationException();
??? }
獨(dú)占模式獲取操作的頂層方法,如果沒有tryAcquired,或者沒有獲得隊(duì)列鎖,就中斷
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
獨(dú)占模式釋放操作的頂層方法,如果tryRelease()成功,那么就喚醒后繼節(jié)點(diǎn)去獲取鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
總結(jié)
以上是生活随笔為你收集整理的聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 聊聊高并发(二十一)解析java.uti
- 下一篇: 聊聊高并发(二十三)解析java.uti