日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁

發布時間:2024/1/17 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這篇講講ReentrantReadWriteLock可重入讀寫鎖,它不僅是讀寫鎖的實現,并且支持可重入性。?聊聊高并發(十五)實現一個簡單的讀-寫鎖(共享-排他鎖)?這篇講了如何模擬一個讀寫鎖。

?

可重入的讀寫鎖的特點是

1. 當有線程獲取讀鎖時,不允許再有線程獲得寫鎖

2. 當有線程獲得寫鎖時,不允許其他線程獲得讀鎖和寫鎖

?

這里隱含著幾層含義:

?
  • static final int SHARED_SHIFT = 16;

  • static final int SHARED_UNIT = (1 << SHARED_SHIFT);

  • static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

  • static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

  • ?
  • /** Returns the number of shared holds represented in count */

  • static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

  • /** Returns the number of exclusive holds represented in count */

  • static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

  • ?

    ?

    1. 可以同時有多個線程同時獲得讀鎖,進入臨界區。這時候的讀鎖的行為和Semaphore信號量是類似的

    2. 由于是可重入的,所以1個線程如果獲得了讀鎖,那么它可以重入這個讀鎖

    3. 如果1個線程獲得了讀鎖,那么它不能同時再獲得寫鎖,這個就是所謂的“鎖升級”,讀鎖升級到寫鎖可能會造成死鎖,所以是不允許的

    4. 如果1個線程獲得了寫鎖,那么不允許其他線程再獲得讀鎖和寫鎖,但是它自己可以獲得讀鎖,就是所謂的“鎖降級”,鎖降級是允許的

    ?

    關于讀寫鎖的實現還要考慮的幾個要點:

    1. 釋放鎖時的優先級問題,是讓寫鎖先獲得還是先讓讀鎖先獲得

    2. 是否允許讀線程插隊

    3. 是否允許寫線程插隊,因為讀寫鎖一般用在大量讀,少量寫的情況,如果寫線程沒有優先級,那么可能造成寫線程的饑餓

    4. 鎖的升降級問題,一般是允許1個線程的寫鎖降級為讀鎖,不允許讀鎖升級成寫鎖

    ?

    帶著問題看看ReentrantReadWriteLock的源碼。 它同樣提供了Sync來繼承AQS并提供擴展,但是它的Sync相比較Semaphore和CountDownLatch要更加復雜。

    1. 把State狀態作為一個讀寫鎖的計數器,包括了重入的次數。state是32位的int值,所以把高位16位作為讀鎖的計數器,低位的16位作為寫鎖的計數器,并提供了響應的讀寫這兩個計數器的位操作方法。

    計算sharedCount時,采用無符號的移位操作,右移16位就是讀鎖計數器的值

    寫鎖直接用EXCLUSIVE_MASK和state做與運算,EXCLUSIVE_MASK的值是00000000000000001111111111111111,相當于計算了低位16位的值

    需要注意計算出來的值包含了重入的次數。所以MAX_COUNT限定了最大值是2^17 - 1

    ?

    ?
  • static final int SHARED_SHIFT = 16;

  • static final int SHARED_UNIT = (1 << SHARED_SHIFT);

  • static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

  • static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

  • ?
  • /** Returns the number of shared holds represented in count */

  • static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

  • /** Returns the number of exclusive holds represented in count */

  • static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }


  • HoldCount類用來計算1個線程的重入次數,并使用了1個ThreadLocal類型的HoldCounter,可以記錄每個線程的鎖的重入次數。 cachedHoldCounter記錄了最后1個獲取讀鎖的線程的重入次數。 firstReader指向了第一個獲取讀鎖的線程,firstReaderHoldCounter記錄了第一個獲取讀鎖的線程的重入次數

    ?

    ?

    ?
  • static final class HoldCounter {

  • int count = 0;

  • // Use id, not reference, to avoid garbage retention

  • final long tid = Thread.currentThread().getId();

  • }

  • ?
  • /**

  • * ThreadLocal subclass. Easiest to explicitly define for sake

  • * of deserialization mechanics.

  • */

  • static final class ThreadLocalHoldCounter

  • extends ThreadLocal<HoldCounter> {

  • public HoldCounter initialValue() {

  • return new HoldCounter();

  • }

  • }

  • ?
  • /**

  • ???????? * The hold count of the last thread to successfully acquire

  • ???????? * readLock. This saves ThreadLocal lookup in the common case

  • ???????? * where the next thread to release is the last one to

  • ???????? * acquire. This is non-volatile since it is just used

  • ???????? * as a heuristic, and would be great for threads to cache.

  • ???????? *

  • ???????? * <p>Can outlive the Thread for which it is caching the read

  • ???????? * hold count, but avoids garbage retention by not retaining a

  • ???????? * reference to the Thread.

  • ???????? *

  • ???????? * <p>Accessed via a benign data race; relies on the memory

  • ???????? * model's final field and out-of-thin-air guarantees.

  • ???????? */

  • ??????? private transient HoldCounter cachedHoldCounter;


  • Sync提供了兩個抽象方法給子類擴展,用來表示讀鎖和寫鎖是否應該阻塞等待

    ?

    ?

    ?
  • /**

  • * Returns true if the current thread, when trying to acquire

  • * the read lock, and otherwise eligible to do so, should block

  • * because of policy for overtaking other waiting threads.

  • */

  • abstract boolean readerShouldBlock();

  • ?
  • /**

  • * Returns true if the current thread, when trying to acquire

  • * the write lock, and otherwise eligible to do so, should block

  • * because of policy for overtaking other waiting threads.

  • */

  • abstract boolean writerShouldBlock();


  • 寫鎖的tryXXX獲取和釋放

    ?

    1. 寫鎖釋放時,由于沒有其他線程獲得臨界區,它的tryRelease()方法只需要設置狀態的值,通過exclusiveCount計算寫鎖的計數器,如果為0表示釋放了寫鎖,就把exclusiveOwnerThread設置為null.

    2. 寫鎖的tryAcquire獲取時,

    ??? 先判斷狀態是否為0,為0表示沒有線程獲得鎖,就可以直接設置狀態,然后把exclusiveOwnerThread設置為當前線程

    ??? 如果狀態不為0,那表示有幾種可能:寫鎖為0,讀鎖不為0;寫鎖不為0,讀鎖為0,寫鎖不為0,讀鎖也不為0。

    ??? 所以它先判斷寫鎖是否為0,寫鎖為0,那么表示讀鎖肯定不會為0,就失敗,

    ??? 或者寫鎖不為0,但是exclusiveOwnerThread不是自己,那么表示已經有其他線程獲得了寫鎖,就失敗

    ??? 寫鎖不為0,并且exclusiveOwnerThread是自己,那么肯定表示是寫鎖的重入的情況,所以設置state狀態,返回成功。

    ????

    ?

    ?
  • protected final boolean tryRelease(int releases) {

  • if (!isHeldExclusively())

  • throw new IllegalMonitorStateException();

  • int nextc = getState() - releases;

  • boolean free = exclusiveCount(nextc) == 0;

  • if (free)

  • setExclusiveOwnerThread(null);

  • setState(nextc);

  • return free;

  • }

  • ?
  • protected final boolean tryAcquire(int acquires) {

  • ??????????? /*

  • ???????????? * Walkthrough:

  • ???????????? * 1. If read count nonzero or write count nonzero

  • ???????????? *??? and owner is a different thread, fail.

  • ???????????? * 2. If count would saturate, fail. (This can only

  • ???????????? *??? happen if count is already nonzero.)

  • ???????????? * 3. Otherwise, this thread is eligible for lock if

  • ???????????? *??? it is either a reentrant acquire or

  • ???????????? *??? queue policy allows it. If so, update state

  • ???????????? *??? and set owner.

  • ???????????? */

  • ??????????? Thread current = Thread.currentThread();

  • ??????????? int c = getState();

  • ??????????? int w = exclusiveCount(c);

  • ??????????? if (c != 0) {

  • ??????????????? // (Note: if c != 0 and w == 0 then shared count != 0)

  • ??????????????? if (w == 0 || current != getExclusiveOwnerThread())

  • ??????????????????? return false;

  • ??????????????? if (w + exclusiveCount(acquires) > MAX_COUNT)

  • ??????????????????? throw new Error("Maximum lock count exceeded");

  • ??????????????? // Reentrant acquire

  • ??????????????? setState(c + acquires);

  • ??????????????? return true;

  • ??????????? }

  • ??????????? if (writerShouldBlock() ||

  • ??????????????? !compareAndSetState(c, c + acquires))

  • ??????????????? return false;

  • ??????????? setExclusiveOwnerThread(current);

  • ??????????? return true;

  • ??????? }


  • 讀鎖的tryXXX獲取和釋放

    ?

    1. 讀鎖釋放時基于共享的方式,修改線程各自的HoldCounter的值,最后采用位操作修改位于state的總體的讀鎖計數器。tryReleaseShared()之后具體的釋放后續線程的操作由AQS根據隊列狀態來決定。

    2. 讀所獲取時先看寫鎖的計數器,如果寫鎖已經被獲取,并且不是當前線程所獲取的,就直接失敗返回

    ??? 這里會進行一次快速路徑獲取,嘗試獲取一次,如果readShouldBlock()返回false,并且CAS操作成功了,意思是可以獲得鎖,就更新相關讀鎖計數器

    ??? 否則就進行輪詢方式的獲取fullTryAcquireShared()

    ??? 也就是說如果當前沒有線程獲取寫鎖,或者是自己獲取寫鎖,就可以獲取讀鎖

    ??? 一個線程獲取了寫鎖之后,它還可以獲取讀鎖,也就是所謂的“鎖降級”,但這時候其他線程無法獲取讀鎖,在檢查到有其他寫鎖存在時就退出了

    ?

    ?
  • protected final boolean tryReleaseShared(int unused) {

  • Thread current = Thread.currentThread();

  • if (firstReader == current) {

  • // assert firstReaderHoldCount > 0;

  • if (firstReaderHoldCount == 1)

  • firstReader = null;

  • else

  • firstReaderHoldCount--;

  • } else {

  • HoldCounter rh = cachedHoldCounter;

  • if (rh == null || rh.tid != current.getId())

  • rh = readHolds.get();

  • int count = rh.count;

  • if (count <= 1) {

  • readHolds.remove();

  • if (count <= 0)

  • throw unmatchedUnlockException();

  • }

  • --rh.count;

  • }

  • for (;;) {

  • int c = getState();

  • int nextc = c - SHARED_UNIT;

  • if (compareAndSetState(c, nextc))

  • // Releasing the read lock has no effect on readers,

  • // but it may allow waiting writers to proceed if

  • // both read and write locks are now free.

  • return nextc == 0;

  • }

  • }

  • ?
  • protected final int tryAcquireShared(int unused) {

  • ??????????? /*

  • ???????????? * Walkthrough:

  • ???????????? * 1. If write lock held by another thread, fail.

  • ???????????? * 2. Otherwise, this thread is eligible for

  • ???????????? *??? lock wrt state, so ask if it should block

  • ???????????? *??? because of queue policy. If not, try

  • ???????????? *??? to grant by CASing state and updating count.

  • ???????????? *??? Note that step does not check for reentrant

  • ???????????? *??? acquires, which is postponed to full version

  • ???????????? *??? to avoid having to check hold count in

  • ???????????? *??? the more typical non-reentrant case.

  • ???????????? * 3. If step 2 fails either because thread

  • ???????????? *??? apparently not eligible or CAS fails or count

  • ???????????? *??? saturated, chain to version with full retry loop.

  • ???????????? */

  • ??????????? Thread current = Thread.currentThread();

  • ??????????? int c = getState();

  • ??????????? if (exclusiveCount(c) != 0 &&

  • ??????????????? getExclusiveOwnerThread() != current)

  • ??????????????? return -1;

  • ??????????? int r = sharedCount(c);

  • ??????????? if (!readerShouldBlock() &&

  • ??????????????? r < MAX_COUNT &&

  • ??????????????? compareAndSetState(c, c + SHARED_UNIT)) {

  • ??????????????? if (r == 0) {

  • ??????????????????? firstReader = current;

  • ??????????????????? firstReaderHoldCount = 1;

  • ??????????????? } else if (firstReader == current) {

  • ??????????????????? firstReaderHoldCount++;

  • ??????????????? } else {

  • ??????????????????? HoldCounter rh = cachedHoldCounter;

  • ??????????????????? if (rh == null || rh.tid != current.getId())

  • ??????????????????????? cachedHoldCounter = rh = readHolds.get();

  • ??????????????????? else if (rh.count == 0)

  • ??????????????????????? readHolds.set(rh);

  • ??????????????????? rh.count++;

  • ??????????????? }

  • ??????????????? return 1;

  • ??????????? }

  • ??????????? return fullTryAcquireShared(current);

  • ??????? }

  • ?
  • ?/**

  • ???????? * Full version of acquire for reads, that handles CAS misses

  • ???????? * and reentrant reads not dealt with in tryAcquireShared.

  • ???????? */

  • ??????? final int fullTryAcquireShared(Thread current) {

  • ??????????? /*

  • ???????????? * This code is in part redundant with that in

  • ???????????? * tryAcquireShared but is simpler overall by not

  • ???????????? * complicating tryAcquireShared with interactions between

  • ???????????? * retries and lazily reading hold counts.

  • ???????????? */

  • ??????????? HoldCounter rh = cachedHoldCounter;

  • ??????????? if (rh == null || rh.tid != current.getId())

  • ??????????????? rh = readHolds.get();

  • ??????????? for (;;) {

  • ??????????????? int c = getState();

  • ??????????????? int w = exclusiveCount(c);

  • ??????????????? if ((w != 0 && getExclusiveOwnerThread() != current) ||

  • ??????????????????? ((rh.count | w) == 0 && readerShouldBlock(current)))

  • ??????????????????? return -1;

  • ??????????????? if (sharedCount(c) == MAX_COUNT)

  • ??????????????????? throw new Error("Maximum lock count exceeded");

  • ??????????????? if (compareAndSetState(c, c + SHARED_UNIT)) {

  • ??????????????????? cachedHoldCounter = rh; // cache for release

  • ??????????????????? rh.count++;

  • ??????????????????? return 1;

  • ??????????????? }

  • ??????????? }

  • ??????? }?

  • ?

    tryWriteLock和tryReadLock操作和上面的操作類似,它們是讀寫鎖的tryLock()的實際實現,表示嘗試獲取一次鎖

    1. tryWriteLock方法嘗試獲得寫鎖,先判斷狀態是否為0,為0并且CAS操作成功就表示獲得鎖。如果狀態不為0,就判斷寫鎖計數器的值,如果寫鎖計數器為0就表示存在讀鎖,就返回失敗,獲取寫鎖不為0,但是不是當前線程所獲取的,也返回失敗。只有寫鎖不為0并且是當前線程自己獲取的寫鎖,就是所謂的寫鎖重入操作。CAS成功后就表示獲得寫鎖

    ?

    ?
  • final boolean tryWriteLock() {

  • Thread current = Thread.currentThread();

  • int c = getState();

  • if (c != 0) {

  • int w = exclusiveCount(c);

  • if (w == 0 ||current != getExclusiveOwnerThread())

  • return false;

  • if (w == MAX_COUNT)

  • throw new Error("Maximum lock count exceeded");

  • }

  • if (!compareAndSetState(c, c + 1))

  • return false;

  • setExclusiveOwnerThread(current);

  • return true;

  • }

  • ?
  • final boolean tryReadLock() {

  • ??????????? Thread current = Thread.currentThread();

  • ??????????? for (;;) {

  • ??????????????? int c = getState();

  • ??????????????? if (exclusiveCount(c) != 0 &&

  • ??????????????????? getExclusiveOwnerThread() != current)

  • ??????????????????? return false;

  • ??????????????? if (sharedCount(c) == MAX_COUNT)

  • ??????????????????? throw new Error("Maximum lock count exceeded");

  • ??????????????? if (compareAndSetState(c, c + SHARED_UNIT)) {

  • ??????????????????? HoldCounter rh = cachedHoldCounter;

  • ??????????????????? if (rh == null || rh.tid != current.getId())

  • ??????????????????????? cachedHoldCounter = rh = readHolds.get();

  • ??????????????????? rh.count++;

  • ??????????????????? return true;

  • ??????????????? }

  • ??????????? }

  • ??????? }


  • ReentrantReadWriteLock也提供了非公平和公平的兩個Sync版本

    非公平的版本中

    1. 寫鎖總是優先獲取,不考慮AQS隊列中先來的線程

    2. 讀鎖也不按FIFO隊列排隊,而是看當前獲得鎖是否是寫鎖,如果是寫鎖,就等待,否則就嘗試獲得鎖

    而公平版本中

    1. 如果有其他鎖存在,獲取寫鎖操作就失敗,應該(should)進AQS隊列等待

    2. 如果有其他鎖存在,獲取讀鎖操作就失敗,應該(should)進AQS隊列等待

    ?

    ?
  • final static class NonfairSync extends Sync {

  • private static final long serialVersionUID = -8159625535654395037L;

  • final boolean writerShouldBlock(Thread current) {

  • return false; // writers can always barge

  • }

  • final boolean readerShouldBlock(Thread current) {

  • /* As a heuristic to avoid indefinite writer starvation,

  • * block if the thread that momentarily appears to be head

  • * of queue, if one exists, is a waiting writer. This is

  • * only a probablistic effect since a new reader will not

  • * block if there is a waiting writer behind other enabled

  • * readers that have not yet drained from the queue.

  • */

  • return apparentlyFirstQueuedIsExclusive();

  • }

  • }

  • ?
  • /**

  • * Fair version of Sync

  • */

  • final static class FairSync extends Sync {

  • private static final long serialVersionUID = -2274990926593161451L;

  • final boolean writerShouldBlock(Thread current) {

  • // only proceed if queue is empty or current thread at head

  • return !isFirst(current);

  • }

  • final boolean readerShouldBlock(Thread current) {

  • // only proceed if queue is empty or current thread at head

  • return !isFirst(current);

  • }

  • }


  • 具體ReadLock和WriteLock的實現就是依賴Sync來實現的,默認是非公平版本的Sync。

    讀鎖采用共享默認的AQS,它提供了中斷/不可中斷的lock操作,tryLock操作,限時的tryLock操作。值得注意的時讀鎖不支持newCondition操作。

    ?

    ?

    ?
  • public static class ReadLock implements Lock, java.io.Serializable {

  • private static final long serialVersionUID = -5992448646407690164L;

  • private final Sync sync;

  • ?
  • protected ReadLock(ReentrantReadWriteLock lock) {

  • sync = lock.sync;

  • }

  • ?
  • ?
  • public void lock() {

  • sync.acquireShared(1);

  • }

  • ?
  • ? public void lockInterruptibly() throws InterruptedException {

  • ??????????? sync.acquireSharedInterruptibly(1);

  • ??????? }

  • ?
  • public? boolean tryLock() {

  • ??????????? return sync.tryReadLock();

  • ??????? }

  • ?
  • public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {

  • ??????????? return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

  • ??????? }

  • ?
  • public? void unlock() {

  • ??????????? sync.releaseShared(1);

  • ??????? }

  • ?
  • ? public Condition newCondition() {

  • ??????????? throw new UnsupportedOperationException();

  • ??????? }


  • WriteLock基于獨占模式的AQS,它提供了中斷/不可中斷的lock操作,tryLock操作,限時的tryLock操作

    ?

    ?
  • public static class WriteLock implements Lock, java.io.Serializable {

  • private static final long serialVersionUID = -4992448646407690164L;

  • private final Sync sync;

  • ?
  • protected WriteLock(ReentrantReadWriteLock lock) {

  • sync = lock.sync;

  • }

  • ?
  • ? public void lock() {

  • ??????????? sync.acquire(1);

  • ??????? }

  • ?
  • ?public void lockInterruptibly() throws InterruptedException {

  • ??????????? sync.acquireInterruptibly(1);

  • ??????? }

  • ?
  • public boolean tryLock( ) {

  • ??????????? return sync.tryWriteLock();

  • ??????? }

  • ?
  • public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {

  • ??????????? return sync.tryAcquireNanos(1, unit.toNanos(timeout));

  • ??????? }

  • ?
  • public void unlock() {

  • ??????????? sync.release(1);

  • ??????? }

  • ?
  • public Condition newCondition() {

  • ??????????? return sync.newCondition();

  • ??????? }



  • 最后再說一下AQS和各種同步器實現的關系,AQS提供了同步隊列和條件隊列的管理,包括各種情況下的入隊出隊操作。而同步器子類實現了tryAcquire和tryRelease方法來操作狀態,來表示什么情況下可以直接獲得鎖而不需要進入AQS,什么情況下獲取鎖失敗則需要進入AQS隊列等待

    總結

    以上是生活随笔為你收集整理的聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。