生产者/消费者模式(阻塞队列)
生活随笔
收集整理的這篇文章主要介紹了
生产者/消费者模式(阻塞队列)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
?
生產(chǎn)消費(fèi)者模式? 貌似也是阻塞的問題?花了一些時間終于弄明白這個鳥東東,以前還以為是不復(fù)雜的一個東西的,以前一直以為和觀察者模式差不多(其實也是差不多的,呵呵),生產(chǎn)消費(fèi)者模式應(yīng)該是可以通過觀察者模式來實現(xiàn)的,對于在什么環(huán)境下使用現(xiàn)在想的還不是特別清楚,主要是在實際中還沒使用過這個。?
需要使用到同步,以及線程,屬于多并發(fā)行列,和觀察者模式的差異也就在于此吧,所以實現(xiàn)起來也主要在這里的差異。 在實際的軟件開發(fā)過程中,經(jīng)常會碰到如下場景:某個模塊負(fù)責(zé)產(chǎn)生數(shù)據(jù),這些數(shù)據(jù)由另一個模塊來負(fù)責(zé)處理(此處的模塊是廣義的,可以是類、函數(shù)、線程、進(jìn)程等)。產(chǎn)生數(shù)據(jù)的模塊,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊,就稱為消費(fèi)者。?
單單抽象出生產(chǎn)者和消費(fèi)者,還夠不上是生產(chǎn)者/消費(fèi)者模式。該模式還需要有一個緩沖區(qū)處于生產(chǎn)者和消費(fèi)者之間,作為一個中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū),而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù)?
◇解耦?
假設(shè)生產(chǎn)者和消費(fèi)者分別是兩個類。如果讓生產(chǎn)者直接調(diào)用消費(fèi)者的某個方法,那么生產(chǎn)者對于消費(fèi)者就會產(chǎn)生依賴(也就是耦合)。將來如果消費(fèi)者的代碼發(fā)生變化,可能會影響到生產(chǎn)者。而如果兩者都依賴于某個緩沖區(qū),兩者之間不直接依賴,耦合也就相應(yīng)降低了。?
◇支持并發(fā)(concurrency)?
生產(chǎn)者直接調(diào)用消費(fèi)者的某個方法,還有另一個弊端。由于函數(shù)調(diào)用是同步的(或者叫阻塞的),在消費(fèi)者的方法沒有返回之前,生產(chǎn)者只好一直等在那邊。萬一消費(fèi)者處理數(shù)據(jù)很慢,生產(chǎn)者就會白白糟蹋大好時光。?
使用了生產(chǎn)者/消費(fèi)者模式之后,生產(chǎn)者和消費(fèi)者可以是兩個獨(dú)立的并發(fā)主體(常見并發(fā)類型有進(jìn)程和線程兩種,后面的帖子會講兩種并發(fā)類型下的應(yīng)用)。生產(chǎn)者把制造出來的數(shù)據(jù)往緩沖區(qū)一丟,就可以再去生產(chǎn)下一個數(shù)據(jù)。基本上不用依賴消費(fèi)者的處理速度。其實當(dāng)初這個模式,主要就是用來處理并發(fā)問題的。?
◇支持忙閑不均?
緩沖區(qū)還有另一個好處。如果制造數(shù)據(jù)的速度時快時慢,緩沖區(qū)的好處就體現(xiàn)出來了。當(dāng)數(shù)據(jù)制造快的時候,消費(fèi)者來不及處理,未處理的數(shù)據(jù)可以暫時存在緩沖區(qū)中。等生產(chǎn)者的制造速度慢下來,消費(fèi)者再慢慢處理掉。?
用了兩種方式實現(xiàn)了一下這個模式,主要參考了網(wǎng)上的一些例子才弄明白,這里對隊列的實現(xiàn)有很多種方法,需要和具體的應(yīng)用相結(jié)合吧,隊列緩沖區(qū)很簡單,現(xiàn)在已有大量的實現(xiàn),缺點是在性能上面(內(nèi)存分配的開銷和同步/互斥的開銷),下面的實現(xiàn)都是這種方式;環(huán)形緩沖區(qū)(減少了內(nèi)存分配的開銷),雙緩沖區(qū)(減少了同步/互斥的開銷)。?
第一個例子是使用的信號量的東東,沒有執(zhí)行具體的東西,只是實現(xiàn)了這個例子,要做復(fù)雜的業(yè)務(wù)邏輯的話需要自己在某些方法內(nèi)去具體實現(xiàn)?
代碼如下:?
消費(fèi)者:?
public class TestConsumer implements Runnable { TestQueue obj; public TestConsumer(TestQueue tq){ this.obj=tq; } public void run() { try { for(int i=0;i<10;i++){ obj.consumer(); } } catch (Exception e) { e.printStackTrace(); } } }
?
生產(chǎn)者:?
public class TestProduct implements Runnable { TestQueue obj; public TestProduct(TestQueue tq){ this.obj=tq; } public void run() { for(int i=0;i<10;i++){ try { obj.product("test"+i); } catch (Exception e) { e.printStackTrace(); } } } }
?
隊列(使用了信號量,采用synchronized進(jìn)行同步,采用lock進(jìn)行同步會出錯,或許是還不知道實現(xiàn)的方法):?
public static Object signal=new Object(); boolean bFull=false; private List thingsList=new ArrayList(); private final ReentrantLock lock = new ReentrantLock(true); BlockingQueue q = new ArrayBlockingQueue(10); /** * 生產(chǎn) * @param thing * @throws Exception */ public void product(String thing) throws Exception{ synchronized(signal){ if(!bFull){ bFull=true; //產(chǎn)生一些東西,放到 thingsList 共享資源中 System.out.println("product"); System.out.println("倉庫已滿,正等待消費(fèi)..."); thingsList.add(thing); signal.notify(); //然后通知消費(fèi)者 } signal.wait(); // 然后自己進(jìn)入signal待召隊列 } } /** * 消費(fèi) * @return * @throws Exception */ public String consumer()throws Exception{ synchronized(signal){ if(!bFull) { signal.wait(); // 進(jìn)入signal待召隊列,等待生產(chǎn)者的通知 } bFull=false; // 讀取buf 共享資源里面的東西 System.out.println("consume"); System.out.println("倉庫已空,正等待生產(chǎn)..."); signal.notify(); // 然后通知生產(chǎn)者 } String result=""; if(thingsList.size()>0){ result=thingsList.get(thingsList.size()-1).toString(); thingsList.remove(thingsList.size()-1); } return result; }
?
測試代碼:?
public class TestMain { public static void main(String[] args) throws Exception{ TestQueue tq=new TestQueue(); TestProduct tp=new TestProduct(tq); TestConsumer tc=new TestConsumer(tq); Thread t1=new Thread(tp); Thread t2=new Thread(tc); t1.start(); t2.start(); } }
?
運(yùn)行結(jié)果:?
product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)... product 倉庫已滿,正等待消費(fèi)... consume 倉庫已空,正等待生產(chǎn)...
?
第二種發(fā)放使用java.util.concurrent.BlockingQueue類來重寫的隊列那個類,使用這個方法比較簡單,并且性能上也沒有什么問題。?這是jdk里面的例子? * class Producer implements Runnable { * private final BlockingQueue queue; * Producer(BlockingQueue q) { queue = q; } * public void run() { * try { * while(true) { queue.put(produce()); } * } catch (InterruptedException ex) { ... handle ...} * } * Object produce() { ... } * } * * class Consumer implements Runnable { * private final BlockingQueue queue; * Consumer(BlockingQueue q) { queue = q; } * public void run() { * try { * while(true) { consume(queue.take()); } * } catch (InterruptedException ex) { ... handle ...} * } * void consume(Object x) { ... } * } * * class Setup { * void main() { * BlockingQueue q = new SomeQueueImplementation(); * Producer p = new Producer(q); * Consumer c1 = new Consumer(q); * Consumer c2 = new Consumer(q); * new Thread(p).start(); * new Thread(c1).start(); * new Thread(c2).start(); * } * }
?
jdk1.5以上的一個實現(xiàn),使用了Lock以及條件變量等東西?
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
?
參考1:生產(chǎn)者/消費(fèi)者模式(阻塞隊列) 參考2:生產(chǎn)者/消費(fèi)者模式(阻塞隊列)總結(jié)
以上是生活随笔為你收集整理的生产者/消费者模式(阻塞队列)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 段子
- 下一篇: 工匠精神,缔造美国净水传奇