常用并发工具类(并发集合类)
文章目錄
- 概述
- BlockingQueue
- ArrayBlockingQueue
- 數據存儲相關屬性
- 阻塞特性相關屬性
- 主要方法
- LinkedBlockingQueue
- LinkedBlockingQueue 主要屬性
- LinkedBlockingQueue 設計思想
- ConcurrentLinkedQueue
- PriorityBlockingQueue
- PriorityBlockingQueue 主要屬性
- PriorityBlockingQueue 設計思想
- SynchronousQueue
- LinkedBlockingDeque
- DelayQueue
- DelayQueue 主要屬性
- DelayQueue 主要設計思想
- CopyOnWrite
- CopyOnWriteArrayList
- CopyOnWriteArrayList 的主要屬性
- CopyOnWriteArrayList 主要設計思想
- CopyOnWriteArraySet
概述
常用的并發集合類,主要有 ArrayBlockingQueue,ConcurrentLinkedQueue,LinkedBlockingQueue,PriorityBlockingQueue,DelayQueue,SynchronousQueue,LinkedBlockingDeque,以及 COW 系列的集合,如 CopyOnWriteArrayList,CopyOnWriteArraySet 等
BlockingQueue
BlockingQueue 描述了一個阻塞隊列應該有的行為,例如:
- 阻塞讀方法:take()
- 阻塞寫方法:put()
阻塞的讀寫方法的函數簽名上都會拋出 InterruptedException,具體的阻塞邏輯由實現類自行實現。
ArrayBlockingQueue
是一個基于數組實現的阻塞隊列,創建對象時必須指定容量
數據存儲相關屬性
內置了一個循環隊列用于存儲數據,可以更高效地利用內存空間
/** The queued items */final Object[] items;/** items index for next take, poll, peek or remove */int takeIndex;/** items index for next put, offer, or add */int putIndex;/** Number of elements in the queue */int count;阻塞特性相關屬性
內置一個 ReentrantLock 鎖對象,一個隊列是否為空的 Condition 對象,以及一個隊列是否滿的 Condition 對象
/** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;其中,notEmpty 條件和 notFull 條件都是由 lock 對象創建出來的
主要方法
- 構造方法中必須傳入容量,可以通過 public ArrayBlockingQueue(int capacity, boolean fair) 中的 fair 參數來指定公平/非公平模式
- fair 參數最終會用于構造 ReentrantLock 鎖對象
- put 方法會在隊列滿的時候((隊尾下標+1)%隊列長度=隊頭下標))調用 notEmpty 條件進行等待,等隊列再次有空位的時候(有元素出隊時將會被喚醒)將元素放至隊尾
- take 會在隊列為空的時候(隊尾下標=隊頭下標)調用 notFull 條件進行等待,當隊列不為空時,將隊頭的元素返回,并刪除隊頭元素
LinkedBlockingQueue
LinkedBlockingQueue 是一個基于單向鏈表實現的阻塞隊列,其容量為:
- 如果在構造函數中指定了容量,那么容量就是隊列長度的最大邊界限制
- 如果沒有在構造函數中指定容量,容量將會默認設置成 Integer.MAX_VALUE,近似無界
所以,LinkedBlockingQueue 在不傳入容量的情況下,最大容量會被設置為 Integer.MAX_VALUE
LinkedBlockingQueue 主要屬性
/** The capacity bound, or Integer.MAX_VALUE if none */private final int capacity;/** Current number of elements */private final AtomicInteger count = new AtomicInteger();/*** Head of linked list.* Invariant: head.item == null*/transient Node<E> head;/*** Tail of linked list.* Invariant: last.next == null*/private transient Node<E> last;/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();- LinkedBlockingQueue 底層的數據結構是單向鏈表
- 內置一個讀取鎖,ReentrantLock takeLock ,以及由這把鎖創建出來的非空條件 Condition notEmpty
- 內置一個寫入鎖,ReentrantLock putLock,以及由這把鎖創建出來的非空條件 Condition notFull
LinkedBlockingQueue 設計思想
- 執行 take() 方法的時候,會從隊列的頭部取出元素,所以必須要獲取讀取鎖 takLock,從而確保同時只有一個線程可以操作鏈表的頭部
- 當執行 take() 方法的時候如果隊列為空,那么當前線程將會調用 notEmpty.await() 方法進行阻塞,一直到有元素放入事件發生(如調用 put() 方法)
- 執行 put() 方法的時候,會從隊列的尾部添加元素,所以必須要獲取到寫入鎖 putLock,從而確保同時只有一個線程可以操作鏈表的尾部
- 當執行 put() 方法的時候如果隊列滿了,那么當前線程將會調用 notFull.await() 方法進行阻塞,一直到有元素取出事件發生(如調用 take() 方法)
- 當執行 remove()、contains() 等方法時,將會同時獲取讀、寫兩把鎖,在方法結束后同時釋放兩把鎖
- 這樣做的目的是因為 remove() 等方法不能確定在鏈表的某個位置操作(讀取或者新增、刪除)元素,所以需要同時對隊頭和隊尾都加鎖
總的來說,LinkedBlockingQueue 就是通過控制鏈表同一方向的操作,來實現線程安全和讀寫條件阻塞;當不能確定操作元素的位置時,將進行雙重加鎖
ConcurrentLinkedQueue
ConcurrentLinkedQueue 是一個基于單向鏈表實現的無界線程安全隊列,基于 CAS + 自旋來保證線程安全
PriorityBlockingQueue
PriorityBlockingQueue 是一個基于優先級的無界阻塞隊列,底層的數據結構是堆。
PriorityBlockingQueue 主要屬性
/*** Priority queue represented as a balanced binary heap: the two* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The* priority queue is ordered by comparator, or by the elements'* natural ordering, if comparator is null: For each node n in the* heap and each descendant d of n, n <= d. The element with the* lowest value is in queue[0], assuming the queue is nonempty.*/private transient Object[] queue;/*** The number of elements in the priority queue.*/private transient int size;/*** The comparator, or null if priority queue uses elements'* natural ordering.*/private transient Comparator<? super E> comparator;/*** Lock used for all public operations*/private final ReentrantLock lock;/*** Condition for blocking when empty*/private final Condition notEmpty;/*** Spinlock for allocation, acquired via CAS.*/private transient volatile int allocationSpinLock;- PriorityBlockingQueue 底層有一個可擴容的對象數組 Object[] queue,結構是堆
- 內置一個 ReentrantLock 獨占鎖,以及由這個鎖構造出來的 Condition notEmpty 條件
- 一個 volatile 關鍵字修飾的整型變量 volatile int allocationSpinLock
PriorityBlockingQueue 設計思想
- 所有的存取方法都需要獲取鎖后才能執行,包括 size() 方法
- 當數組的長度大于等于容量時,需要進行擴容操作
- 擴容之前先要釋放鎖
- 首先要嘗試使用 CAS 嘗試把 allocationSpinLock 變量的值修改為 1
- 當前容量小于 64 時,增長為原來的 2 倍 + 2;否則增長為原來的 1.5 倍
- 還原 allocationSpinLock 的值
- 擴容完成之后,再次獲取鎖
- 插入元素時,會按照指定的 Comparator 的比較方法進行插入;否則將會使用元素自身的 Comparable.compareTo() 方法進行比較
- 調用 take() 方法時如果隊列為空時,當前線程將會調用 notEmpty.await() 進行阻塞,直到有新的元素入隊后被喚醒
- 出隊的元素始終是堆頂元素
SynchronousQueue
一個不存儲元素的阻塞隊列,每一個入隊操作必須對應一個出隊操作,每一個出隊操作必須對應一個入隊操作。
LinkedBlockingDeque
LinkedBlockingQueue 是一個雙端阻塞隊列,底層數據結構是雙向鏈表。
- 內置一個 ReentrantLock 獨占鎖,以及由這個鎖構造出來的 Condition notEmpty(隊列為空,等待放入) 和 Condition notFull(隊列已滿,等待取出)條件
- 當隊列為空時調用阻塞取出相關方法(take())時,將會調用 notEmpty.await() 方法進行阻塞,直到有元素被放入到隊列中
- 當隊列已滿時調用阻塞存入相關方法(put())時,將會調用 notFull.await() 方法進行阻塞,直到有元素被取出
- 可以從隊列的兩端(頭部/尾部)進行元素操作(存入/取出)元素,意味著可以使用 LinkeBlockingQueue 自由實現棧或者隊列
DelayQueue
DelayQueue 是一個無界延時阻塞隊列。底層的數據結構是優先級隊列 PriorityQueue
DelayQueue 主要屬性
private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null;/*** Condition signalled when a newer element becomes available* at the head of the queue or a new thread may need to* become leader.*/private final Condition available = lock.newCondition();- 內置一個 ReentrantLock 獨占鎖,以及一個由鎖對象創建出來的條件對象 Condition available
- 底層數據結構是 PriorityQueue,是由堆來實現的
- 有一個 Thread leader 屬性,代表的是當前第一個嘗試等待獲取隊頭元素的線程
DelayQueue 主要設計思想
- 所有放入到 DelayQueue 中的元素,都要實現 Delayed 接口
- 當隊列為空時調用出隊方法,將會調用 available.await() 方法進行阻塞,直到有新的元素被添加到隊列中
- 當隊列不為空時調用出隊方法,首先判斷隊頭元素的剩余過期時間是否小于等于 0
- 如果隊頭元素的剩余過期時間小于等于 0,那么說明元素已經到期,直接把這個元素移除并返回
- 如果隊頭元素的剩余過期時間(long delay)大于 0,將會判斷當前 leader 是否為空
- 如果為空,則把自己設置為 leader,并且調用 available.awaitNanos(long delay) 方法,將自己阻塞 long delay 時間
- 如果不為空,說明已經有其他線程在等待獲取元素了,那么將調用 available.await() 方法進行無限期等待
- 在阻塞了 long delay 時間后,重新將 leader 置為 null,并且執行上面的邏輯
- 所有存取方法調用前都需要獲取鎖
CopyOnWrite
CopyOnWrite,顧名思義,就是寫時復制的意思。
主要的設計思想就是在修改數據的時候,并不直接在原數據上進行修改,而是先拷貝一份原數據的副本,后續對于數據的修改操作在拷貝出來的副本上進行,最后將副本替換掉原數據。
這樣做的好處就是,在讀取數據的時候,是不用采取任何并發控制手段的,直接訪問源數據就行,所以寫時復制適用于讀多寫少的場景。
CopyOnWriteArrayList
CopyOnWriteArrayList 是一個并發安全的動態數組,底層數據結構是數組。
CopyOnWriteArrayList 的主要屬性
/** The lock protecting all mutators */final transient ReentrantLock lock = new ReentrantLock();/** The array, accessed only via getArray/setArray. */private transient volatile Object[] array;- 內置一個 ReentrantLock 獨占鎖
- 一個用 volatile 關鍵字修飾的對象數組 array
CopyOnWriteArrayList 主要設計思想
- 在所有的修改方法執行之前,首先要獲取鎖
- 在使用元素添加方法時,每次會拷貝出一個長度 +1 的新數組,同時把要添加的元素放入到新數組的最后一個位置上,最后把舊數組替換成新數組
- 在使用元素移除方法時,每次會拷貝出一個長度 -1 的新數組,最后根據要刪除的元素位置,進行數組拷貝,最后把舊數組替換成新數組
- 所有讀取方法都沒有采取任何同步措施,包括獲取迭代器
- 獲取迭代器的方法,并沒有像很多文章中說的對當前數組進行了一個拷貝,而是直接把當前數組的引用傳遞給了迭代器
- 之所以不用對當前數組進行拷貝,是因為每次的修改方法都會創建出一個新數組,所以 CopyOnWriteArrayList 迭代器根本不會,也沒有必要去拷貝當前數組
CopyOnWriteArraySet
CopyOnWriteArraySet 是一個并發安全的無重復集合,底層是基于 CopyOnWriteArrayList 來實現的
總結
以上是生活随笔為你收集整理的常用并发工具类(并发集合类)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C/C++ 踩过的坑和防御式编程
- 下一篇: HBase 原理