java aqs源码_Java-AQS源码详解(细节很多!)
ReentrantLock調(diào)用lock()時時序圖:
addWaiter方法:
enq方法:自旋
它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這里volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:
getState()
setState()
compareAndSetState()
aqs有兩種資源訪問模式:獨占(ReentrantLock)和共享(CountDownLatch和Semaphore、CyclicBarrier)
不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在頂層實現(xiàn)好了!接下來開始擼吧。。至于這里雙向鏈表是怎么樣的一個結構,這里就不做多于的描述了,大家可以自行去補充。
首先我們由一張圖開頭,我們要知道AQS其實主要實現(xiàn)的是一個FIFO的雙向鏈表的維護,每個Node其實就是一個等待被釋放的線程,在競爭鎖失敗后,會封裝成Node的形式進入到鏈表尾部。。在了解了最基本的概念后,我們先來看看AQS最經(jīng)典的應用ReentrantLock的lock方法:
public voidlock() {
sync.lock();// sync主要兩種實現(xiàn)類
}
// 第一種非公平鎖
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**非公平鎖實現(xiàn)的lock方法
*/
final void lock() {
if (compareAndSetState(0, 1))// CAS操作去嘗試將state變?yōu)?,也就是獨占狀態(tài)
setExclusiveOwnerThread(Thread.currentThread());// 非公平鎖并不會老老實實去排隊,而是一上來就插隊,插不了就只能去排隊了。。
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 第二種公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);// 相比于非公平鎖,就比較守規(guī)矩了
}
」
因為非公平和公平就只有這么一個差別,那我就以非公平鎖為切入點了,可以看到在嘗試搶占失敗后,調(diào)用acquire方法,ok進入到該方法:
// 此方法是AQS的,但是注意里面的tryAcquire是需要我們的自定義AQS實現(xiàn)的,直接調(diào)用AQS的會直接拋出異常UnsupportedOperationException
public final void acquire(intarg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire是NonfairSync實現(xiàn)的,而他內(nèi)部又直接調(diào)用Sync父類的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(intacquires) {final Thread current =Thread.currentThread();int c =getState();if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);// 直接CAS獨占return true;
}
}else if (current ==getExclusiveOwnerThread()) {int nextc = c +acquires;// 這里很確切的說明了ReentrantLock是一個可重入的鎖if (nextc < 0) //overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);return true;
}return false;
}
上面的方法相信大家應該很快就理解了,在嘗試獨占失敗后,tryAcquire操作返回false,然后這個時候要做的操作相信大家也可以猜到,就是插入到雙向鏈表中,看上面的代碼,第一個操作是addWaiter,于是我們貼出這個方法涉及的源碼:
privateNode addWaiter(Node mode) {
Node node= newNode(Thread.currentThread(), mode);// 在這里首先根據(jù)當前線程創(chuàng)建出一個節(jié)點
Node pred =tail;// 既然要插入節(jié)點,肯定是要插入到最尾部的,先獲取到tail節(jié)點if (pred != null) {
node.prev=pred;// 將當前節(jié)點的prev和尾部節(jié)點關聯(lián) --第五行if(compareAndSetTail(pred, node)) {
pred.next=node;// node和老tail關聯(lián)完成returnnode;
}
}
enq(node);returnnode;
}
如果某個線程在插入隊列沒有其他線程干擾的話,enq都不會進去的,直接在CAS設置成tail之后直接返回了,但是實際上,總是會有那么幾個“不長眼”的線程來和你對著干。。。來假設這么一個場景:A線程是tail節(jié)點,此時B和C進來,他們都同時進入到第五行那里,也就是你會發(fā)現(xiàn)A會有B和C兩個節(jié)點的prev指向它,但是下一行的CAS操作是一個原子性操作,所以B和C只能一個成為tail,那么又假設B成功CAS了,也就是B可以直接返回,但是C就比較“悲催”了,它得進入到下一個方法enq,因為此時的鏈表結構很是奇怪,C的prev指向了old tail:A,所以得做一個“修復”結構操作,將C的prev指向B,接下來看enq代碼:
private Node enq(finalNode node) {// 此時沒有成功CAS的C節(jié)點“失魂落魄”的走了進來for(;;) {
Node t=tail;if (t == null) { //
if (compareAndSetHead(newNode()))// 如果此時隊列完全為空(第一個線程進來),需要弄一個冗余head節(jié)點,之后你會看到作用的。。別急
tail=head;
}else{
node.prev=t;// 此時的C節(jié)點要和B節(jié)點綁上關系if(compareAndSetTail(t, node)) {
t.next=node;// 關聯(lián)完成returnt;
}
}
}
}
此時的C應該是可以回到正軌的,就算此時又一個線程打擾了C的關聯(lián)操作而導致CAS失敗,但是因為代碼在for循環(huán)里,可以重試,基本上很快就可以回到隊列正軌!!于是我們又可以愉快的進行下一個步驟了,再回到我們熟悉的acquire(有點繞,忘記的往上翻),可以看到addWaiter之后,會將當前節(jié)點返回給一個“新面孔”-acquireQueued方法作為參數(shù),我們再看看這個方法是怎么做的:
final boolean acquireQueued(final Node node, intarg) {boolean failed = true;try{boolean interrupted = false;for(;;) {final Node p =node.predecessor();// 當前節(jié)點的前置節(jié)點if (p == head &&tryAcquire(arg)) {
// 當前置節(jié)點為head,那么可以去嘗試獲取鎖,成功的話就調(diào)用setHead方法將自己設置為head節(jié)點
setHead(node);
p.next= null; //help GC
failed = false;returninterrupted;
}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
// 判斷當前節(jié)點是否可以被阻塞,shouldParkAfterFailedAcquire方法為核心
interrupted= true;
}
}finally{if(failed)
cancelAcquire(node);
}
}
可以看到,如果當前節(jié)點的上一個節(jié)點就是head的話,說明當前可以競爭到鎖的概率會很大,一旦head節(jié)點的線程執(zhí)行完unlock后,當前的state變?yōu)?,當前節(jié)點就可以進入到setHead方法,但是如果頭節(jié)點還在執(zhí)行中,那么當前節(jié)點只能老老實實的進入到shouldParkAfterFailedAcquire方法內(nèi)部,來決定當前節(jié)點是否應該能被阻塞:
private static booleanshouldParkAfterFailedAcquire(Node pred, Node node) {
//static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;static final int PROPAGATE = -3;
int ws =pred.waitStatus;// 在這里終于有用上這個變量了if (ws ==Node.SIGNAL)/*在這里我打算用一個很易于理解的方式來講述這個SIGNAL值有什么用:
相信大家都有過排隊的經(jīng)歷,在這里服務窗口相當于鎖,每個人過來時,發(fā)現(xiàn)窗口有其他人在,所以此時只能去隊尾排隊,也就是addWaiter操作,在隊尾后,waitStatus的值默認是0,但是此時剛排進隊的小伙伴,因為隊伍太長,
而且比較累,需要低頭打個盹,但是怕如果瞌睡打過頭了,就不知道什么時候窗口沒人了可以被服務,所以此時小伙伴為了保險,他需要一個可靠的“前置隊友”,也就是他前面的人如果業(yè)務辦完了,可以順便回頭來叫醒他,在這里可
以把“委托前面的人,如果結束了麻煩叫醒我,謝謝!”這個操作理解為將prev節(jié)點的waitStatus設置為SIGNAL,如果前置節(jié)點的waitStatus不是0,需要嘗試設置為SIGNAL,但如果前面的小伙伴已經(jīng)是SIGNAL了,直接返回,
說明當前小伙伴可以安心的打盹了(被阻塞)!!*/
return true;if (ws > 0) {/** 如果是CANCELLED,代表當前節(jié)點已經(jīng)不需要處理業(yè)務了,可以在隊列里直接清除出去,然后隊列重新規(guī)整*/
do{
node.prev= pred =pred.prev;
}while (pred.waitStatus > 0);
pred.next=node;
}else{/* 到這一步,就會嘗試去將前置節(jié)點設置為SIGNAL,但是有可能會設置失敗或者設置成功,但是不論成功還是失敗,都會返回false,也就是在上面的acquireQueued中,返回false后會繼續(xù)for循環(huán)里去嘗試獲取鎖,因為小伙伴必須要確定前面的伙伴要靠譜,也就是必須要是SIGNAL
*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}return false;
}
我們再來看看unlock方法,他有直接調(diào)用AQS的release方法,而tryRelease方法由自定義的AQS類實現(xiàn):
public final boolean release(intarg) {if(tryRelease(arg)) {
Node h=head;if (h != null && h.waitStatus != 0)
unparkSuccessor(h);// 關鍵操作,如何去喚醒后面的小伙伴return true;
}return false;
}
protected final boolean tryRelease(intreleases) {int c = getState() -releases;if (Thread.currentThread() !=getExclusiveOwnerThread())throw newIllegalMonitorStateException();boolean free = false;if (c == 0) {
free= true;
setExclusiveOwnerThread(null);// 徹底釋放鎖后,將ownerThread設置為null,重置state
}
setState(c);returnfree;
}
tryRelease操作其實很好理解,主要是unparkSuccessor方法:
private voidunparkSuccessor(Node node) {/** 在釋放完鎖后,此時的節(jié)點他已經(jīng)不需要SIGNAL這個狀態(tài)了,因為他覺得自己辦完業(yè)務了,就可以嘗試去給自己“放個假”,當變成0的時候,后面的小伙伴在shouldPark里就會返回false,代表當前前置節(jié)點很有可能不是剛剛初始化
導致的waitStatus == 0,而是前置節(jié)點剛釋放完鎖,所以就是head:“我此時已經(jīng)釋放完鎖了,后面的,你現(xiàn)在就別打盹了,趕緊再去嘗試搶鎖吧!”,于是此時心急的小伙伴就趕緊再進入for循環(huán)里嘗試tryAcquire*/
int ws =node.waitStatus;if (ws < 0)
compareAndSetWaitStatus(node, ws,0);/***/Node s=node.next;if (s == null || s.waitStatus > 0) {
s= null;for (Node t = tail; t != null && t != node; t =t.prev)
// 從后往前,找到第一個需要被喚醒的小伙伴,狀態(tài)也是必須>=0,至于為什么會==0,因為最后一個節(jié)點的status一定是0if (t.waitStatus <= 0)
s=t;
}if (s != null)
LockSupport.unpark(s.thread);// 此時的s就是下一個需要被喚醒的,于是unpark
}
不知道你們有沒有發(fā)現(xiàn),為什么上面的代碼里要從后往前掃描呢,雙向鏈表不是兩邊都可以掃嗎,這個就很有趣了,不知道你們有沒有看到在addWaiter和enq方法里,在將當前節(jié)點CAS成tail的前一步,有一個先將node的prev設置為前一個節(jié)點,也就是雙向表的建立關系是先后節(jié)點連接前節(jié)點開始的,但是因為設置兩個節(jié)點的關系時不是原子操作,那么就會導致可能prev關系存在,但是next關系不存在的時候,unpark操作就開始需要去遍歷鏈表了,而這個時候,用next操作就很可能會遺漏掉哪個“小伙伴”而導致出現(xiàn)誤“喚醒”!!
總結
以上是生活随笔為你收集整理的java aqs源码_Java-AQS源码详解(细节很多!)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: centos设置mysql为系统服务_C
- 下一篇: linux下mysql无法访问_Linu