日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

Linux多线程实践(10) --使用 C++11 编写 Linux 多线程程序

發(fā)布時(shí)間:2025/3/17 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Linux多线程实践(10) --使用 C++11 编写 Linux 多线程程序 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在這個(gè)多核時(shí)代,如何充分利用每個(gè)?CPU?內(nèi)核是一個(gè)繞不開的話題,從需要為成千上萬(wàn)的用戶同時(shí)提供服務(wù)的服務(wù)端應(yīng)用程序,到需要同時(shí)打開十幾個(gè)頁(yè)面,每個(gè)頁(yè)面都有幾十上百個(gè)鏈接的?web?瀏覽器應(yīng)用程序,從保持著幾?t?甚或幾?p?的數(shù)據(jù)的數(shù)據(jù)庫(kù)系統(tǒng),到手機(jī)上的一個(gè)有良好用戶響應(yīng)能力的?app,為了充分利用每個(gè)?CPU?內(nèi)核,都會(huì)想到是否可以使用多線程技術(shù)。這里所說(shuō)的“充分利用”包含了兩個(gè)層面的意思,一個(gè)是使用到所有的內(nèi)核,再一個(gè)是內(nèi)核不空閑,不讓某個(gè)內(nèi)核長(zhǎng)時(shí)間處于空閑狀態(tài)。在?C++98?的時(shí)代,C++標(biāo)準(zhǔn)并沒有包含多線程的支持,人們只能直接調(diào)用操作系統(tǒng)提供的?SDK?API?來(lái)編寫多線程程序,不同的操作系統(tǒng)提供的?SDK?API?以及線程控制能力不盡相同,到了?C++11,終于在標(biāo)準(zhǔn)之中加入了正式的多線程的支持,從而我們可以使用標(biāo)準(zhǔn)形式的類來(lái)創(chuàng)建與執(zhí)行線程,也使得我們可以使用標(biāo)準(zhǔn)形式的鎖、原子操作、線程本地存儲(chǔ)?(TLS)?等來(lái)進(jìn)行復(fù)雜的各種模式的多線程編程,而且,C++11?還提供了一些高級(jí)概念,比如?promise/future,packaged_task,async?等以簡(jiǎn)化某些模式的多線程編程。

多線程可以讓我們的應(yīng)用程序擁有更加出色的性能,同時(shí),如果沒有用好,多線程又是比較容易出錯(cuò)的且難以查找錯(cuò)誤所在,甚至可以讓人們覺得自己陷進(jìn)了泥潭,希望本文能夠幫助您更好地使用?C++11?來(lái)進(jìn)行?Linux?下的多線程編程。

?

認(rèn)識(shí)多線程

首先我們應(yīng)該正確地認(rèn)識(shí)線程。維基百科對(duì)線程的定義是:線程是一個(gè)編排好的指令序列,這個(gè)指令序列(線程)可以和其它的指令序列(線程)并行執(zhí)行,操作系統(tǒng)調(diào)度器將線程作為最小的?CPU?調(diào)度單元。在進(jìn)行架構(gòu)設(shè)計(jì)時(shí),我們應(yīng)該多從操作系統(tǒng)線程調(diào)度的角度去考慮應(yīng)用程序的線程安排,而不僅僅是代碼

當(dāng)只有一個(gè)?CPU?內(nèi)核可供調(diào)度時(shí),多個(gè)線程的運(yùn)行示意如下:


圖?1、單個(gè)?CPU?內(nèi)核上的多個(gè)線程運(yùn)行示意圖

我們可以看到,這時(shí)的多線程本質(zhì)上是單個(gè)?CPU?的時(shí)間分片,一個(gè)時(shí)間片運(yùn)行一個(gè)線程的代碼,它可以支持并發(fā)處理,但是不能說(shuō)是真正的并行計(jì)算。

當(dāng)有多個(gè)?CPU?或者多個(gè)內(nèi)核可供調(diào)度時(shí),可以做到真正的并行計(jì)算,多個(gè)線程的運(yùn)行示意如下:


圖?2、雙核?CPU?上的多個(gè)線程運(yùn)行示意圖

從上述兩圖,我們可以直接得到使用多線程的一些常見場(chǎng)景:

????進(jìn)程中的某個(gè)線程執(zhí)行了一個(gè)阻塞操作時(shí),其它線程可以依然運(yùn)行,比如,等待用戶輸入或者等待網(wǎng)絡(luò)數(shù)據(jù)包的時(shí)候處理啟動(dòng)后臺(tái)線程處理業(yè)務(wù),或者在一個(gè)游戲引擎中,一個(gè)線程等待用戶的交互動(dòng)作輸入,另外一個(gè)線程在后臺(tái)合成下一幀要畫的圖像或者播放背景音樂等。

????將某個(gè)任務(wù)分解為小的可以并行進(jìn)行的子任務(wù),讓這些子任務(wù)在不同的?CPU?或者內(nèi)核上同時(shí)進(jìn)行計(jì)算,然后匯總結(jié)果,比如歸并排序,或者分段查找,這樣子來(lái)提高任務(wù)的執(zhí)行速度。

需要注意一點(diǎn),因?yàn)閱蝹€(gè)?CPU?內(nèi)核下多個(gè)線程并不是真正的并行,有些問題,比如?CPU?緩存不一致問題,不一定能表現(xiàn)出來(lái),一旦這些代碼被放到了多核或者多?CPU?的環(huán)境運(yùn)行,就很可能會(huì)出現(xiàn)“在開發(fā)測(cè)試環(huán)境一切沒有問題,到了實(shí)施現(xiàn)場(chǎng)就莫名其妙”的情況,所以,在進(jìn)行多線程開發(fā)時(shí),開發(fā)與測(cè)試環(huán)境應(yīng)該是多核或者多?CPU?的,以避免出現(xiàn)這類情況。

?

C++11?的線程類?std::thread

C++11?的標(biāo)準(zhǔn)類?std::thread?對(duì)線程進(jìn)行了封裝,它的聲明放在頭文件?thread?中,其中聲明了線程類?thread,?線程標(biāo)識(shí)符?id,以及名字空間?this_thread,按照?C++11?規(guī)范,這個(gè)頭文件至少應(yīng)該兼容如下內(nèi)容:

清單?1.例子?thread?頭文件主要內(nèi)容

namespace std { struct thread {// native_handle_type 是連接 thread 類和操作系統(tǒng) SDK API 之間的橋梁。// typedef pthread_t __gthread_t;typedef __gthread_t native_handle_type;native_handle_type native_handle();//struct id{id() noexcept;// 可以由==, < 兩個(gè)運(yùn)算衍生出其它大小關(guān)系運(yùn)算。bool operator==(thread::id x, thread::id y) noexcept;bool operator<(thread::id x, thread::id y) noexcept;template<class charT, class traits>basic_ostream<charT, traits>&operator<<(basic_ostream<charT, traits>&out, thread::id id);// 哈希函數(shù)template <class T> struct hash;template <> struct hash<thread::id>;};id get_id() const noexcept;// 構(gòu)造與析構(gòu)thread() noexcept;template<class F, class… Args> explicit thread(F&f, Args&&… args);~thread();thread(const thread&) = delete;thread(thread&&) noexcept;thread& operator=( const thread&) = delete;thread& operator=(thread&&) noexcept;//void swap(thread&) noexcept;bool joinable() const noexcept;void join();void detach();// 獲取物理線程數(shù)目static unsigned hardware_concurrency() noexcept; }namespace this_thead {thread::id get_id();void yield();template<class Clock, class Duration>void sleep_until(const chrono::time_point<Clock, Duration>& abs_time);template<class Rep, class Period>void sleep_for(const chromo::duration<Rep, Period>& rel_time); } }

和有些語(yǔ)言中定義的線程不同,C++11?所定義的線程是和操作系的線程是一一對(duì)應(yīng)的,也就是說(shuō)我們生成的線程都是直接接受操作系統(tǒng)的調(diào)度的,通過操作系統(tǒng)的相關(guān)命令(比如?ps?-M?命令)是可以看到的,一個(gè)進(jìn)程所能創(chuàng)建的線程數(shù)目以及一個(gè)操作系統(tǒng)所能創(chuàng)建的總的線程數(shù)目等都由運(yùn)行時(shí)操作系統(tǒng)限定

native_handle_type?是連接?thread?類和操作系統(tǒng)?SDK?API?之間的橋梁,在?g++(libstdc++)?for?Linux?里面,native_handle_type?其實(shí)就是?pthread?里面的?pthread_t?類型,當(dāng)?thread?類的功能不能滿足我們的要求的時(shí)候(比如改變某個(gè)線程的優(yōu)先級(jí)),可以通過?thread?類實(shí)例的?native_handle()?返回值作為參數(shù)來(lái)調(diào)用相關(guān)的?pthread?函數(shù)達(dá)到目的。thread::id?定義了在運(yùn)行時(shí)操作系統(tǒng)內(nèi)唯一能夠標(biāo)識(shí)該線程的標(biāo)識(shí)符,同時(shí)其值還能指示所標(biāo)識(shí)的線程的狀態(tài),其默認(rèn)值?(thread::id())?表示不存在可控的正在執(zhí)行的線程(即空線程,比如,調(diào)用?thread()?生成的沒有指定入口函數(shù)的線程類實(shí)例),當(dāng)一個(gè)線程類實(shí)例的?get_id()?等于默認(rèn)值的時(shí)候,即?get_id()?==?thread::id(),表示這個(gè)線程類實(shí)例處于下述狀態(tài)之一:

????尚未指定運(yùn)行的任務(wù)

????線程運(yùn)行完畢

????線程已經(jīng)被轉(zhuǎn)移?(move)?到另外一個(gè)線程類實(shí)例

????線程已經(jīng)被分離?(detached)

空線程?id?字符串表示形式依具體實(shí)現(xiàn)而定,有些編譯器為?0X0,有些為一句語(yǔ)義解釋。

有時(shí)候我們需要在線程執(zhí)行代碼里面對(duì)當(dāng)前調(diào)用者線程進(jìn)行操作,針對(duì)這種情況,C++11?里面專門定義了一個(gè)名字空間?this_thread,其中包括?get_id()?函數(shù)可用來(lái)獲取當(dāng)前調(diào)用者線程的?id,yield()?函數(shù)可以用來(lái)將調(diào)用者線程跳出運(yùn)行狀態(tài),重新交給操作系統(tǒng)進(jìn)行調(diào)度,sleep_until?和?sleep_for?函數(shù)則可以讓調(diào)用者線程休眠若干時(shí)間。get_id()?函數(shù)實(shí)際上是通過調(diào)用?pthread_self()?函數(shù)獲得調(diào)用者線程的標(biāo)識(shí)符,而?yield()?函數(shù)則是通過調(diào)用操作系統(tǒng)?API?sched_yield()?進(jìn)行調(diào)度切換。

?

如何創(chuàng)建和結(jié)束一個(gè)線程

和?pthread_create?不同,使用?thread?類創(chuàng)建線程可以使用一個(gè)函數(shù)作為入口,也可以是其它的?Callable?對(duì)象,而且,可以給入口傳入任意個(gè)數(shù)任意類型的參數(shù)

清單?2.例子?thread_run_func_var_args.cc

int funcReturnInt(const char* fmt, ...) {va_list ap;va_start(ap, fmt);vprintf( fmt, ap );va_end(ap);return 0xabcd; }void threadRunFunction(void) {thread* t = new thread(funcReturnInt, "%d%s\n", 100, "\%");t->join();delete t; }

我們也可以傳入一個(gè)?Lambda?表達(dá)式作為入口,比如:

清單?3.例子?thread_run_lambda.cc

void threadRunLambda(void) {int a = 100, b = 200;thread* t = new thread( [](int ia, int ib){cout << (ia + ib) << endl;},a, b );t->join();delete t; }

一個(gè)類的成員函數(shù)也可以作為線程入口:

清單?4.例子?thread_run_member_func.cc

struct God {void create(const char* anything){cout << "create " << anything << endl;} };void threadRunMemberFunction(void) {God god;thread* t = new thread( &God::create, god, "the world" );t->join();delete t; }

雖然?thread?類的初始化可以提供這么豐富和方便的形式,其實(shí)現(xiàn)的底層依然是創(chuàng)建一個(gè)?pthread?線程并運(yùn)行之,有些實(shí)現(xiàn)甚至是直接調(diào)用?pthread_create?來(lái)創(chuàng)建

創(chuàng)建一個(gè)線程之后,我們還需要考慮一個(gè)問題:該如何處理這個(gè)線程的結(jié)束?一種方式是等待這個(gè)線程結(jié)束,在一個(gè)合適的地方調(diào)用?thread?實(shí)例的?join()?方法,調(diào)用者線程將會(huì)一直等待著目標(biāo)線程的結(jié)束,當(dāng)目標(biāo)線程結(jié)束之后調(diào)用者線程繼續(xù)運(yùn)行;另一個(gè)方式是將這個(gè)線程分離,由其自己結(jié)束,通過調(diào)用?thread?實(shí)例的?detach()?方法將目標(biāo)線程置于分離模式。一個(gè)線程的?join()?方法與?detach()?方法只能調(diào)用一次,不能在調(diào)用了?join()?之后又調(diào)用?detach(),也不能在調(diào)用?detach()?之后又調(diào)用?join(),在調(diào)用了?join()?或者?detach()?之后,該線程的?id?即被置為默認(rèn)值(空線程),表示不能繼續(xù)再對(duì)該線程作修改變化。如果沒有調(diào)用?join()?或者?detach(),那么,在析構(gòu)的時(shí)候,該線程實(shí)例將會(huì)調(diào)用?std::terminate(),這會(huì)導(dǎo)致整個(gè)進(jìn)程退出,所以,如果沒有特別需要,一般都建議在生成子線程后調(diào)用其?join()?方法等待其退出,這樣子最起碼知道這些子線程在什么時(shí)候已經(jīng)確保結(jié)束。

在?C++11?里面沒有提供?kill?掉某個(gè)線程的能力,只能被動(dòng)地等待某個(gè)線程的自然結(jié)束,如果我們要主動(dòng)停止某個(gè)線程的話,可以通過調(diào)用?Linux?操作系統(tǒng)提供的?pthread_kill?函數(shù)給目標(biāo)線程發(fā)送信號(hào)來(lái)實(shí)現(xiàn),示例如下:

清單?5.例子?thread_kill.cc

int counter = 0; static void on_signal_term(int sig) {cout << "on SIGTERM:" << this_thread::get_id() << endl;cout << "Usage: pthread_self: " << pthread_self() << endl;cout << "counter = " << counter << endl;pthread_exit(NULL); } void threadPosixKill(void) {signal(SIGTERM, on_signal_term);thread* t = new thread( [](){while(true){++ counter;}});pthread_t tid = t->native_handle();cout << "tid=" << tid << endl;// 確保子線程已經(jīng)在運(yùn)行。this_thread::sleep_for( chrono::seconds(1) );pthread_kill(tid, SIGTERM);t->join();delete t;cout << "thread destroyed." << endl; }

上述例子還可以用來(lái)給某個(gè)線程發(fā)送其它信號(hào),具體的?pthread_exit?函數(shù)調(diào)用的約定依賴于具體的操作系統(tǒng)的實(shí)現(xiàn),所以,這個(gè)方法是依賴于具體的操作系統(tǒng)的,而且,因?yàn)樵?C++11?里面沒有這方面的具體約定,用這種方式也是依賴于?C++編譯器的具體實(shí)現(xiàn)的。

?

線程類?std::thread?的其它方法和特點(diǎn)

thread?類是一個(gè)特殊的類,它不能被拷貝,只能被轉(zhuǎn)移或者互換,這是符合線程的語(yǔ)義的,不要忘記這里所說(shuō)的線程是直接被操作系統(tǒng)調(diào)度的。線程的轉(zhuǎn)移使用?move?函數(shù),示例如下:

清單?6.例子?thread_move.cc

void threadMove(void) {int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2 = move(t); // 改為 t2 = t 將不能編譯。t2.join();cout << "a=" << a << endl; }

在這個(gè)例子中,如果將?t2.join()?改為?t.join()?將會(huì)導(dǎo)致整個(gè)進(jìn)程被結(jié)束,因?yàn)橥浟苏{(diào)用?t2?也就是被轉(zhuǎn)移的線程的?join()?方法,從而導(dǎo)致整個(gè)進(jìn)程被結(jié)束,而?t?則因?yàn)橐呀?jīng)被轉(zhuǎn)移,其?id?已被置空

線程實(shí)例互換使用?swap?函數(shù),示例如下:

清單?7.例子?thread_swap.cc

void threadSwap(void) {int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2;cout << "before swap: t=" << t.get_id()<< ", t2=" << t2.get_id() << endl;swap(t, t2);cout << "after swap : t=" << t.get_id()<< ", t2=" << t2.get_id() << endl;t2.join();cout << "a=" << a << endl; }

互換和轉(zhuǎn)移很類似,但是互換僅僅進(jìn)行實(shí)例(以?id?作標(biāo)識(shí))的互換,而轉(zhuǎn)移則在進(jìn)行實(shí)例標(biāo)識(shí)的互換之前,還進(jìn)行了轉(zhuǎn)移目的實(shí)例(如下例的t2)的清理,如果?t2?是可聚合的(joinable()?方法返回?true),則調(diào)用?std::terminate(),這會(huì)導(dǎo)致整個(gè)進(jìn)程退出,比如下面這個(gè)例子:

清單?8.例子?thread_move_term.cc

void threadMoveTerm(void) {int a = 1;thread t( [](int* pa){for(;;){*pa = (*pa * 33) % 0x7fffffff;if ( ( (*pa) >> 30) & 1) break;}}, &a);thread t2( [](){int i = 0;for(;;)i++;} );t2 = move(t); // 將會(huì)導(dǎo)致 std::terminate()cout << "should not reach here" << endl;t2.join(); }

所以,在進(jìn)行線程實(shí)例轉(zhuǎn)移的時(shí)候,要注意判斷目的實(shí)例的?id?是否為空值(即?id())。

如果我們繼承了?thread?類,則還需要禁止拷貝構(gòu)造函數(shù)、拷貝賦值函數(shù)以及賦值操作符重載函數(shù)等,另外,thread?類的析構(gòu)函數(shù)并不是虛析構(gòu)函數(shù)。示例如下:

清單?9.例子?thread_inherit.cc

class MyThread : public thread { public:MyThread() noexcept : thread() {};template<typename Callable, typename... Args>explicitMyThread(Callable&& func, Args&&... args) :thread( std::forward<Callable>(func),std::forward<Args>(args)...){}~MyThread(){}// disable copy constructorsMyThread( MyThread& ) = delete;MyThread( const MyThread& ) = delete;MyThread& operator=(const MyThread&) = delete; };

因?yàn)?thread?類的析構(gòu)函數(shù)不是虛析構(gòu)函數(shù),在上例中,需要避免出現(xiàn)下面這種情況:

MyThread* tc = new MyThread(…); … thread* tp = tc; … delete tp;

這種情況會(huì)導(dǎo)致?MyThread?的析構(gòu)函數(shù)沒有被調(diào)用

?

線程的調(diào)度

我們可以調(diào)用?this_thread::yield()?將當(dāng)前調(diào)用者線程切換到重新等待調(diào)度(放棄當(dāng)前所占用的CPU),但是不能對(duì)非調(diào)用者線程進(jìn)行調(diào)度切換,也不能讓非調(diào)用者線程休眠(這是操作系統(tǒng)調(diào)度器干的活)。

清單?10.例子?thread_yield.cc

void threadYield(void) {unsigned int procs = thread::hardware_concurrency(), // 獲取物理線程數(shù)目i = 0;thread* ta = new thread( [](){struct timeval t1, t2;gettimeofday(&t1, NULL);for(int i = 0, m = 13; i < COUNT; i++, m *= 17){this_thread::yield();}gettimeofday(&t2, NULL);print_time(t1, t2, " with yield");} );thread** tb = new thread*[ procs ];for( i = 0; i < procs; i++){tb[i] = new thread( [](){struct timeval t1, t2;gettimeofday(&t1, NULL);for(int i = 0, m = 13; i < COUNT; i++, m *= 17){do_nothing();}gettimeofday(&t2, NULL);print_time(t1, t2, "without yield");});}ta->join();delete ta;for( i = 0; i < procs; i++){tb[i]->join();delete tb[i];};delete tb; }

ta?線程因?yàn)樾枰?jīng)常切換去重新等待調(diào)度,它運(yùn)行的時(shí)間要比?tb?要多,比如在作者的機(jī)器上運(yùn)行得到如下結(jié)果:

without?yield?elapse?0.050199s

without?yield?elapse?0.051042s

without?yield?elapse?0.05139s

without?yield?elapse?0.048782s

with?yield?elapse?1.63366s

real????0m1.643s

user????0m1.175s

sys?0m0.611s

ta?線程即使扣除系統(tǒng)調(diào)用運(yùn)行時(shí)間?0.611s?之后,它的運(yùn)行時(shí)間也遠(yuǎn)大于沒有進(jìn)行切換的線程。

C++11?沒有提供調(diào)整線程的調(diào)度策略或者優(yōu)先級(jí)的能力,如果需要,只能通過調(diào)用相關(guān)的?pthread?函數(shù)來(lái)進(jìn)行,需要的時(shí)候,可以通過調(diào)用?thread?類實(shí)例的?native_handle()?方法或者操作系統(tǒng)?API?pthread_self()?來(lái)獲得?pthread?線程?id,作為?pthread?函數(shù)的參數(shù)。

?

線程間的數(shù)據(jù)交互和數(shù)據(jù)爭(zhēng)用?(Data?Racing)

同一個(gè)進(jìn)程內(nèi)的多個(gè)線程之間總是免不了要有數(shù)據(jù)互相來(lái)往的,隊(duì)列和共享數(shù)據(jù)是實(shí)現(xiàn)多個(gè)線程之間的數(shù)據(jù)交互的常用方式,封裝好的隊(duì)列使用起來(lái)相對(duì)來(lái)說(shuō)不容易出錯(cuò)一些,而共享數(shù)據(jù)則是最基本的也是較容易出錯(cuò)的,因?yàn)樗鼤?huì)產(chǎn)生數(shù)據(jù)爭(zhēng)用的情況,即有超過一個(gè)線程試圖同時(shí)搶占某個(gè)資源,比如對(duì)某塊內(nèi)存進(jìn)行讀寫等,如下例所示:

清單?11.例子?thread_data_race.cc

static void inc(int *p) {for(int i = 0; i < COUNT; i++)(*p)++; } void threadDataRacing(void) {int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

這是簡(jiǎn)化了的極端情況,我們可以一眼看出來(lái)這是兩個(gè)線程在同時(shí)對(duì)&a?這個(gè)內(nèi)存地址進(jìn)行寫操作,但是在實(shí)際工作中,在代碼的海洋中發(fā)現(xiàn)它并不一定容易。從表面看,兩個(gè)線程執(zhí)行完之后,最后的?a?值應(yīng)該是?COUNT?*?2,但是實(shí)際上并非如此,因?yàn)楹?jiǎn)單如?(*p)++這樣的操作并不是一個(gè)原子動(dòng)作,要解決這個(gè)問題,對(duì)于簡(jiǎn)單的基本類型數(shù)據(jù)如字符、整型、指針等,C++提供了原子模版類?atomic(#include?<atomic>),而對(duì)于復(fù)雜的對(duì)象,則提供了最常用的鎖機(jī)制,比如互斥類?mutex,門鎖?lock_guard,唯一鎖?unique_lock,條件變量?condition_variable?等。

現(xiàn)在我們使用原子模版類?atomic?改造上述例子得到預(yù)期結(jié)果:

清單?12.例子?thread_atomic.cc

static void inc(atomic<int> *p ) {for(int i = 0; i < COUNT; i++)(*p)++; } void threadDataRacing(void) {atomic<int> a(0) ;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

我們也可以使用?lock_guard,lock_guard?是一個(gè)范圍鎖,本質(zhì)是?RAII(Resource?Acquire?Is?Initialization),在構(gòu)建的時(shí)候自動(dòng)加鎖,在析構(gòu)的時(shí)候自動(dòng)解鎖,這保證了每一次加鎖都會(huì)得到解鎖。即使是調(diào)用函數(shù)發(fā)生了異常,在清理?xiàng)臅r(shí)候也會(huì)調(diào)用它的析構(gòu)函數(shù)得到解鎖,從而保證每次加鎖都會(huì)解鎖,但是我們不能手工調(diào)用加鎖方法或者解鎖方法來(lái)進(jìn)行更加精細(xì)的資源占用管理,使用?lock_guard?示例如下:

清單?13.例子?thread_lock_guard.cc

static mutex g_mutex; static void inc(int *p ) {for(int i = 0; i < COUNT; i++){lock_guard<mutex> _(g_mutex);(*p)++;} } void threadLockGuard(void) {int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

如果要支持手工加鎖,可以考慮使用?unique_lock?或者直接使用?mutex。unique_lock?也支持?RAII,它也可以一次性將多個(gè)鎖加鎖;如果使用?mutex(#include?<mutex>)?則直接調(diào)用?mutex?類的?lock,?unlock,?trylock?等方法進(jìn)行更加精細(xì)的鎖管理:

清單?14.例子?thread_mutex.cc

static mutex g_mutex; static void inc(int *p ) {thread_local int i; // TLS 變量for(; i < COUNT; i++){g_mutex.lock();(*p)++;g_mutex.unlock();} } void threadMutex(void) {int a = 0;thread ta( inc, &a);thread tb( inc, &a);ta.join();tb.join();cout << "a=" << a << endl; }

在上例中,我們還使用了線程本地存儲(chǔ)?(TLS,?其實(shí)現(xiàn)原理類似于線程特定數(shù)據(jù),?關(guān)于線程特定數(shù)據(jù)的詳細(xì)說(shuō)明請(qǐng)參考我的前面的一篇博客)?變量,我們只需要在變量前面聲明它是?thread_local?即可。TLS?變量在線程棧內(nèi)分配,線程棧只有在線程創(chuàng)建之后才生效,在線程退出的時(shí)候銷毀,需要注意不同系統(tǒng)的線程棧的大小是不同的,如果?TLS?變量占用空間比較大,需要注意這個(gè)問題。TLS?變量一般不能跨線程,其初始化在調(diào)用線程第一次使用這個(gè)變量時(shí)進(jìn)行,默認(rèn)初始化為?0。

對(duì)于線程間的事件通知,C++11?提供了條件變量類?condition_variable(#include?<condition_variable>),可視為?pthread_cond_t?的封裝,使用條件變量可以讓一個(gè)線程等待其它線程的通知?(wait,wait_for,wait_until),也可以給其它線程發(fā)送通知?(notify_one,notify_all),條件變量必須和鎖配合使用(與pthread_cond_t類似),在等待時(shí)因?yàn)橛薪怄i和重新加鎖,所以,在等待時(shí)必須使用可以手工解鎖和加鎖的鎖,比如?unique_lock,而不能使用?lock_guard,示例如下:

清單?15.例子?thread_cond_var.cc

# define THREAD_COUNT 10 mutex m; condition_variable cv;void threadCondVar(void) {thread** t = new thread*[THREAD_COUNT];int i;for(i = 0; i < THREAD_COUNT; i++){t[i] = new thread( [](int index){unique_lock<mutex> lck(m);cv.wait_for(lck, chrono::hours(1000));cout << index << endl;}, i );this_thread::sleep_for( chrono::milliseconds(50));}for(i = 0; i < THREAD_COUNT; i++){lock_guard<mutex> _(m);cv.notify_one();}for(i = 0; i < THREAD_COUNT; i++){t[i]->join();delete t[i];}delete t; }

從上例的運(yùn)行結(jié)果也可以看到,條件變量是不保證次序的,即首先調(diào)用?wait?的不一定首先被喚醒。

?

幾個(gè)高級(jí)概念

C++11?提供了若干多線程編程的高級(jí)概念:promise/future(#include?<future>),?packaged_task,?async,來(lái)簡(jiǎn)化多線程編程,尤其是線程之間的數(shù)據(jù)交互比較簡(jiǎn)單的情況下,讓我們可以將注意力更多地放在業(yè)務(wù)處理上。

promise/future?可以用來(lái)在線程之間進(jìn)行簡(jiǎn)單的數(shù)據(jù)交互,而不需要考慮鎖的問題,線程?A?將數(shù)據(jù)保存在一個(gè)?promise?變量中,另外一個(gè)線程?B?可以通過這個(gè)?promise?變量的?get_future()?獲取其值,當(dāng)線程?A?尚未在?promise?變量中賦值時(shí),線程?B?也可以等待這個(gè)?promise?變量的賦值

清單?16.例子?thread_promise_future.cc

promise<string> val; static void threadPromiseFuture() {thread ta([](){future<string> fu = val.get_future();cout << "waiting promise->future" << endl;cout << fu.get() << endl;});thread tb([](){this_thread::sleep_for( chrono::milliseconds(100) );val.set_value("promise is set");});ta.join();tb.join(); }

一個(gè)?future?變量只能調(diào)用一次?get(),如果需要多次調(diào)用?get(),可以使用?shared_future,通過?promise/future?還可以在線程之間傳遞異常。

如果將一個(gè)?callable?對(duì)象和一個(gè)?promise?組合,那就是?packaged_task,它可以進(jìn)一步簡(jiǎn)化操作:

清單?17.例子?thread_packaged_task.cc

static mutex g_mutex; static void threadPackagedTask() {auto run = [=](int index){{lock_guard<mutex> _(g_mutex);cout << "tasklet " << index << endl;}this_thread::sleep_for( chrono::seconds(10) );return index * 1000;};packaged_task<int(int)> pt1(run);packaged_task<int(int)> pt2(run);thread t1([&](){pt1(2);} );thread t2([&](){pt2(3);} );int f1 = pt1.get_future().get();int f2 = pt2.get_future().get();cout << "task result=" << f1 << endl;cout << "task result=" << f2 << endl;t1.join();t2.join(); }

我們還可以試圖將一個(gè)?packaged_task?和一個(gè)線程組合,那就是?async()?函數(shù)。使用?async()?函數(shù)啟動(dòng)執(zhí)行代碼,返回一個(gè)?future?對(duì)象來(lái)保存代碼返回值,不需要我們顯式地創(chuàng)建和銷毀線程等,而是由?C++11?庫(kù)的實(shí)現(xiàn)決定何時(shí)創(chuàng)建和銷毀線程,以及創(chuàng)建幾個(gè)線程等,示例如下:

清單?18.例子?thread_async.cc

static long do_sum(vector<long> *arr, size_t start, size_t count) {static mutex _m;long sum = 0;for(size_t i = 0; i < count; i++){sum += (*arr)[start + i];}{lock_guard<mutex> _(_m);cout << "thread " << this_thread::get_id()<< ", count=" << count<< ", sum=" << sum << endl;}return sum; }static void threadAsync() { # define COUNT 1000000vector<long> data(COUNT);for(size_t i = 0; i < COUNT; i++){data[i] = random() & 0xff;} //vector< future<long> > result;size_t ptc = thread::hardware_concurrency() * 2;for(size_t batch = 0; batch < ptc; batch++){size_t batch_each = COUNT / ptc;if (batch == ptc - 1){batch_each = COUNT - (COUNT / ptc * batch);}result.push_back(async(do_sum, &data, batch * batch_each, batch_each));}long total = 0;for(size_t batch = 0; batch < ptc; batch++){total += result[batch].get();}cout << "total=" << total << endl; }

如果是在多核或者多?CPU?的環(huán)境上面運(yùn)行上述例子,仔細(xì)觀察輸出結(jié)果,可能會(huì)發(fā)現(xiàn)有些線程?ID?是重復(fù)的,這說(shuō)明重復(fù)使用了線程,也就是說(shuō),通過使用?async()?還可達(dá)到一些線程池的功能。

?

幾個(gè)需要注意的地方

thread?同時(shí)也是棉線、毛線、絲線等意思,我想大家都能體會(huì)面對(duì)一團(tuán)亂麻不知從何處查找頭緒的感受,不要忘了,線程不是靜態(tài)的,它是不斷變化的,請(qǐng)想像一下面對(duì)一團(tuán)會(huì)動(dòng)態(tài)變化的亂麻的情景。所以,使用多線程技術(shù)的首要準(zhǔn)則是我們自己要十分清楚我們的線程在哪里?線頭(線程入口和出口)在哪里?先安排好線程的運(yùn)行,注意不同線程的交叉點(diǎn)(訪問或者修改同一個(gè)資源,包括內(nèi)存、I/O?設(shè)備等),盡量減少線程的交叉點(diǎn),要知道幾條線堆在一起最怕的是互相打結(jié)。

當(dāng)我們的確需要不同線程訪問一個(gè)共同的資源時(shí),一般都需要進(jìn)行加鎖保護(hù),否則很可能會(huì)出現(xiàn)數(shù)據(jù)不一致的情況,從而出現(xiàn)各種時(shí)現(xiàn)時(shí)不現(xiàn)的莫名其妙的問題,加鎖保護(hù)時(shí)有幾個(gè)問題需要特別注意:一是一個(gè)線程內(nèi)連續(xù)多次調(diào)用非遞歸鎖?(non-recursive?lock)?的加鎖動(dòng)作,這很可能會(huì)導(dǎo)致異常;二是加鎖的粒度;三是出現(xiàn)死鎖?(deadlock),多個(gè)線程互相等待對(duì)方釋放鎖導(dǎo)致這些線程全部處于罷工狀態(tài)。

第一個(gè)問題只要根據(jù)場(chǎng)景調(diào)用合適的鎖即可,當(dāng)我們可能會(huì)在某個(gè)線程內(nèi)重復(fù)調(diào)用某個(gè)鎖的加鎖動(dòng)作時(shí),我們應(yīng)該使用遞歸鎖?(recursive?lock),在?C++11?中,可以根據(jù)需要來(lái)使用?recursive_mutex,或者?recursive_timed_mutex。

第二個(gè)問題,即鎖的粒度,原則上應(yīng)該是粒度越小越好,那意味著阻塞的時(shí)間越少,效率更高,比如一個(gè)數(shù)據(jù)庫(kù),給一個(gè)數(shù)據(jù)行?(data?row)?加鎖當(dāng)然比給一個(gè)表?(table)?加鎖要高效,但是同時(shí)復(fù)雜度也會(huì)越大,越容易出錯(cuò),比如死鎖等。

對(duì)于第三個(gè)問題我們需要先看下出現(xiàn)死鎖的條件:

????資源互斥,某個(gè)資源在某一時(shí)刻只能被一個(gè)線程持有?(hold);

????請(qǐng)求和保持,持有一個(gè)以上的互斥資源的線程在等待被其它進(jìn)程持有的互斥資源;

????不可搶占,只有在某互斥資源的持有線程釋放了該資源之后,其它線程才能去持有該資源;

????環(huán)形等待,有兩個(gè)或者兩個(gè)以上的線程各自持有某些互斥資源,并且各自在等待其它線程所持有的互斥資源。

我們只要不讓上述四個(gè)條件中的任意一個(gè)不成立即可。在設(shè)計(jì)的時(shí)候,非常有必要先分析一下會(huì)否出現(xiàn)滿足四個(gè)條件的情況,特別是檢查有無(wú)試圖去同時(shí)保持兩個(gè)或者兩個(gè)以上的鎖,當(dāng)我們發(fā)現(xiàn)試圖去同時(shí)保持兩個(gè)或者兩個(gè)以上的鎖的時(shí)候,就需要特別警惕了。下面我們來(lái)看一個(gè)簡(jiǎn)化了的死鎖的例子:

清單?19.例子?thread_deadlock.cc

static mutex g_mutex1, g_mutex2; static void inc1(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex1.lock();(*p)++;g_mutex2.lock();// do something.g_mutex2.unlock();g_mutex1.unlock();} } static void inc2(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex2.lock();g_mutex1.lock();(*p)++;g_mutex1.unlock();// do other thing.g_mutex2.unlock();} } void threadMutex(void) {int a = 0;thread ta( inc1, &a);thread tb( inc2, &a);ta.join();tb.join();cout << "a=" << a << endl; }

在這個(gè)例子中,g_mutex1?和?g_mutex2?都是互斥的資源,任意時(shí)刻都只有一個(gè)線程可以持有(加鎖成功),而且只有持有線程調(diào)用?unlock?釋放鎖資源的時(shí)候其它線程才能去持有,滿足條件?1?和?3,線程?ta?持有了?g_mutex1?之后,在釋放?g_mutex1?之前試圖去持有?g_mutex2,而線程?tb?持有了?g_mutex2?之后,在釋放?g_mutex2?之前試圖去持有?g_mutex1,滿足條件?2?和?4,這種情況之下,當(dāng)線程?ta?試圖去持有?g_mutex2?的時(shí)候,如果?tb?正持有?g_mutex2?而試圖去持有?g_mutex1?時(shí)就發(fā)生了死鎖。在有些環(huán)境下,可能要多次運(yùn)行這個(gè)例子才出現(xiàn)死鎖,實(shí)際工作中這種偶現(xiàn)特性讓查找問題變難。要破除這個(gè)死鎖,我們只要按如下代碼所示破除條件?3?和?4?即可:

清單?20.例子?thread_break_deadlock.cc

static mutex g_mutex1, g_mutex2; static void inc1(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex1.lock();(*p)++;g_mutex1.unlock();g_mutex2.lock();// do something.g_mutex2.unlock();} } static void inc2(int *p ) {for(int i = 0; i < COUNT; i++){g_mutex2.lock();// do other thing.g_mutex2.unlock();g_mutex1.lock();(*p)++;g_mutex1.unlock();} } void threadMutex(void) {int a = 0;thread ta( inc1, &a);thread tb( inc2, &a);ta.join();tb.join();cout << "a=" << a << endl; }

在一些復(fù)雜的并行編程場(chǎng)景,如何避免死鎖是一個(gè)很重要的話題,在實(shí)踐中,當(dāng)我們看到有兩個(gè)鎖嵌套加鎖的時(shí)候就要特別提高警惕,它極有可能滿足了條件?2?或者?4。

?

結(jié)束語(yǔ)

上述例子在?CentOS?6.5,g++?4.8.1/g++4.9?以及?clang?3.5?下面編譯通過,在編譯的時(shí)候,請(qǐng)注意下述幾點(diǎn):

????設(shè)置?-std=c++11

????鏈接的時(shí)候設(shè)置?-pthread

????使用?g++編譯鏈接時(shí)設(shè)置?-Wl,–no-as-needed?傳給鏈接器,有些版本的?g++需要這個(gè)設(shè)置;

????設(shè)置宏定義?-D_REENTRANT,有些庫(kù)函數(shù)是依賴于這個(gè)宏定義來(lái)確定是否使用多線程版本的。

在用?gdb?調(diào)試多線程程序的時(shí)候,可以輸入命令?info?threads?查看當(dāng)前的線程列表,通過命令?thread?n?切換到第?n?個(gè)線程的上下文,這里的?n?是?info?threads?命令輸出的線程索引數(shù)字,例如,如果要切換到第?2?個(gè)線程的上下文,則輸入命令?thread?2。

聰明地使用多線程,擁抱多線程吧。

注意:?本文只是簡(jiǎn)單的概略性的簡(jiǎn)單介紹了一下使用C++11進(jìn)行線程并發(fā)編程,?如果讀者需要了解并掌握更加深入的內(nèi)容,?請(qǐng)參考其他相關(guān)書籍或資料,?在此,?我推薦一個(gè)博客系列,?作者是中科院計(jì)算所碩士,?博客網(wǎng)址:http://www.cnblogs.com/haippy/p/3284540.html


總結(jié)

以上是生活随笔為你收集整理的Linux多线程实践(10) --使用 C++11 编写 Linux 多线程程序的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。