日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

currenthashmap扩容原理_ConcurrentHashMap实现原理和源码解读

發(fā)布時間:2024/9/19 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 currenthashmap扩容原理_ConcurrentHashMap实现原理和源码解读 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

HashMap是java編程中最常用的數(shù)據(jù)結(jié)構(gòu)之一,由于HashMap非線程安全,因此不適用于并發(fā)訪問的場景。JDK1.5之前,通常使用HashTable作為HashMap的線程安全版本,HashTable對讀寫進(jìn)行全局加鎖,在高并發(fā)情況下會造成嚴(yán)重的鎖競爭和等待,極大地降低了系統(tǒng)的吞吐量,ConcurrentHashMap應(yīng)運(yùn)而生。

相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在線程安全的基礎(chǔ)上提供了更好的寫并發(fā)能力,并且讀操作(?get)通常不會阻塞,使得讀寫操作可并行執(zhí)行,支持客戶端修改ConcurrentHashMap的并發(fā)訪問度,迭代期間也不會拋出?ConcurrentModificationException等等,ConcurrentHashMap有這么多優(yōu)點(diǎn),那么它有什么缺點(diǎn)嗎?有,一致性問題,這是當(dāng)前所有分布式系統(tǒng)都面臨的問題。下面開始分析ConcurrentHashMap的實現(xiàn)原理。

ConcurrentHashMap實現(xiàn)原理

ConcurrentHashMap的基本策略是將table細(xì)分為多個Segment保存在數(shù)組segments中,每個Segment本身又是一個可并發(fā)的哈希表,同時每個Segment都是一把ReentrantLock鎖,只有在同一個Segment內(nèi)才存在競態(tài)關(guān)系,不同的Segment之間沒有鎖競爭,這就是分段鎖機(jī)制。Segment內(nèi)部擁有一個HashEntry數(shù)組,數(shù)組中的每個元素又是一個鏈表。下面看看ConcurrentHashMap整體結(jié)構(gòu)圖:

為了減少占用空間,除了第一個Segment之外,剩余的Segment采用的是延遲初始化的機(jī)制,僅在第一次需要時才會創(chuàng)建(通過ensureSegment實現(xiàn))。為了保證延遲初始化存在的可見性,訪問segments數(shù)組及table數(shù)組中的元素均通過volatile訪問,主要借助于Unsafe中原子操作getObjectVolatile來實現(xiàn),此外,segments中segment的寫入,以及table中元素和next域的寫入均使用UNSAFE.putOrderedObject來完成。這些操作提供了AtomicReferenceArrays的功能。下面開始源碼之旅吧

成員變量

下面介紹ConcurrentHashMap中用到的成員域:

/**

* 默認(rèn)初始容量

*/

static final int DEFAULT_INITIAL_CAPACITY = 16;

/**

* 默認(rèn)加載因子

*/

static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**

* 默認(rèn)并發(fā)度,該參數(shù)會影響segments數(shù)組的長度

*/

static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**

* 最大容量,構(gòu)造ConcurrentHashMap時指定的大小超過該值則會使用該值替換,

* ConcurrentHashMap的大小必須是2的冪,且小于等于1<<30,以確??墒褂胕nt索引條目

*/

static final int MAXIMUM_CAPACITY = 1 << 30;

/**

* 每個segment中table數(shù)組的最小長度,必須是2的冪,至少為2,以免延遲構(gòu)造后立即調(diào)整大小

*/

static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/**

* 允許的最大segment數(shù)量,用于限定構(gòu)造函數(shù)參數(shù)concurrencyLevel的邊界,必須是2的冪

*/

static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

/**

* 非鎖定情況下調(diào)用size和containsValue方法的重試次數(shù),避免由于table連續(xù)被修改導(dǎo)致無限重試

*/

static final int RETRIES_BEFORE_LOCK = 2;

/**

* 與當(dāng)前實例相關(guān)聯(lián)的,用于key哈希碼的隨機(jī)值,以減少哈希沖突

*/

private transient final int hashSeed = randomHashSeed(this);

private static int randomHashSeed(ConcurrentHashMap instance) {

if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) {

return sun.misc.Hashing.randomHashSeed(instance);

}

return 0;

}

/**

* 用于索引segment的掩碼值,key哈希碼的高位用于選擇segment

*/

final int segmentMask;

/**

* 用于索引segment偏移值

*/

final int segmentShift;

/**

* segments數(shù)組

*/

final Segment[] segments;

構(gòu)造方法

@SuppressWarnings("unchecked")

public ConcurrentHashMap(int initialCapacity,

float loadFactor, int concurrencyLevel) {

if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)

throw new IllegalArgumentException();

if (concurrencyLevel > MAX_SEGMENTS)//

concurrencyLevel = MAX_SEGMENTS;

// 尋找與給定參數(shù)concurrencyLevel匹配的最佳Segment數(shù)組ssize,必須是2的冪

// 如果concurrencyLevel是2的冪,那么最后選定的ssize就是concurrencyLevel

// 否則concurrencyLevel,ssize為大于concurrencyLevel最小2的冪值

// concurrencyLevel為7,則ssize為2的3次冪,為8

int sshift = 0;

int ssize = 1;

while (ssize < concurrencyLevel) {

++sshift;

ssize <<= 1;

}

this.segmentShift = 32 - sshift;

this.segmentMask = ssize - 1;

if (initialCapacity > MAXIMUM_CAPACITY)

initialCapacity = MAXIMUM_CAPACITY;

//計算每個Segment中,table數(shù)組的初始大小

int c = initialCapacity / ssize;

if (c * ssize < initialCapacity)

++c;

int cap = MIN_SEGMENT_TABLE_CAPACITY;

while (cap < c)

cap <<= 1;

// 創(chuàng)建segments和第一個segment

Segment s0 =

new Segment(loadFactor, (int)(cap * loadFactor),

(HashEntry[])new HashEntry[cap]);

Segment[] ss = (Segment[])new Segment[ssize];

UNSAFE.putOrderedObject(ss, SBASE, s0); //原子按順序?qū)懭雜egments[0]

this.segments = ss;

}

/**

* map轉(zhuǎn)化為ConcurrentHashMap

*

* @param m the map

*/

public ConcurrentHashMap(Map extends K, ? extends V> m) {

this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,

DEFAULT_INITIAL_CAPACITY),

DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);

putAll(m);

}

構(gòu)造器中各個參數(shù)的含義:

initialCapacity:創(chuàng)建ConccurentHashMap對象的初始容量,即ConccurentHashMap中HashEntity的總數(shù)量,創(chuàng)建時未指定initialCapacity則默認(rèn)為16,最大容量為MAXIMUM_CAPACITY。

loadFactor:負(fù)載因子,用于計算Segment的threshold域,

concurrencyLevel:即ConccurentHashMap的并發(fā)度,支持同時更新ConccurentHashMap且不發(fā)生鎖競爭的最大線程數(shù)。concurrencyLevel不能代表ConccurentHashMap實際并發(fā)度,ConccurentHashMap會使用大于等于該值的2的冪指數(shù)的最小值作為實際并發(fā)度,實際并發(fā)度即為segments數(shù)組的長度。創(chuàng)建時未指定concurrencyLevel則默認(rèn)為16。

并發(fā)度對ConccurentHashMap性能具有舉足輕重的作用,如果并發(fā)度設(shè)置的過小,會帶來嚴(yán)重的鎖競爭問題;如果并發(fā)度設(shè)置的過大,原本位于同一個Segment內(nèi)的訪問會擴(kuò)散到不同的Segment中,CPU cache命中率會下降,從而引起程序性能下降。

原子方法

先看源碼:

/**

* 獲取給定table的第i個元素,使用volatile讀語義

*/

@SuppressWarnings("unchecked")

static final HashEntry entryAt(HashEntry[] tab, int i) {

return (tab == null) ? null :

(HashEntry) UNSAFE.getObjectVolatile

(tab, ((long)i << TSHIFT) + TBASE);

}

/**

* 設(shè)置給定table的第i個元素,使用volatile寫入語義

*/

static final void setEntryAt(HashEntry[] tab, int i,

HashEntry e) {

UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);

}

/**

* 通過Unsafe提供的具有volatile元素訪問語義的操作獲取給定Segment數(shù)組的第j個元素(如果ss非空)

* 注意:因為Segment數(shù)組的每個元素只能設(shè)置一次(使用完全有序的寫入),

* 所以一些性能敏感的方法只能依靠此方法作為對空讀取的重新檢查。

*/

@SuppressWarnings("unchecked")

static final Segment segmentAt(Segment[] ss, int j) {

long u = (j << SSHIFT) + SBASE;

return ss == null ? null :

(Segment) UNSAFE.getObjectVolatile(ss, u);

}

/**

* 根據(jù)給定hash獲取segment

*/

@SuppressWarnings("unchecked")

private Segment segmentForHash(int h) {

long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

return (Segment) UNSAFE.getObjectVolatile(segments, u);

}

/**

* 根據(jù)給定segment和hash獲取table entry

*/

@SuppressWarnings("unchecked")

static final HashEntry entryForHash(Segment seg, int h) {

HashEntry[] tab;

return (seg == null || (tab = seg.table) == null) ? null :

(HashEntry) UNSAFE.getObjectVolatile

(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);

}

ConcurrentHashMap主要使用以上幾個方法對segments數(shù)組和table數(shù)組進(jìn)行讀寫,并保證線程安全性。

其主要使用了UNSAFE.getObjectVolatile提供Volatile讀語義,UNSAFE.putOrderedObject提供了Volatile寫語義。下面分析這兩個方法帶來的好處:

UNSAFE.getObjectVolatile使得非volatile聲明的對象具有volatile讀的語義,那么要使非volatile聲明的對象具有volatile寫的語義則需要借助操作UNSAFE.putObjectvolatile。

那么UNSAFE.putOrderedObject操作的含義和作用又是什么呢?

為了控制特定條件下的指令重排序和內(nèi)存可見性問題,Java編譯器使用一種叫內(nèi)存屏障(Memory Barrier,或叫做內(nèi)存柵欄,Memory Fence)的CPU指令來禁止指令重排序。java中volatile寫入使用了內(nèi)存屏障中的LoadStore屏障規(guī)則,對于這樣的語句Load1; LoadStore; Store2,在Store2及后續(xù)寫入操作被刷出前,保證Load1要讀取的數(shù)據(jù)被讀取完畢。volatile的寫所插入的storeLoad是一個耗時的操作,因此出現(xiàn)了一個對volatile寫的升級版本,利用lazySet方法進(jìn)行性能優(yōu)化,在實現(xiàn)上對volatile的寫只會在之前插入StoreStore屏障,對于這樣的語句Store1; StoreStore; Store2,在Store2及后續(xù)寫入操作執(zhí)行前,保證Store1的寫入操作對其它處理器可見,也就是按順序的寫入。UNSAFE.putOrderedObject正是提供了這樣的語義,避免了寫寫指令重排序,但不保證內(nèi)存可見性,因此讀取時需借助volatile讀保證可見性。

ConcurrentHashMap正是利用了這些高性能的原子讀寫來避免加鎖帶來的開銷,從而大幅度提高了性能。

ensureSegment

ensureSegment用于確定指定的Segment是否存在,不存在則會創(chuàng)建。源碼如下:

@SuppressWarnings("unchecked")

private Segment ensureSegment(int k) {

final Segment[] ss = this.segments;

long u = (k << SSHIFT) + SBASE; // raw offset

Segment seg;

if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {

Segment proto = ss[0]; // use segment 0 as prototype

int cap = proto.table.length;

float lf = proto.loadFactor;

int threshold = (int)(cap * lf);

HashEntry[] tab = (HashEntry[])new HashEntry[cap];

if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))

== null) { // recheck

Segment s = new Segment(lf, threshold, tab);

while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))

== null) {

if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))

break;

}

}

}

return seg;

}

使用getObjectVolatile()方法提供的原子讀語義獲取指定Segment,如果為空,以構(gòu)造ConcurrentHashMap對象時創(chuàng)建的Segment為模板,創(chuàng)建新的Segment。ensureSegment在創(chuàng)建Segment期間為不斷使用getObjectVolatile()檢查指定Segment是否為空,防止其他線程已經(jīng)創(chuàng)建了Segment。

HashEntry

開始介紹Segment之前,我們先看看HashEntry的結(jié)構(gòu):

static final class HashEntry {

final int hash;

final K key;

volatile V value;

volatile HashEntry next;

HashEntry(int hash, K key, V value, HashEntry next) {

this.hash = hash;

this.key = key;

this.value = value;

this.next = next;

}

/**

* 使用volatile寫入語義設(shè)置next域

*/

final void setNext(HashEntry n) {

UNSAFE.putOrderedObject(this, nextOffset, n);

}

// Unsafe mechanics

static final sun.misc.Unsafe UNSAFE;

static final long nextOffset;

static {

try {

UNSAFE = sun.misc.Unsafe.getUnsafe();

Class k = HashEntry.class;

nextOffset = UNSAFE.objectFieldOffset

(k.getDeclaredField("next"));

} catch (Exception e) {

throw new Error(e);

}

}

}

HashEntry將value和next聲明為volatile ,是為了保證內(nèi)存可見性,也就是每次讀取都將從內(nèi)存讀取最新的值,而不會從緩存讀取。同時,寫入next域也使用volatile寫入語義保證原子性。寫入使用原子性操作,讀取使用volatile,從而保證了多線程訪問的線程安全性。

Segment

Segment作為ConccurentHashMap的專用數(shù)據(jù)結(jié)構(gòu),同時擴(kuò)展了ReentrantLock,使得Segment本身就是一把可重入鎖,方便執(zhí)行鎖定。Segment內(nèi)部持有一個始終處于一致狀態(tài)的entry列表,使得讀取操作無需加鎖(通過volatile讀table數(shù)組)。調(diào)整tables大小期間通過復(fù)制節(jié)點(diǎn)實現(xiàn),使得舊版本的table仍然可以遍歷。

Segment僅定義需要加鎖的可變方法,針對ConcurrentHashMap中相應(yīng)方法的調(diào)用都會被代理到Segment中的方法。這些可變方法使用scanAndLock和scanAndLockForPut在競爭中使用受控旋轉(zhuǎn),也就是自旋次數(shù)受限制的自旋鎖。由于線程的阻塞與喚醒通常伴隨著上下文切換、CPU搶占等,都是開銷比較大的操作,使用自旋次數(shù)受限制的自旋鎖,可以提高獲取鎖的概率,降低線程阻塞的概率,這樣可極大地提升性能。之所以自旋次數(shù)受限制,是因為自旋會不斷的消耗CPU的時間,無限制的自旋會導(dǎo)致開銷增長。因此自旋鎖適用于多核CPU下,同時線程等待鎖的時間非常短;若等待某個鎖需要的時間較長,讓線程盡早進(jìn)入阻塞才是正確的選擇。下面開始進(jìn)入源碼:

Segment成員變量

/**

* 對segment加鎖時,在阻塞之前進(jìn)行自旋的最大次數(shù)。

* 在多處理器上,使用有限數(shù)量的重試來維護(hù)在定位節(jié)點(diǎn)時獲取的高速緩存。

*/

static final int MAX_SCAN_RETRIES =

Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

/**

* 每個segment的table數(shù)組,訪問數(shù)組中的元素通過entryAt/setEntryAt提供的volatile語義來完成

*/

transient volatile HashEntry[] table;

/**

* 元素的數(shù)量,只能在鎖中或其他volatile讀保證可見性之間進(jìn)行訪問

*/

transient int count;

/**

* 當(dāng)前segment中可變操作發(fā)生的次數(shù),put,remove等,可能會溢出32位

* 它為CHM isEmpty()和size()方法中的穩(wěn)定性檢查提供了足夠的準(zhǔn)確性。

* 只能在鎖中或其他volatile讀保證可見性之間進(jìn)行訪問

*/

transient int modCount;

/**

* 當(dāng)table大小超過此閾值時,對table進(jìn)行擴(kuò)容,值為(int)(capacity *loadFactor)

*/

transient int threshold;

/**

* 負(fù)載因子

*/

final float loadFactor;

/**

* 構(gòu)造方法

*/

Segment(float lf, int threshold, HashEntry[] tab) {

this.loadFactor = lf;

this.threshold = threshold;

this.table = tab;

}

scanAndLockForPut

先看流程圖:

看源碼:

private HashEntry scanAndLockForPut(K key, int hash, V value) {

HashEntry first = entryForHash(this, hash);//根據(jù)key的hash值查找頭節(jié)點(diǎn)

HashEntry e = first;

HashEntry node = null;

int retries = -1; // negative while locating node

while (!tryLock()) {//嘗試獲取鎖,成功則直接返回,失敗則開始自旋

HashEntry f; // 用于后續(xù)重新檢查頭節(jié)點(diǎn)

if (retries < 0) {

if (e == null) {//結(jié)束遍歷節(jié)點(diǎn)

if (node == null) // 創(chuàng)建節(jié)點(diǎn)

node = new HashEntry(hash, key, value, null);

retries = 0;

}

else if (key.equals(e.key))//找到節(jié)點(diǎn),結(jié)束遍歷

retries = 0;

else

e = e.next;

}

else if (++retries > MAX_SCAN_RETRIES) {//達(dá)到最大嘗試次數(shù)

lock();//進(jìn)入加鎖方法,失敗則會進(jìn)入排隊,阻塞當(dāng)前線程

break;

}

else if ((retries & 1) == 0 &&

(f = entryForHash(this, hash)) != first) {

e = first = f; // 頭節(jié)點(diǎn)變化,需要重新遍歷,說明有新節(jié)點(diǎn)加入或被移除

retries = -1;

}

}

return node;

}

分析:while循環(huán)每執(zhí)行一次,都會嘗試獲取鎖,成功則會返回。retries 初始值設(shè)為-1是為了遍歷當(dāng)前hash對應(yīng)桶的鏈表,找到則停止遍歷,未找到則會預(yù)創(chuàng)建一個節(jié)點(diǎn);同時,如果頭節(jié)點(diǎn)發(fā)生變化,則會重新進(jìn)行遍歷,直到自旋次數(shù)大于MAX_SCAN_RETRIES,使用lock加鎖,獲取鎖失敗則會進(jìn)入等待隊列。

為什么scanAndLockForPut中要遍歷一次鏈表?

前面已經(jīng)提過scanAndLockForPut使用自旋次數(shù)受限制的自旋鎖進(jìn)行優(yōu)化加鎖的方式,此外,遍歷一次鏈表也是一種優(yōu)化方法,主要是盡可能使當(dāng)前鏈表中的節(jié)點(diǎn)進(jìn)入CPU高速緩存,提高緩存命中率,以便獲取鎖定后的遍歷速度更快。實際上加鎖后并沒有使用已經(jīng)找到的節(jié)點(diǎn),因為它們必須在鎖定下重新獲取,以確保更新的順序一致性,但是遍歷一次后通??梢愿斓刂匦露ㄎ?。這是一種預(yù)熱優(yōu)化的方式,scanAndLock中也使用了該優(yōu)化方式。

scanAndLock內(nèi)部實現(xiàn)方式與scanAndLockForPut相似,但比scanAndLockForPut更簡單,scanAndLock不需要預(yù)創(chuàng)建節(jié)點(diǎn)。因此scanAndLock主要用于remove和replace操作,而scanAndLockForPut則用于put,這里就不再貼源碼。

Segment?put

先看流程圖:

put源碼:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {

HashEntry node = tryLock() ? null :

scanAndLockForPut(key, hash, value);

V oldValue;

try {

HashEntry[] tab = table;

int index = (tab.length - 1) & hash;

HashEntry first = entryAt(tab, index);

for (HashEntry e = first;;) {

if (e != null) {

K k;

if ((k = e.key) == key ||

(e.hash == hash && key.equals(k))) {

oldValue = e.value;

if (!onlyIfAbsent) {

e.value = value;

++modCount;

}

break;

}

e = e.next;

}

else {

if (node != null)

node.setNext(first);

else

node = new HashEntry(hash, key, value, first);

int c = count + 1;

if (c > threshold && tab.length < MAXIMUM_CAPACITY)

rehash(node);

else

setEntryAt(tab, index, node);

++modCount;

count = c;

oldValue = null;

break;

}

}

} finally {

unlock();

}

return oldValue;

}

Segment中put的大體流程是先對Segment加鎖,然后根據(jù)(tab.length-1)&hash找到對應(yīng)的slot,然后遍歷slot對應(yīng)的鏈表,如果key對應(yīng)的entry已經(jīng)存在,根據(jù)onlyIfAbsent標(biāo)志決定是否替換舊值,如果key對應(yīng)的entry不存在,創(chuàng)建新節(jié)點(diǎn)插入鏈表頭部,期間若容量超過限制,判斷是否需要進(jìn)行rehash。

put實現(xiàn)還是比較簡單的,下面談?wù)勂渲兄饕膸讉€優(yōu)化點(diǎn)。

scanAndLockForPut的作用已經(jīng)介紹過了,如果鎖能很快的獲取,有限次數(shù)的自旋可防止線程進(jìn)入阻塞,有助于提升性能;此外,自旋期間會遍歷鏈表,希望遍歷的鏈表被CPU Cache所緩存,為后續(xù)實際put過程中的鏈表遍歷操作提升性能;最后scanAndLockForPut還會預(yù)創(chuàng)建節(jié)點(diǎn)。

HashEntry[] tab = table有什么好處?

從Segment源碼可知,table被聲明為volatile,為了保證內(nèi)存可見性,table上的修改都必須立即更新到主存,volatile寫實際是具有一定開銷的。由于put中的代碼都在加鎖區(qū)執(zhí)行,鎖既能保證可見性,也能保證原子性,因此,不需要針對table進(jìn)行volatile寫,將table引用賦值給局部變量以實現(xiàn)編譯、運(yùn)行時的優(yōu)化。

node.setNext(first)也是同樣的道理,HashEntry的next同樣被聲明為volatile,因此這里使用優(yōu)化的方式UNSAFE.putOrderedObject進(jìn)行volatile寫入。

既然put已在加鎖區(qū)運(yùn)行,為何訪問tab中的元素不直接通過數(shù)組索引,而是用entryAt(tab, index)?

加鎖保證了table引用的同步語義,但是對table數(shù)組中元素的寫入使用UNSAFE.putOrderedObject進(jìn)行順序?qū)?#xff0c;該操作只是禁止寫寫指令重排序,不能保證寫入后的內(nèi)存可見性。因此,必須使用entryAt(tab, index)提供的volatile讀來獲取最新的數(shù)據(jù)。

remove

remove源碼相對比較簡單,這里就直接分析源碼了。

final V remove(Object key, int hash, Object value) {

if (!tryLock())

scanAndLock(key, hash);

V oldValue = null;

try {

HashEntry[] tab = table;

int index = (tab.length - 1) & hash;

HashEntry e = entryAt(tab, index);

HashEntry pred = null;

while (e != null) {

K k;

HashEntry next = e.next;

if ((k = e.key) == key ||

(e.hash == hash && key.equals(k))) {

V v = e.value;

if (value == null || value == v || value.equals(v)) {

if (pred == null)

setEntryAt(tab, index, next);

else

pred.setNext(next);

++modCount;

--count;

oldValue = v;

}

break;

}

pred = e;

e = next;

}

} finally {

unlock();

}

return oldValue;

}

remove首先會嘗試獲取鎖,失敗則會進(jìn)入scanAndLock代碼塊,scanAndLock和scanAndLockForPut實現(xiàn)相似。獲取鎖成功后,然后根據(jù)hash找到對應(yīng)的slot,然后遍歷slot對應(yīng)的鏈表,找到要移除元素的key,若value為空或與鏈表中元素的value相等則移除元素,否則退出。

remove和put中使用的優(yōu)化相同,這里就不再重復(fù)說明。

replace操作實現(xiàn)方式也非常簡單,這里不再說明。下面詳細(xì)分析rehash實現(xiàn)。

rehash

rehash主要的作用的是擴(kuò)容,將擴(kuò)容前table中的節(jié)點(diǎn)重新分配到新的table。由于table的capacity都是2的冪,按照2的冪次方擴(kuò)容為原來的一倍,擴(kuò)容前在slot i中的元素,擴(kuò)容后要么還是在slot i里,或者i+擴(kuò)容前table的capacity的slot中,這樣使得只需要移動原來桶中的部分元素即可將所有節(jié)點(diǎn)分配到新的table。

為了提高效率,rehash首先找到第一個后續(xù)所有節(jié)點(diǎn)在擴(kuò)容后index都保持不變的節(jié)點(diǎn),將這個節(jié)點(diǎn)加入擴(kuò)容后的table的index對應(yīng)的slot中,然后將這個節(jié)點(diǎn)之前的所有節(jié)點(diǎn)重排即可。

先看流程圖:

rehash源碼:

@SuppressWarnings("unchecked")

private void rehash(HashEntry node) {

HashEntry[] oldTable = table;

int oldCapacity = oldTable.length;

int newCapacity = oldCapacity << 1;

threshold = (int)(newCapacity * loadFactor);

HashEntry[] newTable =

(HashEntry[]) new HashEntry[newCapacity];

int sizeMask = newCapacity - 1;

for (int i = 0; i < oldCapacity ; i++) {

HashEntry e = oldTable[i];

if (e != null) {

HashEntry next = e.next;

int idx = e.hash & sizeMask;

if (next == null) // slot中只有一個元素

newTable[idx] = e;

else { // Reuse consecutive sequence at same slot

HashEntry lastRun = e;

int lastIdx = idx;

for (HashEntry last = next;

last != null;

last = last.next) {

int k = last.hash & sizeMask;

if (k != lastIdx) {

lastIdx = k;

lastRun = last;

}

}

newTable[lastIdx] = lastRun;

//復(fù)制lastRun之前的所有節(jié)點(diǎn)

for (HashEntry p = e; p != lastRun; p = p.next) {

V v = p.value;

int h = p.hash;

int k = h & sizeMask;

HashEntry n = newTable[k];

newTable[k] = new HashEntry(h, p.key, v, n);

}

}

}

}

int nodeIndex = node.hash & sizeMask; //添加新節(jié)點(diǎn)

node.setNext(newTable[nodeIndex]);

newTable[nodeIndex] = node;

table = newTable;

}

為了理解這段代碼的原理,現(xiàn)構(gòu)造一個例子來說明,假設(shè)擴(kuò)容前oldTable的oldCapacity為4,則擴(kuò)容后newTable的newCapacity為8,假設(shè)oldTable[0]的元素如下:

由index=hash & (oldCapacity-1)可知,擴(kuò)容前以上幾個節(jié)點(diǎn)都在slot?oldTable[0]中。下面開始分析

計算頭節(jié)點(diǎn)idx = e.hash & sizeMask=e.hash & (newCapacity-1)=16 & 7=0;

將idx賦值給lastIdx = idx=0,last = next(hash=4);

last不為空,第一輪循環(huán):k = last.hash & sizeMask=4 & 7=4,滿足k != lastIdx,執(zhí)行語句:

lastIdx = k;?lastRun = last(hash=4);

last = last.next,last不為空,第二輪循環(huán):k =8 & 7=0,滿足k != lastIdx,執(zhí)行語句:

lastIdx = k;?lastRun = last(hash=8);

last = last.next,last不為空,第三輪循環(huán):k = 32 & 7=0,不滿足k != lastIdx;

last = last.next,last為空,結(jié)束循環(huán)。

到此找到第一個后續(xù)所有節(jié)點(diǎn)在擴(kuò)容后index都保持不變的節(jié)點(diǎn),lastRun為hash=8的節(jié)點(diǎn),將lastRun加入lastIdx對應(yīng)slot中,這樣lastRun的后續(xù)節(jié)點(diǎn)也會自動加入lastIdx對應(yīng)slot。然后將節(jié)點(diǎn)lastRun之前的所有節(jié)點(diǎn)重排,通過新建節(jié)點(diǎn),使用hash?& sizeMask計算index,則將節(jié)點(diǎn)插入index對應(yīng)slot中鏈表頭部。

綜上,最好的情況,每個slot鏈表的所有節(jié)點(diǎn)在擴(kuò)容后index都保持不變,那么只需移動頭節(jié)點(diǎn),不用創(chuàng)建新節(jié)點(diǎn)即可完成擴(kuò)容和節(jié)點(diǎn)重新分配;最差的情況,每個鏈表的倒數(shù)兩個節(jié)點(diǎn)在擴(kuò)容后index不同,那么需要重建并復(fù)制所有節(jié)點(diǎn)。

到此,Segment實現(xiàn)原理和源碼分析完成,下面進(jìn)入ConccurentHashMap主體代碼中。

主要操作

本節(jié)主要介紹ConccurentHashMap中的方法,get、containsKey、containsValue、size、put、putAll、putIfAbsent、remove等等。

get、containsKey

get與containsKey兩個方法的實現(xiàn)幾乎完全一致,都不需要加鎖讀數(shù)據(jù),下面以get源碼說明:

public V get(Object key) {

Segment s;

HashEntry[] tab;

int h = hash(key);

long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&

(tab = s.table) != null) {

for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile

(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);

e != null; e = e.next) {

K k;

if ((k = e.key) == key || (e.hash == h && key.equals(k)))

return e.value;

}

}

return null;

}

首先計算key的hash碼,計算Segment的index,使用getObjectVolatile()方法提供的原子讀語義獲取Segment,再計算Segment中slot的索引,使用getObjectVolatile()方法提供的原子讀語義獲取slot頭節(jié)點(diǎn),遍歷鏈表,判斷是否存在key相同的節(jié)點(diǎn)以及獲得該節(jié)點(diǎn)的value。由于遍歷過程中其他線程可能對鏈表結(jié)構(gòu)做了調(diào)整,因此get和containsKey返回的可能是過時的數(shù)據(jù),這就是ConcurrentHashMap的弱一致性。如果要求強(qiáng)一致性,那么必須使用Collections.synchronizedMap()。

size、containsValue、isEmpty

size用于返回ConcurrentHashMap中HashEntry的數(shù)量,containsValue用于判斷ConcurrentHashMap中是否存在給定的value,isEmpty用于判斷ConcurrentHashMap是否為空,size、containsValue、isEmpty都是基于整個ConcurrentHashMap來進(jìn)行操作的,因此實現(xiàn)原理基本相似。這里以size方法來說明實現(xiàn)原理。先看流程圖:

看源碼:

public int size() {

final Segment[] segments = this.segments;

int size;

boolean overflow; // 為true表示size溢出32位

long sum; // 所有segment中modCount的總和

long last = 0L;

int retries = -1; // 第一次迭代不計入重試,因此總共會重試3次

try {

for (;;) {

if (retries++ == RETRIES_BEFORE_LOCK) {

for (int j = 0; j < segments.length; ++j)

ensureSegment(j).lock(); // force creation

}

sum = 0L;

size = 0;

overflow = false;

for (int j = 0; j < segments.length; ++j) {

Segment seg = segmentAt(segments, j);

if (seg != null) {

sum += seg.modCount;

int c = seg.count;

if (c < 0 || (size += c) < 0)

overflow = true;

}

}

if (sum == last)

break;

last = sum;

}

} finally {

if (retries > RETRIES_BEFORE_LOCK) {

for (int j = 0; j < segments.length; ++j)

segmentAt(segments, j).unlock();

}

}

return overflow ? Integer.MAX_VALUE : size;

}

首先不加鎖循環(huán)所有的Segment(通過Unsafe的getObjectVolatile()以保證原子讀語義),計算所有Segment的count之后,同時計算所有Segment的modcount之和sum,如果sum與last相等,說明迭代期間沒有發(fā)生其他線程修改ConcurrentHashMap的情況,返回size,;當(dāng)重試次數(shù)超過預(yù)定義的值(RETRIES_BEFORE_LOCK為2)時,對所有的Segment依次進(jìn)行加鎖,再計算size的值。需要注意的是,加鎖過程中會強(qiáng)制創(chuàng)建所有的不存在Segment,否則容易出現(xiàn)其他線程創(chuàng)建Segment并進(jìn)行put,remove等操作。由于retries初始值為-1,因此會嘗試3次才會對所有的Segment加鎖。

注意:modcount在put, replace, remove以及clear等方法中都會被修改。

containsValue實現(xiàn)只是在重試問題上稍微不同,在第一次嘗試時,sum與last相等也不會返回,默認(rèn)會嘗試試第二次,只有第二次嘗試時sum與last也相等才返回。此外,在循環(huán)所有的Segment期間,一旦找到匹配value的HashEntry,立即返回,只有value不存在時,才會多次嘗試確認(rèn)。這里不貼源碼了。

isEmpty與containsValue、size實現(xiàn)稍有不同,isEmpty重試失敗不會進(jìn)行加鎖,而是直接返回false,isEmpty在循環(huán)所有的Segment期間,一旦某個Segment中的entry數(shù)量不為0,立即返回true;第一次循環(huán)所有的Segment期間,計算每個Segment的modCount之和sum;當(dāng)sum不為0,進(jìn)行第二次循環(huán),循環(huán)期間,使用sum減去每個Segment的modCount,如果sum不為0,返回false,否則返回true。這樣做的好處是,加入第一次循環(huán)期間發(fā)生了一個put,第二次循環(huán)發(fā)生了一個remove,那么isEmpty也能檢查出當(dāng)前map為空。

put、putAll、putIfAbsent

put和putIfAbsent實現(xiàn)原理基本相似,區(qū)別在于當(dāng)key存在時,put會替換舊值,更新modCount,putIfAbsent則不會。putAll通過循環(huán)調(diào)用put來實現(xiàn)。這里以put源碼來進(jìn)行說明。

@SuppressWarnings("unchecked")

public V put(K key, V value) {

Segment s;

if (value == null)

throw new NullPointerException();

int hash = hash(key);

int j = (hash >>> segmentShift) & segmentMask;

if ((s = (Segment)UNSAFE.getObject

(segments, (j << SSHIFT) + SBASE)) == null)

s = ensureSegment(j);

return s.put(key, hash, value, false);

}

由源碼實現(xiàn)可知,首先計算key的hash碼,再計算segments的index,獲取Segment,如果Segment為空,則會進(jìn)入ensureSegment創(chuàng)建Segment,最后將put操作代理給Segment中的put方法實現(xiàn)數(shù)據(jù)寫入。

注意:put中獲取指定Segment沒有使用原子讀語義,在ensureSegment中會使用原子讀語義重新檢查。

remove、replace、clear

remove和replace都是先通過key計算hash碼,定位到Segment,如果Segment為空,則不做任何操作,否則將操作代理給Segment的remove和replace方法。

clear會循環(huán)所有的Segment,如果Segment不空,將操作代理給Segment的clear,Segment的clear操作會直接對Segment加鎖,使用UNSAFE.putOrderedObject操作將table數(shù)組中的元素置為null。由于clear只是清除元素,Segment指向table的引用不會發(fā)生變化,使得clear期間仍然可以進(jìn)行遍歷。

弱一致性

ConcurrentHashMap是弱一致的。 ConcurrentHashMap的get,containsKey,clear,iterator 都是弱一致性的。

get和containsKey都是無鎖的操作,均通過getObjectVolatile()提供的原子讀語義來獲得Segment以及對應(yīng)的鏈表,然后遍歷鏈表。由于遍歷期間其他線程可能對鏈表結(jié)構(gòu)做了調(diào)整,因此get和containsKey返回的可能是過時的數(shù)據(jù)。如在get執(zhí)行UNSAFE.getObjectVolatile(segments, u)之后,其他線程若執(zhí)行了clear操作,則get將讀到失效的數(shù)據(jù)。

由于clear沒有使用全局的鎖,當(dāng)清除完一個segment之后,開始清理下一個segment的時候,已經(jīng)清理segments可能又被加入了數(shù)據(jù),因此clear返回的時候,ConcurrentHashMap中是可能存在數(shù)據(jù)的。因此,clear方法是弱一致的。

ConcurrentHashMap中的迭代器主要包括entrySet、keySet、values方法,迭代器在遍歷期間如果已經(jīng)遍歷的table上的內(nèi)容變化了,迭代器不會拋出ConcurrentModificationException異常。如果未遍歷的數(shù)組上的內(nèi)容發(fā)生了變化,則有可能反映到迭代過程中,這就是ConcurrentHashMap迭代器弱一致的表現(xiàn)。

到此,jdk1.7中ConcurrentHashMap的實現(xiàn)分析完成。

歡迎指出本文有誤的地方,若對您有幫助,點(diǎn)個贊支持一下唄!轉(zhuǎn)載請注明原文出處

總結(jié)

以上是生活随笔為你收集整理的currenthashmap扩容原理_ConcurrentHashMap实现原理和源码解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。