C/C++协程实现-学习笔记
協(xié)程,又稱微線程,纖程。英文名Coroutine。
協(xié)程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua\go\C++20)中得到廣泛應用。
子程序,或者稱為函數(shù),在所有語言中都是層級調(diào)用,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C,C執(zhí)行完畢返回,B執(zhí)行完畢返回,最后是A執(zhí)行完畢。
所以子程序調(diào)用是通過棧實現(xiàn)的,一個線程就是執(zhí)行一個子程序。
子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。而協(xié)程的調(diào)用和子程序不同。
協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉而執(zhí)行別的子程序,在適當?shù)臅r候再返回來接著執(zhí)行。
注意,在一個子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷。比如子程序A、B:
def A():print '1'print '2'print '3'def B():print 'x'print 'y'print 'z'假設由協(xié)程執(zhí)行,在執(zhí)行A的過程中,可以隨時中斷,去執(zhí)行B,B也可能在執(zhí)行過程中中斷再去執(zhí)行A,結果可能是:
12xy3z但是在A中是沒有調(diào)用B的,所以協(xié)程的調(diào)用比函數(shù)調(diào)用理解起來要難一些。
看起來A、B的執(zhí)行有點像多線程,但協(xié)程的特點在于是一個線程執(zhí)行,那和多線程比,
協(xié)程有何優(yōu)勢?
最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。
第二大優(yōu)勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。
因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。
Python對協(xié)程的支持還非常有限,用在generator中的yield可以一定程度上實現(xiàn)協(xié)程。雖然支持不完全,但已經(jīng)可以發(fā)揮相當大的威力了。
來看例子:
傳統(tǒng)的生產(chǎn)者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。
如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過yield跳轉到消費者開始執(zhí)行,待消費者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:
import timedef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)time.sleep(1)r = '200 OK'def produce(c):c.next()n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()if __name__=='__main__':c = consumer()produce(c)執(zhí)行結果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer函數(shù)是一個generator(生成器),把一個consumer傳入produce后:
首先調(diào)用c.next()啟動生成器;
然后,一旦生產(chǎn)了東西,通過c.send(n)切換到consumer執(zhí)行;
consumer通過yield拿到消息,處理,又通過yield把結果傳回;
produce拿到consumer處理的結果,繼續(xù)生產(chǎn)下一條消息;
produce決定不生產(chǎn)了,通過c.close()關閉consumer,整個過程結束。
整個流程無鎖,由一個線程執(zhí)行,produce和consumer協(xié)作完成任務,所以稱為“協(xié)程”,而非線程的搶占式多任務。
?
C/C++ 協(xié)程
首先需要聲明的是,這里不打算花時間來介紹什么是協(xié)程,以及協(xié)程和線程有什么不同。如果對此有任何疑問,可以自行 google。與 Python 不同,C/C++ 語言本身是不能天然支持協(xié)程的。現(xiàn)有的 C++ 協(xié)程庫均基于兩種方案:利用匯編代碼控制協(xié)程上下文的切換,以及利用操作系統(tǒng)提供的 API 來實現(xiàn)協(xié)程上下文切換。典型的例如:
- libco,Boost.context:基于匯編代碼的上下文切換
- phxrpc:基于 ucontext/Boost.context 的上下文切換
- libmill:基于 setjump/longjump 的協(xié)程切換
一般而言,基于匯編的上下文切換要比采用系統(tǒng)調(diào)用的切換更加高效,這也是為什么 phxrpc 在使用 Boost.context 時要比使用 ucontext 性能更好的原因。關于 phxrpc 和 libmill 具體的協(xié)程實現(xiàn)方式,以后有時間再詳細介紹。
libco 協(xié)程的創(chuàng)建和切換
在介紹 coroutine 的創(chuàng)建之前,我們先來熟悉一下 libco 中用來表示一個 coroutine 的數(shù)據(jù)結構,即定義在 co_routine_inner.h 中的?stCoRoutine_t:
struct stCoRoutine_t{stCoRoutineEnv_t *env; // 協(xié)程運行環(huán)境pfn_co_routine_t pfn; // 協(xié)程執(zhí)行的邏輯函數(shù)void *arg; // 函數(shù)參數(shù)coctx_t ctx; // 保存協(xié)程的下文環(huán)境...char cEnableSysHook; // 是否運行系統(tǒng) hook,即非侵入式邏輯char cIsShareStack; // 是否在共享棧模式void *pvEnv;stStackMem_t* stack_mem; // 協(xié)程運行時的棧空間char* stack_sp; // 用來保存協(xié)程運行時的棧空間unsigned int save_size;char* save_buffer;};我們暫時只需要了解表示協(xié)程的最簡單的幾個參數(shù),例如協(xié)程運行環(huán)境,協(xié)程的上下文環(huán)境,協(xié)程運行的函數(shù)以及運行時棧空間。后面的?stack_sp,save_size?和?save_buffer?與 libco 共享棧模式相關,有關共享棧的內(nèi)容我們后續(xù)再說
協(xié)程創(chuàng)建和運行
由于多個協(xié)程運行于一個線程內(nèi)部的,因此當創(chuàng)建線程中的第一個協(xié)程時,需要初始化該協(xié)程所在的環(huán)境?stCoRoutineEnv_t,這個環(huán)境是線程用來管理協(xié)程的,通過該環(huán)境,線程可以得知當前一共創(chuàng)建了多少個協(xié)程,當前正在運行哪一個協(xié)程,當前應當如何調(diào)度協(xié)程:
struct stCoRoutineEnv_t{stCoRoutine_t *pCallStack[ 128 ]; // 記錄當前創(chuàng)建的協(xié)程int iCallStackSize; // 記錄當前一共創(chuàng)建了多少個協(xié)程stCoEpoll_t *pEpoll; // 該線程的協(xié)程調(diào)度器// 在使用共享棧模式拷貝棧內(nèi)存時記錄相應的 coroutinestCoRoutine_t* pending_co;stCoRoutine_t* occupy_co;};上述代碼表明 libco 允許一個線程內(nèi)最多創(chuàng)建 128 個協(xié)程,其中?pCallStack[iCallStackSize-1]?也就是棧頂?shù)膮f(xié)程表示當前正在運行的協(xié)程。當調(diào)用函數(shù)?co_create?時,首先檢查當前線程中的 coroutine env 結構是否創(chuàng)建。這里 libco 對于每個線程內(nèi)的 stCoRoutineEnv_t 并沒有使用 thread-local 的方式(例如gcc 內(nèi)置的?__thread,phxrpc采用這種方式)來管理,而是預先定義了一個大的數(shù)組,并通過對應的 PID 來獲取其協(xié)程環(huán)境。:
static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]stCoRoutineEnv_t *co_get_curr_thread_env(){return g_arrCoEnvPerThread[ GetPid() ];}初始化?stCoRoutineEnv_t?時主要完成以下幾步:
當初始化完成協(xié)程環(huán)境之后,調(diào)用函數(shù)?co_create_env?來創(chuàng)建具體的協(xié)程,該函數(shù)初始化一個協(xié)程結構?stCoRoutine_t,設置該結構中的各項字段,例如運行的函數(shù)?pfn,運行時的棧地址等等。需要說明的就是,如果使用了非共享棧模式,則需要為該協(xié)程單獨申請棧空間,否則從共享棧中申請空間。棧空間表示如下:
struct stStackMem_t{stCoRoutine_t* occupy_co; // 使用該棧的協(xié)程int stack_size; // 棧大小char* stack_bp; // 棧底指針,棧從高地址向低地址增長[棧底在高,棧頂在低]char* stack_buffer; // 棧底};使用?co_create?創(chuàng)建完一個協(xié)程之后,將調(diào)用?co_resume?來將該協(xié)程激活運行:
void co_resume( stCoRoutine_t *co ){stCoRoutineEnv_t *env = co->env;// 獲取當前正在運行的協(xié)程的結構stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];if( !co->cStart ){// 為將要運行的 co 布置上下文環(huán)境coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );co->cStart = 1;}env->pCallStack[ env->iCallStackSize++ ] = co; // 設置co為運行的線程co_swap( lpCurrRoutine, co );}函數(shù)?co_swap?的作用類似于 Unix 提供的函數(shù)?swapcontext:將當前正在運行的 coroutine 的上下文以及狀態(tài)保存到結構?lpCurrRoutine?中,并且將?co?設置成為要運行的協(xié)程,從而實現(xiàn)協(xié)程的切換。co_swap?具體完成三項工作:
對應于?co_resume?函數(shù),協(xié)程主動讓出執(zhí)行權則調(diào)用?co_yield?函數(shù)。co_yield?函數(shù)調(diào)用了?co_yield_env,將當前協(xié)程與當前線程中記錄的其他協(xié)程進行切換:
void co_yield_env( stCoRoutineEnv_t *env ){stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];env->iCallStackSize--;co_swap( curr, last);}前面我們已經(jīng)提到過,pCallStack 棧頂所指向的即為當前正在運行的協(xié)程所對應的結構,因此該函數(shù)將?curr?取出來,并將當前正運行的協(xié)程上下文保存到該結構上,并切換到協(xié)程 last 上執(zhí)行。接下來我們以 32-bit 的系統(tǒng)為例來分析 libco 是如何實現(xiàn)協(xié)程運行環(huán)境的切換的。
協(xié)程上下文的創(chuàng)建和切換
libco 使用結構?struct coctx_t?來表示一個協(xié)程的上下文環(huán)境:
struct coctx_t{#if defined(__i386__)void *regs[ 8 ];#elsevoid *regs[ 14 ];#endifsize_t ss_size;char *ss_sp;};可以看到,在 i386 的架構下,需要保存 8 個寄存器信息,以及棧指針和棧大小,究竟這 8 個寄存器如何保存,又是如何使用,需要配合后續(xù)的?coctx_swap?來理解。我們首先來回顧一下 Unix-like 系統(tǒng)的 stack frame layout,如果不能理解這個,那么剩下的內(nèi)容就不必看了。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? ??
?
結合上圖,我們需要知道關鍵的幾點:
了解了這些,我們就來看一下協(xié)程上下文環(huán)境的初始化函數(shù)?coctx_make:
int coctx_make( coctx_t *ctx, coctx_pfn_t pfn, const void *s, const void *s1 ){char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);sp = (char*)((unsigned long)sp & -16L);coctx_param_t* param = (coctx_param_t*)sp ;param->s1 = s;param->s2 = s1;memset(ctx->regs, 0, sizeof(ctx->regs));ctx->regs[ kESP ] = (char*)(sp) - sizeof(void*);ctx->regs[ kEIP ] = (char*)pfn;return 0;}這段代碼應該比較好理解,首先為函數(shù)?coctx_pfn_t?預留 2 個參數(shù)的棧空間并對其到 16 字節(jié),之后將實參設置到預留的棧上空間中。最后在?ctx?結構中填入相應的,其中記錄 reg[kEIP] 返回地址為函數(shù)指針?pfn,記錄 reg[kESP] 為獲得的棧頂指針?sp?減去一個指針長度,這個減去的空間是為返回地址 RA 預留的。當調(diào)用?coctx_swap?時,reg[kEIP] 會被放到返回地址 RA 的位置,待?coctx_swap?執(zhí)行結束,自然會跳轉到函數(shù)?pfn?處執(zhí)行。
coctx_swap(ctx1, ctx2)?在 coctx_swap.S 中實現(xiàn)。這里可以看到,該函數(shù)并沒有使用?push %ebp; move %ebp, %esp; sub $esp N;?開頭,因此棧空間分布中不會出現(xiàn)?ebp?的位置。coctx_swap?函數(shù)主要分為兩段,其首先將當前的上下文環(huán)境保存到?ctx1?結構中:
leal 4(%esp), %eax // eax = old_esp + 4movl 4(%esp), %esp // 將 esp 的值設為 &ctx1(即ctx1的地址)leal 32(%esp), %esp // esp = (char*)&ctx1 + 32pushl %eax // ctx1->regs[EAX] = %eaxpushl %ebp // ctx1->regs[EBP] = %ebppushl %esi // ctx1->regs[ESI] = %esipushl %edi // ctx1->regs[EDI] = %edipushl %edx // ctx1->regs[EDX] = %edxpushl %ecx // ctx1->regs[ECX] = %ecxpushl %ebx // ctx1->regs[EBX] = %ebxpushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp這里需要注意指令?leal?和?movl?的區(qū)別。leal?將 eax 的值設置成為 esp 的值加 4,而?movl?將 esp 的值設為 esp+4 所指向的內(nèi)存上的值,也就是參數(shù) ctx1 的地址。之后該函數(shù)將?ctx2?中記錄的上下文恢復到 CPU 寄存器中,并跳轉到其函數(shù)地址處運行:
movl 4(%eax), %esp // 將 esp 的值設為 &ctx2(即ctx2的地址)popl %eax // %eax = ctx1->regs[EIP],也就是 &pfnpopl %ebx // %ebx = ctx1->regs[EBP]popl %ecx // %ecx = ctx1->regs[ECX]popl %edx // %edx = ctx1->regs[EDX]popl %edi // %edi = ctx1->regs[EDI]popl %esi // %esi = ctx1->regs[ESI]popl %ebp // %ebp = ctx1->regs[EBP]popl %esp // %esp = ctx1->regs[ESP],即(char*)(sp) - sizeof(void*)pushl %eax // RA = %eax = &pfn,注意此時esp已經(jīng)指向了新的espxorl %eax, %eax // reset eaxret上面的代碼看起來可能有些繞:
如何使用 libco
我們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應用程序如何使用 libco 來編寫服務端程序。 在 example_echosvr.cpp 的?main?函數(shù)中,主要執(zhí)行如下幾步:
函數(shù)?readwrite_coroutine?在外層循環(huán)中將新創(chuàng)建的讀寫協(xié)程都加入到隊列?g_readwrite?中,此時這些讀寫協(xié)程都沒有具體與某個 socket 連接對應,可以將隊列?g_readwrite?看成一個 coroutine pool。當加入到隊列中之后,調(diào)用函數(shù)?co_yield_ct?函數(shù)讓出 CPU,此時控制權回到主線程。
主線程中的函數(shù)?co_eventloop?監(jiān)聽網(wǎng)絡事件,將來自于客戶端新進的連接交由協(xié)程 accept_co 處理,關于?co_eventloop?如何喚醒 accept_co 的細節(jié)我們將在后續(xù)介紹。accept_co 調(diào)用函數(shù)?accept_routine?接收新連接,該函數(shù)的流程如下:
再次回到函數(shù)?readwrite_coroutine?中,該函數(shù)會調(diào)用?co_poll?將新建立的連接的 fd 加入到 Epoll 監(jiān)聽中,并將控制流程返回到 main 協(xié)程;當有讀或者寫事件發(fā)生時,Epoll 會喚醒對應的 coroutine ,繼續(xù)執(zhí)行?read?函數(shù)以及?write?函數(shù)。
上面的過程大致說明了控制流程是如何在不同的協(xié)程中切換,接下來我們介紹具體的實現(xiàn)細節(jié),即如何通過 Epoll 來管理協(xié)程,以及如何對系統(tǒng)函數(shù)進行改造以滿足 libco 的調(diào)用。
通過 Epoll 管理和喚醒協(xié)程
Epoll 監(jiān)聽 FD
上一章節(jié)中介紹了協(xié)程可以通過函數(shù)?co_poll?來將 fd 交由 Epoll 管理,待 Epoll 的相應的事件觸發(fā)時,再切換回來執(zhí)行 read 或者 write 操作,從而實現(xiàn)由 Epoll 管理協(xié)程的功能。co_poll?函數(shù)原型如下:
int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout_ms)stCoEpoll_t?是為 libco 定制的 Epoll 相關數(shù)據(jù)結構,fds?是?pollfd?結構的文件句柄,nfds?為?fds?數(shù)組的長度,最后一個參數(shù)表示定時器時間,也就是在?timeout?毫秒之后觸發(fā)處理這些文件句柄。這里可以看到,co_poll?能夠同時將多個文件句柄同時加入到 Epoll 管理中。我們先看?stCoEpoll_t?結構:
struct stCoEpoll_t{int iEpollFd; // Epoll 主 FDstatic const int _EPOLL_SIZE = 1024 * 10; // Epoll 可以監(jiān)聽的句柄總數(shù)struct stTimeout_t *pTimeout; // 時間輪定時器struct stTimeoutItemLink_t *pstTimeoutList; // 已經(jīng)超時的時間struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件co_epoll_res *result; // Epoll 返回的事件結果};以?stTimeout_?開頭的數(shù)據(jù)結構與 libco 的定時器管理有關,我們在后面介紹。co_epoll_res?是對 Epoll 事件數(shù)據(jù)結構的封裝,也就是每次觸發(fā) Epoll 事件時的返回結果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,因此這里也保留了 kevent 數(shù)據(jù)結構。
struct co_epoll_res{int size;struct epoll_event *events; // for linux epollstruct kevent *eventlist; // for Unix or MacOs kqueue};co_poll?實際是對函數(shù)?co_poll_inner?的封裝。我們將?co_epoll_inner?函數(shù)的結構分為上下兩半段。在上半段中,調(diào)用?co_poll?的協(xié)程?CC?將其需要監(jiān)聽的句柄數(shù)組?fds?都加入到 Epoll 管理中,并通過函數(shù)?co_yield_env?讓出 CPU;當 main 協(xié)程的事件循環(huán)?co_eventloop?中觸發(fā)了?CC?對應的監(jiān)聽事件時,會恢復?CC的執(zhí)行。此時,CC?將開始執(zhí)行下半段,即將上半段添加的句柄?fds?從 epoll 中移除,清理殘留的數(shù)據(jù)結構,下面的流程圖簡要說明了控制流的轉移過程:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
有了上面的基本概念,我們來看具體的實現(xiàn)細節(jié)。co_poll?首先在內(nèi)部將傳入的文件句柄數(shù)組?fds?轉化為數(shù)據(jù)結構?stPoll_t,這一步主要是為了方便后續(xù)處理。該結構記錄了?iEpollFd,ndfs,fds?數(shù)組,以及該協(xié)程需要執(zhí)行的函數(shù)和參數(shù)。有兩點需要說明的是:
co_poll?的第二步,也是最關鍵的一步,就是將 fd 數(shù)組全部加入到 Epoll 中進行監(jiān)聽。協(xié)程?CC?會將每一個 epoll_event 的?data.ptr?域設置為對應的?stPollItem_t?結構。這樣當事件觸發(fā)時,可以直接從對應的?ptr中取出?stPollItem_t?結構,然后喚醒指定協(xié)程。
如果本次操作提供了 Timeout 參數(shù),co_poll?還會將協(xié)程?CC?本次操作對應的?stPoll_t?加入到定時器隊列中。這表明在 Timeout 定時觸發(fā)之后,也會喚醒協(xié)程?CC?的執(zhí)行。當整個上半段都完成后,co_poll?立即調(diào)用?co_yield_env?讓出 CPU,執(zhí)行流程跳轉回到 main 協(xié)程中。
從上面的流程圖中也可以看出,當執(zhí)行流程再次跳回時,表明協(xié)程?CC?添加的讀寫等監(jiān)聽事件已經(jīng)觸發(fā),即可以執(zhí)行相應的讀寫操作了。此時?CC?首先將其在上半段中添加的監(jiān)聽事件從 Epoll 中刪除,清理殘留的數(shù)據(jù)結構,然后調(diào)用讀寫邏輯。
定時器實現(xiàn)
協(xié)程?CC?在將一組?fds?加入 Epoll 的同時,還能為其設置一個超時時間。在超時時間到期時,也會再次喚醒?CC?來執(zhí)行。libco 使用 Timing-Wheel 來實現(xiàn)定時器。關于 Timing-Wheel 算法,可以參考,其優(yōu)勢是 O(1) 的插入和刪除復雜度,缺點是只有有限的長度,在某些場合下不能滿足需求。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
回過去看?stCoEpoll_t?結構,其中?*pTimeout?代表時間輪,通過函數(shù)?AllocateTimeout?初始化為一個固定大小(60 * 1000)的數(shù)組。根據(jù) Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時事件。而實際上,在添加定時器時,libco 要求定時時間不超過 40s。成員?pstTimeoutList?記錄在?co_eventloop?中發(fā)生超時的事件,而?pstActiveList?記錄當前活躍的事件,包括超時事件。這兩個結構都將在?co_eventloop?中進行處理。
下面我們簡要分析一下加入定時器的實現(xiàn):
int AddTimeout( stTimeout_t *apTimeout, stTimeoutItem_t *apItem, unsigned long long allNow ){if( apTimeout->ullStart == 0 ) // 初始化時間輪的基準時間{apTimeout->ullStart = allNow;apTimeout->llStartIdx = 0; // 當前時間輪指針指向數(shù)組0}// 1. 當前時間不可能小于時間輪的基準時間// 2. 加入的定時器的超時時間不能小于當前時間if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow ){return __LINE__;}int diff = apItem->ullExpireTime - apTimeout->ullStart;if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過時間輪的大小{return __LINE__;}// 插入到時間輪盤的指定位置AddTail( apTimeout->pItems +(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );return 0;}定時器的超時檢查在函數(shù)?co_eventloop?中執(zhí)行。
EPOLL 事件循環(huán)
main 協(xié)程通過調(diào)用函數(shù)?co_eventloop?來監(jiān)聽 Epoll 事件,并在相應的事件觸發(fā)時切換到指定的協(xié)程執(zhí)行。有關?co_eventloop?與 應用協(xié)程的交互過程在上一節(jié)的流程圖中已經(jīng)比較清楚了,下面我們主要介紹一下?co_eventloop?函數(shù)的實現(xiàn):
上文中也提到,通過?epoll_wait?返回的事件都保存在?stCoEpoll_t?結構的?co_epoll_res?中。因此?co_eventloop?首先為?co_epoll_res?申請空間,之后通過一個無限循環(huán)來監(jiān)聽所有 coroutine 添加的所有事件:
for(;;){int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );...}對于每一個觸發(fā)的事件,co_eventloop?首先通過指針域?data.ptr?取出保存的?stPollItem_t?結構,并將其添加到?pstActiveList?列表中;之后從定時器輪盤中取出所有已經(jīng)超時的事件,也將其全部添加到?pstActiveList?中,pstActiveList?中的所有事件都作為活躍事件處理。
對于每一個活躍事件,co_eventloop?將通過調(diào)用對應的?pfnProcess?也就是上圖中的OnPollProcessEvent?函數(shù)來切換到該事件對應的 coroutine,將流程跳轉到該 coroutine 處執(zhí)行。
最后?co_eventloop?在調(diào)用時也提供一個額外的參數(shù)來供調(diào)用者傳入一個函數(shù)指針?pfn。該函數(shù)將會在每次循環(huán)完成之后執(zhí)行;當該函數(shù)返回 -1 時,將會終止整個事件循環(huán)。用戶可以利用該函數(shù)來控制 main 協(xié)程的終止或者完成一些統(tǒng)計需求。
?
鏈接:https://blog.csdn.net/qq_25424545/article/details/81529717
總結
以上是生活随笔為你收集整理的C/C++协程实现-学习笔记的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++协程
- 下一篇: 我所理解的设计模式(C++实现)——策略