java ReentrantLock 实现原理
使用?synchronized?來做同步處理時(shí),鎖的獲取和釋放都是隱式的,實(shí)現(xiàn)的原理是通過編譯后加上不同的機(jī)器指令來實(shí)現(xiàn)。
而?ReentrantLock?就是一個(gè)普通的類,它是基于?AQS(AbstractQueuedSynchronizer)來實(shí)現(xiàn)的。
是一個(gè)重入鎖:一個(gè)線程獲得了鎖之后仍然可以反復(fù)的加鎖,不會(huì)出現(xiàn)自己阻塞自己的情況。
AQS?是?Java?并發(fā)包里實(shí)現(xiàn)鎖、同步的一個(gè)重要的基礎(chǔ)框架。
?
鎖類型
ReentrantLock 分為公平鎖和非公平鎖,可以通過構(gòu)造方法來指定具體類型:
//默認(rèn)非公平鎖public ReentrantLock() {sync = new NonfairSync();}//公平鎖public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}默認(rèn)一般使用非公平鎖,它的效率和吞吐量都比公平鎖高的多(后面會(huì)分析具體原因)。
?
獲取鎖
通常的使用方式如下:
private ReentrantLock lock = new ReentrantLock();public void run() {lock.lock();try {//do bussiness} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}?
公平鎖獲取鎖
首先看下獲取鎖的過程:
public void lock() {sync.lock();}可以看到是使用?sync的方法,而這個(gè)方法是一個(gè)抽象方法,具體是由其子類(FairSync)來實(shí)現(xiàn)的,以下是公平鎖的實(shí)現(xiàn):
final void lock() {acquire(1);}//AbstractQueuedSynchronizer 中的 acquire()public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}第一步是嘗試獲取鎖(tryAcquire(arg)),這個(gè)也是由其子類實(shí)現(xiàn):
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}首先會(huì)判斷?AQS?中的?state?是否等于 0,0 表示目前沒有其他線程獲得鎖,當(dāng)前線程就可以嘗試獲取鎖。
注意:嘗試之前會(huì)利用?hasQueuedPredecessors()?方法來判斷 AQS 的隊(duì)列中中是否有其他線程,如果有則不會(huì)嘗試獲取鎖(這是公平鎖特有的情況)。
如果隊(duì)列中沒有線程就利用 CAS 來將 AQS 中的 state 修改為1,也就是獲取鎖,獲取成功則將當(dāng)前線程置為獲得鎖的獨(dú)占線程(setExclusiveOwnerThread(current))。
如果?state?大于 0 時(shí),說明鎖已經(jīng)被獲取了,則需要判斷獲取鎖的線程是否為當(dāng)前線程(ReentrantLock?支持重入),是則需要將?state + 1,并將值更新。
寫入隊(duì)列
如果?tryAcquire(arg)?獲取鎖失敗,則需要用?addWaiter(Node.EXCLUSIVE)?將當(dāng)前線程寫入隊(duì)列中。
寫入之前需要將當(dāng)前線程包裝為一個(gè)?Node?對象(addWaiter(Node.EXCLUSIVE))。
AQS 中的隊(duì)列是由 Node 節(jié)點(diǎn)組成的雙向鏈表實(shí)現(xiàn)的。
包裝代碼:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}首先判斷隊(duì)列是否為空,不為空時(shí)則將封裝好的?Node?利用?CAS?寫入隊(duì)尾,如果出現(xiàn)并發(fā)寫入失敗就需要調(diào)用?enq(node);來寫入了。
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}這個(gè)處理邏輯就相當(dāng)于自旋加上?CAS?保證一定能寫入隊(duì)列。
掛起等待線程
寫入隊(duì)列之后需要將當(dāng)前線程掛起(利用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}首先會(huì)根據(jù)?node.predecessor()?獲取到上一個(gè)節(jié)點(diǎn)是否為頭節(jié)點(diǎn),如果是則嘗試獲取一次鎖,獲取成功就萬事大吉了。
如果不是頭節(jié)點(diǎn),或者獲取鎖失敗,則會(huì)根據(jù)上一個(gè)節(jié)點(diǎn)的?waitStatus?狀態(tài)來處理(shouldParkAfterFailedAcquire(p, node))。
waitStatus?用于記錄當(dāng)前節(jié)點(diǎn)的狀態(tài),如節(jié)點(diǎn)取消、節(jié)點(diǎn)等待等。
shouldParkAfterFailedAcquire(p, node)?返回當(dāng)前線程是否需要掛起,如果需要?jiǎng)t調(diào)用?parkAndCheckInterrupt():
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}他是利用?LockSupport?的?part?方法來掛起當(dāng)前線程的,直到被喚醒。
?
非公平鎖獲取鎖
公平鎖與非公平鎖的差異主要在獲取鎖:
公平鎖就相當(dāng)于買票,后來的人需要排到隊(duì)尾依次買票,不能插隊(duì)。
而非公平鎖則沒有這些規(guī)則,是搶占模式,每來一個(gè)人不會(huì)去管隊(duì)列如何,直接嘗試獲取鎖。
非公平鎖:
final void lock() {//直接嘗試獲取鎖if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}公平鎖:
final void lock() {acquire(1);}還要一個(gè)重要的區(qū)別是在嘗試獲取鎖時(shí)tryAcquire(arg),非公平鎖是不需要判斷隊(duì)列中是否還有其他線程,也是直接嘗試獲取鎖:
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {//沒有 !hasQueuedPredecessors() 判斷if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}?
釋放鎖
公平鎖和非公平鎖的釋放流程都是一樣的:
public void unlock() {sync.release(1);}public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)//喚醒被掛起的線程unparkSuccessor(h);return true;}return false;}//嘗試釋放鎖protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}首先會(huì)判斷當(dāng)前線程是否為獲得鎖的線程,由于是重入鎖所以需要將?state?減到 0 才認(rèn)為完全釋放鎖。
釋放之后需要調(diào)用?unparkSuccessor(h)?來喚醒被掛起的線程。
?
總結(jié)
由于公平鎖需要關(guān)心隊(duì)列的情況,得按照隊(duì)列里的先后順序來獲取鎖(會(huì)造成大量的線程上下文切換),而非公平鎖則沒有這個(gè)限制。
所以也就能解釋非公平鎖的效率會(huì)被公平鎖更高。
總結(jié)
以上是生活随笔為你收集整理的java ReentrantLock 实现原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 多线程三大核心
- 下一篇: java ConcurrentHashM