Linux下设计并发队列
設計并發(fā)隊列
#include <pthread.h> #include <list> using namespace std;template <typename T> class Queue { public: Queue( ) { pthread_mutex_init(&_lock, NULL); } ~Queue( ) { pthread_mutex_destroy(&_lock);} void push(const T& data);T pop( ); private: list<T> _list; pthread_mutex_t _lock; };template <typename T> void Queue<T>::push(const T& value ) { pthread_mutex_lock(&_lock);_list.push_back(value);pthread_mutex_unlock(&_lock); }template <typename T> T Queue<T>::pop( ) { if (_list.empty( )) { throw "element not found";}pthread_mutex_lock(&_lock); T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_lock);return _temp; }上述代碼是有效的。但是,請考慮這樣的情況:您有一個很長的隊列(可能包含超過 100,000 個元素),而且在代碼執(zhí)行期間的某個時候,從隊列中讀取數(shù)據(jù)的線程遠遠多于添加數(shù)據(jù)的線程。因為添加和取出數(shù)據(jù)操作使用相同的互斥鎖,所以讀取數(shù)據(jù)的速度會影響寫數(shù)據(jù)的線程訪問鎖。那么,使用兩個鎖怎么樣?一個鎖用于讀取操作,另一個用于寫操作。給出修改后的 Queue 類。
template <typename T> class Queue { public: Queue( ) { pthread_mutex_init(&_rlock, NULL); pthread_mutex_init(&_wlock, NULL);} ~Queue( ) { pthread_mutex_destroy(&_rlock);pthread_mutex_destroy(&_wlock);} void push(const T& data);T pop( ); private: list<T> _list; pthread_mutex_t _rlock, _wlock; };template <typename T> void Queue<T>::push(const T& value ) { pthread_mutex_lock(&_wlock);_list.push_back(value);pthread_mutex_unlock(&_wlock); }template <typename T> T Queue<T>::pop( ) { if (_list.empty( )) { throw "element not found";}pthread_mutex_lock(&_rlock);T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_rlock);return _temp; }設計并發(fā)阻塞隊列
目前,如果讀線程試圖從沒有數(shù)據(jù)的隊列讀取數(shù)據(jù),僅僅會拋出異常并繼續(xù)執(zhí)行。但是,這種做法不總是我們想要的,讀線程很可能希望等待(即阻塞自身),直到有數(shù)據(jù)可用時為止。這種隊列稱為阻塞的隊列。如何讓讀線程在發(fā)現(xiàn)隊列是空的之后等待?一種做法是定期輪詢隊列。但是,因為這種做法不保證隊列中有數(shù)據(jù)可用,它可能會導致浪費大量 CPU 周期。推薦的方法是使用條件變量,即 pthread_cond_t 類型的變量。
template <typename T> class BlockingQueue { public: BlockingQueue ( ) { pthread_mutexattr_init(&_attr); // set lock recursivepthread_mutexattr_settype(&_attr,PTHREAD_MUTEX_RECURSIVE_NP); pthread_mutex_init(&_lock,&_attr);pthread_cond_init(&_cond, NULL);} ~BlockingQueue ( ) { pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_cond);} void push(const T& data);bool push(const T& data, const int seconds); //time-out push T pop( );T pop(const int seconds); // time-out popprivate: list<T> _list; pthread_mutex_t _lock;pthread_mutexattr_t _attr;pthread_cond_t _cond; };template <typename T> T BlockingQueue<T>::pop( ) { pthread_mutex_lock(&_lock);while (_list.empty( )) { pthread_cond_wait(&_cond, &_lock) ;}T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_lock);return _temp; }template <typename T> void BlockingQueue <T>::push(const T& value ) { pthread_mutex_lock(&_lock);const bool was_empty = _list.empty( );_list.push_back(value);pthread_mutex_unlock(&_lock);if (was_empty) pthread_cond_broadcast(&_cond); }并發(fā)阻塞隊列設計有兩個要注意的方面:
1.可以不使用 pthread_cond_broadcast,而是使用 pthread_cond_signal。但是,pthread_cond_signal 會釋放至少一個等待條件變量的線程,這個線程不一定是等待時間最長的讀線程。盡管使用 pthread_cond_signal 不會損害阻塞隊列的功能,但是這可能會導致某些讀線程的等待時間過長。
2.可能會出現(xiàn)虛假的線程喚醒。因此,在喚醒讀線程之后,要確認列表非空,然后再繼續(xù)處理。強烈建議使用基于 while 循環(huán)的 pop()。
設計有超時限制的并發(fā)阻塞隊列
在許多系統(tǒng)中,如果無法在特定的時間段內處理新數(shù)據(jù),就根本不處理數(shù)據(jù)了。例如,新聞頻道的自動收報機顯示來自金融交易所的實時股票行情,它每 n 秒收到一次新數(shù)據(jù)。如果在 n 秒內無法處理以前的一些數(shù)據(jù),就應該丟棄這些數(shù)據(jù)并顯示最新的信息。根據(jù)這個概念,我們來看看如何給并發(fā)隊列的添加和取出操作增加超時限制。這意味著,如果系統(tǒng)無法在指定的時間限制內執(zhí)行添加和取出操作,就應該根本不執(zhí)行操作。
template <typename T> bool BlockingQueue <T>::push(const T& data, const int seconds) {struct timespec ts1, ts2;const bool was_empty = _list.empty( );clock_gettime(CLOCK_REALTIME, &ts1);pthread_mutex_lock(&_lock);clock_gettime(CLOCK_REALTIME, &ts2);if ((ts2.tv_sec – ts1.tv_sec) <seconds) {was_empty = _list.empty( );_list.push_back(value);}pthread_mutex_unlock(&_lock);if (was_empty) pthread_cond_broadcast(&_cond); }template <typename T> T BlockingQueue <T>::pop(const int seconds) { struct timespec ts1, ts2; clock_gettime(CLOCK_REALTIME, &ts1); pthread_mutex_lock(&_lock);clock_gettime(CLOCK_REALTIME, &ts2);// First Check: if time out when get the _lock if ((ts1.tv_sec – ts2.tv_sec) < seconds) { ts2.tv_sec += seconds; // specify wake up timewhile(_list.empty( ) && (result == 0)) { result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;}if (result == 0) // Second Check: if time out when timedwait {T _temp = _list.front( );_list.pop_front( );pthread_mutex_unlock(&_lock);return _temp;}}pthread_mutex_unlock(&lock);throw "timeout happened"; }設計有大小限制的并發(fā)阻塞隊列
最后,討論有大小限制的并發(fā)阻塞隊列。這種隊列與并發(fā)阻塞隊列相似,但是對隊列的大小有限制。在許多內存有限的嵌入式系統(tǒng)中,確實需要有大小限制的隊列。
對于阻塞隊列,只有讀線程需要在隊列中沒有數(shù)據(jù)時等待。對于有大小限制的阻塞隊列,如果隊列滿了,寫線程也需要等待。
要注意的第一點是,這個阻塞隊列有兩個條件變量而不是一個。如果隊列滿了,寫線程等待 _wcond 條件變量;讀線程在從隊列中取出數(shù)據(jù)之后需要通知所有線程。同樣,如果隊列是空的,讀線程等待 _rcond 變量,寫線程在把數(shù)據(jù)插入隊列中之后向所有線程發(fā)送廣播消息。如果在發(fā)送廣播通知時沒有線程在等待 _wcond 或 _rcond,會發(fā)生什么?什么也不會發(fā)生;系統(tǒng)會忽略這些消息。還要注意,兩個條件變量使用相同的互斥鎖。
?
來源《用于并行計算的多線程數(shù)據(jù)結構》
http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures1/index.html
http://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html
?
轉載于:https://www.cnblogs.com/luxiaoxun/archive/2012/10/07/2713576.html
總結
以上是生活随笔為你收集整理的Linux下设计并发队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: GNU make manual 翻译(
- 下一篇: Linux上安装JDK+Tomcat