java+arrayblockquene_java集合(五)Queue集合之ArrayBlockingQueue 详解
一、ArrayBlockingQueue概述
ArrayBlockingQueue是java并發包下一個以數組實現的阻塞隊列,它是線程安全的
public class ArrayBlockingQueue extends AbstractQueueimplements BlockingQueue, java.io.Serializable {
源碼分析
主要屬性
//使用數組存儲元素
final Object[] items;//取元素的指針
inttakeIndex;//放元素的指針
intputIndex;//元素數量
intcount;//保證并發訪問的鎖
final ReentrantLock lock;//非空條件
privatefinal Condition notEmpty;//非滿條件
private final Condition notFull;
通過屬性我們可以得出以下幾個重要信息:
(1)利用數組存儲元素;
(2)通過放指針和取指針來標記下一次操作的位置;
(3)利用重入鎖來保證并發安全;
主要構造方法
public ArrayBlockingQueue(intcapacity) {this(capacity, false);
}public ArrayBlockingQueue(intcapacity, boolean fair) {if (capacity <= 0)throw newIllegalArgumentException();//初始化數組
this.items = newObject[capacity];//創建重入鎖及兩個條件
lock = newReentrantLock(fair);
notEmpty= lock.newCondition();
notFull= lock.newCondition();
}
通過構造方法我們可以得出以下兩個結論:
(1)ArrayBlockingQueue初始化時必須傳入容量,也就是數組的大小;
(2)可以通過構造方法控制重入鎖的類型是公平鎖還是非公平鎖;
入隊
入隊有四個方法,它們分別是add(E e)、offer(E e)、put(E e)、offer(E e, long timeout, TimeUnit unit),它們有什么區別呢?
publicboolean add(E e) {//調用父類的add(e)方法
returnsuper.add(e);
}//super.add(e)
publicboolean add(E e) {//調用offer(e)如果成功返回true,如果失敗拋出異常
if(offer(e))return true;else
throw new IllegalStateException("Queue full");
}publicboolean offer(E e) {//元素不可為空
checkNotNull(e);
final ReentrantLocklock = this.lock;//加鎖
lock.lock();try{if (count ==items.length)//如果數組滿了就返回false
return false;else{//如果數組沒滿就調用入隊方法并返回true
enqueue(e);return true;
}
}finally{//解鎖
lock.unlock();
}
}public voidput(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLocklock = this.lock;//加鎖,如果線程中斷了拋出異常
lock.lockInterruptibly();try{//如果數組滿了,使用notFull等待//notFull等待的意思是說現在隊列滿了//只有取走一個元素后,隊列才不滿//然后喚醒notFull,然后繼續現在的邏輯//這里之所以使用while而不是if//是因為有可能多個線程阻塞在lock上//即使喚醒了可能其它線程先一步修改了隊列又變成滿的了//這時候需要再次等待
while (count ==items.length)
notFull.await();//入隊
enqueue(e);
}finally{//解鎖
lock.unlock();
}
}public boolean offer(E e, longtimeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);long nanos =unit.toNanos(timeout);
final ReentrantLocklock = this.lock;//加鎖
lock.lockInterruptibly();try{//如果數組滿了,就阻塞nanos納秒//如果喚醒這個線程時依然沒有空間且時間到了就返回false
while (count ==items.length) {if (nanos <= 0)return false;
nanos=notFull.awaitNanos(nanos);
}//入隊
enqueue(e);return true;
}finally{//解鎖
lock.unlock();
}
}private voidenqueue(E x) {
final Object[] items= this.items;//把元素直接放在放指針的位置上
items[putIndex] =x;//如果放指針到數組盡頭了,就返回頭部
if (++putIndex ==items.length)
putIndex= 0;//數量加1
count++;//喚醒notEmpty,因為入隊了一個元素,所以肯定不為空了
notEmpty.signal();
}
(1)add(e)時如果隊列滿了則拋出異常;
(2)offer(e)時如果隊列滿了則返回false;
(3)put(e)時如果隊列滿了則使用notFull等待;
(4)offer(e, timeout, unit)時如果隊列滿了則等待一段時間后如果隊列依然滿就返回false;
(5)利用放指針循環使用數組來存儲元素;
出隊
出隊有四個方法,它們分別是remove()、poll()、take()、poll(long timeout, TimeUnit unit),它們有什么區別呢?
publicE remove() {//調用poll()方法出隊
E x =poll();if (x != null)//如果有元素出隊就返回這個元素
returnx;else
//如果沒有元素出隊就拋出異常
throw newNoSuchElementException();
}publicE poll() {
final ReentrantLocklock = this.lock;//加鎖
lock.lock();try{//如果隊列沒有元素則返回null,否則出隊
return (count == 0) ? null: dequeue();
}finally{lock.unlock();
}
}publicE take() throws InterruptedException {
final ReentrantLocklock = this.lock;//加鎖
lock.lockInterruptibly();try{//如果隊列無元素,則阻塞等待在條件notEmpty上
while (count == 0)
notEmpty.await();//有元素了再出隊
returndequeue();
}finally{//解鎖
lock.unlock();
}
}public E poll(longtimeout, TimeUnit unit) throws InterruptedException {long nanos =unit.toNanos(timeout);
final ReentrantLocklock = this.lock;//加鎖
lock.lockInterruptibly();try{//如果隊列無元素,則阻塞等待nanos納秒//如果下一次這個線程獲得了鎖但隊列依然無元素且已超時就返回null
while (count == 0) {if (nanos <= 0)return null;
nanos=notEmpty.awaitNanos(nanos);
}returndequeue();
}finally{lock.unlock();
}
}privateE dequeue() {
final Object[] items= this.items;
@SuppressWarnings("unchecked")//取取指針位置的元素
E x =(E) items[takeIndex];//把取指針位置設為null
items[takeIndex] = null;//取指針前移,如果數組到頭了就返回數組前端循環利用
if (++takeIndex ==items.length)
takeIndex= 0;//元素數量減1
count--;if (itrs != null)
itrs.elementDequeued();//喚醒notFull條件
notFull.signal();returnx;
}
(1)remove()時如果隊列為空則拋出異常;
(2)poll()時如果隊列為空則返回null;
(3)take()時如果隊列為空則阻塞等待在條件notEmpty上;
(4)poll(timeout, unit)時如果隊列為空則阻塞等待一段時間后如果還為空就返回null;
(5)利用取指針循環從數組中取元素;
總結
(1)ArrayBlockingQueue不需要擴容,因為是初始化時指定容量,并循環利用數組;
(2)ArrayBlockingQueue利用takeIndex和putIndex循環利用數組;
(3)入隊和出隊各定義了四組方法為滿足不同的用途;
(4)利用重入鎖和兩個條件保證并發安全;
彩蛋
(1)論BlockingQueue中的那些方法?
BlockingQueue是所有阻塞隊列的頂級接口,它里面定義了一批方法,它們有什么區別呢?
操作拋出異常返回特定值阻塞超時
入隊
add(e)
offer(e)——false
put(e)
offer(e, timeout, unit)
出隊
remove()
poll()——null
take()
poll(timeout, unit)
檢查
element()
peek()——null
-
-
(2)ArrayBlockingQueue有哪些缺點呢?
a)隊列長度固定且必須在初始化時指定,所以使用之前一定要慎重考慮好容量;
b)如果消費速度跟不上入隊速度,則會導致提供者線程一直阻塞,且越阻塞越多,非常危險;
c)只使用了一個鎖來控制入隊出隊,效率較低,那是不是可以借助分段的思想把入隊出隊分裂成兩個鎖呢?且聽下回分解。
總結
以上是生活随笔為你收集整理的java+arrayblockquene_java集合(五)Queue集合之ArrayBlockingQueue 详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java union方法参数_Java
- 下一篇: java 非静态语句块_静态初始化代码块