日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > c/c++ >内容正文

c/c++

C/C++协程实现-学习笔记

發(fā)布時間:2024/4/18 c/c++ 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 C/C++协程实现-学习笔记 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

協(xié)程,又稱微線程,纖程。英文名Coroutine。

協(xié)程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua\go\C++20)中得到廣泛應用。

子程序,或者稱為函數(shù),在所有語言中都是層級調(diào)用,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C,C執(zhí)行完畢返回,B執(zhí)行完畢返回,最后是A執(zhí)行完畢。

所以子程序調(diào)用是通過棧實現(xiàn)的,一個線程就是執(zhí)行一個子程序。

子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。而協(xié)程的調(diào)用和子程序不同。

協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉而執(zhí)行別的子程序,在適當?shù)臅r候再返回來接著執(zhí)行。

注意,在一個子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷。比如子程序A、B:

def A():print '1'print '2'print '3'def B():print 'x'print 'y'print 'z'

假設由協(xié)程執(zhí)行,在執(zhí)行A的過程中,可以隨時中斷,去執(zhí)行B,B也可能在執(zhí)行過程中中斷再去執(zhí)行A,結果可能是:

12xy3z

但是在A中是沒有調(diào)用B的,所以協(xié)程的調(diào)用比函數(shù)調(diào)用理解起來要難一些。

看起來A、B的執(zhí)行有點像多線程,但協(xié)程的特點在于是一個線程執(zhí)行,那和多線程比,

協(xié)程有何優(yōu)勢?

最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。

第二大優(yōu)勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。

因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。

Python對協(xié)程的支持還非常有限,用在generator中的yield可以一定程度上實現(xiàn)協(xié)程。雖然支持不完全,但已經(jīng)可以發(fā)揮相當大的威力了。

來看例子:

傳統(tǒng)的生產(chǎn)者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。

如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過yield跳轉到消費者開始執(zhí)行,待消費者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:

import timedef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)time.sleep(1)r = '200 OK'def produce(c):c.next()n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()if __name__=='__main__':c = consumer()produce(c)

執(zhí)行結果:

  • [PRODUCER] Producing 1...

  • [CONSUMER] Consuming 1...

  • [PRODUCER] Consumer return: 200 OK

  • [PRODUCER] Producing 2...

  • [CONSUMER] Consuming 2...

  • [PRODUCER] Consumer return: 200 OK

  • [PRODUCER] Producing 3...

  • [CONSUMER] Consuming 3...

  • [PRODUCER] Consumer return: 200 OK

  • [PRODUCER] Producing 4...

  • [CONSUMER] Consuming 4...

  • [PRODUCER] Consumer return: 200 OK

  • [PRODUCER] Producing 5...

  • [CONSUMER] Consuming 5...

  • [PRODUCER] Consumer return: 200 OK

  • 注意到consumer函數(shù)是一個generator(生成器),把一個consumer傳入produce后:

  • 首先調(diào)用c.next()啟動生成器;

  • 然后,一旦生產(chǎn)了東西,通過c.send(n)切換到consumer執(zhí)行;

  • consumer通過yield拿到消息,處理,又通過yield把結果傳回;

  • produce拿到consumer處理的結果,繼續(xù)生產(chǎn)下一條消息;

  • produce決定不生產(chǎn)了,通過c.close()關閉consumer,整個過程結束。

  • 整個流程無鎖,由一個線程執(zhí)行,produce和consumer協(xié)作完成任務,所以稱為“協(xié)程”,而非線程的搶占式多任務。

    ?

    C/C++ 協(xié)程

    首先需要聲明的是,這里不打算花時間來介紹什么是協(xié)程,以及協(xié)程和線程有什么不同。如果對此有任何疑問,可以自行 google。與 Python 不同,C/C++ 語言本身是不能天然支持協(xié)程的。現(xiàn)有的 C++ 協(xié)程庫均基于兩種方案:利用匯編代碼控制協(xié)程上下文的切換,以及利用操作系統(tǒng)提供的 API 來實現(xiàn)協(xié)程上下文切換。典型的例如:

    • libco,Boost.context:基于匯編代碼的上下文切換
    • phxrpc:基于 ucontext/Boost.context 的上下文切換
    • libmill:基于 setjump/longjump 的協(xié)程切換

    一般而言,基于匯編的上下文切換要比采用系統(tǒng)調(diào)用的切換更加高效,這也是為什么 phxrpc 在使用 Boost.context 時要比使用 ucontext 性能更好的原因。關于 phxrpc 和 libmill 具體的協(xié)程實現(xiàn)方式,以后有時間再詳細介紹。

    libco 協(xié)程的創(chuàng)建和切換

    在介紹 coroutine 的創(chuàng)建之前,我們先來熟悉一下 libco 中用來表示一個 coroutine 的數(shù)據(jù)結構,即定義在 co_routine_inner.h 中的?stCoRoutine_t:

    struct stCoRoutine_t{stCoRoutineEnv_t *env; // 協(xié)程運行環(huán)境pfn_co_routine_t pfn; // 協(xié)程執(zhí)行的邏輯函數(shù)void *arg; // 函數(shù)參數(shù)coctx_t ctx; // 保存協(xié)程的下文環(huán)境...char cEnableSysHook; // 是否運行系統(tǒng) hook,即非侵入式邏輯char cIsShareStack; // 是否在共享棧模式void *pvEnv;stStackMem_t* stack_mem; // 協(xié)程運行時的棧空間char* stack_sp; // 用來保存協(xié)程運行時的棧空間unsigned int save_size;char* save_buffer;};

    我們暫時只需要了解表示協(xié)程的最簡單的幾個參數(shù),例如協(xié)程運行環(huán)境,協(xié)程的上下文環(huán)境,協(xié)程運行的函數(shù)以及運行時棧空間。后面的?stack_sp,save_size?和?save_buffer?與 libco 共享棧模式相關,有關共享棧的內(nèi)容我們后續(xù)再說

    協(xié)程創(chuàng)建和運行

    由于多個協(xié)程運行于一個線程內(nèi)部的,因此當創(chuàng)建線程中的第一個協(xié)程時,需要初始化該協(xié)程所在的環(huán)境?stCoRoutineEnv_t,這個環(huán)境是線程用來管理協(xié)程的,通過該環(huán)境,線程可以得知當前一共創(chuàng)建了多少個協(xié)程,當前正在運行哪一個協(xié)程,當前應當如何調(diào)度協(xié)程:

    struct stCoRoutineEnv_t{stCoRoutine_t *pCallStack[ 128 ]; // 記錄當前創(chuàng)建的協(xié)程int iCallStackSize; // 記錄當前一共創(chuàng)建了多少個協(xié)程stCoEpoll_t *pEpoll; // 該線程的協(xié)程調(diào)度器// 在使用共享棧模式拷貝棧內(nèi)存時記錄相應的 coroutinestCoRoutine_t* pending_co;stCoRoutine_t* occupy_co;};

    上述代碼表明 libco 允許一個線程內(nèi)最多創(chuàng)建 128 個協(xié)程,其中?pCallStack[iCallStackSize-1]?也就是棧頂?shù)膮f(xié)程表示當前正在運行的協(xié)程。當調(diào)用函數(shù)?co_create?時,首先檢查當前線程中的 coroutine env 結構是否創(chuàng)建。這里 libco 對于每個線程內(nèi)的 stCoRoutineEnv_t 并沒有使用 thread-local 的方式(例如gcc 內(nèi)置的?__thread,phxrpc采用這種方式)來管理,而是預先定義了一個大的數(shù)組,并通過對應的 PID 來獲取其協(xié)程環(huán)境。:

    static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]stCoRoutineEnv_t *co_get_curr_thread_env(){return g_arrCoEnvPerThread[ GetPid() ];}

    初始化?stCoRoutineEnv_t?時主要完成以下幾步:

  • 為?stCoRoutineEnv_t?申請空間并且進行初始化,設置協(xié)程調(diào)度器?pEpoll。
  • 創(chuàng)建一個空的 coroutine,初始化其上下文環(huán)境( 有關?coctx?在后文詳細介紹 ),將其加入到該線程的協(xié)程環(huán)境中進行管理,并且設置其為 main coroutine。這個 main coroutine 用來運行該線程主邏輯。
  • 當初始化完成協(xié)程環(huán)境之后,調(diào)用函數(shù)?co_create_env?來創(chuàng)建具體的協(xié)程,該函數(shù)初始化一個協(xié)程結構?stCoRoutine_t,設置該結構中的各項字段,例如運行的函數(shù)?pfn,運行時的棧地址等等。需要說明的就是,如果使用了非共享棧模式,則需要為該協(xié)程單獨申請棧空間,否則從共享棧中申請空間。棧空間表示如下:

    struct stStackMem_t{stCoRoutine_t* occupy_co; // 使用該棧的協(xié)程int stack_size; // 棧大小char* stack_bp; // 棧底指針,棧從高地址向低地址增長[棧底在高,棧頂在低]char* stack_buffer; // 棧底};

    使用?co_create?創(chuàng)建完一個協(xié)程之后,將調(diào)用?co_resume?來將該協(xié)程激活運行:

    void co_resume( stCoRoutine_t *co ){stCoRoutineEnv_t *env = co->env;// 獲取當前正在運行的協(xié)程的結構stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];if( !co->cStart ){// 為將要運行的 co 布置上下文環(huán)境coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );co->cStart = 1;}env->pCallStack[ env->iCallStackSize++ ] = co; // 設置co為運行的線程co_swap( lpCurrRoutine, co );}

    函數(shù)?co_swap?的作用類似于 Unix 提供的函數(shù)?swapcontext:將當前正在運行的 coroutine 的上下文以及狀態(tài)保存到結構?lpCurrRoutine?中,并且將?co?設置成為要運行的協(xié)程,從而實現(xiàn)協(xié)程的切換。co_swap?具體完成三項工作:

  • 記錄當前協(xié)程?curr?的運行棧的棧頂指針,通過?char c; curr_stack_sp=&c?實現(xiàn),當下次切換回?curr時,可以從該棧頂指針指向的位置繼續(xù),執(zhí)行完?curr?后可以順利釋放該棧。
  • 處理共享棧相關的操作,并且調(diào)用函數(shù)?coctx_swap?來完成上下文環(huán)境的切換。注意執(zhí)行完?coctx_swap之后,執(zhí)行流程將跳到新的 coroutine 也就是 pending_co 中運行,后續(xù)的代碼需要等下次切換回?curr?時才會執(zhí)行。
  • 當下次切換回?curr?時,處理共享棧相關的操作。
  • 對應于?co_resume?函數(shù),協(xié)程主動讓出執(zhí)行權則調(diào)用?co_yield?函數(shù)。co_yield?函數(shù)調(diào)用了?co_yield_env,將當前協(xié)程與當前線程中記錄的其他協(xié)程進行切換:

    void co_yield_env( stCoRoutineEnv_t *env ){stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];env->iCallStackSize--;co_swap( curr, last);}

    前面我們已經(jīng)提到過,pCallStack 棧頂所指向的即為當前正在運行的協(xié)程所對應的結構,因此該函數(shù)將?curr?取出來,并將當前正運行的協(xié)程上下文保存到該結構上,并切換到協(xié)程 last 上執(zhí)行。接下來我們以 32-bit 的系統(tǒng)為例來分析 libco 是如何實現(xiàn)協(xié)程運行環(huán)境的切換的。

    協(xié)程上下文的創(chuàng)建和切換

    libco 使用結構?struct coctx_t?來表示一個協(xié)程的上下文環(huán)境:

    struct coctx_t{#if defined(__i386__)void *regs[ 8 ];#elsevoid *regs[ 14 ];#endifsize_t ss_size;char *ss_sp;};

    可以看到,在 i386 的架構下,需要保存 8 個寄存器信息,以及棧指針和棧大小,究竟這 8 個寄存器如何保存,又是如何使用,需要配合后續(xù)的?coctx_swap?來理解。我們首先來回顧一下 Unix-like 系統(tǒng)的 stack frame layout,如果不能理解這個,那么剩下的內(nèi)容就不必看了。

    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? ??

    ?

    結合上圖,我們需要知道關鍵的幾點:

  • 函數(shù)調(diào)用棧是調(diào)用者和被調(diào)用者共同負責布置的。Caller 將其參數(shù)從右向左反向壓棧,再將調(diào)用后的返回地址壓棧,然后將執(zhí)行流程交給 Callee。
  • 典型的編譯器會將 Callee 函數(shù)匯編成為以?push %ebp; move %ebp, %esp; sub $esp N;?這種形式開頭的匯編代碼。這幾句代碼主要目的是為了方便 Callee 利用?ebp?來訪問調(diào)用者提供的參數(shù)以及自身的局部變量(如下圖)。
  • 當調(diào)用過程完成清除了局部變量以后,會執(zhí)行?pop %ebp; ret,這樣指令會跳轉到 RA 也就是返回地址上面執(zhí)行。這一點也是實現(xiàn)協(xié)程切換的關鍵:我們只需要將指定協(xié)程的函數(shù)指針地址保存到 RA 中,當調(diào)用完?coctx_swap?之后,會自動跳轉到該協(xié)程的函數(shù)起始地址開始運行
  • 了解了這些,我們就來看一下協(xié)程上下文環(huán)境的初始化函數(shù)?coctx_make:

    int coctx_make( coctx_t *ctx, coctx_pfn_t pfn, const void *s, const void *s1 ){char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);sp = (char*)((unsigned long)sp & -16L);coctx_param_t* param = (coctx_param_t*)sp ;param->s1 = s;param->s2 = s1;memset(ctx->regs, 0, sizeof(ctx->regs));ctx->regs[ kESP ] = (char*)(sp) - sizeof(void*);ctx->regs[ kEIP ] = (char*)pfn;return 0;}

    這段代碼應該比較好理解,首先為函數(shù)?coctx_pfn_t?預留 2 個參數(shù)的棧空間并對其到 16 字節(jié),之后將實參設置到預留的棧上空間中。最后在?ctx?結構中填入相應的,其中記錄 reg[kEIP] 返回地址為函數(shù)指針?pfn,記錄 reg[kESP] 為獲得的棧頂指針?sp?減去一個指針長度,這個減去的空間是為返回地址 RA 預留的。當調(diào)用?coctx_swap?時,reg[kEIP] 會被放到返回地址 RA 的位置,待?coctx_swap?執(zhí)行結束,自然會跳轉到函數(shù)?pfn?處執(zhí)行。

    coctx_swap(ctx1, ctx2)?在 coctx_swap.S 中實現(xiàn)。這里可以看到,該函數(shù)并沒有使用?push %ebp; move %ebp, %esp; sub $esp N;?開頭,因此棧空間分布中不會出現(xiàn)?ebp?的位置。coctx_swap?函數(shù)主要分為兩段,其首先將當前的上下文環(huán)境保存到?ctx1?結構中:

    leal 4(%esp), %eax // eax = old_esp + 4movl 4(%esp), %esp // 將 esp 的值設為 &ctx1(即ctx1的地址)leal 32(%esp), %esp // esp = (char*)&ctx1 + 32pushl %eax // ctx1->regs[EAX] = %eaxpushl %ebp // ctx1->regs[EBP] = %ebppushl %esi // ctx1->regs[ESI] = %esipushl %edi // ctx1->regs[EDI] = %edipushl %edx // ctx1->regs[EDX] = %edxpushl %ecx // ctx1->regs[ECX] = %ecxpushl %ebx // ctx1->regs[EBX] = %ebxpushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp

    這里需要注意指令?leal?和?movl?的區(qū)別。leal?將 eax 的值設置成為 esp 的值加 4,而?movl?將 esp 的值設為 esp+4 所指向的內(nèi)存上的值,也就是參數(shù) ctx1 的地址。之后該函數(shù)將?ctx2?中記錄的上下文恢復到 CPU 寄存器中,并跳轉到其函數(shù)地址處運行:

    movl 4(%eax), %esp // 將 esp 的值設為 &ctx2(即ctx2的地址)popl %eax // %eax = ctx1->regs[EIP],也就是 &pfnpopl %ebx // %ebx = ctx1->regs[EBP]popl %ecx // %ecx = ctx1->regs[ECX]popl %edx // %edx = ctx1->regs[EDX]popl %edi // %edi = ctx1->regs[EDI]popl %esi // %esi = ctx1->regs[ESI]popl %ebp // %ebp = ctx1->regs[EBP]popl %esp // %esp = ctx1->regs[ESP],即(char*)(sp) - sizeof(void*)pushl %eax // RA = %eax = &pfn,注意此時esp已經(jīng)指向了新的espxorl %eax, %eax // reset eaxret

    上面的代碼看起來可能有些繞:

  • 首先 line 1 將 esp 設置為參數(shù) ctx2 的地址,后續(xù)的 popl 操作均在 ctx2 的內(nèi)存空間上執(zhí)行。
  • line 2-9 將?ctx2->regs[]?中的內(nèi)容恢復到相應的寄存器中。還記得在前面?coctx_make?中設置了?regs[EIP]?和?regs[ESP]?嗎?這里剛好就對應恢復了相應的值。
  • 當執(zhí)行完 line 9 之后,esp 已經(jīng)指向了?ctx2?中新的棧頂指針,由于在?coctx_make?中預留了一個指針長度的 RA 空間,line 10 剛好將新的函數(shù)指針?&pfn?設置到該 RA 上。
  • 最后執(zhí)行?ret?指令時,函數(shù)流程將跳到?pfn?處執(zhí)行。這樣,整個協(xié)程上下文的切換就完成了。
  • 如何使用 libco

    我們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應用程序如何使用 libco 來編寫服務端程序。 在 example_echosvr.cpp 的?main?函數(shù)中,主要執(zhí)行如下幾步:

  • 創(chuàng)建 socket,監(jiān)聽在本機的 1024 端口,并設置為非阻塞;
  • 主線程使用函數(shù)?readwrite_coroutine?創(chuàng)建多個讀寫協(xié)程,調(diào)用?co_resume?啟動協(xié)程運行直到其掛起。這里我們忽略掉無關的多進程 fork 的過程;
  • 主線程繼續(xù)創(chuàng)建 socket 接收協(xié)程 accpet_co,同樣調(diào)用?co_resume?啟動協(xié)程直到其掛起;
  • 主線程調(diào)用函數(shù)?co_eventloop?實現(xiàn)事件的監(jiān)聽和協(xié)程的循環(huán)切換;
  • 函數(shù)?readwrite_coroutine?在外層循環(huán)中將新創(chuàng)建的讀寫協(xié)程都加入到隊列?g_readwrite?中,此時這些讀寫協(xié)程都沒有具體與某個 socket 連接對應,可以將隊列?g_readwrite?看成一個 coroutine pool。當加入到隊列中之后,調(diào)用函數(shù)?co_yield_ct?函數(shù)讓出 CPU,此時控制權回到主線程。

    主線程中的函數(shù)?co_eventloop?監(jiān)聽網(wǎng)絡事件,將來自于客戶端新進的連接交由協(xié)程 accept_co 處理,關于?co_eventloop?如何喚醒 accept_co 的細節(jié)我們將在后續(xù)介紹。accept_co 調(diào)用函數(shù)?accept_routine?接收新連接,該函數(shù)的流程如下:

  • 檢查隊列?g_readwrite?是否有空閑的讀寫 coroutine,如果沒有,調(diào)用函數(shù)?poll?將該協(xié)程加入到 Epoll 管理的定時器隊列中,也就是 sleep(1000) 的作用;
  • 調(diào)用?co_accept?來接收新連接,如果接收連接失敗,那么調(diào)用?co_poll?將服務端的?listen_fd?加入到 Epoll 中來觸發(fā)下一次連接事件;
  • 對于成功的連接,從?g_readwrite?中取出一個讀寫協(xié)程來負責處理讀寫;
  • 再次回到函數(shù)?readwrite_coroutine?中,該函數(shù)會調(diào)用?co_poll?將新建立的連接的 fd 加入到 Epoll 監(jiān)聽中,并將控制流程返回到 main 協(xié)程;當有讀或者寫事件發(fā)生時,Epoll 會喚醒對應的 coroutine ,繼續(xù)執(zhí)行?read?函數(shù)以及?write?函數(shù)。

    上面的過程大致說明了控制流程是如何在不同的協(xié)程中切換,接下來我們介紹具體的實現(xiàn)細節(jié),即如何通過 Epoll 來管理協(xié)程,以及如何對系統(tǒng)函數(shù)進行改造以滿足 libco 的調(diào)用。

    通過 Epoll 管理和喚醒協(xié)程

    Epoll 監(jiān)聽 FD

    上一章節(jié)中介紹了協(xié)程可以通過函數(shù)?co_poll?來將 fd 交由 Epoll 管理,待 Epoll 的相應的事件觸發(fā)時,再切換回來執(zhí)行 read 或者 write 操作,從而實現(xiàn)由 Epoll 管理協(xié)程的功能。co_poll?函數(shù)原型如下:

    int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout_ms)

    stCoEpoll_t?是為 libco 定制的 Epoll 相關數(shù)據(jù)結構,fds?是?pollfd?結構的文件句柄,nfds?為?fds?數(shù)組的長度,最后一個參數(shù)表示定時器時間,也就是在?timeout?毫秒之后觸發(fā)處理這些文件句柄。這里可以看到,co_poll?能夠同時將多個文件句柄同時加入到 Epoll 管理中。我們先看?stCoEpoll_t?結構:

    struct stCoEpoll_t{int iEpollFd; // Epoll 主 FDstatic const int _EPOLL_SIZE = 1024 * 10; // Epoll 可以監(jiān)聽的句柄總數(shù)struct stTimeout_t *pTimeout; // 時間輪定時器struct stTimeoutItemLink_t *pstTimeoutList; // 已經(jīng)超時的時間struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件co_epoll_res *result; // Epoll 返回的事件結果};

    以?stTimeout_?開頭的數(shù)據(jù)結構與 libco 的定時器管理有關,我們在后面介紹。co_epoll_res?是對 Epoll 事件數(shù)據(jù)結構的封裝,也就是每次觸發(fā) Epoll 事件時的返回結果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,因此這里也保留了 kevent 數(shù)據(jù)結構。

    struct co_epoll_res{int size;struct epoll_event *events; // for linux epollstruct kevent *eventlist; // for Unix or MacOs kqueue};

    co_poll?實際是對函數(shù)?co_poll_inner?的封裝。我們將?co_epoll_inner?函數(shù)的結構分為上下兩半段。在上半段中,調(diào)用?co_poll?的協(xié)程?CC?將其需要監(jiān)聽的句柄數(shù)組?fds?都加入到 Epoll 管理中,并通過函數(shù)?co_yield_env?讓出 CPU;當 main 協(xié)程的事件循環(huán)?co_eventloop?中觸發(fā)了?CC?對應的監(jiān)聽事件時,會恢復?CC的執(zhí)行。此時,CC?將開始執(zhí)行下半段,即將上半段添加的句柄?fds?從 epoll 中移除,清理殘留的數(shù)據(jù)結構,下面的流程圖簡要說明了控制流的轉移過程:

    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??

    有了上面的基本概念,我們來看具體的實現(xiàn)細節(jié)。co_poll?首先在內(nèi)部將傳入的文件句柄數(shù)組?fds?轉化為數(shù)據(jù)結構?stPoll_t,這一步主要是為了方便后續(xù)處理。該結構記錄了?iEpollFd,ndfs,fds?數(shù)組,以及該協(xié)程需要執(zhí)行的函數(shù)和參數(shù)。有兩點需要說明的是:

  • 對于每一個 fd,為其申請一個?stPollItem_t?來管理對應 Epoll 事件以及記錄回調(diào)參數(shù)。libco 在此做了一個小的優(yōu)化,對于長度小于 2 的?fds?數(shù)組,直接在棧上定義相應的?stPollItem_t?數(shù)組,否則從堆中申請內(nèi)存。這也是一種比較常見的優(yōu)化,畢竟從堆中申請內(nèi)存比較耗時;
  • 函數(shù)指針?OnPollProcessEvent?封裝了協(xié)程的切換過程。當傳入指定的?stPollItem_t?結構時,即可喚醒對應于該結構的 coroutine,將控制權交由其執(zhí)行;
  • co_poll?的第二步,也是最關鍵的一步,就是將 fd 數(shù)組全部加入到 Epoll 中進行監(jiān)聽。協(xié)程?CC?會將每一個 epoll_event 的?data.ptr?域設置為對應的?stPollItem_t?結構。這樣當事件觸發(fā)時,可以直接從對應的?ptr中取出?stPollItem_t?結構,然后喚醒指定協(xié)程。

    如果本次操作提供了 Timeout 參數(shù),co_poll?還會將協(xié)程?CC?本次操作對應的?stPoll_t?加入到定時器隊列中。這表明在 Timeout 定時觸發(fā)之后,也會喚醒協(xié)程?CC?的執(zhí)行。當整個上半段都完成后,co_poll?立即調(diào)用?co_yield_env?讓出 CPU,執(zhí)行流程跳轉回到 main 協(xié)程中。

    從上面的流程圖中也可以看出,當執(zhí)行流程再次跳回時,表明協(xié)程?CC?添加的讀寫等監(jiān)聽事件已經(jīng)觸發(fā),即可以執(zhí)行相應的讀寫操作了。此時?CC?首先將其在上半段中添加的監(jiān)聽事件從 Epoll 中刪除,清理殘留的數(shù)據(jù)結構,然后調(diào)用讀寫邏輯。

    定時器實現(xiàn)

    協(xié)程?CC?在將一組?fds?加入 Epoll 的同時,還能為其設置一個超時時間。在超時時間到期時,也會再次喚醒?CC?來執(zhí)行。libco 使用 Timing-Wheel 來實現(xiàn)定時器。關于 Timing-Wheel 算法,可以參考,其優(yōu)勢是 O(1) 的插入和刪除復雜度,缺點是只有有限的長度,在某些場合下不能滿足需求。

    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??

    回過去看?stCoEpoll_t?結構,其中?*pTimeout?代表時間輪,通過函數(shù)?AllocateTimeout?初始化為一個固定大小(60 * 1000)的數(shù)組。根據(jù) Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時事件。而實際上,在添加定時器時,libco 要求定時時間不超過 40s。成員?pstTimeoutList?記錄在?co_eventloop?中發(fā)生超時的事件,而?pstActiveList?記錄當前活躍的事件,包括超時事件。這兩個結構都將在?co_eventloop?中進行處理。

    下面我們簡要分析一下加入定時器的實現(xiàn):

    int AddTimeout( stTimeout_t *apTimeout, stTimeoutItem_t *apItem, unsigned long long allNow ){if( apTimeout->ullStart == 0 ) // 初始化時間輪的基準時間{apTimeout->ullStart = allNow;apTimeout->llStartIdx = 0; // 當前時間輪指針指向數(shù)組0}// 1. 當前時間不可能小于時間輪的基準時間// 2. 加入的定時器的超時時間不能小于當前時間if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow ){return __LINE__;}int diff = apItem->ullExpireTime - apTimeout->ullStart;if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過時間輪的大小{return __LINE__;}// 插入到時間輪盤的指定位置AddTail( apTimeout->pItems +(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );return 0;}

    定時器的超時檢查在函數(shù)?co_eventloop?中執(zhí)行。

    EPOLL 事件循環(huán)

    main 協(xié)程通過調(diào)用函數(shù)?co_eventloop?來監(jiān)聽 Epoll 事件,并在相應的事件觸發(fā)時切換到指定的協(xié)程執(zhí)行。有關?co_eventloop?與 應用協(xié)程的交互過程在上一節(jié)的流程圖中已經(jīng)比較清楚了,下面我們主要介紹一下?co_eventloop?函數(shù)的實現(xiàn):

    上文中也提到,通過?epoll_wait?返回的事件都保存在?stCoEpoll_t?結構的?co_epoll_res?中。因此?co_eventloop?首先為?co_epoll_res?申請空間,之后通過一個無限循環(huán)來監(jiān)聽所有 coroutine 添加的所有事件:

    for(;;){int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );...}

    對于每一個觸發(fā)的事件,co_eventloop?首先通過指針域?data.ptr?取出保存的?stPollItem_t?結構,并將其添加到?pstActiveList?列表中;之后從定時器輪盤中取出所有已經(jīng)超時的事件,也將其全部添加到?pstActiveList?中,pstActiveList?中的所有事件都作為活躍事件處理。

    對于每一個活躍事件,co_eventloop?將通過調(diào)用對應的?pfnProcess?也就是上圖中的OnPollProcessEvent?函數(shù)來切換到該事件對應的 coroutine,將流程跳轉到該 coroutine 處執(zhí)行。

    最后?co_eventloop?在調(diào)用時也提供一個額外的參數(shù)來供調(diào)用者傳入一個函數(shù)指針?pfn。該函數(shù)將會在每次循環(huán)完成之后執(zhí)行;當該函數(shù)返回 -1 時,將會終止整個事件循環(huán)。用戶可以利用該函數(shù)來控制 main 協(xié)程的終止或者完成一些統(tǒng)計需求。

    ?

    鏈接:https://blog.csdn.net/qq_25424545/article/details/81529717

    總結

    以上是生活随笔為你收集整理的C/C++协程实现-学习笔记的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。