生产者消费者模型详解
生產者消費者模型
文章目錄
- 生產者消費者模型
- 什么是生產者消費者模型
- 基于BlockingQueue的生產者消費者模型
- 單生產者單消費者模型
- 多生產者多消費者模型
什么是生產者消費者模型
生產者消費者模式就是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。這個阻塞隊列就是用來給生產者和消費者解耦的。
我們首先舉個例子便于理解生產者消費者模型:
我們平常都會去超市買一些需要的物品,給我們提供產品的是供應商,我們是消費者,我們是去超市消費,給我們提供產品的而是供應商,為什么我們會去超市呢?因為超市給我們提供了交易場所,生產者消費者模型本質是生產者把產品放到超市里面,消費者把產品從超市中拿出去。
計算機中的數據相當于產品,數據的加工處理是CPU以任務形式加工處理,生產者和消費者代表的是多線程或者多進程,超市相當于一段內存
生活中生產者將產品放在一個交易場所,讓我們自己去拿,為什么這樣呢?是因為效率高,比如菜鳥驛站的場景,快遞公司把一批包裹放進菜鳥驛站,然后我們去拿就好了,提高了效率
廠商將交易場所當作緩沖區來用,生產者和消費者一般是進程和線程,空間或者交易場所:一塊"內存塊",產品相當于數據
生產者消費者321原則(自己總結:3種關系,2種角色。一個交易場所):
3種關系,生產者和生產者,生產者和消費者,消費者和消費者
生產者和生產者競爭關系(互斥),生產者和消費者(同步),消費者和消費者競爭關系(互斥)
2種角色,生產者和消費者
1個交易場所:一塊"內存塊"
int add(int a,int b) {return a+b; } int main() {int a = 10;int b = 20;int c = add(a,b); }方式一:
方式二:
第一種方式main函數生產a和b后調用add,main函數需要等待add函數執行完才能繼續工作,這種稱為強耦合,這種工作方式是串行,這種方式耦合度高,而第二種方式是線程1去執行main函數生成a和b,線程2去執行add函數,它們直接有一塊區域來給線程1放數據和給線程2取數據,當線程2在add的時候,線程1也可以放數據,這種方式稱為并行,也稱之為解耦
并行是一種需求,表示有很多業務活動同時進行。異步是一種代碼編寫方式,同步和串行:同步是一種代碼編寫方式,串行是一種需求
基于BlockingQueue的生產者消費者模型
阻塞隊列有什么用?
如果不把數據往阻塞隊列里面放,假設數據通過網絡給服務器用了10ms,服務器讓線程1把數據寫入數據庫用了5ms,一共15ms,用戶一共等待15ms,如果把數據往阻塞隊列里面放,因為阻塞隊列是在內存當中的,而數據庫是文件是需要進行IO的,放在阻塞隊列的時間要比放在數據庫的時間少一些,所以用戶等待的時間就少一些。
下面我們寫一個單生產者單消費者模型:
單生產者單消費者模型
Makefile的編寫:
main:main.ccg++ $^ -o $@ -lpthread .PHONY:clean clean:rm -f mainBlockQueue.hpp
#ifndef __BLOCK_QUEUE_H__ #define __BLOCK_QUEUE_H__ #include<iostream> #include<queue> #include<pthread.h> #include<unistd.h> class BlockQueue { private:std::queue<int> q;size_t _cap;pthread_mutex_t lock;pthread_cond_t c_cond;//消費者的條件不滿足時,將來消費者在該條件變量下等pthread_cond_t p_cond;//生產者的條件不滿足時,將來生產者在該條件下等 public:bool IsFull(){return q.size() >= _cap;}bool IsEmpty(){return q.empty();}void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void WakeUpComsumer(){pthread_cond_signal(&c_cond);}void WakeUpProductor(){pthread_cond_signal(&p_cond);}void ProducterWait(){pthread_cond_wait(&p_cond,&lock);//這里為什么要傳鎖,我們在等待時肯定是條件不滿足了,我們通過判斷才知道條件滿不滿足,//判斷就需要保證進入臨界區,我們是持有鎖進入的,wait的時候必須要釋放鎖//在調用該函數的時候,自動會釋放lock//當該函數被返回時,返回到了臨界區內,所以,該函數會讓該線程重新持有該鎖}void ComsumerWait(){pthread_cond_wait(&c_cond,&lock);//在消費者釋放鎖時,生產者正申請鎖,而消費者在等待} public:BlockQueue(size_t cap):_cap(cap){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&c_cond,nullptr);pthread_cond_init(&p_cond,nullptr);}void Put(int in){LockQueue();//if(isFull())while(isFull()){WakeUpComsumor();//喚醒消費者ProducterWait();//生產者等待}q.push(in);UnLockQueue();}void Get(int& out){LockQueue();//if(IsEmpty)while(IsEmpty()){WakeUpProductor();ComsumerWait();//消費者者等待}out = q.front();q.pop();LockQueue();}~BlockQueue(){pthread_cond_destroy(&lcok);pthread_cond_destroy(&c_cond);pthread_cond_destroy(&p_cond); } }; #endifmain.cc
#include"BlockQueue.hpp" using namespace std; void* consumer_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){int n = 0;bq->Get(n);cout<<"consumer data is : " << n <<endl;} } void* productor_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){int data = rand()%10+1;bq->Put(data);cout<<"product data is : "<<data<<endl;} } int main() {BlockQueue *bq = new BlockQueue(5);pthread_t c,p;pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0; }上面代碼有個細節不能用if判斷滿和空,萬一被提前喚醒或者等待函數調用失敗,它們會繼續執行push數據和pop數據,此時就是非法操作了
多生產者多消費者模型
上面是單生產者單消費者模型,只解決了生產者和消費者之間的關系,如果是多生產者多消費者模型那么需要添加另外兩種關系:還有生產者和生產者的關系(互斥),以及消費者和消費者的關系(互斥)
所以我們只需要在外面生產者和消費者的例程中放數據和取數據加鎖即可:
#include"BlockQueue.hpp" using namespace std; pthread_mutex_t c_lock; pthread_mutex_t p_lock;void* consumer_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){int n = 0;pthread_mutex_lock(&c_lock);bq->Get(n);pthread_mutex_unlock(&c_lock);cout<<"consumer data is : " << n <<endl;} } void* productor_run(void* arg) {BlockQueue *bq = (BlockQueue*)arg;while(true){pthread_mutex_lock(&p_lock);int data = rand()%10+1;bq->Put(data);pthread_mutex_unlock(&p_lock);cout<<"product data is : "<<data<<endl;} } int main() {BlockQueue *bq = new BlockQueue(5);pthread_t c,p;pthread_mutex_init(&c_lock,nullptr);pthread_mutex_init(&p_lock,nullptr);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&c,nullptr,consumer_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_create(&p,nullptr,productor_run,(void*)bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete(bq);pthread_mutex_destroy(&c_lock);pthread_mutex_destroy(&p_lock);return 0; }那么上面做的事情有什么用呢?下面我們來看下面這個代碼:
#ifndef __QUEUE_BLOCK_H__ #define __QUEUE_BLOCK_H__ #include<iostream> #include<queue> #include<pthread.h> #include<unistd.h> class Task { public:int _x;int _y; public:Task(){}Task(int x,int y):_x(x),_y(y){}int Run(){return _x+_y;}~Task(){} }; class BlockQueue { private:std::queue<Task> q;size_t _cap;pthread_mutex_t lock;pthread_cond_t c_cond;//消費者的條件不滿足時,將來消費者在該條件變量下等pthread_cond_t p_cond;//生產者的條件不滿足時,將來生產者在該條件下等 public:bool IsFull(){return q.size() >= _cap;}bool IsEmpty(){return q.empty();}void LockQueue(){pthread_mutex_lock(&lock);}void UnLockQueue(){pthread_mutex_unlock(&lock);}void WakeUpComsumer(){pthread_cond_signal(&c_cond);}void WakeUpProductor(){pthread_cond_signal(&p_cond);}void ProducterWait(){pthread_cond_wait(&p_cond,&lock);//這里為什么要傳鎖,我們在等待時肯定是條件不滿足了,我們通過判斷才知道條件滿不滿足,//判斷就需要保證進入臨界區,我們是持有鎖進入的,wait的時候必須要釋放鎖//在調用該函數的時候,自動會釋放lock//當該函數被返回時,返回到了臨界區內,所以,該函數會讓該線程重新持有該鎖}void ComsumerWait(){pthread_cond_wait(&c_cond,&lock);//在消費者釋放鎖時,生產者正申請鎖,而消費者在等待} public:BlockQueue(size_t cap):_cap(cap){pthread_mutex_init(&lock,nullptr);pthread_cond_init(&c_cond,nullptr);pthread_cond_init(&p_cond,nullptr);}void Put(Task t){LockQueue();//if(isFull())while(isFull()){WakeUpComsumor();//喚醒消費者ProducterWait();//生產者等待}q.push(t);UnLockQueue();}void Get(Task& t){LockQueue();//if(IsEmpty)while(IsEmpty()){WakeUpProductor();ComsumerWait();//消費者者等待}t = q.front();q.pop();UnLockQueue();}~BlockQueue(){pthread_cond_destroy(&lcok);pthread_cond_destroy(&c_cond);pthread_cond_destroy(&p_cond); } };#endif #include"BlockQueue.hpp"pthread_mutex_t c_lock; pthread_mutex_t p_lock; void* r1(void* arg) {//生產者BlockQueue* bq =(BlockQueue*)arg;while(true){pthread_mutex_lock(&p_lock);int x = rand()%10+1;int y = rand()%100+1;Task t(x,y);bq->Put(t);pthread_mutex_unlock(&p_lock);cout<<"Product Task is: "<<x<<'+'<<y<<"= ?"<<endl;} } void* r2(void* arg) {//消費者BlockQueue* bq = (BlockQueue*)arg;while(true){//取數據Task t;pthread_mutex_lock(&c_lock);bq->Get(t);pthread_mutex_unlock(&c_lock);cout<<"Consumer: "<<t._x<<"+"<<t._y<<"="<<t.Run()<<endl;} } int main() {BlockQueue* bq = new BlockQueue(5);pthread_t c,p;pthread_mutex_init(&p_lock,NULL);pthread_mutex_init(&c_lock,NULL);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&p,NULL,r1,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_create(&c,NULL,r2,(void*)bq);pthread_join(p,nullptr);pthread_join(c,nullptr);pthread_mutex_destroy(&c_lock);pthread_mutex_destroy(&p_lock);delete bq;return 0; }
這樣就做到了一些線程放數據,另一些線程幫我們計算數據,也就是完成相應的任務。
總結
以上是生活随笔為你收集整理的生产者消费者模型详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (转)中科院理论物理所考研…
- 下一篇: 读博期间的“自我”