并发编程总结4-JUC-REENTRANTLOCK-2(公平锁)
?二、基本概念
?1.?AQS?-- 指AbstractQueuedSynchronizer類。
? ? AQS是java中管理“鎖”的抽象類,鎖的許多公共方法都是在這個類中實現(xiàn)。AQS是獨占鎖(例如,ReentrantLock)和共享鎖(例如,Semaphore)的公共父類。
?2.?AQS鎖的類別 -- 分為“獨占鎖”和“共享鎖”兩種。
? ? (01)?獨占鎖?-- 鎖在一個時間點只能被一個線程鎖占有。根據(jù)鎖的獲取機制,它又劃分為“公平鎖”和“非公平鎖”。公平鎖,是按照通過CLH等待線程按照先來先得的規(guī)則,公平的獲取鎖;而非公平鎖,則當(dāng)線程要獲取鎖時,它會無視CLH等待隊列而直接獲取鎖。獨占鎖的典型實例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是獨占鎖。
? ? (02)?共享鎖?-- 能被多個線程同時擁有,能被共享的鎖。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier,?CountDownLatch和Semaphore都是共享鎖。
?3.?CLH隊列
? ? CLH隊列是AQS中“等待鎖”的線程隊列。在多線程中,為了保護(hù)競爭資源不被多個線程同時操作而起來錯誤,我們常常需要通過鎖來保護(hù)這些資源。在獨占鎖中,競爭資源在一個時間點只能被一個線程鎖訪問;而其它線程則需要等待。CLH就是管理這些“等待鎖”的線程的隊列。
? ? CLH是一個非阻塞的 FIFO 隊列。也就是說往里面插入或移除一個節(jié)點的時候,在并發(fā)條件下不會阻塞,而是通過自旋鎖和 CAS 保證節(jié)點插入和移除的原子性。
?4.?CAS函數(shù)?-- Compare And Swap?
? ? CAS函數(shù),是比較并交換函數(shù),它是原子操作函數(shù);即,通過CAS操作的數(shù)據(jù)都是以原子方式進(jìn)行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函數(shù)。它們共同的特點是,這些函數(shù)所執(zhí)行的動作是以原子的方式進(jìn)行的。
?三、JUC-公平鎖-獲取鎖
? 獲取鎖
public void lock() {//調(diào)用FairSync的lock方法 sync.lock();}//繼承Sync Sync繼承AbstractQueuedSynchronizer類static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {//調(diào)用AbstractQueuedSynchronizer的acquire方法acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}} lock?AQS 的acquire()實現(xiàn)
public final void acquire(int arg) {//tryAcquire嘗試獲取鎖 //addWaiter(Node.EXCLUSIVE), arg)如果失敗新增等待節(jié)點//acquireQueued根據(jù)隊列獲取鎖if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();} aqs實現(xiàn)的acquire()(01) “當(dāng)前線程”首先通過tryAcquire()嘗試獲取鎖。獲取成功的話,直接返回;嘗試失敗的話,進(jìn)入到等待隊列排序等待(前面還有可能有需要線程在等待該鎖)。
(02) “當(dāng)前線程”嘗試失敗的情況下,先通過addWaiter(Node.EXCLUSIVE)來將“當(dāng)前線程”加入到"CLH隊列(非阻塞的FIFO隊列)"末尾。CLH隊列就是線程等待隊列。
(03) 再執(zhí)行完addWaiter(Node.EXCLUSIVE)之后,會調(diào)用acquireQueued()來獲取鎖。?
FairSync.tryAcquire()
/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) { // 獲取“當(dāng)前線程”final Thread current = Thread.currentThread();// 獲取“獨占鎖”的狀態(tài)int c = getState();// c=0意味著“鎖沒有被任何線程鎖擁有”,if (c == 0) {// 若“鎖沒有被任何線程鎖擁有”,// 則判斷“當(dāng)前線程”是不是CLH隊列中的第一個線程線程,// 若是的話,則獲取該鎖,設(shè)置鎖的狀態(tài),并切設(shè)置鎖的擁有者為“當(dāng)前線程”。if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {// 如果“獨占鎖”的擁有者已經(jīng)為“當(dāng)前線程”,// 則將更新鎖的狀態(tài)。int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }
?tryAcquire()的作用就是嘗試去獲取鎖,嘗試成功的話,返回true;嘗試失敗的話,返回false,后續(xù)再通過其它辦法來獲取該鎖。
?
hasQueuedPredecessors()
hasQueuedPredecessors() 是通過判斷"當(dāng)前線程"是不是在CLH隊列的隊首,來返回AQS中是不是有比“當(dāng)前線程”等待更久的線程。
Node
private transient volatile Node head; // CLH隊列的隊首 private transient volatile Node tail; // CLH隊列的隊尾// CLH隊列的節(jié)點 static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;// 線程已被取消,對應(yīng)的waitStatus的值static final int CANCELLED = 1;// “當(dāng)前線程的后繼線程需要被unpark(喚醒)”,對應(yīng)的waitStatus的值。// 一般發(fā)生情況是:當(dāng)前線程的后繼線程處于阻塞狀態(tài),而當(dāng)前線程被release或cancel掉,因此需要喚醒當(dāng)前線程的后繼線程。static final int SIGNAL = -1;// 線程(處在Condition休眠狀態(tài))在等待Condition喚醒,對應(yīng)的waitStatus的值static final int CONDITION = -2;// (共享鎖)其它線程獲取到“共享鎖”,對應(yīng)的waitStatus的值static final int PROPAGATE = -3; // waitStatus為“CANCELLED, SIGNAL, CONDITION, PROPAGATE”時分別表示不同狀態(tài),// 若waitStatus=0,則意味著當(dāng)前線程不屬于上面的任何一種狀態(tài)。volatile int waitStatus;// 前一節(jié)點volatile Node prev;// 后一節(jié)點volatile Node next;// 節(jié)點所對應(yīng)的線程volatile Thread thread;// nextWaiter是“區(qū)別當(dāng)前CLH隊列是 ‘獨占鎖’隊列 還是 ‘共享鎖’隊列 的標(biāo)記”// 若nextWaiter=SHARED,則CLH隊列是“獨占鎖”隊列;// 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),則CLH隊列是“共享鎖”隊列。 Node nextWaiter;// “共享鎖”則返回true,“獨占鎖”則返回false。final boolean isShared() {return nextWaiter == SHARED;}// 返回前一節(jié)點final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // Used to establish initial head or SHARED marker }// 構(gòu)造函數(shù)。thread是節(jié)點所對應(yīng)的線程,mode是用來表示thread的鎖是“獨占鎖”還是“共享鎖”。Node(Thread thread, Node mode) { // Used by addWaiterthis.nextWaiter = mode;this.thread = thread;}// 構(gòu)造函數(shù)。thread是節(jié)點所對應(yīng)的線程,waitStatus是線程的等待狀態(tài)。Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;} }addWaiter()
private Node addWaiter(Node mode) {// 新建一個Node節(jié)點,節(jié)點對應(yīng)的線程是“當(dāng)前線程”,“當(dāng)前線程”的鎖的模型是mode。Node node = new Node(Thread.currentThread(), mode);Node pred = tail;// 若CLH隊列不為空,則將“當(dāng)前線程”添加到CLH隊列末尾if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 若CLH隊列為空,則調(diào)用enq()新建CLH隊列,然后再將“當(dāng)前線程”添加到CLH隊列中。 enq(node);return node; }addWaiter(Node.EXCLUSIVE)會首先創(chuàng)建一個Node節(jié)點,節(jié)點的類型是“獨占鎖”(Node.EXCLUSIVE)類型。然后,再將該節(jié)點添加到CLH隊列的末尾。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// interrupted表示在CLH隊列的調(diào)度中,// “當(dāng)前線程”在休眠時,有沒有被中斷過。boolean interrupted = false;for (;;) {// 獲取上一個節(jié)點。// node是“當(dāng)前線程”對應(yīng)的節(jié)點,這里就意味著“獲取上一個等待鎖的線程”。final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }acquireQueued()的目的是從隊列中獲取鎖
shouldParkAfterFailedAcquire
// 返回“當(dāng)前線程是否應(yīng)該阻塞” private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 前繼節(jié)點的狀態(tài)int ws = pred.waitStatus;// 如果前繼節(jié)點是SIGNAL狀態(tài),則意味這當(dāng)前線程需要被unpark喚醒。此時,返回true。if (ws == Node.SIGNAL)return true;// 如果前繼節(jié)點是“取消”狀態(tài),則設(shè)置 “當(dāng)前節(jié)點”的 “當(dāng)前前繼節(jié)點” 為 “‘原前繼節(jié)點’的前繼節(jié)點”。if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {// 如果前繼節(jié)點為“0”或者“共享鎖”狀態(tài),則設(shè)置前繼節(jié)點為SIGNAL狀態(tài)。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }?
如果前繼節(jié)點狀態(tài)為SIGNAL,表明當(dāng)前節(jié)點需要被unpark(喚醒),此時則返回true。
如果前繼節(jié)點狀態(tài)為CANCELLED(ws>0),說明前繼節(jié)點已經(jīng)被取消,則通過先前回溯找到一個有效(非CANCELLED狀態(tài))的節(jié)點,并返回false。
如果前繼節(jié)點狀態(tài)為非SIGNAL、非CANCELLED,則設(shè)置前繼的狀態(tài)為SIGNAL,并返回false。
selfInterrupt
?
private static void selfInterrupt() {Thread.currentThread().interrupt(); }?
如果在acquireQueued()中,當(dāng)前線程被中斷過,則執(zhí)行selfInterrupt();否則不會執(zhí)行。
在acquireQueued()中,即使是線程在阻塞狀態(tài)被中斷喚醒而獲取到cpu執(zhí)行權(quán)利;但是,如果該線程的前面還有其它等待鎖的線程,根據(jù)公平性原則,該線程依然無法獲取到鎖。它會再次阻塞! 該線程再次阻塞,直到該線程被它的前面等待鎖的線程鎖喚醒;線程才會獲取鎖,然后“真正執(zhí)行起來”!
四、JUC-公平鎖-釋放鎖?
unlock()
public void unlock() {sync.release(1); }“1”的含義和“獲取鎖的函數(shù)acquire(1)的含義”一樣,它是設(shè)置“釋放鎖的狀態(tài)”的參數(shù)。由于“公平鎖”是可重入的,所以對于同一個線程,每釋放鎖一次,鎖的狀態(tài)-1。
release()
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }release()會先調(diào)用tryRelease()來嘗試釋放當(dāng)前線程鎖持有的鎖。成功的話,則喚醒后繼等待線程,并返回true。否則,直接返回false。
protected final boolean tryRelease(int releases) {// c是本次釋放鎖之后的狀態(tài)int c = getState() - releases;// 如果“當(dāng)前線程”不是“鎖的持有者”,則拋出異常!if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 如果“鎖”已經(jīng)被當(dāng)前線程徹底釋放,則設(shè)置“鎖”的持有者為null,即鎖是可獲取狀態(tài)。if (c == 0) {free = true;setExclusiveOwnerThread(null);}// 設(shè)置當(dāng)前線程的鎖的狀態(tài)。 setState(c);return free; }tryRelease()的作用是嘗試釋放鎖。
(01) 如果“當(dāng)前線程”不是“鎖的持有者”,則拋出異常。
(02) 如果“當(dāng)前線程”在本次釋放鎖操作之后,對鎖的擁有狀態(tài)是0(即,當(dāng)前線程徹底釋放該“鎖”),則設(shè)置“鎖”的持有者為null,即鎖是可獲取狀態(tài)。同時,更新當(dāng)前線程的鎖的狀態(tài)為0。
unparkSuccessor()
?
private void unparkSuccessor(Node node) {// 獲取當(dāng)前線程的狀態(tài)int ws = node.waitStatus;// 如果狀態(tài)<0,則設(shè)置狀態(tài)=0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//獲取當(dāng)前節(jié)點的“有效的后繼節(jié)點”,無效的話,則通過for循環(huán)進(jìn)行獲取。// 這里的有效,是指“后繼節(jié)點對應(yīng)的線程狀態(tài)<=0”Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 喚醒“后繼節(jié)點對應(yīng)的線程”if (s != null)LockSupport.unpark(s.thread); }?
在release()中“當(dāng)前線程”釋放鎖成功的話,會喚醒當(dāng)前線程的后繼線程。
根據(jù)CLH隊列的FIFO規(guī)則,“當(dāng)前線程”(即已經(jīng)獲取鎖的線程)肯定是head;如果CLH隊列非空的話,則喚醒鎖的下一個等待線程。
?
?
?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/guoliangxie/p/6697855.html
總結(jié)
以上是生活随笔為你收集整理的并发编程总结4-JUC-REENTRANTLOCK-2(公平锁)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【机房收费系统】多么痛的领悟
- 下一篇: ORACLE利用STANDBY端RMAN