【Linux】生产者消费者模型
文章目錄
- 一. 什么是生產(chǎn)者消費(fèi)者模型
- 1. 基本概念
- 2. 三種關(guān)系
- 3. 再次理解生產(chǎn)者消費(fèi)者模型
- 二. 生產(chǎn)者消費(fèi)者模型的優(yōu)點(diǎn)
- 三. 基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
- 1. 準(zhǔn)備工作
- 2. 阻塞隊(duì)列實(shí)現(xiàn)
- 3. 測(cè)試阻塞隊(duì)列
- 4. 阻塞隊(duì)列完整代碼
- 5. 關(guān)于改進(jìn)阻塞隊(duì)列的幾點(diǎn)補(bǔ)充
- 5.1 多生產(chǎn)者多消費(fèi)者的設(shè)計(jì)
- 5.2 阻塞隊(duì)列所存儲(chǔ)數(shù)據(jù)可以是更復(fù)雜的任務(wù)
- 四. 基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
- 1. 基本規(guī)則
- 2. 環(huán)形隊(duì)列的實(shí)現(xiàn)
- 2.3 單生產(chǎn)者單消費(fèi)者
- 2.4 多生產(chǎn)者多消費(fèi)者
一. 什么是生產(chǎn)者消費(fèi)者模型
1. 基本概念
生產(chǎn)者消費(fèi)者模型就是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而是通過容器來進(jìn)行通訊,即生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給容器;消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從容器里取。
2. 三種關(guān)系
實(shí)際中,生產(chǎn)者可能有多個(gè),消費(fèi)者也可能有多個(gè),它們彼此之間要應(yīng)該滿足什么關(guān)系呢?
假設(shè)下面的情景:
- 每次一個(gè)生產(chǎn)者一次只能生產(chǎn)一個(gè)數(shù)據(jù),
- 每次一個(gè)消費(fèi)者一次只能消費(fèi)一個(gè)數(shù)據(jù)
- 唯一的容器容器每次只容許一個(gè)生產(chǎn)者push數(shù)據(jù)或一個(gè)消費(fèi)者pop數(shù)據(jù)。
在滿足上面的情景下,可以推測(cè)生產(chǎn)者、消費(fèi)者彼此之間的關(guān)系:
- [生產(chǎn)者和生產(chǎn)者]:互斥與同步關(guān)系?;コ怏w現(xiàn)在所有生產(chǎn)者競(jìng)爭(zhēng),只有一個(gè)能去容器pop數(shù)據(jù)。同步的話要保證每一個(gè)生產(chǎn)者都有機(jī)會(huì)到容器中pop數(shù)據(jù)。
- [消費(fèi)者和消費(fèi)者]:互斥與同步關(guān)系。互斥體現(xiàn)在所有消費(fèi)者競(jìng)爭(zhēng),只有一個(gè)能去容器push數(shù)據(jù)。同步要求每一個(gè)消費(fèi)者都有機(jī)會(huì)去容器中push數(shù)據(jù)。
- [生產(chǎn)者和消費(fèi)者]:互斥與同步關(guān)系?;コ怏w現(xiàn)在二者只有一個(gè)能先訪問容器,這時(shí)另外一個(gè)只能阻塞等待。同步體現(xiàn)在容器不能永遠(yuǎn)只是生產(chǎn)者在push或消費(fèi)者在pop,生產(chǎn)者生產(chǎn)了一些數(shù)據(jù)后要告知消費(fèi)者來消費(fèi),反之亦然。
3. 再次理解生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型的核心思想在于:眾多的生產(chǎn)者和眾多的消費(fèi)者通過唯一的容器進(jìn)行數(shù)據(jù)交互,在交互的同時(shí)必須維護(hù)好彼此之間的互斥與同步的關(guān)系。
二. 生產(chǎn)者消費(fèi)者模型的優(yōu)點(diǎn)
容器就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的數(shù)據(jù)處理能力。這個(gè)容器就是用來給生產(chǎn)者和消費(fèi)者解耦的。假如只是一對(duì)一的生產(chǎn)和消費(fèi),快的那方必須等待慢的那方才能完成一次交易,然后繼續(xù)下一組;而如果它們之間有一個(gè)容器可以存儲(chǔ)數(shù)據(jù),其中一個(gè)生產(chǎn)者把數(shù)據(jù)push到容器后不用等消費(fèi)者,下一個(gè)生產(chǎn)者繼續(xù)往容器里push數(shù)據(jù),也就是說在容器滿之前生產(chǎn)者可以一直連續(xù)的生產(chǎn)數(shù)據(jù),消費(fèi)者也是一樣的道理。
即通過容器使生產(chǎn)者和消費(fèi)者解耦,提高了它們數(shù)據(jù)交互的效率。
三. 基于BlockingQueue的生產(chǎn)者消費(fèi)者模型
在多線程編程中阻塞隊(duì)列(Blocking Queue)是一種常用于實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型的數(shù)據(jù)結(jié)構(gòu),它有如下如下幾個(gè)特點(diǎn):
- 眾多生產(chǎn)者中先內(nèi)部競(jìng)爭(zhēng)出一個(gè)生產(chǎn)者,去阻塞隊(duì)列中生產(chǎn)一個(gè)數(shù)據(jù),完成之后重新內(nèi)部競(jìng)爭(zhēng)。
- 眾多消費(fèi)者中也是內(nèi)部競(jìng)爭(zhēng)出一個(gè)消費(fèi)者,去阻塞隊(duì)列里拿取一個(gè)數(shù)據(jù),拿到后重新內(nèi)部競(jìng)爭(zhēng)。
- 每次只能有一個(gè)線程操作隊(duì)列,要么是消費(fèi)者pop,要么是生產(chǎn)push。
- 當(dāng)隊(duì)列為空時(shí),消費(fèi)者通知生產(chǎn)者來生產(chǎn)數(shù)據(jù),然后自己會(huì)被阻塞等待,直到合適的時(shí)候生產(chǎn)者把它喚醒過來提醒它消費(fèi);當(dāng)隊(duì)列滿時(shí),生產(chǎn)者通知消費(fèi)者過來拿取數(shù)據(jù),然后自己被阻塞等待,直到消費(fèi)者把它喚醒,叫它繼續(xù)生產(chǎn)。
1. 準(zhǔn)備工作
從最簡(jiǎn)單的開始設(shè)計(jì),只有一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者,創(chuàng)建兩個(gè)線程代表它們,后續(xù)它們將在自己的控制流中完成相應(yīng)的生產(chǎn)和消費(fèi)任務(wù);至于它們進(jìn)行數(shù)據(jù)交互的容器,使用STL的容器適配器queue即可,交互的數(shù)據(jù)類型為整數(shù)。
在主線程中創(chuàng)建好生產(chǎn)者、消費(fèi)者線程還有阻塞隊(duì)列:
int main() { srand((unsigned int)time(nullptr)); // 1、new一個(gè)阻塞隊(duì)列 BlockQueue<int>* p = new BlockQueue<int>; // 2、創(chuàng)建兩個(gè)新線程,分別代表生產(chǎn)者和消費(fèi)者 pthread_t pro, con; pthread_create(&pro, nullptr, ProducerAction, p); pthread_create(&con, nullptr, ConsumerAction, p); // 3、主線程等待它們完成任務(wù)后負(fù)責(zé)銷毀阻塞隊(duì)列 pthread_join(pro, nullptr); pthread_join(pro, nullptr); delete p; return 0; }2. 阻塞隊(duì)列實(shí)現(xiàn)
基本框架
阻塞隊(duì)列中包含4個(gè)成員變量:
- _q,一個(gè)普通隊(duì)列,用來存儲(chǔ)數(shù)據(jù)。
- _capacity,阻塞隊(duì)列的容量,默認(rèn)可以存5個(gè)數(shù)據(jù)。
- full,一個(gè)條件變量。當(dāng)阻塞隊(duì)列滿時(shí)生產(chǎn)者在該條件下等待。
- empty,一個(gè)條件變量。當(dāng)阻塞隊(duì)列空時(shí)消費(fèi)者在該條件下等待。
- mutex,一把互斥鎖。保證所有時(shí)間內(nèi)只有一個(gè)線程能操作隊(duì)列。
構(gòu)造函數(shù)負(fù)責(zé)初始化兩個(gè)條件變量和鎖,析構(gòu)函數(shù)負(fù)責(zé)銷毀它們:
template<class T> class BlockQueue { public:// 構(gòu)造函數(shù)BlockQueue(size_t capcity = 5):_capacity(capcity){pthread_cond_init(&full, nullptr);pthread_cond_init(&empty, nullptr);pthread_mutex_init(&mutex, nullptr);}// 析構(gòu)函數(shù)~BlockQueue(){pthread_cond_destroy(&full);pthread_cond_destroy(&empty);pthread_mutex_destroy(&mutex);}// 生產(chǎn)者插入數(shù)據(jù)void PushData(T data){};// 消費(fèi)者刪除數(shù)據(jù)void PopData(T& data);private: // 判斷阻塞隊(duì)列是否為空bool IsFull() { return _q.size() >= _capacity; } // 判斷阻塞隊(duì)列是否為滿bool IsEmpty() { return _q.empty(); } queue<T> _q; size_t _capacity; pthread_cond_t full; pthread_cond_t empty; pthread_mutex_t mutex; };生產(chǎn)者生產(chǎn)數(shù)據(jù)
成員函數(shù)void PushData(T data)由生產(chǎn)者調(diào)用,功能是插入一個(gè)數(shù)據(jù)到阻塞隊(duì)列中,下面是該函數(shù)的幾點(diǎn)說明:
- 該函數(shù)一進(jìn)來就要申請(qǐng)鎖,最后插入完成釋放鎖。
- 插入數(shù)據(jù)之前要檢查阻塞隊(duì)列是否滿了,如果滿了就要需要通知消費(fèi)者來消費(fèi),然后自己在full條件下等待。
消費(fèi)者拿取數(shù)據(jù)
消費(fèi)者可以調(diào)用阻塞隊(duì)列里的成員函數(shù)void PopData(T& data)拿走一個(gè)阻塞隊(duì)列里的數(shù)據(jù),下面是該函數(shù)的幾點(diǎn)說明:
- 消費(fèi)者調(diào)用時(shí)需要傳入一個(gè)輸出型參數(shù)。阻塞隊(duì)列會(huì)把隊(duì)頭數(shù)據(jù)內(nèi)容寫入到輸出型參數(shù)的內(nèi)存空間中。
- 進(jìn)來的第一步先申請(qǐng)鎖,拿走數(shù)據(jù)后釋放鎖。
- 拿取數(shù)據(jù)之前要檢查阻塞隊(duì)列是否為空,為空的話要通知生產(chǎn)者進(jìn)行生產(chǎn),然后自己在empty條件下等待。
關(guān)于阻塞隊(duì)列生產(chǎn)、拿取數(shù)據(jù)操作的幾個(gè)問題
問題一:判斷阻塞隊(duì)列空滿時(shí)為什么要用while循環(huán),而不用if判斷語句?
拿生產(chǎn)者來說,它在插入前隊(duì)列已經(jīng)滿了,如果用if判斷語句的話,在if里面要執(zhí)行pthread_cond_wait()等待條件full滿足,當(dāng)這個(gè)生產(chǎn)者被喚醒后執(zhí)行if外面的push插入數(shù)據(jù)。但是如果pthread_cond_wait()等待出錯(cuò)了,直接退出if語句會(huì)繼續(xù)往下執(zhí)行push操作,導(dǎo)致本來已經(jīng)滿了的隊(duì)列多插入了一個(gè)數(shù)據(jù);如果我們用while循環(huán)的話,即使等待出錯(cuò)了,這時(shí)還會(huì)重新回去判斷隊(duì)列是否滿了,這樣可以避免隊(duì)列數(shù)據(jù)出錯(cuò)的問題。
問題二:判空和判滿邏輯中,能不能先等待再喚醒?
答案是不行的,首先對(duì)于訪問阻塞隊(duì)列的鎖mutex,生產(chǎn)者和消費(fèi)者是共同競(jìng)爭(zhēng)的,如果這個(gè)線程先等待的話鎖被釋放了,但是它不會(huì)繼續(xù)往下執(zhí)行喚醒另一個(gè)線程的操作了(因?yàn)檫@個(gè)線程自己也在等待被對(duì)方喚醒),最后導(dǎo)致鎖沒人申請(qǐng),線程都等待各自的條件下死等待。
正確的邏輯是先喚醒對(duì)方,然后自己在對(duì)應(yīng)的條件變量下等待;后面等到條件成熟時(shí)對(duì)方把自己?jiǎn)拘选<次覀冊(cè)谠O(shè)計(jì)條件變量時(shí)要注意:條件變量在等待被喚醒時(shí)需要重新對(duì)條件進(jìn)行判斷,是否條件滿足。
3. 測(cè)試阻塞隊(duì)列
下面是生產(chǎn)者線程的控制流,由于只有一個(gè)生產(chǎn)者所以不用在其控制流中加鎖和引入條件變量來維護(hù)生產(chǎn)者和生產(chǎn)者之間的同步與互斥關(guān)系。
我們讓生產(chǎn)者每隔一秒生產(chǎn)一個(gè)數(shù)據(jù):
void* ProducerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = rand()%100+1; p->PushData(data); cout<<"[producer] push data:"<<data<<endl;sleep(1); } }消費(fèi)者每隔一秒拿取一個(gè)數(shù)據(jù):
void* ConsumerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = 0; p->PopData(data); cout<<"[consumer] get data:"<<data<<endl; sleep(1); } }下面是main.cpp的全部代碼:
// 包含所有需要的頭文件和阻塞隊(duì)列的定義 #include "blockqueue.h" // 生產(chǎn)者線程控制流 void* ProducerAction(void* arg) {BlockQueue<int>* p = (BlockQueue<int>*)arg;while(true){int data = rand()%100+1;p->PushData(data);cout<<"[producer] push data:"<<data<<endl;} }// 消費(fèi)者線程控制流 void* ConsumerAction(void* arg) {BlockQueue<int>* p = (BlockQueue<int>*)arg;while(true){int data = 0;p->PopData(data);cout<<"[consumer] get data:"<<data<<endl;sleep(2);} }int main() {srand((unsigned int)time(nullptr));// 1、new一個(gè)阻塞隊(duì)列BlockQueue<int>* p = new BlockQueue<int>;// 2、創(chuàng)建兩個(gè)新線程,分別代表生產(chǎn)者和消費(fèi)者pthread_t pro, con;pthread_create(&pro, nullptr, ProducerAction, p);pthread_create(&con, nullptr, ConsumerAction, p);// 3、主線程等待它們完成任務(wù)后負(fù)責(zé)銷毀阻塞隊(duì)列pthread_join(pro, nullptr);pthread_join(pro, nullptr);delete p;return 0; }編譯運(yùn)行,發(fā)現(xiàn)每生產(chǎn)一個(gè)數(shù)據(jù)馬上又被消費(fèi)者拿走了,這種情況隊(duì)列永遠(yuǎn)都不會(huì)滿:
另外由于我們是先創(chuàng)建生產(chǎn)者線程,再創(chuàng)建消費(fèi)者線程。所以是生產(chǎn)者先生產(chǎn),消費(fèi)者后消費(fèi)。
如果我們先創(chuàng)建消費(fèi)者線程的話,消費(fèi)者線程先拿到隊(duì)列鎖,正欲拿取數(shù)據(jù)時(shí)發(fā)現(xiàn)隊(duì)列為空,然后自己會(huì)在條件empty下阻塞掛起并且釋放操作隊(duì)列的鎖mutex(注意,如果有多個(gè)消費(fèi)者的話,它們是沒有機(jī)會(huì)搶這把鎖的,因?yàn)樗鼈冊(cè)趽尣僮麝?duì)列的這個(gè)鎖之前必須要獲得內(nèi)部競(jìng)爭(zhēng)的鎖);等到生產(chǎn)者線程輪流生產(chǎn)完所有數(shù)據(jù)之后,最后一個(gè)生產(chǎn)者發(fā)現(xiàn)隊(duì)列已經(jīng)滿了就會(huì)喚醒被一開始被阻塞掛起的消費(fèi)者來消費(fèi);在所有消費(fèi)者線程拿走完隊(duì)列數(shù)據(jù)之前,這個(gè)生產(chǎn)者需要一直阻塞等待:
我們先創(chuàng)建消費(fèi)者線程,消費(fèi)者發(fā)現(xiàn)隊(duì)列為空后輸出“queue is empty”,然后阻塞掛起等待生產(chǎn)者生產(chǎn)完所有數(shù)據(jù)后喚醒這個(gè)消費(fèi)者線程:
4. 阻塞隊(duì)列完整代碼
分兩個(gè)文件:
- 頭文件blockqueue.h里包含阻塞隊(duì)列的聲明。
- main.cpp:負(fù)責(zé)創(chuàng)建生產(chǎn)者、消費(fèi)者線程并聲明它們的執(zhí)行邏輯。
blockqueue.h
#pragma once #include <queue> #include <unistd.h> #include <stdlib.h> #include <iostream> #include <pthread.h> using namespace std; template<class T> class BlockQueue { public: BlockQueue(size_t capcity = 5) :_capacity(capcity) { pthread_cond_init(&full, nullptr); pthread_cond_init(&empty, nullptr); pthread_mutex_init(&mutex, nullptr); } ~BlockQueue() { pthread_cond_destroy(&full); pthread_cond_destroy(&empty); pthread_mutex_destroy(&mutex);}void PushData(T data) {pthread_mutex_lock(&mutex);while(IsFull()){cout<<"queue is full"<<endl;pthread_cond_signal(&empty);pthread_cond_wait(&full, &mutex);}_q.push(data);pthread_mutex_unlock(&mutex);}void PopData(T& data){pthread_mutex_lock(&mutex);while(IsEmpty()){cout<<"queue is empty"<<endl;pthread_cond_signal(&full);pthread_cond_wait(&empty, &mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&mutex);}private:bool IsFull(){return _q.size() >= _capacity;}bool IsEmpty(){return _q.empty();}queue<T> _q; size_t _capacity;pthread_cond_t full;pthread_cond_t empty;pthread_mutex_t mutex; };main.cpp
#include "blockqueue.h" void* ProducerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = rand()%100+1; p->PushData(data); cout<<"[producer] push data:"<<data<<endl; sleep(1); } } void* ConsumerAction(void* arg) { BlockQueue<int>* p = (BlockQueue<int>*)arg; while(true) { int data = 0; p->PopData(data); cout<<"[consumer] get data:"<<data<<endl; sleep(1); } } int main() {srand((unsigned int)time(nullptr));// 1、new一個(gè)阻塞隊(duì)列BlockQueue<int>* p = new BlockQueue<int>;// 2、創(chuàng)建兩個(gè)新線程,分別代表生產(chǎn)者和消費(fèi)者pthread_t pro, con;pthread_create(&pro, nullptr, ProducerAction, p);pthread_create(&con, nullptr, ConsumerAction, p);// 3、主線程等待它們完成任務(wù)后負(fù)責(zé)銷毀阻塞隊(duì)列pthread_join(pro, nullptr);pthread_join(pro, nullptr);delete p;return 0; }5. 關(guān)于改進(jìn)阻塞隊(duì)列的幾點(diǎn)補(bǔ)充
5.1 多生產(chǎn)者多消費(fèi)者的設(shè)計(jì)
只有一個(gè)生產(chǎn)者和只有一個(gè)消費(fèi)者的情況,只需在阻塞隊(duì)列push和pop時(shí)維護(hù)生產(chǎn)者和消費(fèi)者的同步與互斥關(guān)系即可。如果有多個(gè)生產(chǎn)者和消費(fèi)者的話需要在它們各自的控制流中加不同鎖和不同的條件變量,確保每次只有一個(gè)消費(fèi)者和一個(gè)生產(chǎn)者能去操作隊(duì)列。
5.2 阻塞隊(duì)列所存儲(chǔ)數(shù)據(jù)可以是更復(fù)雜的任務(wù)
阻塞隊(duì)列不僅僅可以存簡(jiǎn)單的整型數(shù)字,還可以是復(fù)雜任務(wù)的結(jié)構(gòu)體指針,這樣生產(chǎn)者派發(fā)任務(wù),消費(fèi)者拿到后解決里面的任務(wù)。比如生產(chǎn)者派發(fā)用戶輸入的賬號(hào)密碼,消費(fèi)者拿到后負(fù)責(zé)把賬號(hào)密碼傳輸?shù)綌?shù)據(jù)庫中。
四. 基于環(huán)形隊(duì)列的生產(chǎn)者消費(fèi)者模型
1. 基本規(guī)則
2. 環(huán)形隊(duì)列的實(shí)現(xiàn)
成員變量說明:
- 這里用一個(gè)數(shù)組來模擬環(huán)形隊(duì)列,因?yàn)樯a(chǎn)者和消費(fèi)者要并發(fā)執(zhí)行且不能同時(shí)操作相同位置的數(shù)據(jù),剛好數(shù)組可以通過下標(biāo)隨機(jī)訪問數(shù)據(jù),所以這里我們選用數(shù)組。
- 定義了兩個(gè)無符號(hào)整型對(duì)象_proPos和_cusPos分別指向生產(chǎn)者要生產(chǎn)數(shù)據(jù)的格子下標(biāo)和消費(fèi)者要拿取數(shù)據(jù)的位置下標(biāo)。
- 還定義了_proSem和_cusSem兩個(gè)信號(hào)量對(duì)象,分別記錄著環(huán)形隊(duì)列中格子數(shù)量和以生產(chǎn)數(shù)據(jù)個(gè)數(shù)。
- 最后還有必要記錄環(huán)形隊(duì)列的容量大小,可以用它來取模更新_proPos和_cusPos的值。
成員函數(shù)說明:
- 這里特意封裝了信號(hào)量的PV操作,只需把信號(hào)量對(duì)象作為參數(shù)傳入就能完成信號(hào)量的申請(qǐng)、釋放操作。
- 生產(chǎn)者執(zhí)行Push()操作生產(chǎn)數(shù)據(jù)時(shí),需要先申請(qǐng)(減一)_proSem信號(hào)量,生產(chǎn)完成后釋放(加一)_cusPos信號(hào)量,讓消費(fèi)者來消費(fèi)。反之亦然
2.3 單生產(chǎn)者單消費(fèi)者
在主線程中創(chuàng)建兩個(gè)新線程分別代表生產(chǎn)者和消費(fèi)者,消費(fèi)者每隔一秒地從環(huán)形隊(duì)列中拿取數(shù)據(jù),生產(chǎn)者每隔一秒生產(chǎn)一個(gè)數(shù)據(jù):
// 基于環(huán)形隊(duì)列的單生產(chǎn)者單消費(fèi)者模型 #include "RingQueue.h"// 消費(fèi)者線程執(zhí)行的操作 void* Customer(void* arg) {RingQueue<int>* q = (RingQueue<int>*)arg;while(true){sleep(1);int getData;q->Pop(getData);cout<<"[Customer] pop data:"<<getData<<endl;} }// 生產(chǎn)者線程執(zhí)行的操作 void* Producer(void* arg) {RingQueue<int>* q = (RingQueue<int>*)arg;while(true){sleep(1);int putData = (rand()%100) + 1;q->Push(putData);cout<<"[Producer] push data:"<<putData<<endl;} }int main() { // 1、制造隨機(jī)數(shù)種子,作為生產(chǎn)者push到環(huán)形隊(duì)列當(dāng)中的數(shù)據(jù) srand((size_t)time(nullptr)); // 2、new一個(gè)環(huán)形隊(duì)列 RingQueue<int>* q = new RingQueue<int>; // 3、分別創(chuàng)建、等待一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者 pthread_t tid1, tid2; pthread_create(&tid1, nullptr, Customer, (void*)q); pthread_create(&tid2, nullptr, Producer, (void*)q); pthread_join(tid1, nullptr); pthread_join(tid2, nullptr); // 4、最后delete環(huán)形隊(duì)列 delete q; return 0; }編譯運(yùn)行,由于_proSem初始值為0,一開始沒有數(shù)據(jù)生產(chǎn)者線程要掛起等待,消費(fèi)者生產(chǎn)一個(gè)數(shù)據(jù),生產(chǎn)者就拿取一個(gè)數(shù)據(jù):
接下來我們讓生產(chǎn)者生產(chǎn)得快,消費(fèi)者消費(fèi)的慢:
編譯運(yùn)行,發(fā)現(xiàn)生產(chǎn)者生產(chǎn)的數(shù)據(jù)瞬間把隊(duì)列填滿了,接下來消費(fèi)者拿走一個(gè)數(shù)據(jù),生產(chǎn)者再生產(chǎn)一個(gè)數(shù)據(jù),二者串行執(zhí)行:
如果消費(fèi)者消費(fèi)得快,生產(chǎn)者生產(chǎn)得慢的話,可以推測(cè)結(jié)果是生產(chǎn)者生產(chǎn)完一個(gè)數(shù)據(jù),消費(fèi)者馬上就拿走,然后繼續(xù)等待生產(chǎn)者生產(chǎn)數(shù)據(jù),這個(gè)就不在做演示了。
2.4 多生產(chǎn)者多消費(fèi)者
這次我們?cè)谥骶€程中分別新建三個(gè)生產(chǎn)者線程、三個(gè)消費(fèi)者線程。生產(chǎn)者之間競(jìng)爭(zhēng)proLock這把鎖,消費(fèi)者之間競(jìng)爭(zhēng)cusLock這把鎖,競(jìng)爭(zhēng)到鎖的線程才能去生產(chǎn)或拿取數(shù)據(jù),它們完成一次操作后釋放鎖,然后重新內(nèi)部競(jìng)爭(zhēng):
// 基于環(huán)形隊(duì)列的多生產(chǎn)者多消費(fèi)者模型 #include "RingQueue.h"// 構(gòu)造兩個(gè)全局互斥鎖對(duì)象,分別用于所有生產(chǎn)者和所有消費(fèi)者線程 pthread_mutex_t cusLock; pthread_mutex_t proLock;// new一個(gè)存儲(chǔ)整數(shù)的全局環(huán)形隊(duì)列 RingQueue<int>* q = new RingQueue<int>;// 消費(fèi)者線程執(zhí)行的操作 void* Customer(void* arg) {while(true){size_t id = (size_t)arg;int getData;pthread_mutex_lock(&cusLock);q->Pop(getData); pthread_mutex_unlock(&cusLock); cout<<'['<<"Customer "<<id<<']'<<" Pop data:"<<getData<<endl;sleep(1);} }// 生產(chǎn)者線程執(zhí)行的操作 void* Producer(void* arg) {size_t id = (size_t)arg;while(true){int putData = (rand()%100) + 1;pthread_mutex_lock(&proLock);q->Push(putData);pthread_mutex_unlock(&proLock);cout<<'['<<"Producer "<<id<<']'<<" push data "<<putData<<endl;sleep(1);} }int main() {// 1、初始化兩把全局互斥鎖pthread_mutex_init(&cusLock, nullptr);pthread_mutex_init(&proLock, nullptr);// 2、創(chuàng)造種子,用于生產(chǎn)隨機(jī)數(shù)據(jù)插入到環(huán)形隊(duì)列中srand((size_t)time(nullptr));// 3、分別新建三個(gè)生產(chǎn)者、消費(fèi)者線程pthread_t cusTids[3];pthread_t proTids[3];for(size_t i = 0; i < 3; ++i){pthread_create(&cusTids[i], nullptr, Customer, (void*)(i+1));}for(size_t i = 0; i < 3; ++i){pthread_create(&proTids[i], nullptr, Producer, (void*)(i+1)); }// 4、分別等待三個(gè)生產(chǎn)者、消費(fèi)者線程for(size_t i = 0; i < 3; ++i){pthread_join(cusTids[i], nullptr);}for(size_t i = 0; i < 3; ++i){pthread_join(proTids[i], nullptr);}// 5、等待完成后delete環(huán)形隊(duì)列并銷毀互斥鎖對(duì)象delete q;pthread_mutex_destroy(&cusLock);pthread_mutex_destroy(&proLock);return 0; }編譯運(yùn)行,生產(chǎn)和消費(fèi)操作并發(fā)執(zhí)行:
總結(jié)
以上是生活随笔為你收集整理的【Linux】生产者消费者模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【STM32】stm32独立看门狗(IW
- 下一篇: xilinx linux内核,Xilin