多线程—AQS独占锁与共享锁原理
java.util.concurrent.locks包下,包含了多種鎖,ReentrantLock獨占鎖、ReentrantReadWriteLock讀寫鎖等,還有java.util.concurrent下的Semaphore、CountDownLatch都是基于AQS實現的。
AQS是一個抽象類,但事實上它并不包含任何抽象方法。AQS將一些需要子類覆寫的方法都設計成protect方法,將其默認實現為拋出UnsupportedOperationException異常。如果子類使用到這些方法,但是沒有覆寫,則會拋出異常;如果子類沒有使用到這些方法,則不需要做任何操作。
可重寫方法:
- tryAcquire(int) 嘗試獲取鎖
- tryRelease(int) 嘗試釋放鎖
- tryAcquireShared(int) 共享的方式嘗試獲取鎖
- tryReleaseShared(int) 共享的方式嘗試釋放鎖
- isHeldExclusively() 判斷當前是否為獨占鎖
final方法:
- getState
- setState(int)
- compareAndSetState(int, int)?
- setExclusiveOwnerThread(Thread.currentThread())? 將該線程設置為當前鎖的持有者
根據實現方式的不同,可以分為兩種:獨占鎖和共享鎖
- 獨占鎖:ReentrantLock
- 共享鎖:CountDownLatch、CyclicBarrier、Semaphore
- ReentrantReadWriteLock寫的時候是獨占鎖,讀的時候是共享鎖
使用方法:
推薦作為靜態內部類來繼承AQS。例如ReentrantLock作為外部類實現Lock接口,靜態抽象內部類Sync繼承AQS。重寫Lock接口方法時,是直接調用Sync類的實現類公平鎖NonfairSync或非公平鎖FairSync內的方法來完成相應邏輯。
- 公平鎖:線程獲取鎖的順序和調用lock的順序一樣,FIFO;
- 非公平鎖:線程獲取鎖的順序和調用lock的順序無關,全憑運氣。
AQS的3部分:
state:
private?volatile?int?state; //volatile,state為0表示鎖沒有被占用,可以把state變量當做是當前持有該鎖的線程數量。隊列:
一個FIFO的雙向鏈表,這種結構的特點是每個數據結構都有兩個指針,分別指向直接的后繼節點和直接前驅節點。所以雙向鏈表可以從任意一個節點開始很方便的訪問前驅和后繼。
每個Node其實是由線程封裝,當線程爭搶鎖失敗后會封裝成Node加入到隊列中去。
Node主要屬性:
// 節點所代表的線程 volatile Thread thread;// 雙向鏈表,每個節點需要保存自己的前驅節點和后繼節點的引用 volatile Node prev; volatile Node next;// 線程所處的等待鎖的狀態,初始化時,該值為0 volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;// 該屬性用于條件隊列或者共享鎖 Node nextWaiter;- CANCELLED:值為1,在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點,其結點的waitStatus為CANCELLED,即結束狀態,進入該狀態后的結點將不會再變化。
- SIGNAL:值為-1,被標識為該等待喚醒狀態的后繼結點,當其前繼結點的線程釋放了同步鎖或被取消,將會通知該后繼結點的線程執行。說白了,就是處于喚醒狀態,只要前繼結點釋放鎖,就會通知標識為SIGNAL狀態的后繼結點的線程執行。
- CONDITION:值為-2,與Condition相關,該標識的結點處于等待隊列中,結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
- PROPAGATE:值為-3,與共享模式相關,在共享模式中,該狀態標識結點的線程處于可運行狀態。
隊列的頭節點和尾節點?
// 頭節點,不代表任何線程,是一個啞結點 private transient volatile Node head;// 尾節點,每一個請求鎖的線程會加到隊尾 private transient volatile Node tail;CAS(Compare and Swap)操作:
JAVA使用?鎖和循環CAS來實現原子操作,原子操作意為”不可被中斷的一個或一系列操作”,保證只有一個線程操作數據。
悲觀鎖與樂觀鎖:
- 悲觀鎖: 假定會發生并發沖突,所以當某個線程獲取共享資源時,會阻止別的線程獲取共享資源。也稱獨占鎖或者互斥鎖,例如synchronized同步鎖。
- 樂觀鎖: 假設不會發生并發沖突,只有在最后更新共享資源的時候會判斷一下在此期間有沒有別的線程修改了這個共享資源。如果發生沖突就重試,直到沒有沖突,更新成功。CAS就是一種樂觀鎖實現方式。
CAS的思想很簡單:三個參數,一個當前內存值V、舊的預期值A、即將更新的值B,當且僅當預期值A和內存值V相同時,將內存值修改為B并返回true,否則什么都不做,并返回false。循環CAS實現的基本思路就是循環進行CAS操作直到成功為止。
Java中的CAS功能是通過Unsafe類來實現的。Java并發包中java.util.concurrent 大量使用了這種操作,來保證線程安全。compareAndSetState(int, int)方法就是通過Unsafe實現的,而setState就是線程不安全的。
除了try*()方法外,AQS自身實現了諸多的如acquire和doAcquire()等方法,他們之間的區別在于try*()方法代表一次嘗試性的獲取鎖操作,如果獲取到了就拿到了鎖,否則直接返回。而AQS自身實現的acquire和doAcquire()等方法如果獲取不到鎖會能夠進入同步/等待隊列中阻塞等待進行鎖的爭奪,直到拿到了鎖返回。對于共享鎖,try*()也會進行自旋獲取,因為共享鎖可以被多個線程持有。
下面通過acquire方法來分析AQS怎么獲取鎖:
public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }tryAcquire是子類獲取鎖應該實現的方法,一般在里面判斷state值,state==0時就可以獲取到鎖,通過下面方法獲取鎖,然后返回true,否則返回false,公平鎖的話會再判斷自己有沒有前驅節點等。
setExclusiveOwnerThread(current); // 將當前線程設置為占用鎖的線程當tryAcquire獲取鎖失敗時,&&前面為true,才會調用addWaiter方法,將當前線程包裝成Node,加到等待鎖的隊列中去, 因為是FIFO隊列, 所以加在隊尾。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); //將當前線程包裝成Node// 這里我們用注釋的形式把Node的構造函數貼出來// 因為傳入的mode值為Node.EXCLUSIVE,所以節點的nextWaiter屬性被設為null/*static final Node EXCLUSIVE = null;Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}*/Node pred = tail;// 如果隊列不為空, 則用CAS方式將當前節點設為尾節點if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node); //將節點插入隊列return node; }將一個節點node添加到sync queue末尾的三步:
添加有可能失敗,原因可能是以下兩種之一:
失敗的時候會調用一個enq(node)方法,在該方法中, 出現第一種情況時,該方法也負責在隊列為空時, 初始化隊列,然后使用了死循環, 即以自旋方式將節點插入隊列,如果失敗則不停的嘗試, 直到成功為止,運用到了樂觀鎖的原理。
private Node enq(final Node node) {for (;;) {Node t = tail;// 如果是空隊列, 首先進行初始化// 這里也可以看出, 隊列不是在構造的時候初始化的, 而是延遲到需要用的時候再初始化, 以提升性能if (t == null) { // 注意,初始化時使用new Node()方法新建了一個dummy節點if (compareAndSetHead(new Node()))tail = head; // 這里僅僅是將尾節點指向dummy節點,并沒有返回} else {// 到這里說明隊列已經不是空的了, 這個時候再繼續嘗試將節點加到隊尾node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }enq(node)方法運行可能會造成尾分叉既多個尾節點的現象,因為node.prev = t;可能被多個線程運行,后面if語句則是CAS操作保證了單線程運行。不過只是一種暫時的現象,因為線程不斷循環保證入隊。
enq(node)方法后返回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
該方法中將再次嘗試去獲取鎖,因為如果當前節點的前驅節點就是HEAD節點,則可以再嘗試獲取鎖。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 該方法用來查找并獲取前置節點。final Node p = node.predecessor();// 在當前節點的前驅就是HEAD節點時, 再次嘗試獲取鎖if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//在獲取鎖失敗后, 判斷是否需要把當前線程掛起if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }setHead方法將該節點設置成頭節點,上一個頭節點就被頂掉。以此來達成節點出隊列的效果。
private void setHead(Node node) {head = node;node.thread = null;node.prev = null; }如果獲取不到鎖調用shouldParkAfterFailedAcquire,該方法用于決定在獲取鎖失敗后, 是否將線程掛起,決定的依據就是前驅節點的waitStatus值。在獨占鎖的獲取操作中,我們只用到了其中的兩個——CANCELLED和SIGNAL。每一個節點最開始的時候waitStatus的值都被初始化為0。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; // 獲得前驅節點的wsif (ws == Node.SIGNAL)// 前驅節點的狀態已經是SIGNAL了,說明鬧鐘已經設了,可以直接睡了return true;if (ws > 0) {// 當前節點的 ws > 0, 則為 Node.CANCELLED 說明前驅節點已經取消了等待鎖(由于超時或者中斷等原因)// 既然前驅節點不等了, 那就繼續往前找, 直到找到一個還在等待鎖的節點// 然后我們跨過這些不等待鎖的節點, 直接排在等待鎖的節點的后面do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 前驅節點的狀態既不是SIGNAL,也不是CANCELLED// 用CAS設置前驅節點的ws為 Node.SIGNAL,給自己定一個鬧鐘compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }- 如果為前驅節點的waitStatus值為 Node.SIGNAL 則直接返回 true運行&&后面的parkAndCheckInterrupt()方法。
- 如果為前驅節點的waitStatus值為 Node.CANCELLED (ws > 0), 則跳過那些節點, 重新尋找正常等待中的前驅節點,然后排在它后面,返回false,繼續循環。
- 其他情況, 將前驅節點的狀態改為 Node.SIGNAL, 返回false,繼續循環。
返回false時會進行循環,就是將那些CANCELLED的節點移出隊列,然后再循環一次,再嘗試獲取鎖,因為自己有可能已經到頭節點后面了,如果不是則自己排到waitStatus值為SIGNAL的前節點后面,此時shouldParkAfterFailedAcquire返回true。將調用parkAndCheckInterrupt()方法。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 線程被掛起,停在這里不再往下執行了return Thread.interrupted(); }調用了LockSupport類的park方法。
LockSupport工具類主要用來掛起park(Thread)和喚醒unpark(Thread)線程,底層實現也是使用的Unsafe類。若其他線程調用了阻塞線程的interrupt()方法,阻塞線程也會返回,即阻塞的線程是響應中斷的,而且不會拋出InterruptedException異常。LockSupport并不需要獲取對象的監視器,而是給線程一個“許可”(permit),unpark可以先于park調用,unpark一個并沒有被park的線程時,該線程在下一次調用park方法時就不會被掛起。
所以最后return Thread.interrupted();是因為不能保證他不是被中斷的,所以返回Thread的中斷狀態。
鎖的釋放release方法:
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }tryRelease(arg)? ?
該方法由繼承AQS的子類實現, 為釋放鎖的具體邏輯。一般是將state設置為0,setExclusiveOwnerThread(null);再將當前線程設置為null。
unparkSuccessor(h)? ?喚醒h的后繼線程
當有頭節點且頭節點的waitStatus不等于0的時候則喚醒后繼線程,因為waitStatus初始值為0,當隊列進入新節點時,頭節點會被設置為SIGNAL。
private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 如果head節點的ws比0小, 則直接將它設為0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 此時從尾節點開始向前找起, 直到找到距離head節點最近的ws<=0的節點Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t; // 沒有return, 繼續向前找。}// 如果找到了還在等待鎖的節點,則喚醒它if (s != null)LockSupport.unpark(s.thread); }從尾部開始遍歷,因為新節點接入的時候是先node.prev = t,隊列可能只執行到這步,后面還沒執行,所以從尾向前遍歷。
找到頭節點的下一個不是CANCELLED的節點并喚醒,unpark方法對應前面添加節點的park方法,所以回到前面。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 線程被掛起,停在這里不再往下執行了return Thread.interrupted(); }所以當線程獲取不到鎖,會被park一直阻塞狀態,直到被interrupt或者有鎖的線程釋放鎖時,才會獲得鎖。獲得鎖后返回中斷狀態
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 該方法用來查找并獲取前置節點。final Node p = node.predecessor();// 在當前節點的前驅就是HEAD節點時, 再次嘗試獲取鎖if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//在獲取鎖失敗后, 判斷是否需要把當前線程掛起if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }如果是中斷喚醒的返回true,再設置interrupted = true,因為Thread.interrupted()調用后中斷狀態會被重新設回false。繼續循環,如果自己是在頭節點下一位,就可以獲取鎖了,否則又要掛起。
共享鎖
共享鎖的acquireShared方法對應獨占鎖acquire
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg); } public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }doAcquireShared對應了獨占鎖的acquireQueued,
private void doAcquireShared(int arg) { ··························································final Node node = addWaiter(Node.SHARED); //代表共享模式 ··························································boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();//與獨占鎖的acquireQueued的區別主要就是中間這段代碼 ··························································if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}··························································if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }重點在setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);//if里面包含了兩個頭節點,一個新一個老,多線程下兩者可能不一樣,兩種情況。 //1.propagate > 0 表示調用方指明了后繼節點需要被喚醒 //2.頭節點后面的節點需要被喚醒(waitStatus<0),不論是老的頭結點還是新的頭結點if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;//如果當前節點的后繼節點是共享類型或者沒有后繼節點,則進行喚醒 //這里可以理解為除非明確指明不需要喚醒(后繼等待節點是獨占類型),否則都要喚醒if (s == null || s.isShared())doReleaseShared();} }在setHeadAndPropagate方法里面,將獲取鎖的節點設置為頭節點,然后再去doReleaseShared,doReleaseShared對應了獨占鎖的unparkSuccessor,作用是喚醒下一個線程,所以在共享鎖的releaseShared方法(對應獨占鎖release),就是釋放鎖方法里,也主要是用doReleaseShared來釋放鎖。
獨占鎖與共享鎖的區別
- 獨占鎖是持有鎖的線程釋放鎖之后才會去喚醒下一個線程。
- 共享鎖是線程獲取到鎖后,就會去喚醒下一個線程,所以共享鎖在獲取鎖和釋放鎖的時候都會調用doReleaseShared方法喚醒下一個線程,當然這會收共享線程數量的限制。
下面到doReleaseShared方法
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) { //至少有頭尾兩個節點int ws = h.waitStatus;if (ws == Node.SIGNAL) { //ws為SIGNAL的時候才去喚醒下一個節點if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //將頭節點的SIGNAL改為0,CAS操作continue; // loop to recheck casesunparkSuccessor(h); //保證單線程運行喚醒后繼線程}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} }doReleaseShared方法里,我們暫且把現在持有鎖的線程成為節點A,下一個節點為B。
首先判斷?if (ws == Node.SIGNAL),因為我們每次插入節點都會默認0,并且把前節點設成SIGNAL,所以當條件成立時,聲明A節點后面已經有B了。 到下一層,if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)),意思是把節點A的SIGNAL改為0。
為什么需要CAS操作呢?
CAS保證了后面單線程喚醒后繼線程的操作,在上面談到的doReleaseShared這個方法,在獲取鎖和釋放鎖的時候都會調用,防止重復喚醒。
接下來是 else if (ws == 0 &&?!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))? 什么時候滿足這句呢?
ws為0,每當一個節點進同步隊列都會把前面節點設置為SIGNAL,自己初始為0,所以滿足ws==0的條件就是節點A是隊列最后一個且后面還沒有節點B入列的情況。
滿足了ws==0,運行下面這句。
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE),CAS操作失敗返回true,將該新節點A的0設置APROPAGATE不成功。不成功就意味著新節點A的0已經被改了,被改意味著新節點A后面已經進入了節點B,設置前節點為SIGNAL的操作是線程在獲取不到鎖之后,阻塞之前,忘記的可以回顧一下前面的內容。
所以else if (ws == 0 &&?!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 這個條件成立分為了兩個階段,既有尾節點又加入新節點的這個瞬間可能會滿足。
那么滿足為什么要continue呢?
因為節點B線程在獲取不到鎖之后,阻塞之前,所以此時A還沒釋放鎖,A仍是頭節點,h==head條件成立,執行break跳出循環,不會去喚醒B了,這不符合共享鎖的機制。所以應該continue繼續循環,去喚醒B節點,而不是等A運行完釋放鎖的時候才去調用。
h == head如果不成立,說明A喚醒完B,B已經調用了setHead這個方法了,這個時候再去循環看看B節點后面有沒有節點。
?
總結
以上是生活随笔為你收集整理的多线程—AQS独占锁与共享锁原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 编译器vs.代码 谁之过
- 下一篇: 最易忽视的肾虚4件事