A Producer and Consumer Implemented with Java BlockingQueue
Consumer
package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable {
    /* The java.util.concurrent.BlockingQueue is the bridge between producers and consumers. */
    private final BlockingQueue<String> queue;
    private final String id;
    // volatile so that a stop() call from another thread is visible inside run()
    private volatile boolean isRunning = true;

    public Consumer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }

    public void stop() {
        isRunning = false;
    }

    @Override
    public void run() {
        System.out.println("Thread: " + id + " Consumer thread is running...");
        try {
            // Loop on the volatile field so that stop() can end the loop.
            while (isRunning) {
                System.out.println("Thread: " + id + " fetch data from linkedQueue..."
                        + " queue size: " + queue.size());
                /* Take one element from the queue with a 2-second timeout.
                 * If nothing is available after 2 seconds, poll returns null. */
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println("Thread: " + id + " has consumed one data from queue: " + data
                            + " Queue size: " + queue.size());
                    // simulate data consumption
                    Thread.sleep(1000);
                } else {
                    // The queue stayed empty for the whole timeout, so the consumer gets ready to exit.
                    isRunning = false;
                    System.out.println("Thread: " + id + " Consumer read queue timeout");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " consumer thread ends");
        }
    }
}

Producer
package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    // Shared by all producers so that every produced item gets a unique number.
    private static final AtomicInteger count = new AtomicInteger();

    private final BlockingQueue<String> queue;
    private final String id;
    // volatile so that a stop() call from another thread is visible inside run()
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }

    public void stop() {
        isRunning = false;
    }

    @Override
    public void run() {
        String data = null;
        try {
            while (isRunning) {
                System.out.println("PRODUCER: " + id + " is running");
                // simulate the time needed to produce one item
                Thread.sleep(100);
                data = "data:" + count.incrementAndGet();
                System.out.println("Thread: " + id + " produced data into queue: " + data + " ...");
                // Wait up to 2 seconds for free space; offer returns false if the queue is still full.
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("failed to put data into queue: " + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " quit from producer thread");
        }
    }
}

Test code
package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ConsumerProducerTest {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue: at most 15 elements may be waiting at any time.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(15);

        Producer producer1 = new Producer(queue, "PROD1");
        Producer producer2 = new Producer(queue, "PROD2");
        Producer producer3 = new Producer(queue, "PROD3");
        Consumer consumer1 = new Consumer(queue, "CONSUMER1");
        Consumer consumer2 = new Consumer(queue, "CONSUMER2");

        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);

        // Let the producers and consumers run for 3 seconds.
        Thread.sleep(3 * 1000);

        // Be sure to stop the producers first.
        producer1.stop();
        producer2.stop();
        producer3.stop();
        consumer1.stop();
        consumer2.stop();

        // Give the worker threads time to finish their current iteration.
        Thread.sleep(2000);
        // No new tasks are accepted; the pool ends once the running tasks return.
        service.shutdown();
    }
}
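Both the consumer and the producer rely on the timed variants of the queue methods: poll(timeout, unit) returns null when the queue stays empty, and offer(element, timeout, unit) returns false when the queue stays full. The following is a minimal single-threaded sketch of those two return values. The class name TimedQueueDemo, its two helper methods, the capacity of 2, and the 100 ms timeout are made up for illustration and are not part of the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article's code.
public class TimedQueueDemo {

    /** Fills a queue of capacity 2, then offers a third element with nobody consuming. */
    public static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.offer("a", 100, TimeUnit.MILLISECONDS);
        queue.offer("b", 100, TimeUnit.MILLISECONDS);
        // The queue is full: offer waits up to 100 ms for space, then gives up.
        return queue.offer("c", 100, TimeUnit.MILLISECONDS);
    }

    /** Polls a queue that never receives an element. */
    public static String pollFromEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        // The queue is empty: poll waits up to 100 ms for an element, then gives up.
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer on full queue: " + offerToFullQueue());   // prints false
        System.out.println("poll on empty queue: " + pollFromEmptyQueue()); // prints null
    }
}
```

This is why the consumer treats a null result as "nothing arrived within the timeout" and exits, while the producer only logs a failed offer and carries on with the next item.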