多线程小抄集(新编四)
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new4/
ConcurrentHashMap
ConcurrentHashMap是線程安全的HashMap,鍵值都不能為null。
JDK7的實現:內部采用分段鎖來實現,默認初始容量為16(所以理論上這個時候最多可以同時支持 16 個線程并發寫,只要它們的操作分別分布在不同的 Segment 上。這個值可以在初始化的時候設置為其他值,但是一旦初始化以后,它是不可以擴容的),裝載因子為0.75f,分段16,每個段的HashEntry<K,V>[]大小為2。每次擴容為原來容量的2倍,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。在獲取size操作的時候,先嘗試2(RETRIES_BEFORE_LOCK)次通過不鎖住Segment的方式統計各個Segment大小,如果統計的過程中,容器的count發生了變化,再采用加鎖的方式來統計所有的Segment的大小。
JDK8的實現:ConcurrentHashMap中結構為:數組、鏈表和紅黑樹。當某個槽內的元素個數增加到8個且table容量大于或者等于64時,由鏈表轉為紅黑樹;當某個槽內的元素個數減少到6個時,由紅黑樹重新轉回鏈表。鏈表轉紅黑樹的過程,就是把給定順序的元素構造成一顆紅黑樹的過程。需要注意的是,當table容量小于64時,只會擴容,并不會把鏈表轉為紅黑樹。在轉化過程中,使用同步塊鎖住當前槽的首元素,防止其他進程對當前槽進行增刪改操作,轉化完成后利用CAS替換原有的鏈表。
無論是JDK7還是JDK8,ConcurrentHashMap的size方法都只能返回一個大概數量,無法做到100%的精確,因為已經統計過的槽在size返回最終結果前有可能又出現了變化,從而導致返回大小與實際大小存在些許差異。
JDK8中size方法的實現比JDK7要簡單很多:
public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }最大值是 Integer 類型的最大值,但是 Map 的 size 可能超過 MAX_VALUE, 所以還有一個方法 mappingCount(),JDK 的建議使用 mappingCount() 而不是size()。mappingCount() 的代碼如下:
public long mappingCount() {long n = sumCount();return (n < 0L) ? 0L : n; // ignore transient negative values }以上可以看出,無論是 size() 還是 mappingCount(), 計算大小的核心方法都是 sumCount()。sumCount() 的代碼如下:
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum; }CounterCell 這個類到底是什么?我們會發現它使用了 @sun.misc.Contended 標記的類,內部包含一個 volatile 變量。@sun.misc.Contended 這個注解標識著這個類防止需要防止 “偽共享”。
JDK1.8 size 是通過對 baseCount 和 counterCell 進行 CAS 計算,最終通過 baseCount 和 遍歷 CounterCell 數組得出 size。
JDK8 ConcurrentHashMap CPU 100%
遞歸調用computeIfAbsent方法
線程安全的非阻塞隊列
非阻塞隊列有ConcurrentLinkedQueue, ConcurrentLinkedDeque。元素不能為null。以ConcurrentLinkedQueue為例,有頭head和尾tail兩個指針,遵循FIFO的原則進行入隊和出隊,方法有add(E e), peek()取出不刪除, poll()取出刪除, remove(Object o),size(), contains(Object o), addAll(Collection c), isEmpty()。ConcurrentLinkedDeque是雙向隊列,可以在頭和尾兩個方向進行相應的操作。
阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法。 支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。 支持阻塞的移除方法:意思是隊列為空時,獲取元素的線程會等待隊列變為非空。 任何阻塞隊列中的元素都不能為null.
阻塞隊列的插入和移除操作處理方式:
| 入隊 | add(e) | offer(e) | put(e) | offer(e,timeout,unit) |
| 出隊 | remove() | poll() | take() | poll(timeout,unit) |
| 查看 | element() | peek() | 無 | 無 |
如果是無界隊列,隊列不可能出現滿的情況,所以使用put或offer方法永遠不會被阻塞,而且使用offer方法時,該方法永遠返回true.
Java里的阻塞隊列:ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。 LinkedeBlockingQueue:一個有鏈表結構組成的(有界/無界)阻塞隊列。 PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列 。DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。 SynchronousQueue:一個不存儲元素的阻塞隊列。 LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。 LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
ArrayBlockingQueue
此隊列按照FIFO的原則對元素進行排序,支持公平和非公平策略,默認為不公平。初始化時必須設定容量大小。
public ArrayBlockingQueue(int capacity) public ArrayBlockingQueue(int capacity, boolean fair) public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)LinkedBlockingQueue
與ArrayBlockingQueue一樣,按照FIFO原則進行排序,內部采用鏈表結構,且不能設置為公平的。默認隊列長度為Integer.MAX_VALUE。
public LinkedBlockingQueue() {this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) public LinkedBlockingQueue(Collection<? extends E> c)PriorityBlockingQueue
是一個支持優先級的無界阻塞隊列,默認初始容量為11,默認情況下采用自然順序升序排列,不能保證同優先級元素的順序。底層采用二叉堆實現。內部元素要么實現Comparable接口,要么在初始化的時候指定構造函數的Comparator來對元素進行排序。
public PriorityBlockingQueue() public PriorityBlockingQueue(int initialCapacity) public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) public PriorityBlockingQueue(Collection<? extends E> c)DelayQueue
支持延時獲取元素的無界阻塞隊列。內部包含一個PriorityQueue來實現,隊列中的元素必須實現Delayed接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。 DelayQueue非常有用,可以將DelayQueue運用在下面應用場景。
- 緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
- 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行。
SynchronousQueue
不存儲元素的阻塞隊列,支持公平和非公平策略。每一個put操作必須等待一個take操作,否則不能繼續添加元素。非常適合傳遞性場景。
CopyOnWriteArrayList
在每次修改時,都會創建并重新發布一個新的容器副本,從而實現可變性。CopyOnWriteArrayList的迭代器保留一個指向底層基礎數組的引用,這個數組當前位于迭代器的起始位置,由于它不會被修改,因此在對其進行同步時只需確保數組內容的可見性。因此,多個線程可以同時對這個容器進行迭代,而不會彼此干擾或者與修改容器的線程相互干擾。“寫時復制”容器返回的迭代器不會拋出ConcurrentModificationException并且返回的元素與迭代器創建時的元素完全一致,而不必考慮之后修改操作所帶來的影響。顯然,每當修改容器時都會復制底層數組,這需要一定的開銷,特別是當容器的規模較大時,僅當迭代操作遠遠多于修改操作時,才應該使用“寫入時賦值”容器。
原子類
Java中Atomic包里一共提供了12個類,屬于4種類型的原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用、原子更新屬性(字段)。Atomic包里的類基本都是使用Unsafe實現的包裝類。
1)原子更新基本類型:AtomicBoolean,AtomicInteger, AtomicLong.
2)原子更新數組:AtomicIntegerArray,AtomicLongArray, AtomicReferenceArray.
3)原子更新引用類型:AtomicReference, AtomicStampedReference, AtomicMarkableReference.
4 ) 原子更新字段類型:AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater.
原子更新基本類型
AtomicBoolean,AtomicInteger, AtomicLong三個類提供的方法類似,以AtomicInteger為例:有int addAndGet(int delta), boolean compareAndSet(int expect, int update), int getAndIncrement(), void lazySet(int newValue),int getAndSet(int newValue)。
// setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); } } private volatile int value;public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }sun.misc.Unsafe只提供三種CAS方法:compareAndSwapObject, compareAndSwapInt和compareAndSwapLong。對于AtomicBoolean,它是先把Boolean轉換成整形,再使用compareAndSwapInt進行CAS,原子更新char,float,double變量也可以用類似的思路來實現。
原子更新數組
以AtomicIntegerArray為例,此類主要提供原子的方式更新數組里的整形,常用方法如下:
- int addAndGet(int i, int delta):以原子的方式將輸入值與數組中索引i的元素相加。
- boolean compareAndSet(int i, int expect, int update):如果當前值等于預期值,則以原子方式將數組位置i的元素設置成update值。
- AtomicIntegerArray的兩個構造方法:
- AtomicIntegerArray(int length):指定數組的大小,并初始化為0
- AtomicIntegerArray(int [] array):對給定的數組進行拷貝。
案例:
int value[] = new int[]{1,2,3};AtomicIntegerArray aia = new AtomicIntegerArray(value);System.out.println(aia.getAndSet(1, 9));System.out.println(aia.get(1));System.out.println(value[1]);運行結果:2 9 2
原子更新引用示例
public class AtomicReferenceDemo {public static void main(String[] args) {User user = new User("conan", 15);AtomicReference<User> userRef = new AtomicReference<>(user);User userChg = new User("Shinichi", 17);userRef.compareAndSet(user, userChg);User newUser = userRef.get();System.out.println(user);System.out.println(userChg);System.out.println(userRef.get());AtomicIntegerFieldUpdater<User> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");fieldUpdater.getAndIncrement(newUser);System.out.println(newUser);System.out.println(fieldUpdater.get(newUser));}@Data@AllArgsConstructorstatic class User{private String name;volatile int age;} }輸出:
AtomicReferenceDemo.User(name=conan, age=15) AtomicReferenceDemo.User(name=Shinichi, age=17) AtomicReferenceDemo.User(name=Shinichi, age=17) AtomicReferenceDemo.User(name=Shinichi, age=18) 18CAS
全稱CompareAndSwap。假設有三個操作數:內存值V,舊的預期值A,要修改的值B,當且僅當預期值A和內存值V相同時,才會將內存值修改為B并返回true,否則什么都不做并返回false。當然CAS一定要配合volatile變量,這樣才能保證每次拿到的遍歷是主內存中最新的那個值,否則舊的預期值A對某條線程來說,永遠是一個不會變的值A,只要某次CAS操作失敗,永遠都不可能成功。cmpxchg指令
AQS
全稱AbstractQueuedSynchronizer。如果說JUC(java.util.concurrent)的基礎是CAS的話,那么AQS就是整個JAVA并發包的核心了,ReentrantLock, ReentrantReadWriteLock, CountDownLatch, Semaphore等都用到了它。
CAS的問題
AtomicStampedReference
循環時間長開銷大
自旋CAS如果長時間不成功,會給CPU帶來非常大的執行開銷。
只能保證一個共享變量的原子操作
當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性。AtomicReference類來保證引用對象之間的原子性,就可以把多個變量放在一個對象里來執行CAS操作。
如何避免死鎖
死鎖是指兩個或兩個以上的進程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,他們都將無法推進下去。這是一個嚴重的問題,因為死鎖會讓你的程序掛起無法完成任務,死鎖的發生必須滿足以下4個條件:
避免死鎖最簡單的方法就是阻止循環等待條件,將系統中所有的資源設置標志位、排序,規定所有的進程申請資源必須以一定的順序做操作來避免死鎖。
支持定時的鎖 boolean tryLock(long timeout, TimeUnit unit)
通過ThreadDump來分析找出死鎖
怎么檢測一個線程是否擁有鎖
java.lang.Thread中有一個方法:public static native boolean holdsLock(Object obj). 當且僅當當前線程擁有某個具體對象的鎖時返回true
JAVA中的線程調度算法
搶占式。一個線程用完CPU之后,操作系統會根據現場優先級、線程饑餓情況等數據算出一個總的優先級并分配下一個時間片給某個線程執行。
無鎖
要保證現場安全,并不是一定就要進行同步,兩者沒有因果關系。同步只是保證共享數據爭用時的正確性的手段,如果一個方法本來就不涉及共享數據,那它自然就無須任何同步措施去保證正確性,因此會有一些代碼天生就是線程安全的。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new4/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的多线程小抄集(新编四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 多线程小抄集(新编三)
- 下一篇: Kafka最全面试题整理|划重点要考