CSAPP 并发编程 ——深入理解计算机系统
12.3 基于線程的并發編程
到目前為止,我們已經看到了兩種創建并發邏輯流的方法。
在第一種方法中,我們為每個流使用了單獨的進程。內核會自動調度每個進程. 而每個進程有它自己的私有地址空間,這使得流共享數據很困難。
在第二種方法中,我們創建自己的邏輯流,并利用 I/O 多路復用來顯式地調度流。因為只有一個進程,所有的流共享整個地址空間。本節介紹第三種方法——基于線程,它是這兩種方法的混合。
線程(thread)就是運行在進程上下文中的邏輯流。在本書里迄今為止,程序都是由每個進程中一個線程組成的。但是現代系統也允許我們編寫一個進程里同時運行多個線程的程序。線程由內核自動調度。
每個線程都有它自己的線程上下文(thread context),包括一個唯一的整數線程 ID(Thread ID,TID)、棧、棧指針、程序計數器、通用目的寄存器和條件碼。所有的運行在一個進程里的線程共享該進程的整個虛擬地址空間。
基于線程的邏輯流結合了基于進程和基于 I/O 多路復用的流的特性。同進程一樣,線程由內核自動調度,并且內核通過一個整數 ID 來識別線程。同基于 I/O 多路復用的流一樣,多個線程運行在單一進程的上下文中,因此共享這個進程虛擬地址空間的所有內容,包括它的代碼、數據、堆、共享庫和打開的文件。
12.3.1 線程執行模型
多線程的執行模型在某些方面和多進程的執行模型是相似的。思考圖 12-12 中的示例。每個進程開始生命周期時都是單一線程,這個線程稱為主線程(main thread)。在某一時刻,主線程創建一個對等線程(peer thread),從這個時間點開始,兩個線程就并發地運行。最后,因為主線程執行一個慢速系統調用,例如 read 或者 sleep,或者因為被系統的間隔計時器中斷,控制就會通過上下文切換傳遞到對等線程。對等線程會執行一段時間,然后控制傳遞回主線程,依次類推。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Ku8knUt9-1622542085838)(assert/image-20210531150941218.png)]
在一些重要的方面,線程執行是不同于進程的。因為一個線程的上下文要比一個進程的上下文小得多,線程的上下文切換要比進程的上下文切換快得多。另一個不同就是線程不像進程那樣,不是按照嚴格的父子層次來組織的。和一個進程相關的線程組成一個對等(線程)池,獨立于其他線程創建的線程。主線程和其他線程的區別僅在于它總是進程中第一個運行的線程。對等(線程)池概念的主要影響是,**一個線程可以殺死它的任何對等線程,或者等待它的任意對等線程終止。**另外,每個對等線程都能讀寫相同的共享數據。
wow,這里很重要,線程是沒有地位區別的,都是同等地位!!!
12.3.2 Posix 線程
——注意,線程函數返回值必須是void*,參數也必須是void *
**Posix 線程(Pthreads)**是在 C 程序中處理線程的一個標準接口。它最早出現在 1995 年,而且在所有的 Linux 系統上都可用。Pthreads 定義了大約 60 個函數,允許程序創建、殺死和回收線程,與對等線程安全地共享數據,還可以通知對等線程系統狀態的變化。
圖 12-13 展示了一個簡單的 Pthreads 程序。
#include "csapp.h" void *thread(void *vargp);int main() {pthread_t tid;Pthread_create(&tid, NULL, thread, NULL);Pthread_join(tid, NULL);exit(0); }void *thread(void *vargp) /* Thread routine */ {printf("Hello, world!\n");return NULL; } //圖 12-13 hello.c:使用 Pthreads 的 “Hello, world!” 程序主線程創建一個對等線程,然后等待它的終止。對等線程輸岀 “Hello, world!\n” 并且終止。當主線程檢測到對等線程終止后,它就通過調用 exit 終止該進程。這是我們看到的第一個線程化的程序,所以讓我們仔細地解析它。線程的代碼和本地數據被封裝在一個線程例程(thread routine)中。正如第二行里的原型所示,每個線程例程都以一個通用指針作為輸入,并返回一個通用指針。如果想傳遞多個參數給線程例程,那么你應該將參數放到一個結構中,并傳遞一個指向該結構的指針。相似地,如果想要線程例程返回多個參數,你可以返回一個指向一個結構的指針。
第 4 行標出了主線程代碼的開始。主線程聲明了一個本地變量 tid,可以用來存放對等線程的 ID(第 6 行)。主線程通過調用 pthread_create 函數創建一個新的對等線程(第 7 行)。當對 pthread_create 的調用返回時,主線程和新創建的對等線程同時運行,并且 tid 包含新線程的 ID。通過在第 8 行調用 pthread_join,主線程等待對等線程終止。最后,主線程調用 exit(第 9 行),終止當時運行在這個進程中的所有線程(在這個示例中就只有主線程)。
第 12 ~ 16 行定義了對等線程的例程。它只打印一個字符串,然后就通過執行第 15 行中的 return 語句來終止對等線程。
注意:線程在這個函數return時就結束了.
可以看到,我們的main線程執行main的函數,在return 0后exit,而新線程從函數return就terminate終止了.
我們如果在中間加一句bye,如下
可以看到,在create之后,我們的新線程打印Hello,wolrd后就return了,然后就終止了,main線程打印一個bye,然后用join等新線程終止,才繼續執行。
12.3.3 創建線程
線程通過調用 pthread_create 函數來創建其他線程
#include <pthread.h> typedef void *(func)(void *);int pthread_create(pthread_t *tid, pthread_attr_t *attr,func *f, void *arg);// 若成功則返回 0,若出錯則為非零。pthread_create 函數創建一個新的線程,并帶著一個輸入變量 arg,在新線程的上下文中運行線程例程 f。能用 attr 參數來改變新創建線程的默認屬性。改變這些屬性已超出我們學習的范圍,在我們的示例中,總是用一個為 NULL 的參數來調用 pthread_create 函數。
返回值:若成功則返回 0,若出錯則為非零。
當 pthread_create 返回時,參數 tid 包含新創建線程的 ID。新線程可以通過調用 pthread_self 函數來獲得它自己的線程 ID。
#include <pthread.h>pthread_t pthread_self(void);// 返回調用者的線程 ID。12.3.4 終止線程
——貫徹整個的原則是,我們的所有線程都終止,進程才會終止
一個線程是以下列方式之一來終止的:🦅
-
當頂層的線程例程返回時,線程會隱式地終止。
-
通過調用 pthread_exit 函數,線程會顯式地終止。如果主線程調用 pthread_exit,它會等待所有其他對等線程終止,然后再終止主線程和整個進程,返回值為 thread_return。
-
對等線程調用exit函數,該函數終止該進程的所有線程并終止該進程
-
pthread_cancel(pid),對等線程調用該函數,終止線程ID為tid的線程
1.線程例程return時,線程會隱式終止
🥇 先來看第一點——頂層線程返回時,線程會隱式地終止(這個意思不是說main線程返回,子線程會隱式的終止,而是說thread函數return時,自動終止這個線程,return自動終止這個子線程是顯然的,我們來測試,main線程返回時不會使子線程終止)
#include <csapp.c> #include <sys/syscall.h> void *thread(void *vargp); pthread_t main_tid; int main() {main_tid = pthread_self(); //全局變量tid=main線程pthread_t tid;Pthread_create(&tid, NULL, thread, NULL);printf("Main Hello\n");syscall(SYS_exit, 0);printf("Main Bye\n"); }void *thread(void *vargp) /* Thread routine */ {printf("Hello, world!\n");sleep(2);int num = 0;while (num < 10){sleep(1);printf("Thread spin\n");num ++;}printf("Thread Bye\n"); }結果:
我們可以看到,父線程__exit時,我們的子線程仍然在運行,并且thread return終止時,此時所有線程都終止了,那么進程就終止.
2.主線程調用pthread_exit會等待其他對等線程終止后,終止當前線程并終止整個進程,返回值為 thread_return
#include <pthread.h>void pthread_exit(void *thread_return);// 從不返回。🥈第二點,主線程調用pthread_exit,它會等待所有其他對等線程終止,然后再終止這個線程和整個進程.
#include <csapp.c> void *thread(void *vargp);int main() {pthread_t tid;Pthread_create(&tid, NULL, thread, NULL);printf("Main Hello\n");pthread_exit(0);printf("Main Bye\n"); }void *thread(void *vargp) /* Thread routine */ {printf("Hello, world!\n");while (1);return NULL; }在這個例子中,由于子線程不會終止,所以主線程用pthread_exit來對等線程終止,會卡在那,如下圖
再舉一個例子,看看成功用pthread等待對等線程終止是什么效果.
#include <csapp.c> void *thread(void *vargp);int main() {pthread_t tid;Pthread_create(&tid, NULL, thread, NULL);Pthread_create(&tid, NULL, thread, NULL);printf("Main Hello\n");pthread_exit(0);printf("Main Bye\n"); }void *thread(void *vargp) /* Thread routine */ {printf("Hello, world!\n");sleep(2);return NULL; }這里有三個知識:
- pthread_exit要等所有對等線程都終止了才會返回,所以兩個Hello.World都會打印
- pthread_exit等對等線程都終止了以后,Main線程不會繼續執行,而是執行退出,所以不會打印Main Bye
- 經測試,必須是主線程調用pthread_exit才有用.
3.某個線程調用exit函數,會終止所有線程以及這個進程(exit_group)
🥉某個對等線程調用 Linux 的 exit 函數,該函數終止進程以及所有與該進程相關的線程。
見下面的例子:
#include <csapp.c> void *thread(void *vargp);int main() {pthread_t tid;Pthread_create(&tid, NULL, thread, NULL);printf("Main Hello\n");while (1); }void *thread(void *vargp) /* Thread routine */ {printf("Hello, world!\n");sleep(2);exit(0); }可以看到,在子線程,我們調用exit(0),main線程是while(1)的循環的
結果如下:
可以看到,確實是exit了.
我們在jyy的筆記虛擬化——進程抽象中就說了這件事:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-khX67gHz-1622542085843)(assert/image-20210531192936006.png)]
(其中,__exit()其實是syscall(SYS_exit, 0)😉
我們的exit和_exit,都是調用exit_group(),不僅調用exit的該線程,還終止調用該進程的線程組的所有線程!!
我們如果把exit改成syscall(SYS_exit,0)的話,就不會終止所有線程,如下:
另外,我們main函數的return 0,是exit_group(),所以這就是為什么,我們main函數return 0時,子線程就自動終止,并且把整個進程終止啦
4.線程執行pthread_cancel(tid)函數來終止tid線程。
4??另一個對等線程通過以當前線程 ID 作為參數調用 pthread_Cancel 函數來終止當前線程。
#include <pthread.h>int pthread_cancel(pthread_t tid);// 若成功則返回 0,若出錯則為非零。pthread_cancel是指定一個線程去終止他.
舉一個例子吧:
#include <csapp.c> void *thread(void *vargp); pthread_t main_tid; int main() {main_tid = pthread_self(); //全局變量tid=main線程pthread_t tid;Pthread_create(&tid, NULL, thread, NULL);printf("Main Hello\n");while (1){sleep(1);printf("Main spin\n");} printf("Main Bye\n");}void *thread(void *vargp) /* Thread routine */ {printf("Hello, world!\n");sleep(2);pthread_cancel(main_tid);while (1){sleep(1);printf("Thread spin\n");}printf("Thread Bye\n"); }可以看到,在這里,我們的thread去pthread_cancle(main_tid)去終止main線程,確實成功終止了,然后我們的thread繼續執行.
12.3.5 回收已終止線程的資源(pthread_join)
線程通過調用 pthread_join 函數等待其他線程終止。
#include <pthread.h>int pthread_join(pthread_t tid, void **thread_return);// 若成功則返回 0,若出錯則為非零。pthread_join 函數會阻塞,直到線程 tid 終止,將線程例程返回的通用 (void ) 指針賦值為 thread_return 指向的位置,然后回收已終止線程占用的所有內存資源*。
注意,和 Linux 的 wait 函數不同,pthread_join 函數只能等待一個指定的線程終止。沒有辦法讓 pthread_wait 等待任意一個線程終止。這使得代碼更加復雜,因為它迫使我們去使用其他一些不那么直觀的機制來檢測進程的終止。實際上,Stevens 在【110】中就很有說服力地論證了這是規范中的一個錯誤。
12.3.6 分離線程(pthread_detach)
在任何一個時間點上,線程是可結合的(joinable)或者是分離的(detached)。
一個可結合的線程能夠被其他線程收回和殺死。在被其他線程回收之前,它的內存資源(例如棧)是不釋放的。
相反,一個分離的線程是不能被其他線程回收或殺死的。它的內存資源在它終止時由系統自動釋放。
默認情況下,線程被創建成可結合的。為了避免內存泄漏,每個可結合線程都應該要么被其他線程顯式地收回,要么通過調用 pthread_detach 函數被分離。
#include <pthread.h> int pthread_detach(pthread_t tid); // 若成功則返回 0,若出錯則為非零。pthread_detach 函數分離一個可結合線程 tid。線程能夠通過以 pthread_self() 參數的 pthread_detach 調用來分離它們自己。
盡管我們的一些例子會使用可結合線程,但是在現實程序中,有很好的理由要使用分離的線程。例如,一個高性能 Web 服務器可能在每次收到 Web 瀏覽器的連接請求時都創建一個新的對等線程。因為每個連接都是由一個單獨的線程獨立處理的,所以對于服務器而言,就很沒有必要(實際上也不愿意)顯式地等待每個對等線程終止。在這種情況下,每個對等線程都應該在它開始處理請求之前分離它自身,這樣就能在它終止后回收它的內存資源了。
12.3.7 初始化線程(pthread_once)
在多線程環境中,有些事僅需要執行一次。通常當初始化應用程序時,可以比較容易地將其放在main函數中。但當你寫一個庫時,就不能在main里面初始化了,你可以用靜態初始化,但使用一次初始化(pthread_once)會比較容易些。
pthread_once 函數允許你初始化與線程例程相關的狀態。
#include <pthread.h>pthread_once_t once_control = PTHREAD_ONCE_INIT;int pthread_once(pthread_once_t *once_control,void (*init_routine)(void));// 總是返回 0。once_control 變量是一個全局或者靜態變量,總是被初始化為 PTHREAD_ONCE_INIT。(本函數使用初值為PTHREAD_ONCE_INIT的once_control變量保證init_routine()函數在本進程執行序列中僅執行一次。)
當你第一次用參數 once_control 調用 pthread_once 時,它調用 init_routine,這是一個沒有輸入參數、也不返回什么的函數。
接下來的以 once_control 為參數的 pthread_once 調用不做任何事情。無論何時,當你需要動態初始化多個線程共享的全局變量時,pthread_once 函數是很有用的。我們將在 12.5.5 節里看到一個示例。
(現在還沒看懂啥意思,等學到后面再回頭看吧)
12.3.8 基于線程的并發服務器
圖 12-14 展示了基于線程的并發 echo 服務器的代碼。
#include <csapp.c> void echo(int connfd); void *thread(void *vargp);int main(int argc, char **argv) {int listenfd, *connfdp, port;socklen_t clientlen = sizeof(struct sockaddr_in);struct sockaddr_in clientaddr;pthread_t tid;if (argc != 2) {fprintf(stderr, "usage: %s <port>\n", argv[0]);exit(0);}port = atoi(argv[1]); //接收argv[1]為端口號listenfd = Open_listenfd(port); //在port端口上創建一個監聽描述符while (1) {// 為了避免對等線程的賦值語句和主線程的accept語句間引入的競爭connfdp = Malloc(sizeof(int));*connfdp = Accept(listenfd, (SA *) &clientaddr, &clientlen); //fdPthread_create(&tid, NULL, thread, connfdp); //產生一個線程去處理} }void echo(int connfd) {size_t n;char buf[MAXLINE];rio_t rio;Rio_readinitb(&rio, connfd);while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {printf("server received %d bytes\n", n);Rio_writen(connfd, buf, n);} }void *thread(void *vargp) {int connfd = *((int *)vargp);Pthread_detach(pthread_self());Free(vargp);echo(connfd);Close(connfd);return NULL; }[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-xeU07UBZ-1622542085846)(assert/image-20210531204840928.png)]
整體結構類似于基于進程的設計。主線程不斷地等待連接請求,然后創建一個對等線程處理該請求。雖然代碼看似簡單,但是有幾個普遍而且有些微妙的問題需要我們更仔細地看一看。第一個問題是當我們調用 pthread_create 時,如何將已連接描述符傳遞給對等線程。最明顯的方法就是傳遞一個指向這個描述符的指針,就像下面這樣
connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen); Pthread_create(&tid, NULL, thread, &connfd);然后,我們讓對等線程間接引用這個指針,并將它賦值給一個局部變量,如下所示
void *thread(void *vargp) {int connfd = *((int *)vargp);... }然而,這樣可能會出錯,因為它在對等線程的賦值語句和主線程的 accept 語句間引入了競爭(race)。如果賦值語句在下一個 accept 之前完成,那么對等線程中的局部變量 connfd 就得到正確的描述符值。然而,如果賦值語句是在 accept 之后才完成的,那么對等線程中的局部變量 connfd 就得到下一次連接的描述符值。那么不幸的結果就是,現在兩個線程在同一個描述符上執行輸入和輸出。為了避免這種潛在的致命競爭,我們必須將 accept 返回的每個已連接描述符分配到它自己的動態分配的內存塊,如第 21~22 行所示。我們會在 12.7.4 節中回過來討論競爭的問題。
另一個問題是在線程例程中避免內存泄漏。既然不顯式地收回線程,就必須分離每個線程,使得在它終止時它的內存資源能夠被收回(第 43 行)。更進一步,我們必須小心釋放主線程分配的內存塊(第 44 行)。
12.4 多線程程序中的共享變量
從程序員的角度來看,線程很有吸引力的一個方面是多個線程很容易共享相同的程序變量。然而,這種共享也是很棘手的。為了編寫正確的多線程程序,我們必須對所謂的共享以及它是如何工作的有很清楚的了解。
為了理解 C 程序中的一個變量是否是共享的,有一些基本的問題要解答:
為了讓我們對共享的討論具體化,我們將使用圖 12-15 中的程序作為運行示例。盡管有些人為的痕跡,但是它仍然值得研究,因為它說明了關于共享的許多細微之處。示例程序由一個創建了兩個對等線程的主線程組成。主線程傳遞一個唯一的 ID 給每個對等線程,每個對等線程利用這個 ID 輸出一條個性化的信息,以及調用該線程例程的總次數。
#include "csapp.h" #define N 2 void *thread(void *vargp);char **ptr; /* Global variable */int main() {int i;pthread_t tid;char *msgs[N] = {"Hello from foo","Hello from bar"};ptr = msgs;for (i = 0; i < N; i++)Pthread_create(&tid, NULL, thread, (void *)i);Pthread_exit(NULL); }void *thread(void *vargp) {int myid = (int)vargp;static int cnt = 0;printf("[%d]: %s (cnt=%d)\n", myid, ptr[myid], ++cnt);return NULL; }12.4.1 線程內存模型
一組并發線程運行在一個進程的上下文中。每個線程都有它自己獨立的線程上下文,包括線程 ID、棧、棧指針、程序計數器、條件碼和通用目的寄存器值。每個線程和其他線程一起共享進程上下文的剩余部分。這包括整個用戶虛擬地址空間,它是由只讀文本(代碼)、讀/寫數據、堆以及所有的共享庫代碼和數據區域組成的。線程也共享相同的打開文件的集合。
從實際操作的角度來說,讓一個線程去讀或寫另一個線程的寄存器值是不可能的。另一方面,任何線程都可以訪問共享虛擬內存的任意位置。如果某個線程修改了一個內存位置,那么其他每個線程最終都能在它讀這個位置時發現這個變化。因此,寄存器是從不共享的,而虛擬內存總是共享的。
各自獨立的線程棧的內存模型不是那么整齊清楚的。這些棧被保存在虛擬地址空間的棧區域中,并且通常是被相應的線程獨立地訪問的。我們說通常而不是總是,是因為不同的線程棧是不對其他線程設防的。所以,如果一個線程以某種方式得到一個指向其他線程棧的指針,那么它就可以讀寫這個棧的任何部分。示例程序在第 26 行展示了這一點,其中對等線程直接通過全局變量 ptr 間接引用主線程的棧的內容。
12.4.2 將變量映射到內存
多線程的 C 程序中變量根據它們的存儲類型被映射到虛擬內存:
-
**全局變量。**全局變量是定義在函數之外的變量。在運行時,虛擬內存的讀/寫區域只包含每個全局變量的一個實例,任何線程都可以引用。例如,第 5 行聲明的全局變量 ptr 在虛擬內存的讀/寫區域中有一個運行時實例。當一個變量只有一個實例時,我們只用變量名(在這里就是 ptr)來表示這個實例。
-
**本地自動變量。**本地自動變量就是定義在函數內部但是沒有 static 屬性的變量。在運行時,每個線程的棧都包含它自己的所有本地自動變量的實例。即使多個線程執行同一個線程例程時也是如此。例如,有一個本地變量 tid 的實例,它保存在主線程的棧中。我們用 tid.m 來表示這個實例。再來看一個例子,本地變量 myid 有兩個實例,一個在對等線程。的棧內,另一個在對等線程 1 的棧內。我們將這兩個實例分別表示為 myid.p0 和 myid.p1。
-
**本地靜態變量。**本地靜態變量是定義在函數內部并有 static 屬性的變量。和全局變量一樣,虛擬內存的讀/寫區域只包含在程序中聲明的每個本地靜態變量的一個實例。例如,即使示例程序中的每個對等線程都在第 25 行聲明了 cnt,在運行時,虛擬內存的讀/寫區域中也只有一個 cnt 的實例。每個對等線程都讀和寫這個實例。
12.4.3 共享變量
我們說一個變量 v 是共享的,當且僅當它的一個實例被一個以上的線程引用。例如,示例程序中的變量 cnt 就是共享的,因為它只有一個運行時實例,并且這個實例被兩個對等線程引用。在另一方面,myid 不是共享的,因為它的兩個實例中每一個都只被一個線程引用。然而,認識到像 msgs 這樣的本地自動變量也能被共享是很重要的。
? 說明:
-
ptr:一個被主線程寫和被對等線程讀的全局變量。
-
cnt:一個靜態變量,在內存中只有一個實例,被兩個對等線程讀和寫。
-
i.m:一個存儲在主線程棧中的本地自動變量。雖然它的值被傳遞給對等線程,但是對等線程也絕不會在棧中引用它,因此它不是共享的。
-
msgs.m:一個存儲在主線程棧中的本地自動變量,被兩個對等線程通過 ptr 間接地引用。
-
myid.0 和 myid.1;—個本地自動變量的實例,分別駐留在對等線程 0 和線程 1 的棧中。
B. 變量 ptr、ent 和 msgs 被多于一個線程引用,因此它們是共享的。
12.5 用信號量同步線程
共享變量是十分方便,但是它們也引入了同步錯誤(synchronization error)的可能性。考慮圖 12-16 中的程序 badcnt.c,它創建了兩個線程,每個線程都對共享計數變量 cnt 加 1。
badcnt.c
/* WARNING: This code is buggy! */ #include "csapp.h"void *thread(void *vargp); /* Thread routine prototype *//* Global shared variable */ volatile long cnt = 0; /* Counter */int main(int argc, char **argv) {long niters;pthread_t tid1, tid2;/* Check input argument */if (argc != 2) {printf("usage: %s <niters>\n", argv[0]);exit(0);}niters = atoi(argv[1]);/* Create threads and wait for them to finish */Pthread_create(&tid1, NULL, thread, &niters);Pthread_create(&tid2, NULL, thread, &niters);Pthread_join(tid1, NULL);Pthread_join(tid2, NULL);/* Check result */if (cnt != (2 * niters))printf("BOOM! cnt=%ld\n", cnt);elseprintf("OK cnt=%ld\n", cnt);exit(0); }/* Thread routine */ void *thread(void *vargp) {long i, niters = *((long *)vargp);for (i = 0; i < niters; i++)cnt++;return NULL; }圖 12-16 badcnt.c:一個同步不正確的計數器程序
因為每個線程都對計數器增加了 niters 次,我們預計它的最終值是 2 × niters。這看上去簡單而直接。然而,當在 Linux 系統上運行 badcnt.c 時,我們不僅得到錯誤的答案,而且每次得到的答案都還不相同!
我們ubuntu上得到的匯編代碼記錄一下吧:
0000000000401c20 <thread>:401c20: 48 8b 0f mov (%rdi),%rcx401c23: 48 85 c9 test %rcx,%rcx401c26: 7e 20 jle 401c48 <thread+0x28>401c28: b8 00 00 00 00 mov $0x0,%eax401c2d: 48 8b 15 5c 36 20 00 mov 0x20365c(%rip),%rdx # 605290 <cnt>401c34: 48 83 c2 01 add $0x1,%rdx401c38: 48 89 15 51 36 20 00 mov %rdx,0x203651(%rip) # 605290 <cnt>401c3f: 48 83 c0 01 add $0x1,%rax401c43: 48 39 c8 cmp %rcx,%rax401c46: 75 e5 jne 401c2d <thread+0xd>401c48: b8 00 00 00 00 mov $0x0,%eax401c4d: c3 retq那么哪里出錯了呢?為了清晰地理解這個問題,我們需要研究計數器循環(第 40 ~ 41 行)的匯編代碼,如圖 12-17 所示。
我們發現,將線程 i 的循環代碼分解成五個部分是很有幫助的:
H:
L: U: S: T:當 badcnt.c 中的兩個對等線程在一個單處理器上并發運行時,機器指令以某種順序一個接一個地完成。因此,每個并發執行定義了兩個線程中的指令的某種全序(或者交叉)。不幸的是,這些順序中的一些將會產生正確結果,但是其他的則不會。
這里有個關鍵點:**一般而言,你沒有辦法預測操作系統是否將為你的線程選擇一個正確的順序。**例如,圖 12-18a 展示了一個正確的指令順序的分步操作。在每個線程更新了共享變量 cnt 之后,它在內存中的值就是 2,這正是期望的值。
另一方面,圖 12-18b 的順序產生一個不正確的 cnt 的值。會發生這樣的問題是因為,線程 2 在第 5 步加載 ent,是在第 2 步線程 1 加載 cnt 之后,而在第 6 步線程 1 存儲它的更新值之前。因此,每個線程最終都會存儲一個值為 1 的更新后的計數器值。我們能夠借助于一種叫做進度圖(progress graph)的方法來闡明這些正確的和不正確的指令順序的概念,這個圖我們將在下一節中介紹。
具體這個在下節介紹,這里我們看到,正確的順序中,LOAD和STORE中間沒有插入另一個LOAD。
而錯誤的順序中,LOAD1和STORE1中間插入了一個LOAD2.
12.5.1 進度圖
進度圖將指令執行模型化為從一種狀態到另一種狀態的轉換(transition)。轉換被表示為一條從一點到相鄰點的有向邊。合法的轉換是向右(線程 1 中的一條指令完成)或者向上(線程 2 中的一條指令完成)的。兩條指令不能在同一時刻完成一對角線轉換是不允許的。程序決不會反向運行,所以向下或者向左移動的轉換也是不合法的。
一個程序的執行歷史被模型化為狀態空間中的一條軌跡線。圖 12-20 展示了下面指令順序對應的軌跡線:
我們的每個點,表示剛完成了這個點的任務。比如(H1,L2),表示線程1剛執行完H1,線程2剛執行完L2.
這個正方形的內部表示不安全區,不包括邊界的點。
我們原點是O,第一個坐標是H,第二個坐標是L,…
OK,那么就可以分析這個不安全區了:
- 線程1是L1時,即剛執行LOAD,如果線程2是L2或者U2,即剛LOAD完或者UPDATE完,這是不安全的
- 線程1是U1時,線程2的L2或U2依然是不安全的
進度圖就是這樣的,如果從旁邊擦過去沒關系,穿過去就是錯的.
感覺可以用進度圖的思想實現一個并發bug檢測器,待完成todo…
12.5.2 信號量
Edsger Dijkstra,并發編程領域的先鋒人物,提出了一種經典的解決同步不同執行線程問題的方法,這種方法是基于一種叫做信號量(semaphore)的特殊類型變量的。信號量 s 是具有非負整數值的全局變量,只能由兩種特殊的操作來處理,這兩種操作稱為 P 和 V:
-
P(s):測試 如果s是非零的,P將sj減1,并立即返回。如果s為零,那么就掛起這個線程,直到s變為非零。而一個 V 操作會重啟這個線程。在重啟之后,P 操作將 s 減 1,并將控制返回給調用者。
// 這里的代碼僅僅是為了模擬,不是真正的,真正的需要保證一定的原子性 void P(s){while (s == 0);s -= 1; } -
V(s):增加 V操作將s加1。如果有任何線程阻塞在P操作等待s變成非零,那么V操作會重啟這些線程中的一個。
void V(s){s += 1; }P 中的測試和減 1 操作是不可分割的,也就是說,一旦預測信號量 s 變為非零,就會將 s 減 1,不能有中斷。(這個不可分割的語義,使得測試如果不是0,就會立即-1,信號s不可能出現負數的情況)
V 中的加 1 操作也是不可分割的,也就是Load、加 1 和Store信號量的過程中沒有中斷。注意,V 的定義中沒有定義等待線程被重啟動的順序。唯一的要求是 V 必須只能重啟一個正在等待的線程。因此,當有多個線程在等待同一個信號量時,你不能預測 V 操作要重啟哪一個線程。
(V不可分割的語義,并且s+=1后,只能有一個退出s=0的自旋,也保證信號s不可能出現負數的情況)
——信號量ss絕不可能變為負值,這種屬性稱為信號量不變性,這為控制并發程序的軌跡線提供了強有力的工具,在下一節中我們將看到。
信號量提供一種方便的方法來確保對共享變量的互斥訪問,基本思想就是將每個共享變量和一個信號量聯系起來,然后使用P(s)和V(s)將相應的臨界區包圍起來
這種信號量又稱為二元信號量,因為它的值總是0或1,也常叫做互斥鎖,相應的P和V操作就叫做加鎖和解鎖
s的意義和鎖很像
s=1,說明當前沒有線程正在獲得鎖,所以當前的鎖是可獲取的.
s=0,說明當前如果有線程想進程P操作,就會卡在while循環,直到獲得鎖的那個線程用P(V)釋放了鎖.
Posix 標準定義了許多操作信號量的函數。
#include <semaphore.h>int sem_init(sem_t *sem, 0, unsigned int value); int sem_wait(sem_t *s); /* P(s) */ int sem_post(sem_t *s); /* V(s) */// 返回:若成功則為 0,若出錯則為 -1。- sem_init 函數將信號量 sem 初始化為 value。每個信號量在使用前必須初始化。針對我們的目的,中間的參數總是零。
- 程序分別通過調用 sem_wait 和 sem_post 函數來執行 P 和 V 操作。為了簡明,我們更喜歡使用下面這些等價的 P 和 V 的包裝函數:
12.5.3 使用信號量來實現互斥
信號量提供了一種很方便的方法來確保對共享變量的互斥訪問。基本思想是將每個共享變量(或者一組相關的共享變量)與一個信號量 s(初始為 1)聯系起來,然后用P(s)和V(s)操作將相應的臨界區包圍起來。
以這種方式來保護共享變量的信號量叫做二元信號量(binary semaphore),因為它的值總是 0 或者 1。以提供互斥為目的的二元信號量常常也稱為互斥鎖(mutex)。在一個互斥鎖上執行 P 操作稱為對互斥鎖加鎖。類似地,執行 V 操作稱為對互斥鎖解鎖。對一個互斥鎖加了鎖但是還沒有解鎖的線程稱為占用這個互斥鎖。一個被用作一組可用資源的計數器的信號量被稱為計數信號量。
圖 12-22 中的進度圖展示了我們如何利用二元信號量來正確地同步計數器程序示例。每個狀態都標出了該狀態中信號量 s 的值。關鍵思想是這種 P 和 V 操作的結合創建了一組狀態,叫做禁止區(forbidden region),其中因為信號量的不變性,沒有實際可行的軌跡線能夠包含禁止區中的狀態。而且,因為禁止區完全包括了不安全區,所以沒有實際可行的軌跡線能夠接觸不安全區的任何部分。因此,每條實際可行的軌跡線都是安全的,而且不管運行時指令順序是怎樣的,程序都會正確地增加計數器值。
從可操作的意義上來說,由 P 和 V 操作創建的禁止區使得在任何時間點上,在被包圍的臨界區中,不可能有多個線程在執行指令。換句話說,信號量操作確保了對臨界區的互斥訪問。
總的來說,為了用信號量正確同步圖 12-16 中的計數器程序示例,我們首先聲明一個信號量 mutex:
volatile long cnt = 0; /* Counter */ sem_t mutex; /* Semaphore that protects counter */然后在主例程中將 mutex 初始化為 1:
Sem_init(&mutex, 0, 1); /* mutex = 1 */最后,我們通過把在線程例程中對共享變量 cnt 的更新包圍 P 和 V 操作,從而保護它們:
for (i = 0; i < niters; i++) {P(&mutex);cnt++;V(&mutex); }總的代碼如下:
/* * goodcnt.c - A correctly synchronized counter program */ /* $begin goodcnt */ #include "csapp.h"void *thread(void *vargp); /* Thread routine prototype *//* Global shared variables */ /* $begin goodcntsemdef */volatile long cnt = 0; /* Counter */sem_t mutex; /* Semaphore that protects counter */ /* $end goodcntsemdef */int main(int argc, char **argv) {int niters;pthread_t tid1, tid2;/* Check input argument */if (argc != 2) {printf("usage: %s <niters>\n", argv[0]);exit(0);}niters = atoi(argv[1]);/* Create threads and wait for them to finish */ /* $begin goodcntseminit */Sem_init(&mutex, 0, 1); /* mutex = 1 */ /* $end goodcntseminit */Pthread_create(&tid1, NULL, thread, &niters);Pthread_create(&tid2, NULL, thread, &niters);Pthread_join(tid1, NULL);Pthread_join(tid2, NULL);/* Check result */if (cnt != (2 * niters))printf("BOOM! cnt=%ld\n", cnt);elseprintf("OK cnt=%ld\n", cnt);exit(0); }/* Thread routine */ void *thread(void *vargp) {int i, niters = *((int *)vargp);/* $begin goodcntthread */for (i = 0; i < niters; i++) {P(&mutex);cnt++;V(&mutex);} /* $end goodcntthread */return NULL; } /* $end goodcnt */當我們運行這個正確同步的程序時,現在它每次都能產生正確的結果了。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-KGyoW0Jf-1622542085849)(assert/image-20210601005115377.png)]
旁注 - 進度圖的局限性
進度圖給了我們一種較好的方法,將在單處理器上的并發程序執行可視化,也幫助我們理解為什么需要同步。然而,它們確實也有局限性,特別是對于在多處理器上的并發執行,在多處理器上一組 CPU/高速緩存對共享同一個主存。多處理器的工作方式是進度圖不能解釋的。特別是,一個多處理器內存系統可以處于一種狀態,不對應于進度圖中任何軌跡線。不管如何,結論總是一樣的:無論是在單處理器還是多處理器上運行程序,都要同步你對共享變量的訪問。
12.5.4 利用信號量來調度共享資源
除了提供互斥之外,信號量的另一個重要作用是調度對共享資源的訪問。在這種場景中,一個線程用信號量操作來通知另一個線程,程序狀態中的某個條件已經為真了。兩個經典而有用的例子是生產者—消費者和讀者—寫者問題。
1. 生產者—消費者問題
圖 12-23 給出了生產者—消費者問題。生產者和消費者線程共享一個有 n 個槽的有限緩沖區。生產者線程反復地生成新的項目(item),并把它們插入到緩沖區中。消費者線程不斷地從緩沖區中取出這些項目,然后消費(使用)它們。也可能有多個生產者和消費者的變種。
因為插入和取出項目都涉及更新共享變量,所以我們必須保證對緩沖區的訪問是互斥的。但是只保證互斥訪問是不夠的,我們還需要調度對緩沖區的訪問。如果緩沖區是滿的(沒有空的槽位),那么生產者必須等待直到有一個槽位變為可用。與之相似,如果緩沖區是空的(沒有可取用的項目),那么消費者必須等待直到有一個項目變為可用。
生產者—消費者的相互作用在現實系統中是很普遍的。例如,在一個多媒體系統中,生產者編碼視頻幀,而消費者解碼并在屏幕上呈現出來。緩沖區的目的是為了減少視頻流的抖動,而這種抖動是由各個幀的編碼和解碼時與數據相關的差異引起的。緩沖區為生產者提供了一個槽位池,而為消費者提供一個已編碼的幀池。另一個常見的示例是圖形用戶接口設計。生產者檢測到鼠標和鍵盤事件,并將它們插入到緩沖區中。消費者以某種基于優先級的方式從緩沖區取出這些事件,并顯示在屏幕上。
在本節中,我們將開發一個簡單的包,叫做 SBUF,用來構造生產者—消費者程序。在下一節里,我們會看到如何用它來構造一個基于預線程化(prethreading)的有趣的并發服務器。SBUF 操作類型為 sbuf_t 的有限緩沖區(圖 12-24)。
- 項目存放在一個動態分配的 1 項整數數組(buf)中。
- front 和 rear 索引值記錄該數組中的第一項和最后一項。
- 三個信號量同步對緩沖區的訪問。mutex 信號量提供互斥的緩沖區訪問。slots 和 items 信號量分別記錄空槽位和可用項目的數量。
我們給每一個資源一個信號,來控制這個資源。我們給buf緩沖區一個mutex信號量,表示同時只能有一個線程去修改buf。空槽slots也是一個資源,這是對于生產者而言的資源,生產者想要生產的話,必須要先用P(slots)來獲得這個空槽.槽上的產品也是一個資源,消費者想要消費的話,必須要P(items)來獲得這個空槽.
sbuf.h:
typedef struct {int *buf; /* Buffer array */int n; /* Maximum number of slots */int front; /* buf[(front+1)%n] is first item */int rear; /* buf[rear%n] is last item */sem_t mutex; /* Protects accesses to buf */sem_t slots; /* Counts available slots */sem_t items; /* Counts available items */ } sbuf_t;圖 12-24 sbuf_t:SBUF 包使用的有限緩沖區
圖 12-25 給出了 SBUF 函數的實現。sbuf_init 函數為緩沖區分配堆內存,設置 front 和 rear 表示一個空的緩沖區,并為三個信號量賦初始值。這個函數在調用其他三個函數中的任何一個之前調用一次。sbuf_deinit 函數是當應用程序使用完緩沖區時,釋放緩沖區存儲的。sbuf_insert 函數等待一個可用的槽位,對互斥鎖加鎖,添加項目,對互斥鎖解鎖,然后宣布有一個新項目可用。sbuf_remove 函數是與 sbuf_insert 函數對稱的。在等待一個可用的緩沖區項目之后,對互斥鎖加鎖,從緩沖區的前面取出該項目,對互斥鎖解鎖,然后發信號通知一個新的槽位可供使用。
sbuf.c:
#include "csapp.h" #include "sbuf.h"/* Create an empty, bounded, shared FIFO buffer with n slots */ void sbuf_init(sbuf_t *sp, int n) //為 {sp->buf = Calloc(n, sizeof(int));sp->n = n; /* Buffer holds max of n items */sp->front = sp->rear = 0; /* Empty buffer iff front == rear */Sem_init(&sp->mutex, 0, 1); /* Binary semaphore for locking */Sem_init(&sp->slots, 0, n); /* Initially, buf has n empty slots */Sem_init(&sp->items, 0, 0); /* Initially, buf has zero data items */ }/* Clean up buffer sp */ void sbuf_deinit(sbuf_t *sp) {Free(sp->buf); }/* Insert item onto the rear of shared buffer sp */ void sbuf_insert(sbuf_t *sp, int item) {P(&sp->slots); /* Wait for available slot */P(&sp->mutex); /* Lock the buffer */sp->buf[(++sp->rear) % (sp->n)] = item; /* Insert the item */V(&sp->mutex); /* Unlock the buffer */V(&sp->items); /* Announce available item */ }/* Remove and return the first item from buffer sp */ int sbuf_remove(sbuf_t *sp) {int item;P(&sp->items); /* Wait for available item */P(&sp->mutex); /* Lock the buffer */item = sp->buf[(++sp->front) % (sp->n)]; /* Remove the item */V(&sp->mutex); /* Unlock the buffer */V(&sp->slots); /* Announce available slot */return item; }圖 12-25 SBUF:同步對有限緩沖區并發訪問的包
這個代碼,對于有多個生產者和消費者,也是成立的,因為無非是多個生產者去獲得這個P(items),但是由于信號量所給的只能有一個獲得資源的語義,所以是正確的.
Ok,我們寫一個生產者消費者實例,并用checker.c去檢查正確性:
// sbuf.c /* $begin sbufc */ #include <csapp.c> #include <unistd.h> #include "sbuf.h" #define N 10 sbuf_t *sp;/* Create an empty, bounded, shared FIFO buffer with n slots */ /* $begin sbuf_init */ void sbuf_init(sbuf_t *sp, int n) {sp->buf = Calloc(n, sizeof(int)); sp->n = n; /* Buffer holds max of n items */sp->front = sp->rear = 0; /* Empty buffer iff front == rear */Sem_init(&sp->mutex, 0, 1); /* Binary semaphore for locking */Sem_init(&sp->slots, 0, n); /* Initially, buf has n empty slots */Sem_init(&sp->items, 0, 0); /* Initially, buf has zero data items */ } /* $end sbuf_init *//* Clean up buffer sp */ /* $begin sbuf_deinit */ void sbuf_deinit(sbuf_t *sp) {Free(sp->buf); } /* $end sbuf_deinit *//* Insert item onto the rear of shared buffer sp */ /* $begin sbuf_insert */ void sbuf_insert(sbuf_t *sp, int item) {P(&sp->slots); /* Wait for available slot */P(&sp->mutex); /* Lock the buffer */printf("p\n");sp->buf[(++sp->rear)%(sp->n)] = item; /* Insert the item */V(&sp->mutex); /* Unlock the buffer */V(&sp->items); /* Announce available item */ } /* $end sbuf_insert *//* Remove and return the first item from buffer sp */ /* $begin sbuf_remove */ int sbuf_remove(sbuf_t *sp) {int item;P(&sp->items); /* Wait for available item */P(&sp->mutex); /* Lock the buffer */printf("c\n");item = sp->buf[(++sp->front)%(sp->n)]; /* Remove the item */V(&sp->mutex); /* Unlock the buffer */V(&sp->slots); /* Announce available slot */return item; } /* $end sbuf_remove */ /* $end sbufc */void *producer(void *args) {while(1){usleep(100);sbuf_insert(sp,1);} } void *consumer(void *args) {while(1){usleep(100);sbuf_remove(sp);} } int main() {pthread_t tid1;pthread_t tid2;sp = (sbuf_t *) malloc(sizeof(sbuf_t));sbuf_init(sp,N);pthread_create(&tid1,NULL,producer,NULL);pthread_create(&tid2,NULL,consumer,NULL);pthread_exit(0);sbuf_deinit(sp); }我們這個生產者,打印p,消費者,打印c
再來看checker.c
//checker.c #include <csapp.c> #define MAX_LEN 1000 int main(int argc,char **argv) {int con_num = 0;int pro_num = 0;if (argc < 3){printf ("argv usage\n");}int N = atoi(argv[2]);int fd = Open(argv[1], O_RDONLY, 0);char * buf = (char *)malloc(MAX_LEN);int n;int linshi = 0;while ((n=read(fd,buf,MAX_LEN))>0){for (int i = 0;i < n;i++){if (buf[i] == 'p'){linshi++;pro_num++;if ((pro_num-con_num)>N){printf("produce too much\n");printf("%d\n",linshi);return 0;}} else if (buf[i] == 'c'){linshi++;con_num++;if (con_num > pro_num){printf("consume too much\n");printf("%d\n",linshi);return 0;}}}}printf("the model is OK!\n"); }結果如下:
A:p=1,c=1,n>1,需要互斥鎖,這是因為要保護buf臨界區
B:p=1,c=1,n=1,不用互斥鎖,這是因為n只有1,本來就不可能生產者和消費者同時去寫buf.
C: p>1,c>1,n=1,不用互斥鎖,原因和B一樣,同時只能有一個生產者或消費者在寫buf.
2. 讀者—寫者問題
讀者—寫者問題是互斥問題的一個概括。一組并發的線程要訪問一個共享對象,例如一個主存中的數據結構,或者一個磁盤上的數據庫。有些線程只讀對象,而其他的線程只修改對象。修改對象的線程叫做寫者。只讀對象的線程叫做讀者。寫者必須擁有對對象的獨占的訪問,而讀者可以和無限多個其他的讀者共享對象。一般來說,有無限多個并發的讀者和寫者。
讀者—寫者交互在現實系統中很常見。例如,一個在線航空預定系統中,允許有無限多個客戶同時查看座位分配,但是正在預訂座位的客戶必須擁有對數據庫的獨占的訪問。再來看另一個例子,在一個多線程緩存 Web 代理中,無限多個線程可以從共享頁面緩存中取出已有的頁面,但是任何向緩存中寫入一個新頁面的線程必須擁有獨占的訪問。
讀者—寫者問題有幾個變種,分別基于讀者和寫者的優先級:
-
第一類讀者—寫者問題,讀者優先,要求不要讓讀者等待,除非已經把使用對象的權限賦予了一個寫者。換句話說,讀者不會因為有一個寫者在等待而等待。
-
第二類讀者—寫者問題,寫者優先,要求一旦一個寫者準備好可以寫,它就會盡可能快地完成它的寫操作。同第一類問題不同,在一個寫者后到達的讀者必須等待,即使這個寫者也是在等待。
圖 12-26 給出了一個對第一類讀者—寫者問題的解答。同許多同步問題的解答一樣,這個解答很微妙,極具欺騙性地簡單。信號量 w 控制對訪問共享對象的臨界區的訪問。信號量 mutex 保護對共享變量 readcnt 的訪問,readcnt 統計當前在臨界區中的讀者數量。每當一個寫者進入臨界區時,它對互斥鎖 w 加鎖,每當它離開臨界區時,對 w 解鎖。這就保證了任意時刻臨界區中最多只有一個寫者。另一方面,只有第一個進入臨界區的讀者對 w 加鎖,而只有最后一個離開臨界區的讀者對 w 解鎖。當一個讀者進入和離開臨界區時,如果還有其他讀者在臨界區中,那么這個讀者會忽略互斥鎖 w。這就意味著只要還有一個讀者占用互斥鎖 w,無限多數量的讀者可以沒有障礙地進入臨界區。
/* Global variables */ int readcnt; /* Initially = 0 */ sem_t mutex, w; /* Both initially = 1 */void reader(void) {while (1) {P(&mutex);readcnt++;if (readcnt == 1) /* First in 第一個讀者會用P獲得訪問權限*/P(&w);V(&mutex);/* Critical section *//* Reading happens */P(&mutex);readcnt--;if (readcnt == 0) /* Last out 最后一個讀者會用V釋放訪問資源 */V(&w);V(&mutex);} }void writer(void) {while (1) {P(&w);/* Critical section *//* Writing happens */V(&w);} }圖 12-26 對第一類讀者—寫者問題的解答。讀者優先級高于寫者
對這兩種讀者—寫者問題的正確解答可能導致饑餓(starvation),饑餓就是一個線程無限期地阻塞,無法進展。例如,圖 12-26 所示的解答中,如果有讀者不斷地到達,寫者就可能無限期地等待。
分析:line10~11,保證了第一個讀者獲得這個信號量(這個保證了讀者優先,一旦有第一個讀者到了,那么就一直持有這個信號量,直到最后一個讀者退出了,寫著才能或者的信號量)。 第一個讀者到了以后,由于用P(&w)獲得這個信號量,所以寫者就只能卡在P(&w)中.所以后面的讀者到了,就直接去訪問臨界區,然后退出.
這個問題說的是,我們如果有讀者和寫者共同想要獲得信號量w,我們需要把信號量w給讀者而不是寫者,但是上面的代碼并不能準確的做到給讀者,所以稱為弱優先級.描述一個場景,這種若優先級會導致一群寫者而使得一個讀者饑餓
旁注 - 其他同步機制
我們已經向你展示了如何利用信號量來同步線程,主要是因為它們簡單、經典,并且有一個清晰的語義模型。但是你應該知道還是存在著其他同步技術的。例如,Java 線程是用一種叫做 Java 監控器(Java Monitor)【48】的機制來同步的,它提供了對信號量互斥和調度能力的更高級別的抽象;實際上,監控器可以用信號量來實現。再來看一個例子,Pthreads 接口定義了一組對互斥鎖和條件變量的同步操作。Pthreads 互斥鎖被用來實現互斥。條件變量用來調度對共享資源的訪問,例如在一個生產者—消費者程序中的有限緩沖區。
具體參見第21章 并發
12.5.5 綜合:基于預線程化的并發服務器
我們已經知道了如何使用信號量來訪問共享變量和調度對共享資源的訪問。為了幫助你更清晰地理解這些思想,讓我們把它們應用到一個基于稱為預線程化(prethreading)技術的并發服務器上。
在圖 12-14 所示的并發服務器中,我們為每一個新客戶端創建了一個新線程。這種方法的缺點是我們為每一個新客戶端創建一個新線程,導致不小的代價。一個基于預線程化的服務器試圖通過使用如圖 12-27 所示的生產者—消費者模型來降低這種開銷。服務器是由一個主線程和一組工作者線程構成的。主線程不斷地接受來自客戶端的連接請求,并將得到的連接描述符放在一個有限緩沖區中。每一個工作者線程反復地從共享緩沖區中取出描述符,為客戶端服務,然后等待下一個描述符。
圖 12-28 顯示了我們怎樣用 SBUF 包來實現一個預線程化的并發 echo 服務器。在初始化了緩沖區 sbuf(第 24 行)后,主線程創建了一組工作者線程(第 25 ~ 26 行)。然后它進入了無限的服務器循環,接受連接請求,并將得到的已連接描述符插入到緩沖區 sbuf 中。每個工作者線程的行為都非常簡單。它等待直到它能從緩沖區中取出一個已連接描述符(第 39 行),然后調用 echo_cnt 函數回送客戶端的輸入。
echoservert-pre.c:
#include "csapp.h" #include "sbuf.h" #define NTHREADS 4 #define SBUFSIZE 16void echo_cnt(int connfd); void *thread(void *vargp);sbuf_t sbuf; /* Shared buffer of connected descriptors */int main(int argc, char **argv) {int i, listenfd, connfd;socklen_t clientlen;struct sockaddr_storage clientaddr;pthread_t tid;if (argc != 2) {fprintf(stderr, "usage: %s <port>\n", argv[0]);exit(0);}listenfd = Open_listenfd(argv[1]); //監聽文件描述符sbuf_init(&sbuf, SBUFSIZE); //創建sbuf緩沖區structfor (i = 0; i < NTHREADS; i++) //創建4個工作線程Pthread_create(&tid, NULL, thread, NULL);while (1) {clientlen = sizeof(struct sockaddr_storage);connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);sbuf_insert(&sbuf, connfd); /* Insert connfd in buffer,thread會自動取出處理*/} }void *thread(void *vargp) {Pthread_detach(pthread_self()); //分離線程,線程結束后自動釋放資源while (1) {int connfd = sbuf_remove(&sbuf); //能從緩沖區中取出一個已連接描述符echo_cnt(connfd); /* Service client */Close(connfd);} }圖 12-28 一個預線程化的并發 echo 服務器。這個服務器使用的是有一個生產者和多個消費者的生產者—消費者模型
可以看到,這里的行為就是,建立4個線程去不斷的從s_buf取出connfd去進行處理.我們main線程在accept后,用sbuf_insert來插入待處理的connfd,然后thraed中我們會sub_remove(&sbuf)去取出一個已連接的描述符,哇妙啊,生產者消費者模型果然很實用!!!我們的服務器非常高效,只是accept然后insert到緩沖區中,線程自動會處理
圖 12-29 所示的函數 echo_cnt 是圖 11-22 中的 echo 函數的一個版本,它在全局變量 byte_cnt 中記錄了從所有客戶端接收到的累計字節數。這是一段值得研究的有趣代碼,因為它向你展示了一個從線程例程調用的初始化程序包的一般技術。在這種情況中,我們需要初始化 byte_cnt 計數器和 mutex 信號量。一個方法是我們為 SBUF 和 RIO 程序包使用過的,它要泰主線程顯式地調用一個初始化函數。另外一個方法,在此顯示的,是當第一次有某個線程調用 echo_cnt 函數時,使用 pthread_once 函數(第 19 行)去調用初始化函數。這個方法的優點是它屆程序包的使用更加容易。這種方法的缺點是每一次調用 echo_ent 都會導致調用 pthread_once 函數,而在大多數時候它沒有做什么有用的事
echo_cnt.c:
#include "csapp.h"static int byte_cnt; /* Byte counter */ static sem_t mutex; /* and the mutex that protects it */static void init_echo_cnt(void) {Sem_init(&mutex, 0, 1);//初始化鎖byte_cnt = 0; //從所有客戶端收到總字節數 }void echo_cnt(int connfd) {int n;char buf[MAXLINE];rio_t rio;static pthread_once_t once = PTHREAD_ONCE_INIT; //靜態變量,效果相當于全局Pthread_once(&once, init_echo_cnt); //第一個執行這個函數的線程,會去調用這個初始化函數Rio_readinitb(&rio, connfd);while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) { //反復讀取一行P(&mutex); //PV mutex來保護byte_cnt的原子性增加byte_cnt += n;printf("server received %d (%d total) bytes on fd %d\n",n, byte_cnt, connfd);V(&mutex);Rio_writen(connfd, buf, n);} }圖 12-29 echo_cnt:echo 的一個版本,它對從客戶端接收的所有字節計數
一旦程序包被初始化,echo_cnt 函數會初始化 RIO 帶緩沖區的 I/O 包(第 20 行),然后回送從客戶端接收到的每一個文本行。注意,在第 23 ~ 25 行中對共享變量 byte_cnt 的訪問是被 P 和 V 操作保護的。
結果:
確實可以處理多個
旁注 - 基于線程的事件驅動程序
I/O 多路復用不是編寫事件驅動程序的唯一方法。例如,你可能已經注意到我們剛才開發的并發的預線程化的服務器實際上是一個事件驅動服務器,帶有主線程和工作者線程的簡單狀態機。主線程有兩種狀態(“等待連接請求” 和 “等待可用的緩沖區槽位”)、兩個 I/O 事件(“連接請求到達” 和 “緩沖區槽位變為可用”)和兩個轉換(“接受連接請求” 和“插入緩沖區項目”)。類似地,每個工作者線程有一個狀態(“等待可用的緩沖項目”)、一個 I/O 事件(“緩沖區項目變為可用”)和一個轉換(“取出緩沖區項目”)。
12.6 使用線程提高并行性
到目前為止,在對并發的研究中,我們都假設并發線程是在單處理器系統上執行的。然而,大多數現代機器具有多核處理器。并發程序通常在這樣的機器上運行得更快,因為操作系統內核在多個核上并行地調度這些并發線程,而不是在單個核上順序地調度。在像繁忙的 Web 服務器、數據庫服務器和大型科學計算代碼這樣的應用中利用這樣的并行性是至關重要的,而且在像 Web 瀏覽器、電子表格處理程序和文檔處理程序這樣的主流應用中,并行性也變得越來越有用。
圖 12-30 給出了順序、并發和并行程序之間的集合關系。所有程序的集合能夠被劃分成不相交的順序程序集合和并發程序的集合。寫順序程序只有一條邏輯流。寫并發程序有多條并發流。并行程序是一個運行在多個處理器上的并發程序。因此,并行程序的集合是并發程序集合的真子集。
將任務分配到不同線程的最直接方法是將序列劃分成 t 個不相交的區域,然后給,個不同的線程每個分配一個區域。為了簡單,假設 n 是 t 的倍數,這樣每個區域有以 n/t 個元素。讓我們來看看多個線程并行處理分配給它們的區域的不同方法。
最簡單也最直接的選擇是將線程的和放入一個共享全局變量中,用互斥鎖保護這個變量。圖 12-31 給出了我們會如何實現這種方法。在第 28 ~ 33 行,主線程創建對等線程,然后等待它們結束。注意,主線程傳遞給每個對等線程一個小整數,作為唯一的線程 ID。每個對等線程會用它的線程 ID 來決定它應該計算序列的哪一部分。這個向對等線程傳遞一個小的唯一的線程 ID 的思想是一項通用技術,許多并行應用中都用到了它。在對等線程終止后,全局變量 gsum 包含著最終的和。然后主線程用閉合形式解答來驗證結果(第 36 ~ 37 行)。
psum-mutex.c:
#include "csapp.h" #define MAXTHREADS 32void *sum_mutex(void *vargp); /* Thread routine *//* Global shared variables */ long gsum = 0; /* Global sum */ long nelems_per_thread; /* Number of elements to sum */ sem_t mutex; /* Mutex to protect global sum */int main(int argc, char **argv) {long i, nelems, log_nelems, nthreads, myid[MAXTHREADS];pthread_t tid[MAXTHREADS];/* Get input arguments */if (argc != 3) {printf("Usage: %s <nthreads> <log_nelems>\n", argv[0]);exit(0);}nthreads = atoi(argv[1]); //線程數log_nelems = atoi(argv[2]); //加法個數nelems = (1L << log_nelems);//nelem=log_nelems*2nelems_per_thread = nelems / nthreads; //每個線程執行幾個加法任務sem_init(&mutex, 0, 1);/* Create peer threads and wait for them to finish */for (i = 0; i < nthreads; i++) {myid[i] = i;Pthread_create(&tid[i], NULL, sum_mutex, &myid[i]);}for (i = 0; i < nthreads; i++)Pthread_join(tid[i], NULL);/* Check final answer */if (gsum != (nelems * (nelems - 1)) / 2)printf("Error: result=%ld\n", gsum);exit(0); }圖 12-31 psum-mutex 的主程序,使用多個線程將一個序列元素的和放入一個用互斥鎖保護的共享全局變量中
圖 12-32 給出了每個對等線程執行的函數。在第 4 行中,線程從線程參數中提取出線程 ID,然后用這個 ID 來決定它要計算的序列區域(第 5 ~ 6 行)。在第 9 ~ 13 行中,線程在它的那部分序列上迭代操作,每次迭代都更新共享全局變量 gsum。注意,我們很小心地用 P 和 V 互斥操作來保護每次更新。
/* Thread routine for psum-mutex.c */ void *sum_mutex(void *vargp) {long myid = *((long *)vargp); /* Extract the thread ID */long start = myid * nelems_per_thread; /* Start element index */long end = start + nelems_per_thread; /* End element index */long i;for (i = start; i < end; i++) {P(&mutex);gsum += i;V(&mutex);}return NULL; }圖 12-32 psum-mutex 的線程例程。每個對等線程將各自的和累加進一個用互斥鎖保護的共享全局變量中
程序單線程順序運行時非常慢,幾乎比多線程并行運行時慢了一個數量級。不僅如此,使用的核數越多,性能越差。造成性能差的原因是相對于內存更新操作的開銷,同步操作(P 和 V)代價太大。這突顯了并行編程的一項重要教訓:同步開銷巨大,要盡可能避免。如果無可避免,必須要用盡可能多的有用計算彌補這個開銷。
在我們的例子中,一種避免同步的方法是讓每個對等線程在一個私有變量中計算它自己的部分和,這個私有變量不與其他任何線程共享,如圖 12-33 所示。主線程(圖中未顯示)定義一個全局數組 psum,每個對等線程 i 把它的部分和累積在 psum[i] 中。因為小心地給了每個對等線程一個不同的內存位置來更新,所以不需要用互斥鎖來保護這些更新。唯一需要同步的地方是主線程必須等待所有的子線程完成。在對等線程結束后,主線程把 psum 向量的元素加起來,得到最終的結果。
/* Thread routine for psum-array.c */ void *sum_array(void *vargp) {long myid = *((long *)vargp); /* Extract the thread ID */long start = myid * nelems_per_thread; /* Start element index */long end = start + nelems_per_thread; /* End element index */long i;for (i = start; i < end; i++) {psum[myid] += i;}return NULL; }圖 12-33 psum-array 的線程例程。每個對等線程把它的部分和累積在一個私有數組元素中,不與其他任何對等線程共享該元素
運行 psum-array 時,我們看到它比 psum-mutex 運行得快好幾個數量級
在第 5 章中,我們學習到了如何使用局部變量來消除不必要的內存引用。圖 12-34 展示了如何應用這項原則,讓每個對等線程把它的部分和累積在一個局部變量而不是全局變量中。當在四核機器上運行 psum-local 時,得到一組新的遞減的運行時間:
/* Thread routine for psum-local.c */ void *sum_local(void *vargp) {long myid = *((long *)vargp); /* Extract the thread ID */long start = myid * nelems_per_thread; /* Start element index */long end = start + nelems_per_thread; /* End element index */long i, sum = 0;for (i = start; i < end; i++) {sum += i;}psum[myid] = sum;return NULL; }圖 12-34 psum-local 的線程例程。每個對等線程把它的部分和累積在一個局部變量中
從這個練習可以學習到一個重要的經驗,那就是寫并行程序相當棘手。對代碼看上去很小的改動可能會對性能有極大的影響。
12.7 其他并發問題
你可能已經注意到了,一旦我們要求同步對共享數據的訪問,那么事情就變得復雜得多了。迄今為止,我們已經看到了用于互斥和生產者—消費者同步的技術,但這僅僅是冰山一角。同步從根本上說是很難的問題,它引出了在普通的順序程序中不會出現的問題。這一小節是關于你在寫并發程序時需要注意的一些問題的(非常不完整的)綜述。為了讓事情具體化,我們將以線程為例描述討論。不過要記住,這些典型問題是任何類型的并發流操作共享資源時都會出現的。
12.7.1 線程安全
當用線程編寫程序時,必須小心地編寫那些具有稱為線程安全性(thread safety)屬性的函數。一個函數被稱為線程安全的(thread-safe),當且僅當被多個并發線程反復地調用時,它會一直產生正確的結果。如果一個函數不是線程安全的,我們就說它是線程不安全的(thread-unsafe)。
線程不安全函數
我們能夠定義出四個(不相交的)線程不安全函數類:
第 1 類:不保護共享變量的函數。
我們在圖 12-16 的 thread 函數中就已經遇到了這樣的問題,該函數對一個未受保護的全局計數器變量加 1。將這類線程不安全函數變成線程安全的,相對而言比較容易:利用像 P 和 V 操作這樣的同步操作來保護共享的變量。這個方法的優點是在調用程序中不需要做任何修改。缺點是同步操作將減慢程序的執行時間。
解決辦法:使用 P 和 V semaphore 操作,互斥鎖保護臨界區、
第 2 類:函數中有后續調用需要用的狀態的函數。
(在多次調用間保存狀態的函數)一個偽隨機數生成器是這類線程不安全函數的簡單例子。請參考圖 12-37 中的偽隨機數生成器程序包。rand 函數是線程不安全的,因為當前調用的結果依賴于前次調用的中間結果。當調用 srand 為 rand 設置了一個種子后,我們從一個單線程中反復地調用 rand,能夠預期得到一個可重復的隨機數字序列。然而,如果多線程調用 rand 函數,這種假設就不再成立了。
解決辦法:把狀態當做傳入參數
libc - rand.c
unsigned next_seed = 1;/* rand - return pseudorandom integer in the range 0..32767 */ unsigned rand(void) {next_seed = next_seed * 1103515245 + 12543;return (unsigned)(next_seed >> 16) % 32768; }/* srand - set the initial seed for rand() */ void srand(unsigned new_seed) {next_seed = new_seed; }圖 12-37 一個線程不安全的偽隨機數生成器(基于【61】)
要使像 rand 這樣的函數變成線程安全的唯一方式是重寫它,使得它不再使用任何 static 數據,而是依靠調用者在參數中傳遞狀態信息。這樣做的缺點是,程序員現在還要被迫修改調用程序中的代碼。在一個大的程序中,可能有成百上千個不同的調用位置,做這樣的修改將是非常麻煩的,而且容易出錯。
比如下面這個demo,展示了srand和rand的使用(單線程)
#include <csapp.c> int main() {srand(100);for (int i = 0;i < 4;i++){printf("%d\n",rand());}return 0; }可以看到,如果srand規定了全局變量next_rand之后,每次rand()都是依賴上次next_rand的結果,所以結果都是一樣的.
再看下面的例子,展示了多線程的srand下,相同種子,調用的結果卻不同
#include <csapp.c> void * thread(void *args); int main() {pthread_t tid1;pthread_t tid2;srand(100);int i1 = 1;int i2 = 2;Pthread_create(&tid1,NULL,thread,(void *)&i1);Pthread_create(&tid2,NULL,thread,(void *)&i2);pthread_exit(0);return 0; } void * thread(void *args) {int n = *((int *) args);srand(100);for (int i = 0;i < 4;i++){sleep(1);printf("[thread %d] 第[%d]次:%d\n",n,i+1,rand());}return 0; }可以看到,相同的種子下,得到的結果卻不同.
解決辦法:重新寫一個rand,每次都傳入參數,而且結果放在局部變量
第 3 類:返回指向靜態變量的指針的函數。
某些函數,例如 ctime 和 gethost-byname,將計算結果放在一個 static 變量中,然后返回一個指向這個變量的指針,所以總是返回相同的地址.如果我們從并發線程中調用這些函數,那么將可能發生災難,因為正在被一個線程使用的結果會被另一個線程悄悄地覆蓋了。(一個線程調用函數后得到變量指針,在這個指針的值拿來用,但是多線程中會被一直修改)
有兩種方法來處理這類線程不安全函數。一種選擇是重寫函數,使得調用者傳遞存放結果的變量的地址。這就消除了所有共享數據,但是它要求程序員能夠修改函數的源代碼。
如果線程不安全函數是難以修改或不可能修改的(例如,代碼非常復雜或是沒有源代碼可用),那么另外一種選擇就是使用加鎖—復制(lock-and-copy)技術。基本思想是將線程不安全函數與互斥鎖聯系起來。在每一個調用位置,對互斥鎖加鎖,調用線程不安全函數,將函數返回的結果復制到一個私有的內存位置,然后對互斥鎖解鎖。為了盡可能地減少對調用者的修改,你應該定義一個線程安全的包裝函數,它執行加鎖—復制,然后通過調用這個包裝函數來取代所有對線程不安全函數的調用。例如,圖 12-38 給出了 ctime 的一個線程安全的版本,利用的就是加鎖—復制技術。
char *ctime_ts(const time_t *timep, char *privatep) {char *sharedp;P(&mutex);sharedp = ctime(timep);strcpy(privatep, sharedp); /* Copy string from shared to private */V(&mutex);return privatep; }圖 12-38 C 標準庫函數 ctime 的線程安全的包裝函數。使用加鎖—復制技術調用一個第 3 類線程不安全函數
第 4 類:調用線程不安全函數的函數。
如果函數 f 調用線程不安全函數 g,那么 f 就是線程不安全的嗎?不一定。如果 g 是第 2 類函數,即依賴于跨越多次調用的狀態,那么 f 也是線程不安全的,而且除了重寫 g 以外,沒有什么辦法。然而,如果 g 是第 1 類或者第 3 類函數,那么只要你用一個互斥鎖保護調用位置和任何得到的共享數據,f 仍然可能是線程安全的。在圖 12-38 中我們看到了一個這種情況很好的示例,其中我們使用加鎖—復制編寫了一個線程安全函數,它調用了一個線程不安全的函數。
12.7.2 可重入性
有一類重要的線程安全函數,叫做可重入函數(reentrant function),其特點在于它們具有這樣一種屬性:當它們被多個線程調用時,不會引用任何共享數據。盡管線程安全和可重入有時會(不正確地)被用做同義詞,但是它們之間還是有清晰的技術差別,值得留意。圖 12-39 展示了可重入函數、線程安全函數和線程不安全函數之間的集合關系。所有函數的集合被劃分成不相交的線程安全和線程不安全函數集合。可重入函數集合是線程安全函數的一個真子集。
就是說,可重入函數一定是線程安全的,因為用的都是線程本地的變量
可重入函數通常要比不可重入的線程安全的函數高效一些,因為它們不需要同步操作。更進一步來說,將第 2 類線程不安全函數轉化為線程安全函數的唯一方法就是重寫它,使之變為可重入的。例如,圖 12-40 展示了圖 12-37 中 rand 函數的一個可重入的版本。關鍵思想是我們用一個調用者傳遞進來的指針取代了靜態的 next 變量。
/* rand_r - return a pseudorandom integer on 0..32767 */ int rand_r(unsigned int *nextp) {*nextp = *nextp * 1103515245 + 12345;return (unsigned int)(*nextp / 65536) % 32768; }圖 12-40 rand_r:圖 12-37 中的 rand 函數的可重入版本
檢査某個函數的代碼并先驗地斷定它是可重入的,這可能嗎?不幸的是,不一定能這樣。如果所有的函數參數都是傳值傳遞的(即沒有指針),并且所有的數據引用都是本地的自動棧變量(即沒有引用靜態或全局變量),那么函數就是顯式可重入的(explicitly reentrant),也就是說,無論它是被如何調用的,都可以斷言它是可重入的。
然而,如果把假設放寬松一點,允許顯式可重入函數中一些參數是引用傳遞的(即允許它們傳遞指針),那么我們就得到了一個隱式可重入的(implicitly reentrant)函數,也就是說,如果調用線程小心地傳遞指向非共享數據的指針,那么它是可重入的。例如,圖 12-40 中的 rand_r 函數就是隱式可重入的。
我們總是使用術語可重入的(reentrant)既包括顯式可重入函數也包括隱式可重入函數。然而,認識到可重入性有時既是調用者也是被調用者的屬性,并不只是被調用者單獨的屬性是非常重要的。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-QiauZCaw-1622542085852)(assert/image-20210601110817679.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-c0vdqzl7-1622542085853)(assert/image-20210601110830001.png)]
Unix大多數系統調用都是線程安全的,下面列出了一些線程不安全的
為左邊這些函數,Unix基本都提供了一個可重入函數的版本,不過它們通常有不同的參數
12.7.4 競爭
——競爭才是并發bug的源泉
當一個程序的正確性依賴于一個線程要在另一個線程到達 y 點之前到達它的控制流中的 x 點時,就會發生競爭(race)。通常發生競爭是因為程序員假定線程將按照某種特殊的軌跡線穿過執行狀態空間,而忘記了另一條準則規定:多線程的程序必須對任何可行的軌跡線都正確工作。
例子是理解競爭本質的最簡單的方法。讓我們來看看圖 12-42 中的簡單程序。主線程創建了四個對等線程,并傳遞一個指向一個唯一的整數 ID 的指針到每個線程。每個對等線程復制它的參數中傳遞的 ID 到一個局部變量中(第 22 行),然后輸出一個包含這個 ID 的信息。它看上去足夠簡單,但是當我們在系統上運行這個程序時,我們得到以下不正確的結果:
/* WARNING: This code is buggy! */ #include "csapp.h" #define N 4void *thread(void *vargp);int main() {pthread_t tid[N];int i;for (i = 0; i < N; i++)Pthread_create(&tid[i], NULL, thread, &i);for (i = 0; i < N; i++)Pthread_join(tid[i], NULL);exit(0); }/* Thread routine */ void *thread(void *vargp) {int myid = *((int *)vargp);printf("Hello from thread %d\n", myid);return NULL; }圖 12-42 一個具有競爭的程序
問題是由每個對等線程和主線程之間的競爭引起的。你能發現這個競爭嗎?下面是發生的情況。當主線程在第 13 行創建了一個對等線程,它傳遞了一個指向本地棧變量 i 的指針。在此時,競爭出現在下一次在第 12 行對 i 加 1 和第 22 行參數的間接引用和賦值之間。如果對等線程在主線程執行第 12 行對 i 加 1 之前就執行了第 22 行,那么 myid 變量就得到正確的 ID。否則,它包含的就會是其他線程的 ID。令人驚慌的是,我們是否得到正確的答案依賴于內核是如何調度線程的執行的。在我們的系統中它失敗了,但是在其他系統中,它可能就能正確工作,讓程序員 “幸福地”察覺不到程序的嚴重錯誤。
(wok,這么隱蔽,錯誤出現在傳遞&i,如果執行了i++,那就錯了)
為了消除競爭,我們可以動態地為每個整數 ID 分配一個獨立的塊,并且傳遞給線程例程一個指向這個塊的指針,如圖 12-43 所示(第 12 ~ 14 行)。請注意線程例程必須釋放這些塊以避免內存泄漏
#include "csapp.h" #define N 4void *thread(void *vargp);int main() {pthread_t tid[N];int i, *ptr;for (i = 0; i < N; i++) {ptr = Malloc(sizeof(int));*ptr = i;Pthread_create(&tid[i], NULL, thread, ptr);}for (i = 0; i < N; i++)Pthread_join(tid[i], NULL);exit(0); }/* Thread routine */ void *thread(void *vargp) {int myid = *((int *)vargp);Free(vargp);printf("Hello from thread %d\n", myid);return NULL; }圖 12-43 圖 12-42 中程序的一個沒有競爭的正確版本
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-9ZQ4NUgM-1622542085854)(assert/image-20210601112241081.png)]
12.13如果在第 14 行調用了 pthread_create 之后,我們立即釋放塊,那么將引入一個新的競爭,這次競爭發生在主線程對 free 的調用和線程例程中第 24 行的賦值語句之間。這個競爭的含義是,只有在將vargp傳給myid后才free才是對的
12.14
A. 另一種方法是直接傳遞整數 i,而不是傳遞一個指向 i 的指針:
for (i = 0; i < N; i++)Pthread_create(&tid[i], NULL, thread, (void *)i);在線程例程中,我們將參數強制轉換成一個 int 類型,并將它賦值給 myid:
int myid = (int) vargp;B. 優點是它通過消除對 malloc 和 free 的調用降低了開銷。一個明顯的缺點是,它假設指針至少和 int 一樣大。即便這種假設對于所有的現代系統來說都為真,但是它對于那些過去遺留下來的或今后的系統來說可能就不為真了。
12.7.5 死鎖
信號量引入了一種潛在的令人厭惡的運行時錯誤,叫做死鎖(deadlock),它指的是一組線程被阻塞了,等待一個永遠也不會為真的條件。進度圖對于理解死鎖是一個無價的工具。
在這個程序中,thread 0 會先P[mutex 0] 再mutex[1],thread[1]會先P[mutex 1]再P[mytex 0]
如上圖,我們用進度圖來幫助我們找到死鎖的區域。
- 具體來說,我們先找到mutex s0的禁止區,由于thread 0是先P(s0),thread 1是先P(s1),所以mutex s0的禁止區,就是在thread0獲得s0之后,thread1也獲得了這個s0,即左上角這個正方形.
- 我們再找到mutex s1的禁止區,thread 0在P(s1)之后,且thread1也執行了P(s1),即右下角這個正方形.
- 禁止區是肯定執行不到的狀態。由于我們的狀態機是往右上角走,所以夾角的這個正方形就是死鎖區。具體來看含義,其實就是thread 0獲得s0,且thread 1或者s1的情況.
OK,我們再來看看避免了死鎖的程序,就是按照一個順序上鎖
- mutex 0的禁止區:thread0 執行完P(s0)后,釋放s0前,thread1也執行完P(s0)后,也就是左上角
- mutex 1的禁止區:thraed1 執行完P(s1)后,釋放s1前,thread1也執行完P(s1)后,也就是右下角。上面的圖,右下角應該畫錯了,應該是書中這樣
B. 因為任何可行的軌跡最終都陷入死鎖狀態中,所以這個程序總是會死鎖。
C. 為了消除潛在的死鎖,將二元信號量 t 初始化為 1 而不是 0。
D. 改成后的程序的進度圖如圖 12-49 所示。[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-X0tefjbt-1622542263914)(https://github.com/A17764530215/csapp-labs/tree/master/notes/ch-12%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/assert#pic_center)]
在這里插入圖片描述
總結
以上是生活随笔為你收集整理的CSAPP 并发编程 ——深入理解计算机系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PP增强
- 下一篇: 第1章计算机系统概述__计算机系统性能评