并发编程(一): POSIX 使用互斥量和条件变量实现生产者/消费者问题
??? boost的mutex,condition_variable非常好用。但是在Linux上,boost實際上做的是對pthread_mutex_t和pthread_cond_t的一系列的封裝。因此通過對原生態的POSIX 的mutex,cond的生成者,消費者的實現,我們可以再次體會boost帶給我們的便利。
1. 什么是互斥量
? ? ?? 互斥量從本質上說是一把鎖,在訪問共享資源前對互斥量進行加鎖,在訪問完成后釋放互斥量上的鎖。對互斥量進行加鎖以后,任何其他試圖再次對互斥量加鎖的線程將會被阻塞直到當前線程釋放該互斥鎖。如果釋放互斥鎖時有多個線程阻塞,所以在該互斥鎖上的阻塞線程都會變成可進行狀態,第一個變成運行狀態的線程可以對互斥量加鎖,其他線程在次被阻塞,等待下次運行狀態。
pthread_mutex_t 就是POSIX對于mutex的實現。
| 函數名 | 參數 | 說明 |
| pthread_mutex_init | pthread_mutex_t * mutex, constpthread_mutex_t *attr | 初始化一個互斥量,靜態方式可以直接使用PTHREAD_MUTEX_INITIALIZER進行賦值初始化 |
| pthread_mutex_destroy | pthread_mutex_t *mutex | 釋放對互斥變量分配的資源。注意pthread_mutex_init有可能malloc了資源 |
| pthread_mutex_lock | pthread_mutex_t *mutex | 如果互斥量已經上鎖,調用線程阻塞直至互斥量解鎖 |
| pthread_mutex_trylock | pthread_mutex_t *mutex | 加鎖,如果失敗不阻塞 |
| pthread_mutex_unlock | pthread_mutex_t *mutex | 解鎖 |
#include <pthread.h>pthread_mutex_t foo_mutex;void foo() {pthread_mutex_init(&foo_mutex, NULL);pthread_mutex_lock(&foo_mutex);/* Do work. */pthread_mutex_unlock(&foo_mutex);pthread_mutex_destroy(&foo_mutex); }
當然該初始化
pthread_mutex_init(&foo_mutex, NULL);只能foo_mutex使用前初始化一次,最后destroy。初始化已經初始化的mutex將導致undefined behavior。
另外一種用法:
pthread_mutex_t foo_mutex = PTHREAD_MUTEX_INITIALIZER; void foo() {pthread_mutex_lock(&foo_mutex);/* Do work. */pthread_mutex_unlock(&foo_mutex); }當然了,這兩種用法都有問題:如果在lock住后unlock之前出現exception,那么這個鎖永遠也不能unlock。這種情況下需要guard這個資源。具體可參照boost::mutex::scoped_lock的實現,非常簡單但是極大簡化了mutex的安全使用。
2. 什么是條件變量
????? 與互斥鎖不同,條件變量是用來等待而不是用來上鎖的。條件變量用來自動阻塞一個線程,直到某特殊情況發生為止。通常條件變量和互斥鎖同時使用。
????? 條件變量使我們可以睡眠等待某種條件出現。條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起;另一個線程使"條件成立"(給出條件成立信號)。
???? 條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個線程自動阻塞,并釋放等待狀態改變的互斥鎖。如果另一個線程改變了條件,它發信號給關聯的條件變量,喚醒一個或多個等待它的線程,重新獲得互斥鎖,重新評價條件。如果兩進程共享可讀寫的內存,條件變量可以被用來實現這兩進程間的線程同步。
????? 條件變量的初始化和mutex的初始化差不多,也是有兩種方式:????????? pthread_cond_tmy_condition=PTHREAD_COND_INITIALIZER;
????????? 也可以利用函數pthread_cond_init動態初始化。
下面中各個函數的簡介。
| 函數名 | 參數 | 說明 |
| pthread_cond_init | pthread_cond_t *cond, const pthread_condattr_t *attr | 初始化 |
| pthread_cond_destroy | pthread_cond_t *cond | 回收 |
| pthread_cond_wait | pthread_cond_t *cond, pthread_mutex_t *mutex | 等待,無超時 |
| pthread_cond_timedwait | pthread_cond_t *cond,pthread_mutex_t *mutex, const struct timespec *abstime | 等待,有超時 |
| pthread_cond_signal | pthread_cond_t *cond | 一個在相同條件變量上阻塞的線程將被解鎖。如果同時有多個線程阻塞,則由調度策略確定接收通知的線程 |
| pthread_cond_broadcast | pthread_cond_t *cond | 將通知阻塞在這個條件變量上的所有線程。一旦被喚醒,線程仍然會要求互斥鎖。 |
一個簡單使用條件變量進行線程同步的小例子:#include <pthread.h> #include <stdio.h> #include <stdlib.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *thread1(void *); void *thread2(void *);int i=1; int main(void) {pthread_t t_a;pthread_t t_b;pthread_create(&t_a,NULL,thread2,(void *)NULL);/*create thread t_a*/pthread_create(&t_b,NULL,thread1,(void *)NULL); /*create thread t_b*/pthread_join(t_b, NULL);/*wait for exit of t_b*/pthread_join(t_a, NULL);pthread_mutex_destroy(&mutex);pthread_cond_destroy(&cond);exit(0); }void *thread1(void *junk) {for(i=1;i<=9;i++){pthread_mutex_lock(&mutex);if(i%3==0)pthread_cond_signal(&cond);elseprintf("thread1 running, i = %d\n",i);pthread_mutex_unlock(&mutex);sleep(1);} }void *thread2(void *junk) {while(i<9){pthread_mutex_lock(&mutex);if(i%3!=0)pthread_cond_wait(&cond,&mutex);/*..*/printf("thread2 running, i = %d\n",i);pthread_mutex_unlock(&mutex);sleep(1);} }
輸出:thread1 running, i = 1 thread1 running, i = 2 thread2 running, i = 3 thread1 running, i = 4 thread1 running, i = 5 thread2 running, i = 6 thread1 running, i = 7 thread1 running, i = 8 thread2 running, i = 9
3. 生產者-消費者的實現
??????? 生產者消費者問題(英語:Producer-consumer problem),也稱有限緩沖問題(英語:Bounded-buffer problem),是一個多線程同步問題的經典案例。該問題描述了兩個共享固定大小緩沖區的線程——即所謂的“生產者”和“消費者”——在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然后重復此過程。與此同時,消費者也在緩沖區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據。????
#include <stdio.h> #include <stdlib.h> #define MAX 5pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /*初始化互斥鎖*/ pthread_cond_t cond = PTHREAD_COND_INITIALIZER;/*初始化條件變量*/typedef struct{char buffer[MAX];int how_many; }BUFFER;BUFFER share={"",0}; char ch='A';/*初始化ch*/void *producer(void *); void *consumer(void *);int main(void) {pthread_t t_read;pthread_t t_write;pthread_create(&t_write,NULL,producer,(void *)NULL); /*創建進程t_a*/pthread_create(&t_read,NULL,consumer,(void *)NULL); /*創建進程t_b*/pthread_join(t_write,(void **)NULL);pthread_join(t_read, NULL);exit(0); }void *producer(void *junk) {int n=0;printf("Producer: starting\n");while(ch!='K'){pthread_mutex_lock(&mutex);/*鎖住互斥量*/if(share.how_many!=MAX){share.buffer[share.how_many++]=ch++;/*把字母寫入緩存*/printf("Producer: put char[%c]\n",ch-1);/*打印寫入字母*/if(share.how_many==MAX){printf("Producer: signaling full\n");pthread_cond_signal(&cond);/*如果緩存中的字母到達了最大值就發送信號*/}}pthread_mutex_unlock(&mutex);/*解鎖互斥量*/}sleep(1);printf("Producer:Exiting\n");return NULL; }void *consumer(void *junk) {int i;int n=0;printf("Consumer: starting\n");while(ch!='K'){pthread_mutex_lock(&mutex);/*鎖住互斥量*/printf("\nConsumer : Waiting\n");while(share.how_many!=MAX)/*如果緩存區字母不等于最大值就等待*/pthread_cond_wait(&cond,&mutex);printf("Consumer: getting buffer:: ");for(i=0;share.buffer[i]&&share.how_many;++i,share.how_many--)putchar(share.buffer[i]); /*循環輸出緩存區字母*/putchar('\n');pthread_mutex_unlock(&mutex);/*解鎖互斥量*/}printf("Consumer: exiting\n");return NULL; }輸出:Producer: starting
Producer: put char[A]
Producer: put char[B]
Producer: put char[C]
Producer: put char[D]
Producer: put char[E]
Producer: signaling full
Consumer: starting
Consumer : Waiting
Consumer: getting buffer:: ABCDE
Consumer : Waiting
Producer: put char[F]
Producer: put char[G]
Producer: put char[H]
Producer: put char[I]
Producer: put char[J]
Producer: signaling full
Consumer: getting buffer:: FGHIJ
Consumer: exiting
Producer:Exiting
轉載于:https://www.cnblogs.com/wuwa/p/6190832.html
總結
以上是生活随笔為你收集整理的并发编程(一): POSIX 使用互斥量和条件变量实现生产者/消费者问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nobelking 3 js 数字和数字
- 下一篇: ESXI转HYPER-V,问题接二连三啊