日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

Java Review - 并发编程_ArrayBlockingQueue原理源码剖析

發布時間:2025/3/21 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java Review - 并发编程_ArrayBlockingQueue原理源码剖析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 概述
  • 類圖結構
  • 構造函數
  • 主要方法源碼解析
    • offer操作
    • put操作
    • poll操作
    • take操作
    • peek操作
    • size
  • 小結


概述

Java Review - 并發編程_LinkedBlockingQueue原理&源碼剖析
介紹了使用有界鏈表方式實現的阻塞隊列LinkedBlockingQueue,這里我們繼續來研究使用有界數組方式實現的阻塞隊列ArrayBlockingQueue的原理。


類圖結構

由該圖可以看出,ArrayBlockingQueue

  • 內部有一個數組items,用來存放隊列元素
  • putindex變量表示入隊元素下標
  • takeIndex是出隊下標
  • count統計隊列元素個數

從定義可知,這些變量并沒有使用volatile修飾,這是因為訪問這些變量都是在鎖塊內,而加鎖已經保證了鎖塊內變量的內存可見性了。

另外有個獨占鎖lock用來保證出、入隊操作的原子性,這保證了同時只有一個線程可以進行入隊、出隊操作。

另外,notEmpty、notFull條件變量用來進行出、入隊的同步。


構造函數

ArrayBlockingQueue是有界隊列,所以構造函數必須傳入隊列大小參數。

構造函數的代碼如下。

public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}

由以上代碼可知,在默認情況下使用ReentrantLock提供的非公平獨占鎖進行出、入隊操作的同步。


主要方法源碼解析

研究過LinkedBlockingQueue的實現后再看ArrayBlockingQueue的實現會感覺后者簡單了很多

offer操作

向隊列尾部插入一個元素,如果隊列有空閑空間則插入成功后返回true,如果隊列已滿則丟棄當前元素然后返回false。

如果e元素為null則拋出NullPointerException異常。

另外,該方法是不阻塞的。

public boolean offer(E e) {// 1 checkNotNull(e);// 2 final ReentrantLock lock = this.lock;lock.lock();try {// 3 if (count == items.length)return false;else {// 4 enqueue(e);return true;}} finally {lock.unlock();}}
  • 代碼(1) 如果e元素為null則拋出NullPointerException異常
  • 代碼(2)獲取獨占鎖,當前線程獲取該鎖后,其他入隊和出隊操作的線程都會被阻塞掛起而后被放入lock鎖的AQS阻塞隊列。
  • 代碼(3)判斷如果隊列滿則直接返回false,否則調用enqueue方法后返回true,enqueue的代碼如下
/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;// 6 元素入隊final Object[] items = this.items;items[putIndex] = x;// 7 計算下一個元素應該存放的下標位置if (++putIndex == items.length)putIndex = 0;count++;// 8 notEmpty.signal();}

如上代碼首先把當前元素放入items數組,然后計算下一個元素應該存放的下標位置,并遞增元素個數計數器,最后激活notEmpty的條件隊列中因為調用take操作而被阻塞的一個線程。

這里由于在操作共享變量count前加了鎖,所以不存在內存不可見問題,加過鎖后獲取的共享變量都是從主內存獲取的,而不是從CPU緩存或者寄存器獲取。

代碼(5)釋放鎖,然后會把修改的共享變量值(比如count的值)刷新回主內存中,這樣其他線程通過加鎖再次讀取這些共享變量時,就可以看到最新的值。


put操作

向隊列尾部插入一個元素,如果隊列有空閑則插入后直接返回true,如果隊列已滿則阻塞當前線程直到隊列有空閑并插入成功后返回true,如果在阻塞時被其他線程設置了中斷標志,則被阻塞線程會拋出InterruptedException異常而返回

另外,如果e元素為null則拋出NullPointerException異常。

public void put(E e) throws InterruptedException {// 1 checkNotNull(e);final ReentrantLock lock = this.lock;// 2 獲取鎖 可被中斷lock.lockInterruptibly();try {// 3 如果隊列滿,這把當前下層放入notFull管理的條件隊列while (count == items.length)notFull.await();// 4 插入隊列 enqueue(e);} finally {// 5 lock.unlock();}}
  • 在代碼(2)中,在獲取鎖的過程中當前線程被其他線程中斷了,則當前線程會拋出InterruptedException異常而退出。

  • 代碼(3)判斷如果當前隊列已滿,則把當前線程阻塞掛起后放入notFull的條件隊列,注意這里也是使用了while循環而不是if語句。\

  • 代碼(4)判斷如果隊列不滿則插入當前元素,此處不再贅述。


poll操作

從隊列頭部獲取并移除一個元素,如果隊列為空則返回null,該方法是不阻塞的

public E poll() {// 1 final ReentrantLock lock = this.lock;lock.lock();try {// 2 return (count == 0) ? null : dequeue();} finally {lock.unlock();}}
  • 代碼(1)獲取獨占鎖。

  • 代碼(2)判斷如果隊列為空則返回null,否則調用dequeue()方法

private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")// 4 獲取元素E x = (E) items[takeIndex];// 5 數組中的值為null items[takeIndex] = null;// 6 對頭指針計算,隊列元素個數減一if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 7 發送信號激活notFull條件隊列中的一個線程notFull.signal();return x;}

由以上代碼可知,首先獲取當前隊頭元素并將其保存到局部變量,然后重置隊頭元素為null,并重新設置隊頭下標,遞減元素計數器,最后發送信號激活notFull的條件隊列里面一個因為調用put方法而被阻塞的線程


take操作

獲取當前隊列頭部元素并從隊列里面移除它。如果隊列為空則阻塞當前線程直到隊列不為空然后返回元素,如果在阻塞時被其他線程設置了中斷標志,則被阻塞線程會拋出InterruptedException異常而返回。

public E take() throws InterruptedException {// 1 final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 2 隊列為空則等待,直到隊列中有數據 while (count == 0)notEmpty.await();// 3 獲取頭部元素return dequeue();} finally {// 4 lock.unlock();}

take操作的代碼也比較簡單,與poll相比只是代碼(2)不同。

在這里,如果隊列為空則把當前線程掛起后放入notEmpty的條件隊列,等其他線程調用notEmpty.signal()方法后再返回。

需要注意的是,這里也是使用while循環進行檢測并等待而不是使用if語句。


peek操作

獲取隊列頭部元素但是不從隊列里面移除它,如果隊列為空則返回null,該方法是不阻塞的

public E peek() {// 1 final ReentrantLock lock = this.lock;lock.lock();try {// 2 return itemAt(takeIndex); // null when queue is empty} finally {// 3 lock.unlock();}} final E itemAt(int i) {return (E) items[i];}

peek的實現更簡單,首先獲取獨占鎖,然后從數組items中獲取當前隊頭下標的值并返回,在返回前釋放獲取的鎖。

size

計算當前隊列元素個數。

public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return count;} finally {lock.unlock();}}

size操作比較簡單,獲取鎖后直接返回count,并在返回前釋放鎖。

也許你會問,這里又沒有修改count的值,只是簡單地獲取,為何要加鎖呢?

其實如果count被聲明為volatile的這里就不需要加鎖了,因為volatile類型的變量保證了內存的可見性,而ArrayBlockingQueue中的count并沒有被聲明為volatile的,這是因為count操作都是在獲取鎖后進行的

而獲取鎖的語義之一是,獲取鎖后訪問的變量都是從主內存獲取的,這保證了變量的內存可見性。

小結

  • ArrayBlockingQueue通過使用全局獨占鎖實現了同時只能有一個線程進行入隊或者出隊操作,這個鎖的粒度比較大,有點類似于在方法上添加synchronized的意思。

  • 其中offer和poll操作通過簡單的加鎖進行入隊、出隊操作,

  • 而put、take操作則使用條件變量實現了,如果隊列滿則等待,如果隊列空則等待,然后分別在出隊和入隊操作中發送信號激活等待線程實現同步。

  • 另外,相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的結果是精確的,因為計算前加了全局鎖。

總結

以上是生活随笔為你收集整理的Java Review - 并发编程_ArrayBlockingQueue原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。