转:Java 7 种阻塞队列详解
轉(zhuǎn)自:
Java 7 種阻塞隊(duì)列詳解 - 云+社區(qū) - 騰訊云隊(duì)列(Queue)是一種經(jīng)常使用的集合。Queue 實(shí)際上是實(shí)現(xiàn)了一個(gè)先進(jìn)先出(FIFO:First In First Out)的有序表。和 List、Set ...https://cloud.tencent.com/developer/article/1706970
隊(duì)列和阻塞隊(duì)列
隊(duì)列
隊(duì)列(Queue)是一種經(jīng)常使用的集合。Queue 實(shí)際上是實(shí)現(xiàn)了一個(gè)先進(jìn)先出(FIFO:First In First Out)的有序表。和 List、Set 一樣都繼承自 Collection。它和 List 的區(qū)別在于,List可以在任意位置添加和刪除元素,而Queue 只有兩個(gè)操作:
- 把元素添加到隊(duì)列末尾;
- 從隊(duì)列頭部取出元素。
超市的收銀臺就是一個(gè)隊(duì)列:
我們常用的 LinkedList 就可以當(dāng)隊(duì)列使用,實(shí)現(xiàn)了 Dequeue 接口,還有 ConcurrentLinkedQueue,他們都屬于非阻塞隊(duì)列。
阻塞隊(duì)列
阻塞隊(duì)列,顧名思義,首先它是一個(gè)隊(duì)列,而一個(gè)阻塞隊(duì)列在數(shù)據(jù)結(jié)構(gòu)中所起的作用大致如下
線程 1 往阻塞隊(duì)列中添加元素,而線程 2 從阻塞隊(duì)列中移除元素
- 當(dāng)阻塞隊(duì)列是空時(shí),從隊(duì)列中獲取元素的操作將會被阻塞。
- 當(dāng)阻塞隊(duì)列是滿時(shí),從隊(duì)列中添加元素的操作將會被阻塞。
試圖從空的阻塞隊(duì)列中獲取元素的線程將會阻塞,直到其他的線程往空的隊(duì)列插入新的元素,同樣,試圖往已滿的阻塞隊(duì)列添加新元素的線程同樣也會阻塞,直到其他的線程從列中移除一個(gè)或多個(gè)元素或者完全清空隊(duì)列后繼續(xù)新增。
類似我們?nèi)ズ5讚婆抨?duì),海底撈爆滿情況下,阻塞隊(duì)列相當(dāng)于用餐區(qū),用餐區(qū)滿了的話,就阻塞在候客區(qū)等著,可以用餐的話 put 一波去用餐,吃完就 take 出去。
為什么要用阻塞隊(duì)列,有什么好處嗎
在多線程領(lǐng)域:所謂阻塞,是指在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒。
那為什么需要 BlockingQueue 呢
好處是我們不需要關(guān)心什么時(shí)候需要阻塞線程,什么時(shí)候需要喚醒線程,因?yàn)檫@些 BlockingQueue 都包辦了。
在 concurrent 包發(fā)布以前,多線程環(huán)境下,我們每個(gè)程序員都必須自己去實(shí)現(xiàn)這些細(xì)節(jié),尤其還要兼顧效率和線程安全,這會給我們的程序帶來不小的復(fù)雜性。現(xiàn)在有了阻塞隊(duì)列,我們的操作就從手動擋換成了自動擋。
Java 里的阻塞隊(duì)列
Collection的子類除了我們熟悉的 List 和 Set,還有一個(gè) Queue,阻塞隊(duì)列 BlockingQueue 繼承自 Queue。
BlockingQueue 是個(gè)接口,需要使用它的實(shí)現(xiàn)之一來使用 BlockingQueue,java.util.concurrent 包下具有以下 BlockingQueue 接口的實(shí)現(xiàn)類:
JDK 提供了 7 個(gè)阻塞隊(duì)列。分別是
- ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列
- LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
- PriorityBlockingQueue :一個(gè)支持優(yōu)先級排序的無界阻塞隊(duì)列
- DelayQueue:一個(gè)使用優(yōu)先級隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列
- SynchronousQueue:一個(gè)不存儲元素的阻塞隊(duì)列
- LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列(實(shí)現(xiàn)了繼承于 BlockingQueue 的 TransferQueue)
- LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列
BlockingQueue 核心方法
相比 Queue 接口,BlockingQueue 有四種形式的 API。
| 插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除(取出) | remove() | poll() | take() | poll(time,unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 |
以 ArrayBlockingQueue 為例來看下 Java 阻塞隊(duì)列提供的常用方法
- 拋出異常:
- 當(dāng)阻塞隊(duì)列滿時(shí),再往隊(duì)列里 add 插入元素會拋出 java.lang.IllegalStateException: Queue full 異常;
- 當(dāng)隊(duì)列為空時(shí),從隊(duì)列里 remove 移除元素時(shí)會拋出 NoSuchElementException 異常 。
- element(),返回隊(duì)列頭部的元素,如果隊(duì)列為空,則拋出一個(gè) NoSuchElementException 異常
- 返回特殊值:
- offer(),插入方法,成功返回 true,失敗返回 false;
- poll(),移除方法,成功返回出隊(duì)列的元素,隊(duì)列里沒有則返回 null
- peek() ,返回隊(duì)列頭部的元素,如果隊(duì)列為空,則返回 null
- 一直阻塞:
- 當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)線程繼續(xù)往隊(duì)列里 put 元素,隊(duì)列會一直阻塞生產(chǎn)線程,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出;
- 當(dāng)阻塞隊(duì)列空時(shí),消費(fèi)線程試圖從隊(duì)列里 take 元素,隊(duì)列也會一直阻塞消費(fèi)線程,直到隊(duì)列可用。
- 超時(shí)退出:
- 當(dāng)阻塞隊(duì)列滿時(shí),隊(duì)列會阻塞生產(chǎn)線程一定時(shí)間,如果超過一定的時(shí)間,生產(chǎn)線程就會退出,返回 false
- 當(dāng)阻塞隊(duì)列空時(shí),隊(duì)列會阻塞消費(fèi)線程一定時(shí)間,如果超過一定的時(shí)間,消費(fèi)線程會退出,返回 null
BlockingQueue 實(shí)現(xiàn)類
逐個(gè)分析下這 7 個(gè)阻塞隊(duì)列,常用的幾個(gè)順便探究下源碼。
ArrayBlockingQueue
ArrayBlockingQueue,一個(gè)由數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列。該隊(duì)列采用先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序添加的。
ArrayBlockingQueue 為有界且固定,其大小在構(gòu)造時(shí)由構(gòu)造函數(shù)來決定,確認(rèn)之后就不能再改變了。
ArrayBlockingQueue 支持對等待的生產(chǎn)者線程和使用者線程進(jìn)行排序的可選公平策略,但是在默認(rèn)情況下不保證線程公平的訪問,在構(gòu)造時(shí)可以選擇公平策略(fair = true)。公平性通常會降低吞吐量,但是減少了可變性和避免了“不平衡性”。(ArrayBlockingQueue 內(nèi)部的阻塞隊(duì)列是通過 ReentrantLock 和 Condition 條件隊(duì)列實(shí)現(xiàn)的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平訪問的區(qū)別)
所謂公平訪問隊(duì)列是指阻塞的所有生產(chǎn)者線程或消費(fèi)者線程,當(dāng)隊(duì)列可用時(shí),可以按照阻塞的先后順序訪問隊(duì)列,即先阻塞的生產(chǎn)者線程,可以先往隊(duì)列里插入元素,先阻塞的消費(fèi)者線程,可以先從隊(duì)列里獲取元素,可以保證先進(jìn)先出,避免饑餓現(xiàn)象。
源碼解讀
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 通過數(shù)組來實(shí)現(xiàn)的隊(duì)列final Object[] items;//記錄隊(duì)首元素的下標(biāo)int takeIndex;//記錄隊(duì)尾元素的下標(biāo)int putIndex;//隊(duì)列中的元素個(gè)數(shù)int count;//通過ReentrantLock來實(shí)現(xiàn)同步final ReentrantLock lock;//有2個(gè)條件對象,分別表示隊(duì)列不為空和隊(duì)列不滿的情況private final Condition notEmpty;private final Condition notFull;//迭代器transient Itrs itrs;//offer方法用于向隊(duì)列中添加數(shù)據(jù)public boolean offer(E e) {// 可以看出添加的數(shù)據(jù)不支持null值checkNotNull(e);final ReentrantLock lock = this.lock;//通過重入鎖來實(shí)現(xiàn)同步lock.lock();try {//如果隊(duì)列已經(jīng)滿了的話直接就返回false,不會阻塞調(diào)用這個(gè)offer方法的線程if (count == items.length)return false;else {//如果隊(duì)列沒有滿,就調(diào)用enqueue方法將元素添加到隊(duì)列中enqueue(e);return true;}} finally {lock.unlock();}}//多了個(gè)等待時(shí)間的 offer方法public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {checkNotNull(e);long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//獲取可中斷鎖lock.lockInterruptibly();try {while (count == items.length) {if (nanos <= 0)return false;//等待設(shè)置的時(shí)間nanos = notFull.awaitNanos(nanos);}//如果等待時(shí)間過了,隊(duì)列有空間的話就會調(diào)用enqueue方法將元素添加到隊(duì)列enqueue(e);return true;} finally {lock.unlock();}}//將數(shù)據(jù)添加到隊(duì)列中的具體方法private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;//通過循環(huán)數(shù)組實(shí)現(xiàn)的隊(duì)列,當(dāng)數(shù)組滿了時(shí)下標(biāo)就變成0了if (++putIndex == items.length)putIndex = 0;count++;//激活因?yàn)閚otEmpty條件而阻塞的線程,比如調(diào)用take方法的線程notEmpty.signal();}//將數(shù)據(jù)從隊(duì)列中取出的方法private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];//將對應(yīng)的數(shù)組下標(biāo)位置設(shè)置為null釋放資源items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//激活因?yàn)閚otFull條件而阻塞的線程,比如調(diào)用put方法的線程notFull.signal();return x;}//put方法和offer方法不一樣的地方在于,如果隊(duì)列是滿的話,它就會把調(diào)用put方法的線程阻塞,直到隊(duì)列里有空間public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;//因?yàn)楹竺嬲{(diào)用了條件變量的await()方法,而await()方法會在中斷標(biāo)志設(shè)置后拋出InterruptedException異常后退出,// 所以在加鎖時(shí)候先看中斷標(biāo)志是不是被設(shè)置了,如果設(shè)置了直接拋出InterruptedException異常,就不用再去獲取鎖了lock.lockInterruptibly();try {while (count == items.length)//如果隊(duì)列滿的話就阻塞等待,直到notFull的signal方法被調(diào)用,也就是隊(duì)列里有空間了notFull.await();//隊(duì)列里有空間了執(zhí)行添加操作enqueue(e);} finally {lock.unlock();}}//poll方法用于從隊(duì)列中取數(shù)據(jù),不會阻塞當(dāng)前線程public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {//如果隊(duì)列為空的話會直接返回null,否則調(diào)用dequeue方法取數(shù)據(jù)return (count == 0) ? null : dequeue();} finally {lock.unlock();}}//有等待時(shí)間的 poll 重載方法public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0) {if (nanos <= 0)return null;nanos = notEmpty.awaitNanos(nanos);}return dequeue();} finally {lock.unlock();}}//take方法也是用于取隊(duì)列中的數(shù)據(jù),但是和poll方法不同的是它有可能會阻塞當(dāng)前的線程public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//當(dāng)隊(duì)列為空時(shí),就會阻塞當(dāng)前線程while (count == 0)notEmpty.await();//直到隊(duì)列中有數(shù)據(jù)了,調(diào)用dequeue方法將數(shù)據(jù)返回return dequeue();} finally {lock.unlock();}}//返回隊(duì)首元素public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return itemAt(takeIndex); // null when queue is empty} finally {lock.unlock();}}//獲取隊(duì)列的元素個(gè)數(shù),加了鎖,所以結(jié)果是準(zhǔn)確的public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}// 此外,還有一些其他方法//返回隊(duì)列剩余空間,還能加幾個(gè)元素public int remainingCapacity() {final ReentrantLock lock = this.lock;lock.lock();try {return items.length - count;} finally {lock.unlock();}}// 判斷隊(duì)列中是否存在當(dāng)前元素opublic boolean contains(Object o){}// 返回一個(gè)按正確順序,包含隊(duì)列中所有元素的數(shù)組public Object[] toArray(){}// 自動清空隊(duì)列中的所有元素public void clear(){}// 移除隊(duì)列中所有可用元素,并將他們加入到給定的 Collection 中 public int drainTo(Collection<? super E> c){}// 返回此隊(duì)列中按正確順序進(jìn)行迭代的,包含所有元素的迭代器public Iterator<E> iterator() }LinkedBlockingQueue
LinkedBlockingQueue 是一個(gè)用單向鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長度為 Integer.MAX_VALUE。此隊(duì)列按照先進(jìn)先出的原則對元素進(jìn)行排序。
如果不是特殊業(yè)務(wù),LinkedBlockingQueue 使用時(shí),切記要定義容量 new LinkedBlockingQueue(capacity)
,防止過度膨脹。
源碼解讀
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = -6903933977591709194L;// 基于鏈表實(shí)現(xiàn),肯定要有結(jié)點(diǎn)類,典型的單鏈表結(jié)構(gòu)static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}//容量private final int capacity;//當(dāng)前隊(duì)列元素?cái)?shù)量private final AtomicInteger count = new AtomicInteger();// 頭節(jié)點(diǎn),不存數(shù)據(jù)transient Node<E> head;// 尾節(jié)點(diǎn),便于入隊(duì)private transient Node<E> last;// take鎖,出隊(duì)鎖,只有take,poll方法會持有private final ReentrantLock takeLock = new ReentrantLock();// 出隊(duì)等待條件// 當(dāng)隊(duì)列無元素時(shí),take鎖會阻塞在notEmpty條件上,等待其它線程喚醒private final Condition notEmpty = takeLock.newCondition();// 入隊(duì)鎖,只有put,offer會持有private final ReentrantLock putLock = new ReentrantLock();// 入隊(duì)等待條件// 當(dāng)隊(duì)列滿了時(shí),put鎖會會阻塞在notFull上,等待其它線程喚醒private final Condition notFull = putLock.newCondition();//同樣提供三個(gè)構(gòu)造器public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();// 初始化head和last指針為空值節(jié)點(diǎn)this.capacity = capacity;last = head = new Node<E>(null);}public LinkedBlockingQueue() {// 如果沒傳容量,就使用最大int值初始化其容量this(Integer.MAX_VALUE);}public LinkedBlockingQueue(Collection<? extends E> c) {}//入隊(duì)public void put(E e) throws InterruptedException {// 不允許null元素if (e == null) throw new NullPointerException();//規(guī)定給當(dāng)前put方法預(yù)留一個(gè)本地變量int c = -1;// 新建一個(gè)節(jié)點(diǎn)Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;// 使用put鎖加鎖putLock.lockInterruptibly();try {// 如果隊(duì)列滿了,就阻塞在notFull條件上// 等待被其它線程喚醒while (count.get() == capacity) {notFull.await();}// 隊(duì)列不滿了,就入隊(duì)enqueue(node);// 隊(duì)列長度加1c = count.getAndIncrement();// 如果現(xiàn)隊(duì)列長度小于容量// 就再喚醒一個(gè)阻塞在notFull條件上的線程// 這里為啥要喚醒一下呢?// 因?yàn)榭赡苡泻芏嗑€程阻塞在notFull這個(gè)條件上的// 而取元素時(shí)只有取之前隊(duì)列是滿的才會喚醒notFull// 為什么隊(duì)列滿的才喚醒notFull呢?// 因?yàn)閱拘咽切枰觩utLock的,這是為了減少鎖的次數(shù)// 所以,這里索性在放完元素就檢測一下,未滿就喚醒其它notFull上的線程// 說白了,這也是鎖分離帶來的代價(jià)if (c + 1 < capacity)notFull.signal();} finally {// 釋放鎖putLock.unlock();}// 如果原隊(duì)列長度為0,現(xiàn)在加了一個(gè)元素后立即喚醒notEmpty條件if (c == 0)signalNotEmpty();}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;// 加take鎖takeLock.lock();try {// 喚醒notEmpty條件notEmpty.signal();} finally {takeLock.unlock();}}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}private void enqueue(Node<E> node) {// 直接加到last后面last = last.next = node;}public boolean offer(E e) {//用帶過期時(shí)間的說明}public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();//轉(zhuǎn)換為納秒long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//獲取入隊(duì)鎖,支持等待鎖的過程中被中斷putLock.lockInterruptibly();try {//隊(duì)列滿了,再看看有沒有超時(shí)while (count.get() == capacity) {if (nanos <= 0)//等待時(shí)間超時(shí)return false;//進(jìn)行等待,awaitNanos(long nanos)是AQS中的方法//在等待過程中,如果被喚醒或超時(shí),則繼續(xù)當(dāng)前循環(huán)//如果被中斷,則拋出中斷異常nanos = notFull.awaitNanos(nanos);}//進(jìn)入隊(duì)尾enqueue(new Node<E>(e));c = count.getAndIncrement();//說明當(dāng)前元素后面還能再插入一個(gè)//就喚醒一個(gè)入隊(duì)條件隊(duì)列中阻塞的線程if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}//節(jié)點(diǎn)數(shù)量為0,說明隊(duì)列是空的if (c == 0)//喚醒一個(gè)出隊(duì)條件隊(duì)列阻塞的線程signalNotEmpty();return true;}public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {// 如果隊(duì)列無元素,則阻塞在notEmpty條件上while (count.get() == 0) {notEmpty.await();}// 否則,出隊(duì)x = dequeue();// 獲取出隊(duì)前隊(duì)列的長度c = count.getAndDecrement();// 如果取之前隊(duì)列長度大于1,則喚醒notEmptyif (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 如果取之前隊(duì)列長度等于容量// 則喚醒notFullif (c == capacity)signalNotFull();return x;}private E dequeue() {Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}public E poll(long timeout, TimeUnit unit) throws InterruptedException {E x = null;int c = -1;long nanos = unit.toNanos(timeout);final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {//隊(duì)列為空且已經(jīng)超時(shí),直接返回空if (nanos <= 0)return null;//等待過程中可能被喚醒,超時(shí),中斷nanos = notEmpty.awaitNanos(nanos);}//進(jìn)行出隊(duì)操作x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}//如果出隊(duì)前,隊(duì)列是滿的,則喚醒一個(gè)被take()阻塞的線程if (c == capacity)signalNotFull();return x;}public E poll() {//}public E peek() {if (count.get() == 0)return null;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {Node<E> first = head.next;if (first == null)return null;elsereturn first.item;} finally {takeLock.unlock();}}void unlink(Node<E> p, Node<E> trail) {// assert isFullyLocked();// p.next is not changed, to allow iterators that are// traversing p to maintain their weak-consistency guarantee.p.item = null;trail.next = p.next;if (last == p)last = trail;if (count.getAndDecrement() == capacity)notFull.signal();}public boolean remove(Object o) {if (o == null) return false;fullyLock();try {for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {if (o.equals(p.item)) {unlink(p, trail);return true;}}return false;} finally {fullyUnlock();}}public boolean contains(Object o) {}static final class LBQSpliterator<E> implements Spliterator<E> {} }LinkedBlockingQueue 與 ArrayBlockingQueue 對比
- ArrayBlockingQueue 入隊(duì)出隊(duì)采用一把鎖,導(dǎo)致入隊(duì)出隊(duì)相互阻塞,效率低下;
- LinkedBlockingQueue 入隊(duì)出隊(duì)采用兩把鎖,入隊(duì)出隊(duì)互不干擾,效率較高;
- 二者都是有界隊(duì)列,如果長度相等且出隊(duì)速度跟不上入隊(duì)速度,都會導(dǎo)致大量線程阻塞;
- LinkedBlockingQueue 如果初始化不傳入初始容量,則使用最大 int 值,如果出隊(duì)速度跟不上入隊(duì)速度,會導(dǎo)致隊(duì)列特別長,占用大量內(nèi)存;
PriorityBlockingQueue
PriorityBlockingQueue 是一個(gè)支持優(yōu)先級的無界阻塞隊(duì)列。(雖說是無界隊(duì)列,但是由于資源耗盡的話,也會OutOfMemoryError,無法添加元素)
默認(rèn)情況下元素采用自然順序升序排列。也可以自定義類實(shí)現(xiàn) compareTo() 方法來指定元素排序規(guī)則,或者初始化 PriorityBlockingQueue 時(shí),指定構(gòu)造參數(shù) Comparator 來對元素進(jìn)行排序。但需要注意的是不能保證同優(yōu)先級元素的順序。PriorityBlockingQueue 是基于最小二叉堆實(shí)現(xiàn),使用基于 CAS 實(shí)現(xiàn)的自旋鎖來控制隊(duì)列的動態(tài)擴(kuò)容,保證了擴(kuò)容操作不會阻塞 take 操作的執(zhí)行。
DelayQueue
DelayQueue 是一個(gè)使用優(yōu)先級隊(duì)列實(shí)現(xiàn)的延遲無界阻塞隊(duì)列。
隊(duì)列使用 PriorityQueue 來實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn) Delayed 接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。我們可以將 DelayQueue 運(yùn)用在以下應(yīng)用場景:
- 緩存系統(tǒng)的設(shè)計(jì):可以用 DelayQueue 保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時(shí),表示緩存有效期到了。
- 定時(shí)任務(wù)調(diào)度。使用 DelayQueue 保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從 DelayQueue 中獲取到任務(wù)就開始執(zhí)行,從比如 Timer 就是使用 DelayQueue 實(shí)現(xiàn)的。
SynchronousQueue
SynchronousQueue 是一個(gè)不存儲元素的阻塞隊(duì)列,也即是單個(gè)元素的隊(duì)列。
每一個(gè) put 操作必須等待一個(gè) take 操作,否則不能繼續(xù)添加元素。SynchronousQueue 可以看成是一個(gè)傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程。隊(duì)列本身并不存儲任何元素,非常適合于傳遞性場景, 比如在一個(gè)線程中使用的數(shù)據(jù),傳遞給另外一個(gè)線程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
Coding
synchronousQueue 是一個(gè)沒有數(shù)據(jù)緩沖的阻塞隊(duì)列,生產(chǎn)者線程對其的插入操作 put() 必須等待消費(fèi)者的移除操作 take(),反過來也一樣。
對應(yīng) peek, contains, clear, isEmpty ... 等方法其實(shí)是無效的。
但是 poll() 和 offer() 就不會阻塞,舉例來說就是 offer 的時(shí)候如果有消費(fèi)者在等待那么就會立馬滿足返回 true,如果沒有就會返回 false,不會等待消費(fèi)者到來。
public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueue<String> queue = new SynchronousQueue<>();//System.out.println(queue.offer("aaa")); //false//System.out.println(queue.poll()); //nullSystem.out.println(queue.add("bbb")); //IllegalStateException: Queue fullnew Thread(()->{try {System.out.println("Thread 1 put a");queue.put("a");System.out.println("Thread 1 put b");queue.put("b");System.out.println("Thread 1 put c");queue.put("c");} catch (InterruptedException e) {e.printStackTrace();}}).start();new Thread(()->{try {TimeUnit.SECONDS.sleep(2);System.out.println("Thread 2 get:"+queue.take());TimeUnit.SECONDS.sleep(2);System.out.println("Thread 2 get:"+queue.take());TimeUnit.SECONDS.sleep(2);System.out.println("Thread 2 get:"+queue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();} } Thread 1 put a Thread 2 get:a Thread 1 put b Thread 2 get:b Thread 1 put c Thread 2 get:c源碼解讀
不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞隊(duì)列依賴AQS實(shí)現(xiàn)并發(fā)操作,SynchronousQueue直接使用CAS實(shí)現(xiàn)線程的安全訪問。
synchronousQueue 提供了兩個(gè)構(gòu)造器(公平與否),內(nèi)部是通過 Transferer 來實(shí)現(xiàn)的,具體分為兩個(gè)Transferer,分別是 TransferStack 和 TransferQueue。
TransferStack:非公平競爭模式使用的數(shù)據(jù)結(jié)構(gòu)是后進(jìn)先出棧(LIFO Stack)
TransferQueue:公平競爭模式則使用先進(jìn)先出隊(duì)列(FIFO Queue)
性能上兩者是相當(dāng)?shù)?#xff0c;一般情況下,FIFO 通常可以支持更大的吞吐量,但 LIFO 可以更大程度的保持線程的本地化。
private transient volatile Transferer<E> transferer;public SynchronousQueue() {this(false); }public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }分析 TransferQueue 的實(shí)現(xiàn)
//構(gòu)造函數(shù)中會初始化一個(gè)出隊(duì)的節(jié)點(diǎn),并且首尾都指向這個(gè)節(jié)點(diǎn) TransferQueue() {QNode h = new QNode(null, false); // initialize to dummy node.head = h;tail = h; } //隊(duì)列節(jié)點(diǎn), static final class QNode {volatile QNode next; // next node in queuevolatile Object item; // CAS'ed to or from nullvolatile Thread waiter; // to control park/unparkfinal boolean isData;QNode(Object item, boolean isData) {this.item = item;this.isData = isData;}// 設(shè)置next和item的值,用于進(jìn)行并發(fā)更新, cas 無鎖操作boolean casNext(QNode cmp, QNode val) {return next == cmp &&UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}boolean casItem(Object cmp, Object val) {return item == cmp &&UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}void tryCancel(Object cmp) {UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);}boolean isCancelled() {return item == this;}boolean isOffList() {return next == this;}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long itemOffset;private static final long nextOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = QNode.class;itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));} catch (Exception e) {throw new Error(e);}} }從 put() 方法和 take() 方法可以看出最終調(diào)用的都是 TransferQueue 的 transfer() 方法。
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();} }public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException(); }//transfer方法用于提交數(shù)據(jù)或者是獲取數(shù)據(jù) E transfer(E e, boolean timed, long nanos) {QNode s = null; // constructed/reused as needed//如果e不為null,就說明是添加數(shù)據(jù)的入隊(duì)操作boolean isData = (e != null);for (;;) {QNode t = tail;QNode h = head;if (t == null || h == null) // saw uninitialized valuecontinue; // spin//如果當(dāng)前操作和 tail 節(jié)點(diǎn)的操作是一樣的;或者頭尾相同(表明隊(duì)列中啥都沒有)。if (h == t || t.isData == isData) { // empty or same-modeQNode tn = t.next;// 如果 t 和 tail 不一樣,說明,tail 被其他的線程改了,重來if (t != tail) // inconsistent readcontinue;// 如果 tail 的 next 不是空。就需要將 next 追加到 tail 后面了if (tn != null) { // lagging tail// 使用 CAS 將 tail.next 變成 tail,advanceTail(t, tn);continue;}// 時(shí)間到了,不等待,返回 null,插入失敗,獲取也是失敗的if (timed && nanos <= 0) // can't waitreturn null;if (s == null)s = new QNode(e, isData);if (!t.casNext(null, s)) // failed to link incontinue;advanceTail(t, s); // swing tail and waitObject x = awaitFulfill(s, e, timed, nanos);if (x == s) { // wait was cancelledclean(t, s);return null;}if (!s.isOffList()) { // not already unlinkedadvanceHead(t, s); // unlink if headif (x != null) // and forget fieldss.item = s;s.waiter = null;}return (x != null) ? (E)x : e;} else { // complementary-modeQNode m = h.next; // node to fulfillif (t != tail || m == null || h != head)continue; // inconsistent readObject x = m.item;if (isData == (x != null) || // m already fulfilledx == m || // m cancelled!m.casItem(x, e)) { // lost CASadvanceHead(h, m); // dequeue and retrycontinue;}advanceHead(h, m); // successfully fulfilledLockSupport.unpark(m.waiter);return (x != null) ? (E)x : e;}} }LinkedTransferQueue
LinkedTransferQueue 是一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞 TransferQueue 隊(duì)列。
LinkedTransferQueue采用一種預(yù)占模式。意思就是消費(fèi)者線程取元素時(shí),如果隊(duì)列不為空,則直接取走數(shù)據(jù),若隊(duì)列為空,那就生成一個(gè)節(jié)點(diǎn)(節(jié)點(diǎn)元素為null)入隊(duì),然后消費(fèi)者線程被等待在這個(gè)節(jié)點(diǎn)上,后面生產(chǎn)者線程入隊(duì)時(shí)發(fā)現(xiàn)有一個(gè)元素為null的節(jié)點(diǎn),生產(chǎn)者線程就不入隊(duì)了,直接就將元素填充到該節(jié)點(diǎn),并喚醒該節(jié)點(diǎn)等待的線程,被喚醒的消費(fèi)者線程取走元素,從調(diào)用的方法返回。我們稱這種節(jié)點(diǎn)操作為“匹配”方式。
隊(duì)列實(shí)現(xiàn)了 TransferQueue 接口重寫了 tryTransfer 和 transfer 方法,這組方法和 SynchronousQueue 公平模式的隊(duì)列類似,具有匹配的功能
LinkedBlockingDeque
LinkedBlockingDeque 是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素。雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競爭。相比其他的阻塞隊(duì)列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結(jié)尾的方法,表示插入,獲取(peek)或移除雙端隊(duì)列的第一個(gè)元素。以 Last 單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊(duì)列的最后一個(gè)元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。
在初始化 LinkedBlockingDeque 時(shí)可以設(shè)置容量防止其過渡膨脹,默認(rèn)容量也是 Integer.MAX_VALUE。另外雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中。
阻塞隊(duì)列使用場景
我們常用的生產(chǎn)者消費(fèi)者模式就可以基于阻塞隊(duì)列實(shí)現(xiàn);
線程池中活躍線程數(shù)達(dá)到 corePoolSize 時(shí),線程池將會將后續(xù)的 task 提交到 BlockingQueue 中;
生產(chǎn)者消費(fèi)者模式
JDK API文檔的 BlockingQueue 給出了一個(gè)典型的應(yīng)用
面試題:一個(gè)初始值為 0 的變量,兩個(gè)線程對齊交替操作,一個(gè)+1,一個(gè)-1,5 輪
public class ProdCounsume_TraditionDemo {public static void main(String[] args) {ShareData shareData = new ShareData();new Thread(() -> {for (int i = 0; i <= 5; i++) {shareData.increment();}}, "T1").start();new Thread(() -> {for (int i = 0; i <= 5; i++) {shareData.decrement();}}, "T1").start();} }//線程操作資源類 class ShareData {private int num = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void increment() {lock.lock();try {while (num != 0) {//等待,不能生產(chǎn)condition.await();}//干活num++;System.out.println(Thread.currentThread().getName() + "\t" + num);//喚醒condition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void decrement() {lock.lock();try {while (num == 0) {//等待,不能生產(chǎn)condition.await();}//干活num--;System.out.println(Thread.currentThread().getName() + "\t" + num);//喚醒condition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}} }線程池
線程池的核心方法 ThreadPoolExecutor,用 BlockingQueue 存放任務(wù)的阻塞隊(duì)列,被提交但尚未被執(zhí)行的任務(wù)
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)線程池在內(nèi)部實(shí)際也是構(gòu)建了一個(gè)生產(chǎn)者消費(fèi)者模型,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的緩沖任務(wù),復(fù)用線程。
不同的線程池實(shí)現(xiàn)用的是不同的阻塞隊(duì)列,newFixedThreadPool 和 newSingleThreadExecutor 用的是LinkedBlockingQueue,newCachedThreadPool 用的是 SynchronousQueue。
文章持續(xù)更新,可以微信搜「 JavaKeeper 」第一時(shí)間閱讀,無套路領(lǐng)取 500+ 本電子書和 30+ 視頻教學(xué)和源碼,本文 GitHub github.com/JavaKeeper 已經(jīng)收錄,Javaer 開發(fā)、面試必備技能兵器譜,有你想要的。
參考與感謝
Home - 廖雪峰的官方網(wǎng)站
SynchronousQueue源碼 并發(fā)編程之 SynchronousQueue 核心源碼分析 - 掘金
總結(jié)
以上是生活随笔為你收集整理的转:Java 7 种阻塞队列详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dly电脑是什么意思(dly笔记本电脑)
- 下一篇: hashmap应用场景_工作中常用到的J