ConcurrentHashMap源码jdk1.8学习笔记
生活随笔
收集整理的這篇文章主要介紹了
ConcurrentHashMap源码jdk1.8学习笔记
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
目錄
put方法
initTable方法
helpTransfer方法
transfer方法
addCount方法
?
put方法
public V put(K key, V value) {// 直接調用了putVal方法return putVal(key, value, false);}final V putVal(K key, V value, boolean onlyIfAbsent) {// 1.判斷key、value是否有為null的。疑問1:為什么不能為null?if (key == null || value == null) throw new NullPointerException();// 2.獲取key的hash值,位運算效率更高int hash = spread(key.hashCode());int binCount = 0;// 3.table數組賦值給tab變量,為什么要循環?比如高并發下3個線程進入了initTable方法// initTable中1個線程初始化,其余2線程循環Thread.yield()直到初始化完成退出initTable// 初始化完成后則需要這個循環進入其它的if分支進行其它操作,直到真正put完成break// 其它操作可能是helpTransfer,幫助擴容后reHash,以及后續會講到的cas put失敗的for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 4.判斷Node數組是否為空,為空則初始化一個Node數組。// 線程安全由初始化方法initTable保證。if (tab == null || (n = tab.length) == 0)tab = initTable();// 5.這里包含了第二次哈希(n - 1) & hash 得到k,v存放的數組下標i// 將數組第i個Node桶賦值給f,判斷此Node桶是否有數據else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 6.當然也存在并發情況,多個線程進入此分支,但沒關系,只有一個能cas成功if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin// 7.其它的線程什么也不干,進入下一次循環即可}// 8.通過此桶hash值 == -1 MOVED判斷是否在進行resize,且此桶還未完成rehashelse if ((fh = f.hash) == MOVED)// 9.則幫下忙,幫忙結束后,進入下次循環。幫忙線程有一個算一個來者不拒。tab = helpTransfer(tab, f);else {// 10.Node數組已經初始化過了,桶中也已經有了數據,也沒在resize,則putV oldVal = null;synchronized (f) {// 11.貌似是為了更加保險,再判斷一下?if (tabAt(tab, i) == f) {// 12.這個桶的hash值判斷是否為鏈表結構if (fh >= 0) {binCount = 1;// 13.循環鏈表,覆蓋或者新增節點,binCount記錄節點數for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 14.若是紅黑樹,調用其put方法else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 15.判斷節點數量是否>8,是則調用treeifyBin() 執行擴容或轉紅黑樹if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);// 16.若是新值替換舊值,不用執行addCount()方法,直接returnif (oldVal != null)return oldVal;break;}}}// 17.數據個數+1, binCount是鏈表的長度/紅黑樹的結點數// 擴容判斷也在此方法,線程安全詳見addCount()方法解析addCount(1L, binCount);return null;}1、為什么不能put null,而HashMap卻允許?
當你通過get(key)獲取對應的value時,如果獲取到的是null,你無法判斷,是put(k,v)的時候value為null,還是這個key就不存在于map中。HashMap是非并發的,可以通過后續執行contains(key)來做這個判斷。而支持并發的Map在后續調用contains(key)時,可能其它線程已經將原先存在的remove。
?
initTable方法
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 1.sizeCtl == -1 < 0說明已經有別的線程在執行初始化,yield放棄cpu等待即可// 值得注意的是,hotspot虛擬機來說yield() == seelp(0), seelp(0)底層也是使用yieldif ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// 2.并發下,可能有多個線程來到這個分支,但只有一個能cas成功,其余下次循環會走上邊else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2);}} finally {// 3.修改后原先等待線程走else if分支,但cas均會修改失敗,退出方法。sizeCtl = sc;}break;}}return tab;}helpTransfer方法
先看需要轉移的節點的數據結構
static final class ForwardingNode<K,V> extends Node<K,V> {// 指向擴容后的tabfinal Node<K,V>[] nextTable;// 只有transfer()中調用ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null);this.nextTable = tab;}// ... 還有一個方法省略} final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 1.數據檢驗,如果 table 不是空 且 node 節點是轉移類型,// 且 node 節點的 nextTable(新 table) 不是空if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 嘗試幫助擴容// 2.根據 length 得到一個值,沒看懂含義?? TODOint rs = resizeStamp(tab.length);// 3.如果 nextTab 沒有被并發修改 且 tab 也沒有被并發修改// 且 sizeCtl < 0 (說明還在擴容)while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {// 4.如果 sizeCtl 無符號右移 16 不等于 rs // sc前 16 位如果不等于標識符,則標識符變化了// 或者 sizeCtl == rs + 1 擴容結束// 默認第一個線程設置 sc ==rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等于 rs + 1// 或者 sizeCtl == rs + 65535 (如果達到最大幫助線程的數量,即 65535)// 或者轉移下標正在調整 (擴容結束)// 結束循環,返回 tableif ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || transferIndex <= 0)break;// 5.如果以上都不是, 將 sizeCtl + 1, (表示增加了一個線程幫助其擴容)if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {// 6.進行轉移,線程有一個算一個來者不拒transfer(tab, nextTab);// 結束循環break;}}return nextTab;}return table; }?
transfer方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// 1.計算每個CPU處理桶個數,NCPU==1則處理所有,小于16則取16int n = tab.length, stride;if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 2.外邊調用此方法時,大部分為情況為nextTab!=null,為null時有cas保證只有一個線程進入if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOME// 3.處理擴容失敗sizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;// 4.判斷是否完成了擴容if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 5.線程第一次for循環,且未完成擴容,都會走這個分支,分配一個桶區間遷移任務else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}if (i < 0 || i >= n || i + n >= nextn) {int sc;// 6.全部任務完成if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 7.不是最后一個完成任務if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 8.最后一個完成任務還有后續要干,下次循環走6finishing = advance = true;i = n; // recheck before commit}}else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 加鎖進行一個桶的遷移synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {int runBit = fh & n;// 循環,尋找lastRun節點,往后的跟lastRun在一個桶,免去復制操作Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;// 頭插法,即快速,又符合后put的節點被訪問的可能性更大的事實// 但因為lastRun的存在,所以又并非純粹的倒序if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}addCount方法
private final void addCount(long x, int check) {CounterCell[] as; long b, s;// 1.若計數格子非空,或嘗試 cas baseCount+x 失敗,則進入if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;// 2.計數格子空 --說明還未出現并發// 或 隨機獲取一個計數格子,格子為空// 或 隨機一個計數格子不為空,則嘗試cas +x失敗,總體來說優先使用計數格子計數if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {// 3.通過循環自旋 以及 cas 操作初始化counterCells 或嘗試 cas baseCount+xfullAddCount(x, uncontended);return;}if (check <= 1)return;// sumCount()方法實現:counterCells累加 + baseCounts = sumCount();}// 是否需要擴容檢查參數,putVal()中調用的會檢查if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// size >= 擴容閾值sizeCtl 且 容量 < 最大容量MAXIMUM_CAPACITY = 1 << 30while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {// 已經在擴容了,幫助擴容if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 開始擴容else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}}?
?
參考文章:
https://blog.csdn.net/weixin_40299948/article/details/99692294
https://www.cnblogs.com/zerotomax/p/8687425.html
總結
以上是生活随笔為你收集整理的ConcurrentHashMap源码jdk1.8学习笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一些Chrome 调试小技巧汇总
- 下一篇: 云原生架构沙龙(成都站)圆满结束(附胶片