http://www.iteye.com/topic/344876
ConcurrentHashMap是Java 5中支持高并發、高吞吐量的線程安全HashMap實現。在這之前我對ConcurrentHashMap只有一些膚淺的理解,僅知道它采用了多個鎖,大概也足夠了。但是在經過一次慘痛的面試經歷之后,我覺得必須深入研究它的實現。面試中被問到讀是否要加鎖,因為讀寫會發生沖突,我說必須要加鎖,我和面試官也因此發生了沖突,結果可想而知。還是閑話少說,通過仔細閱讀源代碼,現在總算理解ConcurrentHashMap實現機制了,其實現之精巧,令人嘆服,與大家共享之。
?
?
實現原理 ?
?
鎖分離 (Lock Stripping)
?
ConcurrentHashMap允許多個修改操作并發進行,其關鍵在于使用了鎖分離技術。它使用了多個鎖來控制對hash表的不同部分進行的修改。ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的hash table,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以并發進行。
?
有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。這里“按順序”是很重要的,否則極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,并且其成員變量實際上也是final的,但是,僅僅是將數組聲明為final的并不保證數組成員也是final的,這需要實現上的保證。這可以確保不會出現死鎖,因為獲得鎖的順序是固定的。不變性是多線程編程占有很重要的地位,下面還要談到。
?
Java代碼??
? ? ?? final ?Segment<K,V>[]?segments;?? ? ?
不變(Immutable)和易變(Volatile)
?
ConcurrentHashMap完全允許多個讀操作并發進行,讀操作并不需要加鎖。如果使用傳統的技術,如HashMap中的實現,如果允許可以在hash鏈的中間添加或刪除元素,讀操作不加鎖將得到不一致的數據。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry代表每個hash鏈中的一個節點,其結構如下所示:
?
Java代碼??
static ?final ?class ?HashEntry<K,V>?{?? ????final ?K?key;?? ????final ?int ?hash;?? ????volatile ?V?value;?? ????final ?HashEntry<K,V>?next;?? }?? 可以看到除了value不是final的,其它值都是final的,這意味著不能從hash鏈的中間或尾部添加或刪除節點,因為這需要修改next引用值,所有的節點的修改只能從頭部開始。對于put操作,可以一律添加到Hash鏈的頭部。但是對于remove操作,可能需要從中間刪除一個節點,這就需要將要刪除節點的前面所有節點整個復制一遍,最后一個節點指向要刪除結點的下一個結點。這在講解刪除操作時還會詳述。為了確保讀操作能夠看到最新的值,將value設置成volatile,這避免了加鎖。
?
其它
?
為了加快定位段以及段中hash槽的速度,每個段hash槽的的個數都是2^n,這使得通過位運算就可以定位段和段中hash槽的位置。當并發級別為默認值16時,也就是段的個數,hash值的高4位決定分配在哪個段中。但是我們也不要忘記《算法導論》給我們的教訓:hash槽的的個數不應該是2^n,這可能導致hash槽分配不均,這需要對hash值重新再hash一次。(這段似乎有點多余了?)
?
這是重新hash的算法,還比較復雜,我也懶得去理解了。
Java代碼??
private ?static ?int ?hash(int ?h)?{?? ?????? ?????? ????h?+=?(h?<<??15 )?^?0xffffcd7d ;?? ????h?^=?(h?>>>?10 );?? ????h?+=?(h?<<???3 );?? ????h?^=?(h?>>>??6 );?? ????h?+=?(h?<<???2 )?+?(h?<<?14 );?? ????return ?h?^?(h?>>>?16 );?? }?? ?
這是定位段的方法:
Java代碼??
final ?Segment<K,V>?segmentFor(int ?hash)?{?? ????return ?segments[(hash?>>>?segmentShift)?&?segmentMask];?? }?? ? ?
?
數據結構
?
關于Hash表的基礎數據結構,這里不想做過多的探討。Hash表的一個很重要方面就是如何解決hash沖突,ConcurrentHashMap和HashMap使用相同的方式,都是將hash值相同的節點放在一個hash鏈中。與HashMap不同的是,ConcurrentHashMap使用多個子Hash表,也就是段(Segment)。下面是ConcurrentHashMap的數據成員:
?
Java代碼??
public ?class ?ConcurrentHashMap<K,?V>?extends ?AbstractMap<K,?V>?? ????????implements ?ConcurrentMap<K,?V>,?Serializable?{?? ????? ? ? ?? ????final ?int ?segmentMask;?? ?? ????? ? ?? ????final ?int ?segmentShift;?? ?? ????? ? ?? ????final ?Segment<K,V>[]?segments;?? }?? ?
所有的成員都是final的,其中segmentMask和segmentShift主要是為了定位段,參見上面的segmentFor方法。
?
每個Segment相當于一個子Hash表,它的數據成員如下:
?
Java代碼??
????static ?final ?class ?Segment<K,V>?extends ?ReentrantLock?implements ?Serializable?{?? private ?static ?final ?long ?serialVersionUID?=?2249069246763182397L;?? ????????? ? ?? ????????transient ?volatile ?int ?count;?? ?? ????????? ? ? ? ? ? ? ?? ????????transient ?int ?modCount;?? ?? ????????? ? ? ? ?? ????????transient ?int ?threshold;?? ?? ????????? ? ?? ????????transient ?volatile ?HashEntry<K,V>[]?table;?? ?? ????????? ? ? ? ? ?? ????????final ?float ?loadFactor;?? }?? count用來統計該段數據的個數,它是volatile,它用來協調修改和讀取操作,以保證讀取操作能夠讀取到幾乎最新的修改。協調方式是這樣的,每次修改操作做了結構上的改變,如增加/刪除節點(修改節點的值不算結構上的改變),都要寫count值,每次讀取操作開始都要讀取count的值。這利用了Java 5中對volatile語義的增強,對同一個volatile變量的寫和讀存在happens-before關系。modCount統計段結構改變的次數,主要是為了檢測對多個段進行遍歷過程中某個段是否發生改變,在講述跨段操作時會還會詳述。threashold用來表示需要進行rehash的界限值。table數組存儲段中節點,每個數組元素是個hash鏈,用HashEntry表示。table也是volatile,這使得能夠讀取到最新的table值而不需要同步。loadFactor表示負載因子。
?
?
實現細節
?
修改操作
?
先來看下刪除操作remove(key)。
Java代碼??
public ?V?remove(Object?key)?{?? ?hash?=?hash(key.hashCode());?? ????return ?segmentFor(hash).remove(key,?hash,?null );?? }?? 整個操作是先定位到段,然后委托給段的remove操作。當多個刪除操作并發進行時,只要它們所在的段不相同,它們就可以同時進行。下面是Segment的remove方法實現:
Java代碼??
V?remove(Object?key,?int ?hash,?Object?value)?{?? ????lock();?? ????try ?{?? ????????int ?c?=?count?-?1 ;?? ????????HashEntry<K,V>[]?tab?=?table;?? ????????int ?index?=?hash?&?(tab.length?-?1 );?? ????????HashEntry<K,V>?first?=?tab[index];?? ????????HashEntry<K,V>?e?=?first;?? ????????while ?(e?!=?null ?&&?(e.hash?!=?hash?||?!key.equals(e.key)))?? ????????????e?=?e.next;?? ?? ????????V?oldValue?=?null ;?? ????????if ?(e?!=?null )?{?? ????????????V?v?=?e.value;?? ????????????if ?(value?==?null ?||?value.equals(v))?{?? ????????????????oldValue?=?v;?? ?????????????????? ?????????????????? ?????????????????? ????????????????++modCount;?? ????????????????HashEntry<K,V>?newFirst?=?e.next;?? ????????????????for ?(HashEntry<K,V>?p?=?first;?p?!=?e;?p?=?p.next)?? ????????????????????newFirst?=?new ?HashEntry<K,V>(p.key,?p.hash,?? ??????????????????????????????????????????????????newFirst,?p.value);?? ????????????????tab[index]?=?newFirst;?? ????????????????count?=?c;??? ????????????}?? ????????}?? ????????return ?oldValue;?? ????}?finally ?{?? ????????unlock();?? ????}?? }?? ?整個操作是在持有段鎖的情況下執行的,空白行之前的行主要是定位到要刪除的節點e。接下來,如果不存在這個節點就直接返回null,否則就要將e前面的結點復制一遍,尾結點指向e的下一個結點。e后面的結點不需要復制,它們可以重用。下面是個示意圖,我直接從這個網站?上復制的(畫這樣的圖實在是太麻煩了,如果哪位有好的畫圖工具,可以推薦一下)。
?
?
刪除元素之前:
?
?
?
刪除元素3之后:
?
第二個圖其實有點問題,復制的結點中應該是值為2的結點在前面,值為1的結點在后面,也就是剛好和原來結點順序相反,還好這不影響我們的討論。
?
整個remove實現并不復雜,但是需要注意如下幾點。第一,當要刪除的結點存在時,刪除的最后一步操作要將count的值減一。這必須是最后一步操作,否則讀取操作可能看不到之前對段所做的結構性修改。第二,remove執行的開始就將table賦給一個局部變量tab,這是因為table是volatile變量,讀寫volatile變量的開銷很大。編譯器也不能對volatile變量的讀寫做任何優化,直接多次訪問非volatile實例變量沒有多大影響,編譯器會做相應優化。
?
?
接下來看put操作,同樣地put操作也是委托給段的put方法。下面是段的put方法:
Java代碼??
V?put(K?key,?int ?hash,?V?value,?boolean ?onlyIfAbsent)?{?? ????lock();?? ????try ?{?? ????????int ?c?=?count;?? ????????if ?(c++?>?threshold)??? ????????????rehash();?? ????????HashEntry<K,V>[]?tab?=?table;?? ????????int ?index?=?hash?&?(tab.length?-?1 );?? ????????HashEntry<K,V>?first?=?tab[index];?? ????????HashEntry<K,V>?e?=?first;?? ????????while ?(e?!=?null ?&&?(e.hash?!=?hash?||?!key.equals(e.key)))?? ????????????e?=?e.next;?? ?? ????????V?oldValue;?? ????????if ?(e?!=?null )?{?? ????????????oldValue?=?e.value;?? ????????????if ?(!onlyIfAbsent)?? ????????????????e.value?=?value;?? ????????}?? ????????else ?{?? ????????????oldValue?=?null ;?? ????????????++modCount;?? ????????????tab[index]?=?new ?HashEntry<K,V>(key,?hash,?first,?value);?? ????????????count?=?c;??? ????????}?? ????????return ?oldValue;?? ????}?finally ?{?? ????????unlock();?? ????}?? }?? 該方法也是在持有段鎖的情況下執行的,首先判斷是否需要rehash,需要就先rehash。接著是找是否存在同樣一個key的結點,如果存在就直接替換這個結點的值。否則創建一個新的結點并添加到hash鏈的頭部,這時一定要修改modCount和count的值,同樣修改count的值一定要放在最后一步。put方法調用了rehash方法,reash方法實現得也很精巧,主要利用了table的大小為2^n,這里就不介紹了。
?
修改操作還有putAll和replace。putAll就是多次調用put方法,沒什么好說的。replace甚至不用做結構上的更改,實現要比put和delete要簡單得多,理解了put和delete,理解replace就不在話下了,這里也不介紹了。
?
?
獲取操作
?
首先看下get操作,同樣ConcurrentHashMap的get操作是直接委托給Segment的get方法,直接看Segment的get方法:
?
Java代碼??
V?get(Object?key,?int ?hash)?{?? ????if ?(count?!=?0 )?{??? ????????HashEntry<K,V>?e?=?getFirst(hash);?? ????????while ?(e?!=?null )?{?? ????????????if ?(e.hash?==?hash?&&?key.equals(e.key))?{?? ????????????????V?v?=?e.value;?? ????????????????if ?(v?!=?null )?? ????????????????????return ?v;?? ????????????????return ?readValueUnderLock(e);??? ????????????}?? ????????????e?=?e.next;?? ????????}?? ????}?? ????return ?null ;?? }?? ?
get操作不需要鎖。第一步是訪問count變量,這是一個volatile變量,由于所有的修改操作在進行結構修改時都會在最后一步寫count變量,通過這種機制保證get操作能夠得到幾乎最新的結構更新。對于非結構更新,也就是結點值的改變,由于HashEntry的value變量是volatile的,也能保證讀取到最新的值。接下來就是對hash鏈進行遍歷找到要獲取的結點,如果沒有找到,直接訪回null。對hash鏈進行遍歷不需要加鎖的原因在于鏈指針next是final的。但是頭指針卻不是final的,這是通過getFirst(hash)方法返回,也就是存在table數組中的值。這使得getFirst(hash)可能返回過時的頭結點,例如,當執行get方法時,剛執行完getFirst(hash)之后,另一個線程執行了刪除操作并更新頭結點,這就導致get方法中返回的頭結點不是最新的。這是可以允許,通過對count變量的協調機制,get能讀取到幾乎最新的數據,雖然可能不是最新的。要得到最新的數據,只有采用完全的同步。
?
最后,如果找到了所求的結點,判斷它的值如果非空就直接返回,否則在有鎖的狀態下再讀一次。這似乎有些費解,理論上結點的值不可能為空,這是因為put的時候就進行了判斷,如果為空就要拋NullPointerException。空值的唯一源頭就是HashEntry中的默認值,因為HashEntry中的value不是final的,非同步讀取有可能讀取到空值。仔細看下put操作的語句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在這條語句中,HashEntry構造函數中對value的賦值以及對tab[index]的賦值可能被重新排序,這就可能導致結點的值為空。這種情況應當很罕見,一旦發生這種情況,ConcurrentHashMap采取的方式是在持有鎖的情況下再讀一遍,這能夠保證讀到最新的值,并且一定不會為空值。
?
Java代碼??
V?readValueUnderLock(HashEntry<K,V>?e)?{?? ????lock();?? ????try ?{?? ????????return ?e.value;?? ????}?finally ?{?? ????????unlock();?? ????}?? }?? ? ?
另一個操作是containsKey,這個實現就要簡單得多了,因為它不需要讀取值:
Java代碼??
boolean ?containsKey(Object?key,?int ?hash)?{?? ????if ?(count?!=?0 )?{??? ????????HashEntry<K,V>?e?=?getFirst(hash);?? ????????while ?(e?!=?null )?{?? ????????????if ?(e.hash?==?hash?&&?key.equals(e.key))?? ????????????????return ?true ;?? ????????????e?=?e.next;?? ????????}?? ????}?? ????return ?false ;?? }?? ? ?
跨段操作 ?
?
有些操作需要涉及到多個段,比如說size(), containsValaue()。先來看下size()方法:
?
Java代碼??
public ?int ?size()?{?? ????final ?Segment<K,V>[]?segments?=?this .segments;?? ????long ?sum?=?0 ;?? ????long ?check?=?0 ;?? ????int []?mc?=?new ?int [segments.length];?? ?????? ?????? ????for ?(int ?k?=?0 ;?k?<?RETRIES_BEFORE_LOCK;?++k)?{?? ????????check?=?0 ;?? ????????sum?=?0 ;?? ????????int ?mcsum?=?0 ;?? ????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?{?? ????????????sum?+=?segments[i].count;?? ????????????mcsum?+=?mc[i]?=?segments[i].modCount;?? ????????}?? ????????if ?(mcsum?!=?0 )?{?? ????????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?{?? ????????????????check?+=?segments[i].count;?? ????????????????if ?(mc[i]?!=?segments[i].modCount)?{?? ????????????????????check?=?-1 ;??? ????????????????????break ;?? ????????????????}?? ????????????}?? ????????}?? ????????if ?(check?==?sum)?? ????????????break ;?? ????}?? ????if ?(check?!=?sum)?{??? ????????sum?=?0 ;?? ????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?? ????????????segments[i].lock();?? ????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?? ????????????sum?+=?segments[i].count;?? ????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?? ????????????segments[i].unlock();?? ????}?? ????if ?(sum?>?Integer.MAX_VALUE)?? ????????return ?Integer.MAX_VALUE;?? ????else ?? ????????return ?(int )sum;?? }?? ?
size方法主要思路是先在沒有鎖的情況下對所有段大小求和,如果不能成功(這是因為遍歷過程中可能有其它線程正在對已經遍歷過的段進行結構性更新),最多執行RETRIES_BEFORE_LOCK次,如果還不成功就在持有所有段鎖的情況下再對所有段大小求和。在沒有鎖的情況下主要是利用Segment中的modCount進行檢測,在遍歷過程中保存每個Segment的modCount,遍歷完成之后再檢測每個Segment的modCount有沒有改變,如果有改變表示有其它線程正在對Segment進行結構性并發更新,需要重新計算。
?
?
其實這種方式是存在問題的,在第一個內層for循環中,在這兩條語句sum += segments[i].count; mcsum += mc[i] = segments[i].modCount;之間,其它線程可能正在對Segment進行結構性的修改,導致segments[i].count和segments[i].modCount讀取的數據并不一致。這可能使size()方法返回任何時候都不曾存在的大小,很奇怪javadoc居然沒有明確標出這一點,可能是因為這個時間窗口太小了吧。size()的實現還有一點需要注意,必須要先segments[i].count,才能segments[i].modCount,這是因為segment[i].count是對volatile變量的訪問,接下來segments[i].modCount才能得到幾乎最新的值(前面我已經說了為什么只是“幾乎”了)。這點在containsValue方法中得到了淋漓盡致的展現:
?
?
Java代碼??
public ?boolean ?containsValue(Object?value)?{?? ????if ?(value?==?null )?? ????????throw ?new ?NullPointerException();?? ?? ?????? ?? ????final ?Segment<K,V>[]?segments?=?this .segments;?? ????int []?mc?=?new ?int [segments.length];?? ?? ?????? ????for ?(int ?k?=?0 ;?k?<?RETRIES_BEFORE_LOCK;?++k)?{?? ????????int ?sum?=?0 ;?? ????????int ?mcsum?=?0 ;?? ????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?{?? ????????????int ?c?=?segments[i].count;?? ????????????mcsum?+=?mc[i]?=?segments[i].modCount;?? ????????????if ?(segments[i].containsValue(value))?? ????????????????return ?true ;?? ????????}?? ????????boolean ?cleanSweep?=?true ;?? ????????if ?(mcsum?!=?0 )?{?? ????????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?{?? ????????????????int ?c?=?segments[i].count;?? ????????????????if ?(mc[i]?!=?segments[i].modCount)?{?? ????????????????????cleanSweep?=?false ;?? ????????????????????break ;?? ????????????????}?? ????????????}?? ????????}?? ????????if ?(cleanSweep)?? ????????????return ?false ;?? ????}?? ?????? ????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?? ????????segments[i].lock();?? ????boolean ?found?=?false ;?? ????try ?{?? ????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?{?? ????????????if ?(segments[i].containsValue(value))?{?? ????????????????found?=?true ;?? ????????????????break ;?? ????????????}?? ????????}?? ????}?finally ?{?? ????????for ?(int ?i?=?0 ;?i?<?segments.length;?++i)?? ????????????segments[i].unlock();?? ????}?? ????return ?found;?? }?? ? 同樣注意內層的第一個for循環,里面有語句int c = segments[i].count; 但是c卻從來沒有被使用過,即使如此,編譯器也不能做優化將這條語句去掉,因為存在對volatile變量count的讀取,這條語句存在的唯一目的就是保證segments[i].modCount讀取到幾乎最新的值。關于containsValue方法的其它部分就不分析了,它和size方法差不多。
?
?
跨段方法中還有一個isEmpty()方法,其實現比size()方法還要簡單,也不介紹了。最后簡單地介紹下迭代方法,如keySet(), values(), entrySet()方法,這些方法都返回相應的迭代器,所有迭代器都繼承于Hash_Iterator類(提交時居然提醒我不能包含sh It,只得加了下劃線),里實現了主要的方法。其結構是:
?
Java代碼??
abstract ?class ?Hash_Iterator{?? ????int ?nextSegmentIndex;?? ????int ?nextTableIndex;?? ????HashEntry<K,V>[]?currentTable;?? ????HashEntry<K,?V>?nextEntry;?? ????HashEntry<K,?V>?lastReturned;?? }?? ?
?nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex對應段中中hash鏈的索引,currentTable是nextSegmentIndex對應段的table。調用next方法時主要是調用了advance方法:
?
?
Java代碼??
final ?void ?advance()?{?? ????if ?(nextEntry?!=?null ?&&?(nextEntry?=?nextEntry.next)?!=?null )?? ????????return ;?? ?? ????while ?(nextTableIndex?>=?0 )?{?? ????????if ?(?(nextEntry?=?currentTable[nextTableIndex--])?!=?null )?? ????????????return ;?? ????}?? ?? ????while ?(nextSegmentIndex?>=?0 )?{?? ????????Segment<K,V>?seg?=?segments[nextSegmentIndex--];?? ????????if ?(seg.count?!=?0 )?{?? ????????????currentTable?=?seg.table;?? ????????????for ?(int ?j?=?currentTable.length?-?1 ;?j?>=?0 ;?--j)?{?? ????????????????if ?(?(nextEntry?=?currentTable[j])?!=?null )?{?? ????????????????????nextTableIndex?=?j?-?1 ;?? ????????????????????return ;?? ????????????????}?? ????????????}?? ????????}?? ????}?? }?? 不想再多介紹了,唯一需要注意的是跳到下一個段時,一定要先讀取下一個段的count變量。?
?
這種迭代方式的主要效果是不會拋出ConcurrentModificationException。一旦獲取到下一個段的table,也就意味著這個段的頭結點在迭代過程中就確定了,在迭代過程中就不能反映對這個段節點并發的刪除和添加,對于節點的更新是能夠反映的,因為節點的值是一個volatile變量。
?
結束語
?
ConcurrentHashMap是一個支持高并發的高性能的HashMap實現,它支持完全并發的讀以及一定程度并發的寫。ConcurrentHashMap的實現也是很精巧,充分利用了最新的JMM規范,值得學習,卻不值得模仿。最后由于本人水平有限,對大師的作品難免有誤解,如果存在,還望大牛們不吝指出。
?
?
?
?
參考文章:
http://www.ibm.com/developerworks/java/library/j-jtp08223/,這個是討論的是Doug Lea's util.concurrent包中的ConcurrentHashMap的實現,不過大致思想是一致的。
?
http://floatingpoint.tinou.com/2008/09/performance-optimization-in-concurrenthashmap.html
?
轉載于:https://www.cnblogs.com/leeeee/p/7276060.html
總結
以上是生活随笔 為你收集整理的深入Java集合学习系列:ConcurrentHashMap之实现细节 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。