日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

生产者/消费者模式(一)

發(fā)布時間:2025/3/15 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 生产者/消费者模式(一) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
生產(chǎn)者/消費(fèi)者模式(一)

  生產(chǎn)者消費(fèi)者問題是一個多線程同步問題的經(jīng)典案例,大多數(shù)多線程編程問題都是以生產(chǎn)者-消費(fèi)者模式為基礎(chǔ),擴(kuò)展衍生來的。在生產(chǎn)者消費(fèi)者模式中,緩沖區(qū)起到了連接兩個模塊的作用:生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū),而消費(fèi)者從緩沖區(qū)取出數(shù)據(jù),如下圖所示:

  

  可以看出Buffer緩沖區(qū)作為一個中介,將生產(chǎn)者和消費(fèi)者分開,使得兩部分相對獨(dú)立,生產(chǎn)者消費(fèi)者不需要知道對方的實現(xiàn)邏輯,對其中一個的修改,不會影響另一個,從設(shè)計模式的角度看,降低了耦合度。而對于圖中處在多線程環(huán)境中Buffer,需要共享給多個多個生產(chǎn)者和消費(fèi)者,為了保證讀寫數(shù)據(jù)和操作的正確性與時序性,程序需要對Buffer結(jié)構(gòu)進(jìn)行同步處理。通常情況下,生產(chǎn)者-消費(fèi)者模式中的廣泛使用的Buffer緩沖區(qū)結(jié)構(gòu)是阻塞隊列。

  阻塞隊列的特點(diǎn)為:“當(dāng)隊列是空的時,從隊列中獲取元素的操作將會被阻塞,或者當(dāng)隊列是滿時,往隊列里添加元素的操作會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻 塞,直到其他的線程往空的隊列插入新的元素。同樣,試圖往已滿的阻塞隊列中添加新元素的線程同樣也會被阻塞,直到其他的線程使隊列重新變得空閑起來,如從 隊列中移除一個或者多個元素,或者完全清空隊列。” 阻塞隊列的實現(xiàn)通常是對普通隊列進(jìn)行同步控制,封裝起來。一個linux下的通用隊列定義如下,代碼所示的隊列沒有為滿的情況,不設(shè)定隊列元素總數(shù)的上限:

1 template<class T> 2 class BlockQueue 3 { 4 public: 5 BlockQueue(); 6 ~BlockQueue(); 7 8 void Add(T *pData); 9 T *Get(int timeout=0); 10 int Count(); 11 void SetBlockFlag(); 12 private: 13 typedef struct _Node 14 { 15 T * m_pData; 16 struct _Node * m_pNext; 17 } Node; 18 Node * m_pHead; //隊列頭 19 Node * m_pTail; //隊列尾 20 int m_nCount; //隊列中元素個數(shù) 21 22 bool m_bBlockFlag; //是否阻塞 23 CCond m_condQueue; 24 }; 25 26 template<class T> 27 BlockQueue<T>::BlockQueue():m_pHead(NULL), m_pTail(NULL), m_nCount(0), m_bBlockFlag(false) 28 { 29 } 30 31 template<class T> 32 void BlockQueue<T>::SetBlockFlag() 33 { 34 m_bBlockFlag = true; 35 } 36 37 template<class T> 38 BlockQueue<T>::~BlockQueue() 39 { 40 Node *pTmp = NULL; 41 while (m_pHead != NULL) 42 { 43 pTmp = m_pHead; 44 m_pHead = m_pHead->m_pNext; 45 delete pTmp; 46 pTmp = NULL; 47 } 48 m_pTail = NULL; 49 } 50 51 template<class T> 52 void BlockQueue<T>::Add(T *pData) 53 { 54 (m_condQueue.GetMutex()).EnterMutex(); 55 56 Node *pTmp = new Node; 57 pTmp->m_pData = pData; 58 pTmp->m_pNext = NULL; 59 60 if (m_nCount == 0) 61 { 62 m_pHead = pTmp; 63 m_pTail = pTmp; 64 } 65 else 66 { 67 m_pTail->m_pNext = pTmp; 68 m_pTail = pTmp; 69 } 70 m_nCount++; 71 72 if (m_bBlockFlag) 73 { 74 m_condQueue.Signal(); 75 } 76 77 (m_condQueue.GetMutex()).LeaveMutex(); 78 } 79 80 template<class T> 81 T *BlockQueue<T>::Get(int timeout) 82 { 83 (m_condQueue.GetMutex()).EnterMutex(); 84 85 while (m_nCount == 0) 86 { 87 if (!m_bBlockFlag) 88 { 89 (m_condQueue.GetMutex()).LeaveMutex(); 90 return NULL; 91 } 92 else 93 { 94 if (m_condQueue.WaitNoLock(timeout) == 1) 95 { 96 (m_condQueue.GetMutex()).LeaveMutex(); 97 return NULL; 98 } 99 } 100 } 101 102 T * pTmpData = m_pHead->m_pData; 103 Node *pTmp = m_pHead; 104 105 m_pHead = m_pHead->m_pNext; 106 delete pTmp; 107 pTmp = NULL; 108 m_nCount--; 109 if (m_nCount == 0) 110 { 111 m_pTail = NULL; 112 } 113 114 (m_condQueue.GetMutex()).LeaveMutex(); 115 116 return pTmpData; 117 } 118 119 template<class T> 120 int BlockQueue<T>::Count() 121 { 122 (m_condQueue.GetMutex()).EnterMutex(); 123 int nCount = m_nCount; 124 (m_condQueue.GetMutex()).LeaveMutex(); 125 126 return nCount; 127 }

  隊列中使用了Cond條件變量,實現(xiàn)多線程環(huán)境中隊列的同步與阻塞:

  • 條件變量自己持有mutex,在向隊列中Add/Get元素時,mutex將隊列鎖住,同一時刻只允許一個線程對隊列進(jìn)行操作;
  • 消費(fèi)者從隊列中獲取數(shù)據(jù)時,如果使用SetBlockFlag函數(shù)設(shè)置了阻塞模式,那么當(dāng)隊列元素為空時,需要阻塞在條件變量上進(jìn)行等待,其他線程調(diào)用Add函數(shù),向隊列中添加元素后,喚醒阻塞在條件變量上的線程。注意這里用到的等待條件是while(count == 0)而不是if(count == 0),因為在多核處理器環(huán)境中,Signal喚醒操作可能會激活多于一個線程(阻塞在條件變量上的線程),使得多個調(diào)用等待的線程返回。所以用while循環(huán)對condition多次判斷,可以避免這種假喚醒。
  • 隊列元素為包含T類型指針的Node類型指針,析構(gòu)時只負(fù)責(zé)釋放隊列中的Node元素,而真正指向T類型數(shù)據(jù)的指針,需要調(diào)用者進(jìn)行分配和釋放,一般由生產(chǎn)者調(diào)用new分配,消費(fèi)者使用后調(diào)用delete釋放。這里需要特別注意,因為如果忘記釋放,會造成內(nèi)存泄露。

  這段代碼是我在多線程環(huán)境下,較為簡單的生產(chǎn)者-消費(fèi)者模式中,在通用的隊列模型進(jìn)行的修改。目前隊列在服務(wù)器中穩(wěn)定運(yùn)行,還未出現(xiàn)太大的性能問題,當(dāng)然這與應(yīng)用模塊邏輯較為簡單、交易量較小有很大關(guān)。此版本中,BlockQueue的Add函數(shù)每次調(diào)用都是指向Signal操作,喚醒阻塞在環(huán)境變量上線程,如果系統(tǒng)中有大量的寫線程,而讀線程的操作只有少數(shù),那么很多情況下Signal調(diào)用都是無意義的,系統(tǒng)將其忽略。如何使Signal操作變得有意義,即每次Signal操作都會喚醒阻塞線程,最初的想法是在m_nCount == 0時即隊列為空時進(jìn)行調(diào)用,根據(jù)這個想法修改Add函數(shù)如下:

1 template<class T> 2 void BlockQueue<T>::Add(T *pData) 3 { 4 (m_condQueue.GetMutex()).EnterMutex(); 5 6 Node *pTmp = new Node; 7 pTmp->m_pData = pData; 8 pTmp->m_pNext = NULL; 9 10 bool bEmpty = false; 11 if (m_nCount == 0) 12 { 13 m_pHead = pTmp; 14 m_pTail = pTmp; 15 bEmpty = true; 16 } 17 else 18 { 19 m_pTail->m_pNext = pTmp; 20 m_pTail = pTmp; 21 } 22 m_nCount++; 23 24 if (m_bBlockFlag && bEmpty) 25 { 26 m_condQueue.Signal(); 27 } 28 29 (m_condQueue.GetMutex()).LeaveMutex(); 30 }

  目前來看這樣處理沒什么邏輯問題,很快發(fā)現(xiàn)這樣做還是會有一些Signal操作時無意義的,比如當(dāng)讀線程讀取到隊列中最后一個元素時,m_nCount--,值為零,那么此時寫線程調(diào)用Add函數(shù),因為m_nCount==0,調(diào)用了Signal操作,而這時如果沒有讀線程在阻塞,那么這個操作仍然會被系統(tǒng)忽略。仔細(xì)想想,這里不能根據(jù)m_nCount的值來判斷是否有線程在等待,而應(yīng)該以是否有隊列阻塞來進(jìn)行Signal的調(diào)用。這里可以用一個計數(shù)器來表示當(dāng)前阻塞在Get操作上的線程個數(shù),只有當(dāng)它大于零時,才說明需要激活,根據(jù)這個條件,繼續(xù)修改代碼,下面只展示修改后與之前相比的差異代碼:

1 template<class T> 2 class BlockQueue 3 { 4 public: 5 /*與之前代碼相同*/ 6 private: 7 /*與之前代碼相同*/ 8 int m_nCount; 9 int m_nBlockCnt; //阻塞線程數(shù) 10 bool m_bBlockFlag; //是否阻塞 11 CCond m_condQueue; 12 }; 13 14 template<class T> 15 BlockQueue<T>::BlockQueue():m_pHead(NULL), m_pTail(NULL), m_nCount(0), m_nBlockCnt(0), m_bBlockFlag(false) 16 {} 17 18 template<class T> 19 void BlockQueue<T>::Add(T *pData) 20 { 21 (m_condQueue.GetMutex()).EnterMutex(); 22 23 Node *pTmp = new Node; 24 pTmp->m_pData = pData; 25 pTmp->m_pNext = NULL; 26 27 if (m_nCount == 0) 28 { 29 m_pHead = pTmp; 30 m_pTail = pTmp; 31 bEmpty = true; 32 } 33 else 34 { 35 m_pTail->m_pNext = pTmp; 36 m_pTail = pTmp; 37 } 38 m_nCount++; 39 40 if (m_bBlockFlag && m_nBlockCnt>0) //阻塞模式,并且有線程等待時調(diào)用Signal進(jìn)行喚醒 41 { 42 m_condQueue.Signal(); 43 } 44 45 (m_condQueue.GetMutex()).LeaveMutex(); 46 } 47 48 template<class T> 49 T *BlockQueue<T>::Get(int timeout) 50 { 51 (m_condQueue.GetMutex()).EnterMutex(); 52 53 while (m_nCount == 0) 54 { 55 if (!m_bBlockFlag) 56 { 57 (m_condQueue.GetMutex()).LeaveMutex(); 58 return NULL; 59 } 60 else 61 { 62 m_nBlockCnt++; //線程阻塞數(shù)增加 63 if (m_condQueue.WaitNoLock(timeout) == 1) 64 { 65 (m_condQueue.GetMutex()).LeaveMutex(); 66 m_nBlockCnt--; 67 return NULL; 68 } 69 m_nBlockCnt--; //喚醒后線程阻塞數(shù)減少 70 } 71 } 72 73 T * pTmpData = m_pHead->m_pData; 74 Node *pTmp = m_pHead; 75 76 m_pHead = m_pHead->m_pNext; 77 delete pTmp; 78 pTmp = NULL; 79 m_nCount--; 80 if (m_nCount == 0) 81 { 82 m_pTail = NULL; 83 } 84 85 (m_condQueue.GetMutex()).LeaveMutex(); 86 87 return pTmpData; 88 }

?  現(xiàn)在代碼已經(jīng)修改完成,繼續(xù)分析可以看出幾乎對每個函數(shù)的操作都進(jìn)行了加鎖保護(hù),臨界區(qū)從始到末,粒度較大;再者,隊列中持有的T對象指針,均是由調(diào)用者動態(tài)分配和釋放的,而內(nèi)部也是有鏈表實現(xiàn),Add和Get操作時,會調(diào)用鏈表元素的內(nèi)存分配和釋放。如此一來,在交易量很大的情況下,頻繁讀寫,不僅會導(dǎo)致加鎖解鎖的性能消耗,也會使系統(tǒng)產(chǎn)生大量內(nèi)存碎片。

  未完待續(xù)……

posted on 2014-11-18 16:21 Tourun 閱讀(...) 評論(...) 編輯 收藏

轉(zhuǎn)載于:https://www.cnblogs.com/Tour/p/4106085.html

總結(jié)

以上是生活随笔為你收集整理的生产者/消费者模式(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。