日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

详解 ConcurrentHashMap

發布時間:2025/3/12 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 详解 ConcurrentHashMap 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • ConcurrentHashMap 的底層數據結構?
  • ConcurrentHashMap 的帶參構造方法的流程?
  • ConcurrentHashMap 的 put 方法的流程?
  • ConcurrentHashMap addCount 方法的流程是怎樣的呢?
  • ConcurrentHashMap transfer 方法的流程是怎樣的呢?
  • ConcurrentHashMap helpTransfer 方法的流程是怎樣的呢?
  • ConcurrentHashMap 的 get 方法的流程?
  • ConcurrentHashMap 的 sizeCtl 的含義,以及值的流轉過程?
  • ConcurrentHashMap 的 size 方法的流程?
  • 其他
    • 如果 ConcurrentHashMap 的某個數組下標位置是一顆紅黑樹,那么這個位置上的節點類型是 TreeNode 嗎?
    • 為什么要用 TreeBin 對象作為這個位置上的節點,而不是 TreeNode 對象呢?
    • ConcurrentHashMap 的 size 方法會返回最新的值嗎?
    • transferIndex 的真正含義
  • ConcurrentHashMap 總結
    • put 方法流程總結
    • ConcurrentHashMap 的元素數量計數
    • ConcurrentHashMap 的擴容操作
  • ConcurrentHashMap 的設計思想總結
    • 大量的無鎖并發安全處理操作
    • 細化臨界資源粒度
    • 高效的擴容機制
    • 高效的狀態管理機制

ConcurrentHashMap 的底層數據結構?

ConcurrentHashMap 的底層數據結構是 Node 數組。Node 類的定義如下:

static class Node<K,V> implements Map.Entry<K,V> {//節點的 hash 值final int hash;//節點的 key 值final K key;//節點的 value 值volatile V val;//后繼節點volatile Node<K,V> next; }

其中,元素的 key 和 value 均不能為空。

ConcurrentHashMap 的帶參構造方法的流程?

  • 判斷傳入的初始容量是否合法,小于 0 將拋出異常
  • 判斷是否傳入的初始容量大于最大值(2^30 次方)的一半,如果是,則將容量設置為最大值
  • 否則將容量設置為大于傳入的初始容量的最小的 2 的整數次冪
  • 將 sizeCtl 參數賦值為初始容量

ConcurrentHashMap 的 put 方法的流程?

ConcurrentHashMap 的 put 方法流程如下:

  • 首先檢查 key 和 value 是否為空,如果為空,則直接拋出空指針異常
  • 其次調用 spread 方法計算 hash 值
    • 將 key 的 hashcode 往右移 16 位,跟原 hashcode 值做異或運算
    • 異或運算得到的結果,跟 HASH_BITS (HASH_BITS = 0x7fffffff,換算成二進制有 31 個 1)做運算得到最終結果
  • 判斷數組是否為空,如果數組為空,則執行初始化方法
    • 當表為空時,一直執行循環
    • 完成構造方法后,sizeCtl 參數要么等于 0,(即使用的無參構造器),要么等于初始容量大小,(使用的指定了初始容量的構造器)
    • 當 sizeCtl 為負數時,即表正在被其他線程初始化或者正在被其他線程擴容時,調用 Thread.yield 方法主動讓出 cpu 執行權(即等待其他線程完成初始化或表擴容的操作
    • 當 sizeCtl 不為負數時,使用 CAS 將 sizeCtl 的值設置為 -1
    • 再次判斷表是否為空
      • 如果表不為空,則說明表已經被其他線程初始化完成,則直接跳出循環
      • 如果表為空,判斷是否指定了初始容量,如果指定了初始化容量,則使用指定的數值作為初始化容量;如果沒有指定初始容量,則使用默認容量 16
      • 初始化一個大小為上一步中得到的容量的 Node 數組
      • 將 sizeCtl 的值設置為容量的 0.75(可類比于 HashMap 中的擴容閾值
  • 根據 hash 跟數組長度 - 1進行運算后,得到元素在數組中的下標,并檢查該下標位置是否存在元素
    • 如果該下標位置不存在元素,則用 CAS 對該下標位置進行賦值,如果賦值成功,則跳出循環
    • 如果 CAS 操作失敗,則繼續循環
  • 如果數組該下標位置存在元素(以下簡稱該元素為 f),則檢查 f 的 hash 值是否等于 -1(當元素的 hash 值為 -1 時,代表該數組正在進行擴容),即 MOVED
    • 如果是,則說明其他線程正在進行擴容,則執行 helpTransfer 方法協助完成擴容操作
  • 否則,開始對該數組下標位置上的桶中的元素進行遍歷比較
    • 首先使用 synchronized 關鍵字對 f 進行加鎖
    • 加鎖成功,則重新獲取一遍該數組下標位置上的元素,判斷其與 f 是否相等,即判斷 f 是否發生了變化,如果發生了變化,則直接進入下一次循環
    • 如果沒有發生變化,則判斷 f 的 hash 值是否大于等于 0
      • 如果大于等于 0,則說明是鏈表結構,則遍歷鏈表,將 binCount 值賦為 1,每次遍歷都將 binCount +1
      • 使用 key 的 equals 方法逐一比對元素,如果該 key 不存在,則將待插入元素加入到鏈表的尾部
      • 如果存在該 key,則根據 onlyIfAbsent 參數來判斷是否需要將舊 value 值進行覆蓋
    • 如果 f 的 hash 值小于 0 ,則判斷 f 是否是 TreeBin 類型的元素
    • 如果是,將 binCount 值賦為 2,將待插入元素插入到紅黑樹中
      • 如果紅黑樹插入失敗,則說明存在該 key,則根據 onlyIfAbsent 參數來判斷是否需要將舊 value 值進行覆蓋
  • 判斷 binCount 的值是否不等于 0,即是否進行了紅黑樹和鏈表的查找過程
    • 如果不等于 0,則判斷鏈表是否需要轉化成紅黑樹,當鏈表上的元素個數大于 8(即在插入第 9 個元素時),且數組的長度大于 64 時,將鏈表轉化成紅黑樹
    • 轉化成紅黑樹后,將該數組下標位置上的元素使用 CAS 替換成 TreeBin 類型的元素
    • 如果替換了舊值,則將舊值返回
  • 執行 addCount 方法,即嘗試將元素數量 +1

結合源碼來看:

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null; }

ConcurrentHashMap addCount 方法的流程是怎樣的呢?

addCount 方法,即嘗試將當前元素數量自增的方法,其主要的流程如下:

  • 首先判斷 counterCells 是否不為空
  • 或者嘗試使用 CAS 對 baseCount 屬性進行增加的時候是否失敗
  • 如果滿足上面的條件
    • 繼續判斷 counterCells 是否為空
    • 如果 counterCells 不為空,則調用 ThreadLocalRandom.getProbe 方法生成一個隨機數,跟 countCells.length-1 進行操作之后,得到 counterCells 數組的下標,判斷 counterCells 該下標位置上的元素是否為空(即得到一個沒有線程正在占用的)
    • 如果上述條件都不滿足,則使用 CAS 將 counterCells 下標位置的 value 值進行增加,判斷 CAS 操作是否失敗
    • 如果上述任一條件滿足,說明已經發生了線程間的競爭,則調用 fullAddCount 方法進行 counterCells 內部的自增操作
    • 如果上述所有條件都不滿足,說明對于 countCells 下標位置的 value 值進行 CAS 增加的操作成功了
      • 如果 check 參數小于等于 1,則直接返回
      • 否則,調用 sumCount 方法統計一下當前數組中的元素數量
        • sumCount 方法,就是簡單地將 baseCount 的值和所有 counterCells 數組的所有元素的 value 值求和,此方法沒有加鎖,同步措施主要依靠 baseCount 和 CounterCell 的 value 屬性都是用 volatile 關鍵字來修飾的。
  • 檢查 check 變量是否大于等于 0
    • 如果大于等于 0,說明需要檢查是否要進行擴容
    • 判斷當前元素數量是否大于 sizeCtl 參數,且表不為空,且表的長度小于最大長度時,此時說明需要擴容,則進入循環
      • 首先計算擴容戳(即計算當前表長度數值的最高非 0 位前的 0 的個數,跟 2152^{15}215 進行運算)
      • 接下來判斷 sizeCtl 是否小于 0
        • 如果小于 0 代表數組正在擴容,即有線程正在對數組進行擴容
          • 判斷 sizeCtl 往右移 16 位后是否不等于 擴容戳
          • 判斷 nextTable 屬性是否等于 0
          • 判斷 transferIndex 是否小于等于 0
          • 如果上述 3 個條件任一成立,代表數組已經被其他線程擴容完成,則直接返回
          • 如果上述 3 個條件都不成立,則嘗試使用 CAS 對 sizeCtl 進行 +1
            • 如果 CAS 成功,代表該線程開始執行協助擴容操作,參與擴容的線程數(sizeCtl 參數的低 16 位)+1,則開始執行協助擴容
        • 如果 sizeCtl 不小于 0,則嘗試使用 CAS 對 sizeCtl 的值修改成擴容戳左移 16 位且 +2
          • 如果 CAS 成功,則執行初始化擴容操作(此前沒有其他線程在對數組進行擴容)
        • 重新計算當前元素數量(調用 sumCount 方法)后進入下次循環

結合源碼來看:

private final void addCount(long x, int check) {CounterCell[] as; long b, s;if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}} }

ConcurrentHashMap transfer 方法的流程是怎樣的呢?

ConcurrentHashMap 的 transfer 方法,即為擴容方法,其主要的流程如下:

  • 首先,需要通過 CPU 核心數確定每個線程需要處理的桶的數量 stride,最小為 16
  • 如果 nextTable 屬性為空,則說明正在執行初始化擴容,則新建一個原數組長度兩倍的新數組,并賦值給 nextTable,并將 nextTransferIndex 屬性賦值為原數組長度
  • 創建一個 FowardingNode 類型的節點,此類節點的 hash 值為 -1,其中有一個 nextTab 屬性,記錄的就是擴容時的新數組
  • 根據 transferIndex 與 stride 的值,嘗試使用 CAS 將 transferIndex 的值修改為 transferIndex - stride,這一步是確定當前線程要處理的桶的范圍,即當前線程要處理的數組下標范圍是 [transferIndex - stride,transferIndex) 這個區間內的所有桶
  • 分配到需要處理的桶的范圍后,從右到左逆序遍歷這個范圍中的每一個桶,遍歷的下標為 i
    • 判斷位置為 i 上的這個節點是否為空,如果為空,則嘗試使用 CAS 將這個位置上的節點修改成創建好的 FowardingNode 節點
    • 如果這個節點不為空,那么判斷這個節點的 hash 值是否等于 -1,如果是,代表這個節點是 ForwardingNode 類型的節點,則不予處理
    • 否則說明這個節點上的元素還沒有被遷移,則開始遷移這個桶中的所有節點
      • 首先對這個節點使用 synchronized 進行加鎖
      • 加鎖成功后,判斷這個節點有沒有被改變
      • 如果沒被改變,則判斷這個節點的 hash 值是否大于 0
      • 如果大于 0,則說明這個節點是鏈表的頭節點,則開始對鏈表進行遷移
        • 首先,遍歷鏈表,計算每一個節點的 runBit ,其計算方式就是將節點的 hash 值與原數組長度進行運算,計算結果只有兩種
          • 如果 runBit 的值為 0,則說明節點在新數組中的位置等于原來的下標位置
          • 如果 runBit 的值不為 0,則說明節點在新數組中的位置等于原來的下標 + 原數組的長度位置
        • 找到最后一個與前驅節點的 runBit 值不相等的節點 lastRun,最后的 runBit 值等于 lastRun 節點的 runBit
        • lastRun 節點的含義,就是在鏈表中找到一個其后續節點的 runBit 值都相等的節點,在發生遷移的時候,只需要移動這個 lastRun 節點,就可以完成其后續所有節點的遷移
        • 如果最后的 runBit 等于 0,則將 lastRun 賦值給低位鏈表頭節點 ln;如果最后的 runBit 不等于 0,則賦值給高位鏈表頭節點 hn
        • 從頭遍歷鏈表,直到找到 lastRun 的位置停止,根據 runBit 值的不同,使用頭插法將元素插入到低位鏈表中,或者高位鏈表
        • 使用 CAS 將新數組的 i 的位置上的元素賦值為低位鏈表頭節點 ln
        • 使用 CAS 將新數組的 i + 原數組長度 的位置上的元素賦值為高位鏈表頭節點 hn
        • 使用 CAS 將原數組的 i 位置上的元素賦值為創建好的 ForwardingNode 節點
      • 如果原數組 i 上的元素是 TreeBin 類型,則執行紅黑樹的遷移工作,遷移過程與鏈表類似,也是根據每個節點的 runBit 來確定在高位的紅黑樹中,還是在低位的紅黑樹
  • 當待處理區間內的所有桶都處理完畢后,再次嘗試獲取任務,如果獲取成功,則遍歷新獲取的區間內的所有桶進行遷移處理
  • 如果 transferIndex 已經小于等于 0,則說明已經沒有任務可以分配了,那么嘗試使用 CAS 將參與擴容的線程數 -1后(即將 sizeCtl -1 ),看是否當前擴容的線程數是否只剩下一個(即 sizeCtl - 2 = resizeStamp() << 16,即回到了初始擴容時將 sizeCtl 修改成的數值),如果是則直接返回
  • 如果不是,則進行 recheck 處理,將原數組上的所有位置,從右到左再次重新遍歷一遍,檢查是否還存在元素還沒有被遷移
  • 當 recheck 處理完畢后,則原數組上的所有位置上的元素都已經遷移完畢,則將新數組替換掉舊數組,將 sizeCtl 參數設置為新數組長度的 0.75,并將 nextTable 屬性置空后返回
    結合源碼來看:
/**** @param tab 當前的數組* @param nextTab 不為空時,說明正在擴容,傳入的即為尚未擴容完成的數組;為空時,說明尚未開始擴容*/ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;//stride 變量即為每個 CPU 要處理的桶的數量//判斷 CPU 核心數是否大于 1,如果大于 1,則 stride 等于當前數組長度除以 8 再除以 CPU 核心數//否則 stride 等于當前數組長度//判斷 stride 是否小于最小值,即 16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)//如果 stride 小于 16,則賦值為 16 stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) { // initiating//如果傳入的 nextTab 值為空,則說明需要初始化一個擴容后的數組 try {@SuppressWarnings("unchecked")//創建一個長度為舊數組長度兩倍的新數組Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}//將初始化好的新數組賦值給 nextTable 屬性nextTable = nextTab;//將 transferIndex 屬性賦值為舊數組的長度transferIndex = n;}//將 nextn 變量賦值為新數組的長度int nextn = nextTab.length;//初始化 ForwardingNode 類型的數組,將 nextTab 變量傳入,當作這個節點的 nextTab 屬性ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//初始化 advance 變量為 trueboolean advance = true;//初始化 finishing 變量為 falseboolean finishing = false; // to ensure sweep before committing nextTab//初始化 i 和 bound 變量,初始值都為 0,進入循環for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;//當 advance 屬性為 true 時,一直進行循環//這段循環的目的即為給當前線程分配一段需要處理的桶的區間//即給當前線程分配擴容任務while (advance) {int nextIndex, nextBound;//如果 i-1 大于等于 bound ,或者說 finishing 為 trueif (--i >= bound || finishing)//則將 advance 變量賦值為 false,即跳出循環的條件advance = false;//將 nextIndex 賦值為 transferIndex//并判斷值是否小于等于 0else if ((nextIndex = transferIndex) <= 0) {//如果 transferIndex 小于等于 0,代表給線程分配擴容任務已經完成,接下來就該跳出循環了//則將 i 賦值為 -1//將 advance 屬性賦值為 falsei = -1;advance = false;}//使用 CAS 嘗試將 transferIndex 修改為 transferIndex - stride 的差值//這是因為,需要給當前線程分配處理桶的區間//即,當前線程需要處理的桶的區間為:[transferIndex-stride,transferIndex)else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {//如果 CAS 成功,即將 bound 的值賦值為 transferIndex-stride,即為需要處理桶的左邊界(含)bound = nextBound;//將 i 賦值為 nextIndex -1,即為需要處理桶的右邊界(含)i = nextIndex - 1;//將 advance 變量賦值為 false,即跳出循環的條件advance = false;}}//判斷如果 i < 0,或者 i >= 原數組長度//或者 i + n 大于等于新數組長度if (i < 0 || i >= n || i + n >= nextn) {//實測只有 i = -1 的時候會滿足條件,即走進了上面一個循環的第二個分支的條件的時候//而第二個條件滿足,即說明 transferIndex 已經 <= 0 了//即說明給線程分配任務已經完成了int sc;//如果擴容已經結束if (finishing) {//將 nextTable 屬性賦值為 nullnextTable = null;//將當前數組替換為新數組table = nextTab;//將 sizeCtl 屬性賦值為新數組長度的 0.75 倍//即 sizeCtl 重新變成擴容閾值sizeCtl = (n << 1) - (n >>> 1);//擴容操作完成,直接返回return;}//使用 CAS 嘗試將 sizeCtl -1,即參與擴容的線程數量 -1if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//如果 CAS 成功,判斷參與擴容的線程數量是否只剩 1 個了//擴容戳往左移 16 位 +2 即為初始化擴容時的 sizeCtl 參數的值if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)//如果參與擴容的線程數量只剩一個了,則說明擴容操作已經完成,則直接返回return;//否則說明整個擴容操作還沒有完成,只是當前線程的當前任務完成了//將 finishing 和 advance 參數都賦值為 truefinishing = advance = true;//將 i 賦值為原數組長度//完整地從右到左重新檢查一遍原數組上的每一個位置,查看是否還有元素沒有遷移i = n; // recheck before commit}}//判斷下標 i(即當前處理的桶的位置)位置上是否為空else if ((f = tabAt(tab, i)) == null)//如果為空,則嘗試用 CAS 把舊數組上的第 i 個元素,修改為 ForwardingNode 類型的節點//ForwardingNode 節點的 hash 值比較特殊,為 -1,枚舉值為 MOVED//將 advance 的值賦值為 CAS 的結果advance = casTabAt(tab, i, null, fwd);//判斷,如果下標 i (當前處理的桶的位置)上的 hash 值為 -1else if ((fh = f.hash) == MOVED)//代表這個下標對應的節點已經被賦值為了 ForwardingNode 類型的節點//說明該位置已經被處理了,則將 advance 賦值為 trueadvance = true; // already processedelse {//否則,說明下標 i(當前處理的桶的位置)上的元素不為空,且還沒有被處理//首先對該下標元素 f 使用 synchronized 進行加鎖synchronized (f) {//進來之后第一件事情,先判斷數組下標位置的元素是否還等于 fif (tabAt(tab, i) == f) {//如果等于,則說明還沒有被修改過Node<K,V> ln, hn;//如果 f 的 hash 值大于等于 0(即判斷該元素是鏈表還是紅黑樹的節點)if (fh >= 0) {//如果大于 0 ,說明這個桶中的元素是鏈表類型的節點//實際上這個分支中的代碼應該是將鏈表轉移的邏輯//將 f 的 hash 值與原數組的長度進行與操作//runBit 變量其實就是節點的 hash 值參與計算數組下標位置的比較部分往左移了一位的值//如果這一位是 0(runBit = 0),代表遷移過去的位置還是原數組下標位置//如果這一位是 1,代表遷移過去的位置是原數組下標 + 舊數組長度的位置int runBit = fh & n;//lastRun 變量即為鏈表上,最后一個與前節點的 runBit 不相等的節點//為什么要這樣設置?//因為這樣的話,到了這個 lastRun 節點后面的節點就沒有必要再往下遍歷了//因為到了 lastRun 節點,后面的節點的 runBit 都跟 lastRun 節點一樣//意思就是說后面節點都不用動,只需要將 lastRun 遷過去就可以了Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {//遍歷舊鏈表,//這個循環的作用就是找到 lastRun 的位置int b = p.hash & n;if (b != runBit) {//如果計算出來的 runBit 與通過 f 計算出來的 runBit 不一致//就把 runBit 重新賦值// lastRun 變量也賦值為最新遍歷到的這個元素runBit = b;lastRun = p;}}//判斷 runBit 是否等于 0if (runBit == 0) {//如果是,則將 ln 賦值為 lastRun//所以 ln 代表的含義就是 lastRun 應該要遷移到原數組下標的鏈表頭節點ln = lastRun;hn = null;}else {//如果不等于 0,則將 hn 賦值為 lastRun//所以 hn 代表的含義就是 lastRun 應該要遷移到原數組下標 + 原數組長度位置的鏈表頭節點hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {//這個循環//將 f 到 lastRun 中間的所有節點使用頭插法,再根據 runBit 的不同分別組成高位和低位兩條新的鏈表//即 ln 與 hn,低位鏈表與高位鏈表int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}//將新數組中,下標為 i (即原數組下標位置)位置的元素設置為低位鏈表setTabAt(nextTab, i, ln);//將新數組中,下標為 i + n(即原數組下標 + 原數組長度位置)位置的元素設置為高位鏈表setTabAt(nextTab, i + n, hn);//將原數組中,下標為 i (即原數組下標位置)位置的元素設置為 ForwardingNode 類型的節點//表示這個位置上的元素已經遷移完成setTabAt(tab, i, fwd);//將 advance 屬性賦值為 truadvance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}} }

ConcurrentHashMap helpTransfer 方法的流程是怎樣的呢?

helpTransfer 即協助擴容方法,其主要流程如下:

  • 首先進行一些判斷
    • 當前數組不能為空
    • 數組下標位置的節點是 FowardingNode 類型
    • FowardingNode 節點的 nextTable 屬性不為空
  • 同時滿足上述的三個條件后,進行下一步邏輯處理,否則直接將當前數組 table 對象返回出去
  • 使用 resizeStamp 方法,計算數組長度的擴容戳(resizeStamp,簡寫為 int rs 變量)
    • 具體的實現就是首先調用 Integer.numberOfLeadingZeros() 計算數組長度最高非 0 位前的 0 的個數,由于數組的長度始終是 2 的整數次冪,所以當數組的長度發生變化時(即發生擴容時),該值肯定是會變化的(每次擴容后最高非 0 位往左移 1 位,則該數值減少了 1)
    • 再將 1 往左移 15 位,最后將兩個值做 ^ 或運算,(相當于將兩個值相加),即得到了擴容戳數值
    • 可以看出,擴容戳的取值范圍為 [215[2^{15}[215 , 215+32]2^{15}+32]215+32],且數組每次擴容,該數值將會 -1
  • 進入循環,判斷 nextTab,table 屬性是否發生變化(判斷其引用是否發生變化),判斷 sizeCtl 屬性是否小于 0(初始化完成后,sizeCtl 屬性小于 0 說明在進行擴容)
  • 如果不滿足條件,直接將棧幀中的本地變量 nextTab 屬性返回出去
  • 滿足條件則進入循環
  • 判斷 sizeCtl 往右移 16 位后是否等于擴容戳(如果不等于,說明數組的大小已經發生了變化)
  • 判斷 transferIndex 是否小于等于 0
  • 如果滿足條件,則說明線程已經完成了擴容,則直接跳出循環,將棧幀中的本地變量 nextTable 屬性返回出去
  • 如果不滿足條件,則使用 CAS 嘗試將 sizeCtl 屬性 +1(代表協助擴容的線程數量 +1 了)
  • 如果 CAS 成功,則執行擴容方法

結合源碼來看:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;//首先進行判斷//1.當前數組不能為空//2.數組下標位置的節點是 FowardingNode 類型//3.數組下標位置的節點的 nextTable 屬性不為空if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 使用 resizeStamp 方法,計算數組長度的擴容戳int rs = resizeStamp(tab.length);//判斷 nextTable,table 屬性是否發生變化(判斷其引用是否發生變化)//判斷 sizeCtl 屬性是否小于 0while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {//判斷 sizeCtl 往右移 16 位后是否不等于擴容戳//判斷 transferIndex 是否小于等于 0//其他兩個條件我認為是無效條件,不可能成立的,所以不去糾結代表的含義了//如果滿足上面說的兩個條件,則說明線程已經完成了擴容,則直接跳出循環if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;//使用 CAS 嘗試將 sizeCtl 屬性 +1if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//如果成功,代表協助擴容的線程數量 +1 了//執行擴容方法transfer(tab, nextTab);break;}}//將棧幀中的本地變量 nextTab 屬性返回出去return nextTab;}//將當前數組返回出去return table; }

ConcurrentHashMap 的 get 方法的流程?

ConcurrentHashMap 的 get 方法,是不加鎖的,具體的流程如下:

  • 首先通過 sepread 方法,計算出 key 的 hash 值(計算方法就是將 key 的 hashcode 往右移 16 位后與原 hashcode 進行異或運算)
  • 判斷數組是否為空,如果為空,則直接返回空
  • 如果數組不為空,則根據 hash & 數組長度 -1 得到節點在數組中的位置,判斷這個位置上的節點是否為空
  • 如果為空,則返回空
  • 如果不為空,則用 equals 方法判斷下標位置的這個節點的 key 是否與輸入的 key 相等,如果相等,則將 value 返回出去
  • 如果不相等,則判斷下標位置的節點的 hash 值是否小于 0
    • 如果小于 0,則說明該位置上的節點是 FowardingNode 類型(hash 值為 -1),或者 TreeBin 類型(hash 值為 -2)
      • 如果是 FowardingNode 類型,則說明數組正在進行擴容且這個節點已經遷移到了新的數組中,則在 ForwardingNode 的 nextTable 屬性(即擴容后的新數組)中,查找節點
      • 如果是 TreeBin 類型,則在紅黑樹中執行查找的邏輯
    • 如果大于等于 0,則說明該位置上的節點是鏈表類型,則遍歷鏈表查找元素

ConcurrentHashMap 的 sizeCtl 的含義,以及值的流轉過程?

ConcurrentHashMap 的 sizeCtl,在不同的時間有不同的含義,詳解如下:

  • 調用構造器完成后,sizeCtl 表示當前容量(使用無參構造器時,sizeCtl = 0,使用帶參構造器時,sizeCtl = 當前容量)
  • 當前正在執行初始化數組時,sizeCtl 的值為 -1,代表正在初始化數組
  • 當數組初始化完成后,sizeCtl 表示擴容閾值,值為數組長度的 0.75
  • 當擴容中時,sizeCtl 的高 16 位代表的是擴容戳(即 2152^{15}215 + 當前數組長度的最高非 0 位前面的 0 的個數),低 16 位代表的是參與擴容的線程數 + 1

ConcurrentHashMap 的 size 方法的流程?

size 方法,即統計 ConcurrentHashMap 當前已存入的元素個數

  • 調用 sumCount 方法
  • 在 sumCount 方法內部,將 baseCount 和所有的 CounterCell 內部的 value 值進行累加,得到的就是當前已存入的元素個數
  • 判斷元素個數是否大于整型值的最大值,如果是就返回整型值的最大值

其他

如果 ConcurrentHashMap 的某個數組下標位置是一顆紅黑樹,那么這個位置上的節點類型是 TreeNode 嗎?

ConcurrentHashMap 如果某個桶里面是一顆紅黑樹,那么該數組下標位置就是一個 TreeBin 對象,而不是一個 TreeNode 對象,TreeBin 對象相當于在 TreeNode 對象外面套了個殼子,TreeBin 對象有一個 TreeNode 屬性,這個屬性就是紅黑樹的根節點。

為什么要用 TreeBin 對象作為這個位置上的節點,而不是 TreeNode 對象呢?

這是因為在修改紅黑樹的時候,理論上來說需要對紅黑樹的根節點進行加鎖,但是實際上,在紅黑樹的修改過程中,根節點很可能因為樹的自平衡動作而被修改為其他節點。所以單純使用紅黑樹的根節點作為鎖對象是不靠譜的。

ConcurrentHashMap 的 size 方法會返回最新的值嗎?

ConcurrentHashMap 的 size 方法不會返回最新的值,只會返回調用方法那一刻元素數量的快照結果。

意思就是說,如果在 size 方法被調用的過程中,元素的數量發生了變化,那返回的元素數量依然是調用 size 方法那一刻的快照值。

這是因為,在 size 方法內部,是沒有采取任何同步措施的

  • 計算時取的計算依據 counterCells 和 baseCount 屬性,都是在調用方法那一刻的快照引用,如果在計算的過程中,這兩個計算依據發生了變化,那么計算時還是用的舊值進行計算的
  • 在對 counterCellls 數組中的 CounterCell 對象的 value 屬性進行遍歷累加時,如果累加過后,該屬性發生了變化,那么返回的數值就不是最新的值了

transferIndex 的真正含義

代表的是,當前給線程分配任務的邊界,即已經分配給線程處理擴容的區間為 [transferIndex, newTableSize),而還沒有被分配給線程處理擴容的區間為:[0,transferIndex)
所以,當 transferIndex 小于等于 0 時,并不意味著擴容就結束了,而只是意味著將整個數組的擴容任務都分給了參與擴容的線程

ConcurrentHashMap 總結

ConcurrentHashMap 是一個高性能的并發安全的 Map,常用做堆緩存,例如 Spring 的單例池,對象池等。除去處理并發相關操作外,主體流程與 HashMap 的數據操作流程基本一致。

put 方法流程總結

  • 首先計算 key 的 hash 值
  • 判斷表是否為空,如果為空則需要先進行初始化
    • 當表為空時,一直循環操作
    • 首先看是否有其他線程正在執行初始化操作(判斷 sizeCtl 參數是否小于 0),如果有,則調用 Thread.yield() 方法讓出 CPU 執行時間片,進入下次循環
    • 嘗試使用 CAS 把 sizeCtl 參數替換為 -1,如果替換成功,則當前線程執行表初始化操作
  • 根據 hash 值 & 數組長度 - 1 找到數組中對應桶的位置
  • 如果該位置上沒有元素,則嘗試使用 CAS 把待插入的元素替換到該位置上,如果成功則跳出循環
  • 如果該位置上有元素,則判斷該位置上的元素是否處于擴容狀態,如果是,則協助進行擴容
  • 上述條件都不滿足,則嘗試對該位置上的元素使用 synchronized 進行加鎖
    • 加鎖成功后,判斷該位置上的元素有沒有變化,如果有,說明有其他線程已經對這個位置上的元素做了改變,進入下次循環
    • 判斷該桶上的數據結構是鏈表還是紅黑樹,如果是鏈表則使用尾插法插入新元素,如果是紅黑樹則執行紅黑樹的插入邏輯
  • 判斷鏈表是否要轉化為紅黑樹(當前表的長度大于等于 64 且鏈表的長度大于等于 8),如果是,則執行鏈表轉化紅黑樹的操作
  • 如果是覆蓋了舊值,則直接將舊值返回
  • 將元素數量 + 1(執行 addCount 方法)

ConcurrentHashMap 的元素數量計數

ConcurrentHashMap 中的元素數量,是采用了 LongAdder 類的設計思想,當前元素的數量并不是用一個數值變量來表示的,而是由一個計數器數組(CounterCell 類型的數組) 來維護的,當需要獲取當前元素數量時,會將當前計數器數組的快照進行遍歷累加,最后才能得到當前數組中的元素數量。
這樣做的好處就是當由多個線程都要去并發修改元素數量時,降低發生競爭的可能性。
試想一下,如果說只是用一個 volatile 修飾的數值類型 + CAS 來修改元素數量,那么當同一時刻有多個線程去修改元素數量時,每次都只會有一個線程修改成功,那么其余的線程都相當于空轉了一次,當并發的線程數量很多時,大多數線程將都會做類似自旋操作,這樣就白白浪費了 CPU 資源。
造成上述問題的根本性原因就是臨界資源的粒度太粗,導致發生競爭的可能性非常大。所以 CounterCell 數組的設計,正是將臨界資源的粒度給細化了,當一個線程對某個 CounterCell 的計數值修改失敗后,將會轉而去嘗試修改其他 CounterCell 的數值,這樣就降低了發生競爭的可能性,從而提升了修改操作的命中率。

ConcurrentHashMap 的擴容操作

ComcurremtHashMap 的擴容操作,是允許多個線程協助共同進行擴容操作的。

  • 在判斷當前數組需要擴容(sizeCtl > 0 時,代表的含義就是擴容閾值)之后,首先發起擴容操作的線程就會把 sizeCtl 的值使用 CAS 修改為高 16 位代表擴容戳(2 的 15 次方 + 擴容前數組的長度最高非 0 位前的 0 的個數),低 16 位為 2 的數值,這個值小于 0。
  • 第一個進行擴容操作的線程負責進行新數組的初始化
  • 后來在 ConcurrentHashMap 中執行操作的線程發現當前正在執行擴容后,將會進行協助擴容,協助擴容之前將會用 CAS 操作嘗試將 sizeCtl 的值 +1,即 sizeCtl 的低 16 位 +1,即參與擴容的線程數量 +1。
  • 在參與擴容的每個線程,都會嘗試使用 CAS 修改 transferIndex 的值(領取任務),修改后的 transferIndex 的值與修改前的 transferIndex 的值的區間范圍,即為該線程負責進行擴容的數組下標范圍,線程將會針對該范圍內的每一個位置上的元素都進行擴容操作
  • 線程完成自己負責擴容的數組下標范圍后,將會再次判斷擴容有沒有完成
    • 如果沒有,再次嘗試修改 transferIndex 的值以獲取負責進行擴容的數組下標范圍(再次領取任務),再次進行擴容操作
    • 如果 transferIndex 的值已經小于 0 了(已經沒有可以領取的任務了),那么線程會完整地檢查一遍原數組,看還有沒有元素沒有被轉移
  • 所有工作完成,將會把 sizeCtl 參數 -1 后退出擴容方法,最后一個線程將會把原數組替換成新數組

ConcurrentHashMap 的設計思想總結

大量的無鎖并發安全處理操作

  • ConcurrentHashMap 中的很多變量都使用了 volatile 關鍵字修飾,可以確保在變量值在被一個線程修改后,其他線程能立馬得到這個修改后的值
  • ConcurrentHashMap 在修改變量值時,采用的是 CAS + 自旋重試的操作,可以在不使用鎖來阻塞其他參與線程的情況下并發安全地修改變量值

細化臨界資源粒度

  • ConcurrentHashMap 使用了計數器數組(CounterCell 數組)來降低修改元素數量時的發生并發競爭概率
  • 在添加新元素且這個新元素對應的數組下標位置有節點存在時,ConcurrentHashMap 鎖住的是數組下標位置上的這個元素(鏈表頭節點或者紅黑樹的根節點),使不同數組下標位置的桶上的修改操作互不影響,降低了發生并發競爭的概率

高效的擴容機制

高效的擴容機制主要的核心設計思想在于 ConcurrentHashMap 使用 transferIndex 來進行分段擴容,這樣做的好處有:

  • 多線程協助共同完成擴容:ConcurrentHashMap 使用了多線程協助共同完成擴容的機制,使得 ConcurrentHashMap 的擴容操作在多線程場景下,不會讓其他線程阻塞等待單個線程操作擴容完畢,提高了單個線程的執行效率,也使整體的擴容效率大大提升
  • 在擴容期間仍可以無阻塞訪問數據:假設現在有一個線程想要調用 get 方法,并且當前 ConcurrentHashMap 正在執行擴容操作,那么可能遇見的場景有以下幾種:
    • key 對應的桶已經完成了擴容(但是還有其他桶沒有完成擴容),那么原數組中的桶的位置上將會放置一個 ForwardingNode 類型的桶,那么線程可以通過 nextTable(新數組) 完成對數據的訪問
    • key 對應的桶還沒有開始進行擴容,那么直接訪問原數組中的桶就可以完成對數據的訪問
    • key 對應的桶正在執行擴容,由于 get 方法訪問的是調用時刻的原數組快照,所以該桶正在執行擴容時還沒有對其完成改變,所以直接訪問原數組中的桶就可以完成對數據的訪問

高效的狀態管理機制

ConcurrentHashMap 使用單個整形變量來標識當前數組所處狀態,將單個整形變量根據位數不同劃分了不同的含義,減少了多余的狀態值定義,一定程度上減少了內存消耗以及提升了整體效率

總結

以上是生活随笔為你收集整理的详解 ConcurrentHashMap的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。