java实现生产者消费者模式
一: 什么是生產者消費者模型
? ? ? ?生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
簡單來說:
? ? ? ?生產者消費者模型就是指,在一個系統中,存在兩種角色,一個為生產者,一個為消費者,通過一個緩沖區(倉庫)進行通信,生產者將生產的產品放入倉庫,消費者從倉庫中取產品。當倉庫滿時,生產者阻塞,當倉庫空時,消費者阻塞。
二: 關系圖
三: 實現方式
3.1 wait—notify 方式
3.1.1 舉例1
生產者類
/*** 生產者類* 實現runnable接口* @author DH**/ public class Producer implements Runnable{private BufferArea ba;//通過傳入參數的方式是使得對象相同,具有互斥鎖的效果。public Producer(BufferArea ba){this.ba = ba;}@Overridepublic void run() {while(true){setIntervalTime();ba.set();//生產產品}}//設置時間間隔public void setIntervalTime(){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}消費者類
/*** 消費者類* 實現runnable接口* @author DH**/ public class Consumer implements Runnable{private BufferArea ba;public Consumer(BufferArea ba){this.ba = ba;}@Overridepublic void run() {while(true){setIntervalTime();ba.get();//消費產品}}//設置時間間隔public void setIntervalTime(){try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}} }倉庫?
/*** 倉庫* 緩沖區* wait()/notify()* @author DH**/ public class BufferArea {private int currNum = 0;//當前倉庫的產品數量private int maxNum = 10;//倉庫最大產品容量public synchronized void set(){if(currNum<maxNum){currNum++;System.out.println(Thread.currentThread().getName()+" 生產了一件產品!當前產品數為:"+currNum);notifyAll();}else{//當前產品數大于倉庫的最大容量try {System.out.println(Thread.currentThread().getName()+" 開始等待!當前倉庫已滿,產品數為:"+currNum);wait();} catch (InterruptedException e) {e.printStackTrace();}}}public synchronized void get(){if(currNum>0){//倉庫中有產品currNum--;System.out.println(Thread.currentThread().getName()+" 獲得了一件產品!當前產品數為:"+currNum);notifyAll();}else{try {System.out.println(Thread.currentThread().getName()+" 開始等待!當前倉庫為空,產品數為:"+currNum);wait();} catch (InterruptedException e) {e.printStackTrace();}}} }測試類
/*** 測試類* @author DH**/ public class MainCode {public static void main(String[] args) {//同一個倉庫BufferArea ba = new BufferArea();//三個生產者Producer p1 = new Producer(ba);Producer p2 = new Producer(ba);Producer p3 = new Producer(ba);//三個消費者Consumer c1 = new Consumer(ba);Consumer c2 = new Consumer(ba);Consumer c3 = new Consumer(ba);//創建線程,并給線程命名Thread t1 = new Thread(p1,"生產者1");Thread t2 = new Thread(p2,"生產者2");Thread t3 = new Thread(p3,"生產者3");Thread t4 = new Thread(c1,"消費者1");Thread t5 = new Thread(c2,"消費者2");Thread t6 = new Thread(c3,"消費者3");//使線程進入就緒狀態t1.start();t2.start();t3.start();t4.start();t5.start();t6.start();} }通過設置生產者消費者的時間間隔,可以測試倉庫滿和空時的情況,當生產者時間間隔小,表示生產的快,會出現倉庫滿了的情況。測試結果如下。
?生產者2 生產了一件產品!當前產品數為:1
生產者1 生產了一件產品!當前產品數為:2
生產者3 生產了一件產品!當前產品數為:3
生產者3 生產了一件產品!當前產品數為:4
生產者1 生產了一件產品!當前產品數為:5
生產者2 生產了一件產品!當前產品數為:6
生產者1 生產了一件產品!當前產品數為:7
生產者3 生產了一件產品!當前產品數為:8
生產者2 生產了一件產品!當前產品數為:9
生產者3 生產了一件產品!當前產品數為:10
生產者2 開始等待!當前倉庫已滿,產品數為:10
生產者1 開始等待!當前倉庫已滿,產品數為:10
消費者1 獲得了一件產品!當前產品數為:9
消費者2 獲得了一件產品!當前產品數為:8
當消費者時間間隔小,表示消費的快,會出現倉庫為空的情況。測試結果如下。
消費者2 開始等待!當前倉庫為空,產品數為:0
生產者3 生產了一件產品!當前產品數為:1
生產者2 生產了一件產品!當前產品數為:2
生產者1 生產了一件產品!當前產品數為:3
消費者2 獲得了一件產品!當前產品數為:2
消費者1 獲得了一件產品!當前產品數為:1
消費者3 獲得了一件產品!當前產品數為:0
消費者3 開始等待!當前倉庫為空,產品數為:0
3.1.2 舉例2?
產品類(倉庫)
package test.exception.producer_consumer_model;/* 假設為產品為筆*/public class Production {private String type = "";private String color = "";private long code = 0; // 產品編號private boolean isProduced = false; // 是否生產完成 初始狀態為未生產狀態private boolean isContinueProduce = true; // 是否停產該產品public void setContinueProduce(boolean continueProduce) {isContinueProduce = continueProduce;}public void setCode(long code) {this.code = code;}public Production(){}public boolean isContinueProduce() {return isContinueProduce;}public void setType(String type) {this.type = type;}public void setColor(String color) {this.color = color;}public void setProduced(boolean produced) {isProduced = produced;}public boolean isProduced() {return isProduced;}@Overridepublic String toString() {return color + type + "-" + code;} }生產者
package test.exception.producer_consumer_model;public class Producer implements Runnable {private final Production pen; // 產品public Producer(Production pen) {this.pen = pen;}// 生產public void produce() {long code = 0;while (this.pen.isContinueProduce()) {synchronized (this.pen) {if (this.pen.isProduced()) {try {this.pen.wait(); // 等待消費者消費} catch (InterruptedException e) {e.printStackTrace();}}// 開始生產this.pen.setType("鉛筆");this.pen.setColor("藍色");this.pen.setCode(code++);this.pen.setProduced(true);System.out.println(this.pen + " is produced");this.pen.notify();}}System.out.println("finish producing");}@Overridepublic void run() {produce();} }消費者
package test.exception.producer_consumer_model;public class Consumer implements Runnable {private final Production pen;public Consumer(Production pen) {this.pen = pen;}// 持續消費public void consumer() {while (this.pen.isContinueProduce()) {synchronized (this.pen) {if (!this.pen.isProduced()) {try {this.pen.wait(); // 等待生產者生產} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(this.pen + " is consumed"); // 使用this.pen.setProduced(false); // 使用完后更新狀態this.pen.notify();}}// 確保停止生產后,能夠使用最后生產的一支筆if (this.pen.isProduced()) {System.out.println(this.pen + " is consumed");}System.out.println("finish using");}@Overridepublic void run() {consumer();} }測試類
package test.exception.producer_consumer_model;public class Demo {public static void main(String[] args) throws InterruptedException {Production pen = new Production();Consumer consumer = new Consumer(pen);Producer producer = new Producer(pen);new Thread(producer).start(); // 開啟生產者線程new Thread(consumer).start(); // 開啟消費者線程Thread.sleep(10000);pen.setContinueProduce(false); // 10s后停止生產該類型的筆} }運行結果?
3.2 阻塞隊列方式?
這里因為篇幅原因, 詳細的實現方式可以看我的這篇博客;?https://blog.csdn.net/m0_50370837/article/details/124339524
參考文章:?Java的生產者消費者模型_Hi--Man的博客-CSDN博客
https://www.jb51.net/article/187908.htm
總結
以上是生活随笔為你收集整理的java实现生产者消费者模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: androidx使用Toolbar
- 下一篇: 使用 Mailgun 配置 Ghost