搞定ReentrantReadWriteLock 几道小小数学题就够了
ReadWriteLock
ReadWriteLock 直譯過來為【讀寫鎖】。現實中,讀多寫少的業務場景是非常普遍的,比如應用緩存
一個線程將數據寫入緩存,其他線程可以直接讀取緩存中的數據,提高數據查詢效率
之前提到的互斥鎖都是排他鎖,也就是說同一時刻只允許一個線程進行訪問,當面對可共享讀的業務場景,互斥鎖顯然是比較低效的一種處理方式。為了提高效率,讀寫鎖模型就誕生了
效率提升是一方面,但并發編程更重要的是在保證準確性的前提下提高效率
一個寫線程改變了緩存中的值,其他讀線程一定是可以 “感知” 到的,否則可能導致查詢到的值不準確
所以關于讀寫鎖模型就了下面這 3 條規定:
允許多個線程同時讀共享變量
只允許一個線程寫共享變量
如果寫線程正在執行寫操作,此時則禁止其他讀線程讀共享變量
ReadWriteLock 是一個接口,其內部只有兩個方法:
public?interface?ReadWriteLock?{//?返回用于讀的鎖Lock?readLock();//?返回用于寫的鎖Lock?writeLock(); }所以要了解整個讀/寫鎖的整個應用過程,需要從它的實現類 ReentrantReadWriteLock 說起
ReentrantReadWriteLock 類結構
直接對比ReentrantReadWriteLock 與 ReentrantLock的類結構
他們又很相似吧,根據類名稱以及類結構,按照咱們前序文章的分析,你也就能看出 ReentrantReadWriteLock 的基本特性:
其中黃顏色標記的的 鎖降級 是看不出來的, 這里先有個印象,下面會單獨說明
另外,不知道你是否還記得,Java AQS隊列同步器以及ReentrantLock的應用 說過,Lock 和 AQS 同步器是一種組合形式的存在,既然這里是讀/寫兩種鎖,他們的組合模式也就分成了兩種:
讀鎖與自定義同步器的聚合
寫鎖與自定義同步器的聚合
這里只是提醒大家,模式沒有變,不要被讀/寫兩種鎖迷惑
基本示例
說了這么多,如果你忘了前序知識,整體理解感覺應該是有斷檔的,所以先來看個示例(模擬使用緩存)讓大家對 ReentrantReadWriteLock 有個直觀的使用印象
public?class?ReentrantReadWriteLockCache?{//?定義一個非線程安全的?HashMap?用于緩存對象static?Map<String,?Object>?map?=?new?HashMap<String,?Object>();//?創建讀寫鎖對象static?ReadWriteLock?readWriteLock?=?new?ReentrantReadWriteLock();//?構建讀鎖static?Lock?rl?=?readWriteLock.readLock();//?構建寫鎖static?Lock?wl?=?readWriteLock.writeLock();public?static?final?Object?get(String?key)?{rl.lock();try{return?map.get(key);}finally?{rl.unlock();}}public?static?final?Object?put(String?key,?Object?value){wl.lock();try{return?map.put(key,?value);}finally?{wl.unlock();}} }你瞧,使用就是這么簡單。但是你知道的,AQS 的核心是鎖的實現,即控制同步狀態 state 的值,ReentrantReadWriteLock 也是應用AQS的 state 來控制同步狀態的,那么問題來了:
一個 int 類型的 state 怎么既控制讀的同步狀態,又可以控制寫的同步狀態呢?
顯然需要一點設計了
讀寫狀態設計
如果要在一個 int 類型變量上維護多個狀態,那肯定就需要拆分了。我們知道 int 類型數據占32位,所以我們就有機會按位切割使用state了。我們將其切割成兩部分:
高16位表示讀
低16位表示寫
所以,要想準確的計算讀/寫各自的狀態值,肯定就要應用位運算了,下面代碼是 JDK1.8,ReentrantReadWriteLock 自定義同步器 Sync 的位操作
abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{static?final?int?SHARED_SHIFT???=?16;static?final?int?SHARED_UNIT????=?(1?<<?SHARED_SHIFT);static?final?int?MAX_COUNT??????=?(1?<<?SHARED_SHIFT)?-?1;static?final?int?EXCLUSIVE_MASK?=?(1?<<?SHARED_SHIFT)?-?1;static?int?sharedCount(int?c)?{?return?c?>>>?SHARED_SHIFT;?}static?int?exclusiveCount(int?c)?{?return?c?&?EXCLUSIVE_MASK;?} }乍一看真是有些復雜的可怕,別慌,咱們通過幾道小小數學題就可以搞定整個位運算過程
整個 ReentrantReadWriteLock 中 讀/寫狀態的計算就是反復應用這幾道數學題,所以,在閱讀下面內容之前,希望你搞懂這簡單的運算
基礎鋪墊足夠了,我們進入源碼分析吧
源碼分析
寫鎖分析
由于寫鎖是排他的,所以肯定是要重寫 AQS 中 tryAcquire 方法
????????protected?final?boolean?tryAcquire(int?acquires)?{????????Thread?current?=?Thread.currentThread();//?獲取?state?整體的值int?c?=?getState();//?獲取寫狀態的值int?w?=?exclusiveCount(c);if?(c?!=?0)?{//?w=0:?根據推理二,整體狀態不等于零,寫狀態等于零,所以,讀狀態大于0,即存在讀鎖//?或者當前線程不是已獲取寫鎖的線程//?二者之一條件成真,則獲取寫狀態失敗if?(w?==?0?||?current?!=?getExclusiveOwnerThread())return?false;if?(w?+?exclusiveCount(acquires)?>?MAX_COUNT)throw?new?Error("Maximum?lock?count?exceeded");//?根據推理一第?1?條,更新寫狀態值setState(c?+?acquires);return?true;}if?(writerShouldBlock()?||!compareAndSetState(c,?c?+?acquires))return?false;setExclusiveOwnerThread(current);return?true;}上述代碼 第 19 行 writerShouldBlock 也并沒有什么神秘的,只不過是公平/非公平獲取鎖方式的判斷(是否有前驅節點來判斷)
你瞧,寫鎖獲取方式就是這么簡單
讀鎖分析
由于讀鎖是共享式的,所以肯定是要重寫 AQS 中 tryAcquireShared 方法
????????protected?final?int?tryAcquireShared(int?unused)?{Thread?current?=?Thread.currentThread();int?c?=?getState();//?寫狀態不等于0,并且鎖的持有者不是當前線程,根據約定?3,則獲取讀鎖失敗if?(exclusiveCount(c)?!=?0?&&getExclusiveOwnerThread()?!=?current)return?-1;//?獲取讀狀態值int?r?=?sharedCount(c);//?這個地方有點不一樣,我們單獨說明if?(!readerShouldBlock()?&&r?<?MAX_COUNT?&&compareAndSetState(c,?c?+?SHARED_UNIT))?{if?(r?==?0)?{firstReader?=?current;firstReaderHoldCount?=?1;}?else?if?(firstReader?==?current)?{firstReaderHoldCount++;}?else?{HoldCounter?rh?=?cachedHoldCounter;if?(rh?==?null?||?rh.tid?!=?getThreadId(current))cachedHoldCounter?=?rh?=?readHolds.get();else?if?(rh.count?==?0)readHolds.set(rh);rh.count++;}return?1;}//?如果獲取讀鎖失敗則進入自旋獲取return?fullTryAcquireShared(current);}readerShouldBlock 和 writerShouldBlock 在公平鎖的實現上都是判斷是否有前驅節點,但是在非公平鎖的實現上,前者是這樣的:
final?boolean?readerShouldBlock()?{return?apparentlyFirstQueuedIsExclusive(); }final?boolean?apparentlyFirstQueuedIsExclusive()?{Node?h,?s;return?(h?=?head)?!=?null?&&//?等待隊列頭節點的下一個節點(s?=?h.next)??!=?null?&&//?如果是排他式的節點!s.isShared()?????????&&s.thread?!=?null; }簡單來說,如果請求讀鎖的當前線程發現同步隊列的 head 節點的下一個節點為排他式節點,那么就說明有一個線程在等待獲取寫鎖(爭搶寫鎖失敗,被放入到同步隊列中),那么請求讀鎖的線程就要阻塞,畢竟讀多寫少,如果還沒有這點判斷機制,寫鎖可能會發生【饑餓】
上述條件都滿足了,也就會進入 ?tryAcquireShared 代碼的第 14 行到第 25 行,這段代碼主要是為了記錄線程持有鎖的次數。讀鎖是共享式的,還想記錄每個線程持有讀鎖的次數,就要用到 ThreadLocal 了,因為這不影響同步狀態 state 的值,所以就不分析了, 只把關系放在這吧
到這里讀鎖的獲取也就結束了,比寫鎖稍稍復雜那么一丟丟,接下來就說明一下那個可能讓你迷惑的鎖升級/降級問題吧
??
讀寫鎖的升級與降級
個人理解:讀鎖是可以被多線程共享的,寫鎖是單線程獨占的,也就是說寫鎖的并發限制比讀鎖高,所以
在真正了解讀寫鎖的升級與降級之前,我們需要完善一下本文開頭 ReentrantReadWriteLock 的例子
?public?static?final?Object?get(String?key)?{Object?obj?=?null;rl.lock();try{//?獲取緩存中的值obj?=?map.get(key);}finally?{rl.unlock();}//?緩存中值不為空,直接返回if?(obj!=?null)?{return?obj;}//?緩存中值為空,則通過寫鎖查詢DB,并將其寫入到緩存中wl.lock();try{//?再次嘗試獲取緩存中的值obj?=?map.get(key);//?再次獲取緩存中值還是為空if?(obj?==?null)?{//?查詢DBobj?=?getDataFromDB(key);?//?偽代碼:getDataFromDB//?將其放入到緩存中map.put(key,?obj);}}finally?{wl.unlock();}return?obj;}有童鞋可能會有疑問
在寫鎖里面,為什么代碼第19行還要再次獲取緩存中的值呢?不是多此一舉嗎?
其實這里再次嘗試獲取緩存中的值是很有必要的,因為可能存在多個線程同時執行 get 方法,并且參數 key 也是相同的,執行到代碼第 16 行 wl.lock() ,比如這樣:
線程 A,B,C 同時執行到臨界區 wl.lock(), 只有線程 A 獲取寫鎖成功,線程B,C只能阻塞,直到線程A 釋放寫鎖。這時,當線程B 或者 C 再次進入臨界區時,線程 A 已經將值更新到緩存中了,所以線程B,C沒必要再查詢一次DB,而是再次嘗試查詢緩存中的值
既然再次獲取緩存很有必要,我能否在讀鎖里直接判斷,如果緩存中沒有值,那就再次獲取寫鎖來查詢DB不就可以了嘛,就像這樣:
?public?static?final?Object?getLockUpgrade(String?key)?{Object?obj?=?null;rl.lock();try{obj?=?map.get(key);if?(obj?==?null){wl.lock();try{obj?=?map.get(key);if?(obj?==?null)?{obj?=?getDataFromDB(key);?//?偽代碼:getDataFromDBmap.put(key,?obj);}}finally?{wl.unlock();}}}finally?{rl.unlock();}return?obj;}這還真是不可以的,因為獲取一個寫入鎖需要先釋放所有的讀取鎖,如果有兩個讀取鎖試圖獲取寫入鎖,且都不釋放讀取鎖時,就會發生死鎖,所以在這里,鎖的升級是不被允許的
讀寫鎖的升級是不可以的,那么鎖的降級是可以的嘛?這個是 Oracle 官網關于鎖降級的示例?https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html
,我將代碼粘貼在此處,大家有興趣可以點進去連接看更多內容
代碼中聲明了一個 volatile 類型的 cacheValid 變量,保證其可見性。
首先獲取讀鎖,如果cache不可用,則釋放讀鎖
然后獲取寫鎖
在更改數據之前,再檢查一次cacheValid的值,然后修改數據,將cacheValid置為true
然后在釋放寫鎖前獲取讀鎖 此時
cache中數據可用,處理cache中數據,最后釋放讀鎖
這個過程就是一個完整的鎖降級的過程,目的是保證數據可見性,聽起來很有道理的樣子,那么問題來了:
上述代碼為什么在釋放寫鎖之前要獲取讀鎖呢?
如果當前的線程A在修改完cache中的數據后,沒有獲取讀鎖而是直接釋放了寫鎖;假設此時另一個線程B 獲取了寫鎖并修改了數據,那么線程A無法感知到數據已被修改,但線程A還應用了緩存數據,所以就可能出現數據錯誤
如果遵循鎖降級的步驟,線程A 在釋放寫鎖之前獲取讀鎖,那么線程B在獲取寫鎖時將被阻塞,直到線程A完成數據處理過程,釋放讀鎖,從而保證數據的可見性
那問題又來了:
使用寫鎖一定要降級嗎?
如果你理解了上面的問題,相信這個問題已經有了答案。假如線程A修改完數據之后, 經過耗時操作后想要再使用數據時,希望使用的是自己修改后的數據,而不是其他線程修改后的數據,這樣的話確實是需要鎖降級;如果只是希望最后使用數據的時候,拿到的是最新的數據,而不一定是自己剛修改過的數據,那么先釋放寫鎖,再獲取讀鎖,然后使用數據也無妨
在這里我要額外說明一下你可能存在的誤解:
如果已經釋放了讀鎖再獲取寫鎖不叫鎖的升級
如果已經釋放了寫鎖在獲取讀鎖也不叫鎖的降級
相信你到這里也理解了鎖的升級與降級過程,以及他們被允許或被禁止的原因了
總結
本文主要說明了 ReentrantReadWriteLock 是如何應用 state 做位拆分實現讀/寫兩種同步狀態的,另外也通過源碼分析了讀/寫鎖獲取同步狀態的過程,最后又了解了讀寫鎖的升級/降級機制,相信到這里你對讀寫鎖已經有了一定的理解。
?
靈魂追問
讀鎖也沒修改數據,還允許共享式獲取,那還有必要設置讀鎖嗎?
在分布式環境中,你是如何保證緩存數據一致性的呢?
當你打開看ReentrantReadWriteLock源碼時,你會發現,WriteLock 中可以使用 Condition,但是ReadLock 使用Condition卻會拋出UnsupportedOperationException,這是為什么呢?
參考
Java 并發實戰
Java 并發編程的藝術
https://www.jianshu.com/p/58697bb2243e
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
新人創作打卡挑戰賽發博客就能抽獎!定制產品紅包拿不停!總結
以上是生活随笔為你收集整理的搞定ReentrantReadWriteLock 几道小小数学题就够了的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java web学习(一)Servlet
- 下一篇: 全面容器化:阿里5年带给我的最大收获