Linux下c开发 之 线程通信与pthread_cond_wait()的使用
pthread_cond_wait()
/************pthread_cond_wait()的使用方法**********/ pthread_mutex_lock(&qlock); ? ? pthread_cond_wait(&qready, &qlock); pthread_mutex_unlock(&qlock); /*****************************************************/ The mutex passed to pthread_cond_wait protects the condition.The caller?passes it locked to the function, which then atomically places them?calling thread on the list of threads waiting for the condition and?unlocks the mutex. This closes the window between the time that the?condition is checked and the time that the thread goes to sleep waiting?for the condition to change, so that the thread doesn't miss a change?in the condition. When pthread_cond_wait returns, the mutex is again?locked. 上面是APUE的原話,就是說pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)函數(shù)傳入的參數(shù)mutex用于保護(hù)條件,因?yàn)槲覀冊(cè)谡{(diào)用pthread_cond_wait時(shí),如果條件不成立我們就進(jìn)入阻塞,但是進(jìn)入阻塞這個(gè)期間,如果條件變量改變了的話,那我們就漏掉了這個(gè)條件。因?yàn)檫@個(gè)線程還沒有放到等待隊(duì)列上,所以調(diào)用pthread_cond_wait前要先鎖互斥量,即調(diào)用pthread_mutex_lock(),pthread_cond_wait在把線程放進(jìn)阻塞隊(duì)列后,自動(dòng)對(duì)mutex進(jìn)行解鎖,使得其它線程可以獲得加鎖的權(quán)利。這樣其它線程才能對(duì)臨界資源進(jìn)行訪問并在適當(dāng)?shù)臅r(shí)候喚醒這個(gè)阻塞的進(jìn)程。當(dāng)pthread_cond_wait返回的時(shí)候又自動(dòng)給mutex加鎖。 實(shí)際上邊代碼的加解鎖過程如下: /************pthread_cond_wait()的使用方法**********/ pthread_mutex_lock(&qlock); ? ?/*lock*/ pthread_cond_wait(&qready, &qlock); /*block-->unlock-->wait() return-->lock*/ pthread_mutex_unlock(&qlock); /*unlock*/ /*****************************************************/#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
void* testThreadPool(int *t);
pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clifd_cond = PTHREAD_COND_INITIALIZER;
int a = 0;
int main() {
int sock_fd, conn_fd;
int optval;
socklen_t cli_len;
struct sockaddr_in cli_addr, serv_addr;
sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd < 0) {
?? printf("socket\n");
}
optval = 1;
if (setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, (void *) &optval,
??? sizeof(int)) < 0) {
?? printf("setsockopt\n");
}
memset(&serv_addr, 0, sizeof(struct sockaddr_in));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(4507);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(sock_fd, (struct sockaddr *) &serv_addr,
??? sizeof(struct sockaddr_in)) < 0) {
?? printf("bind\n");
}
if (listen(sock_fd, 100) < 0) {
?? printf("listen\n");
}
cli_len = sizeof(struct sockaddr_in);
int t;
pthread_t * mythread;
mythread = (pthread_t*) malloc(100 * sizeof(pthread_t));
for (t = 0; t < 5; t++) {
?? int *i=(int*)malloc(sizeof(int));
?? *i=t;
?? if (pthread_create(&mythread[t], NULL, (void*)testThreadPool, (void*)i) != 0) {
??? printf("pthread_create");
?? }
}
while (1) {
?? conn_fd = accept(sock_fd, (struct sockaddr *) &cli_addr, &cli_len);
?? if (conn_fd < 0) {
??? printf("accept\n");
?? }
?? printf("accept a new client, ip:%s\n", inet_ntoa(cli_addr.sin_addr));
?? pthread_mutex_lock(&clifd_mutex);
?? a=conn_fd;
?? pthread_cond_signal(&clifd_cond);
?? pthread_mutex_unlock(&clifd_mutex);
}
return 0;
}
void* testThreadPool(int *t) {
printf("t is %d\n", *t);
for (;;) {
?? pthread_mutex_lock(&clifd_mutex);
?? pthread_cond_wait(&clifd_cond, &clifd_mutex);
?? printf("a is %d\n", a);
?? printf("t is %d\n", *t);
?? pthread_mutex_unlock(&clifd_mutex);
?? sleep(100);
}
return (void*) 0;
}
了解 pthread_cond_wait() 的作用非常重要 -- 它是 POSIX 線程信號(hào)發(fā)送系統(tǒng)的核心,也是最難以理解的部分。
首先,讓我們考慮以下情況:線程為查看已鏈接列表而鎖定了互斥對(duì)象,然而該列表恰巧是空的。這一特定線程什么也干不了 -- 其設(shè)計(jì)意圖是從列表中除去節(jié)點(diǎn),但是現(xiàn)在卻沒有節(jié)點(diǎn)。因此,它只能:
鎖定互斥對(duì)象時(shí),線程將調(diào)用 pthread_cond_wait(&mycond,&mymutex)。pthread_cond_wait() 調(diào)用相當(dāng)復(fù)雜,因此我們每次只執(zhí)行它的一個(gè)操作。
pthread_cond_wait() 所做的第一件事就是同時(shí)對(duì)互斥對(duì)象解鎖(于是其它線程可以修改已鏈接列表),并等待條件 mycond 發(fā)生(這樣當(dāng) pthread_cond_wait() 接收到另一個(gè)線程的“信號(hào)”時(shí),它將蘇醒)。現(xiàn)在互斥對(duì)象已被解鎖,其它線程可以訪問和修改已鏈接列表,可能還會(huì)添加項(xiàng)。 【要求解鎖并阻塞是一個(gè)原子操作】
此時(shí),pthread_cond_wait() 調(diào)用還未返回。對(duì)互斥對(duì)象解鎖會(huì)立即發(fā)生,但等待條件 mycond 通常是一個(gè)阻塞操作,這意味著線程將睡眠,在它蘇醒之前不會(huì)消耗 CPU 周期。這正是我們期待發(fā)生的情況。線程將一直睡眠,直到特定條件發(fā)生,在這期間不會(huì)發(fā)生任何浪費(fèi) CPU 時(shí)間的繁忙查詢。從線程的角度來看,它只是在等待 pthread_cond_wait() 調(diào)用返回。
現(xiàn)在繼續(xù)說明,假設(shè)另一個(gè)線程(稱作“2 號(hào)線程”)鎖定了 mymutex 并對(duì)已鏈接列表添加了一項(xiàng)。在對(duì)互斥對(duì)象解鎖之后,2 號(hào)線程會(huì)立即調(diào)用函數(shù) pthread_cond_broadcast(&mycond)。此操作之后,2 號(hào)線程將使所有等待 mycond 條件變量的線程立即蘇醒。這意味著第一個(gè)線程(仍處于 pthread_cond_wait() 調(diào)用中)現(xiàn)在將蘇醒。
現(xiàn)在,看一下第一個(gè)線程發(fā)生了什么。您可能會(huì)認(rèn)為在 2 號(hào)線程調(diào)用 pthread_cond_broadcast(&mymutex) 之后,1 號(hào)線程的 pthread_cond_wait() 會(huì)立即返回。不是那樣!實(shí)際上,pthread_cond_wait() 將執(zhí)行最后一個(gè)操作:重新鎖定 mymutex。一旦 pthread_cond_wait() 鎖定了互斥對(duì)象,那么它將返回并允許 1 號(hào)線程繼續(xù)執(zhí)行。那時(shí),它可以馬上檢查列表,查看它所感興趣的更改。
停止并回顧!
那個(gè)過程非常復(fù)雜,因此讓我們先來回顧一下。第一個(gè)線程首先調(diào)用:
pthread_mutex_lock(&mymutex);
然后,它檢查了列表。沒有找到感興趣的東西,于是它調(diào)用:
pthread_cond_wait(&mycond, &mymutex);
然后,pthread_cond_wait() 調(diào)用在返回前執(zhí)行許多操作:
?
pthread_mutex_unlock(&mymutex);
?
它對(duì) mymutex 解鎖,然后進(jìn)入睡眠狀態(tài),等待 mycond 以接收 POSIX 線程“信號(hào)”。一旦接收到“信號(hào)”(加引號(hào)是因?yàn)槲覀儾⒉皇窃谟懻搨鹘y(tǒng)的 UNIX 信號(hào),而是來自 pthread_cond_signal() 或 pthread_cond_broadcast() 調(diào)用的信號(hào)),它就會(huì)蘇醒。但 pthread_cond_wait() 沒有立即返回 -- 它還要做一件事:重新鎖定 mutex:
pthread_mutex_lock(&mymutex);
?
pthread_cond_wait() 知道我們?cè)诓檎?mymutex “背后”的變化,因此它繼續(xù)操作,為我們鎖定互斥對(duì)象,然后才返回。
1.Linux“線程”
?????進(jìn)程與線程之間是有區(qū)別的,不過Linux內(nèi)核只提供了輕量進(jìn)程的支持,未實(shí)現(xiàn)線程模型。Linux是一種“多進(jìn)程單線程”的操作系統(tǒng)。Linux本身只有進(jìn)程的概念,而其所謂的“線程”本質(zhì)上在內(nèi)核里仍然是進(jìn)程。
?????大家知道,進(jìn)程是資源分配的單位,同一進(jìn)程中的多個(gè)線程共享該進(jìn)程的資源(如作為共享內(nèi)存的全局變量)。Linux中所謂的“線程”只是在被創(chuàng)建時(shí)clone了父進(jìn)程的資源,因此clone出來的進(jìn)程表現(xiàn)為“線程”,這一點(diǎn)一定要弄清楚。因此,Linux“線程”這個(gè)概念只有在打冒號(hào)的情況下才是最準(zhǔn)確的。
?????目前Linux中最流行的線程機(jī)制為LinuxThreads,所采用的就是線程-進(jìn)程“一對(duì)一”模型,調(diào)度交給核心,而在用戶級(jí)實(shí)現(xiàn)一個(gè)包括信號(hào)處理在內(nèi)的線程管理機(jī)制。LinuxThreads由Xavier Leroy (Xavier.Leroy@inria.fr)負(fù)責(zé)開發(fā)完成,并已綁定在GLIBC中發(fā)行,它實(shí)現(xiàn)了一種BiCapitalized面向Linux的Posix 1003.1c “pthread”標(biāo)準(zhǔn)接口。Linuxthread可以支持Intel、Alpha、MIPS等平臺(tái)上的多處理器系統(tǒng)。
按照POSIX 1003.1c 標(biāo)準(zhǔn)編寫的程序與Linuxthread 庫相鏈接即可支持Linux平臺(tái)上的多線程,在程序中需包含頭文件pthread. h,在編譯鏈接時(shí)使用命令:
| gcc -D -REENTRANT -lpthread xxx. c |
其中-REENTRANT宏使得相關(guān)庫函數(shù)(如stdio.h、errno.h中函數(shù)) 是可重入的、線程安全的(thread-safe),-lpthread則意味著鏈接庫目錄下的libpthread.a或libpthread.so文件。使用Linuxthread庫需要2.0以上版本的Linux內(nèi)核及相應(yīng)版本的C庫(libc 5.2.18、libc 5.4.12、libc 6)。
?????2.“線程”控制
線程創(chuàng)建
進(jìn)程被創(chuàng)建時(shí),系統(tǒng)會(huì)為其創(chuàng)建一個(gè)主線程,而要在進(jìn)程中創(chuàng)建新的線程,則可以調(diào)用pthread_create:
| pthread_create(pthread_t *thread, const pthread_attr_t *attr, void * (start_routine)(void*), void *arg); |
start_routine為新線程的入口函數(shù),arg為傳遞給start_routine的參數(shù)。
每個(gè)線程都有自己的線程ID,以便在進(jìn)程內(nèi)區(qū)分。線程ID在pthread_create調(diào)用時(shí)回返給創(chuàng)建線程的調(diào)用者;一個(gè)線程也可以在創(chuàng)建后使用pthread_self()調(diào)用獲取自己的線程ID:
| pthread_self (void) ; |
線程退出
線程的退出方式有三:
(1)執(zhí)行完成后隱式退出;
(2)由線程本身顯示調(diào)用pthread_exit 函數(shù)退出;
| pthread_exit (void * retval) ; |
(3)被其他線程用pthread_cance函數(shù)終止:
| pthread_cance (pthread_t thread) ; |
在某線程中調(diào)用此函數(shù),可以終止由參數(shù)thread 指定的線程。
如果一個(gè)線程要等待另一個(gè)線程的終止,可以使用pthread_join函數(shù),該函數(shù)的作用是調(diào)用pthread_join的線程將被掛起直到線程ID為參數(shù)thread的線程終止:
| pthread_join (pthread_t thread, void** threadreturn); |
3.線程通信
線程互斥
互斥意味著“排它”,即兩個(gè)線程不能同時(shí)進(jìn)入被互斥保護(hù)的代碼。Linux下可以通過pthread_mutex_t 定義互斥體機(jī)制完成多線程的互斥操作,該機(jī)制的作用是對(duì)某個(gè)需要互斥的部分,在進(jìn)入時(shí)先得到互斥體,如果沒有得到互斥體,表明互斥部分被其它線程擁有,此時(shí)欲獲取互斥體的線程阻塞,直到擁有該互斥體的線程完成互斥部分的操作為止。
下面的代碼實(shí)現(xiàn)了對(duì)共享全局變量x 用互斥體mutex 進(jìn)行保護(hù)的目的:
| int x; // 進(jìn)程中的全局變量 pthread_mutex_t mutex; pthread_mutex_init(&mutex, NULL); //按缺省的屬性初始化互斥體變量mutex pthread_mutex_lock(&mutex); // 給互斥體變量加鎖 … //對(duì)變量x 的操作 phtread_mutex_unlock(&mutex); // 給互斥體變量解除鎖 |
線程同步
同步就是線程等待某個(gè)事件的發(fā)生。只有當(dāng)?shù)却氖录l(fā)生線程才繼續(xù)執(zhí)行,否則線程掛起并放棄處理器。當(dāng)多個(gè)線程協(xié)作時(shí),相互作用的任務(wù)必須在一定的條件下同步。
Linux下的C語言編程有多種線程同步機(jī)制,最典型的是條件變量(condition variable)。pthread_cond_init用來創(chuàng)建一個(gè)條件變量,其函數(shù)原型為:
| pthread_cond_init (pthread_cond_t *cond, const pthread_condattr_t *attr); |
pthread_cond_wait和pthread_cond_timedwait用來等待條件變量被設(shè)置,值得注意的是這兩個(gè)等待調(diào)用需要一個(gè)已經(jīng)上鎖的互斥體mutex,這是為了防止在真正進(jìn)入等待狀態(tài)之前別的線程有可能設(shè)置該條件變量而產(chǎn)生競爭。pthread_cond_wait的函數(shù)原型為:
| pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex); |
pthread_cond_broadcast用于設(shè)置條件變量,即使得事件發(fā)生,這樣等待該事件的線程將不再阻塞:
| pthread_cond_broadcast (pthread_cond_t *cond) ; |
pthread_cond_signal則用于解除某一個(gè)等待線程的阻塞狀態(tài):
| pthread_cond_signal (pthread_cond_t *cond) ; |
pthread_cond_destroy 則用于釋放一個(gè)條件變量的資源。
在頭文件semaphore.h 中定義的信號(hào)量則完成了互斥體和條件變量的封裝,按照多線程程序設(shè)計(jì)中訪問控制機(jī)制,控制對(duì)資源的同步訪問,提供程序設(shè)計(jì)人員更方便的調(diào)用接口。
| sem_init(sem_t *sem, int pshared, unsigned int val); |
這個(gè)函數(shù)初始化一個(gè)信號(hào)量sem 的值為val,參數(shù)pshared 是共享屬性控制,表明是否在進(jìn)程間共享。
| sem_wait(sem_t *sem); |
調(diào)用該函數(shù)時(shí),若sem為無狀態(tài),調(diào)用線程阻塞,等待信號(hào)量sem值增加(post )成為有信號(hào)狀態(tài);若sem為有狀態(tài),調(diào)用線程順序執(zhí)行,但信號(hào)量的值減一。
| sem_post(sem_t *sem); |
調(diào)用該函數(shù),信號(hào)量sem的值增加,可以從無信號(hào)狀態(tài)變?yōu)橛行盘?hào)狀態(tài)。
?????
?
4.實(shí)例下面我們還是以名的生產(chǎn)者/消費(fèi)者問題為例來闡述Linux線程的控制和通信。一組生產(chǎn)者線程與一組消費(fèi)者線程通過緩沖區(qū)發(fā)生聯(lián)系。生產(chǎn)者線程將生產(chǎn)的產(chǎn)品送入緩沖區(qū),消費(fèi)者線程則從中取出產(chǎn)品。緩沖區(qū)有N 個(gè),是一個(gè)環(huán)形的緩沖池。
| #include <stdio.h> #include <pthread.h> #define BUFFER_SIZE 16 // 緩沖區(qū)數(shù)量 struct prodcons { // 緩沖區(qū)相關(guān)數(shù)據(jù)結(jié)構(gòu) int buffer[BUFFER_SIZE]; /* 實(shí)際數(shù)據(jù)存放的數(shù)組*/ pthread_mutex_t lock; /* 互斥體lock 用于對(duì)緩沖區(qū)的互斥操作 */ int readpos, writepos; /* 讀寫指針*/ pthread_cond_t notempty; /* 緩沖區(qū)非空的條件變量 */ pthread_cond_t notfull; /* 緩沖區(qū)未滿的條件變量 */ }; /* 初始化緩沖區(qū)結(jié)構(gòu) */ void init(struct prodcons *b) { pthread_mutex_init(&b->lock, NULL); pthread_cond_init(&b->notempty, NULL); pthread_cond_init(&b->notfull, NULL); b->readpos = 0; b->writepos = 0; } /* 將產(chǎn)品放入緩沖區(qū),這里是存入一個(gè)整數(shù)*/ void put(struct prodcons *b, int data) { pthread_mutex_lock(&b->lock); /* 等待緩沖區(qū)未滿*/ if ((b->writepos + 1) % BUFFER_SIZE == b->readpos) { pthread_cond_wait(&b->notfull, &b->lock); } /* 寫數(shù)據(jù),并移動(dòng)指針 */ b->buffer[b->writepos] = data; b->writepos++; if (b->writepos > = BUFFER_SIZE) b->writepos = 0; /* 設(shè)置緩沖區(qū)非空的條件變量*/ pthread_cond_signal(&b->notempty); pthread_mutex_unlock(&b->lock); } /* 從緩沖區(qū)中取出整數(shù)*/ int get(struct prodcons *b) { int data; pthread_mutex_lock(&b->lock); /* 等待緩沖區(qū)非空*/ if (b->writepos == b->readpos) { pthread_cond_wait(&b->notempty, &b->lock); } /* 讀數(shù)據(jù),移動(dòng)讀指針*/ data = b->buffer[b->readpos]; b->readpos++; if (b->readpos > = BUFFER_SIZE) b->readpos = 0; /* 設(shè)置緩沖區(qū)未滿的條件變量*/ pthread_cond_signal(&b->notfull); pthread_mutex_unlock(&b->lock); return data; } /* 測試:生產(chǎn)者線程將1 到10000 的整數(shù)送入緩沖區(qū),消費(fèi)者線 程從緩沖區(qū)中獲取整數(shù),兩者都打印信息*/ #define OVER ( - 1) struct prodcons buffer; void *producer(void *data) { int n; for (n = 0; n < 10000; n++) { printf("%d --->\n", n); put(&buffer, n); } put(&buffer, OVER); return NULL; } void *consumer(void *data) { int d; while (1) { d = get(&buffer); if (d == OVER) break; printf("--->%d \n", d); } return NULL; } int main(void) { pthread_t th_a, th_b; void *retval; init(&buffer); /* 創(chuàng)建生產(chǎn)者和消費(fèi)者線程*/ pthread_create(&th_a, NULL, producer, 0); pthread_create(&th_b, NULL, consumer, 0); /* 等待兩個(gè)線程結(jié)束*/ pthread_join(th_a, &retval); pthread_join(th_b, &retval); return 0; } |
目前為止,筆者已經(jīng)創(chuàng)作了《基于嵌入式操作系統(tǒng)VxWorks的多任務(wù)并發(fā)程序設(shè)計(jì)》(《軟件報(bào)》2006年5~12期連載)、《深入淺出Win32多線程程序設(shè)計(jì)》(天極網(wǎng)技術(shù)專題)系列,我們來找出這兩個(gè)系列文章與本文的共通點(diǎn)。
看待技術(shù)問題要瞄準(zhǔn)其本質(zhì),不管是Linux、VxWorks還是WIN32,其涉及到多線程的部分都是那些內(nèi)容,無非就是線程控制和線程通信,它們的許多函數(shù)只是名稱不同,其實(shí)質(zhì)含義是等價(jià)的,下面我們來列個(gè)三大操作系統(tǒng)共同點(diǎn)詳細(xì)表單:
| 事項(xiàng) | WIN32 | VxWorks | Linux |
| 線程創(chuàng)建 | CreateThread | taskSpawn | pthread_create |
| 線程終止 | 執(zhí)行完成后退出;線程自身調(diào)用ExitThread函數(shù)即終止自己;被其他線程調(diào)用函數(shù)TerminateThread函數(shù) | 執(zhí)行完成后退出;由線程本身調(diào)用exit退出;被其他線程調(diào)用函數(shù)taskDelete終止 | 執(zhí)行完成后退出;由線程本身調(diào)用pthread_exit 退出;被其他線程調(diào)用函數(shù)pthread_cance終止 |
| 獲取線程ID | GetCurrentThreadId | taskIdSelf | pthread_self |
| 創(chuàng)建互斥 | CreateMutex | semMCreate | pthread_mutex_init |
| 獲取互斥 | WaitForSingleObject、WaitForMultipleObjects | semTake | pthread_mutex_lock |
| 釋放互斥 | ReleaseMutex | semGive | phtread_mutex_unlock |
| 創(chuàng)建信號(hào)量 | CreateSemaphore | semBCreate、semCCreate | sem_init |
| 等待信號(hào)量 | WaitForSingleObject | semTake | sem_wait |
| 釋放信號(hào)量 | ReleaseSemaphore | semGive | sem_post |
6.小結(jié)
本章講述了Linux下多線程的控制及線程間通信編程方法,給出了一個(gè)生產(chǎn)者/消費(fèi)者的實(shí)例,并將Linux的多線程與WIN32、VxWorks多線程進(jìn)行了類比,總結(jié)了一般規(guī)律。鑒于多線程編程已成為開發(fā)并發(fā)應(yīng)用程序的主流方法,學(xué)好本章的意義也便不言自明。
總結(jié)
以上是生活随笔為你收集整理的Linux下c开发 之 线程通信与pthread_cond_wait()的使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: query的list()和iterate
- 下一篇: linux setsockopt详解