日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

生产者消费者模型java实现

發布時間:2024/3/12 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 生产者消费者模型java实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

做題的時候遇到了生產者消費者問題,這個問題可以說是線程學習的經典題目了,就忍不住研究了一波。它描述是有一塊緩沖區(隊列實現)作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。在Java中這個數組線程阻塞的問題,多個用戶同時發送多個請求,怎么保證不發生線程死鎖,是我們要考慮的問題。

生產者消費者模式說明:

1.生產者只在倉庫未滿時進行生產,倉庫滿時生產者進程被阻塞;

2.消費者只在倉庫非空時進行消費,倉庫為空時消費者進程被阻塞;

3.當消費者發現倉庫為空時會通知生產者生產;

3.當生產者發現倉庫滿時會通知消費者消費;

實現的關鍵:

我們知道在JAVA環境中,線程Thread有如下幾個狀態:

1.新建狀態

2.就緒狀態

3.運行狀態

4.阻塞狀態

5.死亡狀態

生產者消費者問題就是要控制線程的阻塞狀態,保證生產者和消費者進程在一定條件下,一直穩定運行,不出現沒有商品但是消費者還是一直購買,商品滿了但是生產者還是不斷生產導致浪費的情況。

?

我們考慮線程常用的Sychronized、RetrenLock還有阻塞隊列來實現。

(1)Object的wait() / notify()方法?

wait(): wait()方法可以讓線程進入等待狀態,當緩沖區已滿/空時,生產者/消費者線程停止自己的執行,放棄鎖,使自己處于等待狀態,讓其他線程執行。

notify():notify隨機選擇一個在該對象上調用wait方法的線程,解除其阻塞狀態。當生產者/消費者向緩沖區放入/取出一個產品時,向其他等待的線程發出可執行的通知,同時放棄鎖,使自己處于等待狀態。

?

?

代碼實現:

import java.util.LinkedList; import java.util.Queue; import java.util.Random;/*** 生產者消費者模式:使用Object.wait() / notify()方法實現*/ public class ProducerConsumer {private static final int CAPACITY = 5; //申請一個容量最大的倉庫public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生產者*/public static class Producer extends Thread{private Queue<Integer> queue;//隊列作為倉庫String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){ //while(condition)為自旋鎖,為防止該線程沒有收到notify()調用也從wait()中返回 //(也稱作虛假喚醒),這個線程會重新去檢查condition條件以決定當前是否可以安全 //地繼續執行還是需要重新保持等待,而不是認為線程被喚醒了就可以安全地繼續執行 //了,自旋鎖當終止條件滿足時,才會停止自旋,這里設置了一直執行,直到程序手動停 //止。synchronized(queue){//給隊列加鎖,保證線程安全while(queue.size() == maxSize){//當隊列是滿的時候,生產者線程等待,由消費者線程進行操作try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}//隊列不為空的時候,生產者被喚醒進行操作System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//因此如果想在一個滿的隊列中加入一個新項,調用 add() 方法就會拋出一//個 unchecked 異常,而調用 offer() 方法會返回 falsequeue.notifyAll();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}/*** 消費者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){synchronized(queue){while(queue.isEmpty()){try {//隊列為空,說明沒有生產者生產的商品,消費者進行等待System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();//如果隊列元素為空,調用remove() 的行為與 Collection 接口的版本相似會拋出異常,這里是模擬消費者取走商品的過程// 但是新的 poll() 方法在用空集合調用時只是返回 null。因此新的方法更適合容易出現異常條件的情況。System.out.println("[" + name + "] Consuming value : " + x);queue.notifyAll();//喚醒所有隊列,消費者和生產者根據隊列情況進行操作try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}} }

?

2. 使用Lock和Condition的await() / signal()方法

Condition接口的await()和signal()是用來做同步的兩種方法,它們的功能基本上和Object的wait()/?nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。通過在Lock對象上調用newCondition()方法,將條件變量和一個鎖對象進行綁定,進而控制并發程序訪問競爭資源的安全。

代碼實現:

import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生產者消費者模式:使用Lock和Condition實現*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//隊列滿的條件private static final Condition emptyCondition = lock.newCondition();//隊列空的條件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生產者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//獲得鎖lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//這里可以和wait()進行對比,兩種控制線程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖,Lock不同于Sychronized,需要手動釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消費者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//獲得鎖lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");//隊列為空滿足條件,消費者線程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}

(3)BlockingQueue阻塞隊列方法?

我們采用一個阻塞隊列來實現。

?

通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大于消費者消費的速度,并且當生產出來的數據累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。

我們這里使用LinkedBlockingQueue,它是一個已經在內部實現了同步的隊列,實現方式采用的是我們第2種await()/?signal()方法。它可以在生成對象時指定容量大小。它用于阻塞操作的是put()和take()方法。

  • put()方法:類似于我們上面的生產者線程,容量達到最大時,自動阻塞。
  • take()方法:類似于我們上面的消費者線程,容量為0時,自動阻塞。

?

?

代碼實現:

import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生產者消費者模式:使用Lock和Condition實現*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//隊列滿的條件private static final Condition emptyCondition = lock.newCondition();//隊列空的條件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生產者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//獲得鎖lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//這里可以和wait()進行對比,兩種控制線程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++); //喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖,Lock不同于Sychronized,需要手動釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消費者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer"); //隊列為空滿足條件,消費者線程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}

小結:三種實現形式,其實理念都是相同的,都是控制阻塞狀態,根據條件去控制線程的運行狀態和阻塞狀態。生產者消費者模式 為信息傳輸開辟了一個嶄新的概念,因為它的優先級最高,所以即使網絡發生堵塞時它也會最先通過,最大程度的保證了設備的安全。也有缺點,就是在網絡中的個數是有限制的。生產者消費者模式在設置時比較簡單,使用方便安全,在將來的自動化行業必定會大大被人們所認同。

參考資料:

https://blog.csdn.net/u010983881/article/details/78554671#commentBox

總結

以上是生活随笔為你收集整理的生产者消费者模型java实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。