【技术干货】浅析State-Thread
State-Thread(以下簡稱st),是一個由C語言編寫的小巧、簡潔卻高效的開源協(xié)程庫。這個庫基于單線程運作、不強制占用用戶線程,給予了開發(fā)者最大程度的輕量級和較低的侵入性。本篇文章中,網(wǎng)易云信音視頻研發(fā)大神將為大家簡要分析State-Thread,歡迎大家積極留言,和我們共同討論。
在開始這個話題之前,我們先來聊一聊協(xié)程。
什么是協(xié)程?
協(xié)程是一種程序組件。通常我們把協(xié)程理解為是一種程序自己實現(xiàn)調(diào)度、用于提高運行效率、降低開發(fā)復(fù)雜度的東西。提高運行效率很好理解,因為在程序?qū)幼约和瓿闪瞬糠值恼{(diào)度,降低了對系統(tǒng)調(diào)度的依賴,減少了大量的中斷和換頁操作。而降低了開發(fā)復(fù)雜度,則是指對于開發(fā)者而言,可以使用同步的方式去進行代碼開發(fā)(不需要考慮異步模型的諸多回調(diào)),也不需要考慮多線程模型的線程調(diào)度和諸多的臨界資源問題。
很多語言都擁有協(xié)程,例如python或者golang。而對于c/c++而言,通常實現(xiàn)協(xié)程的常見方式,通常是依賴于glibc提供的setjump&longjump或者基于匯編語言,當然還有基于語義實現(xiàn)(protothread)。linux上使用協(xié)程庫的方式,通常也會分為替換函數(shù)和更為暴力的替換so來實現(xiàn)。當然而各種方式有各自的優(yōu)劣。而st選用的匯編語言實現(xiàn)setjump&longjump和要求用戶調(diào)用st_打頭的函數(shù)來嵌入程序。所以st具備了跨平臺的能力,以及讓開發(fā)者們更開心的“與允許調(diào)用者自行選擇切換時機”的能力。
st究竟是如何實現(xiàn)了這一切?
首先我們先看看st的整體工作流程:
在宏觀的來看,ST的結(jié)構(gòu)主要分成:
vp_schedule。主要是負責(zé)了一個調(diào)度的能力。有點類似于linux內(nèi)核當中的schedule()函數(shù)。每次當這個函數(shù)被調(diào)用的時候,都會完成一次線程的切換。
各種Queue。用于保存各種狀態(tài)下等待被調(diào)度協(xié)程(st_thread)
Timer。用于記錄各種超時和sleep。
poll。用于監(jiān)聽各種io事件,會根據(jù)系統(tǒng)能力不同而進行切換(kqueue、epoll、poll、select)。
st_thread。用于保存各種協(xié)程的信息。
其中比較重要的是schedule模塊和thread模塊兩者。這兩者實現(xiàn)了一個完整的協(xié)程切換和調(diào)度。屬于st的核心。而schedule部分通常是開發(fā)者們最需要關(guān)心的部分。
接下來我們會深入到代碼層,看一下具體在這個過程里做了些什么。
通常對于st而言,所有暴露給用戶的除了init函數(shù),就是一系列的st_xxx函數(shù)了。那么先看看init函數(shù)。
int st_init(void){_st_thread_t *thread;if (_st_active_count) {/* Already initialized */return 0;}/* We can ignore return value here */st_set_eventsys(ST_EVENTSYS_DEFAULT);if (_st_io_init() < 0)return -1;memset(&_st_this_vp, 0, sizeof(_st_vp_t));ST_INIT_CLIST(&_ST_RUNQ);ST_INIT_CLIST(&_ST_IOQ);ST_INIT_CLIST(&_ST_ZOMBIEQ);if ((*_st_eventsys->init)() < 0)return -1;_st_this_vp.pagesize = getpagesize();_st_this_vp.last_clock = st_utime();/** Create idle thread*/_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,NULL, 0, 0);if (!_st_this_vp.idle_thread)return -1;_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;_st_active_count--;_ST_DEL_RUNQ(_st_this_vp.idle_thread);/** Initialize primordial thread*/thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +(ST_KEYS_MAX * sizeof(void *)));if (!thread)return -1;thread->private_data = (void **) (thread + 1);thread->state = _ST_ST_RUNNING;thread->flags = _ST_FL_PRIMORDIAL;_ST_SET_CURRENT_THREAD(thread);_st_active_count++;return 0;}這段函數(shù)一共做了3事情,創(chuàng)建了一個idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、_ST_ZOMBIEQ三個隊列,把當前調(diào)用者初始化成原始函數(shù)(通常st_init會在main里面調(diào)用,所以這個原始的thread相當于是主線程)。idle_thread函數(shù),其實就是整個IO和定時器相關(guān)的本體函數(shù)了。st會在每一次_ST_RUNQ運行完成后,調(diào)用idle_thread來獲取可讀寫的io和定時器。這個我們后續(xù)再說。
那么,st_xxx一般會分成io類和延遲類(sleep)。兩者入口其實是同一個,只不過在io類的會多調(diào)用一層。我們這里選擇st_send為代表。
int st_sendmsg(_st_netfd_t *fd, const struct msghdr *msg, int flags,st_utime_t timeout){int n;while ((n = sendmsg(fd->osfd, msg, flags)) < 0) {if (errno == EINTR)continue;if (!_IO_NOT_READY_ERROR)return -1;/* Wait until the socket becomes writable */if (st_netfd_poll(fd, POLLOUT, timeout) < 0)return -1;}return n;}本質(zhì)上所有的st函數(shù)都是以異步接口+ st_netfd_poll來實現(xiàn)的。在st_netfd_poll以內(nèi),會去調(diào)用st_poll,而st_poll本質(zhì)上會調(diào)用并且切換線程。
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout){struct pollfd pd;int n;pd.fd = fd->osfd;pd.events = (short) how;pd.revents = 0;if ((n = st_poll(&pd, 1, timeout)) < 0)return -1;if (n == 0) {/* Timed out */errno = ETIME;return -1;}if (pd.revents & POLLNVAL) {errno = EBADF;return -1;}return 0;}int st_poll(struct pollfd *pds, int npds, st_utime_t timeout){struct pollfd *pd;struct pollfd *epd = pds + npds;_st_pollq_t pq;_st_thread_t *me = _ST_CURRENT_THREAD();int n;if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT;errno = EINTR;return -1;}if ((*_st_eventsys->pollset_add)(pds, npds) < 0)return -1;pq.pds = pds;pq.npds = npds;pq.thread = me;pq.on_ioq = 1;_ST_ADD_IOQ(pq);if (timeout != ST_UTIME_NO_TIMEOUT)_ST_ADD_SLEEPQ(me, timeout);me->state = _ST_ST_IO_WAIT;_ST_SWITCH_CONTEXT(me);n = 0;if (pq.on_ioq) {/* If we timed out, the pollq might still be on the ioq. Remove it */_ST_DEL_IOQ(pq);(*_st_eventsys->pollset_del)(pds, npds);} else {/* Count the number of ready descriptors */for (pd = pds; pd < epd; pd++) {if (pd->revents)n++;}}if (me->flags & _ST_FL_INTERRUPT) {me->flags &= ~_ST_FL_INTERRUPT;errno = EINTR;return -1;}return n;}那么到此為止,st_poll中就出現(xiàn)了我們最關(guān)心的調(diào)度部分了。
當一個線程進行調(diào)度的時候一般都是poll_add(如果是io操作), add_queue,?_ST_SWITCH_CONTEXT完成一次調(diào)度。根據(jù)不同的類型,會add到不同的queue。
例如需要超時,則會add到IOQ和SLEEPQ。而_ST_SWITCH_CONTEXT,則是最關(guān)鍵的切換線程操作了。
_ST_SWITCH_CONTEXT其實是一個宏,它的本質(zhì)是調(diào)用了MD_SETJMP和_st_vp_schedule().
#define _ST_SWITCH_CONTEXT(_thread) \ST_BEGIN_MACRO \ST_SWITCH_OUT_CB(_thread); \if (!MD_SETJMP((_thread)->context)) { \_st_vp_schedule(); \} \ST_DEBUG_ITERATE_THREADS(); \ST_SWITCH_IN_CB(_thread); ?\ST_END_MACRO這個函數(shù)其實就是一個完成的線程切換了。在st里線程的切換會使用MD_SETJMP->_st_vp_schedule->MD_LONGJMP。
MD_SETJMP和MD_LONGJMP其實就是st使用匯編自己寫的setjmp和longjmp函數(shù)(glibc),效果也是幾乎等效的。(因為st本身會做平臺適配,所以我們以x86-64的匯編為例)
#elif defined(__amd64__) || defined(__x86_64__) /* * Internal __jmp_buf layout */ #define JB_RBX 0 #define JB_RBP 1 #define JB_R12 2#define JB_R13 3 #define JB_R14 4 #define JB_R15 5 #define JB_RSP 6#define JB_PC 7 ?.file "md.S".text/* _st_md_cxt_save(__jmp_buf env) */.globl _st_md_cxt_save.type _st_md_cxt_save, @function .align 16 _st_md_cxt_save:/** Save registers. */movq %rbx, (JB_RBX*8)(%rdi)movq %rbp, (JB_RBP*8)(%rdi) movq %r12, (JB_R12*8)(%rdi)movq %r13, (JB_R13*8)(%rdi)movq %r14, (JB_R14*8)(%rdi)movq %r15, (JB_R15*8)(%rdi)/* Save SP */leaq 8(%rsp), %rdxmovq %rdx, (JB_RSP*8)(%rdi)/* Save PC we are returning to */movq (%rsp), %raxmovq %rax, (JB_PC*8)(%rdi)xorq %rax, %raxret.size _st_md_cxt_save, .-_st_md_cxt_save /****************************************************************/ /* _st_md_cxt_restore(__jmp_buf env, int val) */.globl _st_md_cxt_restore .type _st_md_cxt_restore, @function.align 16 _st_md_cxt_restore:/* * Restore registers.*/movq (JB_RBX*8)(%rdi), %rbxmovq (JB_RBP*8)(%rdi), %rbp movq (JB_R12*8)(%rdi), %r12movq (JB_R13*8)(%rdi), %r13movq (JB_R14*8)(%rdi), %r14movq (JB_R15*8)(%rdi), %r15 /* Set return value */test %esi, %esimov $01, %eaxcmove %eax, %esimov %esi, %eaxmovq (JB_PC*8)(%rdi), %rdxmovq (JB_RSP*8)(%rdi), %rsp/* Jump to saved PC */jmpq *%rdx.size _st_md_cxt_restore, .-_st_md_cxt_restore/****************************************************************/MD_SETJMP的時候,會使用匯編把所有寄存器的信息保留下來,而MD_LONGJMP則會把所有的寄存器信息重新加載出來。兩者配合使用的時候,可以完成一次函數(shù)間的跳轉(zhuǎn)。
那么我們已經(jīng)看到了MD_SETJMP的調(diào)用,MD_LONGJMP調(diào)用在哪兒呢?
讓我們繼續(xù)看下去,在最一開始,我們就提及過_st_vp_schedule()這個核心函數(shù)。
void _st_vp_schedule(void){_st_thread_t *thread;if (_ST_RUNQ.next != &_ST_RUNQ) {/* Pull thread off of the run queue */thread = _ST_THREAD_PTR(_ST_RUNQ.next);_ST_DEL_RUNQ(thread);} else {/* If there are no threads to run, switch to the idle thread */thread = _st_this_vp.idle_thread;}ST_ASSERT(thread->state == _ST_ST_RUNNABLE);/* Resume the thread */thread->state = _ST_ST_RUNNING;_ST_RESTORE_CONTEXT(thread);}這個函數(shù)其實非常簡單,基本工作原理可以認為是執(zhí)行以下幾步:?
1.查看當前RUNQ是否有可以調(diào)用的,如果有,則RUNQ pop一個thread。?
2. 如果沒有,則運行idle_thread。
3. 調(diào)用_ST_RESTORE_CONTEXT。
那么_ST_RESTORE_CONTEXT做了什么呢?
#define _ST_RESTORE_CONTEXT(_thread) \ST_BEGIN_MACRO \_ST_SET_CURRENT_THREAD(_thread); \MD_LONGJMP((_thread)->context, 1); \ST_END_MACRO簡單來說,_ST_RESTORE_CONTEXT就是調(diào)用了我們之前所沒有看到的MD_LONGJMP。
所以,我們可以簡單地認為,在攜程需要schedule的時候,會先把自身當前的棧通過MD_SETJMP保存起來,當線程被schedule再次調(diào)度出來的時候,則會使用MD_SETJMP來還原棧,完成一次協(xié)程切換。
然后我們來看看idle_thread做了什么。
雖然這個協(xié)程名字叫做idle,但是其實做了很多的事情。
void *_st_idle_thread_start(void *arg){_st_thread_t *me = _ST_CURRENT_THREAD();while (_st_active_count > 0) {/* Idle vp till I/O is ready or the smallest timeout expired */_ST_VP_IDLE();/* Check sleep queue for expired threads */_st_vp_check_clock();me->state = _ST_ST_RUNNABLE;_ST_SWITCH_CONTEXT(me);}/* No more threads */exit(0);/* NOTREACHED */return NULL;}總的來說,idle_thread做了兩件事情。
1. _ST_VP_IDLE()?
2. _st_vp_check_clock()。
_st_vp_check_clock很好理解,就是檢查定時器是否超時,如果超時了,則設(shè)置超時標記之后,放回RUNQ。而_ST_VP_IDLE,其實就是查看io是否已經(jīng)ready了。例如linux的話,則會調(diào)用
epoll_wait(_st_epoll_data->epfd,
_st_epoll_data->evtlist,?
_st_epoll_data->evtlist_size, timeout)
去查看是否有可響應(yīng)的io。timeout值會根據(jù)當前空閑情況進行變化,通常來說會是一個極小的值。
那么看到這里,整體的線程調(diào)度已經(jīng)全部走完了。(詳見前面最一開始的流程圖)總體流程總結(jié)來說基本上是func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。
所以對于st而言,所以的調(diào)度,是基于用戶調(diào)用。那么如果用戶一直不調(diào)用st_xxx()(例如計算密集性服務(wù)),st也就無法進行協(xié)程切換,那么其他協(xié)程也就產(chǎn)生極大的阻塞了。這也是為什么st并不太合適計算密集型的原因(其實單線程框架大多都不合適計算密集型)。
點擊“閱讀原文”,技術(shù)干貨,行業(yè)洞察。
總結(jié)
以上是生活随笔為你收集整理的【技术干货】浅析State-Thread的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 互联网1分钟 |1101
- 下一篇: 即时通讯音视频开发(一):视频编解码之理