日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Queue —— JUC 的豪华队列组件

發布時間:2025/3/12 编程问答 15 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Queue —— JUC 的豪华队列组件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • 引言
  • 一、Queue 的繼承關系
    • 1.1 Queue 定義基礎操作
    • 1.2 AbstractQueue 為子類減負
    • 1.3 BlockingQueue 阻塞式Queue
    • 1.4 Deque 兩頭進出
  • 二、Queue 的重要實現
  • 三、BlockingQueue 的實現原理
  • 四、Queue 在生產者消費者模式中的應用
  • 五、Queue 在線程池中的應用
  • 六、ConcurrentLinkedQueue
  • 總結

引言

Queue 是Collection 接口下的另一個重要接口。
常常作為生產者/消費者模式線程池任務隊列等基礎組件的形式存在。
JUC中提供了豐富的Queue組件,如 ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue、SynchronousQueue 等等。
學習 Queue 家族對于理解生產者消費者模式、并發、線程池等至關重要。
本篇博客總結 Queue 家族的接口、類,和一些重要的實現,及其部分底層邏輯。

一、Queue 的繼承關系

Queue 接口下面幾個重要的抽象有 AbstractQueue、BlockingQueue、Deque:

1.1 Queue 定義基礎操作

Queue 接口定義了 6 個基本方法:

失敗拋異常返回特殊值
添加add()offer()
移除remove()poll()
查看element()peek()

1.2 AbstractQueue 為子類減負

AbstractQueue 實現了 Queue 中的 add()、remove()、element(),為它們添加了失敗時拋異常的邏輯:

同時它還實現了 Collection 接口中的一些公共方法,如 clear()、addAll() 等。
大多數常用 Queue 都實現了這個抽象類。
它的作用是為子類填補常規集合所需要的添加、刪除操作的邏輯,讓子類更加專注于某種特性的實現,為子類減負。 由于這些方法是集合通用API,因此提升到抽象類中來實現,
在實際使用子類Queue的時候,這些方法往往不常使用。

1.3 BlockingQueue 阻塞式Queue

BlockingQueue 毫不意外的繼承了 Queue 接口。
并在 Queue 的基礎之上,擴展了兩個新的阻塞式方法:

void put(E e);E take();

又是一對添加和取出的組合方法,這兩個方法要求子類必須以阻塞的方式放入和取出元素,所謂阻塞式行為就是由于隊列中的容量限制等因素,導致在隊列已滿或為空時,無法再繼續添加或取出的時候,線程必須等待不返回,直到條件滿足完成操作。

舉個例子
去飯店吃午飯,但是廚師沒有做好的飯菜(隊列為空),你可以選擇離開去別的飯店(非阻塞直接返回),也可以選擇留下來,等待廚師把飯菜做好,吃完再走(阻塞直到成功)。

在 Queue 中,offer() 和 poll() 都是非阻塞式的方法,put() 和 take() 是阻塞式的方法,我們也可以理解為 put() 就是阻塞式的 offer() ,take() 就是阻塞式的 poll()。

1.4 Deque 兩頭進出

經典的隊列結構是 FIFO,即先進先出,一般都是“尾進頭出”。在一些常規的 Queue 組件的源碼中也經常看到會 setTail 這樣的操作。
為了滿足更豐富的應用場景,Queue 家族引入了 Deque 抽象。
Deque 意為 “雙端隊列”,可以在一端進也可以在這端出:

值得一提的是,對于經典數據結構中的 Stack 棧結構,Deque 也提供了相應的方法:

void push(E e);E pop();

二、Queue 的重要實現

Queue 的重要實現有以下這些:

特點實現描述
面向并發ConcurrentLinkedQueue基于CAS的并發線程安全的隊列容器
阻塞式ArrayBlockingQueue、LinkedBlockingQueue、DelayQueueLinkedBQ 雖然是用鏈表實現的,但依然是有界隊列,最大值為 Integer.maxValue,DelayQueue 可以在任務提交后延遲執行
雙端隊列LinkedList從名字完全看不出是隊列的雙端隊列
交換隊列TransferQueue(阻塞式)TransferQueue是一個接口,但它只有一個實現:LinkedTransferQueue
同步隊列SynchronousQueue(阻塞式)不存儲元素的隊列,手遞手傳遞元素

這些隊列往往都會繼承 AbstractQueue,同時又兼具其他特性,如 阻塞、延遲 等等。

三、BlockingQueue 的實現原理

凡是支持阻塞操作的隊列,都繼承自 BlockingQueue,是線程安全的隊列容器。
在 ArrayBlockingQueue 和 LinkedBlockingQueue 中,都用到了顯式鎖 Lock 和條件隊列 Condition。
在《Java 并發編程實戰》一書中的 “14.3 顯式的 Condition 對象” 中也舉例了類似的有界緩存代碼。
Condition 的原理是將基于不同的前提條件的依賴性操作區分在多個條件等待隊列中,基于不同條件等待和喚醒

以 ArrayBlockingQueue 為例,當執行 put 或 take 的時候,首先需要拿到主鎖 lock。
緊接著就是校驗前置條件:count == items.length 或 count == 0,如果條件不滿足,就執行相應 Condition.await。
如果條件滿足,那么執行入隊或出隊方法——enqueue(E) 或 dequeue:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 監控所有訪問的主鎖 */final ReentrantLock lock;/** 等待 take 操作的 Condition */private final Condition notEmpty;/** 等待 put 操作的 Condition */private final Condition notFull;/*** Inserts the specified element at the tail of this queue, waiting* for space to become available if the queue is full.* 插入一個指定元素到隊尾,如果隊列滿,則等待可用空間 */public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}/*** 從隊首提取一個元素,如果隊列空,則等待隊列放入元素*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}/*** Inserts element at current put position, advances, and signals.* 插入一個元素到當前的 putIndex,然后指針進一,執行喚醒* Call only when holding lock.* 僅當持鎖才可調用*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}/*** Extracts element at current take position, advances, and signals.* 從當前 takeIndex 位置提取一個元素,指針進一,執行喚醒* Call only when holding lock.* 僅當持鎖才可調用*/private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;} }

四、Queue 在生產者消費者模式中的應用

在沒有阻塞隊列的情況下,如果想要實現生產者-消費者,必須自行處理生產者的狀態切換和消費者的狀態切換。

而 BlockingQueue 的實現中已經集成了 Condition 和 Lock 等一系列生產者-消費者模式所必須的關鍵元素,讓代碼變得異常簡潔。
不需要考慮生產者或消費者的線程狀態,只要調用了阻塞隊列的 put 和 take,BlockingQueue 會自動為我們維護線程狀態的切換

以下代碼是使用 ArrayBlockingQueue(5) 實現的生產者消費者模式。

producer 線程負責生產任務,consumer 線程負責消費任務,隊列中只能容納 5 個元素。stateMonitor 線程用于監控任務隊列的滿、空情況,并打印 producer 和 consumer 線程的狀態。

通過調整 producer 和 consumer 線程的工作快慢程度,可以很清晰的感受到 BlockingQueue 的強大!

提示:由于程序中為了模擬生產或消費動作需要一定時間,使用了 sleep ,“動作更快”的線程會打印出 TIMED_WAITING ,它并不是 BlockingQueue 處理成的線程狀態,直接忽略即可。

/*** 使用 BlockingQueue 實現生產者消費者模式** @author 圣斗士Morty* @data 2021/6/27 13:29*/ public class T08_ProducerConsumerMode {/* 調換producer和consumer線程的快慢程度,觀察執行結果*/static int slow = 5, fast = 2;public static void main(String[] args) {// 可容納 5 個任務的有界阻塞隊列BlockingQueue<String> taskQueue = new ArrayBlockingQueue<>(5);// 生產者線程Thread producer = new Thread(() -> {for (int i = 0; ; i++) {try {TimeUnit.SECONDS.sleep(fast);System.out.println("正在生產'任務" + i + "'...");taskQueue.put("任務" + i);} catch (InterruptedException e) {e.printStackTrace();}}});// 消費者線程Thread concumer = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(slow);String task = taskQueue.take();System.out.println("正在消費'" + task + "'...");} catch (InterruptedException e) {e.printStackTrace();}}});// 狀態監控線程Thread stateMonitor = new Thread(() -> {for (; ; ) {try {TimeUnit.SECONDS.sleep(1);if (taskQueue.isEmpty())System.out.println("消費者狀態:" + concumer.getState());if (taskQueue.size() == 5)System.out.println("生產者狀態:" + producer.getState());} catch (InterruptedException e) {e.printStackTrace();}}});stateMonitor.start();producer.start();concumer.start();} }

五、Queue 在線程池中的應用

這里簡單帶一下隊列在線程池中的應用。
juc 中提供了四種常用的線程池實現:

Executors.newCachedThreadPool(); Executors.newFixedThreadPool(10); Executors.newSingleThreadExecutor(); Executors.newScheduledThreadPool(10);

Executors 是類似Collections的工具方法,以上四個方法返回一個 ExecutorService 對象,實際上是它的一個子類 ThreadPoolExecutor 和 ScheduledExecutorService。
以 cachedThreadPool 和 fixedThreadPool 為例:

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());} public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

它們直接返回固定構造結果的 ThreadPoolExecutor,構造器如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}

ThreadPoolExecutor 的構造器如上所示,它有 7 個參數:

1、corePoolSize :核心線程數,隨線程池永遠存活的線程數量
2、maximumPoolSize:最大線程數
3、keepAliveTime:空閑時存活時間
4、TimeUnit :存活時間單位,3、4兩個參數共同決定非核心線程可以空閑多久回收給操作系統
5、workQueue:一個 BlockingQueue類型的任務隊列。
6、threadFactory:線程工廠,定義線程的創建方式
7、RejectedExecutionHandler:拒絕策略,當線程池已滿時應該執行怎樣的拒絕策略

cachedThreadPool 使用 SynchronousQueue 作為任務隊列,SynchronousQueue是手遞手式的隊列,即放入的元素都必須直接交給消費者。
通過這樣一個隊列,任何任務提交到 cachedThreadPool 之后,都會立即被消費,如果任務足夠多,可能會打到 Integer.MAX_VALUE 的級別,所以在實際開發中,一定要清楚的知道業務中的任務規模才能很好的使用。
fixedThreadPool 使用 LinkedBlockingQueue 作為任務隊列的實現,這個線程池能夠維持一個固定任務數的線程池。但是為什么不使用 ArrayBlockingQueue 呢?

六、ConcurrentLinkedQueue

ConcurrentLinkedQueue 是針對并發場景下提供的一種線程安全的隊列容器,繼承 AbstractQueue,同時實現了 Queue。
線程安全的非阻塞隊列,沒有支持阻塞的 put 和 take 方法。
內部維護了 Node 鏈表結構。
offer()、poll()等方法都采用了CAS的方式,保證了線程安全性,同時也兼顧了性能:

public boolean offer(E e) {// 非空校驗checkNotNull(e);final Node<E> newNode = new Node<E>(e);// 取得 tail 節點,同時令 p 指向它,然后循環CASfor (Node<E> t = tail, p = t;;) {// 取得 tail 的next節點Node<E> q = p.next;// 拿到 tail 的瞬間可能會有其他線程設置了新的 tail,需要判斷 tail 后面是否為 nullif (q == null) {// 如果 q 為 null,說明 p 確實是 tail,此時將新的節點 CAS 到 tail 的后面if (p.casNext(null, newNode)) {// p 指向新的節點if (p != t) // CAS 設置新的 tailcasTail(t, newNode);return true;}// Lost CAS race to another thread; re-read next}else if (p == q)// We have fallen off list. If tail is unchanged, it// will also be off-list, in which case we need to// jump to head, from which all live nodes are always// reachable. Else the new tail is a better bet.p = (t != (t = tail)) ? t : head;else// Check for tail updates after two hops.p = (p != t && t != (t = tail)) ? t : q;}} public E poll() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;if (item != null && p.casItem(item, null)) {// Successful CAS is the linearization point// for item to be removed from this queue.if (p != h) // hop two nodes at a timeupdateHead(h, ((q = p.next) != null) ? q : p);return item;}else if ((q = p.next) == null) {updateHead(h, p);return null;}else if (p == q)continue restartFromHead;elsep = q;}}}

總結

了解 Queue 的類圖結構非常關鍵,重點記憶 BlockingQueue 的結構和擴展的兩個方法 put() 、take(),這是阻塞式API的關鍵:

失敗拋異常返回特殊值阻塞
添加add()offer()put()
取出remove()poll()take()
查看element()peek()

以上表格是Queue api中的重點,需要熟記。
常見的實現隊列:

特點實現描述
面向并發ConcurrentLinkedQueue基于CAS的并發線程安全的隊列容器
阻塞式ArrayBlockingQueue、LinkedBlockingQueue、DelayQueueLinkedBQ 雖然是用鏈表實現的,但依然是有界隊列,最大值為 Integer.maxValue,DelayQueue 可以在任務提交后延遲執行
雙端隊列LinkedList從名字完全看不出是隊列的雙端隊列
交換隊列TransferQueue(阻塞式)TransferQueue是一個接口,但它只有一個實現:LinkedTransferQueue
同步隊列SynchronousQueue(阻塞式)不存儲元素的隊列,手遞手傳遞元素

常見的四種線程池:

線程池任務隊列
Executors.newCachedThreadPool();SynchronousQueue
Executors.newFixedThreadPool(10);LinkedBlockingQueue
Executors.newSingleThreadExecutor();LinkedBlockingQueue
Executors.newScheduledThreadPool(10);DelayedWorkQueue

ConcurrentLinkedQueue 是非阻塞并發隊列,它的 CAS操作,offer 為例:

public boolean offer(E e) {// 非空校驗checkNotNull(e);final Node<E> newNode = new Node<E>(e);// 取得 tail 節點,同時令 p 指向它,然后循環CASfor (Node<E> t = tail, p = t;;) {// 取得 tail 的next節點Node<E> q = p.next;// 拿到 tail 的瞬間可能會有其他線程設置了新的 tail,需要判斷 tail 后面是否為 nullif (q == null) {// 如果 q 為 null,說明 p 確實是 tail,此時將新的節點 CAS 到 tail 的后面if (p.casNext(null, newNode)) {// p 指向新的節點if (p != t) // CAS 設置新的 tailcasTail(t, newNode);return true;}// Lost CAS race to another thread; re-read next}// ... 其他邏輯}}

總結

以上是生活随笔為你收集整理的Queue —— JUC 的豪华队列组件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。