currenthashmap扩容原理_ConcurrentHashMap实现原理和源码解读
前言
HashMap是java編程中最常用的數(shù)據(jù)結(jié)構(gòu)之一,由于HashMap非線程安全,因此不適用于并發(fā)訪問的場景。JDK1.5之前,通常使用HashTable作為HashMap的線程安全版本,HashTable對讀寫進(jìn)行全局加鎖,在高并發(fā)情況下會造成嚴(yán)重的鎖競爭和等待,極大地降低了系統(tǒng)的吞吐量,ConcurrentHashMap應(yīng)運(yùn)而生。
相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在線程安全的基礎(chǔ)上提供了更好的寫并發(fā)能力,并且讀操作(?get)通常不會阻塞,使得讀寫操作可并行執(zhí)行,支持客戶端修改ConcurrentHashMap的并發(fā)訪問度,迭代期間也不會拋出?ConcurrentModificationException等等,ConcurrentHashMap有這么多優(yōu)點(diǎn),那么它有什么缺點(diǎn)嗎?有,一致性問題,這是當(dāng)前所有分布式系統(tǒng)都面臨的問題。下面開始分析ConcurrentHashMap的實現(xiàn)原理。
ConcurrentHashMap實現(xiàn)原理
ConcurrentHashMap的基本策略是將table細(xì)分為多個Segment保存在數(shù)組segments中,每個Segment本身又是一個可并發(fā)的哈希表,同時每個Segment都是一把ReentrantLock鎖,只有在同一個Segment內(nèi)才存在競態(tài)關(guān)系,不同的Segment之間沒有鎖競爭,這就是分段鎖機(jī)制。Segment內(nèi)部擁有一個HashEntry數(shù)組,數(shù)組中的每個元素又是一個鏈表。下面看看ConcurrentHashMap整體結(jié)構(gòu)圖:
為了減少占用空間,除了第一個Segment之外,剩余的Segment采用的是延遲初始化的機(jī)制,僅在第一次需要時才會創(chuàng)建(通過ensureSegment實現(xiàn))。為了保證延遲初始化存在的可見性,訪問segments數(shù)組及table數(shù)組中的元素均通過volatile訪問,主要借助于Unsafe中原子操作getObjectVolatile來實現(xiàn),此外,segments中segment的寫入,以及table中元素和next域的寫入均使用UNSAFE.putOrderedObject來完成。這些操作提供了AtomicReferenceArrays的功能。下面開始源碼之旅吧
成員變量
下面介紹ConcurrentHashMap中用到的成員域:
/**
* 默認(rèn)初始容量
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* 默認(rèn)加載因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 默認(rèn)并發(fā)度,該參數(shù)會影響segments數(shù)組的長度
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 最大容量,構(gòu)造ConcurrentHashMap時指定的大小超過該值則會使用該值替換,
* ConcurrentHashMap的大小必須是2的冪,且小于等于1<<30,以確??墒褂胕nt索引條目
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 每個segment中table數(shù)組的最小長度,必須是2的冪,至少為2,以免延遲構(gòu)造后立即調(diào)整大小
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/**
* 允許的最大segment數(shù)量,用于限定構(gòu)造函數(shù)參數(shù)concurrencyLevel的邊界,必須是2的冪
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/**
* 非鎖定情況下調(diào)用size和containsValue方法的重試次數(shù),避免由于table連續(xù)被修改導(dǎo)致無限重試
*/
static final int RETRIES_BEFORE_LOCK = 2;
/**
* 與當(dāng)前實例相關(guān)聯(lián)的,用于key哈希碼的隨機(jī)值,以減少哈希沖突
*/
private transient final int hashSeed = randomHashSeed(this);
private static int randomHashSeed(ConcurrentHashMap instance) {
if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {
return sun.misc.Hashing.randomHashSeed(instance);
}
return 0;
}
/**
* 用于索引segment的掩碼值,key哈希碼的高位用于選擇segment
*/
final int segmentMask;
/**
* 用于索引segment偏移值
*/
final int segmentShift;
/**
* segments數(shù)組
*/
final Segment[] segments;
構(gòu)造方法
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)//
concurrencyLevel = MAX_SEGMENTS;
// 尋找與給定參數(shù)concurrencyLevel匹配的最佳Segment數(shù)組ssize,必須是2的冪
// 如果concurrencyLevel是2的冪,那么最后選定的ssize就是concurrencyLevel
// 否則concurrencyLevel,ssize為大于concurrencyLevel最小2的冪值
// concurrencyLevel為7,則ssize為2的3次冪,為8
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//計算每個Segment中,table數(shù)組的初始大小
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 創(chuàng)建segments和第一個segment
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); //原子按順序?qū)懭雜egments[0]
this.segments = ss;
}
/**
* map轉(zhuǎn)化為ConcurrentHashMap
*
* @param m the map
*/
public ConcurrentHashMap(Map extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
構(gòu)造器中各個參數(shù)的含義:
initialCapacity:創(chuàng)建ConccurentHashMap對象的初始容量,即ConccurentHashMap中HashEntity的總數(shù)量,創(chuàng)建時未指定initialCapacity則默認(rèn)為16,最大容量為MAXIMUM_CAPACITY。
loadFactor:負(fù)載因子,用于計算Segment的threshold域,
concurrencyLevel:即ConccurentHashMap的并發(fā)度,支持同時更新ConccurentHashMap且不發(fā)生鎖競爭的最大線程數(shù)。concurrencyLevel不能代表ConccurentHashMap實際并發(fā)度,ConccurentHashMap會使用大于等于該值的2的冪指數(shù)的最小值作為實際并發(fā)度,實際并發(fā)度即為segments數(shù)組的長度。創(chuàng)建時未指定concurrencyLevel則默認(rèn)為16。
并發(fā)度對ConccurentHashMap性能具有舉足輕重的作用,如果并發(fā)度設(shè)置的過小,會帶來嚴(yán)重的鎖競爭問題;如果并發(fā)度設(shè)置的過大,原本位于同一個Segment內(nèi)的訪問會擴(kuò)散到不同的Segment中,CPU cache命中率會下降,從而引起程序性能下降。
原子方法
先看源碼:
/**
* 獲取給定table的第i個元素,使用volatile讀語義
*/
@SuppressWarnings("unchecked")
static final HashEntry entryAt(HashEntry[] tab, int i) {
return (tab == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
/**
* 設(shè)置給定table的第i個元素,使用volatile寫入語義
*/
static final void setEntryAt(HashEntry[] tab, int i,
HashEntry e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
/**
* 通過Unsafe提供的具有volatile元素訪問語義的操作獲取給定Segment數(shù)組的第j個元素(如果ss非空)
* 注意:因為Segment數(shù)組的每個元素只能設(shè)置一次(使用完全有序的寫入),
* 所以一些性能敏感的方法只能依靠此方法作為對空讀取的重新檢查。
*/
@SuppressWarnings("unchecked")
static final Segment segmentAt(Segment[] ss, int j) {
long u = (j << SSHIFT) + SBASE;
return ss == null ? null :
(Segment) UNSAFE.getObjectVolatile(ss, u);
}
/**
* 根據(jù)給定hash獲取segment
*/
@SuppressWarnings("unchecked")
private Segment segmentForHash(int h) {
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
return (Segment) UNSAFE.getObjectVolatile(segments, u);
}
/**
* 根據(jù)給定segment和hash獲取table entry
*/
@SuppressWarnings("unchecked")
static final HashEntry entryForHash(Segment seg, int h) {
HashEntry[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
(HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}
ConcurrentHashMap主要使用以上幾個方法對segments數(shù)組和table數(shù)組進(jìn)行讀寫,并保證線程安全性。
其主要使用了UNSAFE.getObjectVolatile提供Volatile讀語義,UNSAFE.putOrderedObject提供了Volatile寫語義。下面分析這兩個方法帶來的好處:
UNSAFE.getObjectVolatile使得非volatile聲明的對象具有volatile讀的語義,那么要使非volatile聲明的對象具有volatile寫的語義則需要借助操作UNSAFE.putObjectvolatile。
那么UNSAFE.putOrderedObject操作的含義和作用又是什么呢?
為了控制特定條件下的指令重排序和內(nèi)存可見性問題,Java編譯器使用一種叫內(nèi)存屏障(Memory Barrier,或叫做內(nèi)存柵欄,Memory Fence)的CPU指令來禁止指令重排序。java中volatile寫入使用了內(nèi)存屏障中的LoadStore屏障規(guī)則,對于這樣的語句Load1; LoadStore; Store2,在Store2及后續(xù)寫入操作被刷出前,保證Load1要讀取的數(shù)據(jù)被讀取完畢。volatile的寫所插入的storeLoad是一個耗時的操作,因此出現(xiàn)了一個對volatile寫的升級版本,利用lazySet方法進(jìn)行性能優(yōu)化,在實現(xiàn)上對volatile的寫只會在之前插入StoreStore屏障,對于這樣的語句Store1; StoreStore; Store2,在Store2及后續(xù)寫入操作執(zhí)行前,保證Store1的寫入操作對其它處理器可見,也就是按順序的寫入。UNSAFE.putOrderedObject正是提供了這樣的語義,避免了寫寫指令重排序,但不保證內(nèi)存可見性,因此讀取時需借助volatile讀保證可見性。
ConcurrentHashMap正是利用了這些高性能的原子讀寫來避免加鎖帶來的開銷,從而大幅度提高了性能。
ensureSegment
ensureSegment用于確定指定的Segment是否存在,不存在則會創(chuàng)建。源碼如下:
@SuppressWarnings("unchecked")
private Segment ensureSegment(int k) {
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment seg;
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment s = new Segment(lf, threshold, tab);
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
使用getObjectVolatile()方法提供的原子讀語義獲取指定Segment,如果為空,以構(gòu)造ConcurrentHashMap對象時創(chuàng)建的Segment為模板,創(chuàng)建新的Segment。ensureSegment在創(chuàng)建Segment期間為不斷使用getObjectVolatile()檢查指定Segment是否為空,防止其他線程已經(jīng)創(chuàng)建了Segment。
HashEntry
開始介紹Segment之前,我們先看看HashEntry的結(jié)構(gòu):
static final class HashEntry {
final int hash;
final K key;
volatile V value;
volatile HashEntry next;
HashEntry(int hash, K key, V value, HashEntry next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* 使用volatile寫入語義設(shè)置next域
*/
final void setNext(HashEntry n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
HashEntry將value和next聲明為volatile ,是為了保證內(nèi)存可見性,也就是每次讀取都將從內(nèi)存讀取最新的值,而不會從緩存讀取。同時,寫入next域也使用volatile寫入語義保證原子性。寫入使用原子性操作,讀取使用volatile,從而保證了多線程訪問的線程安全性。
Segment
Segment作為ConccurentHashMap的專用數(shù)據(jù)結(jié)構(gòu),同時擴(kuò)展了ReentrantLock,使得Segment本身就是一把可重入鎖,方便執(zhí)行鎖定。Segment內(nèi)部持有一個始終處于一致狀態(tài)的entry列表,使得讀取操作無需加鎖(通過volatile讀table數(shù)組)。調(diào)整tables大小期間通過復(fù)制節(jié)點(diǎn)實現(xiàn),使得舊版本的table仍然可以遍歷。
Segment僅定義需要加鎖的可變方法,針對ConcurrentHashMap中相應(yīng)方法的調(diào)用都會被代理到Segment中的方法。這些可變方法使用scanAndLock和scanAndLockForPut在競爭中使用受控旋轉(zhuǎn),也就是自旋次數(shù)受限制的自旋鎖。由于線程的阻塞與喚醒通常伴隨著上下文切換、CPU搶占等,都是開銷比較大的操作,使用自旋次數(shù)受限制的自旋鎖,可以提高獲取鎖的概率,降低線程阻塞的概率,這樣可極大地提升性能。之所以自旋次數(shù)受限制,是因為自旋會不斷的消耗CPU的時間,無限制的自旋會導(dǎo)致開銷增長。因此自旋鎖適用于多核CPU下,同時線程等待鎖的時間非常短;若等待某個鎖需要的時間較長,讓線程盡早進(jìn)入阻塞才是正確的選擇。下面開始進(jìn)入源碼:
Segment成員變量
/**
* 對segment加鎖時,在阻塞之前進(jìn)行自旋的最大次數(shù)。
* 在多處理器上,使用有限數(shù)量的重試來維護(hù)在定位節(jié)點(diǎn)時獲取的高速緩存。
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* 每個segment的table數(shù)組,訪問數(shù)組中的元素通過entryAt/setEntryAt提供的volatile語義來完成
*/
transient volatile HashEntry[] table;
/**
* 元素的數(shù)量,只能在鎖中或其他volatile讀保證可見性之間進(jìn)行訪問
*/
transient int count;
/**
* 當(dāng)前segment中可變操作發(fā)生的次數(shù),put,remove等,可能會溢出32位
* 它為CHM isEmpty()和size()方法中的穩(wěn)定性檢查提供了足夠的準(zhǔn)確性。
* 只能在鎖中或其他volatile讀保證可見性之間進(jìn)行訪問
*/
transient int modCount;
/**
* 當(dāng)table大小超過此閾值時,對table進(jìn)行擴(kuò)容,值為(int)(capacity *loadFactor)
*/
transient int threshold;
/**
* 負(fù)載因子
*/
final float loadFactor;
/**
* 構(gòu)造方法
*/
Segment(float lf, int threshold, HashEntry[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
scanAndLockForPut
先看流程圖:
看源碼:
private HashEntry scanAndLockForPut(K key, int hash, V value) {
HashEntry first = entryForHash(this, hash);//根據(jù)key的hash值查找頭節(jié)點(diǎn)
HashEntry e = first;
HashEntry node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {//嘗試獲取鎖,成功則直接返回,失敗則開始自旋
HashEntry f; // 用于后續(xù)重新檢查頭節(jié)點(diǎn)
if (retries < 0) {
if (e == null) {//結(jié)束遍歷節(jié)點(diǎn)
if (node == null) // 創(chuàng)建節(jié)點(diǎn)
node = new HashEntry(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))//找到節(jié)點(diǎn),結(jié)束遍歷
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {//達(dá)到最大嘗試次數(shù)
lock();//進(jìn)入加鎖方法,失敗則會進(jìn)入排隊,阻塞當(dāng)前線程
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // 頭節(jié)點(diǎn)變化,需要重新遍歷,說明有新節(jié)點(diǎn)加入或被移除
retries = -1;
}
}
return node;
}
分析:while循環(huán)每執(zhí)行一次,都會嘗試獲取鎖,成功則會返回。retries 初始值設(shè)為-1是為了遍歷當(dāng)前hash對應(yīng)桶的鏈表,找到則停止遍歷,未找到則會預(yù)創(chuàng)建一個節(jié)點(diǎn);同時,如果頭節(jié)點(diǎn)發(fā)生變化,則會重新進(jìn)行遍歷,直到自旋次數(shù)大于MAX_SCAN_RETRIES,使用lock加鎖,獲取鎖失敗則會進(jìn)入等待隊列。
為什么scanAndLockForPut中要遍歷一次鏈表?
前面已經(jīng)提過scanAndLockForPut使用自旋次數(shù)受限制的自旋鎖進(jìn)行優(yōu)化加鎖的方式,此外,遍歷一次鏈表也是一種優(yōu)化方法,主要是盡可能使當(dāng)前鏈表中的節(jié)點(diǎn)進(jìn)入CPU高速緩存,提高緩存命中率,以便獲取鎖定后的遍歷速度更快。實際上加鎖后并沒有使用已經(jīng)找到的節(jié)點(diǎn),因為它們必須在鎖定下重新獲取,以確保更新的順序一致性,但是遍歷一次后通??梢愿斓刂匦露ㄎ?。這是一種預(yù)熱優(yōu)化的方式,scanAndLock中也使用了該優(yōu)化方式。
scanAndLock內(nèi)部實現(xiàn)方式與scanAndLockForPut相似,但比scanAndLockForPut更簡單,scanAndLock不需要預(yù)創(chuàng)建節(jié)點(diǎn)。因此scanAndLock主要用于remove和replace操作,而scanAndLockForPut則用于put,這里就不再貼源碼。
Segment?put
先看流程圖:
put源碼:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
Segment中put的大體流程是先對Segment加鎖,然后根據(jù)(tab.length-1)&hash找到對應(yīng)的slot,然后遍歷slot對應(yīng)的鏈表,如果key對應(yīng)的entry已經(jīng)存在,根據(jù)onlyIfAbsent標(biāo)志決定是否替換舊值,如果key對應(yīng)的entry不存在,創(chuàng)建新節(jié)點(diǎn)插入鏈表頭部,期間若容量超過限制,判斷是否需要進(jìn)行rehash。
put實現(xiàn)還是比較簡單的,下面談?wù)勂渲兄饕膸讉€優(yōu)化點(diǎn)。
scanAndLockForPut的作用已經(jīng)介紹過了,如果鎖能很快的獲取,有限次數(shù)的自旋可防止線程進(jìn)入阻塞,有助于提升性能;此外,自旋期間會遍歷鏈表,希望遍歷的鏈表被CPU Cache所緩存,為后續(xù)實際put過程中的鏈表遍歷操作提升性能;最后scanAndLockForPut還會預(yù)創(chuàng)建節(jié)點(diǎn)。
HashEntry[] tab = table有什么好處?
從Segment源碼可知,table被聲明為volatile,為了保證內(nèi)存可見性,table上的修改都必須立即更新到主存,volatile寫實際是具有一定開銷的。由于put中的代碼都在加鎖區(qū)執(zhí)行,鎖既能保證可見性,也能保證原子性,因此,不需要針對table進(jìn)行volatile寫,將table引用賦值給局部變量以實現(xiàn)編譯、運(yùn)行時的優(yōu)化。
node.setNext(first)也是同樣的道理,HashEntry的next同樣被聲明為volatile,因此這里使用優(yōu)化的方式UNSAFE.putOrderedObject進(jìn)行volatile寫入。
既然put已在加鎖區(qū)運(yùn)行,為何訪問tab中的元素不直接通過數(shù)組索引,而是用entryAt(tab, index)?
加鎖保證了table引用的同步語義,但是對table數(shù)組中元素的寫入使用UNSAFE.putOrderedObject進(jìn)行順序?qū)?#xff0c;該操作只是禁止寫寫指令重排序,不能保證寫入后的內(nèi)存可見性。因此,必須使用entryAt(tab, index)提供的volatile讀來獲取最新的數(shù)據(jù)。
remove
remove源碼相對比較簡單,這里就直接分析源碼了。
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry e = entryAt(tab, index);
HashEntry pred = null;
while (e != null) {
K k;
HashEntry next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
remove首先會嘗試獲取鎖,失敗則會進(jìn)入scanAndLock代碼塊,scanAndLock和scanAndLockForPut實現(xiàn)相似。獲取鎖成功后,然后根據(jù)hash找到對應(yīng)的slot,然后遍歷slot對應(yīng)的鏈表,找到要移除元素的key,若value為空或與鏈表中元素的value相等則移除元素,否則退出。
remove和put中使用的優(yōu)化相同,這里就不再重復(fù)說明。
replace操作實現(xiàn)方式也非常簡單,這里不再說明。下面詳細(xì)分析rehash實現(xiàn)。
rehash
rehash主要的作用的是擴(kuò)容,將擴(kuò)容前table中的節(jié)點(diǎn)重新分配到新的table。由于table的capacity都是2的冪,按照2的冪次方擴(kuò)容為原來的一倍,擴(kuò)容前在slot i中的元素,擴(kuò)容后要么還是在slot i里,或者i+擴(kuò)容前table的capacity的slot中,這樣使得只需要移動原來桶中的部分元素即可將所有節(jié)點(diǎn)分配到新的table。
為了提高效率,rehash首先找到第一個后續(xù)所有節(jié)點(diǎn)在擴(kuò)容后index都保持不變的節(jié)點(diǎn),將這個節(jié)點(diǎn)加入擴(kuò)容后的table的index對應(yīng)的slot中,然后將這個節(jié)點(diǎn)之前的所有節(jié)點(diǎn)重排即可。
先看流程圖:
rehash源碼:
@SuppressWarnings("unchecked")
private void rehash(HashEntry node) {
HashEntry[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry[] newTable =
(HashEntry[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry e = oldTable[i];
if (e != null) {
HashEntry next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // slot中只有一個元素
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry lastRun = e;
int lastIdx = idx;
for (HashEntry last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
//復(fù)制lastRun之前的所有節(jié)點(diǎn)
for (HashEntry p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry n = newTable[k];
newTable[k] = new HashEntry(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; //添加新節(jié)點(diǎn)
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
為了理解這段代碼的原理,現(xiàn)構(gòu)造一個例子來說明,假設(shè)擴(kuò)容前oldTable的oldCapacity為4,則擴(kuò)容后newTable的newCapacity為8,假設(shè)oldTable[0]的元素如下:
由index=hash & (oldCapacity-1)可知,擴(kuò)容前以上幾個節(jié)點(diǎn)都在slot?oldTable[0]中。下面開始分析
計算頭節(jié)點(diǎn)idx = e.hash & sizeMask=e.hash & (newCapacity-1)=16 & 7=0;
將idx賦值給lastIdx = idx=0,last = next(hash=4);
last不為空,第一輪循環(huán):k = last.hash & sizeMask=4 & 7=4,滿足k != lastIdx,執(zhí)行語句:
lastIdx = k;?lastRun = last(hash=4);
last = last.next,last不為空,第二輪循環(huán):k =8 & 7=0,滿足k != lastIdx,執(zhí)行語句:
lastIdx = k;?lastRun = last(hash=8);
last = last.next,last不為空,第三輪循環(huán):k = 32 & 7=0,不滿足k != lastIdx;
last = last.next,last為空,結(jié)束循環(huán)。
到此找到第一個后續(xù)所有節(jié)點(diǎn)在擴(kuò)容后index都保持不變的節(jié)點(diǎn),lastRun為hash=8的節(jié)點(diǎn),將lastRun加入lastIdx對應(yīng)slot中,這樣lastRun的后續(xù)節(jié)點(diǎn)也會自動加入lastIdx對應(yīng)slot。然后將節(jié)點(diǎn)lastRun之前的所有節(jié)點(diǎn)重排,通過新建節(jié)點(diǎn),使用hash?& sizeMask計算index,則將節(jié)點(diǎn)插入index對應(yīng)slot中鏈表頭部。
綜上,最好的情況,每個slot鏈表的所有節(jié)點(diǎn)在擴(kuò)容后index都保持不變,那么只需移動頭節(jié)點(diǎn),不用創(chuàng)建新節(jié)點(diǎn)即可完成擴(kuò)容和節(jié)點(diǎn)重新分配;最差的情況,每個鏈表的倒數(shù)兩個節(jié)點(diǎn)在擴(kuò)容后index不同,那么需要重建并復(fù)制所有節(jié)點(diǎn)。
到此,Segment實現(xiàn)原理和源碼分析完成,下面進(jìn)入ConccurentHashMap主體代碼中。
主要操作
本節(jié)主要介紹ConccurentHashMap中的方法,get、containsKey、containsValue、size、put、putAll、putIfAbsent、remove等等。
get、containsKey
get與containsKey兩個方法的實現(xiàn)幾乎完全一致,都不需要加鎖讀數(shù)據(jù),下面以get源碼說明:
public V get(Object key) {
Segment s;
HashEntry[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
首先計算key的hash碼,計算Segment的index,使用getObjectVolatile()方法提供的原子讀語義獲取Segment,再計算Segment中slot的索引,使用getObjectVolatile()方法提供的原子讀語義獲取slot頭節(jié)點(diǎn),遍歷鏈表,判斷是否存在key相同的節(jié)點(diǎn)以及獲得該節(jié)點(diǎn)的value。由于遍歷過程中其他線程可能對鏈表結(jié)構(gòu)做了調(diào)整,因此get和containsKey返回的可能是過時的數(shù)據(jù),這就是ConcurrentHashMap的弱一致性。如果要求強(qiáng)一致性,那么必須使用Collections.synchronizedMap()。
size、containsValue、isEmpty
size用于返回ConcurrentHashMap中HashEntry的數(shù)量,containsValue用于判斷ConcurrentHashMap中是否存在給定的value,isEmpty用于判斷ConcurrentHashMap是否為空,size、containsValue、isEmpty都是基于整個ConcurrentHashMap來進(jìn)行操作的,因此實現(xiàn)原理基本相似。這里以size方法來說明實現(xiàn)原理。先看流程圖:
看源碼:
public int size() {
final Segment[] segments = this.segments;
int size;
boolean overflow; // 為true表示size溢出32位
long sum; // 所有segment中modCount的總和
long last = 0L;
int retries = -1; // 第一次迭代不計入重試,因此總共會重試3次
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
首先不加鎖循環(huán)所有的Segment(通過Unsafe的getObjectVolatile()以保證原子讀語義),計算所有Segment的count之后,同時計算所有Segment的modcount之和sum,如果sum與last相等,說明迭代期間沒有發(fā)生其他線程修改ConcurrentHashMap的情況,返回size,;當(dāng)重試次數(shù)超過預(yù)定義的值(RETRIES_BEFORE_LOCK為2)時,對所有的Segment依次進(jìn)行加鎖,再計算size的值。需要注意的是,加鎖過程中會強(qiáng)制創(chuàng)建所有的不存在Segment,否則容易出現(xiàn)其他線程創(chuàng)建Segment并進(jìn)行put,remove等操作。由于retries初始值為-1,因此會嘗試3次才會對所有的Segment加鎖。
注意:modcount在put, replace, remove以及clear等方法中都會被修改。
containsValue實現(xiàn)只是在重試問題上稍微不同,在第一次嘗試時,sum與last相等也不會返回,默認(rèn)會嘗試試第二次,只有第二次嘗試時sum與last也相等才返回。此外,在循環(huán)所有的Segment期間,一旦找到匹配value的HashEntry,立即返回,只有value不存在時,才會多次嘗試確認(rèn)。這里不貼源碼了。
isEmpty與containsValue、size實現(xiàn)稍有不同,isEmpty重試失敗不會進(jìn)行加鎖,而是直接返回false,isEmpty在循環(huán)所有的Segment期間,一旦某個Segment中的entry數(shù)量不為0,立即返回true;第一次循環(huán)所有的Segment期間,計算每個Segment的modCount之和sum;當(dāng)sum不為0,進(jìn)行第二次循環(huán),循環(huán)期間,使用sum減去每個Segment的modCount,如果sum不為0,返回false,否則返回true。這樣做的好處是,加入第一次循環(huán)期間發(fā)生了一個put,第二次循環(huán)發(fā)生了一個remove,那么isEmpty也能檢查出當(dāng)前map為空。
put、putAll、putIfAbsent
put和putIfAbsent實現(xiàn)原理基本相似,區(qū)別在于當(dāng)key存在時,put會替換舊值,更新modCount,putIfAbsent則不會。putAll通過循環(huán)調(diào)用put來實現(xiàn)。這里以put源碼來進(jìn)行說明。
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
由源碼實現(xiàn)可知,首先計算key的hash碼,再計算segments的index,獲取Segment,如果Segment為空,則會進(jìn)入ensureSegment創(chuàng)建Segment,最后將put操作代理給Segment中的put方法實現(xiàn)數(shù)據(jù)寫入。
注意:put中獲取指定Segment沒有使用原子讀語義,在ensureSegment中會使用原子讀語義重新檢查。
remove、replace、clear
remove和replace都是先通過key計算hash碼,定位到Segment,如果Segment為空,則不做任何操作,否則將操作代理給Segment的remove和replace方法。
clear會循環(huán)所有的Segment,如果Segment不空,將操作代理給Segment的clear,Segment的clear操作會直接對Segment加鎖,使用UNSAFE.putOrderedObject操作將table數(shù)組中的元素置為null。由于clear只是清除元素,Segment指向table的引用不會發(fā)生變化,使得clear期間仍然可以進(jìn)行遍歷。
弱一致性
ConcurrentHashMap是弱一致的。 ConcurrentHashMap的get,containsKey,clear,iterator 都是弱一致性的。
get和containsKey都是無鎖的操作,均通過getObjectVolatile()提供的原子讀語義來獲得Segment以及對應(yīng)的鏈表,然后遍歷鏈表。由于遍歷期間其他線程可能對鏈表結(jié)構(gòu)做了調(diào)整,因此get和containsKey返回的可能是過時的數(shù)據(jù)。如在get執(zhí)行UNSAFE.getObjectVolatile(segments, u)之后,其他線程若執(zhí)行了clear操作,則get將讀到失效的數(shù)據(jù)。
由于clear沒有使用全局的鎖,當(dāng)清除完一個segment之后,開始清理下一個segment的時候,已經(jīng)清理segments可能又被加入了數(shù)據(jù),因此clear返回的時候,ConcurrentHashMap中是可能存在數(shù)據(jù)的。因此,clear方法是弱一致的。
ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法,迭代器在遍歷期間如果已經(jīng)遍歷的table上的內(nèi)容變化了,迭代器不會拋出ConcurrentModificationException異常。如果未遍歷的數(shù)組上的內(nèi)容發(fā)生了變化,則有可能反映到迭代過程中,這就是ConcurrentHashMap迭代器弱一致的表現(xiàn)。
到此,jdk1.7中ConcurrentHashMap的實現(xiàn)分析完成。
歡迎指出本文有誤的地方,若對您有幫助,點(diǎn)個贊支持一下唄!轉(zhuǎn)載請注明原文出處
總結(jié)
以上是生活随笔為你收集整理的currenthashmap扩容原理_ConcurrentHashMap实现原理和源码解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 苹果向开发者推出iOS 17.2 Bet
- 下一篇: 一加nfc门禁卡录入_忘记门禁卡不再徘徊