Go netpoller 网络模型之源码全面解析
作者:allanpan,騰訊 IEG 后臺(tái)開發(fā)工程師
近兩萬字長(zhǎng)文從 Linux 底層 Nonblocking I/O、 I/O multiplexing: select/epoll 以及 Go 源碼全方位剖析 Go 語言的網(wǎng)絡(luò)模型和底層實(shí)現(xiàn);最后介紹分析當(dāng)前主流的高性能開源網(wǎng)絡(luò)庫(kù)所使用的經(jīng)典 Reactors 模式,以及如何基于此實(shí)現(xiàn)一個(gè)?(在某些特定場(chǎng)景下)?比 Go 原生網(wǎng)絡(luò)庫(kù)性能更好的網(wǎng)絡(luò)庫(kù)??赡苁侨W(wǎng)最詳盡的 Go 網(wǎng)絡(luò)底層剖析文章,一文帶你完全吃透 Go 語言的網(wǎng)絡(luò)編程底層原理。
導(dǎo)言
Go 基于 I/O multiplexing 和 goroutine scheduler 構(gòu)建了一個(gè)簡(jiǎn)潔而高性能的原生網(wǎng)絡(luò)模型(基于 Go 的 I/O 多路復(fù)用 netpoller ),提供了 goroutine-per-connection 這樣簡(jiǎn)單的網(wǎng)絡(luò)編程模式。在這種模式下,開發(fā)者使用的是同步的模式去編寫異步的邏輯,極大地降低了開發(fā)者編寫網(wǎng)絡(luò)應(yīng)用時(shí)的心智負(fù)擔(dān),且借助于 Go runtime scheduler 對(duì) goroutines 的高效調(diào)度,這個(gè)原生網(wǎng)絡(luò)模型不論從適用性還是性能上都足以滿足絕大部分的應(yīng)用場(chǎng)景。
然而,在工程性上能做到如此高的普適性和兼容性,最終暴露給開發(fā)者提供接口/模式如此簡(jiǎn)潔,其底層必然是基于非常復(fù)雜的封裝,做了很多取舍,也有可能放棄了一些追求極致性能的設(shè)計(jì)和理念。事實(shí)上 Go netpoller 底層就是基于 epoll/kqueue/iocp 這些 I/O 多路復(fù)用技術(shù)來做封裝的,最終暴露出 goroutine-per-connection 這樣的極簡(jiǎn)的開發(fā)模式給使用者。
Go netpoller 在不同的操作系統(tǒng),其底層使用的 I/O 多路復(fù)用技術(shù)也不一樣,可以從 Go 源碼目錄結(jié)構(gòu)和對(duì)應(yīng)代碼文件了解 Go 在不同平臺(tái)下的網(wǎng)絡(luò) I/O 模式的實(shí)現(xiàn)。比如,在 Linux 系統(tǒng)下基于 epoll,freeBSD 系統(tǒng)下基于 kqueue,以及 Windows 系統(tǒng)下基于 iocp。
本文將基于 Linux 平臺(tái)來解析 Go netpoller 之 I/O 多路復(fù)用的底層是如何基于 epoll 封裝實(shí)現(xiàn)的,從源碼層層推進(jìn),全面而深度地解析 Go netpoller 的設(shè)計(jì)理念和實(shí)現(xiàn)原理,以及 Go 是如何利用 netpoller 來構(gòu)建它的原生網(wǎng)絡(luò)模型的。主要涉及到的一些概念:I/O 模型、用戶/內(nèi)核空間、epoll、Linux 源碼、goroutine scheduler 等等,我會(huì)盡量簡(jiǎn)單地講解,如果有對(duì)相關(guān)概念不熟悉的同學(xué),還是希望能提前熟悉一下。
用戶空間與內(nèi)核空間
現(xiàn)代操作系統(tǒng)都是采用虛擬存儲(chǔ)器,那么對(duì) 32 位操作系統(tǒng)而言,它的尋址空間(虛擬存儲(chǔ)空間)為 4G(2 的 32 次方)。操作系統(tǒng)的核心是內(nèi)核,獨(dú)立于普通的應(yīng)用程序,可以訪問受保護(hù)的內(nèi)存空間,也有訪問底層硬件設(shè)備的所有權(quán)限。為了保證用戶進(jìn)程不能直接操作內(nèi)核(kernel),保證內(nèi)核的安全,操心系統(tǒng)將虛擬空間劃分為兩部分,一部分為內(nèi)核空間,一部分為用戶空間。針對(duì) Linux 操作系統(tǒng)而言,將最高的 1G 字節(jié)(從虛擬地址 0xC0000000 到 0xFFFFFFFF),供內(nèi)核使用,稱為內(nèi)核空間,而將較低的 3G 字節(jié)(從虛擬地址 0x00000000 到 0xBFFFFFFF),供各個(gè)進(jìn)程使用,稱為用戶空間。
現(xiàn)代的網(wǎng)絡(luò)服務(wù)的主流已經(jīng)完成從 CPU 密集型到 IO 密集型的轉(zhuǎn)變,所以服務(wù)端程序?qū)?I/O 的處理必不可少,而一旦操作 I/O 則必定要在用戶態(tài)和內(nèi)核態(tài)之間來回切換。I/O 模型
在神作《UNIX 網(wǎng)絡(luò)編程》里,總結(jié)歸納了 5 種 I/O 模型,包括同步和異步 I/O:
阻塞 I/O (Blocking I/O)
非阻塞 I/O (Nonblocking I/O)
I/O 多路復(fù)用 (I/O multiplexing)
信號(hào)驅(qū)動(dòng) I/O (Signal driven I/O)
異步 I/O (Asynchronous I/O)
操作系統(tǒng)上的 I/O 是用戶空間和內(nèi)核空間的數(shù)據(jù)交互,因此 I/O 操作通常包含以下兩個(gè)步驟:
等待網(wǎng)絡(luò)數(shù)據(jù)到達(dá)網(wǎng)卡(讀就緒)/等待網(wǎng)卡可寫(寫就緒) –> 讀取/寫入到內(nèi)核緩沖區(qū)
從內(nèi)核緩沖區(qū)復(fù)制數(shù)據(jù) –> 用戶空間(讀)/從用戶空間復(fù)制數(shù)據(jù) -> 內(nèi)核緩沖區(qū)(寫)
而判定一個(gè) I/O 模型是同步還是異步,主要看第二步:數(shù)據(jù)在用戶和內(nèi)核空間之間復(fù)制的時(shí)候是不是會(huì)阻塞當(dāng)前進(jìn)程,如果會(huì),則是同步 I/O,否則,就是異步 I/O。基于這個(gè)原則,這 5 種 I/O 模型中只有一種異步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。
這 5 種 I/O 模型的對(duì)比如下:
Non-blocking I/O
什么叫非阻塞 I/O,顧名思義就是:所有 I/O 操作都是立刻返回而不會(huì)阻塞當(dāng)前用戶進(jìn)程。I/O 多路復(fù)用通常情況下需要和非阻塞 I/O 搭配使用,否則可能會(huì)產(chǎn)生意想不到的問題。比如,epoll 的 ET(邊緣觸發(fā)) 模式下,如果不使用非阻塞 I/O,有極大的概率會(huì)導(dǎo)致阻塞 event-loop 線程,從而降低吞吐量,甚至導(dǎo)致 bug。
Linux 下,我們可以通過 fcntl 系統(tǒng)調(diào)用來設(shè)置 O_NONBLOCK 標(biāo)志位,從而把 socket 設(shè)置成 Non-blocking。當(dāng)對(duì)一個(gè) Non-blocking socket 執(zhí)行讀操作時(shí),流程是這個(gè)樣子:
當(dāng)用戶進(jìn)程發(fā)出 read 操作時(shí),如果 kernel 中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會(huì) block 用戶進(jìn)程,而是立刻返回一個(gè) EAGAIN error。
從用戶進(jìn)程角度講 ,它發(fā)起一個(gè) read 操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè) error 時(shí),它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是它可以再次發(fā)送 read 操作。一旦 kernel 中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的 system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存,然后返回。所以,Non-blocking I/O 的特點(diǎn)是用戶進(jìn)程需要不斷的主動(dòng)詢問 kernel 數(shù)據(jù)好了沒有。下一節(jié)我們要講的 I/O 多路復(fù)用需要和 Non-blocking I/O 配合才能發(fā)揮出最大的威力!
I/O 多路復(fù)用
所謂 I/O 多路復(fù)用指的就是 select/poll/epoll 這一系列的多路選擇器:支持單一線程同時(shí)監(jiān)聽多個(gè)文件描述符(I/O 事件),阻塞等待,并在其中某個(gè)文件描述符可讀寫時(shí)收到通知。I/O 復(fù)用其實(shí)復(fù)用的不是 I/O 連接,而是復(fù)用線程,讓一個(gè) thread of control 能夠處理多個(gè)連接(I/O 事件)。
select & poll
#include?<sys/select.h>/*?According?to?earlier?standards?*/ #include?<sys/time.h> #include?<sys/types.h> #include?<unistd.h>int?select(int?nfds,?fd_set?*readfds,?fd_set?*writefds,?fd_set?*exceptfds,?struct?timeval?*timeout);//?和 select 緊密結(jié)合的四個(gè)宏: void?FD_CLR(int?fd,?fd_set?*set); int?FD_ISSET(int?fd,?fd_set?*set); void?FD_SET(int?fd,?fd_set?*set); void?FD_ZERO(fd_set?*set);select 是 epoll 之前 Linux 使用的 I/O 事件驅(qū)動(dòng)技術(shù)。
理解 select 的關(guān)鍵在于理解 fd_set,為說明方便,取 fd_set 長(zhǎng)度為 1 字節(jié),fd_set 中的每一 bit 可以對(duì)應(yīng)一個(gè)文件描述符 fd,則 1 字節(jié)長(zhǎng)的 fd_set 最大可以對(duì)應(yīng) 8 個(gè) fd。select 的調(diào)用過程如下:
執(zhí)行 FD_ZERO(&set), 則 set 用位表示是 0000,0000
若 fd=5, 執(zhí)行 FD_SET(fd, &set); 后 set 變?yōu)?0001,0000(第 5 位置為 1)
再加入 fd=2, fd=1,則 set 變?yōu)?0001,0011
執(zhí)行 select(6, &set, 0, 0, 0) 阻塞等待
若 fd=1, fd=2 上都發(fā)生可讀事件,則 select 返回,此時(shí) set 變?yōu)?0000,0011 (注意:沒有事件發(fā)生的 fd=5 被清空)
基于上面的調(diào)用過程,可以得出 select 的特點(diǎn):
可監(jiān)控的文件描述符個(gè)數(shù)取決于 sizeof(fd_set) 的值。假設(shè)服務(wù)器上 sizeof(fd_set)=512,每 bit 表示一個(gè)文件描述符,則服務(wù)器上支持的最大文件描述符是 512*8=4096。fd_set 的大小調(diào)整可參考 【原創(chuàng)】技術(shù)系列之 網(wǎng)絡(luò)模型(二) 中的模型 2,可以有效突破 select 可監(jiān)控的文件描述符上限
將 fd 加入 select 監(jiān)控集的同時(shí),還要再使用一個(gè)數(shù)據(jù)結(jié)構(gòu) array 保存放到 select 監(jiān)控集中的 fd,一是用于在 select 返回后,array 作為源數(shù)據(jù)和 fd_set 進(jìn)行 FD_ISSET 判斷。二是 select 返回后會(huì)把以前加入的但并無事件發(fā)生的 fd 清空,則每次開始 select 前都要重新從 array 取得 fd 逐一加入(FD_ZERO 最先),掃描 array 的同時(shí)取得 fd 最大值 maxfd,用于 select 的第一個(gè)參數(shù)
可見 select 模型必須在 select 前循環(huán) array(加 fd,取 maxfd),select 返回后循環(huán) array(FD_ISSET 判斷是否有事件發(fā)生)
所以,select 有如下的缺點(diǎn):
最大并發(fā)數(shù)限制:使用 32 個(gè)整數(shù)的 32 位,即 32*32=1024 來標(biāo)識(shí) fd,雖然可修改,但是有以下第 2, 3 點(diǎn)的瓶頸
每次調(diào)用 select,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài),這個(gè)開銷在 fd 很多時(shí)會(huì)很大
性能衰減嚴(yán)重:每次 kernel 都需要線性掃描整個(gè) fd_set,所以隨著監(jiān)控的描述符 fd 數(shù)量增長(zhǎng),其 I/O 性能會(huì)線性下降
poll 的實(shí)現(xiàn)和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 結(jié)構(gòu)而不是 select 的 fd_set 結(jié)構(gòu),poll 解決了最大文件描述符數(shù)量限制的問題,但是同樣需要從用戶態(tài)拷貝所有的 fd 到內(nèi)核態(tài),也需要線性遍歷所有的 fd 集合,所以它和 select 只是實(shí)現(xiàn)細(xì)節(jié)上的區(qū)分,并沒有本質(zhì)上的區(qū)別。
epoll
epoll 是 Linux kernel 2.6 之后引入的新 I/O 事件驅(qū)動(dòng)技術(shù),I/O 多路復(fù)用的核心設(shè)計(jì)是 1 個(gè)線程處理所有連接的 等待消息準(zhǔn)備好 I/O 事件,這一點(diǎn)上 epoll 和 select&poll 是大同小異的。但 select&poll 錯(cuò)誤預(yù)估了一件事,當(dāng)數(shù)十萬并發(fā)連接存在時(shí),可能每一毫秒只有數(shù)百個(gè)活躍的連接,同時(shí)其余數(shù)十萬連接在這一毫秒是非活躍的。select&poll 的使用方法是這樣的:返回的活躍連接 == select(全部待監(jiān)控的連接) 。
什么時(shí)候會(huì)調(diào)用 select&poll 呢?在你認(rèn)為需要找出有報(bào)文到達(dá)的活躍連接時(shí),就應(yīng)該調(diào)用。所以,select&poll 在高并發(fā)時(shí)是會(huì)被頻繁調(diào)用的。這樣,這個(gè)頻繁調(diào)用的方法就很有必要看看它是否有效率,因?yàn)?#xff0c;它的輕微效率損失都會(huì)被 高頻 二字所放大。它有效率損失嗎?顯而易見,全部待監(jiān)控連接是數(shù)以十萬計(jì)的,返回的只是數(shù)百個(gè)活躍連接,這本身就是無效率的表現(xiàn)。被放大后就會(huì)發(fā)現(xiàn),處理并發(fā)上萬個(gè)連接時(shí),select&poll 就完全力不從心了。這個(gè)時(shí)候就該 epoll 上場(chǎng)了,epoll 通過一些新的設(shè)計(jì)和優(yōu)化,基本上解決了 select&poll 的問題。
epoll 的 API 非常簡(jiǎn)潔,涉及到的只有 3 個(gè)系統(tǒng)調(diào)用:
#include?<sys/epoll.h>?? int?epoll_create(int?size);?//?int?epoll_create1(int?flags); int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event); int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);其中,epoll_create 創(chuàng)建一個(gè) epoll 實(shí)例并返回 epollfd;epoll_ctl 注冊(cè) file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 實(shí)例上;epoll_wait 則是阻塞監(jiān)聽 epoll 實(shí)例上所有的 file descriptor 的 I/O 事件,它接收一個(gè)用戶空間上的一塊內(nèi)存地址 (events 數(shù)組),kernel 會(huì)在有 I/O 事件發(fā)生的時(shí)候把文件描述符列表復(fù)制到這塊內(nèi)存地址上,然后 epoll_wait 解除阻塞并返回,最后用戶空間上的程序就可以對(duì)相應(yīng)的 fd 進(jìn)行讀寫了:
#include?<unistd.h> ssize_t?read(int?fd,?void?*buf,?size_t?count); ssize_t?write(int?fd,?const?void?*buf,?size_t?count);epoll 的工作原理如下:
與 select&poll 相比,epoll 分清了高頻調(diào)用和低頻調(diào)用。例如,epoll_ctl 相對(duì)來說就是非頻繁調(diào)用的,而 epoll_wait 則是會(huì)被高頻調(diào)用的。所以 epoll 利用 epoll_ctl 來插入或者刪除一個(gè) fd,實(shí)現(xiàn)用戶態(tài)到內(nèi)核態(tài)的數(shù)據(jù)拷貝,這確保了每一個(gè) fd 在其生命周期只需要被拷貝一次,而不是每次調(diào)用 epoll_wait 的時(shí)候都拷貝一次。epoll_wait 則被設(shè)計(jì)成幾乎沒有入?yún)⒌恼{(diào)用,相比 select&poll 需要把全部監(jiān)聽的 fd 集合從用戶態(tài)拷貝至內(nèi)核態(tài)的做法,epoll 的效率就高出了一大截。
在實(shí)現(xiàn)上 epoll 采用紅黑樹來存儲(chǔ)所有監(jiān)聽的 fd,而紅黑樹本身插入和刪除性能比較穩(wěn)定,時(shí)間復(fù)雜度 O(logN)。通過 epoll_ctl 函數(shù)添加進(jìn)來的 fd 都會(huì)被放在紅黑樹的某個(gè)節(jié)點(diǎn)內(nèi),所以,重復(fù)添加是沒有用的。當(dāng)把 fd 添加進(jìn)來的時(shí)候時(shí)候會(huì)完成關(guān)鍵的一步:該 fd 會(huì)與相應(yīng)的設(shè)備(網(wǎng)卡)驅(qū)動(dòng)程序建立回調(diào)關(guān)系,也就是在內(nèi)核中斷處理程序?yàn)樗?cè)一個(gè)回調(diào)函數(shù),在 fd 相應(yīng)的事件觸發(fā)(中斷)之后(設(shè)備就緒了),內(nèi)核就會(huì)調(diào)用這個(gè)回調(diào)函數(shù),該回調(diào)函數(shù)在內(nèi)核中被稱為:ep_poll_callback ,這個(gè)回調(diào)函數(shù)其實(shí)就是把這個(gè) fd 添加到 rdllist 這個(gè)雙向鏈表(就緒鏈表)中。epoll_wait 實(shí)際上就是去檢查 rdllist 雙向鏈表中是否有就緒的 fd,當(dāng) rdllist 為空(無就緒 fd)時(shí)掛起當(dāng)前進(jìn)程,直到 rdllist 非空時(shí)進(jìn)程才被喚醒并返回。
相比于 select&poll 調(diào)用時(shí)會(huì)將全部監(jiān)聽的 fd 從用戶態(tài)空間拷貝至內(nèi)核態(tài)空間并線性掃描一遍找出就緒的 fd 再返回到用戶態(tài),epoll_wait 則是直接返回已就緒 fd,因此 epoll 的 I/O 性能不會(huì)像 select&poll 那樣隨著監(jiān)聽的 fd 數(shù)量增加而出現(xiàn)線性衰減,是一個(gè)非常高效的 I/O 事件驅(qū)動(dòng)技術(shù)。
由于使用 epoll 的 I/O 多路復(fù)用需要用戶進(jìn)程自己負(fù)責(zé) I/O 讀寫,從用戶進(jìn)程的角度看,讀寫過程是阻塞的,所以 select&poll&epoll 本質(zhì)上都是同步 I/O 模型,而像 Windows 的 IOCP 這一類的異步 I/O,只需要在調(diào)用 WSARecv 或 WSASend 方法讀寫數(shù)據(jù)的時(shí)候把用戶空間的內(nèi)存 buffer 提交給 kernel,kernel 負(fù)責(zé)數(shù)據(jù)在用戶空間和內(nèi)核空間拷貝,完成之后就會(huì)通知用戶進(jìn)程,整個(gè)過程不需要用戶進(jìn)程參與,所以是真正的異步 I/O。
延伸
另外,我看到有些文章說 epoll 之所以性能高是因?yàn)槔昧?Linux 的 mmap 內(nèi)存映射讓內(nèi)核和用戶進(jìn)程共享了一片物理內(nèi)存,用來存放就緒 fd 列表和它們的數(shù)據(jù) buffer,所以用戶進(jìn)程在 epoll_wait 返回之后用戶進(jìn)程就可以直接從共享內(nèi)存那里讀取/寫入數(shù)據(jù)了,這讓我很疑惑,因?yàn)槭紫瓤?epoll_wait 的函數(shù)聲明:
int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);第二個(gè)參數(shù):就緒事件列表,是需要在用戶空間分配內(nèi)存然后再傳給 epoll_wait 的,如果內(nèi)核會(huì)用 mmap 設(shè)置共享內(nèi)存,直接傳遞一個(gè)指針進(jìn)去就行了,根本不需要在用戶態(tài)分配內(nèi)存,多此一舉。其次,內(nèi)核和用戶進(jìn)程通過 mmap 共享內(nèi)存是一件極度危險(xiǎn)的事情,內(nèi)核無法確定這塊共享內(nèi)存什么時(shí)候會(huì)被回收,而且這樣也會(huì)賦予用戶進(jìn)程直接操作內(nèi)核數(shù)據(jù)的權(quán)限和入口,非常容易出現(xiàn)大的系統(tǒng)漏洞,因此一般極少會(huì)這么做。所以我很懷疑 epoll 是不是真的在 Linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 Linux kernel 源碼:
/**?Implement?the?event?wait?interface?for?the?eventpoll?file.?It?is?the?kernel*?part?of?the?user?space?epoll_wait(2).*/ static?int?do_epoll_wait(int?epfd,?struct?epoll_event?__user?*events,int?maxevents,?int?timeout) {.../*?Time?to?fish?for?events?...?*/error?=?ep_poll(ep,?events,?maxevents,?timeout); }//?如果?epoll_wait?入?yún)r(shí)設(shè)定?timeout?==?0,?那么直接通過?ep_events_available?判斷當(dāng)前是否有用戶感興趣的事件發(fā)生,如果有則通過?ep_send_events?進(jìn)行處理 //?如果設(shè)置 timeout >?0,并且當(dāng)前沒有用戶關(guān)注的事件發(fā)生,則進(jìn)行休眠,并添加到 ep->wq 等待隊(duì)列的頭部;對(duì)等待事件描述符設(shè)置 WQ_FLAG_EXCLUSIVE 標(biāo)志 //?ep_poll?被事件喚醒后會(huì)重新檢查是否有關(guān)注事件,如果對(duì)應(yīng)的事件已經(jīng)被搶走,那么?ep_poll?會(huì)繼續(xù)休眠等待 static?int?ep_poll(struct?eventpoll?*ep,?struct?epoll_event?__user?*events,?int?maxevents,?long?timeout) {...send_events:/**?Try?to?transfer?events?to?user?space.?In?case?we?get?0?events?and*?there's?still?timeout?left?over,?we?go?trying?again?in?search?of*?more?luck.*///?如果一切正常,?有?event?發(fā)生,?就開始準(zhǔn)備數(shù)據(jù)?copy?給用戶空間了//?如果有就緒的事件發(fā)生,那么就調(diào)用?ep_send_events?將就緒的事件?copy?到用戶態(tài)內(nèi)存中,//?然后返回到用戶態(tài),否則判斷是否超時(shí),如果沒有超時(shí)就繼續(xù)等待就緒事件發(fā)生,如果超時(shí)就返回用戶態(tài)。//?從?ep_poll?函數(shù)的實(shí)現(xiàn)可以看到,如果有就緒事件發(fā)生,則調(diào)用?ep_send_events?函數(shù)做進(jìn)一步處理if?(!res?&&?eavail?&&!(res?=?ep_send_events(ep,?events,?maxevents))?&&?!timed_out)goto?fetch_events;... }//?ep_send_events?函數(shù)是用來向用戶空間拷貝就緒?fd?列表的,它將用戶傳入的就緒?fd?列表內(nèi)存簡(jiǎn)單封裝到 // ep_send_events_data 結(jié)構(gòu)中,然后調(diào)用 ep_scan_ready_list 將就緒隊(duì)列中的事件寫入用戶空間的內(nèi)存; //?用戶進(jìn)程就可以訪問到這些數(shù)據(jù)進(jìn)行處理 static?int?ep_send_events(struct?eventpoll?*ep,struct?epoll_event?__user?*events,?int?maxevents) {struct?ep_send_events_data?esed;esed.maxevents?=?maxevents;esed.events?=?events;//?調(diào)用?ep_scan_ready_list?函數(shù)檢查?epoll?實(shí)例?eventpoll?中的?rdllist?就緒鏈表,//?并注冊(cè)一個(gè)回調(diào)函數(shù)?ep_send_events_proc,如果有就緒?fd,則調(diào)用?ep_send_events_proc?進(jìn)行處理ep_scan_ready_list(ep,?ep_send_events_proc,?&esed,?0,?false);return?esed.res; }//?調(diào)用?ep_scan_ready_list?的時(shí)候會(huì)傳遞指向?ep_send_events_proc?函數(shù)的函數(shù)指針作為回調(diào)函數(shù), //?一旦有就緒?fd,就會(huì)調(diào)用?ep_send_events_proc?函數(shù) static?__poll_t?ep_send_events_proc(struct?eventpoll?*ep,?struct?list_head?*head,?void?*priv) {.../**?If?the?event?mask?intersect?the?caller-requested?one,*?deliver?the?event?to?userspace.?Again,?ep_scan_ready_list()*?is?holding?ep->mtx,?so?no?operations?coming?from?userspace*?can?change?the?item.*/revents?=?ep_item_poll(epi,?&pt,?1);//?如果?revents?為?0,說明沒有就緒的事件,跳過,否則就將就緒事件拷貝到用戶態(tài)內(nèi)存中if?(!revents)continue;//?將當(dāng)前就緒的事件和用戶進(jìn)程傳入的數(shù)據(jù)都通過?__put_user?拷貝回用戶空間,//?也就是調(diào)用?epoll_wait?之時(shí)用戶進(jìn)程傳入的?fd?列表的內(nèi)存if?(__put_user(revents,?&uevent->events)?||?__put_user(epi->event.data,?&uevent->data))?{list_add(&epi->rdllink,?head);ep_pm_stay_awake(epi);if?(!esed->res)esed->res?=?-EFAULT;return?0;}... }從 do_epoll_wait 開始層層跳轉(zhuǎn),我們可以很清楚地看到最后內(nèi)核是通過 __put_user 函數(shù)把就緒 fd 列表和事件返回到用戶空間,而 __put_user 正是內(nèi)核用來拷貝數(shù)據(jù)到用戶空間的標(biāo)準(zhǔn)函數(shù)。此外,我并沒有在 Linux kernel 的源碼中和 epoll 相關(guān)的代碼里找到 mmap 系統(tǒng)調(diào)用做內(nèi)存映射的邏輯,所以基本可以得出結(jié)論:epoll 在 Linux kernel 里并沒有使用 mmap 來做用戶空間和內(nèi)核空間的內(nèi)存共享,所以那些說 epoll 使用了 mmap 的文章都是誤解。
Go netpoller 核心
Go netpoller 基本原理
Go netpoller 通過在底層對(duì) epoll/kqueue/iocp 的封裝,從而實(shí)現(xiàn)了使用同步編程模式達(dá)到異步執(zhí)行的效果??偨Y(jié)來說,所有的網(wǎng)絡(luò)操作都以網(wǎng)絡(luò)描述符 netFD 為中心實(shí)現(xiàn)。netFD 與底層 PollDesc 結(jié)構(gòu)綁定,當(dāng)在一個(gè) netFD 上讀寫遇到 EAGAIN 錯(cuò)誤時(shí),就將當(dāng)前 goroutine 存儲(chǔ)到這個(gè) netFD 對(duì)應(yīng)的 PollDesc 中,同時(shí)調(diào)用 gopark 把當(dāng)前 goroutine 給 park 住,直到這個(gè) netFD 上再次發(fā)生讀寫事件,才將此 goroutine 給 ready 激活重新運(yùn)行。顯然,在底層通知 goroutine 再次發(fā)生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅(qū)動(dòng)機(jī)制。
總所周知,Go 是一門跨平臺(tái)的編程語言,而不同平臺(tái)針對(duì)特定的功能有不用的實(shí)現(xiàn),這當(dāng)然也包括了 I/O 多路復(fù)用技術(shù),比如 Linux 里的 I/O 多路復(fù)用有 select、poll 和 epoll,而 freeBSD 或者 MacOS 里則是 kqueue,而 Windows 里則是基于異步 I/O 實(shí)現(xiàn)的 iocp,等等;因此,Go 為了實(shí)現(xiàn)底層 I/O 多路復(fù)用的跨平臺(tái),分別基于上述的這些不同平臺(tái)的系統(tǒng)調(diào)用實(shí)現(xiàn)了多版本的 netpollers,具體的源碼路徑如下:
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go
本文的解析基于 epoll 版本,如果讀者對(duì)其他平臺(tái)的 netpoller 底層實(shí)現(xiàn)感興趣,可以在閱讀完本文后自行翻閱其他 netpoller 源碼,所有實(shí)現(xiàn)版本的機(jī)制和原理基本類似,所以了解了 epoll 版本的實(shí)現(xiàn)后再去學(xué)習(xí)其他版本實(shí)現(xiàn)應(yīng)該沒什么障礙。
接下來讓我們通過分析最新的 Go 源碼(v1.15.3),全面剖析一下整個(gè) Go netpoller 的運(yùn)行機(jī)制和流程。
數(shù)據(jù)結(jié)構(gòu)
netFD
net.Listen("tcp", ":8888") 方法返回了一個(gè) *TCPListener,它是一個(gè)實(shí)現(xiàn)了 net.Listener 接口的 struct,而通過 listener.Accept() 接收的新連接 *TCPConn 則是一個(gè)實(shí)現(xiàn)了 net.Conn 接口的 struct,它內(nèi)嵌了 net.conn struct。仔細(xì)閱讀上面的源碼可以發(fā)現(xiàn),不管是 Listener 的 Accept 還是 Conn 的 Read/Write 方法,都是基于一個(gè) netFD 的數(shù)據(jù)結(jié)構(gòu)的操作, netFD 是一個(gè)網(wǎng)絡(luò)描述符,類似于 Linux 的文件描述符的概念,netFD 中包含一個(gè) poll.FD 數(shù)據(jù)結(jié)構(gòu),而 poll.FD 中包含兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu) Sysfd 和 pollDesc,前者是真正的系統(tǒng)文件描述符,后者對(duì)是底層事件驅(qū)動(dòng)的封裝,所有的讀寫超時(shí)等操作都是通過調(diào)用后者的對(duì)應(yīng)方法實(shí)現(xiàn)的。
netFD 和 poll.FD 的源碼:
//?Network?file?descriptor. type?netFD?struct?{pfd?poll.FD//?immutable?until?Closefamily??????intsotype??????intisConnected?bool?//?handshake?completed?or?use?of?association?with?peernet?????????stringladdr???????Addrraddr???????Addr }//?FD?is?a?file?descriptor.?The?net?and?os?packages?use?this?type?as?a //?field?of?a?larger?type?representing?a?network?connection?or?OS?file. type?FD?struct?{//?Lock?sysfd?and?serialize?access?to?Read?and?Write?methods.fdmu?fdMutex//?System?file?descriptor.?Immutable?until?Close.Sysfd?int//?I/O?poller.pd?pollDesc//?Writev?cache.iovecs?*[]syscall.Iovec//?Semaphore?signaled?when?file?is?closed.csema?uint32//?Non-zero?if?this?file?has?been?set?to?blocking?mode.isBlocking?uint32//?Whether?this?is?a?streaming?descriptor,?as?opposed?to?a//?packet-based?descriptor?like?a?UDP?socket.?Immutable.IsStream?bool//?Whether?a?zero?byte?read?indicates?EOF.?This?is?false?for?a//?message?based?socket?connection.ZeroReadIsEOF?bool//?Whether?this?is?a?file?rather?than?a?network?socket.isFile?bool }pollDesc
前面提到了 pollDesc 是底層事件驅(qū)動(dòng)的封裝,netFD 通過它來完成各種 I/O 相關(guān)的操作,它的定義如下:
type?pollDesc?struct?{runtimeCtx?uintptr }這里的 struct 只包含了一個(gè)指針,而通過 pollDesc 的 init 方法,我們可以找到它具體的定義是在 runtime.pollDesc 這里:
func?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }//?Network?poller?descriptor. // //?No?heap?pointers. // //go:notinheap type?pollDesc?struct?{link?*pollDesc?//?in?pollcache,?protected?by?pollcache.lock//?The?lock?protects?pollOpen,?pollSetDeadline,?pollUnblock?and?deadlineimpl?operations.//?This?fully?covers?seq,?rt?and?wt?variables.?fd?is?constant?throughout?the?PollDesc?lifetime.//?pollReset,?pollWait,?pollWaitCanceled?and?runtime·netpollready?(IO?readiness?notification)//?proceed?w/o?taking?the?lock.?So?closing,?everr,?rg,?rd,?wg?and?wd?are?manipulated//?in?a?lock-free?way?by?all?operations.//?NOTE(dvyukov):?the?following?code?uses?uintptr?to?store?*g?(rg/wg),//?that?will?blow?up?when?GC?starts?moving?objects.lock????mutex?//?protects?the?following?fieldsfd??????uintptrclosing?booleverr???bool????//?marks?event?scanning?error?happeneduser????uint32??//?user?settable?cookierseq????uintptr?//?protects?from?stale?read?timersrg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?read?or?nilrt??????timer???//?read?deadline?timer?(set?if?rt.f?!=?nil)rd??????int64???//?read?deadlinewseq????uintptr?//?protects?from?stale?write?timerswg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?write?or?nilwt??????timer???//?write?deadline?timerwd??????int64???//?write?deadline }這里重點(diǎn)關(guān)注里面的 rg 和 wg,這里兩個(gè) uintptr "萬能指針"類型,取值分別可能是 pdReady、pdWait、等待 file descriptor 就緒的 goroutine 也就是 g 數(shù)據(jù)結(jié)構(gòu)以及 nil,它們是實(shí)現(xiàn)喚醒 goroutine 的關(guān)鍵。
runtime.pollDesc 包含自身類型的一個(gè)指針,用來保存下一個(gè) runtime.pollDesc 的地址,以此來實(shí)現(xiàn)鏈表,可以減少數(shù)據(jù)結(jié)構(gòu)的大小,所有的 runtime.pollDesc 保存在 runtime.pollCache 結(jié)構(gòu)中,定義如下:
type?pollCache?struct?{lock??mutexfirst?*pollDesc//?PollDesc?objects?must?be?type-stable,//?because?we?can?get?ready?notification?from?epoll/kqueue//?after?the?descriptor?is?closed/reused.//?Stale?notifications?are?detected?using?seq?variable,//?seq?is?incremented?when?deadlines?are?changed?or?descriptor?is?reused. }因?yàn)?runtime.pollCache 是一個(gè)在 runtime 包里的全局變量,因此需要用一個(gè)互斥鎖來避免 data race 問題,從它的名字也能看出這是一個(gè)用于緩存的數(shù)據(jù)結(jié)構(gòu),也就是用來提高性能的,具體如何實(shí)現(xiàn)呢?
const?pollBlockSize?=?4?*?1024func?(c?*pollCache)?alloc()?*pollDesc?{lock(&c.lock)if?c.first?==?nil?{const?pdSize?=?unsafe.Sizeof(pollDesc{})n?:=?pollBlockSize?/?pdSizeif?n?==?0?{n?=?1}//?Must?be?in?non-GC?memory?because?can?be?referenced//?only?from?epoll/kqueue?internals.mem?:=?persistentalloc(n*pdSize,?0,?&memstats.other_sys)for?i?:=?uintptr(0);?i?<?n;?i++?{pd?:=?(*pollDesc)(add(mem,?i*pdSize))pd.link?=?c.firstc.first?=?pd}}pd?:=?c.firstc.first?=?pd.linklockInit(&pd.lock,?lockRankPollDesc)unlock(&c.lock)return?pd }Go runtime 會(huì)在調(diào)用 poll_runtime_pollOpen 往 epoll 實(shí)例注冊(cè) fd 之時(shí)首次調(diào)用 runtime.pollCache.alloc方法時(shí)批量初始化大小 4KB 的 runtime.pollDesc 結(jié)構(gòu)體的鏈表,初始化過程中會(huì)調(diào)用 runtime.persistentalloc 來為這些數(shù)據(jù)結(jié)構(gòu)分配不會(huì)被 GC 回收的內(nèi)存,確保這些數(shù)據(jù)結(jié)構(gòu)只能被 epoll和kqueue 在內(nèi)核空間去引用。
再往后每次調(diào)用這個(gè)方法則會(huì)先判斷鏈表頭是否已經(jīng)分配過值了,若是,則直接返回表頭這個(gè) pollDesc,這種批量初始化數(shù)據(jù)進(jìn)行緩存而后每次都直接從緩存取數(shù)據(jù)的方式是一種很常見的性能優(yōu)化手段,在這里這種方式可以有效地提升 netpoller 的吞吐量。
Go runtime 會(huì)在關(guān)閉 pollDesc 之時(shí)調(diào)用 runtime.pollCache.free 釋放內(nèi)存:
func?(c?*pollCache)?free(pd?*pollDesc)?{lock(&c.lock)pd.link?=?c.firstc.first?=?pdunlock(&c.lock) }實(shí)現(xiàn)原理
使用 Go 編寫一個(gè)典型的 TCP echo server:
package?mainimport?("log""net" )func?main()?{listen,?err?:=?net.Listen("tcp",?":8888")if?err?!=?nil?{log.Println("listen?error:?",?err)return}for?{conn,?err?:=?listen.Accept()if?err?!=?nil?{log.Println("accept?error:?",?err)break}//?start?a?new?goroutine?to?handle?the?new?connection.go?HandleConn(conn)} }func?HandleConn(conn?net.Conn)?{defer?conn.Close()packet?:=?make([]byte,?1024)for?{//?block?here?if?socket?is?not?available?for?reading?data.n,?err?:=?conn.Read(packet)if?err?!=?nil?{log.Println("read?socket?error:?",?err)return}//?same?as?above,?block?here?if?socket?is?not?available?for?writing._,?_?=?conn.Write(packet[:n])} }上面是一個(gè)基于 Go 原生網(wǎng)絡(luò)模型(基于 netpoller)編寫的一個(gè) TCP server,模式是 goroutine-per-connection ,在這種模式下,開發(fā)者使用的是同步的模式去編寫異步的邏輯而且對(duì)于開發(fā)者來說 I/O 是否阻塞是無感知的,也就是說開發(fā)者無需考慮 goroutines 甚至更底層的線程、進(jìn)程的調(diào)度和上下文切換。而 Go netpoller 最底層的事件驅(qū)動(dòng)技術(shù)肯定是基于 epoll/kqueue/iocp 這一類的 I/O 事件驅(qū)動(dòng)技術(shù),只不過是把這些調(diào)度和上下文切換的工作轉(zhuǎn)移到了 runtime 的 Go scheduler,讓它來負(fù)責(zé)調(diào)度 goroutines,從而極大地降低了程序員的心智負(fù)擔(dān)!
Go 的這種同步模式的網(wǎng)絡(luò)服務(wù)器的基本架構(gòu)通常如下:
上面的示例代碼中相關(guān)的在源碼里的幾個(gè)數(shù)據(jù)結(jié)構(gòu)和方法:
//?TCPListener?is?a?TCP?network?listener.?Clients?should?typically //?use?variables?of?type?Listener?instead?of?assuming?TCP. type?TCPListener?struct?{fd?*netFDlc?ListenConfig }//?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }//?TCPConn?is?an?implementation?of?the?Conn?interface?for?TCP?network //?connections. type?TCPConn?struct?{conn }//?Conn type?conn?struct?{fd?*netFD }type?conn?struct?{fd?*netFD }func?(c?*conn)?ok()?bool?{?return?c?!=?nil?&&?c.fd?!=?nil?}//?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }//?Write?implements?the?Conn?Write?method. func?(c?*conn)?Write(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Write(b)if?err?!=?nil?{err?=?&OpError{Op:?"write",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }net.Listen
調(diào)用 net.Listen 之后,底層會(huì)通過 Linux 的系統(tǒng)調(diào)用 socket 方法創(chuàng)建一個(gè) fd 分配給 listener,并用以來初始化 listener 的 netFD ,接著調(diào)用 netFD 的 listenStream 方法完成對(duì) socket 的 bind&listen 操作以及對(duì) netFD 的初始化(主要是對(duì) netFD 里的 pollDesc 的初始化),調(diào)用鏈?zhǔn)?runtime.runtime_pollServerInit --> runtime.poll_runtime_pollServerInit --> runtime.netpollGenericInit,主要做的事情是:
調(diào)用 epollcreate1 創(chuàng)建一個(gè) epoll 實(shí)例 epfd,作為整個(gè) runtime 的唯一 event-loop 使用;
調(diào)用 runtime.nonblockingPipe 創(chuàng)建一個(gè)用于和 epoll 實(shí)例通信的管道,這里為什么不用更新且更輕量的 eventfd 呢?我個(gè)人猜測(cè)是為了兼容更多以及更老的系統(tǒng)版本;
將 netpollBreakRd 通知信號(hào)量封裝成 epollevent 事件結(jié)構(gòu)體注冊(cè)進(jìn) epoll 實(shí)例。
相關(guān)源碼如下:
//?調(diào)用?linux?系統(tǒng)調(diào)用?socket?創(chuàng)建?listener?fd?并設(shè)置為為阻塞?I/O s,?err?:=?socketFunc(family,?sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC,?proto) //?On?Linux?the?SOCK_NONBLOCK?and?SOCK_CLOEXEC?flags?were //?introduced?in?2.6.27?kernel?and?on?FreeBSD?both?flags?were //?introduced?in?10?kernel.?If?we?get?an?EINVAL?error?on?Linux //?or?EPROTONOSUPPORT?error?on?FreeBSD,?fall?back?to?using //?socket?without?them.socketFunc????????func(int,?int,?int)?(int,?error)??=?syscall.Socket//?用上面創(chuàng)建的?listener?fd?初始化?listener?netFD if?fd,?err?=?newFD(s,?family,?sotype,?net);?err?!=?nil?{poll.CloseFunc(s)return?nil,?err }//?對(duì)?listener?fd?進(jìn)行?bind&listen?操作,并且調(diào)用?init?方法完成初始化 func?(fd?*netFD)?listenStream(laddr?sockaddr,?backlog?int,?ctrlFn?func(string,?string,?syscall.RawConn)?error)?error?{...//?完成綁定操作if?err?=?syscall.Bind(fd.pfd.Sysfd,?lsa);?err?!=?nil?{return?os.NewSyscallError("bind",?err)}//?完成監(jiān)聽操作if?err?=?listenFunc(fd.pfd.Sysfd,?backlog);?err?!=?nil?{return?os.NewSyscallError("listen",?err)}//?調(diào)用?init,內(nèi)部會(huì)調(diào)用?poll.FD.Init,最后調(diào)用?pollDesc.initif?err?=?fd.init();?err?!=?nil?{return?err}lsa,?_?=?syscall.Getsockname(fd.pfd.Sysfd)fd.setAddr(fd.addrFunc()(lsa),?nil)return?nil }//?使用?sync.Once?來確保一個(gè)?listener?只持有一個(gè)?epoll?實(shí)例 var?serverInit?sync.Once//?netFD.init?會(huì)調(diào)用?poll.FD.Init?并最終調(diào)用到?pollDesc.init, //?它會(huì)創(chuàng)建?epoll?實(shí)例并把?listener?fd?加入監(jiān)聽隊(duì)列 func?(pd?*pollDesc)?init(fd?*FD)?error?{//?runtime_pollServerInit?通過?`go:linkname`?鏈接到具體的實(shí)現(xiàn)函數(shù)?poll_runtime_pollServerInit,//?接著再調(diào)用?netpollGenericInit,然后會(huì)根據(jù)不同的系統(tǒng)平臺(tái)去調(diào)用特定的?netpollinit?來創(chuàng)建?epoll?實(shí)例serverInit.Do(runtime_pollServerInit)//?runtime_pollOpen?內(nèi)部調(diào)用了?netpollopen?來將?listener?fd?注冊(cè)到?//?epoll?實(shí)例中,另外,它會(huì)初始化一個(gè)?pollDesc?并返回ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}//?把真正初始化完成的?pollDesc?實(shí)例賦值給當(dāng)前的?pollDesc?代表自身的指針,//?后續(xù)使用直接通過該指針操作pd.runtimeCtx?=?ctxreturn?nil }var?(//?全局唯一的?epoll?fd,只在?listener?fd?初始化之時(shí)被指定一次epfd?int32?=?-1?//?epoll?descriptor )//?netpollinit?會(huì)創(chuàng)建一個(gè)?epoll?實(shí)例,然后把?epoll?fd?賦值給?epfd, //?后續(xù)?listener?以及它?accept?的所有?sockets?有關(guān)?epoll?的操作都是基于這個(gè)全局的?epfd func?netpollinit()?{epfd?=?epollcreate1(_EPOLL_CLOEXEC)if?epfd?<?0?{epfd?=?epollcreate(1024)if?epfd?<?0?{println("runtime:?epollcreate?failed?with",?-epfd)throw("runtime:?netpollinit?failed")}closeonexec(epfd)}r,?w,?errno?:=?nonblockingPipe()if?errno?!=?0?{println("runtime:?pipe?failed?with",?-errno)throw("runtime:?pipe?failed")}ev?:=?epollevent{events:?_EPOLLIN,}*(**uintptr)(unsafe.Pointer(&ev.data))?=?&netpollBreakRderrno?=?epollctl(epfd,?_EPOLL_CTL_ADD,?r,?&ev)if?errno?!=?0?{println("runtime:?epollctl?failed?with",?-errno)throw("runtime:?epollctl?failed")}netpollBreakRd?=?uintptr(r)netpollBreakWr?=?uintptr(w) }//?netpollopen?會(huì)被?runtime_pollOpen?調(diào)用,注冊(cè)?fd?到?epoll?實(shí)例, //?注意這里使用的是?epoll?的?ET?模式,同時(shí)會(huì)利用萬能指針把?pollDesc?保存到?epollevent?的一個(gè)?8?位的字節(jié)數(shù)組?data?里 func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32?{var?ev?epolleventev.events?=?_EPOLLIN?|?_EPOLLOUT?|?_EPOLLRDHUP?|?_EPOLLET*(**pollDesc)(unsafe.Pointer(&ev.data))?=?pdreturn?-epollctl(epfd,?_EPOLL_CTL_ADD,?int32(fd),?&ev) }我們前面提到的 epoll 的三個(gè)基本調(diào)用,Go 在源碼里實(shí)現(xiàn)了對(duì)那三個(gè)調(diào)用的封裝:
#include?<sys/epoll.h>?? int?epoll_create(int?size);?? int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event);?? int?epoll_wait(int?epfd,?struct?epoll_event?*?events,?int?maxevents,?int?timeout);//?Go?對(duì)上面三個(gè)調(diào)用的封裝 func?netpollinit() func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32 func?netpoll(block?bool)?gListnetFD 就是通過這三個(gè)封裝來對(duì) epoll 進(jìn)行創(chuàng)建實(shí)例、注冊(cè) fd 和等待事件操作的。
Listener.Accept()
netpoll accept socket 的工作流程如下:
服務(wù)端的 netFD 在 listen 時(shí)會(huì)創(chuàng)建 epoll 的實(shí)例,并將 listenerFD 加入 epoll 的事件隊(duì)列
netFD 在 accept 時(shí)將返回的 connFD 也加入 epoll 的事件隊(duì)列
netFD 在讀寫時(shí)出現(xiàn) syscall.EAGAIN 錯(cuò)誤,通過 pollDesc 的 waitRead 方法將當(dāng)前的 goroutine park 住,直到 ready,從 pollDesc 的 waitRead 中返回
Listener.Accept() 接收來自客戶端的新連接,具體還是調(diào)用 netFD.accept 方法來完成這個(gè)功能:
//?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }func?(fd?*netFD)?accept()?(netfd?*netFD,?err?error)?{//?調(diào)用?poll.FD?的?Accept?方法接受新的?socket?連接,返回?socket?的?fdd,?rsa,?errcall,?err?:=?fd.pfd.Accept()if?err?!=?nil?{if?errcall?!=?""?{err?=?wrapSyscallError(errcall,?err)}return?nil,?err}//?以?socket?fd?構(gòu)造一個(gè)新的?netFD,代表這個(gè)新的?socketif?netfd,?err?=?newFD(d,?fd.family,?fd.sotype,?fd.net);?err?!=?nil?{poll.CloseFunc(d)return?nil,?err}//?調(diào)用?netFD?的?init?方法完成初始化if?err?=?netfd.init();?err?!=?nil?{fd.Close()return?nil,?err}lsa,?_?:=?syscall.Getsockname(netfd.pfd.Sysfd)netfd.setAddr(netfd.addrFunc()(lsa),?netfd.addrFunc()(rsa))return?netfd,?nil }netFD.accept 方法里會(huì)再調(diào)用 poll.FD.Accept ,最后會(huì)使用 Linux 的系統(tǒng)調(diào)用 accept 來完成新連接的接收,并且會(huì)把 accept 的 socket 設(shè)置成非阻塞 I/O 模式:
//?Accept?wraps?the?accept?network?call. func?(fd?*FD)?Accept()?(int,?syscall.Sockaddr,?string,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?-1,?nil,?"",?err}defer?fd.readUnlock()if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?-1,?nil,?"",?err}for?{//?使用?linux?系統(tǒng)調(diào)用?accept?接收新連接,創(chuàng)建對(duì)應(yīng)的?sockets,?rsa,?errcall,?err?:=?accept(fd.Sysfd)//?因?yàn)?listener?fd?在創(chuàng)建的時(shí)候已經(jīng)設(shè)置成非阻塞的了,//?所以 accept 方法會(huì)直接返回,不管有沒有新連接到來;如果 err == nil 則表示正常建立新連接,直接返回if?err?==?nil?{return?s,?rsa,?"",?err}//?如果?err?!=?nil,則判斷?err?==?syscall.EAGAIN,符合條件則進(jìn)入?pollDesc.waitRead?方法switch?err?{case?syscall.EAGAIN:if?fd.pd.pollable()?{//?如果當(dāng)前沒有發(fā)生期待的?I/O?事件,那么?waitRead?會(huì)通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}case?syscall.ECONNABORTED://?This?means?that?a?socket?on?the?listen//?queue?was?closed?before?we?Accept()ed?it;//?it's?a?silly?error,?so?try?again.continue}return?-1,?nil,?errcall,?err} }//?使用?linux?的?accept?系統(tǒng)調(diào)用接收新連接并把這個(gè)?socket?fd?設(shè)置成非阻塞?I/O ns,?sa,?err?:=?Accept4Func(s,?syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) //?On?Linux?the?accept4?system?call?was?introduced?in?2.6.28 //?kernel?and?on?FreeBSD?it?was?introduced?in?10?kernel.?If?we //?get?an?ENOSYS?error?on?both?Linux?and?FreeBSD,?or?EINVAL //?error?on?Linux,?fall?back?to?using?accept.//?Accept4Func?is?used?to?hook?the?accept4?call. var?Accept4Func?func(int,?int)?(int,?syscall.Sockaddr,?error)?=?syscall.Accept4pollDesc.waitRead 方法主要負(fù)責(zé)檢測(cè)當(dāng)前這個(gè) pollDesc 的上層 netFD 對(duì)應(yīng)的 fd 是否有『期待的』I/O 事件發(fā)生,如果有就直接返回,否則就 park 住當(dāng)前的 goroutine 并持續(xù)等待直至對(duì)應(yīng)的 fd 上發(fā)生可讀/可寫或者其他『期待的』I/O 事件為止,然后它就會(huì)返回到外層的 for 循環(huán),讓 goroutine 繼續(xù)執(zhí)行邏輯。
poll.FD.Accept() 返回之后,會(huì)構(gòu)造一個(gè)對(duì)應(yīng)這個(gè)新 socket 的 netFD,然后調(diào)用 init() 方法完成初始化,這個(gè) init 過程和前面 net.Listen() 是一樣的,調(diào)用鏈:netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),最終又會(huì)走到這里:
var?serverInit?sync.Oncefunc?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }然后把這個(gè) socket fd 注冊(cè)到 listener 的 epoll 實(shí)例的事件隊(duì)列中去,等待 I/O 事件。
Conn.Read/Conn.Write
我們先來看看 Conn.Read 方法是如何實(shí)現(xiàn)的,原理其實(shí)和 Listener.Accept 是一樣的,具體調(diào)用鏈還是首先調(diào)用 conn 的 netFD.Read ,然后內(nèi)部再調(diào)用 poll.FD.Read ,最后使用 Linux 的系統(tǒng)調(diào)用 read: syscall.Read 完成數(shù)據(jù)讀取:
//?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }func?(fd?*netFD)?Read(p?[]byte)?(n?int,?err?error)?{n,?err?=?fd.pfd.Read(p)runtime.KeepAlive(fd)return?n,?wrapSyscallError("read",?err) }//?Read?implements?io.Reader. func?(fd?*FD)?Read(p?[]byte)?(int,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?0,?err}defer?fd.readUnlock()if?len(p)?==?0?{//?If?the?caller?wanted?a?zero?byte?read,?return?immediately//?without?trying?(but?after?acquiring?the?readLock).//?Otherwise?syscall.Read?returns?0,?nil?which?looks?like//?io.EOF.//?TODO(bradfitz):?make?it?wait?for?readability??(Issue?15735)return?0,?nil}if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?0,?err}if?fd.IsStream?&&?len(p)?>?maxRW?{p?=?p[:maxRW]}for?{//?嘗試從該?socket?讀取數(shù)據(jù),因?yàn)?socket?在被?listener?accept?的時(shí)候設(shè)置成//?了非阻塞?I/O,所以這里同樣也是直接返回,不管有沒有可讀的數(shù)據(jù)n,?err?:=?syscall.Read(fd.Sysfd,?p)if?err?!=?nil?{n?=?0//?err?==?syscall.EAGAIN?表示當(dāng)前沒有期待的?I/O?事件發(fā)生,也就是?socket?不可讀if?err?==?syscall.EAGAIN?&&?fd.pd.pollable()?{//?如果當(dāng)前沒有發(fā)生期待的?I/O?事件,那么?waitRead?//?會(huì)通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}//?On?MacOS?we?can?see?EINTR?here?if?the?user//?pressed?^Z.??See?issue?#22838.if?runtime.GOOS?==?"darwin"?&&?err?==?syscall.EINTR?{continue}}err?=?fd.eofError(n,?err)return?n,?err} }conn.Write 和 conn.Read 的原理是一致的,它也是通過類似 pollDesc.waitRead 的 pollDesc.waitWrite 來 park 住 goroutine 直至期待的 I/O 事件發(fā)生才返回恢復(fù)執(zhí)行。
pollDesc.waitRead/pollDesc.waitWrite
pollDesc.waitRead 內(nèi)部調(diào)用了 poll.runtime_pollWait --> runtime.poll_runtime_pollWait 來達(dá)成無 I/O 事件時(shí) park 住 goroutine 的目的:
//go:linkname?poll_runtime_pollWait?internal/poll.runtime_pollWait func?poll_runtime_pollWait(pd?*pollDesc,?mode?int)?int?{err?:=?netpollcheckerr(pd,?int32(mode))if?err?!=?pollNoError?{return?err}//?As?for?now?only?Solaris,?illumos,?and?AIX?use?level-triggered?IO.if?GOOS?==?"solaris"?||?GOOS?==?"illumos"?||?GOOS?==?"aix"?{netpollarm(pd,?mode)}//?進(jìn)入?netpollblock?并且判斷是否有期待的?I/O?事件發(fā)生,//?這里的?for?循環(huán)是為了一直等到?io?readyfor?!netpollblock(pd,?int32(mode),?false)?{err?=?netpollcheckerr(pd,?int32(mode))if?err?!=?0?{return?err}//?Can?happen?if?timeout?has?fired?and?unblocked?us,//?but?before?we?had?a?chance?to?run,?timeout?has?been?reset.//?Pretend?it?has?not?happened?and?retry.}return?0 }//?returns?true?if?IO?is?ready,?or?false?if?timedout?or?closed //?waitio?-?wait?only?for?completed?IO,?ignore?errors func?netpollblock(pd?*pollDesc,?mode?int32,?waitio?bool)?bool?{//?gpp?保存的是?goroutine?的數(shù)據(jù)結(jié)構(gòu)?g,這里會(huì)根據(jù)?mode?的值決定是?rg?還是?wg,//?前面提到過,rg?和?wg?是用來保存等待?I/O?就緒的?gorouine?的,后面調(diào)用?gopark?之后,//?會(huì)把當(dāng)前的?goroutine?的抽象數(shù)據(jù)結(jié)構(gòu)?g?存入?gpp?這個(gè)指針,也就是?rg?或者?wggpp?:=?&pd.rgif?mode?==?'w'?{gpp?=?&pd.wg}//?set?the?gpp?semaphore?to?WAIT//?這個(gè)?for?循環(huán)是為了等待?io?ready?或者?io?waitfor?{old?:=?*gpp//?gpp?==?pdReady?表示此時(shí)已有期待的?I/O?事件發(fā)生,//?可以直接返回?unblock?當(dāng)前?goroutine?并執(zhí)行響應(yīng)的?I/O?操作if?old?==?pdReady?{*gpp?=?0return?true}if?old?!=?0?{throw("runtime:?double?wait")}//?如果沒有期待的?I/O?事件發(fā)生,則通過原子操作把?gpp?的值置為?pdWait?并退出?for?循環(huán)if?atomic.Casuintptr(gpp,?0,?pdWait)?{break}}//?need?to?recheck?error?states?after?setting?gpp?to?WAIT//?this?is?necessary?because?runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl//?do?the?opposite:?store?to?closing/rd/wd,?membarrier,?load?of?rg/wg//?waitio?此時(shí)是?false,netpollcheckerr?方法會(huì)檢查當(dāng)前?pollDesc?對(duì)應(yīng)的?fd?是否是正常的,//?通常來說??netpollcheckerr(pd,?mode)?==?0?是成立的,所以這里會(huì)執(zhí)行?gopark?//?把當(dāng)前?goroutine?給?park?住,直至對(duì)應(yīng)的?fd?上發(fā)生可讀/可寫或者其他『期待的』I/O?事件為止,//?然后?unpark?返回,在?gopark?內(nèi)部會(huì)把當(dāng)前?goroutine?的抽象數(shù)據(jù)結(jié)構(gòu)?g?存入//?gpp(pollDesc.rg/pollDesc.wg)?指針里,以便在后面的?netpoll?函數(shù)取出?pollDesc?之后,//?把?g?添加到鏈表里返回,接著重新調(diào)度?goroutineif?waitio?||?netpollcheckerr(pd,?mode)?==?0?{//?注冊(cè)?netpollblockcommit?回調(diào)給?gopark,在?gopark?內(nèi)部會(huì)執(zhí)行它,保存當(dāng)前?goroutine?到?gppgopark(netpollblockcommit,?unsafe.Pointer(gpp),?waitReasonIOWait,?traceEvGoBlockNet,?5)}//?be?careful?to?not?lose?concurrent?READY?notificationold?:=?atomic.Xchguintptr(gpp,?0)if?old?>?pdWait?{throw("runtime:?corrupted?polldesc")}return?old?==?pdReady }//?gopark?會(huì)停住當(dāng)前的?goroutine?并且調(diào)用傳遞進(jìn)來的回調(diào)函數(shù)?unlockf,從上面的源碼我們可以知道這個(gè)函數(shù)是 //?netpollblockcommit func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.//?gopark?最終會(huì)調(diào)用?park_m,在這個(gè)函數(shù)內(nèi)部會(huì)調(diào)用?unlockf,也就是?netpollblockcommit,//?然后會(huì)把當(dāng)前的?goroutine,也就是?g?數(shù)據(jù)結(jié)構(gòu)保存到?pollDesc?的?rg?或者?wg?指針里mcall(park_m) }//?park?continuation?on?g0. func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{//?調(diào)用?netpollblockcommit,把當(dāng)前的?goroutine,//?也就是?g?數(shù)據(jù)結(jié)構(gòu)保存到?pollDesc?的?rg?或者?wg?指針里ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }//?netpollblockcommit?在?gopark?函數(shù)里被調(diào)用 func?netpollblockcommit(gp?*g,?gpp?unsafe.Pointer)?bool?{//?通過原子操作把當(dāng)前?goroutine?抽象的數(shù)據(jù)結(jié)構(gòu)?g,也就是這里的參數(shù)?gp?存入?gpp?指針,//?此時(shí)?gpp?的值是?pollDesc?的?rg?或者?wg?指針r?:=?atomic.Casuintptr((*uintptr)(gpp),?pdWait,?uintptr(unsafe.Pointer(gp)))if?r?{//?Bump?the?count?of?goroutines?waiting?for?the?poller.//?The?scheduler?uses?this?to?decide?whether?to?block//?waiting?for?the?poller?if?there?is?nothing?else?to?do.atomic.Xadd(&netpollWaiters,?1)}return?r }pollDesc.waitWrite 的內(nèi)部實(shí)現(xiàn)原理和 pollDesc.waitRead 是一樣的,都是基于 poll.runtime_pollWait --> runtime.poll_runtime_pollWait,這里就不再贅述。
netpoll
前面已經(jīng)從源碼的層面分析完了 netpoll 是如何通過 park goroutine 從而達(dá)到阻塞 Accept/Read/Write 的效果,而通過調(diào)用 gopark,goroutine 會(huì)被放置在某個(gè)等待隊(duì)列中,這里是放到了 epoll 的 "interest list" 里,底層數(shù)據(jù)結(jié)構(gòu)是由紅黑樹實(shí)現(xiàn)的 ?eventpoll.rbr,此時(shí) G 的狀態(tài)由 _Grunning為_Gwaitting ,因此 G 必須被手動(dòng)喚醒(通過 goready ),否則會(huì)丟失任務(wù),應(yīng)用層阻塞通常使用這種方式。
所以我們現(xiàn)在可以來從整體的層面來概括 Go 的網(wǎng)絡(luò)業(yè)務(wù) goroutine 是如何被規(guī)劃調(diào)度的了:
首先,client 連接 server 的時(shí)候,listener 通過 accept 調(diào)用接收新 connection,每一個(gè)新 connection 都啟動(dòng)一個(gè) goroutine 處理,accept 調(diào)用會(huì)把該 connection 的 fd 連帶所在的 goroutine 上下文信息封裝注冊(cè)到 epoll 的監(jiān)聽列表里去,當(dāng) goroutine 調(diào)用 conn.Read 或者 conn.Write 等需要阻塞等待的函數(shù)時(shí),會(huì)被 gopark 給封存起來并使之休眠,讓 P 去執(zhí)行本地調(diào)度隊(duì)列里的下一個(gè)可執(zhí)行的 goroutine,往后 Go scheduler 會(huì)在循環(huán)調(diào)度的 runtime.schedule() 函數(shù)以及 sysmon 監(jiān)控線程中調(diào)用 runtime.nepoll 以獲取可運(yùn)行的 goroutine 列表并通過調(diào)用 injectglist 把剩下的 g 放入全局調(diào)度隊(duì)列或者當(dāng)前 P 本地調(diào)度隊(duì)列去重新執(zhí)行。
那么當(dāng) I/O 事件發(fā)生之后,netpoller 是通過什么方式喚醒那些在 I/O wait 的 goroutine 的?答案是通過 runtime.netpoll。
runtime.netpoll 的核心邏輯是:
根據(jù)調(diào)用方的入?yún)?delay,設(shè)置對(duì)應(yīng)的調(diào)用 epollwait 的 timeout 值;
調(diào)用 epollwait 等待發(fā)生了可讀/可寫事件的 fd;
循環(huán) epollwait 返回的事件列表,處理對(duì)應(yīng)的事件類型, 組裝可運(yùn)行的 goroutine 鏈表并返回。
Go 在多種場(chǎng)景下都可能會(huì)調(diào)用 netpoll 檢查文件描述符狀態(tài),netpoll 里會(huì)調(diào)用 epoll_wait 從 epoll 的 eventpoll.rdllist 就緒雙向鏈表返回,從而得到 I/O 就緒的 socket fd 列表,并根據(jù)取出最初調(diào)用 epoll_ctl 時(shí)保存的上下文信息,恢復(fù) g。所以執(zhí)行完netpoll 之后,會(huì)返回一個(gè)就緒 fd 列表對(duì)應(yīng)的 goroutine 鏈表,接下來將就緒的 goroutine 通過調(diào)用 injectglist 加入到全局調(diào)度隊(duì)列或者 P 的本地調(diào)度隊(duì)列中,啟動(dòng) M 綁定 P 去執(zhí)行。
具體調(diào)用 netpoll 的地方,首先在 Go runtime scheduler 循環(huán)調(diào)度 goroutines 之時(shí)就有可能會(huì)調(diào)用 netpoll 獲取到已就緒的 fd 對(duì)應(yīng)的 goroutine 來調(diào)度執(zhí)行。
首先 Go scheduler 的核心方法 runtime.schedule() 里會(huì)調(diào)用一個(gè)叫 runtime.findrunable() 的方法獲取可運(yùn)行的 goroutine 來執(zhí)行,而在 runtime.findrunable() 方法里就調(diào)用了 runtime.netpoll 獲取已就緒的 fd 列表對(duì)應(yīng)的 goroutine 列表:
//?One?round?of?scheduler:?find?a?runnable?goroutine?and?execute?it. //?Never?returns. func?schedule()?{...if?gp?==?nil?{gp,?inheritTime?=?findrunnable()?//?blocks?until?work?is?available}... }//?Finds?a?runnable?goroutine?to?execute. //?Tries?to?steal?from?other?P's,?get?g?from?global?queue,?poll?network. func?findrunnable()?(gp?*g,?inheritTime?bool)?{...//?Poll?network.if?netpollinited()?&&?(atomic.Load(&netpollWaiters)?>?0?||?pollUntil?!=?0)?&&?atomic.Xchg64(&sched.lastpoll,?0)?!=?0?{atomic.Store64(&sched.pollUntil,?uint64(pollUntil))if?_g_.m.p?!=?0?{throw("findrunnable:?netpoll?with?p")}if?_g_.m.spinning?{throw("findrunnable:?netpoll?with?spinning")}if?faketime?!=?0?{//?When?using?fake?time,?just?poll.delta?=?0}list?:=?netpoll(delta)?//?同步阻塞調(diào)用?netpoll,直至有可用的?goroutineatomic.Store64(&sched.pollUntil,?0)atomic.Store64(&sched.lastpoll,?uint64(nanotime()))if?faketime?!=?0?&&?list.empty()?{//?Using?fake?time?and?nothing?is?ready;?stop?M.//?When?all?M's?stop,?checkdead?will?call?timejump.stopm()goto?top}lock(&sched.lock)_p_?=?pidleget()?//?查找是否有空閑的?P?可以來就緒的?goroutineunlock(&sched.lock)if?_p_?==?nil?{injectglist(&list)?//?如果當(dāng)前沒有空閑的?P,則把就緒的?goroutine?放入全局調(diào)度隊(duì)列等待被執(zhí)行}?else?{//?如果當(dāng)前有空閑的?P,則?pop?出一個(gè)?g,返回給調(diào)度器去執(zhí)行,//?并通過調(diào)用?injectglist?把剩下的?g?放入全局調(diào)度隊(duì)列或者當(dāng)前?P?本地調(diào)度隊(duì)列acquirep(_p_)if?!list.empty()?{gp?:=?list.pop()injectglist(&list)casgstatus(gp,?_Gwaiting,?_Grunnable)if?trace.enabled?{traceGoUnpark(gp,?0)}return?gp,?false}if?wasSpinning?{_g_.m.spinning?=?trueatomic.Xadd(&sched.nmspinning,?1)}goto?top}}?else?if?pollUntil?!=?0?&&?netpollinited()?{pollerPollUntil?:=?int64(atomic.Load64(&sched.pollUntil))if?pollerPollUntil?==?0?||?pollerPollUntil?>?pollUntil?{netpollBreak()}}stopm()goto?top }另外, sysmon 監(jiān)控線程會(huì)在循環(huán)過程中檢查距離上一次 runtime.netpoll 被調(diào)用是否超過了 10ms,若是則會(huì)去調(diào)用它拿到可運(yùn)行的 goroutine 列表并通過調(diào)用 injectglist 把 g 列表放入全局調(diào)度隊(duì)列或者當(dāng)前 P 本地調(diào)度隊(duì)列等待被執(zhí)行:
//?Always?runs?without?a?P,?so?write?barriers?are?not?allowed. // //go:nowritebarrierrec func?sysmon()?{...//?poll?network?if?not?polled?for?more?than?10mslastpoll?:=?int64(atomic.Load64(&sched.lastpoll))if?netpollinited()?&&?lastpoll?!=?0?&&?lastpoll+10*1000*1000?<?now?{atomic.Cas64(&sched.lastpoll,?uint64(lastpoll),?uint64(now))list?:=?netpoll(0)?//?non-blocking?-?returns?list?of?goroutinesif?!list.empty()?{//?Need?to?decrement?number?of?idle?locked?M's//?(pretending?that?one?more?is?running)?before?injectglist.//?Otherwise?it?can?lead?to?the?following?situation://?injectglist?grabs?all?P's?but?before?it?starts?M's?to?run?the?P's,//?another?M?returns?from?syscall,?finishes?running?its?G,//?observes?that?there?is?no?work?to?do?and?no?other?running?M's//?and?reports?deadlock.incidlelocked(-1)injectglist(&list)incidlelocked(1)}}... }Go runtime 在程序啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)獨(dú)立的 M 作為監(jiān)控線程,叫 sysmon ,這個(gè)線程為系統(tǒng)級(jí)的 daemon 線程,無需 P 即可運(yùn)行, sysmon 每 20us~10ms 運(yùn)行一次。sysmon 中以輪詢的方式執(zhí)行以下操作(如上面的代碼所示):
以非阻塞的方式調(diào)用 runtime.netpoll ,從中找出能從網(wǎng)絡(luò) I/O 中喚醒的 g 列表,并通過調(diào)用 injectglist 把 g 列表放入全局調(diào)度隊(duì)列或者當(dāng)前 P 本地調(diào)度隊(duì)列等待被執(zhí)行,調(diào)度觸發(fā)時(shí),有可能從這個(gè)全局 runnable 調(diào)度隊(duì)列獲取 g。然后再循環(huán)調(diào)用 startm ,直到所有 P 都不處于 _Pidle 狀態(tài)。
調(diào)用 retake ,搶占長(zhǎng)時(shí)間處于 _Psyscall 狀態(tài)的 P。
綜上,Go 借助于 epoll/kqueue/iocp 和 runtime scheduler 等的幫助,設(shè)計(jì)出了自己的 I/O 多路復(fù)用 netpoller,成功地讓 Listener.Accept / conn.Read / conn.Write 等方法從開發(fā)者的角度看來是同步模式。
Go netpoller 的價(jià)值
通過前面對(duì)源碼的分析,我們現(xiàn)在知道 Go netpoller 依托于 runtime scheduler,為開發(fā)者提供了一種強(qiáng)大的同步網(wǎng)絡(luò)編程模式;然而,Go netpoller 存在的意義卻遠(yuǎn)不止于此,Go netpoller I/O 多路復(fù)用搭配 Non-blocking I/O 而打造出來的這個(gè)原生網(wǎng)絡(luò)模型,它最大的價(jià)值是把網(wǎng)絡(luò) I/O 的控制權(quán)牢牢掌握在 Go 自己的 runtime 里,關(guān)于這一點(diǎn)我們需要從 Go 的 runtime scheduler 說起,Go 的 G-P-M 調(diào)度模型如下:
G 在運(yùn)行過程中如果被阻塞在某個(gè) system call 操作上,那么不光 G 會(huì)阻塞,執(zhí)行該 G 的 M 也會(huì)解綁 P(實(shí)質(zhì)是被 sysmon 搶走了),與 G 一起進(jìn)入 sleep 狀態(tài)。如果此時(shí)有 idle 的 M,則 P 與其綁定繼續(xù)執(zhí)行其他 G;如果沒有 idle M,但仍然有其他 G 要去執(zhí)行,那么就會(huì)創(chuàng)建一個(gè)新的 M。當(dāng)阻塞在 system call 上的 G 完成 syscall 調(diào)用后,G 會(huì)去嘗試獲取一個(gè)可用的 P,如果沒有可用的 P,那么 G 會(huì)被標(biāo)記為 _Grunnable 并把它放入全局的 runqueue 中等待調(diào)度,之前的那個(gè) sleep 的 M 將再次進(jìn)入 sleep。
現(xiàn)在清楚為什么 netpoll 為什么一定要使用非阻塞 I/O 了吧?就是為了避免讓操作網(wǎng)絡(luò) I/O 的 goroutine 陷入到系統(tǒng)調(diào)用從而進(jìn)入內(nèi)核態(tài),因?yàn)橐坏┻M(jìn)入內(nèi)核態(tài),整個(gè)程序的控制權(quán)就會(huì)發(fā)生轉(zhuǎn)移(到內(nèi)核),不再屬于用戶進(jìn)程了,那么也就無法借助于 Go 強(qiáng)大的 runtime scheduler 來調(diào)度業(yè)務(wù)程序的并發(fā)了;而有了 netpoll 之后,借助于非阻塞 I/O ,G 就再也不會(huì)因?yàn)橄到y(tǒng)調(diào)用的讀寫而 (長(zhǎng)時(shí)間) 陷入內(nèi)核態(tài),當(dāng) G 被阻塞在某個(gè) network I/O 操作上時(shí),實(shí)際上它不是因?yàn)橄萑雰?nèi)核態(tài)被阻塞住了,而是被 Go runtime 調(diào)用 gopark 給 park 住了,此時(shí) G 會(huì)被放置到某個(gè) wait queue 中,而 M 會(huì)嘗試運(yùn)行下一個(gè) _Grunnable 的 G,如果此時(shí)沒有 _Grunnable 的 G 供 M 運(yùn)行,那么 M 將解綁 P,并進(jìn)入 sleep 狀態(tài)。當(dāng) I/O available,在 epoll 的 eventpoll.rdr 中等待的 G 會(huì)被放到 eventpoll.rdllist 鏈表里并通過 netpoll 中的 epoll_wait 系統(tǒng)調(diào)用返回放置到全局調(diào)度隊(duì)列或者 P 的本地調(diào)度隊(duì)列,標(biāo)記為 _Grunnable ,等待 P 綁定 M 恢復(fù)執(zhí)行。
Goroutine 的調(diào)度
這一小節(jié)主要是講處理網(wǎng)絡(luò) I/O 的 goroutines 阻塞之后,Go scheduler 具體是如何像前面幾個(gè)章節(jié)所說的那樣,避免讓操作網(wǎng)絡(luò) I/O 的 goroutine 陷入到系統(tǒng)調(diào)用從而進(jìn)入內(nèi)核態(tài)的,而是封存 goroutine 然后讓出 CPU 的使用權(quán)從而令 P 可以去調(diào)度本地調(diào)度隊(duì)列里的下一個(gè) goroutine 的。
溫馨提示:這一小節(jié)屬于延伸閱讀,涉及到的知識(shí)點(diǎn)更偏系統(tǒng)底層,需要有一定的匯編語言基礎(chǔ)才能通讀,另外,這一節(jié)對(duì) Go scheduler 的講解僅僅涉及核心的一部分,不會(huì)把整個(gè)調(diào)度器都講一遍(事實(shí)上如果真要解析 Go scheduler 的話恐怕重開一篇幾萬字的文章才能基本講清楚。。。),所以也要求讀者對(duì) Go 的并發(fā)調(diào)度器有足夠的了解,因此這一節(jié)可能會(huì)稍顯深?yuàn)W。當(dāng)然這一節(jié)也可選擇不讀,因?yàn)橥ㄟ^前面的整個(gè)解析,我相信讀者應(yīng)該已經(jīng)能夠基本掌握 Go netpoller 處理網(wǎng)絡(luò) I/O 的核心細(xì)節(jié)了,以及能從宏觀層面了解 netpoller 對(duì)業(yè)務(wù) goroutines 的基本調(diào)度了。而這一節(jié)主要是通過對(duì) goroutines 調(diào)度細(xì)節(jié)的剖析,能夠加深讀者對(duì)整個(gè) Go netpoller 的徹底理解,接上前面幾個(gè)章節(jié),形成一個(gè)完整的閉環(huán)。如果對(duì)調(diào)度的底層細(xì)節(jié)沒興趣的話這也可以直接跳過這一節(jié),對(duì)理解 Go netpoller 的基本原理影響不大,不過還是建議有條件的讀者可以看看。
從源碼可知,Go scheduler 的調(diào)度 goroutine 過程中所調(diào)用的核心函數(shù)鏈如下:
runtime.schedule?-->?runtime.execute?-->?runtime.gogo?-->?goroutine?code?-->?runtime.goexit?-->?runtime.goexit1?-->?runtime.mcall?-->?runtime.goexit0?-->?runtime.scheduleGo scheduler 會(huì)不斷循環(huán)調(diào)用 runtime.schedule() 去調(diào)度 goroutines,而每個(gè) goroutine 執(zhí)行完成并退出之后,會(huì)再次調(diào)用 runtime.schedule(),使得調(diào)度器回到調(diào)度循環(huán)去執(zhí)行其他的 goroutine,不斷循環(huán),永不停歇。
當(dāng)我們使用 go 關(guān)鍵字啟動(dòng)一個(gè)新 goroutine 時(shí),最終會(huì)調(diào)用 runtime.newproc --> runtime.newproc1,來得到 g,runtime.newproc1 會(huì)先從 P 的 gfree 緩存鏈表中查找可用的 g,若緩存未生效,則會(huì)新創(chuàng)建 g 給當(dāng)前的業(yè)務(wù)函數(shù),最后這個(gè) g 會(huì)被傳給 runtime.gogo 去真正執(zhí)行。
這里首先需要了解一個(gè) gobuf 的結(jié)構(gòu)體,它用來保存 goroutine 的調(diào)度信息,是 runtime.gogo 的入?yún)?#xff1a;
//?gobuf?存儲(chǔ)?goroutine?調(diào)度上下文信息的結(jié)構(gòu)體 type?gobuf?struct?{//?The?offsets?of?sp,?pc,?and?g?are?known?to?(hard-coded?in)?libmach.////?ctxt?is?unusual?with?respect?to?GC:?it?may?be?a//?heap-allocated?funcval,?so?GC?needs?to?track?it,?but?it//?needs?to?be?set?and?cleared?from?assembly,?where?it's//?difficult?to?have?write?barriers.?However,?ctxt?is?really?a//?saved,?live?register,?and?we?only?ever?exchange?it?between//?the?real?register?and?the?gobuf.?Hence,?we?treat?it?as?a//?root?during?stack?scanning,?which?means?assembly?that?saves//?and?restores?it?doesn't?need?write?barriers.?It's?still//?typed?as?a?pointer?so?that?any?other?writes?from?Go?get//?write?barriers.sp???uintptr?//?Stack?Pointer?棧指針pc???uintptr?//?Program?Counter?程序計(jì)數(shù)器g????guintptr?//?持有當(dāng)前?gobuf?的?goroutinectxt?unsafe.Pointerret??sys.Uintreglr???uintptrbp???uintptr?//?for?GOEXPERIMENT=framepointer }執(zhí)行 runtime.execute(),進(jìn)而調(diào)用 runtime.gogo:
func?execute(gp?*g,?inheritTime?bool)?{_g_?:=?getg()//?Assign?gp.m?before?entering?_Grunning?so?running?Gs?have?an//?M._g_.m.curg?=?gpgp.m?=?_g_.mcasgstatus(gp,?_Grunnable,?_Grunning)gp.waitsince?=?0gp.preempt?=?falsegp.stackguard0?=?gp.stack.lo?+?_StackGuardif?!inheritTime?{_g_.m.p.ptr().schedtick++}//?Check?whether?the?profiler?needs?to?be?turned?on?or?off.hz?:=?sched.profilehzif?_g_.m.profilehz?!=?hz?{setThreadCPUProfiler(hz)}if?trace.enabled?{//?GoSysExit?has?to?happen?when?we?have?a?P,?but?before?GoStart.//?So?we?emit?it?here.if?gp.syscallsp?!=?0?&&?gp.sysblocktraced?{traceGoSysExit(gp.sysexitticks)}traceGoStart()}//?gp.sched?就是?gobufgogo(&gp.sched) }這里還需要了解一個(gè)概念:g0,Go G-P-M 調(diào)度模型中,g 代表 goroutine,而實(shí)際上一共有三種 g:
執(zhí)行用戶代碼的 g;
執(zhí)行調(diào)度器代碼的 g,也即是 g0;
執(zhí)行 runtime.main 初始化工作的 main goroutine;
第一種 g 就是使用 go 關(guān)鍵字啟動(dòng)的 goroutine,也是我們接觸最多的一類 g;第三種 g 是調(diào)度器啟動(dòng)之后用來執(zhí)行的一系列初始化工作的,包括但不限于啟動(dòng) sysmon 監(jiān)控線程、內(nèi)存初始化和啟動(dòng) GC 等等工作;第二種 g 叫 g0,用來執(zhí)行調(diào)度器代碼,g0 在底層和其他 g 是一樣的數(shù)據(jù)結(jié)構(gòu),但是性質(zhì)上有很大的區(qū)別,首先 g0 的棧大小是固定的,比如在 Linux 或者其他 Unix-like 的系統(tǒng)上一般是固定 8MB,不能動(dòng)態(tài)伸縮,而普通的 g 初始棧大小是 2KB,可按需擴(kuò)展,g0 其實(shí)就是線程棧,我們知道每個(gè)線程被創(chuàng)建出來之時(shí)都需要操作系統(tǒng)為之分配一個(gè)初始固定的線程棧,就是前面說的 8MB 大小的棧,g0 棧就代表了這個(gè)線程棧,因此每一個(gè) m 都需要綁定一個(gè) g0 來執(zhí)行調(diào)度器代碼,然后跳轉(zhuǎn)到執(zhí)行用戶代碼的地方。
runtime.gogo 是真正去執(zhí)行 goroutine 代碼的函數(shù),這個(gè)函數(shù)由匯編實(shí)現(xiàn),為什么需要用匯編?因?yàn)?gogo 的工作是完成線程 M 上的堆棧切換:從系統(tǒng)堆棧 g0 切換成 goroutine gp,也就是 CPU 使用權(quán)和堆棧的切換,這種切換本質(zhì)上是對(duì) CPU 的 PC、SP 等寄存器和堆棧指針的更新,而這一類精度的底層操作別說是 Go,就算是最貼近底層的 C 也無法做到,這種程度的操作已超出所有高級(jí)語言的范疇,因此只能借助于匯編來實(shí)現(xiàn)。
runtime.gogo 在不同的 CPU 架構(gòu)平臺(tái)上的實(shí)現(xiàn)各不相同,但是核心原理殊途同歸,我們這里選用 amd64 架構(gòu)的匯編實(shí)現(xiàn)來分析,我會(huì)在關(guān)鍵的地方加上解釋:
// func gogo(buf *gobuf) // restore state from Gobuf; longjmp TEXT runtime·gogo(SB), NOSPLIT, $16-8// 將第一個(gè) FP 偽寄存器所指向的 gobuf 的第一個(gè)參數(shù)存入 BX 寄存器, // gobuf 的一個(gè)參數(shù)即是 SP 指針MOVQ buf+0(FP), BXMOVQ gobuf_g(BX), DX // 將 gp.sched.g 保存到 DX 寄存器MOVQ 0(DX), CX // make sure g != nil// 將 tls (thread local storage) 保存到 CX 寄存器,然后把 gp.sched.g 放到 tls[0],// 這樣以后調(diào)用 getg() 之時(shí)就可以通過 TLS 直接獲取到當(dāng)前 goroutine 的 g 結(jié)構(gòu)體實(shí)例,// 進(jìn)而可以得到 g 所在的 m 和 p,TLS 里一開始存儲(chǔ)的是系統(tǒng)堆棧 g0 的地址get_tls(CX)MOVQ DX, g(CX)// 下面的指令則是對(duì)函數(shù)棧的 BP/SP 寄存器(指針)的存取,// 最后進(jìn)入到指定的代碼區(qū)域,執(zhí)行函數(shù)棧幀MOVQ gobuf_sp(BX), SP // restore SPMOVQ gobuf_ret(BX), AXMOVQ gobuf_ctxt(BX), DXMOVQ gobuf_bp(BX), BP// 這里是在清空 gp.sched,因?yàn)榍懊嬉呀?jīng)把 gobuf 里的字段值都存入了寄存器,// 所以 gp.sched 就可以提前清空了,不需要等到后面 GC 來回收,減輕 GC 的負(fù)擔(dān)MOVQ $0, gobuf_sp(BX) // clear to help garbage collectorMOVQ $0, gobuf_ret(BX)MOVQ $0, gobuf_ctxt(BX)MOVQ $0, gobuf_bp(BX)// 把 gp.sched.pc 值放入 BX 寄存器// PC 指針指向 gogo 退出時(shí)需要執(zhí)行的函數(shù)地址MOVQ gobuf_pc(BX), BX// 用 BX 寄存器里的值去修改 CPU 的 IP 寄存器,// 這樣就可以根據(jù) CS:IP 寄存器的段地址+偏移量跳轉(zhuǎn)到 BX 寄存器里的地址,也就是 gp.sched.pcJMP BXruntime.gogo 函數(shù)接收 gp.sched 這個(gè) gobuf 結(jié)構(gòu)體實(shí)例,其中保存了函數(shù)棧寄存器 SP/PC/BP,如果熟悉操作系統(tǒng)原理的話可以知道這些寄存器是 CPU 進(jìn)行函數(shù)調(diào)用和返回時(shí)切換對(duì)應(yīng)的函數(shù)棧幀所需的寄存器,而 goroutine 的執(zhí)行和函數(shù)調(diào)用的原理是一致的,也是 CPU 寄存器的切換過程,所以這里的幾個(gè)寄存器當(dāng)前存的就是 G 的函數(shù)執(zhí)行棧,當(dāng) goroutine 在處理網(wǎng)絡(luò) I/O 之時(shí),如果恰好處于 I/O 就緒的狀態(tài)的話,則正常完成 runtime.gogo,并在最后跳轉(zhuǎn)到特定的地址,那么這個(gè)地址是哪里呢?
我們知道 CPU 執(zhí)行函數(shù)的時(shí)候需要知道函數(shù)在內(nèi)存里的代碼段地址和偏移量,然后才能去取來函數(shù)棧執(zhí)行,而典型的提供代碼段地址和偏移量的寄存器就是 CS 和 IP 寄存器,而 JMP BX 指令則是用 BX 寄存器去更新 IP 寄存器,而 BX 寄存器里的值是 gp.sched.pc,那么這個(gè) PC 指針究竟是指向哪里呢?讓我們來看另一處源碼。
眾所周知,啟動(dòng)一個(gè)新的 goroutine 是通過 go 關(guān)鍵字來完成的,而 go compiler 會(huì)在編譯期間利用 cmd/compile/internal/gc.state.stmt 和 cmd/compile/internal/gc.state.call 這兩個(gè)函數(shù)將 go 關(guān)鍵字翻譯成 runtime.newproc 函數(shù)調(diào)用,而 runtime.newproc 接收了函數(shù)指針和其大小之后,會(huì)獲取 goroutine 和調(diào)用處的程序計(jì)數(shù)器,接著再調(diào)用 runtime.newproc1:
//?Create?a?new?g?in?state?_Grunnable,?starting?at?fn,?with?narg?bytes //?of?arguments?starting?at?argp.?callerpc?is?the?address?of?the?go //?statement?that?created?this.?The?caller?is?responsible?for?adding //?the?new?g?to?the?scheduler. // //?This?must?run?on?the?system?stack?because?it's?the?continuation?of //?newproc,?which?cannot?split?the?stack. // //go:systemstack func?newproc1(fn?*funcval,?argp?unsafe.Pointer,?narg?int32,?callergp?*g,?callerpc?uintptr)?*g?{...memclrNoHeapPointers(unsafe.Pointer(&newg.sched),?unsafe.Sizeof(newg.sched))newg.sched.sp?=?spnewg.stktopsp?=?sp//?把?goexit?函數(shù)地址存入?gobuf?的?PC?指針里newg.sched.pc?=?funcPC(goexit)?+?sys.PCQuantum?//?+PCQuantum?so?that?previous?instruction?is?in?same?functionnewg.sched.g?=?guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched,?fn)newg.gopc?=?callerpcnewg.ancestors?=?saveAncestors(callergp)newg.startpc?=?fn.fnif?_g_.m.curg?!=?nil?{newg.labels?=?_g_.m.curg.labels}if?isSystemGoroutine(newg,?false)?{atomic.Xadd(&sched.ngsys,?+1)}casgstatus(newg,?_Gdead,?_Grunnable)... }這里可以看到,newg.sched.pc 被設(shè)置了 runtime.goexit 的函數(shù)地址,newg 就是后面 runtime.gogo 執(zhí)行的 goroutine,因此 runtime.gogo 最后的匯編指令 JMP BX是跳轉(zhuǎn)到了 runtime.goexit,讓我們來繼續(xù)看看這個(gè)函數(shù)做了什么:
// The top-most function running on a goroutine // returns to goexit+PCQuantum. Defined as ABIInternal // so as to make it identifiable to traceback (this // function it used as a sentinel; traceback wants to // see the func PC, not a wrapper PC). TEXT runtime·goexit<ABIInternal>(SB),NOSPLIT,$0-0BYTE $0x90 // NOPCALL runtime·goexit1(SB) // does not return// traceback from goexit1 must hit code range of goexitBYTE $0x90 // NOP這個(gè)函數(shù)也是匯編實(shí)現(xiàn)的,但是非常簡(jiǎn)單,就是直接調(diào)用 runtime·goexit1:
//?Finishes?execution?of?the?current?goroutine. func?goexit1()?{if?raceenabled?{racegoend()}if?trace.enabled?{traceGoEnd()}mcall(goexit0) }調(diào)用 runtime.mcall函數(shù):
// func mcall(fn func(*g)) // Switch to m->g0's stack, call fn(g). // Fn must never return. It should gogo(&g->sched) // to keep running g.// 切換回 g0 的系統(tǒng)堆棧,執(zhí)行 fn(g) TEXT runtime·mcall(SB), NOSPLIT, $0-8// 取入?yún)?funcval 對(duì)象的指針存入 DI 寄存器,此時(shí) fn.fn 是 goexit0 的地址MOVQ fn+0(FP), DIget_tls(CX)MOVQ g(CX), AX // save state in g->schedMOVQ 0(SP), BX // caller's PCMOVQ BX, (g_sched+gobuf_pc)(AX)LEAQ fn+0(FP), BX // caller's SPMOVQ BX, (g_sched+gobuf_sp)(AX)MOVQ AX, (g_sched+gobuf_g)(AX)MOVQ BP, (g_sched+gobuf_bp)(AX)// switch to m->g0 & its stack, call fnMOVQ g(CX), BXMOVQ g_m(BX), BX// 把 g0 的棧指針存入 SI 寄存器,后面需要用到MOVQ m_g0(BX), SICMPQ SI, AX // if g == m->g0 call badmcallJNE 3(PC)MOVQ $runtime·badmcall(SB), AXJMP AX// 這兩個(gè)指令是把 g0 地址存入到 TLS 里,// 然后從 SI 寄存器取出 g0 的棧指針,// 替換掉 SP 寄存器里存的當(dāng)前 g 的棧指針MOVQ SI, g(CX) // g = m->g0MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.spPUSHQ AXMOVQ DI, DX// 入口處的第一個(gè)指令已經(jīng)把 funcval 實(shí)例對(duì)象的指針存入了 DI 寄存器,// 0(DI) 表示取出 DI 的第一個(gè)成員,即 goexit0 函數(shù)地址,再存入 DIMOVQ 0(DI), DICALL DI // 調(diào)用 DI 寄存器里的地址,即 goexit0POPQ AXMOVQ $runtime·badmcall2(SB), AXJMP AXRET可以看到 runtime.mcall 函數(shù)的主要邏輯是從當(dāng)前 goroutine 切換回 g0 的系統(tǒng)堆棧,然后調(diào)用 fn(g),此處的 g 即是當(dāng)前運(yùn)行的 goroutine,這個(gè)方法會(huì)保存當(dāng)前運(yùn)行的 G 的 PC/SP 到 g->sched 里,以便該 G 可以在以后被重新恢復(fù)執(zhí)行,因?yàn)橐采婕暗郊拇嫫骱投褩V羔樀牟僮?#xff0c;所以也需要使用匯編實(shí)現(xiàn),該函數(shù)最后會(huì)在 g0 系統(tǒng)堆棧下執(zhí)行 runtime.goexit0:
func?goexit0(gp?*g)?{_g_?:=?getg()casgstatus(gp,?_Grunning,?_Gdead)if?isSystemGoroutine(gp,?false)?{atomic.Xadd(&sched.ngsys,?-1)}gp.m?=?nillocked?:=?gp.lockedm?!=?0gp.lockedm?=?0_g_.m.lockedg?=?0gp.preemptStop?=?falsegp.paniconfault?=?falsegp._defer?=?nil?//?should?be?true?already?but?just?in?case.gp._panic?=?nil?//?non-nil?for?Goexit?during?panic.?points?at?stack-allocated?data.gp.writebuf?=?nilgp.waitreason?=?0gp.param?=?nilgp.labels?=?nilgp.timer?=?nilif?gcBlackenEnabled?!=?0?&&?gp.gcAssistBytes?>?0?{//?Flush?assist?credit?to?the?global?pool.?This?gives//?better?information?to?pacing?if?the?application?is//?rapidly?creating?an?exiting?goroutines.scanCredit?:=?int64(gcController.assistWorkPerByte?*?float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit,?scanCredit)gp.gcAssistBytes?=?0}dropg()if?GOARCH?==?"wasm"?{?//?no?threads?yet?on?wasmgfput(_g_.m.p.ptr(),?gp)schedule()?//?never?returns}if?_g_.m.lockedInt?!=?0?{print("invalid?m->lockedInt?=?",?_g_.m.lockedInt,?"\n")throw("internal?lockOSThread?error")}gfput(_g_.m.p.ptr(),?gp)if?locked?{//?The?goroutine?may?have?locked?this?thread?because//?it?put?it?in?an?unusual?kernel?state.?Kill?it//?rather?than?returning?it?to?the?thread?pool.//?Return?to?mstart,?which?will?release?the?P?and?exit//?the?thread.if?GOOS?!=?"plan9"?{?//?See?golang.org/issue/22227.gogo(&_g_.m.g0.sched)}?else?{//?Clear?lockedExt?on?plan9?since?we?may?end?up?re-using//?this?thread._g_.m.lockedExt?=?0}}schedule() }runtime.goexit0 的主要工作是就是
利用 CAS 操作把 g 的狀態(tài)從 _Grunning 更新為 _Gdead;
對(duì) g 做一些清理操作,把一些字段值置空;
調(diào)用 runtime.dropg 解綁 g 和 m;
把 g 放入 p 存儲(chǔ) g 的 gfree 鏈表作為緩存,后續(xù)如果需要啟動(dòng)新的 goroutine 則可以直接從鏈表里取而不用重新初始化分配內(nèi)存。
最后,調(diào)用 runtime.schedule() 再次進(jìn)入調(diào)度循環(huán)去調(diào)度新的 goroutines,永不停歇。
另一方面,如果 goroutine 處于 I/O 不可用狀態(tài),我們前面已經(jīng)分析過 netpoller 利用非阻塞 I/O + I/O 多路復(fù)用避免了陷入系統(tǒng)調(diào)用,所以此時(shí)會(huì)調(diào)用 runtime.gopark 并把 goroutine 暫時(shí)封存在用戶態(tài)空間,并休眠當(dāng)前的 goroutine,因此不會(huì)阻塞 runtime.gogo 的匯編執(zhí)行,而是通過 runtime.mcall 調(diào)用 runtime.park_m:
func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.mcall(park_m) }func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }runtime.mcall 方法我們?cè)谇懊嬉呀?jīng)介紹過,它主要的工作就是是從當(dāng)前 goroutine 切換回 g0 的系統(tǒng)堆棧,然后調(diào)用 fn(g),而此時(shí) runtime.mcall 調(diào)用執(zhí)行的是 runtime.park_m,這個(gè)方法里會(huì)利用 CAS 把當(dāng)前運(yùn)行的 goroutine -- gp 的狀態(tài) 從 _Grunning 切換到 _Gwaiting,表明該 goroutine 已進(jìn)入到等待喚醒狀態(tài),此時(shí)封存和休眠 G 的操作就完成了,只需等待就緒之后被重新喚醒執(zhí)行即可。最后調(diào)用 runtime.schedule() 再次進(jìn)入調(diào)度循環(huán),去執(zhí)行下一個(gè) goroutine,充分利用 CPU。
至此,我們完成了對(duì) Go netpoller 原理剖析的整個(gè)閉環(huán)。
Go netpoller 的問題
Go netpoller 的設(shè)計(jì)不可謂不精巧、性能也不可謂不高,配合 goroutine 開發(fā)網(wǎng)絡(luò)應(yīng)用的時(shí)候就一個(gè)字:爽。因此 Go 的網(wǎng)絡(luò)編程模式是及其簡(jiǎn)潔高效的,然而,沒有任何一種設(shè)計(jì)和架構(gòu)是完美的, goroutine-per-connection 這種模式雖然簡(jiǎn)單高效,但是在某些極端的場(chǎng)景下也會(huì)暴露出問題:goroutine 雖然非常輕量,它的自定義棧內(nèi)存初始值僅為 2KB,后面按需擴(kuò)容;海量連接的業(yè)務(wù)場(chǎng)景下, goroutine-per-connection ,此時(shí) goroutine 數(shù)量以及消耗的資源就會(huì)呈線性趨勢(shì)暴漲,雖然 Go scheduler 內(nèi)部做了 g 的緩存鏈表,可以一定程度上緩解高頻創(chuàng)建銷毀 goroutine 的壓力,但是對(duì)于瞬時(shí)性暴漲的長(zhǎng)連接場(chǎng)景就無能為力了,大量的 goroutines 會(huì)被不斷創(chuàng)建出來,從而對(duì) Go runtime scheduler 造成極大的調(diào)度壓力和侵占系統(tǒng)資源,然后資源被侵占又反過來影響 Go scheduler 的調(diào)度,進(jìn)而導(dǎo)致性能下降。
Reactor 網(wǎng)絡(luò)模型
目前 Linux 平臺(tái)上主流的高性能網(wǎng)絡(luò)庫(kù)/框架中,大都采用 Reactor 模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。
Reactor 模式本質(zhì)上指的是使用 I/O 多路復(fù)用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O) 的模式。
通常設(shè)置一個(gè)主線程負(fù)責(zé)做 event-loop 事件循環(huán)和 I/O 讀寫,通過 select/poll/epoll_wait 等系統(tǒng)調(diào)用監(jiān)聽 I/O 事件,業(yè)務(wù)邏輯提交給其他工作線程去做。而所謂『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系統(tǒng)調(diào)用上,這樣可以最大限度的復(fù)用 event-loop 線程,讓一個(gè)線程能服務(wù)于多個(gè) sockets。在 Reactor 模式中,I/O 線程只能阻塞在 I/O multiplexing 函數(shù)上(select/poll/epoll_wait)。
Reactor 模式的基本工作流程如下:
Server 端完成在 bind&listen 之后,將 listenfd 注冊(cè)到 epollfd 中,最后進(jìn)入 event-loop 事件循環(huán)。循環(huán)過程中會(huì)調(diào)用 select/poll/epoll_wait 阻塞等待,若有在 listenfd 上的新連接事件則解除阻塞返回,并調(diào)用 socket.accept 接收新連接 connfd,并將 connfd 加入到 epollfd 的 I/O 復(fù)用(監(jiān)聽)隊(duì)列。
當(dāng) connfd 上發(fā)生可讀/可寫事件也會(huì)解除 select/poll/epoll_wait 的阻塞等待,然后進(jìn)行 I/O 讀寫操作,這里讀寫 I/O 都是非阻塞 I/O,這樣才不會(huì)阻塞 event-loop 的下一個(gè)循環(huán)。然而,這樣容易割裂業(yè)務(wù)邏輯,不易理解和維護(hù)。
調(diào)用 read 讀取數(shù)據(jù)之后進(jìn)行解碼并放入隊(duì)列中,等待工作線程處理。
工作線程處理完數(shù)據(jù)之后,返回到 event-loop 線程,由這個(gè)線程負(fù)責(zé)調(diào)用 write 把數(shù)據(jù)寫回 client。
accept 連接以及 conn 上的讀寫操作若是在主線程完成,則要求是非阻塞 I/O,因?yàn)?Reactor 模式一條最重要的原則就是:I/O 操作不能阻塞 event-loop 事件循環(huán)。實(shí)際上 event loop 可能也可以是多線程的,只是一個(gè)線程里只有一個(gè) select/poll/epoll_wait。
上面提到了 Go netpoller 在某些場(chǎng)景下可能因?yàn)閯?chuàng)建太多的 goroutine 而過多地消耗系統(tǒng)資源,而在現(xiàn)實(shí)世界的網(wǎng)絡(luò)業(yè)務(wù)中,服務(wù)器持有的海量連接中在極短的時(shí)間窗口內(nèi)只有極少數(shù)是 active 而大多數(shù)則是 idle,就像這樣(非真實(shí)數(shù)據(jù),僅僅是為了比喻):
那么為每一個(gè)連接指派一個(gè) goroutine 就顯得太過奢侈了,而 Reactor 模式這種利用 I/O 多路復(fù)用進(jìn)而只需要使用少量線程即可管理海量連接的設(shè)計(jì)就可以在這樣網(wǎng)絡(luò)業(yè)務(wù)中大顯身手了:
MultiReactors.png
在絕大部分應(yīng)用場(chǎng)景下,我推薦大家還是遵循 Go 的 best practices,使用原生的 Go 網(wǎng)絡(luò)庫(kù)來構(gòu)建自己的網(wǎng)絡(luò)應(yīng)用。然而,在某些極度追求性能、壓榨系統(tǒng)資源以及技術(shù)棧必須是原生 Go (不考慮 C/C++ 寫中間層而 Go 寫業(yè)務(wù)層)的業(yè)務(wù)場(chǎng)景下,我們可以考慮自己構(gòu)建 Reactor 網(wǎng)絡(luò)模型。
gnet
gnet 是一個(gè)基于事件驅(qū)動(dòng)的高性能和輕量級(jí)網(wǎng)絡(luò)框架。它直接使用 epoll 和 kqueue 系統(tǒng)調(diào)用而非標(biāo)準(zhǔn) Go 網(wǎng)絡(luò)包:net 來構(gòu)建網(wǎng)絡(luò)應(yīng)用,它的工作原理類似兩個(gè)開源的網(wǎng)絡(luò)庫(kù):netty 和 libuv,這也使得gnet 達(dá)到了一個(gè)遠(yuǎn)超 Go net 的性能表現(xiàn)。
gnet 設(shè)計(jì)開發(fā)的初衷不是為了取代 Go 的標(biāo)準(zhǔn)網(wǎng)絡(luò)庫(kù):net,而是為了創(chuàng)造出一個(gè)類似于 Redis、Haproxy 能高效處理網(wǎng)絡(luò)包的 Go 語言網(wǎng)絡(luò)服務(wù)器框架。
gnet 的賣點(diǎn)在于它是一個(gè)高性能、輕量級(jí)、非阻塞的純 Go 實(shí)現(xiàn)的傳輸層(TCP/UDP/Unix Domain Socket)網(wǎng)絡(luò)框架,開發(fā)者可以使用 gnet 來實(shí)現(xiàn)自己的應(yīng)用層網(wǎng)絡(luò)協(xié)議(HTTP、RPC、Redis、WebSocket 等等),從而構(gòu)建出自己的應(yīng)用層網(wǎng)絡(luò)應(yīng)用:比如在 gnet 上實(shí)現(xiàn) HTTP 協(xié)議就可以創(chuàng)建出一個(gè) HTTP 服務(wù)器 或者 Web 開發(fā)框架,實(shí)現(xiàn) Redis 協(xié)議就可以創(chuàng)建出自己的 Redis 服務(wù)器等等。
gnet,在某些極端的網(wǎng)絡(luò)業(yè)務(wù)場(chǎng)景,比如海量連接、高頻短連接、網(wǎng)絡(luò)小包等等場(chǎng)景,gnet 在性能和資源占用上都遠(yuǎn)超 Go 原生的 net 包(基于 netpoller)。
gnet 已經(jīng)實(shí)現(xiàn)了 Multi-Reactors 和 Multi-Reactors + Goroutine Pool 兩種網(wǎng)絡(luò)模型,也得益于這些網(wǎng)絡(luò)模型,使得 gnet 成為一個(gè)高性能和低損耗的 Go 網(wǎng)絡(luò)框架:
MultiReactors.png
multireactorsthreadpool.png???? 功能
[x] 高性能 的基于多線程/Go程網(wǎng)絡(luò)模型的 event-loop 事件驅(qū)動(dòng)
[x] 內(nèi)置 goroutine 池,由開源庫(kù) ants 提供支持
[x] 內(nèi)置 bytes 內(nèi)存池,由開源庫(kù) bytebufferpool 提供支持
[x] 整個(gè)生命周期是無鎖的
[x] 簡(jiǎn)單易用的 APIs
[x] 基于 Ring-Buffer 的高效且可重用的內(nèi)存 buffer
[x] 支持多種網(wǎng)絡(luò)協(xié)議/IPC 機(jī)制:TCP、UDP 和 Unix Domain Socket
[x] 支持多種負(fù)載均衡算法:Round-Robin(輪詢)、Source-Addr-Hash(源地址哈希) 和 Least-Connections(最少連接數(shù))
[x] 支持兩種事件驅(qū)動(dòng)機(jī)制:Linux 里的 epoll 以及 FreeBSD/DragonFly/Darwin 里的 kqueue
[x] 支持異步寫操作
[x] 靈活的事件定時(shí)器
[x] SO_REUSEPORT 端口重用
[x] 內(nèi)置多種編解碼器,支持對(duì) TCP 數(shù)據(jù)流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,參考自 netty codec,而且支持自定制編解碼器
[x] 支持 Windows 平臺(tái),基于 IOCP 事件驅(qū)動(dòng)機(jī)制 Go 標(biāo)準(zhǔn)網(wǎng)絡(luò)庫(kù)
[ ] 實(shí)現(xiàn) gnet 客戶端
參考&延伸閱讀
The Go netpoller
Nonblocking I/O
epoll(7) — Linux manual page
I/O Multiplexing: The select and poll Functions
The method to epoll’s madness
Scalable Go Scheduler Design Doc
Scheduling In Go : Part I - OS Scheduler
Scheduling In Go : Part II - Go Scheduler
Scheduling In Go : Part III - Concurrency
Goroutines, Nonblocking I/O, And Memory Usage
IO多路復(fù)用與Go網(wǎng)絡(luò)庫(kù)的實(shí)現(xiàn)
關(guān)于select函數(shù)中timeval和fd_set重新設(shè)置的問題
A Million WebSockets and Go
Going Infinite, handling 1M websockets connections in Go
字節(jié)跳動(dòng)在 Go 網(wǎng)絡(luò)庫(kù)上的實(shí)踐
總結(jié)
以上是生活随笔為你收集整理的Go netpoller 网络模型之源码全面解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最新开源:3TS腾讯事务处理技术验证系统
- 下一篇: 我在腾讯做运维--快速玩转蓝鲸社区版6.