日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)

發布時間:2024/10/12 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、前言

  在完成Map下的并發集合后,現在來分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一個阻塞型隊列,支持多任務并發操作,有了之前看源碼的積累,再看ArrayBlockingQueue源碼會很容易,下面開始正文。

二、ArrayBlockingQueue數據結構

  通過源碼分析,并且可以對比ArrayList可知,ArrayBlockingQueue的底層數據結構是數組,數據結構如下

  說明:ArrayBlockingQueue底層采用數據才存放數據,對數組的訪問添加了鎖的機制,使其能夠支持多線程并發。

三、ArrayBlockingQueue源碼分析

  3.1 類的繼承關系  

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {}

  說明:可以看到ArrayBlockingQueue繼承了AbstractQueue抽象類,AbstractQueue定義了對隊列的基本操作;同時實現了BlockingQueue接口,BlockingQueue表示阻塞型的隊列,其對隊列的操作可能會拋出異常;同時也實現了Searializable接口,表示可以被序列化。

  3.2 類的屬性  

public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 版本序列號private static final long serialVersionUID = -817911632652898426L;// 存放實際元素的數組final Object[] items;// 取元素索引int takeIndex;// 獲取元素索引int putIndex;// 隊列中的項int count;// 可重入鎖final ReentrantLock lock;// 等待獲取條件private final Condition notEmpty;// 等待存放條件private final Condition notFull;// 迭代器transient Itrs itrs = null; } View Code

  說明:從類的屬性中可以清楚的看到其底層的結構是Object類型的數組,取元素和存元素有不同的索引,有一個可重入鎖ReentrantLock,兩個條件Condition。對ReentrantLock和Condition不太熟悉的讀者可以參考筆者的這篇博客,【JUC】JDK1.8源碼分析之ReentrantLock(三)。

  3.3 類的構造函數

  1. ArrayBlockingQueue(int)型構造函數 

public ArrayBlockingQueue(int capacity) {// 調用兩個參數的構造函數this(capacity, false);} View Code

  說明:該構造函數用于創建一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。

  2. ArrayBlockingQueue(int, boolean)型構造函數  

public ArrayBlockingQueue(int capacity, boolean fair) {// 初始容量必須大于0if (capacity <= 0)throw new IllegalArgumentException();// 初始化數組this.items = new Object[capacity];// 初始化可重入鎖lock = new ReentrantLock(fair);// 初始化等待條件notEmpty = lock.newCondition();notFull = lock.newCondition();} View Code

  說明:該構造函數用于創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。

  3. ArrayBlockingQueue(int, boolean, Collection<? extends E>)型構造函數 

public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {// 調用兩個參數的構造函數this(capacity, fair);// 可重入鎖final ReentrantLock lock = this.lock;// 上鎖lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) { // 遍歷集合// 檢查元素是否為空 checkNotNull(e);// 存入ArrayBlockingQueue中items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) { // 當初始化容量小于傳入集合的大小時,會拋出異常throw new IllegalArgumentException();}// 元素數量count = i;// 初始化存元素的索引putIndex = (i == capacity) ? 0 : i;} finally {// 釋放鎖 lock.unlock();}} View Code

  說明:該構造函數用于創建一個具有給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,并以 collection 迭代器的遍歷順序添加元素。

  3.4 核心函數分析

  1. put函數  

public void put(E e) throws InterruptedException {checkNotNull(e);// 獲取可重入鎖final ReentrantLock lock = this.lock;// 如果當前線程未被中斷,則獲取鎖 lock.lockInterruptibly();try {while (count == items.length) // 判斷元素是否已滿// 若滿,則等待 notFull.await();// 入隊列 enqueue(e);} finally {// 釋放鎖 lock.unlock();}} View Code

  說明:put函數用于存放元素,在當前線程被中斷時會拋出異常,并且當隊列已經滿時,會阻塞一直等待。其中,put會調用enqueue函數,enqueue函數源碼如下  

private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;// 獲取數組final Object[] items = this.items;// 將元素放入items[putIndex] = x;if (++putIndex == items.length) // 放入后存元素的索引等于數組長度(表示已滿)// 重置存索引為0putIndex = 0;// 元素數量加1count++;// 喚醒在notEmpty條件上等待的線程 notEmpty.signal();} View Code

  說明:enqueue函數用于將元素存入底層Object數組中,并且會喚醒等待notEmpty條件的線程。

  2. offer函數  

public boolean offer(E e) {// 檢查元素不能為空 checkNotNull(e);// 可重入鎖final ReentrantLock lock = this.lock;// 獲取鎖 lock.lock();try {if (count == items.length) // 元素個數等于數組長度,則返回return false; else { // 添加進數組 enqueue(e);return true;}} finally {// 釋放數組 lock.unlock();}} View Code

  說明:offer函數也用于存放元素,在調用ArrayBlockingQueue的add方法時,會間接的調用到offer函數,offer函數添加元素不會拋出異常,當底層Object數組已滿時,則返回false,否則,會調用enqueue函數,將元素存入底層Object數組。并喚醒等待notEmpty條件的線程。

  3. take函數  

public E take() throws InterruptedException {// 可重入鎖final ReentrantLock lock = this.lock;// 如果當前線程未被中斷,則獲取鎖,中斷會拋出異常 lock.lockInterruptibly();try {while (count == 0) // 元素數量為0,即Object數組為空// 則等待notEmpty條件 notEmpty.await();// 出隊列return dequeue();} finally {// 釋放鎖 lock.unlock();}} View Code

  說明:take函數用于從ArrayBlockingQueue中獲取一個元素,其與put函數相對應,在當前線程被中斷時會拋出異常,并且當隊列為空時,會阻塞一直等待。其中,take會調用dequeue函數,dequeue函數源碼如下  

private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")// 取元素E x = (E) items[takeIndex];// 該索引的值賦值為nullitems[takeIndex] = null;// 取值索引等于數組長度if (++takeIndex == items.length)// 重新賦值取值索引takeIndex = 0;// 元素個數減1count--;if (itrs != null) itrs.elementDequeued();// 喚醒在notFull條件上等待的線程 notFull.signal();return x;} View Code

  說明:dequeue函數用于將取元素,并且會喚醒等待notFull條件的線程。

  4. poll函數  

public E poll() {// 重入鎖final ReentrantLock lock = this.lock;// 獲取鎖 lock.lock();try {// 若元素個數為0則返回null,否則,調用dequeue,出隊列return (count == 0) ? null : dequeue();} finally {// 釋放鎖 lock.unlock();}} View Code

  說明:poll函數用于獲取元素,其與offer函數相對應,不會拋出異常,當元素個數為0是,返回null,否則,調用dequeue函數,并喚醒等待notFull條件的線程。并返回。

  5. clear函數  

public void clear() {// 數組final Object[] items = this.items;// 可重入鎖final ReentrantLock lock = this.lock;// 獲取鎖 lock.lock();try {// 保存元素個數int k = count;if (k > 0) { // 元素個數大于0// 存數元素索引final int putIndex = this.putIndex;// 取元素索引int i = takeIndex;do {// 賦值為nullitems[i] = null;if (++i == items.length) // 重新賦值ii = 0;} while (i != putIndex);// 重新賦值取元素索引takeIndex = putIndex;// 元素個數為0count = 0;if (itrs != null)itrs.queueIsEmpty();for (; k > 0 && lock.hasWaiters(notFull); k--) // 若有等待notFull條件的線程,則逐一喚醒 notFull.signal();}} finally {// 釋放鎖 lock.unlock();}} View Code

  說明:clear函數用于清空ArrayBlockingQueue,并且會釋放所有等待notFull條件的線程(存放元素的線程)。

四、示例

  下面給出一個具體的示例來演示ArrayBlockingQueue的使用  

package com.hust.grid.leesf.collections;import java.util.concurrent.ArrayBlockingQueue;class PutThread extends Thread {private ArrayBlockingQueue<Integer> abq;public PutThread(ArrayBlockingQueue<Integer> abq) {this.abq = abq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("put " + i);abq.put(i);Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}} }class GetThread extends Thread {private ArrayBlockingQueue<Integer> abq;public GetThread(ArrayBlockingQueue<Integer> abq) {this.abq = abq;}public void run() {for (int i = 0; i < 10; i++) {try {System.out.println("take " + abq.take());Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}} }public class ArrayBlockingQueueDemo {public static void main(String[] args) {ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<Integer>(10);PutThread p1 = new PutThread(abq);GetThread g1 = new GetThread(abq);p1.start();g1.start();} } View Code

  運行結果:  

put 0 take 0 put 1 take 1 put 2 take 2 put 3 take 3 put 4 take 4 put 5 take 5 put 6 take 6 put 7 take 7 put 8 take 8 put 9 take 9 View Code

  說明:示例中使用了兩個線程,一個用于存元素,一個用于讀元素,存和讀各10次,每個線程存一個元素或者讀一個元素后都會休眠100ms,可以看到結果是交替打印,并且首先打印的肯定是put線程語句(因為若取線程先取元素,此時隊列并沒有元素,其會阻塞,等待存線程存入元素),并且最終程序可以正常結束。

  ① 若修改取元素線程,將存的元素的次數修改為15次(for循環的結束條件改為15即可),運行結果如下:  

put 0 take 0 put 1 take 1 put 2 take 2 put 3 take 3 put 4 take 4 put 5 take 5 put 6 take 6 put 7 take 7 put 8 take 8 put 9 take 9 View Code

  說明:運行結果與上面的運行結果相同,但是,此時程序無法正常結束,因為take方法被阻塞了,等待被喚醒。

五、總結

  總的來說,有了前面分析的基礎,分析ArrayBlockingQueue就會非常的簡單,ArrayBlockingQueue是通過ReentrantLock和Condition條件來保證多線程的正確訪問的。ArrayBockingQueue的分析就到這里,歡迎交流,謝謝各位園友的觀看~

轉載于:https://www.cnblogs.com/leesf456/p/5533770.html

總結

以上是生活随笔為你收集整理的【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。