java blockingqueue_Java多线程进阶(三一)—— J.U.C之collections框架:BlockingQueue接口...
一、引言
從本節開始,我們將介紹juc-collections框架中的“阻塞隊列”部分。阻塞隊列在實際應用中非常廣泛,許多消息中間件中定義的隊列,通常就是一種“阻塞隊列”。
那么“阻塞隊列”和我們之前討論過的ConcurrentLinkedQueue、ConcurrentLinkedDeque有什么不同呢?
ConcurrentLinkedQueue和ConcurrentLinkedDeque是以非阻塞算法實現的高性能隊列,其使用場景一般是在并發環境下,需要“隊列”/“?!边@類數據結構時才會使用;而“阻塞隊列”通常利用了“鎖”來實現,也就是會阻塞調用線程,其使用場景一般是在“生產者-消費者”模式中,用于線程之間的數據交換或系統解耦。
在Java多線程基礎(七)——Producer-Consumer模式中,我們曾簡要的談到過“生產者-消費者”這種模式。在這種模式中,“生產者”和“消費者”是相互獨立的,兩者之間的通信需要依靠一個隊列。這個隊列,其實就是本文中的“阻塞隊列”。
引入“阻塞隊列”的最大好處就是解耦,在軟件工程中,“高內聚,低耦合”是進行模塊設計的準則之一,這樣“生產者”和“消費者”其實是互不影響的,將來任意一方需要升級時,可以保證系統的平滑過渡。
二、BlockingQueue簡介
BlockingQueue是在JDK1.5時,隨著J.U.C引入的一個接口:
BlockingQueue繼承了Queue接口,提供了一些阻塞方法,主要作用如下:
當線程向隊列中插入元素時,如果隊列已滿,則阻塞線程,直到隊列有空閑位置(非滿);
當線程從隊列中取元素(刪除隊列元素)時,如果隊列未空,則阻塞線程,直到隊列有元素;
既然BlockingQueue是一種隊列,所以也具備隊列的三種基本方法:插入、刪除、讀取:
操作類型
拋出異常
返回特殊值
阻塞線程
超時
插入
add(e)
offer(e)
put(e)
offer(e, time, unit)
刪除
remove()
poll()
take()
poll(time, unit)
讀取
element()
peek()
/
/
可以看到,對于每種基本方法,“拋出異?!焙汀胺祷靥厥庵怠钡姆椒ǘx和Queue是完全一樣的。BlockingQueue只是增加了兩類和阻塞相關的方法:put(e)、take();offer(e, time, unit)、poll(time, unit)。
put(e)和take()方法會一直阻塞調用線程,直到線程被中斷或隊列狀態可用;
offer(e, time, unit)和poll(time, unit)方法會限時阻塞調用線程,直到超時或線程被中斷或隊列狀態可用。
public interface BlockingQueue extends Queue {
/**
* 插入元素e至隊尾, 如果隊列已滿, 則阻塞調用線程直到隊列有空閑空間.
*/
void put(E e) throws InterruptedException;
/**
* 插入元素e至隊列, 如果隊列已滿, 則限時阻塞調用線程,直到隊列有空閑空間或超時.
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 從隊首刪除元素,如果隊列為空, 則阻塞調用線程直到隊列中有元素.
*/
E take() throws InterruptedException;
/**
* 從隊首刪除元素,如果隊列為空, 則限時阻塞調用線程,直到隊列中有元素或超時.
*/
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// ...
}
除此之外,BlockingQueue還具有以下特點:
BlockingQueue隊列中不能包含null元素;
BlockingQueue接口的實現類都必須是線程安全的,實現類一般通過“鎖”保證線程安全;
BlockingQueue 可以是限定容量的。remainingCapacity()方法用于返回剩余可用容量,對于沒有容量限制的BlockingQueue實現,該方法總是返回Integer.MAX_VALUE 。
三、再談“生產者-消費者”模式
最后,我們來看下如何利用BlockingQueue來實現生產者-消費者模式。在生產者-消費者模式中,一共有四類角色:生產者、消費者、消息隊列、消息體。我們利用BlockingQueue來實現消息隊列,其余部分沒有什么變化。
Producer(生產者)
生產者生產消息體(Data),并將消息體(Data)傳遞給通道(Channel)。
/**
* 生產者
*/
public class Producer implements Runnable {
private Channel channel;
public Producer(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
while (true) {
String v = String.valueOf(ThreadLocalRandom.current().nextInt());
Data data = new Data(v);
try {
channel.put(data);
System.out.println(Thread.currentThread().getName() + " produce :" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread.yield();
}
}
}
Consumer(消費者)
消費者從通道(Channel)中獲取數據,進行處理。
/**
* 消費者
*/
public class Consumer implements Runnable {
private final Channel channel;
public Consumer(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
while (true) {
try {
Object obj = channel.take();
System.out.println(Thread.currentThread().getName() + " consume :" + obj.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread.yield();
}
}
}
Channel(通道)
相當于消息的隊列,對消息進行排隊,控制消息的傳輸。
/**
* 通道類
*/
public class Channel {
private final BlockingQueue blockingQueue;
public Channel(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
public Object take() throws InterruptedException {
return blockingQueue.take();
}
public void put(Object o) throws InterruptedException {
blockingQueue.put(o);
}
}
Data(消息體/數據)
Data代表了實際生產或消費的數據。
/**
* 數據/消息
*/
public class Data implements Serializable {
private T data;
public Data(T data) {
this.data = data;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
@Override
public String toString() {
return "Data{" +
"data=" + data +
'}';
}
}
調用如下:
public class Main {
public static void main(String[] args) {
BlockingQueue blockingQueue = new SomeQueueImplementation();
Channel channel = new Channel(blockingQueue);
Producer p = new Producer(channel);
Consumer c1 = new Consumer(channel);
Consumer c2 = new Consumer(channel);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
總結
以上是生活随笔為你收集整理的java blockingqueue_Java多线程进阶(三一)—— J.U.C之collections框架:BlockingQueue接口...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java宝典_JAVA宝典之_JAVA基
- 下一篇: java 转账 锁_Java多线程 多个