消息队列:生产者/消费者模式
1.什么是生產(chǎn)者消費者模式
? ? ? ?生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
? ? ? ?這個阻塞隊列就是用來給生產(chǎn)者和消費者解耦的??v觀大多數(shù)設(shè)計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學(xué)習(xí)一些設(shè)計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設(shè)計模式。
2.生產(chǎn)消費者模型
? ? ? ?生產(chǎn)者消費者模型具體來講,就是在一個系統(tǒng)中,存在生產(chǎn)者和消費者兩種角色,他們通過內(nèi)存緩沖區(qū)進行通信,生產(chǎn)者生產(chǎn)消費者需要的資料,消費者把資料做成產(chǎn)品。生產(chǎn)消費者模式如下圖。
? ? ? ?在日益發(fā)展的服務(wù)類型中,譬如注冊用戶這種服務(wù),它可能解耦成好幾種獨立的服務(wù)(賬號驗證,郵箱驗證碼,手機短信碼等)。它們作為消費者,等待用戶輸入數(shù)據(jù),在前臺數(shù)據(jù)提交之后會經(jīng)過分解并發(fā)送到各個服務(wù)所在的url,分發(fā)的那個角色就相當于生產(chǎn)者。消費者在獲取數(shù)據(jù)時候有可能一次不能處理完,那么它們各自有一個請求隊列,那就是內(nèi)存緩沖區(qū)了。做這項工作的框架叫做消息隊列。
3.生產(chǎn)者消費者模型的實現(xiàn)
生產(chǎn)者是一堆線程,消費者是另一堆線程,內(nèi)存緩沖區(qū)可以使用List數(shù)組隊列,數(shù)據(jù)類型只需要定義一個簡單的類就好。關(guān)鍵是如何處理多線程之間的協(xié)作。這其實也是多線程通信的一個范例。
在這個模型中,最關(guān)鍵就是內(nèi)存緩沖區(qū)為空的時候消費者必須等待,而內(nèi)存緩沖區(qū)滿的時候,生產(chǎn)者必須等待。其他時候可以是個動態(tài)平衡。值得注意的是多線程對臨界區(qū)資源的操作時候必須保證在讀寫中只能存在一個線程,所以需要設(shè)計鎖的策略。
4.為什么要使用生產(chǎn)者和消費者模式
? ? ? ?在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這種生產(chǎn)消費能力不均衡的問題,所以便有了生產(chǎn)者和消費者模式。
為了不至于太抽象,我們舉一個寄信的例子(雖說這年頭寄信已經(jīng)不時興,但這個例子還是比較貼切的)。假設(shè)你要寄一封平信,大致過程如下:
??? 1、你把信寫好——相當于生產(chǎn)者制造數(shù)據(jù)
??? 2、你把信放入郵筒——相當于生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)
??? 3、郵遞員把信從郵筒取出——相當于消費者把數(shù)據(jù)取出緩沖區(qū)
??? 4、郵遞員把信拿去郵局做相應(yīng)的處理——相當于消費者處理數(shù)據(jù)
4.1優(yōu)點
- 解耦
??? 假設(shè)生產(chǎn)者和消費者分別是兩個類。如果讓生產(chǎn)者直接調(diào)用消費者的某個方法,那么生產(chǎn)者對于消費者就會產(chǎn)生依賴(也就是耦合)。將來如果消費者的代碼發(fā)生變化,可能會影響到生產(chǎn)者。而如果兩者都依賴于某個緩沖區(qū),兩者之間不直接依賴,耦合也就相應(yīng)降低了。
? ? ? ?接著上述的例子,如果不使用郵筒(也就是緩沖區(qū)),你必須得把信直接交給郵遞員。有同學(xué)會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產(chǎn)生和你和郵遞員之間的依賴(相當于生產(chǎn)者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要重新認識一下(相當于消費者變化導(dǎo)致修改生產(chǎn)者代碼)。而郵筒相對來說比較固定,你依賴它的成本就比較低(相當于和緩沖區(qū)之間的弱耦合)。
- 支持并發(fā)(concurrency)
? ? ? ?生產(chǎn)者直接調(diào)用消費者的某個方法,還有另一個弊端。由于函數(shù)調(diào)用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產(chǎn)者只好一直等在那邊。萬一消費者處理數(shù)據(jù)很慢,生產(chǎn)者就會白白糟蹋大好時光。
? ? ? ?使用了生產(chǎn)者/消費者模式之后,生產(chǎn)者和消費者可以是兩個獨立的并發(fā)主體(常見并發(fā)類型有進程和線程兩種)。生產(chǎn)者把制造出來的數(shù)據(jù)往緩沖區(qū)一丟,就可以再去生產(chǎn)下一個數(shù)據(jù)?;旧喜挥靡蕾囅M者的處理速度。
- 支持忙閑不均
? ? ? ?緩沖區(qū)還有另一個好處。如果制造數(shù)據(jù)的速度時快時慢,緩沖區(qū)的好處就體現(xiàn)出來了。當數(shù)據(jù)制造快的時候,消費者來不及處理,未處理的數(shù)據(jù)可以暫時存在緩沖區(qū)中。等生產(chǎn)者的制造速度慢下來,消費者再慢慢處理掉。
? ? ? ?為了充分復(fù)用,我們再拿寄信的例子來說事。假設(shè)郵遞員一次只能帶走1000封信。萬一某次碰上情人節(jié)(也可能是圣誕節(jié))送賀卡,需要寄出去的信超過1000封,這時候郵筒這個緩沖區(qū)就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來時再拿走。
5.多生產(chǎn)者和多消費者場景
? ? ? ? 在多核時代,多線程并發(fā)處理速度比單線程處理速度更快,所以我們可以使用多個線程來生產(chǎn)數(shù)據(jù),同樣可以使用多個消費線程來消費數(shù)據(jù)。而更復(fù)雜的情況是,消費者消費的數(shù)據(jù),有可能需要繼續(xù)處理,于是消費者處理完數(shù)據(jù)之后,它又要作為生產(chǎn)者把數(shù)據(jù)放在新的隊列里,交給其他消費者繼續(xù)處理。如下圖:
6.線程池與生產(chǎn)消費者模式
? ? ? ?Java中的線程池類其實就是一種生產(chǎn)者和消費者模式的實現(xiàn)方式,但是我覺得其實現(xiàn)方式更加高明。生產(chǎn)者把任務(wù)丟給線程池,線程池創(chuàng)建線程并處理任務(wù),如果將要運行的任務(wù)數(shù)大于線程池的基本線程數(shù)就把任務(wù)扔到阻塞隊列里,這種做法比只使用一個阻塞隊列來實現(xiàn)生產(chǎn)者和消費者模式顯然要高明很多,因為消費者能夠處理直接就處理掉了,這樣速度更快,而生產(chǎn)者先存,消費者再取這種方式顯然慢一些。
? ? ? ?我們的系統(tǒng)也可以使用線程池來實現(xiàn)多生產(chǎn)者消費者模式。比如創(chuàng)建N個不同規(guī)模的Java線程池來處理不同性質(zhì)的任務(wù),比如線程池1將數(shù)據(jù)讀到內(nèi)存之后,交給線程池2里的線程繼續(xù)處理壓縮數(shù)據(jù)。線程池1主要處理IO密集型任務(wù),線程池2主要處理CPU密集型任務(wù)。
7.內(nèi)存緩沖區(qū)
最傳統(tǒng)、最常見的方式:隊列(FIFO)作緩沖。
7.1 線程方式
并發(fā)線程中使用隊列的優(yōu)缺點
- 內(nèi)存分配的性能
? ? ? ?在線程方式下,生產(chǎn)者和消費者各自是一個線程。生產(chǎn)者把數(shù)據(jù)寫入隊列頭(以下簡稱push),消費者從隊列尾部讀出數(shù)據(jù)(以下簡稱pop)。當隊列為空,消費者就稍息(稍事休息);當隊列滿(達到最大長度),生產(chǎn)者就稍息。整個流程并不復(fù)雜。
? ? ? ?上述過程會有一個主要的問題是關(guān)于內(nèi)存分配的性能開銷。對于常見的隊列實現(xiàn):在每次push時,可能涉及到堆內(nèi)存的分配;在每次pop時,可能涉及堆內(nèi)存的釋放。假如生產(chǎn)者和消費者都很勤快,頻繁地push、pop,那內(nèi)存分配的開銷就很可觀了。對于內(nèi)存分配的開銷,可查找Java性能優(yōu)化相關(guān)知識。
? ? ? ? 解決辦法:環(huán)形緩沖區(qū)。
- 同步和互斥的性能
? ? ? ?另外,由于兩個線程共用一個隊列,自然就會涉及到線程間諸如同步、互斥、死鎖等等。這會兒要細談的是,同步和互斥的性能開銷。在很多場合中,諸如信號量、互斥量等的使用也是有不小的開銷的(某些情況下,也可能導(dǎo)致用戶態(tài)/核心態(tài)切換)。如果像剛才所說,生產(chǎn)者和消費者都很勤快,那這些開銷也不容小覷。
? ? ? ? 解決辦法:雙緩沖區(qū)。
- 適用于隊列的場合
? ? ? ?由于隊列是很常見的數(shù)據(jù)結(jié)構(gòu),大部分編程語言都內(nèi)置了隊列的支持,有些語言甚至提供了線程安全的隊列(比如JDK 1.5引入的ArrayBlockingQueue)。因此,開發(fā)人員可以撿現(xiàn)成,避免了重新發(fā)明輪子。
? ? ? ?所以,假如你的數(shù)據(jù)流量不是很大,采用隊列緩沖區(qū)的好處還是很明顯的:邏輯清晰、代碼簡單、維護方便。比較符合KISS原則。
7.2 進程方式
? ? ? ?跨進程的生產(chǎn)者/消費者模式,非常依賴于具體的進程間通訊(IPC)方式。而IPC的種類很多。下面介紹比較常用的跨平臺、且編程語言支持較多的IPC方式。
-
匿名管道
? ? ? ?感覺管道是最像隊列的IPC類型。生產(chǎn)者進程在管道的寫端放入數(shù)據(jù);消費者進程在管道的讀端取出數(shù)據(jù)。整個的效果和線程中使用隊列非常類似,區(qū)別在于使用管道就無需操心線程安全、內(nèi)存分配等瑣事(操作系統(tǒng)暗中都幫你搞定了)。
? ? ? ?管道又分命名管道和匿名管道兩種,今天主要聊匿名管道。因為命名管道在不同的操作系統(tǒng)下差異較大(比如Win32和POSIX,在命名管道的API接口和功能實現(xiàn)上都有較大差異;有些平臺不支持命名管道,比如Windows CE)。除了操作系統(tǒng)的問題,對于有些編程語言(比如Java)來說,命名管道是無法使用的。
? ? ? ?其實匿名管道在不同平臺上的API接口,也是有差異的(比如Win32的CreatePipe和POSIX的pipe,用法就很不一樣)。但是我們可以僅使用標準輸入和標準輸出(以下簡稱stdio)來進行數(shù)據(jù)的流入流出。然后利用shell的管道符把生產(chǎn)者進程和消費者進程關(guān)聯(lián)起來。實際上,很多操作系統(tǒng)(尤其是POSIX風(fēng)格的)自帶的命令都充分利用了這個特性來實現(xiàn)數(shù)據(jù)的傳輸(比如more、grep等),如此優(yōu)點:
? ? ? ?1、基本上所有操作系統(tǒng)都支持在shell方式下使用管道符。因此很容易實現(xiàn)跨平臺。
? ? ? ?2、大部分編程語言都能夠操作stdio,因此跨編程語言也就容易實現(xiàn)。
? ? ? ?3、管道方式省卻了線程安全方面的瑣事。有利于降低開發(fā)、調(diào)試成本。
? ? ? ?當然,這種方式也有自身的缺點:
? ? ? ?1、生產(chǎn)者進程和消費者進程必須得在同一臺主機上,無法跨機器通訊。這個缺點比較明顯。
? ? ? ?2、在一對一的情況下,這種方式挺合用。但如果要擴展到一對多或者多對一,那就有點棘手了。所以這種方式的擴展性要打個折扣。假如今后要考慮類似的擴展,這個缺點就比較明顯。
? ? ? ? 3、由于管道是shell創(chuàng)建的,對于兩邊的進程不可見(程序看到的只是stdio)。在某些情況下,導(dǎo)致程序不便于對管道進行操縱(比如調(diào)整管道緩沖區(qū)尺寸)。這個缺點不太明顯。
? ? ? ? 4、最后,這種方式只能單向傳數(shù)據(jù)。好在大多數(shù)情況下,消費者進程不需要傳數(shù)據(jù)給生產(chǎn)者進程。萬一你確實需要信息反饋(從消費者到生產(chǎn)者),那就費勁了。可能得考慮換種IPC方式。
? ? ? ?注意事項:
? ? ? ?1、對stdio進行讀寫操作是以阻塞方式進行。比如管道中沒有數(shù)據(jù),消費者進程的讀操作就會一直停在哪兒,直到管道中重新有數(shù)據(jù)。
? ? ? ?2、由于stdio內(nèi)部帶有自己的緩沖區(qū)(這緩沖區(qū)和管道緩沖區(qū)是兩碼事),有時會導(dǎo)致一些不太爽的現(xiàn)象(比如生產(chǎn)者進程輸出了數(shù)據(jù),但消費者進程沒有立即讀到)。
-
SOCKET(TCP方式)
? ? ? ?基于TCP方式的SOCKET通訊是又一個類似于隊列的IPC方式。它同樣保證了數(shù)據(jù)的順序到達;同樣有緩沖的機制。而且跨平臺和跨語言,和剛才介紹的shell管道符方式類似。
? ? ? ? SOCKET相比shell管道符的方式,主要有如下幾個優(yōu)點:
??? 1、SOCKET方式可以跨機器(便于實現(xiàn)分布式)。這是主要優(yōu)點。
??? 2、SOCKET方式便于將來擴展成為多對一或者一對多。這也是主要優(yōu)點。
??? 3、SOCKET可以設(shè)置阻塞和非阻塞方法,用起來比較靈活。這是次要優(yōu)點。
??? 4、SOCKET支持雙向通訊,有利于消費者反饋信息。
? ? ? ?當然有利就有弊。相對于上述shell管道的方式,使用SOCKET在編程上會更復(fù)雜一些。好在前人已經(jīng)做了大量的工作,可借助于這些第三方的庫和框架,比如C++的ACE庫、Python的Twisted。
? ? ? ?雖然TCP在很多方面比UDP可靠,但鑒于跨機器通訊先天的不可預(yù)料性,可以在生產(chǎn)者進程和消費者進程內(nèi)部各自再引入基于線程的"生產(chǎn)者/消費者模式",如下圖:
這么做的關(guān)鍵點在于把代碼分為兩部分:生產(chǎn)線程和消費線程屬于和業(yè)務(wù)邏輯相關(guān)的代碼(和通訊邏輯無關(guān));發(fā)送線程和接收線程屬于通訊相關(guān)的代碼(和業(yè)務(wù)邏輯無關(guān))。
??? 這樣的好處是很明顯的,具體如下:
??? 1、能夠應(yīng)對暫時性的網(wǎng)絡(luò)故障。并且在網(wǎng)絡(luò)故障解除后,能夠繼續(xù)工作。
??? 2、網(wǎng)絡(luò)故障的應(yīng)對處理方式(比如斷開后的嘗試重連),只影響發(fā)送和接收線程,不會影響生產(chǎn)線程和消費線程(業(yè)務(wù)邏輯部分)。
??? 3、具體的SOCKET方式(阻塞和非阻塞)只影響發(fā)送和接收線程,不影響生產(chǎn)線程和消費線程(業(yè)務(wù)邏輯部分)。
??? 4、不依賴TCP自身的發(fā)送緩沖區(qū)和接收緩沖區(qū)。(默認的TCP緩沖區(qū)的大小可能無法滿足實際要求)
??? 5、業(yè)務(wù)邏輯的變化(比如業(yè)務(wù)需求變更)不影響發(fā)送線程和接收線程。
??? 針對上述的最后一條,如果整個業(yè)務(wù)系統(tǒng)中有多個進程是采用上述的模式,那或許可以重構(gòu):在業(yè)務(wù)邏輯代碼和通訊邏輯代碼之間,把業(yè)務(wù)邏輯無關(guān)的部分封裝成一個通訊中間件。
7.3 環(huán)形緩沖區(qū)
使用場景:當存儲空間(不僅包括內(nèi)存,還可能包括諸如硬盤之類的存儲介質(zhì))的分配/釋放非常頻繁并且確實產(chǎn)生了明顯的影響,才應(yīng)該考慮環(huán)形緩沖區(qū)的使用。否則的話,還是選用最基本、最簡單的隊列緩沖區(qū)。
-
環(huán)形緩沖區(qū) vs 隊列緩沖區(qū)
? ? 1.外部接口相似
? ? 普通的隊列有一個寫入端和一個讀出端。隊列為空的時候,讀出端無法讀取數(shù)據(jù);當隊列滿(達到最大尺寸)時,寫入端無法寫入數(shù)據(jù)。
??? 對于使用者來講,環(huán)形緩沖區(qū)和隊列緩沖區(qū)是一樣的。它也有一個寫入端(用于push)和一個讀出端(用于pop),也有緩沖區(qū)“滿”和“空”的狀態(tài)。所以,從隊列緩沖區(qū)切換到環(huán)形緩沖區(qū),對于使用者來說能比較平滑地過渡。
? ? 2.內(nèi)部結(jié)構(gòu)迥異
??? 雖然兩者的對外接口差不多,但是內(nèi)部結(jié)構(gòu)和運作機制有很大差別。重點介紹一下環(huán)形緩沖區(qū)的內(nèi)部結(jié)構(gòu)。
? ? 可以把環(huán)形緩沖區(qū)的讀出端(以下簡稱R)和寫入端(以下簡稱W)想象成是兩個人在體育場跑道上追逐(R追W)。當R追上W的時候,就是緩沖區(qū)為空;當W追上R的時候(W比R多跑一圈),就是緩沖區(qū)滿。
??? 為了形象起見,如下:
?從上圖可以看出,環(huán)形緩沖區(qū)所有的push和pop操作都是在一個固定的存儲空間內(nèi)進行。而隊列緩沖區(qū)在push的時候,可能會分配存儲空間用于存儲新元素;在pop時,可能會釋放廢棄元素的存儲空間。所以環(huán)形方式相比隊列方式,少掉了對于緩沖區(qū)元素所用存儲空間的分配、釋放。這是環(huán)形緩沖區(qū)的一個主要優(yōu)勢。
-
環(huán)形緩沖區(qū)的實現(xiàn)
? ? ? ?1.數(shù)組方式 vs 鏈表方式
? ? ? ?環(huán)形緩沖區(qū)的內(nèi)部實現(xiàn),即可基于數(shù)組(此處的數(shù)組,泛指連續(xù)存儲空間)實現(xiàn),也可基于鏈表實現(xiàn)。
? ? ? ?數(shù)組在物理存儲上是一維的連續(xù)線性結(jié)構(gòu),可以在初始化時,把存儲空間一次性分配好,這是數(shù)組方式的優(yōu)點。但是要使用數(shù)組來模擬環(huán),你必須在邏輯上把數(shù)組的頭和尾相連。在順序遍歷數(shù)組時,對尾部元素(最后一個元素)要作一下特殊處理。訪問尾部元素的下一個元素時,要重新回到頭部元素(第0個元素)。如下圖所示:
? ? ? ? 使用鏈表的方式,正好和數(shù)組相反:鏈表省去了頭尾相連的特殊處理。但是鏈表在初始化的時候比較繁瑣,而且在有些場合(比如跨進程的IPC)不太方便使用。
? ? ? ?2.讀寫操作
? ? ? ?環(huán)形緩沖區(qū)要維護兩個索引,分別對應(yīng)寫入端(W)和讀取端(R)。寫入(push)的時候,先確保環(huán)沒滿,然后把數(shù)據(jù)復(fù)制到W所對應(yīng)的元素,最后W指向下一個元素;讀取(pop)的時候,先確保環(huán)沒空,然后返回R對應(yīng)的元素,最后R指向下一個元素。
? ? ? ?3.判斷“空”和“滿”
? ? ? ?上述的操作并不復(fù)雜,不過有一個小小的麻煩:空環(huán)和滿環(huán)的時候,R和W都指向同一個位置!這樣就無法判斷到底是“空”還是“滿”。大體上有兩種方法可以解決該問題。
? ? ? ?辦法1:始終保持一個元素不用
? ? ? ?當空環(huán)的時候,R和W重疊。當W比R跑得快,追到距離R還有一個元素間隔的時候,就認為環(huán)已經(jīng)滿。當環(huán)內(nèi)元素占用的存儲空間較大的時候,這種辦法顯得很土(浪費空間)。
? ? ? ?辦法2:維護額外變量
? ? ? ?如果不喜歡上述辦法,還可以采用額外的變量來解決。比如可以用一個整數(shù)記錄當前環(huán)中已經(jīng)保存的元素個數(shù)(該整數(shù)>=0)。當R和W重疊的時候,通過該變量就可以知道是“空”還是“滿”。
? ? ? 4.元素的存儲
? ? ? ?由于環(huán)形緩沖區(qū)本身就是要降低存儲空間分配的開銷,因此緩沖區(qū)中元素的類型要選好。盡量存儲值類型的數(shù)據(jù),而不要存儲指針(引用)類型的數(shù)據(jù)。因為指針類型的數(shù)據(jù)又會引起存儲空間(比如堆內(nèi)存)的分配和釋放,使得環(huán)形緩沖區(qū)的效果打折扣。
-
應(yīng)用場合
? ? ? ?如果所使用的編程語言和開發(fā)庫中帶有現(xiàn)成的、成熟的環(huán)形緩沖區(qū),建議使用現(xiàn)成的庫,不要重新制造輪子;確實找不到現(xiàn)成的,才考慮自己實現(xiàn)。
? ? ? 1.用于并發(fā)線程
? ? ? ?和線程中的隊列緩沖區(qū)類似,線程中的環(huán)形緩沖區(qū)也要考慮線程安全的問題。除非使用的環(huán)形緩沖區(qū)的庫已經(jīng)實現(xiàn)了線程安全,否則還是得自己動手搞定。線程方式下的環(huán)形緩沖區(qū)用得比較多,相關(guān)的網(wǎng)上資料也多,下面就大致介紹幾個。
? ? ? ?對于C++的程序員,強烈推薦使用boost提供的circular_buffer模板,該模板最開始是在boost 1.35版本中引入的。鑒于boost在C++社區(qū)中的地位,大伙兒應(yīng)該可以放心使用該模板。
? ? ? ?對于C程序員,可以去看看開源項目circbuf,不過該項目是GPL協(xié)議的,不太爽;而且活躍度不太高;而且只有一個開發(fā)人員。大伙兒慎用!建議只拿它當參考。
? ? ? ?對于C#程序員,可以參考CodeProject上的一個示例。
? ? ? ?2.用于并發(fā)進程
? ? ? ?進程間的環(huán)形緩沖區(qū),似乎少有現(xiàn)成的庫可用。
? ? ? ?適用于進程間環(huán)形緩沖的IPC類型,常見的有共享內(nèi)存和文件。在這兩種方式上進行環(huán)形緩沖,通常都采用數(shù)組的方式實現(xiàn)。程序事先分配好一個固定長度的存儲空間,然后具體的讀寫操作、判斷“空”和“滿”、元素存儲等細節(jié)就可參照前面所說的來進行。
? ? ? ?共享內(nèi)存方式的性能很好,適用于數(shù)據(jù)流量很大的場景。但是有些語言(比如Java)對于共享內(nèi)存不支持。因此,該方式在多語言協(xié)同開發(fā)的系統(tǒng)中,會有一定的局限性。
? ? ? ? 而文件方式在編程語言方面支持很好,幾乎所有編程語言都支持操作文件。但它可能會受限于磁盤讀寫(Disk I/O)的性能。所以文件方式不太適合于快速數(shù)據(jù)傳輸;但是對于某些“數(shù)據(jù)單元”很大的場合,文件方式是值得考慮的。
? ? ? ? 對于進程間的環(huán)形緩沖區(qū),同樣要考慮好進程間的同步、互斥等問題。
8.生產(chǎn)者消費者模式三種實現(xiàn)方式代碼示例
8.1?synchronized、wait和notify ?
package producerConsumer; //wait 和 notify public class ProducerConsumerWithWaitNofity {public static void main(String[] args) {Resource resource = new Resource();//生產(chǎn)者線程ProducerThread p1 = new ProducerThread(resource);ProducerThread p2 = new ProducerThread(resource);ProducerThread p3 = new ProducerThread(resource);//消費者線程ConsumerThread c1 = new ConsumerThread(resource);//ConsumerThread c2 = new ConsumerThread(resource);//ConsumerThread c3 = new ConsumerThread(resource);p1.start();p2.start();p3.start();c1.start();//c2.start();//c3.start();}} /*** 公共資源類* @author **/ class Resource{//重要//當前資源數(shù)量private int num = 0;//資源池中允許存放的資源數(shù)目private int size = 10;/*** 從資源池中取走資源*/public synchronized void remove(){if(num > 0){num--;System.out.println("消費者" + Thread.currentThread().getName() +"消耗一件資源," + "當前線程池有" + num + "個");notifyAll();//通知生產(chǎn)者生產(chǎn)資源}else{try {//如果沒有資源,則消費者進入等待狀態(tài)wait();System.out.println("消費者" + Thread.currentThread().getName() + "線程進入等待狀態(tài)");} catch (InterruptedException e) {e.printStackTrace();}}}/*** 向資源池中添加資源*/public synchronized void add(){if(num < size){num++;System.out.println(Thread.currentThread().getName() + "生產(chǎn)一件資源,當前資源池有" + num + "個");//通知等待的消費者notifyAll();}else{//如果當前資源池中有10件資源try{wait();//生產(chǎn)者進入等待狀態(tài),并釋放鎖System.out.println(Thread.currentThread().getName()+"線程進入等待");}catch(InterruptedException e){e.printStackTrace();}}} } /*** 消費者線程*/ class ConsumerThread extends Thread{private Resource resource;public ConsumerThread(Resource resource){this.resource = resource;}@Overridepublic void run() {while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}resource.remove();}} } /*** 生產(chǎn)者線程*/ class ProducerThread extends Thread{private Resource resource;public ProducerThread(Resource resource){this.resource = resource;}@Overridepublic void run() {//不斷地生產(chǎn)資源while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}resource.add();}}}8.2?lock和condition的await、signalAll??
package producerConsumer;import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*** 使用Lock 和 Condition解決生產(chǎn)者消費者問題* @author tangzhijing**/ public class LockCondition {public static void main(String[] args) {Lock lock = new ReentrantLock();Condition producerCondition = lock.newCondition();Condition consumerCondition = lock.newCondition();Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);//生產(chǎn)者線程ProducerThread2 producer1 = new ProducerThread2(resource);//消費者線程ConsumerThread2 consumer1 = new ConsumerThread2(resource);ConsumerThread2 consumer2 = new ConsumerThread2(resource);ConsumerThread2 consumer3 = new ConsumerThread2(resource);producer1.start();consumer1.start();consumer2.start();consumer3.start();} } /*** 消費者線程*/ class ConsumerThread2 extends Thread{private Resource2 resource;public ConsumerThread2(Resource2 resource){this.resource = resource;//setName("消費者");}public void run(){while(true){try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource.remove();}} } /*** 生產(chǎn)者線程* @author tangzhijing**/ class ProducerThread2 extends Thread{private Resource2 resource;public ProducerThread2(Resource2 resource){this.resource = resource;setName("生產(chǎn)者");}public void run(){while(true){try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource.add();}} } /*** 公共資源類* @author tangzhijing**/ class Resource2{private int num = 0;//當前資源數(shù)量private int size = 10;//資源池中允許存放的資源數(shù)目private Lock lock;private Condition producerCondition;private Condition consumerCondition;public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {this.lock = lock;this.producerCondition = producerCondition;this.consumerCondition = consumerCondition;}/*** 向資源池中添加資源*/public void add(){lock.lock();try{if(num < size){num++;System.out.println(Thread.currentThread().getName() + "生產(chǎn)一件資源,當前資源池有" + num + "個");//喚醒等待的消費者consumerCondition.signalAll();}else{//讓生產(chǎn)者線程等待try {producerCondition.await();System.out.println(Thread.currentThread().getName() + "線程進入等待");} catch (InterruptedException e) {e.printStackTrace();}}}finally{lock.unlock();}}/*** 從資源池中取走資源*/public void remove(){lock.lock();try{if(num > 0){num--;System.out.println("消費者" + Thread.currentThread().getName() + "消耗一件資源," + "當前資源池有" + num + "個");producerCondition.signalAll();//喚醒等待的生產(chǎn)者}else{try {consumerCondition.await();System.out.println(Thread.currentThread().getName() + "線程進入等待");} catch (InterruptedException e) {e.printStackTrace();}//讓消費者等待}}finally{lock.unlock();}}}8.3?lock和condition的await、signalAll
package producerConsumer;import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;//使用阻塞隊列BlockingQueue解決生產(chǎn)者消費者 public class BlockingQueueConsumerProducer {public static void main(String[] args) {Resource3 resource = new Resource3();//生產(chǎn)者線程ProducerThread3 p = new ProducerThread3(resource);//多個消費者ConsumerThread3 c1 = new ConsumerThread3(resource);ConsumerThread3 c2 = new ConsumerThread3(resource);ConsumerThread3 c3 = new ConsumerThread3(resource);p.start();c1.start();c2.start();c3.start();} } /*** 消費者線程* @author tangzhijing**/ class ConsumerThread3 extends Thread {private Resource3 resource3;public ConsumerThread3(Resource3 resource) {this.resource3 = resource;//setName("消費者");}public void run() {while (true) {try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource3.remove();}} } /*** 生產(chǎn)者線程* @author tangzhijing**/ class ProducerThread3 extends Thread{private Resource3 resource3;public ProducerThread3(Resource3 resource) {this.resource3 = resource;//setName("生產(chǎn)者");}public void run() {while (true) {try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource3.add();}} } class Resource3{private BlockingQueue resourceQueue = new LinkedBlockingQueue(10);/*** 向資源池中添加資源*/public void add(){try {resourceQueue.put(1);System.out.println("生產(chǎn)者" + Thread.currentThread().getName()+ "生產(chǎn)一件資源," + "當前資源池有" + resourceQueue.size() + "個資源");} catch (InterruptedException e) {e.printStackTrace();}}/*** 向資源池中移除資源*/public void remove(){try {resourceQueue.take();System.out.println("消費者" + Thread.currentThread().getName() + "消耗一件資源," + "當前資源池有" + resourceQueue.size() + "個資源");} catch (InterruptedException e) {e.printStackTrace();}} }參考文章:
1.https://blog.csdn.net/u011109589/article/details/80519863
2.https://www.cnblogs.com/chentingk/p/6497107.html
3.http://ifeve.com/producers-and-consumers-mode/(并發(fā)編程網(wǎng)--創(chuàng)始人:方騰飛)
4.https://www.cnblogs.com/fankongkong/p/7339848.html
總結(jié)
以上是生活随笔為你收集整理的消息队列:生产者/消费者模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机图像处理的未来发展,探讨计算机图像
- 下一篇: 半导体鼻祖:仙童半导体的故事