日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

Go netpoller 网络模型之源码全面解析

發(fā)布時(shí)間:2024/2/28 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Go netpoller 网络模型之源码全面解析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

作者:allanpan,騰訊 IEG 后臺(tái)開發(fā)工程師

近兩萬(wàn)字長(zhǎng)文從 Linux 底層 Nonblocking I/O、 I/O multiplexing: select/epoll 以及 Go 源碼全方位剖析 Go 語(yǔ)言的網(wǎng)絡(luò)模型和底層實(shí)現(xiàn);最后介紹分析當(dāng)前主流的高性能開源網(wǎng)絡(luò)庫(kù)所使用的經(jīng)典 Reactors 模式,以及如何基于此實(shí)現(xiàn)一個(gè)?(在某些特定場(chǎng)景下)?比 Go 原生網(wǎng)絡(luò)庫(kù)性能更好的網(wǎng)絡(luò)庫(kù)。可能是全網(wǎng)最詳盡的 Go 網(wǎng)絡(luò)底層剖析文章,一文帶你完全吃透 Go 語(yǔ)言的網(wǎng)絡(luò)編程底層原理。

導(dǎo)言

Go 基于 I/O multiplexing 和 goroutine scheduler 構(gòu)建了一個(gè)簡(jiǎn)潔而高性能的原生網(wǎng)絡(luò)模型(基于 Go 的 I/O 多路復(fù)用 netpoller ),提供了 goroutine-per-connection 這樣簡(jiǎn)單的網(wǎng)絡(luò)編程模式。在這種模式下,開發(fā)者使用的是同步的模式去編寫異步的邏輯,極大地降低了開發(fā)者編寫網(wǎng)絡(luò)應(yīng)用時(shí)的心智負(fù)擔(dān),且借助于 Go runtime scheduler 對(duì) goroutines 的高效調(diào)度,這個(gè)原生網(wǎng)絡(luò)模型不論從適用性還是性能上都足以滿足絕大部分的應(yīng)用場(chǎng)景。

然而,在工程性上能做到如此高的普適性和兼容性,最終暴露給開發(fā)者提供接口/模式如此簡(jiǎn)潔,其底層必然是基于非常復(fù)雜的封裝,做了很多取舍,也有可能放棄了一些追求極致性能的設(shè)計(jì)和理念。事實(shí)上 Go netpoller 底層就是基于 epoll/kqueue/iocp 這些 I/O 多路復(fù)用技術(shù)來(lái)做封裝的,最終暴露出 goroutine-per-connection 這樣的極簡(jiǎn)的開發(fā)模式給使用者。

Go netpoller 在不同的操作系統(tǒng),其底層使用的 I/O 多路復(fù)用技術(shù)也不一樣,可以從 Go 源碼目錄結(jié)構(gòu)和對(duì)應(yīng)代碼文件了解 Go 在不同平臺(tái)下的網(wǎng)絡(luò) I/O 模式的實(shí)現(xiàn)。比如,在 Linux 系統(tǒng)下基于 epoll,freeBSD 系統(tǒng)下基于 kqueue,以及 Windows 系統(tǒng)下基于 iocp。

本文將基于 Linux 平臺(tái)來(lái)解析 Go netpoller 之 I/O 多路復(fù)用的底層是如何基于 epoll 封裝實(shí)現(xiàn)的,從源碼層層推進(jìn),全面而深度地解析 Go netpoller 的設(shè)計(jì)理念和實(shí)現(xiàn)原理,以及 Go 是如何利用 netpoller 來(lái)構(gòu)建它的原生網(wǎng)絡(luò)模型的。主要涉及到的一些概念:I/O 模型、用戶/內(nèi)核空間、epoll、Linux 源碼、goroutine scheduler 等等,我會(huì)盡量簡(jiǎn)單地講解,如果有對(duì)相關(guān)概念不熟悉的同學(xué),還是希望能提前熟悉一下。

用戶空間與內(nèi)核空間

現(xiàn)代操作系統(tǒng)都是采用虛擬存儲(chǔ)器,那么對(duì) 32 位操作系統(tǒng)而言,它的尋址空間(虛擬存儲(chǔ)空間)為 4G(2 的 32 次方)。操作系統(tǒng)的核心是內(nèi)核,獨(dú)立于普通的應(yīng)用程序,可以訪問受保護(hù)的內(nèi)存空間,也有訪問底層硬件設(shè)備的所有權(quán)限。為了保證用戶進(jìn)程不能直接操作內(nèi)核(kernel),保證內(nèi)核的安全,操心系統(tǒng)將虛擬空間劃分為兩部分,一部分為內(nèi)核空間,一部分為用戶空間。針對(duì) Linux 操作系統(tǒng)而言,將最高的 1G 字節(jié)(從虛擬地址 0xC0000000 到 0xFFFFFFFF),供內(nèi)核使用,稱為內(nèi)核空間,而將較低的 3G 字節(jié)(從虛擬地址 0x00000000 到 0xBFFFFFFF),供各個(gè)進(jìn)程使用,稱為用戶空間。

現(xiàn)代的網(wǎng)絡(luò)服務(wù)的主流已經(jīng)完成從 CPU 密集型到 IO 密集型的轉(zhuǎn)變,所以服務(wù)端程序?qū)?I/O 的處理必不可少,而一旦操作 I/O 則必定要在用戶態(tài)和內(nèi)核態(tài)之間來(lái)回切換。

I/O 模型

在神作《UNIX 網(wǎng)絡(luò)編程》里,總結(jié)歸納了 5 種 I/O 模型,包括同步和異步 I/O:

  • 阻塞 I/O (Blocking I/O)

  • 非阻塞 I/O (Nonblocking I/O)

  • I/O 多路復(fù)用 (I/O multiplexing)

  • 信號(hào)驅(qū)動(dòng) I/O (Signal driven I/O)

  • 異步 I/O (Asynchronous I/O)

操作系統(tǒng)上的 I/O 是用戶空間和內(nèi)核空間的數(shù)據(jù)交互,因此 I/O 操作通常包含以下兩個(gè)步驟:

  • 等待網(wǎng)絡(luò)數(shù)據(jù)到達(dá)網(wǎng)卡(讀就緒)/等待網(wǎng)卡可寫(寫就緒) –> 讀取/寫入到內(nèi)核緩沖區(qū)

  • 從內(nèi)核緩沖區(qū)復(fù)制數(shù)據(jù) –> 用戶空間(讀)/從用戶空間復(fù)制數(shù)據(jù) -> 內(nèi)核緩沖區(qū)(寫)

  • 而判定一個(gè) I/O 模型是同步還是異步,主要看第二步:數(shù)據(jù)在用戶和內(nèi)核空間之間復(fù)制的時(shí)候是不是會(huì)阻塞當(dāng)前進(jìn)程,如果會(huì),則是同步 I/O,否則,就是異步 I/O。基于這個(gè)原則,這 5 種 I/O 模型中只有一種異步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

    這 5 種 I/O 模型的對(duì)比如下:

    Non-blocking I/O

    什么叫非阻塞 I/O,顧名思義就是:所有 I/O 操作都是立刻返回而不會(huì)阻塞當(dāng)前用戶進(jìn)程。I/O 多路復(fù)用通常情況下需要和非阻塞 I/O 搭配使用,否則可能會(huì)產(chǎn)生意想不到的問題。比如,epoll 的 ET(邊緣觸發(fā)) 模式下,如果不使用非阻塞 I/O,有極大的概率會(huì)導(dǎo)致阻塞 event-loop 線程,從而降低吞吐量,甚至導(dǎo)致 bug。

    Linux 下,我們可以通過 fcntl 系統(tǒng)調(diào)用來(lái)設(shè)置 O_NONBLOCK 標(biāo)志位,從而把 socket 設(shè)置成 Non-blocking。當(dāng)對(duì)一個(gè) Non-blocking socket 執(zhí)行讀操作時(shí),流程是這個(gè)樣子:

    當(dāng)用戶進(jìn)程發(fā)出 read 操作時(shí),如果 kernel 中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會(huì) block 用戶進(jìn)程,而是立刻返回一個(gè) EAGAIN error。

    從用戶進(jìn)程角度講 ,它發(fā)起一個(gè) read 操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè) error 時(shí),它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是它可以再次發(fā)送 read 操作。一旦 kernel 中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的 system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存,然后返回。

    所以,Non-blocking I/O 的特點(diǎn)是用戶進(jìn)程需要不斷的主動(dòng)詢問 kernel 數(shù)據(jù)好了沒有。下一節(jié)我們要講的 I/O 多路復(fù)用需要和 Non-blocking I/O 配合才能發(fā)揮出最大的威力!

    I/O 多路復(fù)用

    所謂 I/O 多路復(fù)用指的就是 select/poll/epoll 這一系列的多路選擇器:支持單一線程同時(shí)監(jiān)聽多個(gè)文件描述符(I/O 事件),阻塞等待,并在其中某個(gè)文件描述符可讀寫時(shí)收到通知。I/O 復(fù)用其實(shí)復(fù)用的不是 I/O 連接,而是復(fù)用線程,讓一個(gè) thread of control 能夠處理多個(gè)連接(I/O 事件)。

    select & poll

    #include?<sys/select.h>/*?According?to?earlier?standards?*/ #include?<sys/time.h> #include?<sys/types.h> #include?<unistd.h>int?select(int?nfds,?fd_set?*readfds,?fd_set?*writefds,?fd_set?*exceptfds,?struct?timeval?*timeout);//?和 select 緊密結(jié)合的四個(gè)宏: void?FD_CLR(int?fd,?fd_set?*set); int?FD_ISSET(int?fd,?fd_set?*set); void?FD_SET(int?fd,?fd_set?*set); void?FD_ZERO(fd_set?*set);

    select 是 epoll 之前 Linux 使用的 I/O 事件驅(qū)動(dòng)技術(shù)。

    理解 select 的關(guān)鍵在于理解 fd_set,為說明方便,取 fd_set 長(zhǎng)度為 1 字節(jié),fd_set 中的每一 bit 可以對(duì)應(yīng)一個(gè)文件描述符 fd,則 1 字節(jié)長(zhǎng)的 fd_set 最大可以對(duì)應(yīng) 8 個(gè) fd。select 的調(diào)用過程如下:

  • 執(zhí)行 FD_ZERO(&set), 則 set 用位表示是 0000,0000

  • 若 fd=5, 執(zhí)行 FD_SET(fd, &set); 后 set 變?yōu)?0001,0000(第 5 位置為 1)

  • 再加入 fd=2, fd=1,則 set 變?yōu)?0001,0011

  • 執(zhí)行 select(6, &set, 0, 0, 0) 阻塞等待

  • 若 fd=1, fd=2 上都發(fā)生可讀事件,則 select 返回,此時(shí) set 變?yōu)?0000,0011 (注意:沒有事件發(fā)生的 fd=5 被清空)

  • 基于上面的調(diào)用過程,可以得出 select 的特點(diǎn):

    • 可監(jiān)控的文件描述符個(gè)數(shù)取決于 sizeof(fd_set) 的值。假設(shè)服務(wù)器上 sizeof(fd_set)=512,每 bit 表示一個(gè)文件描述符,則服務(wù)器上支持的最大文件描述符是 512*8=4096。fd_set 的大小調(diào)整可參考 【原創(chuàng)】技術(shù)系列之 網(wǎng)絡(luò)模型(二) 中的模型 2,可以有效突破 select 可監(jiān)控的文件描述符上限

    • 將 fd 加入 select 監(jiān)控集的同時(shí),還要再使用一個(gè)數(shù)據(jù)結(jié)構(gòu) array 保存放到 select 監(jiān)控集中的 fd,一是用于在 select 返回后,array 作為源數(shù)據(jù)和 fd_set 進(jìn)行 FD_ISSET 判斷。二是 select 返回后會(huì)把以前加入的但并無(wú)事件發(fā)生的 fd 清空,則每次開始 select 前都要重新從 array 取得 fd 逐一加入(FD_ZERO 最先),掃描 array 的同時(shí)取得 fd 最大值 maxfd,用于 select 的第一個(gè)參數(shù)

    • 可見 select 模型必須在 select 前循環(huán) array(加 fd,取 maxfd),select 返回后循環(huán) array(FD_ISSET 判斷是否有事件發(fā)生)

    所以,select 有如下的缺點(diǎn):

  • 最大并發(fā)數(shù)限制:使用 32 個(gè)整數(shù)的 32 位,即 32*32=1024 來(lái)標(biāo)識(shí) fd,雖然可修改,但是有以下第 2, 3 點(diǎn)的瓶頸

  • 每次調(diào)用 select,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài),這個(gè)開銷在 fd 很多時(shí)會(huì)很大

  • 性能衰減嚴(yán)重:每次 kernel 都需要線性掃描整個(gè) fd_set,所以隨著監(jiān)控的描述符 fd 數(shù)量增長(zhǎng),其 I/O 性能會(huì)線性下降

  • poll 的實(shí)現(xiàn)和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 結(jié)構(gòu)而不是 select 的 fd_set 結(jié)構(gòu),poll 解決了最大文件描述符數(shù)量限制的問題,但是同樣需要從用戶態(tài)拷貝所有的 fd 到內(nèi)核態(tài),也需要線性遍歷所有的 fd 集合,所以它和 select 只是實(shí)現(xiàn)細(xì)節(jié)上的區(qū)分,并沒有本質(zhì)上的區(qū)別。

    epoll

    epoll 是 Linux kernel 2.6 之后引入的新 I/O 事件驅(qū)動(dòng)技術(shù),I/O 多路復(fù)用的核心設(shè)計(jì)是 1 個(gè)線程處理所有連接的 等待消息準(zhǔn)備好 I/O 事件,這一點(diǎn)上 epoll 和 select&poll 是大同小異的。但 select&poll 錯(cuò)誤預(yù)估了一件事,當(dāng)數(shù)十萬(wàn)并發(fā)連接存在時(shí),可能每一毫秒只有數(shù)百個(gè)活躍的連接,同時(shí)其余數(shù)十萬(wàn)連接在這一毫秒是非活躍的。select&poll 的使用方法是這樣的:返回的活躍連接 == select(全部待監(jiān)控的連接) 。

    什么時(shí)候會(huì)調(diào)用 select&poll 呢?在你認(rèn)為需要找出有報(bào)文到達(dá)的活躍連接時(shí),就應(yīng)該調(diào)用。所以,select&poll 在高并發(fā)時(shí)是會(huì)被頻繁調(diào)用的。這樣,這個(gè)頻繁調(diào)用的方法就很有必要看看它是否有效率,因?yàn)?#xff0c;它的輕微效率損失都會(huì)被 高頻 二字所放大。它有效率損失嗎?顯而易見,全部待監(jiān)控連接是數(shù)以十萬(wàn)計(jì)的,返回的只是數(shù)百個(gè)活躍連接,這本身就是無(wú)效率的表現(xiàn)。被放大后就會(huì)發(fā)現(xiàn),處理并發(fā)上萬(wàn)個(gè)連接時(shí),select&poll 就完全力不從心了。這個(gè)時(shí)候就該 epoll 上場(chǎng)了,epoll 通過一些新的設(shè)計(jì)和優(yōu)化,基本上解決了 select&poll 的問題。

    epoll 的 API 非常簡(jiǎn)潔,涉及到的只有 3 個(gè)系統(tǒng)調(diào)用:

    #include?<sys/epoll.h>?? int?epoll_create(int?size);?//?int?epoll_create1(int?flags); int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event); int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);

    其中,epoll_create 創(chuàng)建一個(gè) epoll 實(shí)例并返回 epollfd;epoll_ctl 注冊(cè) file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 實(shí)例上;epoll_wait 則是阻塞監(jiān)聽 epoll 實(shí)例上所有的 file descriptor 的 I/O 事件,它接收一個(gè)用戶空間上的一塊內(nèi)存地址 (events 數(shù)組),kernel 會(huì)在有 I/O 事件發(fā)生的時(shí)候把文件描述符列表復(fù)制到這塊內(nèi)存地址上,然后 epoll_wait 解除阻塞并返回,最后用戶空間上的程序就可以對(duì)相應(yīng)的 fd 進(jìn)行讀寫了:

    #include?<unistd.h> ssize_t?read(int?fd,?void?*buf,?size_t?count); ssize_t?write(int?fd,?const?void?*buf,?size_t?count);

    epoll 的工作原理如下:

    與 select&poll 相比,epoll 分清了高頻調(diào)用和低頻調(diào)用。例如,epoll_ctl 相對(duì)來(lái)說就是非頻繁調(diào)用的,而 epoll_wait 則是會(huì)被高頻調(diào)用的。所以 epoll 利用 epoll_ctl 來(lái)插入或者刪除一個(gè) fd,實(shí)現(xiàn)用戶態(tài)到內(nèi)核態(tài)的數(shù)據(jù)拷貝,這確保了每一個(gè) fd 在其生命周期只需要被拷貝一次,而不是每次調(diào)用 epoll_wait 的時(shí)候都拷貝一次。epoll_wait 則被設(shè)計(jì)成幾乎沒有入?yún)⒌恼{(diào)用,相比 select&poll 需要把全部監(jiān)聽的 fd 集合從用戶態(tài)拷貝至內(nèi)核態(tài)的做法,epoll 的效率就高出了一大截。

    在實(shí)現(xiàn)上 epoll 采用紅黑樹來(lái)存儲(chǔ)所有監(jiān)聽的 fd,而紅黑樹本身插入和刪除性能比較穩(wěn)定,時(shí)間復(fù)雜度 O(logN)。通過 epoll_ctl 函數(shù)添加進(jìn)來(lái)的 fd 都會(huì)被放在紅黑樹的某個(gè)節(jié)點(diǎn)內(nèi),所以,重復(fù)添加是沒有用的。當(dāng)把 fd 添加進(jìn)來(lái)的時(shí)候時(shí)候會(huì)完成關(guān)鍵的一步:該 fd 會(huì)與相應(yīng)的設(shè)備(網(wǎng)卡)驅(qū)動(dòng)程序建立回調(diào)關(guān)系,也就是在內(nèi)核中斷處理程序?yàn)樗?cè)一個(gè)回調(diào)函數(shù),在 fd 相應(yīng)的事件觸發(fā)(中斷)之后(設(shè)備就緒了),內(nèi)核就會(huì)調(diào)用這個(gè)回調(diào)函數(shù),該回調(diào)函數(shù)在內(nèi)核中被稱為:ep_poll_callback ,這個(gè)回調(diào)函數(shù)其實(shí)就是把這個(gè) fd 添加到 rdllist 這個(gè)雙向鏈表(就緒鏈表)中。epoll_wait 實(shí)際上就是去檢查 rdllist 雙向鏈表中是否有就緒的 fd,當(dāng) rdllist 為空(無(wú)就緒 fd)時(shí)掛起當(dāng)前進(jìn)程,直到 rdllist 非空時(shí)進(jìn)程才被喚醒并返回。

    相比于 select&poll 調(diào)用時(shí)會(huì)將全部監(jiān)聽的 fd 從用戶態(tài)空間拷貝至內(nèi)核態(tài)空間并線性掃描一遍找出就緒的 fd 再返回到用戶態(tài),epoll_wait 則是直接返回已就緒 fd,因此 epoll 的 I/O 性能不會(huì)像 select&poll 那樣隨著監(jiān)聽的 fd 數(shù)量增加而出現(xiàn)線性衰減,是一個(gè)非常高效的 I/O 事件驅(qū)動(dòng)技術(shù)。

    由于使用 epoll 的 I/O 多路復(fù)用需要用戶進(jìn)程自己負(fù)責(zé) I/O 讀寫,從用戶進(jìn)程的角度看,讀寫過程是阻塞的,所以 select&poll&epoll 本質(zhì)上都是同步 I/O 模型,而像 Windows 的 IOCP 這一類的異步 I/O,只需要在調(diào)用 WSARecv 或 WSASend 方法讀寫數(shù)據(jù)的時(shí)候把用戶空間的內(nèi)存 buffer 提交給 kernel,kernel 負(fù)責(zé)數(shù)據(jù)在用戶空間和內(nèi)核空間拷貝,完成之后就會(huì)通知用戶進(jìn)程,整個(gè)過程不需要用戶進(jìn)程參與,所以是真正的異步 I/O。

    延伸

    另外,我看到有些文章說 epoll 之所以性能高是因?yàn)槔昧?Linux 的 mmap 內(nèi)存映射讓內(nèi)核和用戶進(jìn)程共享了一片物理內(nèi)存,用來(lái)存放就緒 fd 列表和它們的數(shù)據(jù) buffer,所以用戶進(jìn)程在 epoll_wait 返回之后用戶進(jìn)程就可以直接從共享內(nèi)存那里讀取/寫入數(shù)據(jù)了,這讓我很疑惑,因?yàn)槭紫瓤?epoll_wait 的函數(shù)聲明:

    int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);

    第二個(gè)參數(shù):就緒事件列表,是需要在用戶空間分配內(nèi)存然后再傳給 epoll_wait 的,如果內(nèi)核會(huì)用 mmap 設(shè)置共享內(nèi)存,直接傳遞一個(gè)指針進(jìn)去就行了,根本不需要在用戶態(tài)分配內(nèi)存,多此一舉。其次,內(nèi)核和用戶進(jìn)程通過 mmap 共享內(nèi)存是一件極度危險(xiǎn)的事情,內(nèi)核無(wú)法確定這塊共享內(nèi)存什么時(shí)候會(huì)被回收,而且這樣也會(huì)賦予用戶進(jìn)程直接操作內(nèi)核數(shù)據(jù)的權(quán)限和入口,非常容易出現(xiàn)大的系統(tǒng)漏洞,因此一般極少會(huì)這么做。所以我很懷疑 epoll 是不是真的在 Linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 Linux kernel 源碼:

    /**?Implement?the?event?wait?interface?for?the?eventpoll?file.?It?is?the?kernel*?part?of?the?user?space?epoll_wait(2).*/ static?int?do_epoll_wait(int?epfd,?struct?epoll_event?__user?*events,int?maxevents,?int?timeout) {.../*?Time?to?fish?for?events?...?*/error?=?ep_poll(ep,?events,?maxevents,?timeout); }//?如果?epoll_wait?入?yún)r(shí)設(shè)定?timeout?==?0,?那么直接通過?ep_events_available?判斷當(dāng)前是否有用戶感興趣的事件發(fā)生,如果有則通過?ep_send_events?進(jìn)行處理 //?如果設(shè)置 timeout >?0,并且當(dāng)前沒有用戶關(guān)注的事件發(fā)生,則進(jìn)行休眠,并添加到 ep->wq 等待隊(duì)列的頭部;對(duì)等待事件描述符設(shè)置 WQ_FLAG_EXCLUSIVE 標(biāo)志 //?ep_poll?被事件喚醒后會(huì)重新檢查是否有關(guān)注事件,如果對(duì)應(yīng)的事件已經(jīng)被搶走,那么?ep_poll?會(huì)繼續(xù)休眠等待 static?int?ep_poll(struct?eventpoll?*ep,?struct?epoll_event?__user?*events,?int?maxevents,?long?timeout) {...send_events:/**?Try?to?transfer?events?to?user?space.?In?case?we?get?0?events?and*?there's?still?timeout?left?over,?we?go?trying?again?in?search?of*?more?luck.*///?如果一切正常,?有?event?發(fā)生,?就開始準(zhǔn)備數(shù)據(jù)?copy?給用戶空間了//?如果有就緒的事件發(fā)生,那么就調(diào)用?ep_send_events?將就緒的事件?copy?到用戶態(tài)內(nèi)存中,//?然后返回到用戶態(tài),否則判斷是否超時(shí),如果沒有超時(shí)就繼續(xù)等待就緒事件發(fā)生,如果超時(shí)就返回用戶態(tài)。//?從?ep_poll?函數(shù)的實(shí)現(xiàn)可以看到,如果有就緒事件發(fā)生,則調(diào)用?ep_send_events?函數(shù)做進(jìn)一步處理if?(!res?&&?eavail?&&!(res?=?ep_send_events(ep,?events,?maxevents))?&&?!timed_out)goto?fetch_events;... }//?ep_send_events?函數(shù)是用來(lái)向用戶空間拷貝就緒?fd?列表的,它將用戶傳入的就緒?fd?列表內(nèi)存簡(jiǎn)單封裝到 // ep_send_events_data 結(jié)構(gòu)中,然后調(diào)用 ep_scan_ready_list 將就緒隊(duì)列中的事件寫入用戶空間的內(nèi)存; //?用戶進(jìn)程就可以訪問到這些數(shù)據(jù)進(jìn)行處理 static?int?ep_send_events(struct?eventpoll?*ep,struct?epoll_event?__user?*events,?int?maxevents) {struct?ep_send_events_data?esed;esed.maxevents?=?maxevents;esed.events?=?events;//?調(diào)用?ep_scan_ready_list?函數(shù)檢查?epoll?實(shí)例?eventpoll?中的?rdllist?就緒鏈表,//?并注冊(cè)一個(gè)回調(diào)函數(shù)?ep_send_events_proc,如果有就緒?fd,則調(diào)用?ep_send_events_proc?進(jìn)行處理ep_scan_ready_list(ep,?ep_send_events_proc,?&esed,?0,?false);return?esed.res; }//?調(diào)用?ep_scan_ready_list?的時(shí)候會(huì)傳遞指向?ep_send_events_proc?函數(shù)的函數(shù)指針作為回調(diào)函數(shù), //?一旦有就緒?fd,就會(huì)調(diào)用?ep_send_events_proc?函數(shù) static?__poll_t?ep_send_events_proc(struct?eventpoll?*ep,?struct?list_head?*head,?void?*priv) {.../**?If?the?event?mask?intersect?the?caller-requested?one,*?deliver?the?event?to?userspace.?Again,?ep_scan_ready_list()*?is?holding?ep->mtx,?so?no?operations?coming?from?userspace*?can?change?the?item.*/revents?=?ep_item_poll(epi,?&pt,?1);//?如果?revents?為?0,說明沒有就緒的事件,跳過,否則就將就緒事件拷貝到用戶態(tài)內(nèi)存中if?(!revents)continue;//?將當(dāng)前就緒的事件和用戶進(jìn)程傳入的數(shù)據(jù)都通過?__put_user?拷貝回用戶空間,//?也就是調(diào)用?epoll_wait?之時(shí)用戶進(jìn)程傳入的?fd?列表的內(nèi)存if?(__put_user(revents,?&uevent->events)?||?__put_user(epi->event.data,?&uevent->data))?{list_add(&epi->rdllink,?head);ep_pm_stay_awake(epi);if?(!esed->res)esed->res?=?-EFAULT;return?0;}... }

    從 do_epoll_wait 開始層層跳轉(zhuǎn),我們可以很清楚地看到最后內(nèi)核是通過 __put_user 函數(shù)把就緒 fd 列表和事件返回到用戶空間,而 __put_user 正是內(nèi)核用來(lái)拷貝數(shù)據(jù)到用戶空間的標(biāo)準(zhǔn)函數(shù)。此外,我并沒有在 Linux kernel 的源碼中和 epoll 相關(guān)的代碼里找到 mmap 系統(tǒng)調(diào)用做內(nèi)存映射的邏輯,所以基本可以得出結(jié)論:epoll 在 Linux kernel 里并沒有使用 mmap 來(lái)做用戶空間和內(nèi)核空間的內(nèi)存共享,所以那些說 epoll 使用了 mmap 的文章都是誤解。

    Go netpoller 核心

    Go netpoller 基本原理

    Go netpoller 通過在底層對(duì) epoll/kqueue/iocp 的封裝,從而實(shí)現(xiàn)了使用同步編程模式達(dá)到異步執(zhí)行的效果。總結(jié)來(lái)說,所有的網(wǎng)絡(luò)操作都以網(wǎng)絡(luò)描述符 netFD 為中心實(shí)現(xiàn)。netFD 與底層 PollDesc 結(jié)構(gòu)綁定,當(dāng)在一個(gè) netFD 上讀寫遇到 EAGAIN 錯(cuò)誤時(shí),就將當(dāng)前 goroutine 存儲(chǔ)到這個(gè) netFD 對(duì)應(yīng)的 PollDesc 中,同時(shí)調(diào)用 gopark 把當(dāng)前 goroutine 給 park 住,直到這個(gè) netFD 上再次發(fā)生讀寫事件,才將此 goroutine 給 ready 激活重新運(yùn)行。顯然,在底層通知 goroutine 再次發(fā)生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅(qū)動(dòng)機(jī)制。

    總所周知,Go 是一門跨平臺(tái)的編程語(yǔ)言,而不同平臺(tái)針對(duì)特定的功能有不用的實(shí)現(xiàn),這當(dāng)然也包括了 I/O 多路復(fù)用技術(shù),比如 Linux 里的 I/O 多路復(fù)用有 select、poll 和 epoll,而 freeBSD 或者 MacOS 里則是 kqueue,而 Windows 里則是基于異步 I/O 實(shí)現(xiàn)的 iocp,等等;因此,Go 為了實(shí)現(xiàn)底層 I/O 多路復(fù)用的跨平臺(tái),分別基于上述的這些不同平臺(tái)的系統(tǒng)調(diào)用實(shí)現(xiàn)了多版本的 netpollers,具體的源碼路徑如下:

    • src/runtime/netpoll_epoll.go

    • src/runtime/netpoll_kqueue.go

    • src/runtime/netpoll_solaris.go

    • src/runtime/netpoll_windows.go

    • src/runtime/netpoll_aix.go

    • src/runtime/netpoll_fake.go

    本文的解析基于 epoll 版本,如果讀者對(duì)其他平臺(tái)的 netpoller 底層實(shí)現(xiàn)感興趣,可以在閱讀完本文后自行翻閱其他 netpoller 源碼,所有實(shí)現(xiàn)版本的機(jī)制和原理基本類似,所以了解了 epoll 版本的實(shí)現(xiàn)后再去學(xué)習(xí)其他版本實(shí)現(xiàn)應(yīng)該沒什么障礙。

    接下來(lái)讓我們通過分析最新的 Go 源碼(v1.15.3),全面剖析一下整個(gè) Go netpoller 的運(yùn)行機(jī)制和流程。

    數(shù)據(jù)結(jié)構(gòu)

    netFD

    net.Listen("tcp", ":8888") 方法返回了一個(gè) *TCPListener,它是一個(gè)實(shí)現(xiàn)了 net.Listener 接口的 struct,而通過 listener.Accept() 接收的新連接 *TCPConn 則是一個(gè)實(shí)現(xiàn)了 net.Conn 接口的 struct,它內(nèi)嵌了 net.conn struct。仔細(xì)閱讀上面的源碼可以發(fā)現(xiàn),不管是 Listener 的 Accept 還是 Conn 的 Read/Write 方法,都是基于一個(gè) netFD 的數(shù)據(jù)結(jié)構(gòu)的操作, netFD 是一個(gè)網(wǎng)絡(luò)描述符,類似于 Linux 的文件描述符的概念,netFD 中包含一個(gè) poll.FD 數(shù)據(jù)結(jié)構(gòu),而 poll.FD 中包含兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu) Sysfd 和 pollDesc,前者是真正的系統(tǒng)文件描述符,后者對(duì)是底層事件驅(qū)動(dòng)的封裝,所有的讀寫超時(shí)等操作都是通過調(diào)用后者的對(duì)應(yīng)方法實(shí)現(xiàn)的。

    netFD 和 poll.FD 的源碼:

    //?Network?file?descriptor. type?netFD?struct?{pfd?poll.FD//?immutable?until?Closefamily??????intsotype??????intisConnected?bool?//?handshake?completed?or?use?of?association?with?peernet?????????stringladdr???????Addrraddr???????Addr }//?FD?is?a?file?descriptor.?The?net?and?os?packages?use?this?type?as?a //?field?of?a?larger?type?representing?a?network?connection?or?OS?file. type?FD?struct?{//?Lock?sysfd?and?serialize?access?to?Read?and?Write?methods.fdmu?fdMutex//?System?file?descriptor.?Immutable?until?Close.Sysfd?int//?I/O?poller.pd?pollDesc//?Writev?cache.iovecs?*[]syscall.Iovec//?Semaphore?signaled?when?file?is?closed.csema?uint32//?Non-zero?if?this?file?has?been?set?to?blocking?mode.isBlocking?uint32//?Whether?this?is?a?streaming?descriptor,?as?opposed?to?a//?packet-based?descriptor?like?a?UDP?socket.?Immutable.IsStream?bool//?Whether?a?zero?byte?read?indicates?EOF.?This?is?false?for?a//?message?based?socket?connection.ZeroReadIsEOF?bool//?Whether?this?is?a?file?rather?than?a?network?socket.isFile?bool }

    pollDesc

    前面提到了 pollDesc 是底層事件驅(qū)動(dòng)的封裝,netFD 通過它來(lái)完成各種 I/O 相關(guān)的操作,它的定義如下:

    type?pollDesc?struct?{runtimeCtx?uintptr }

    這里的 struct 只包含了一個(gè)指針,而通過 pollDesc 的 init 方法,我們可以找到它具體的定義是在 runtime.pollDesc 這里:

    func?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }//?Network?poller?descriptor. // //?No?heap?pointers. // //go:notinheap type?pollDesc?struct?{link?*pollDesc?//?in?pollcache,?protected?by?pollcache.lock//?The?lock?protects?pollOpen,?pollSetDeadline,?pollUnblock?and?deadlineimpl?operations.//?This?fully?covers?seq,?rt?and?wt?variables.?fd?is?constant?throughout?the?PollDesc?lifetime.//?pollReset,?pollWait,?pollWaitCanceled?and?runtime·netpollready?(IO?readiness?notification)//?proceed?w/o?taking?the?lock.?So?closing,?everr,?rg,?rd,?wg?and?wd?are?manipulated//?in?a?lock-free?way?by?all?operations.//?NOTE(dvyukov):?the?following?code?uses?uintptr?to?store?*g?(rg/wg),//?that?will?blow?up?when?GC?starts?moving?objects.lock????mutex?//?protects?the?following?fieldsfd??????uintptrclosing?booleverr???bool????//?marks?event?scanning?error?happeneduser????uint32??//?user?settable?cookierseq????uintptr?//?protects?from?stale?read?timersrg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?read?or?nilrt??????timer???//?read?deadline?timer?(set?if?rt.f?!=?nil)rd??????int64???//?read?deadlinewseq????uintptr?//?protects?from?stale?write?timerswg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?write?or?nilwt??????timer???//?write?deadline?timerwd??????int64???//?write?deadline }

    這里重點(diǎn)關(guān)注里面的 rg 和 wg,這里兩個(gè) uintptr "萬(wàn)能指針"類型,取值分別可能是 pdReady、pdWait、等待 file descriptor 就緒的 goroutine 也就是 g 數(shù)據(jù)結(jié)構(gòu)以及 nil,它們是實(shí)現(xiàn)喚醒 goroutine 的關(guān)鍵。

    runtime.pollDesc 包含自身類型的一個(gè)指針,用來(lái)保存下一個(gè) runtime.pollDesc 的地址,以此來(lái)實(shí)現(xiàn)鏈表,可以減少數(shù)據(jù)結(jié)構(gòu)的大小,所有的 runtime.pollDesc 保存在 runtime.pollCache 結(jié)構(gòu)中,定義如下:

    type?pollCache?struct?{lock??mutexfirst?*pollDesc//?PollDesc?objects?must?be?type-stable,//?because?we?can?get?ready?notification?from?epoll/kqueue//?after?the?descriptor?is?closed/reused.//?Stale?notifications?are?detected?using?seq?variable,//?seq?is?incremented?when?deadlines?are?changed?or?descriptor?is?reused. }

    因?yàn)?runtime.pollCache 是一個(gè)在 runtime 包里的全局變量,因此需要用一個(gè)互斥鎖來(lái)避免 data race 問題,從它的名字也能看出這是一個(gè)用于緩存的數(shù)據(jù)結(jié)構(gòu),也就是用來(lái)提高性能的,具體如何實(shí)現(xiàn)呢?

    const?pollBlockSize?=?4?*?1024func?(c?*pollCache)?alloc()?*pollDesc?{lock(&c.lock)if?c.first?==?nil?{const?pdSize?=?unsafe.Sizeof(pollDesc{})n?:=?pollBlockSize?/?pdSizeif?n?==?0?{n?=?1}//?Must?be?in?non-GC?memory?because?can?be?referenced//?only?from?epoll/kqueue?internals.mem?:=?persistentalloc(n*pdSize,?0,?&memstats.other_sys)for?i?:=?uintptr(0);?i?<?n;?i++?{pd?:=?(*pollDesc)(add(mem,?i*pdSize))pd.link?=?c.firstc.first?=?pd}}pd?:=?c.firstc.first?=?pd.linklockInit(&pd.lock,?lockRankPollDesc)unlock(&c.lock)return?pd }

    Go runtime 會(huì)在調(diào)用 poll_runtime_pollOpen 往 epoll 實(shí)例注冊(cè) fd 之時(shí)首次調(diào)用 runtime.pollCache.alloc方法時(shí)批量初始化大小 4KB 的 runtime.pollDesc 結(jié)構(gòu)體的鏈表,初始化過程中會(huì)調(diào)用 runtime.persistentalloc 來(lái)為這些數(shù)據(jù)結(jié)構(gòu)分配不會(huì)被 GC 回收的內(nèi)存,確保這些數(shù)據(jù)結(jié)構(gòu)只能被 epoll和kqueue 在內(nèi)核空間去引用。

    再往后每次調(diào)用這個(gè)方法則會(huì)先判斷鏈表頭是否已經(jīng)分配過值了,若是,則直接返回表頭這個(gè) pollDesc,這種批量初始化數(shù)據(jù)進(jìn)行緩存而后每次都直接從緩存取數(shù)據(jù)的方式是一種很常見的性能優(yōu)化手段,在這里這種方式可以有效地提升 netpoller 的吞吐量。

    Go runtime 會(huì)在關(guān)閉 pollDesc 之時(shí)調(diào)用 runtime.pollCache.free 釋放內(nèi)存:

    func?(c?*pollCache)?free(pd?*pollDesc)?{lock(&c.lock)pd.link?=?c.firstc.first?=?pdunlock(&c.lock) }

    實(shí)現(xiàn)原理

    使用 Go 編寫一個(gè)典型的 TCP echo server:

    package?mainimport?("log""net" )func?main()?{listen,?err?:=?net.Listen("tcp",?":8888")if?err?!=?nil?{log.Println("listen?error:?",?err)return}for?{conn,?err?:=?listen.Accept()if?err?!=?nil?{log.Println("accept?error:?",?err)break}//?start?a?new?goroutine?to?handle?the?new?connection.go?HandleConn(conn)} }func?HandleConn(conn?net.Conn)?{defer?conn.Close()packet?:=?make([]byte,?1024)for?{//?block?here?if?socket?is?not?available?for?reading?data.n,?err?:=?conn.Read(packet)if?err?!=?nil?{log.Println("read?socket?error:?",?err)return}//?same?as?above,?block?here?if?socket?is?not?available?for?writing._,?_?=?conn.Write(packet[:n])} }

    上面是一個(gè)基于 Go 原生網(wǎng)絡(luò)模型(基于 netpoller)編寫的一個(gè) TCP server,模式是 goroutine-per-connection ,在這種模式下,開發(fā)者使用的是同步的模式去編寫異步的邏輯而且對(duì)于開發(fā)者來(lái)說 I/O 是否阻塞是無(wú)感知的,也就是說開發(fā)者無(wú)需考慮 goroutines 甚至更底層的線程、進(jìn)程的調(diào)度和上下文切換。而 Go netpoller 最底層的事件驅(qū)動(dòng)技術(shù)肯定是基于 epoll/kqueue/iocp 這一類的 I/O 事件驅(qū)動(dòng)技術(shù),只不過是把這些調(diào)度和上下文切換的工作轉(zhuǎn)移到了 runtime 的 Go scheduler,讓它來(lái)負(fù)責(zé)調(diào)度 goroutines,從而極大地降低了程序員的心智負(fù)擔(dān)!

    Go 的這種同步模式的網(wǎng)絡(luò)服務(wù)器的基本架構(gòu)通常如下:

    上面的示例代碼中相關(guān)的在源碼里的幾個(gè)數(shù)據(jù)結(jié)構(gòu)和方法:

    //?TCPListener?is?a?TCP?network?listener.?Clients?should?typically //?use?variables?of?type?Listener?instead?of?assuming?TCP. type?TCPListener?struct?{fd?*netFDlc?ListenConfig }//?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }//?TCPConn?is?an?implementation?of?the?Conn?interface?for?TCP?network //?connections. type?TCPConn?struct?{conn }//?Conn type?conn?struct?{fd?*netFD }type?conn?struct?{fd?*netFD }func?(c?*conn)?ok()?bool?{?return?c?!=?nil?&&?c.fd?!=?nil?}//?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }//?Write?implements?the?Conn?Write?method. func?(c?*conn)?Write(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Write(b)if?err?!=?nil?{err?=?&OpError{Op:?"write",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }

    net.Listen

    調(diào)用 net.Listen 之后,底層會(huì)通過 Linux 的系統(tǒng)調(diào)用 socket 方法創(chuàng)建一個(gè) fd 分配給 listener,并用以來(lái)初始化 listener 的 netFD ,接著調(diào)用 netFD 的 listenStream 方法完成對(duì) socket 的 bind&listen 操作以及對(duì) netFD 的初始化(主要是對(duì) netFD 里的 pollDesc 的初始化),調(diào)用鏈?zhǔn)?runtime.runtime_pollServerInit --> runtime.poll_runtime_pollServerInit --> runtime.netpollGenericInit,主要做的事情是:

  • 調(diào)用 epollcreate1 創(chuàng)建一個(gè) epoll 實(shí)例 epfd,作為整個(gè) runtime 的唯一 event-loop 使用;

  • 調(diào)用 runtime.nonblockingPipe 創(chuàng)建一個(gè)用于和 epoll 實(shí)例通信的管道,這里為什么不用更新且更輕量的 eventfd 呢?我個(gè)人猜測(cè)是為了兼容更多以及更老的系統(tǒng)版本;

  • 將 netpollBreakRd 通知信號(hào)量封裝成 epollevent 事件結(jié)構(gòu)體注冊(cè)進(jìn) epoll 實(shí)例。

  • 相關(guān)源碼如下:

    //?調(diào)用?linux?系統(tǒng)調(diào)用?socket?創(chuàng)建?listener?fd?并設(shè)置為為阻塞?I/O s,?err?:=?socketFunc(family,?sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC,?proto) //?On?Linux?the?SOCK_NONBLOCK?and?SOCK_CLOEXEC?flags?were //?introduced?in?2.6.27?kernel?and?on?FreeBSD?both?flags?were //?introduced?in?10?kernel.?If?we?get?an?EINVAL?error?on?Linux //?or?EPROTONOSUPPORT?error?on?FreeBSD,?fall?back?to?using //?socket?without?them.socketFunc????????func(int,?int,?int)?(int,?error)??=?syscall.Socket//?用上面創(chuàng)建的?listener?fd?初始化?listener?netFD if?fd,?err?=?newFD(s,?family,?sotype,?net);?err?!=?nil?{poll.CloseFunc(s)return?nil,?err }//?對(duì)?listener?fd?進(jìn)行?bind&listen?操作,并且調(diào)用?init?方法完成初始化 func?(fd?*netFD)?listenStream(laddr?sockaddr,?backlog?int,?ctrlFn?func(string,?string,?syscall.RawConn)?error)?error?{...//?完成綁定操作if?err?=?syscall.Bind(fd.pfd.Sysfd,?lsa);?err?!=?nil?{return?os.NewSyscallError("bind",?err)}//?完成監(jiān)聽操作if?err?=?listenFunc(fd.pfd.Sysfd,?backlog);?err?!=?nil?{return?os.NewSyscallError("listen",?err)}//?調(diào)用?init,內(nèi)部會(huì)調(diào)用?poll.FD.Init,最后調(diào)用?pollDesc.initif?err?=?fd.init();?err?!=?nil?{return?err}lsa,?_?=?syscall.Getsockname(fd.pfd.Sysfd)fd.setAddr(fd.addrFunc()(lsa),?nil)return?nil }//?使用?sync.Once?來(lái)確保一個(gè)?listener?只持有一個(gè)?epoll?實(shí)例 var?serverInit?sync.Once//?netFD.init?會(huì)調(diào)用?poll.FD.Init?并最終調(diào)用到?pollDesc.init, //?它會(huì)創(chuàng)建?epoll?實(shí)例并把?listener?fd?加入監(jiān)聽隊(duì)列 func?(pd?*pollDesc)?init(fd?*FD)?error?{//?runtime_pollServerInit?通過?`go:linkname`?鏈接到具體的實(shí)現(xiàn)函數(shù)?poll_runtime_pollServerInit,//?接著再調(diào)用?netpollGenericInit,然后會(huì)根據(jù)不同的系統(tǒng)平臺(tái)去調(diào)用特定的?netpollinit?來(lái)創(chuàng)建?epoll?實(shí)例serverInit.Do(runtime_pollServerInit)//?runtime_pollOpen?內(nèi)部調(diào)用了?netpollopen?來(lái)將?listener?fd?注冊(cè)到?//?epoll?實(shí)例中,另外,它會(huì)初始化一個(gè)?pollDesc?并返回ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}//?把真正初始化完成的?pollDesc?實(shí)例賦值給當(dāng)前的?pollDesc?代表自身的指針,//?后續(xù)使用直接通過該指針操作pd.runtimeCtx?=?ctxreturn?nil }var?(//?全局唯一的?epoll?fd,只在?listener?fd?初始化之時(shí)被指定一次epfd?int32?=?-1?//?epoll?descriptor )//?netpollinit?會(huì)創(chuàng)建一個(gè)?epoll?實(shí)例,然后把?epoll?fd?賦值給?epfd, //?后續(xù)?listener?以及它?accept?的所有?sockets?有關(guān)?epoll?的操作都是基于這個(gè)全局的?epfd func?netpollinit()?{epfd?=?epollcreate1(_EPOLL_CLOEXEC)if?epfd?<?0?{epfd?=?epollcreate(1024)if?epfd?<?0?{println("runtime:?epollcreate?failed?with",?-epfd)throw("runtime:?netpollinit?failed")}closeonexec(epfd)}r,?w,?errno?:=?nonblockingPipe()if?errno?!=?0?{println("runtime:?pipe?failed?with",?-errno)throw("runtime:?pipe?failed")}ev?:=?epollevent{events:?_EPOLLIN,}*(**uintptr)(unsafe.Pointer(&ev.data))?=?&netpollBreakRderrno?=?epollctl(epfd,?_EPOLL_CTL_ADD,?r,?&ev)if?errno?!=?0?{println("runtime:?epollctl?failed?with",?-errno)throw("runtime:?epollctl?failed")}netpollBreakRd?=?uintptr(r)netpollBreakWr?=?uintptr(w) }//?netpollopen?會(huì)被?runtime_pollOpen?調(diào)用,注冊(cè)?fd?到?epoll?實(shí)例, //?注意這里使用的是?epoll?的?ET?模式,同時(shí)會(huì)利用萬(wàn)能指針把?pollDesc?保存到?epollevent?的一個(gè)?8?位的字節(jié)數(shù)組?data?里 func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32?{var?ev?epolleventev.events?=?_EPOLLIN?|?_EPOLLOUT?|?_EPOLLRDHUP?|?_EPOLLET*(**pollDesc)(unsafe.Pointer(&ev.data))?=?pdreturn?-epollctl(epfd,?_EPOLL_CTL_ADD,?int32(fd),?&ev) }

    我們前面提到的 epoll 的三個(gè)基本調(diào)用,Go 在源碼里實(shí)現(xiàn)了對(duì)那三個(gè)調(diào)用的封裝:

    #include?<sys/epoll.h>?? int?epoll_create(int?size);?? int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event);?? int?epoll_wait(int?epfd,?struct?epoll_event?*?events,?int?maxevents,?int?timeout);//?Go?對(duì)上面三個(gè)調(diào)用的封裝 func?netpollinit() func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32 func?netpoll(block?bool)?gList

    netFD 就是通過這三個(gè)封裝來(lái)對(duì) epoll 進(jìn)行創(chuàng)建實(shí)例、注冊(cè) fd 和等待事件操作的。

    Listener.Accept()

    netpoll accept socket 的工作流程如下:

  • 服務(wù)端的 netFD 在 listen 時(shí)會(huì)創(chuàng)建 epoll 的實(shí)例,并將 listenerFD 加入 epoll 的事件隊(duì)列

  • netFD 在 accept 時(shí)將返回的 connFD 也加入 epoll 的事件隊(duì)列

  • netFD 在讀寫時(shí)出現(xiàn) syscall.EAGAIN 錯(cuò)誤,通過 pollDesc 的 waitRead 方法將當(dāng)前的 goroutine park 住,直到 ready,從 pollDesc 的 waitRead 中返回

  • Listener.Accept() 接收來(lái)自客戶端的新連接,具體還是調(diào)用 netFD.accept 方法來(lái)完成這個(gè)功能:

    //?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }func?(fd?*netFD)?accept()?(netfd?*netFD,?err?error)?{//?調(diào)用?poll.FD?的?Accept?方法接受新的?socket?連接,返回?socket?的?fdd,?rsa,?errcall,?err?:=?fd.pfd.Accept()if?err?!=?nil?{if?errcall?!=?""?{err?=?wrapSyscallError(errcall,?err)}return?nil,?err}//?以?socket?fd?構(gòu)造一個(gè)新的?netFD,代表這個(gè)新的?socketif?netfd,?err?=?newFD(d,?fd.family,?fd.sotype,?fd.net);?err?!=?nil?{poll.CloseFunc(d)return?nil,?err}//?調(diào)用?netFD?的?init?方法完成初始化if?err?=?netfd.init();?err?!=?nil?{fd.Close()return?nil,?err}lsa,?_?:=?syscall.Getsockname(netfd.pfd.Sysfd)netfd.setAddr(netfd.addrFunc()(lsa),?netfd.addrFunc()(rsa))return?netfd,?nil }

    netFD.accept 方法里會(huì)再調(diào)用 poll.FD.Accept ,最后會(huì)使用 Linux 的系統(tǒng)調(diào)用 accept 來(lái)完成新連接的接收,并且會(huì)把 accept 的 socket 設(shè)置成非阻塞 I/O 模式:

    //?Accept?wraps?the?accept?network?call. func?(fd?*FD)?Accept()?(int,?syscall.Sockaddr,?string,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?-1,?nil,?"",?err}defer?fd.readUnlock()if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?-1,?nil,?"",?err}for?{//?使用?linux?系統(tǒng)調(diào)用?accept?接收新連接,創(chuàng)建對(duì)應(yīng)的?sockets,?rsa,?errcall,?err?:=?accept(fd.Sysfd)//?因?yàn)?listener?fd?在創(chuàng)建的時(shí)候已經(jīng)設(shè)置成非阻塞的了,//?所以 accept 方法會(huì)直接返回,不管有沒有新連接到來(lái);如果 err == nil 則表示正常建立新連接,直接返回if?err?==?nil?{return?s,?rsa,?"",?err}//?如果?err?!=?nil,則判斷?err?==?syscall.EAGAIN,符合條件則進(jìn)入?pollDesc.waitRead?方法switch?err?{case?syscall.EAGAIN:if?fd.pd.pollable()?{//?如果當(dāng)前沒有發(fā)生期待的?I/O?事件,那么?waitRead?會(huì)通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}case?syscall.ECONNABORTED://?This?means?that?a?socket?on?the?listen//?queue?was?closed?before?we?Accept()ed?it;//?it's?a?silly?error,?so?try?again.continue}return?-1,?nil,?errcall,?err} }//?使用?linux?的?accept?系統(tǒng)調(diào)用接收新連接并把這個(gè)?socket?fd?設(shè)置成非阻塞?I/O ns,?sa,?err?:=?Accept4Func(s,?syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) //?On?Linux?the?accept4?system?call?was?introduced?in?2.6.28 //?kernel?and?on?FreeBSD?it?was?introduced?in?10?kernel.?If?we //?get?an?ENOSYS?error?on?both?Linux?and?FreeBSD,?or?EINVAL //?error?on?Linux,?fall?back?to?using?accept.//?Accept4Func?is?used?to?hook?the?accept4?call. var?Accept4Func?func(int,?int)?(int,?syscall.Sockaddr,?error)?=?syscall.Accept4

    pollDesc.waitRead 方法主要負(fù)責(zé)檢測(cè)當(dāng)前這個(gè) pollDesc 的上層 netFD 對(duì)應(yīng)的 fd 是否有『期待的』I/O 事件發(fā)生,如果有就直接返回,否則就 park 住當(dāng)前的 goroutine 并持續(xù)等待直至對(duì)應(yīng)的 fd 上發(fā)生可讀/可寫或者其他『期待的』I/O 事件為止,然后它就會(huì)返回到外層的 for 循環(huán),讓 goroutine 繼續(xù)執(zhí)行邏輯。

    poll.FD.Accept() 返回之后,會(huì)構(gòu)造一個(gè)對(duì)應(yīng)這個(gè)新 socket 的 netFD,然后調(diào)用 init() 方法完成初始化,這個(gè) init 過程和前面 net.Listen() 是一樣的,調(diào)用鏈:netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),最終又會(huì)走到這里:

    var?serverInit?sync.Oncefunc?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }

    然后把這個(gè) socket fd 注冊(cè)到 listener 的 epoll 實(shí)例的事件隊(duì)列中去,等待 I/O 事件。

    Conn.Read/Conn.Write

    我們先來(lái)看看 Conn.Read 方法是如何實(shí)現(xiàn)的,原理其實(shí)和 Listener.Accept 是一樣的,具體調(diào)用鏈還是首先調(diào)用 conn 的 netFD.Read ,然后內(nèi)部再調(diào)用 poll.FD.Read ,最后使用 Linux 的系統(tǒng)調(diào)用 read: syscall.Read 完成數(shù)據(jù)讀取:

    //?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }func?(fd?*netFD)?Read(p?[]byte)?(n?int,?err?error)?{n,?err?=?fd.pfd.Read(p)runtime.KeepAlive(fd)return?n,?wrapSyscallError("read",?err) }//?Read?implements?io.Reader. func?(fd?*FD)?Read(p?[]byte)?(int,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?0,?err}defer?fd.readUnlock()if?len(p)?==?0?{//?If?the?caller?wanted?a?zero?byte?read,?return?immediately//?without?trying?(but?after?acquiring?the?readLock).//?Otherwise?syscall.Read?returns?0,?nil?which?looks?like//?io.EOF.//?TODO(bradfitz):?make?it?wait?for?readability??(Issue?15735)return?0,?nil}if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?0,?err}if?fd.IsStream?&&?len(p)?>?maxRW?{p?=?p[:maxRW]}for?{//?嘗試從該?socket?讀取數(shù)據(jù),因?yàn)?socket?在被?listener?accept?的時(shí)候設(shè)置成//?了非阻塞?I/O,所以這里同樣也是直接返回,不管有沒有可讀的數(shù)據(jù)n,?err?:=?syscall.Read(fd.Sysfd,?p)if?err?!=?nil?{n?=?0//?err?==?syscall.EAGAIN?表示當(dāng)前沒有期待的?I/O?事件發(fā)生,也就是?socket?不可讀if?err?==?syscall.EAGAIN?&&?fd.pd.pollable()?{//?如果當(dāng)前沒有發(fā)生期待的?I/O?事件,那么?waitRead?//?會(huì)通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}//?On?MacOS?we?can?see?EINTR?here?if?the?user//?pressed?^Z.??See?issue?#22838.if?runtime.GOOS?==?"darwin"?&&?err?==?syscall.EINTR?{continue}}err?=?fd.eofError(n,?err)return?n,?err} }

    conn.Write 和 conn.Read 的原理是一致的,它也是通過類似 pollDesc.waitRead 的 pollDesc.waitWrite 來(lái) park 住 goroutine 直至期待的 I/O 事件發(fā)生才返回恢復(fù)執(zhí)行。

    pollDesc.waitRead/pollDesc.waitWrite

    pollDesc.waitRead 內(nèi)部調(diào)用了 poll.runtime_pollWait --> runtime.poll_runtime_pollWait 來(lái)達(dá)成無(wú) I/O 事件時(shí) park 住 goroutine 的目的:

    //go:linkname?poll_runtime_pollWait?internal/poll.runtime_pollWait func?poll_runtime_pollWait(pd?*pollDesc,?mode?int)?int?{err?:=?netpollcheckerr(pd,?int32(mode))if?err?!=?pollNoError?{return?err}//?As?for?now?only?Solaris,?illumos,?and?AIX?use?level-triggered?IO.if?GOOS?==?"solaris"?||?GOOS?==?"illumos"?||?GOOS?==?"aix"?{netpollarm(pd,?mode)}//?進(jìn)入?netpollblock?并且判斷是否有期待的?I/O?事件發(fā)生,//?這里的?for?循環(huán)是為了一直等到?io?readyfor?!netpollblock(pd,?int32(mode),?false)?{err?=?netpollcheckerr(pd,?int32(mode))if?err?!=?0?{return?err}//?Can?happen?if?timeout?has?fired?and?unblocked?us,//?but?before?we?had?a?chance?to?run,?timeout?has?been?reset.//?Pretend?it?has?not?happened?and?retry.}return?0 }//?returns?true?if?IO?is?ready,?or?false?if?timedout?or?closed //?waitio?-?wait?only?for?completed?IO,?ignore?errors func?netpollblock(pd?*pollDesc,?mode?int32,?waitio?bool)?bool?{//?gpp?保存的是?goroutine?的數(shù)據(jù)結(jié)構(gòu)?g,這里會(huì)根據(jù)?mode?的值決定是?rg?還是?wg,//?前面提到過,rg?和?wg?是用來(lái)保存等待?I/O?就緒的?gorouine?的,后面調(diào)用?gopark?之后,//?會(huì)把當(dāng)前的?goroutine?的抽象數(shù)據(jù)結(jié)構(gòu)?g?存入?gpp?這個(gè)指針,也就是?rg?或者?wggpp?:=?&pd.rgif?mode?==?'w'?{gpp?=?&pd.wg}//?set?the?gpp?semaphore?to?WAIT//?這個(gè)?for?循環(huán)是為了等待?io?ready?或者?io?waitfor?{old?:=?*gpp//?gpp?==?pdReady?表示此時(shí)已有期待的?I/O?事件發(fā)生,//?可以直接返回?unblock?當(dāng)前?goroutine?并執(zhí)行響應(yīng)的?I/O?操作if?old?==?pdReady?{*gpp?=?0return?true}if?old?!=?0?{throw("runtime:?double?wait")}//?如果沒有期待的?I/O?事件發(fā)生,則通過原子操作把?gpp?的值置為?pdWait?并退出?for?循環(huán)if?atomic.Casuintptr(gpp,?0,?pdWait)?{break}}//?need?to?recheck?error?states?after?setting?gpp?to?WAIT//?this?is?necessary?because?runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl//?do?the?opposite:?store?to?closing/rd/wd,?membarrier,?load?of?rg/wg//?waitio?此時(shí)是?false,netpollcheckerr?方法會(huì)檢查當(dāng)前?pollDesc?對(duì)應(yīng)的?fd?是否是正常的,//?通常來(lái)說??netpollcheckerr(pd,?mode)?==?0?是成立的,所以這里會(huì)執(zhí)行?gopark?//?把當(dāng)前?goroutine?給?park?住,直至對(duì)應(yīng)的?fd?上發(fā)生可讀/可寫或者其他『期待的』I/O?事件為止,//?然后?unpark?返回,在?gopark?內(nèi)部會(huì)把當(dāng)前?goroutine?的抽象數(shù)據(jù)結(jié)構(gòu)?g?存入//?gpp(pollDesc.rg/pollDesc.wg)?指針里,以便在后面的?netpoll?函數(shù)取出?pollDesc?之后,//?把?g?添加到鏈表里返回,接著重新調(diào)度?goroutineif?waitio?||?netpollcheckerr(pd,?mode)?==?0?{//?注冊(cè)?netpollblockcommit?回調(diào)給?gopark,在?gopark?內(nèi)部會(huì)執(zhí)行它,保存當(dāng)前?goroutine?到?gppgopark(netpollblockcommit,?unsafe.Pointer(gpp),?waitReasonIOWait,?traceEvGoBlockNet,?5)}//?be?careful?to?not?lose?concurrent?READY?notificationold?:=?atomic.Xchguintptr(gpp,?0)if?old?>?pdWait?{throw("runtime:?corrupted?polldesc")}return?old?==?pdReady }//?gopark?會(huì)停住當(dāng)前的?goroutine?并且調(diào)用傳遞進(jìn)來(lái)的回調(diào)函數(shù)?unlockf,從上面的源碼我們可以知道這個(gè)函數(shù)是 //?netpollblockcommit func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.//?gopark?最終會(huì)調(diào)用?park_m,在這個(gè)函數(shù)內(nèi)部會(huì)調(diào)用?unlockf,也就是?netpollblockcommit,//?然后會(huì)把當(dāng)前的?goroutine,也就是?g?數(shù)據(jù)結(jié)構(gòu)保存到?pollDesc?的?rg?或者?wg?指針里mcall(park_m) }//?park?continuation?on?g0. func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{//?調(diào)用?netpollblockcommit,把當(dāng)前的?goroutine,//?也就是?g?數(shù)據(jù)結(jié)構(gòu)保存到?pollDesc?的?rg?或者?wg?指針里ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }//?netpollblockcommit?在?gopark?函數(shù)里被調(diào)用 func?netpollblockcommit(gp?*g,?gpp?unsafe.Pointer)?bool?{//?通過原子操作把當(dāng)前?goroutine?抽象的數(shù)據(jù)結(jié)構(gòu)?g,也就是這里的參數(shù)?gp?存入?gpp?指針,//?此時(shí)?gpp?的值是?pollDesc?的?rg?或者?wg?指針r?:=?atomic.Casuintptr((*uintptr)(gpp),?pdWait,?uintptr(unsafe.Pointer(gp)))if?r?{//?Bump?the?count?of?goroutines?waiting?for?the?poller.//?The?scheduler?uses?this?to?decide?whether?to?block//?waiting?for?the?poller?if?there?is?nothing?else?to?do.atomic.Xadd(&netpollWaiters,?1)}return?r }

    pollDesc.waitWrite 的內(nèi)部實(shí)現(xiàn)原理和 pollDesc.waitRead 是一樣的,都是基于 poll.runtime_pollWait --> runtime.poll_runtime_pollWait,這里就不再贅述。

    netpoll

    前面已經(jīng)從源碼的層面分析完了 netpoll 是如何通過 park goroutine 從而達(dá)到阻塞 Accept/Read/Write 的效果,而通過調(diào)用 gopark,goroutine 會(huì)被放置在某個(gè)等待隊(duì)列中,這里是放到了 epoll 的 "interest list" 里,底層數(shù)據(jù)結(jié)構(gòu)是由紅黑樹實(shí)現(xiàn)的 ?eventpoll.rbr,此時(shí) G 的狀態(tài)由 _Grunning為_Gwaitting ,因此 G 必須被手動(dòng)喚醒(通過 goready ),否則會(huì)丟失任務(wù),應(yīng)用層阻塞通常使用這種方式。

    所以我們現(xiàn)在可以來(lái)從整體的層面來(lái)概括 Go 的網(wǎng)絡(luò)業(yè)務(wù) goroutine 是如何被規(guī)劃調(diào)度的了:

    首先,client 連接 server 的時(shí)候,listener 通過 accept 調(diào)用接收新 connection,每一個(gè)新 connection 都啟動(dòng)一個(gè) goroutine 處理,accept 調(diào)用會(huì)把該 connection 的 fd 連帶所在的 goroutine 上下文信息封裝注冊(cè)到 epoll 的監(jiān)聽列表里去,當(dāng) goroutine 調(diào)用 conn.Read 或者 conn.Write 等需要阻塞等待的函數(shù)時(shí),會(huì)被 gopark 給封存起來(lái)并使之休眠,讓 P 去執(zhí)行本地調(diào)度隊(duì)列里的下一個(gè)可執(zhí)行的 goroutine,往后 Go scheduler 會(huì)在循環(huán)調(diào)度的 runtime.schedule() 函數(shù)以及 sysmon 監(jiān)控線程中調(diào)用 runtime.nepoll 以獲取可運(yùn)行的 goroutine 列表并通過調(diào)用 injectglist 把剩下的 g 放入全局調(diào)度隊(duì)列或者當(dāng)前 P 本地調(diào)度隊(duì)列去重新執(zhí)行。

    那么當(dāng) I/O 事件發(fā)生之后,netpoller 是通過什么方式喚醒那些在 I/O wait 的 goroutine 的?答案是通過 runtime.netpoll。

    runtime.netpoll 的核心邏輯是:

  • 根據(jù)調(diào)用方的入?yún)?delay,設(shè)置對(duì)應(yīng)的調(diào)用 epollwait 的 timeout 值;

  • 調(diào)用 epollwait 等待發(fā)生了可讀/可寫事件的 fd;

  • 循環(huán) epollwait 返回的事件列表,處理對(duì)應(yīng)的事件類型, 組裝可運(yùn)行的 goroutine 鏈表并返回。

  • //?netpoll?checks?for?ready?network?connections. //?Returns?list?of?goroutines?that?become?runnable. //?delay?<?0:?blocks?indefinitely //?delay?==?0:?does?not?block,?just?polls //?delay?>?0:?block?for?up?to?that?many?nanoseconds func?netpoll(delay?int64)?gList?{if?epfd?==?-1?{return?gList{}}//?根據(jù)特定的規(guī)則把?delay?值轉(zhuǎn)換為?epollwait?的?timeout?值var?waitms?int32if?delay?<?0?{waitms?=?-1}?else?if?delay?==?0?{waitms?=?0}?else?if?delay?<?1e6?{waitms?=?1}?else?if?delay?<?1e15?{waitms?=?int32(delay?/?1e6)}?else?{//?An?arbitrary?cap?on?how?long?to?wait?for?a?timer.//?1e9?ms?==?~11.5?days.waitms?=?1e9}var?events?[128]epollevent retry://?超時(shí)等待就緒的?fd?讀寫事件n?:=?epollwait(epfd,?&events[0],?int32(len(events)),?waitms)if?n?<?0?{if?n?!=?-_EINTR?{println("runtime:?epollwait?on?fd",?epfd,?"failed?with",?-n)throw("runtime:?netpoll?failed")}//?If?a?timed?sleep?was?interrupted,?just?return?to//?recalculate?how?long?we?should?sleep?now.if?waitms?>?0?{return?gList{}}goto?retry}//?toRun?是一個(gè)?g?的鏈表,存儲(chǔ)要恢復(fù)的?goroutines,最后返回給調(diào)用方var?toRun?gListfor?i?:=?int32(0);?i?<?n;?i++?{ev?:=?&events[i]if?ev.events?==?0?{continue}//?Go?scheduler?在調(diào)用?findrunnable()?尋找?goroutine?去執(zhí)行的時(shí)候,//?在調(diào)用?netpoll?之時(shí)會(huì)檢查當(dāng)前是否有其他線程同步阻塞在?netpoll,//?若是,則調(diào)用?netpollBreak?來(lái)喚醒那個(gè)線程,避免它長(zhǎng)時(shí)間阻塞if?*(**uintptr)(unsafe.Pointer(&ev.data))?==?&netpollBreakRd?{if?ev.events?!=?_EPOLLIN?{println("runtime:?netpoll:?break?fd?ready?for",?ev.events)throw("runtime:?netpoll:?break?fd?ready?for?something?unexpected")}if?delay?!=?0?{//?netpollBreak?could?be?picked?up?by?a//?nonblocking?poll.?Only?read?the?byte//?if?blocking.var?tmp?[16]byteread(int32(netpollBreakRd),?noescape(unsafe.Pointer(&tmp[0])),?int32(len(tmp)))atomic.Store(&netpollWakeSig,?0)}continue}//?判斷發(fā)生的事件類型,讀類型或者寫類型等,然后給?mode?復(fù)制相應(yīng)的值,//?mode?用來(lái)決定從?pollDesc?里的?rg?還是?wg?里取出?goroutinevar?mode?int32if?ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR)?!=?0?{mode?+=?'r'}if?ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR)?!=?0?{mode?+=?'w'}if?mode?!=?0?{//?取出保存在?epollevent?里的?pollDescpd?:=?*(**pollDesc)(unsafe.Pointer(&ev.data))pd.everr?=?falseif?ev.events?==?_EPOLLERR?{pd.everr?=?true}//?調(diào)用?netpollready,傳入就緒?fd?的?pollDesc,//?把?fd?對(duì)應(yīng)的?goroutine?添加到鏈表?toRun?中netpollready(&toRun,?pd,?mode)}}return?toRun }//?netpollready?調(diào)用?netpollunblock?返回就緒?fd?對(duì)應(yīng)的?goroutine?的抽象數(shù)據(jù)結(jié)構(gòu)?g func?netpollready(toRun?*gList,?pd?*pollDesc,?mode?int32)?{var?rg,?wg?*gif?mode?==?'r'?||?mode?==?'r'+'w'?{rg?=?netpollunblock(pd,?'r',?true)}if?mode?==?'w'?||?mode?==?'r'+'w'?{wg?=?netpollunblock(pd,?'w',?true)}if?rg?!=?nil?{toRun.push(rg)}if?wg?!=?nil?{toRun.push(wg)} }//?netpollunblock?會(huì)依據(jù)傳入的?mode?決定從?pollDesc?的?rg?或者?wg?取出當(dāng)時(shí)?gopark?之時(shí)存入的 //?goroutine?抽象數(shù)據(jù)結(jié)構(gòu)?g?并返回 func?netpollunblock(pd?*pollDesc,?mode?int32,?ioready?bool)?*g?{//?mode?==?'r'?代表當(dāng)時(shí)?gopark?是為了等待讀事件,而?mode?==?'w'?則代表是等待寫事件gpp?:=?&pd.rgif?mode?==?'w'?{gpp?=?&pd.wg}for?{//?取出?gpp?存儲(chǔ)的?gold?:=?*gppif?old?==?pdReady?{return?nil}if?old?==?0?&&?!ioready?{//?Only?set?READY?for?ioready.?runtime_pollWait//?will?check?for?timeout/cancel?before?waiting.return?nil}var?new?uintptrif?ioready?{new?=?pdReady}//?重置?pollDesc?的?rg?或者?wgif?atomic.Casuintptr(gpp,?old,?new)?{//?如果該?goroutine?還是必須等待,則返回?nilif?old?==?pdWait?{old?=?0}//?通過萬(wàn)能指針還原成?g?并返回return?(*g)(unsafe.Pointer(old))}} }//?netpollBreak?往通信管道里寫入信號(hào)去喚醒?epollwait func?netpollBreak()?{//?通過?CAS?避免重復(fù)的喚醒信號(hào)被寫入管道,//?從而減少系統(tǒng)調(diào)用并節(jié)省一些系統(tǒng)資源if?atomic.Cas(&netpollWakeSig,?0,?1)?{for?{var?b?byten?:=?write(netpollBreakWr,?unsafe.Pointer(&b),?1)if?n?==?1?{break}if?n?==?-_EINTR?{continue}if?n?==?-_EAGAIN?{return}println("runtime:?netpollBreak?write?failed?with",?-n)throw("runtime:?netpollBreak?write?failed")}} }

    Go 在多種場(chǎng)景下都可能會(huì)調(diào)用 netpoll 檢查文件描述符狀態(tài),netpoll 里會(huì)調(diào)用 epoll_wait 從 epoll 的 eventpoll.rdllist 就緒雙向鏈表返回,從而得到 I/O 就緒的 socket fd 列表,并根據(jù)取出最初調(diào)用 epoll_ctl 時(shí)保存的上下文信息,恢復(fù) g。所以執(zhí)行完netpoll 之后,會(huì)返回一個(gè)就緒 fd 列表對(duì)應(yīng)的 goroutine 鏈表,接下來(lái)將就緒的 goroutine 通過調(diào)用 injectglist 加入到全局調(diào)度隊(duì)列或者 P 的本地調(diào)度隊(duì)列中,啟動(dòng) M 綁定 P 去執(zhí)行。

    具體調(diào)用 netpoll 的地方,首先在 Go runtime scheduler 循環(huán)調(diào)度 goroutines 之時(shí)就有可能會(huì)調(diào)用 netpoll 獲取到已就緒的 fd 對(duì)應(yīng)的 goroutine 來(lái)調(diào)度執(zhí)行。

    首先 Go scheduler 的核心方法 runtime.schedule() 里會(huì)調(diào)用一個(gè)叫 runtime.findrunable() 的方法獲取可運(yùn)行的 goroutine 來(lái)執(zhí)行,而在 runtime.findrunable() 方法里就調(diào)用了 runtime.netpoll 獲取已就緒的 fd 列表對(duì)應(yīng)的 goroutine 列表:

    //?One?round?of?scheduler:?find?a?runnable?goroutine?and?execute?it. //?Never?returns. func?schedule()?{...if?gp?==?nil?{gp,?inheritTime?=?findrunnable()?//?blocks?until?work?is?available}... }//?Finds?a?runnable?goroutine?to?execute. //?Tries?to?steal?from?other?P's,?get?g?from?global?queue,?poll?network. func?findrunnable()?(gp?*g,?inheritTime?bool)?{...//?Poll?network.if?netpollinited()?&&?(atomic.Load(&netpollWaiters)?>?0?||?pollUntil?!=?0)?&&?atomic.Xchg64(&sched.lastpoll,?0)?!=?0?{atomic.Store64(&sched.pollUntil,?uint64(pollUntil))if?_g_.m.p?!=?0?{throw("findrunnable:?netpoll?with?p")}if?_g_.m.spinning?{throw("findrunnable:?netpoll?with?spinning")}if?faketime?!=?0?{//?When?using?fake?time,?just?poll.delta?=?0}list?:=?netpoll(delta)?//?同步阻塞調(diào)用?netpoll,直至有可用的?goroutineatomic.Store64(&sched.pollUntil,?0)atomic.Store64(&sched.lastpoll,?uint64(nanotime()))if?faketime?!=?0?&&?list.empty()?{//?Using?fake?time?and?nothing?is?ready;?stop?M.//?When?all?M's?stop,?checkdead?will?call?timejump.stopm()goto?top}lock(&sched.lock)_p_?=?pidleget()?//?查找是否有空閑的?P?可以來(lái)就緒的?goroutineunlock(&sched.lock)if?_p_?==?nil?{injectglist(&list)?//?如果當(dāng)前沒有空閑的?P,則把就緒的?goroutine?放入全局調(diào)度隊(duì)列等待被執(zhí)行}?else?{//?如果當(dāng)前有空閑的?P,則?pop?出一個(gè)?g,返回給調(diào)度器去執(zhí)行,//?并通過調(diào)用?injectglist?把剩下的?g?放入全局調(diào)度隊(duì)列或者當(dāng)前?P?本地調(diào)度隊(duì)列acquirep(_p_)if?!list.empty()?{gp?:=?list.pop()injectglist(&list)casgstatus(gp,?_Gwaiting,?_Grunnable)if?trace.enabled?{traceGoUnpark(gp,?0)}return?gp,?false}if?wasSpinning?{_g_.m.spinning?=?trueatomic.Xadd(&sched.nmspinning,?1)}goto?top}}?else?if?pollUntil?!=?0?&&?netpollinited()?{pollerPollUntil?:=?int64(atomic.Load64(&sched.pollUntil))if?pollerPollUntil?==?0?||?pollerPollUntil?>?pollUntil?{netpollBreak()}}stopm()goto?top }

    另外, sysmon 監(jiān)控線程會(huì)在循環(huán)過程中檢查距離上一次 runtime.netpoll 被調(diào)用是否超過了 10ms,若是則會(huì)去調(diào)用它拿到可運(yùn)行的 goroutine 列表并通過調(diào)用 injectglist 把 g 列表放入全局調(diào)度隊(duì)列或者當(dāng)前 P 本地調(diào)度隊(duì)列等待被執(zhí)行:

    //?Always?runs?without?a?P,?so?write?barriers?are?not?allowed. // //go:nowritebarrierrec func?sysmon()?{...//?poll?network?if?not?polled?for?more?than?10mslastpoll?:=?int64(atomic.Load64(&sched.lastpoll))if?netpollinited()?&&?lastpoll?!=?0?&&?lastpoll+10*1000*1000?<?now?{atomic.Cas64(&sched.lastpoll,?uint64(lastpoll),?uint64(now))list?:=?netpoll(0)?//?non-blocking?-?returns?list?of?goroutinesif?!list.empty()?{//?Need?to?decrement?number?of?idle?locked?M's//?(pretending?that?one?more?is?running)?before?injectglist.//?Otherwise?it?can?lead?to?the?following?situation://?injectglist?grabs?all?P's?but?before?it?starts?M's?to?run?the?P's,//?another?M?returns?from?syscall,?finishes?running?its?G,//?observes?that?there?is?no?work?to?do?and?no?other?running?M's//?and?reports?deadlock.incidlelocked(-1)injectglist(&list)incidlelocked(1)}}... }

    Go runtime 在程序啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)獨(dú)立的 M 作為監(jiān)控線程,叫 sysmon ,這個(gè)線程為系統(tǒng)級(jí)的 daemon 線程,無(wú)需 P 即可運(yùn)行, sysmon 每 20us~10ms 運(yùn)行一次。sysmon 中以輪詢的方式執(zhí)行以下操作(如上面的代碼所示):

  • 以非阻塞的方式調(diào)用 runtime.netpoll ,從中找出能從網(wǎng)絡(luò) I/O 中喚醒的 g 列表,并通過調(diào)用 injectglist 把 g 列表放入全局調(diào)度隊(duì)列或者當(dāng)前 P 本地調(diào)度隊(duì)列等待被執(zhí)行,調(diào)度觸發(fā)時(shí),有可能從這個(gè)全局 runnable 調(diào)度隊(duì)列獲取 g。然后再循環(huán)調(diào)用 startm ,直到所有 P 都不處于 _Pidle 狀態(tài)。

  • 調(diào)用 retake ,搶占長(zhǎng)時(shí)間處于 _Psyscall 狀態(tài)的 P。

  • 綜上,Go 借助于 epoll/kqueue/iocp 和 runtime scheduler 等的幫助,設(shè)計(jì)出了自己的 I/O 多路復(fù)用 netpoller,成功地讓 Listener.Accept / conn.Read / conn.Write 等方法從開發(fā)者的角度看來(lái)是同步模式。

    Go netpoller 的價(jià)值

    通過前面對(duì)源碼的分析,我們現(xiàn)在知道 Go netpoller 依托于 runtime scheduler,為開發(fā)者提供了一種強(qiáng)大的同步網(wǎng)絡(luò)編程模式;然而,Go netpoller 存在的意義卻遠(yuǎn)不止于此,Go netpoller I/O 多路復(fù)用搭配 Non-blocking I/O 而打造出來(lái)的這個(gè)原生網(wǎng)絡(luò)模型,它最大的價(jià)值是把網(wǎng)絡(luò) I/O 的控制權(quán)牢牢掌握在 Go 自己的 runtime 里,關(guān)于這一點(diǎn)我們需要從 Go 的 runtime scheduler 說起,Go 的 G-P-M 調(diào)度模型如下:

    G 在運(yùn)行過程中如果被阻塞在某個(gè) system call 操作上,那么不光 G 會(huì)阻塞,執(zhí)行該 G 的 M 也會(huì)解綁 P(實(shí)質(zhì)是被 sysmon 搶走了),與 G 一起進(jìn)入 sleep 狀態(tài)。如果此時(shí)有 idle 的 M,則 P 與其綁定繼續(xù)執(zhí)行其他 G;如果沒有 idle M,但仍然有其他 G 要去執(zhí)行,那么就會(huì)創(chuàng)建一個(gè)新的 M。當(dāng)阻塞在 system call 上的 G 完成 syscall 調(diào)用后,G 會(huì)去嘗試獲取一個(gè)可用的 P,如果沒有可用的 P,那么 G 會(huì)被標(biāo)記為 _Grunnable 并把它放入全局的 runqueue 中等待調(diào)度,之前的那個(gè) sleep 的 M 將再次進(jìn)入 sleep。

    現(xiàn)在清楚為什么 netpoll 為什么一定要使用非阻塞 I/O 了吧?就是為了避免讓操作網(wǎng)絡(luò) I/O 的 goroutine 陷入到系統(tǒng)調(diào)用從而進(jìn)入內(nèi)核態(tài),因?yàn)橐坏┻M(jìn)入內(nèi)核態(tài),整個(gè)程序的控制權(quán)就會(huì)發(fā)生轉(zhuǎn)移(到內(nèi)核),不再屬于用戶進(jìn)程了,那么也就無(wú)法借助于 Go 強(qiáng)大的 runtime scheduler 來(lái)調(diào)度業(yè)務(wù)程序的并發(fā)了;而有了 netpoll 之后,借助于非阻塞 I/O ,G 就再也不會(huì)因?yàn)橄到y(tǒng)調(diào)用的讀寫而 (長(zhǎng)時(shí)間) 陷入內(nèi)核態(tài),當(dāng) G 被阻塞在某個(gè) network I/O 操作上時(shí),實(shí)際上它不是因?yàn)橄萑雰?nèi)核態(tài)被阻塞住了,而是被 Go runtime 調(diào)用 gopark 給 park 住了,此時(shí) G 會(huì)被放置到某個(gè) wait queue 中,而 M 會(huì)嘗試運(yùn)行下一個(gè) _Grunnable 的 G,如果此時(shí)沒有 _Grunnable 的 G 供 M 運(yùn)行,那么 M 將解綁 P,并進(jìn)入 sleep 狀態(tài)。當(dāng) I/O available,在 epoll 的 eventpoll.rdr 中等待的 G 會(huì)被放到 eventpoll.rdllist 鏈表里并通過 netpoll 中的 epoll_wait 系統(tǒng)調(diào)用返回放置到全局調(diào)度隊(duì)列或者 P 的本地調(diào)度隊(duì)列,標(biāo)記為 _Grunnable ,等待 P 綁定 M 恢復(fù)執(zhí)行。

    Goroutine 的調(diào)度

    這一小節(jié)主要是講處理網(wǎng)絡(luò) I/O 的 goroutines 阻塞之后,Go scheduler 具體是如何像前面幾個(gè)章節(jié)所說的那樣,避免讓操作網(wǎng)絡(luò) I/O 的 goroutine 陷入到系統(tǒng)調(diào)用從而進(jìn)入內(nèi)核態(tài)的,而是封存 goroutine 然后讓出 CPU 的使用權(quán)從而令 P 可以去調(diào)度本地調(diào)度隊(duì)列里的下一個(gè) goroutine 的。

    溫馨提示:這一小節(jié)屬于延伸閱讀,涉及到的知識(shí)點(diǎn)更偏系統(tǒng)底層,需要有一定的匯編語(yǔ)言基礎(chǔ)才能通讀,另外,這一節(jié)對(duì) Go scheduler 的講解僅僅涉及核心的一部分,不會(huì)把整個(gè)調(diào)度器都講一遍(事實(shí)上如果真要解析 Go scheduler 的話恐怕重開一篇幾萬(wàn)字的文章才能基本講清楚。。。),所以也要求讀者對(duì) Go 的并發(fā)調(diào)度器有足夠的了解,因此這一節(jié)可能會(huì)稍顯深?yuàn)W。當(dāng)然這一節(jié)也可選擇不讀,因?yàn)橥ㄟ^前面的整個(gè)解析,我相信讀者應(yīng)該已經(jīng)能夠基本掌握 Go netpoller 處理網(wǎng)絡(luò) I/O 的核心細(xì)節(jié)了,以及能從宏觀層面了解 netpoller 對(duì)業(yè)務(wù) goroutines 的基本調(diào)度了。而這一節(jié)主要是通過對(duì) goroutines 調(diào)度細(xì)節(jié)的剖析,能夠加深讀者對(duì)整個(gè) Go netpoller 的徹底理解,接上前面幾個(gè)章節(jié),形成一個(gè)完整的閉環(huán)。如果對(duì)調(diào)度的底層細(xì)節(jié)沒興趣的話這也可以直接跳過這一節(jié),對(duì)理解 Go netpoller 的基本原理影響不大,不過還是建議有條件的讀者可以看看。

    從源碼可知,Go scheduler 的調(diào)度 goroutine 過程中所調(diào)用的核心函數(shù)鏈如下:

    runtime.schedule?-->?runtime.execute?-->?runtime.gogo?-->?goroutine?code?-->?runtime.goexit?-->?runtime.goexit1?-->?runtime.mcall?-->?runtime.goexit0?-->?runtime.schedule

    Go scheduler 會(huì)不斷循環(huán)調(diào)用 runtime.schedule() 去調(diào)度 goroutines,而每個(gè) goroutine 執(zhí)行完成并退出之后,會(huì)再次調(diào)用 runtime.schedule(),使得調(diào)度器回到調(diào)度循環(huán)去執(zhí)行其他的 goroutine,不斷循環(huán),永不停歇。

    當(dāng)我們使用 go 關(guān)鍵字啟動(dòng)一個(gè)新 goroutine 時(shí),最終會(huì)調(diào)用 runtime.newproc --> runtime.newproc1,來(lái)得到 g,runtime.newproc1 會(huì)先從 P 的 gfree 緩存鏈表中查找可用的 g,若緩存未生效,則會(huì)新創(chuàng)建 g 給當(dāng)前的業(yè)務(wù)函數(shù),最后這個(gè) g 會(huì)被傳給 runtime.gogo 去真正執(zhí)行。

    這里首先需要了解一個(gè) gobuf 的結(jié)構(gòu)體,它用來(lái)保存 goroutine 的調(diào)度信息,是 runtime.gogo 的入?yún)?#xff1a;

    //?gobuf?存儲(chǔ)?goroutine?調(diào)度上下文信息的結(jié)構(gòu)體 type?gobuf?struct?{//?The?offsets?of?sp,?pc,?and?g?are?known?to?(hard-coded?in)?libmach.////?ctxt?is?unusual?with?respect?to?GC:?it?may?be?a//?heap-allocated?funcval,?so?GC?needs?to?track?it,?but?it//?needs?to?be?set?and?cleared?from?assembly,?where?it's//?difficult?to?have?write?barriers.?However,?ctxt?is?really?a//?saved,?live?register,?and?we?only?ever?exchange?it?between//?the?real?register?and?the?gobuf.?Hence,?we?treat?it?as?a//?root?during?stack?scanning,?which?means?assembly?that?saves//?and?restores?it?doesn't?need?write?barriers.?It's?still//?typed?as?a?pointer?so?that?any?other?writes?from?Go?get//?write?barriers.sp???uintptr?//?Stack?Pointer?棧指針pc???uintptr?//?Program?Counter?程序計(jì)數(shù)器g????guintptr?//?持有當(dāng)前?gobuf?的?goroutinectxt?unsafe.Pointerret??sys.Uintreglr???uintptrbp???uintptr?//?for?GOEXPERIMENT=framepointer }

    執(zhí)行 runtime.execute(),進(jìn)而調(diào)用 runtime.gogo:

    func?execute(gp?*g,?inheritTime?bool)?{_g_?:=?getg()//?Assign?gp.m?before?entering?_Grunning?so?running?Gs?have?an//?M._g_.m.curg?=?gpgp.m?=?_g_.mcasgstatus(gp,?_Grunnable,?_Grunning)gp.waitsince?=?0gp.preempt?=?falsegp.stackguard0?=?gp.stack.lo?+?_StackGuardif?!inheritTime?{_g_.m.p.ptr().schedtick++}//?Check?whether?the?profiler?needs?to?be?turned?on?or?off.hz?:=?sched.profilehzif?_g_.m.profilehz?!=?hz?{setThreadCPUProfiler(hz)}if?trace.enabled?{//?GoSysExit?has?to?happen?when?we?have?a?P,?but?before?GoStart.//?So?we?emit?it?here.if?gp.syscallsp?!=?0?&&?gp.sysblocktraced?{traceGoSysExit(gp.sysexitticks)}traceGoStart()}//?gp.sched?就是?gobufgogo(&gp.sched) }

    這里還需要了解一個(gè)概念:g0,Go G-P-M 調(diào)度模型中,g 代表 goroutine,而實(shí)際上一共有三種 g:

  • 執(zhí)行用戶代碼的 g;

  • 執(zhí)行調(diào)度器代碼的 g,也即是 g0;

  • 執(zhí)行 runtime.main 初始化工作的 main goroutine;

  • 第一種 g 就是使用 go 關(guān)鍵字啟動(dòng)的 goroutine,也是我們接觸最多的一類 g;第三種 g 是調(diào)度器啟動(dòng)之后用來(lái)執(zhí)行的一系列初始化工作的,包括但不限于啟動(dòng) sysmon 監(jiān)控線程、內(nèi)存初始化和啟動(dòng) GC 等等工作;第二種 g 叫 g0,用來(lái)執(zhí)行調(diào)度器代碼,g0 在底層和其他 g 是一樣的數(shù)據(jù)結(jié)構(gòu),但是性質(zhì)上有很大的區(qū)別,首先 g0 的棧大小是固定的,比如在 Linux 或者其他 Unix-like 的系統(tǒng)上一般是固定 8MB,不能動(dòng)態(tài)伸縮,而普通的 g 初始棧大小是 2KB,可按需擴(kuò)展,g0 其實(shí)就是線程棧,我們知道每個(gè)線程被創(chuàng)建出來(lái)之時(shí)都需要操作系統(tǒng)為之分配一個(gè)初始固定的線程棧,就是前面說的 8MB 大小的棧,g0 棧就代表了這個(gè)線程棧,因此每一個(gè) m 都需要綁定一個(gè) g0 來(lái)執(zhí)行調(diào)度器代碼,然后跳轉(zhuǎn)到執(zhí)行用戶代碼的地方。

    runtime.gogo 是真正去執(zhí)行 goroutine 代碼的函數(shù),這個(gè)函數(shù)由匯編實(shí)現(xiàn),為什么需要用匯編?因?yàn)?gogo 的工作是完成線程 M 上的堆棧切換:從系統(tǒng)堆棧 g0 切換成 goroutine gp,也就是 CPU 使用權(quán)和堆棧的切換,這種切換本質(zhì)上是對(duì) CPU 的 PC、SP 等寄存器和堆棧指針的更新,而這一類精度的底層操作別說是 Go,就算是最貼近底層的 C 也無(wú)法做到,這種程度的操作已超出所有高級(jí)語(yǔ)言的范疇,因此只能借助于匯編來(lái)實(shí)現(xiàn)。

    runtime.gogo 在不同的 CPU 架構(gòu)平臺(tái)上的實(shí)現(xiàn)各不相同,但是核心原理殊途同歸,我們這里選用 amd64 架構(gòu)的匯編實(shí)現(xiàn)來(lái)分析,我會(huì)在關(guān)鍵的地方加上解釋:

    // func gogo(buf *gobuf) // restore state from Gobuf; longjmp TEXT runtime·gogo(SB), NOSPLIT, $16-8// 將第一個(gè) FP 偽寄存器所指向的 gobuf 的第一個(gè)參數(shù)存入 BX 寄存器, // gobuf 的一個(gè)參數(shù)即是 SP 指針MOVQ buf+0(FP), BXMOVQ gobuf_g(BX), DX // 將 gp.sched.g 保存到 DX 寄存器MOVQ 0(DX), CX // make sure g != nil// 將 tls (thread local storage) 保存到 CX 寄存器,然后把 gp.sched.g 放到 tls[0],// 這樣以后調(diào)用 getg() 之時(shí)就可以通過 TLS 直接獲取到當(dāng)前 goroutine 的 g 結(jié)構(gòu)體實(shí)例,// 進(jìn)而可以得到 g 所在的 m 和 p,TLS 里一開始存儲(chǔ)的是系統(tǒng)堆棧 g0 的地址get_tls(CX)MOVQ DX, g(CX)// 下面的指令則是對(duì)函數(shù)棧的 BP/SP 寄存器(指針)的存取,// 最后進(jìn)入到指定的代碼區(qū)域,執(zhí)行函數(shù)棧幀MOVQ gobuf_sp(BX), SP // restore SPMOVQ gobuf_ret(BX), AXMOVQ gobuf_ctxt(BX), DXMOVQ gobuf_bp(BX), BP// 這里是在清空 gp.sched,因?yàn)榍懊嬉呀?jīng)把 gobuf 里的字段值都存入了寄存器,// 所以 gp.sched 就可以提前清空了,不需要等到后面 GC 來(lái)回收,減輕 GC 的負(fù)擔(dān)MOVQ $0, gobuf_sp(BX) // clear to help garbage collectorMOVQ $0, gobuf_ret(BX)MOVQ $0, gobuf_ctxt(BX)MOVQ $0, gobuf_bp(BX)// 把 gp.sched.pc 值放入 BX 寄存器// PC 指針指向 gogo 退出時(shí)需要執(zhí)行的函數(shù)地址MOVQ gobuf_pc(BX), BX// 用 BX 寄存器里的值去修改 CPU 的 IP 寄存器,// 這樣就可以根據(jù) CS:IP 寄存器的段地址+偏移量跳轉(zhuǎn)到 BX 寄存器里的地址,也就是 gp.sched.pcJMP BX

    runtime.gogo 函數(shù)接收 gp.sched 這個(gè) gobuf 結(jié)構(gòu)體實(shí)例,其中保存了函數(shù)棧寄存器 SP/PC/BP,如果熟悉操作系統(tǒng)原理的話可以知道這些寄存器是 CPU 進(jìn)行函數(shù)調(diào)用和返回時(shí)切換對(duì)應(yīng)的函數(shù)棧幀所需的寄存器,而 goroutine 的執(zhí)行和函數(shù)調(diào)用的原理是一致的,也是 CPU 寄存器的切換過程,所以這里的幾個(gè)寄存器當(dāng)前存的就是 G 的函數(shù)執(zhí)行棧,當(dāng) goroutine 在處理網(wǎng)絡(luò) I/O 之時(shí),如果恰好處于 I/O 就緒的狀態(tài)的話,則正常完成 runtime.gogo,并在最后跳轉(zhuǎn)到特定的地址,那么這個(gè)地址是哪里呢?

    我們知道 CPU 執(zhí)行函數(shù)的時(shí)候需要知道函數(shù)在內(nèi)存里的代碼段地址和偏移量,然后才能去取來(lái)函數(shù)棧執(zhí)行,而典型的提供代碼段地址和偏移量的寄存器就是 CS 和 IP 寄存器,而 JMP BX 指令則是用 BX 寄存器去更新 IP 寄存器,而 BX 寄存器里的值是 gp.sched.pc,那么這個(gè) PC 指針究竟是指向哪里呢?讓我們來(lái)看另一處源碼。

    眾所周知,啟動(dòng)一個(gè)新的 goroutine 是通過 go 關(guān)鍵字來(lái)完成的,而 go compiler 會(huì)在編譯期間利用 cmd/compile/internal/gc.state.stmt 和 cmd/compile/internal/gc.state.call 這兩個(gè)函數(shù)將 go 關(guān)鍵字翻譯成 runtime.newproc 函數(shù)調(diào)用,而 runtime.newproc 接收了函數(shù)指針和其大小之后,會(huì)獲取 goroutine 和調(diào)用處的程序計(jì)數(shù)器,接著再調(diào)用 runtime.newproc1:

    //?Create?a?new?g?in?state?_Grunnable,?starting?at?fn,?with?narg?bytes //?of?arguments?starting?at?argp.?callerpc?is?the?address?of?the?go //?statement?that?created?this.?The?caller?is?responsible?for?adding //?the?new?g?to?the?scheduler. // //?This?must?run?on?the?system?stack?because?it's?the?continuation?of //?newproc,?which?cannot?split?the?stack. // //go:systemstack func?newproc1(fn?*funcval,?argp?unsafe.Pointer,?narg?int32,?callergp?*g,?callerpc?uintptr)?*g?{...memclrNoHeapPointers(unsafe.Pointer(&newg.sched),?unsafe.Sizeof(newg.sched))newg.sched.sp?=?spnewg.stktopsp?=?sp//?把?goexit?函數(shù)地址存入?gobuf?的?PC?指針里newg.sched.pc?=?funcPC(goexit)?+?sys.PCQuantum?//?+PCQuantum?so?that?previous?instruction?is?in?same?functionnewg.sched.g?=?guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched,?fn)newg.gopc?=?callerpcnewg.ancestors?=?saveAncestors(callergp)newg.startpc?=?fn.fnif?_g_.m.curg?!=?nil?{newg.labels?=?_g_.m.curg.labels}if?isSystemGoroutine(newg,?false)?{atomic.Xadd(&sched.ngsys,?+1)}casgstatus(newg,?_Gdead,?_Grunnable)... }

    這里可以看到,newg.sched.pc 被設(shè)置了 runtime.goexit 的函數(shù)地址,newg 就是后面 runtime.gogo 執(zhí)行的 goroutine,因此 runtime.gogo 最后的匯編指令 JMP BX是跳轉(zhuǎn)到了 runtime.goexit,讓我們來(lái)繼續(xù)看看這個(gè)函數(shù)做了什么:

    // The top-most function running on a goroutine // returns to goexit+PCQuantum. Defined as ABIInternal // so as to make it identifiable to traceback (this // function it used as a sentinel; traceback wants to // see the func PC, not a wrapper PC). TEXT runtime·goexit<ABIInternal>(SB),NOSPLIT,$0-0BYTE $0x90 // NOPCALL runtime·goexit1(SB) // does not return// traceback from goexit1 must hit code range of goexitBYTE $0x90 // NOP

    這個(gè)函數(shù)也是匯編實(shí)現(xiàn)的,但是非常簡(jiǎn)單,就是直接調(diào)用 runtime·goexit1:

    //?Finishes?execution?of?the?current?goroutine. func?goexit1()?{if?raceenabled?{racegoend()}if?trace.enabled?{traceGoEnd()}mcall(goexit0) }

    調(diào)用 runtime.mcall函數(shù):

    // func mcall(fn func(*g)) // Switch to m->g0's stack, call fn(g). // Fn must never return. It should gogo(&g->sched) // to keep running g.// 切換回 g0 的系統(tǒng)堆棧,執(zhí)行 fn(g) TEXT runtime·mcall(SB), NOSPLIT, $0-8// 取入?yún)?funcval 對(duì)象的指針存入 DI 寄存器,此時(shí) fn.fn 是 goexit0 的地址MOVQ fn+0(FP), DIget_tls(CX)MOVQ g(CX), AX // save state in g->schedMOVQ 0(SP), BX // caller's PCMOVQ BX, (g_sched+gobuf_pc)(AX)LEAQ fn+0(FP), BX // caller's SPMOVQ BX, (g_sched+gobuf_sp)(AX)MOVQ AX, (g_sched+gobuf_g)(AX)MOVQ BP, (g_sched+gobuf_bp)(AX)// switch to m->g0 & its stack, call fnMOVQ g(CX), BXMOVQ g_m(BX), BX// 把 g0 的棧指針存入 SI 寄存器,后面需要用到MOVQ m_g0(BX), SICMPQ SI, AX // if g == m->g0 call badmcallJNE 3(PC)MOVQ $runtime·badmcall(SB), AXJMP AX// 這兩個(gè)指令是把 g0 地址存入到 TLS 里,// 然后從 SI 寄存器取出 g0 的棧指針,// 替換掉 SP 寄存器里存的當(dāng)前 g 的棧指針MOVQ SI, g(CX) // g = m->g0MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.spPUSHQ AXMOVQ DI, DX// 入口處的第一個(gè)指令已經(jīng)把 funcval 實(shí)例對(duì)象的指針存入了 DI 寄存器,// 0(DI) 表示取出 DI 的第一個(gè)成員,即 goexit0 函數(shù)地址,再存入 DIMOVQ 0(DI), DICALL DI // 調(diào)用 DI 寄存器里的地址,即 goexit0POPQ AXMOVQ $runtime·badmcall2(SB), AXJMP AXRET

    可以看到 runtime.mcall 函數(shù)的主要邏輯是從當(dāng)前 goroutine 切換回 g0 的系統(tǒng)堆棧,然后調(diào)用 fn(g),此處的 g 即是當(dāng)前運(yùn)行的 goroutine,這個(gè)方法會(huì)保存當(dāng)前運(yùn)行的 G 的 PC/SP 到 g->sched 里,以便該 G 可以在以后被重新恢復(fù)執(zhí)行,因?yàn)橐采婕暗郊拇嫫骱投褩V羔樀牟僮?#xff0c;所以也需要使用匯編實(shí)現(xiàn),該函數(shù)最后會(huì)在 g0 系統(tǒng)堆棧下執(zhí)行 runtime.goexit0:

    func?goexit0(gp?*g)?{_g_?:=?getg()casgstatus(gp,?_Grunning,?_Gdead)if?isSystemGoroutine(gp,?false)?{atomic.Xadd(&sched.ngsys,?-1)}gp.m?=?nillocked?:=?gp.lockedm?!=?0gp.lockedm?=?0_g_.m.lockedg?=?0gp.preemptStop?=?falsegp.paniconfault?=?falsegp._defer?=?nil?//?should?be?true?already?but?just?in?case.gp._panic?=?nil?//?non-nil?for?Goexit?during?panic.?points?at?stack-allocated?data.gp.writebuf?=?nilgp.waitreason?=?0gp.param?=?nilgp.labels?=?nilgp.timer?=?nilif?gcBlackenEnabled?!=?0?&&?gp.gcAssistBytes?>?0?{//?Flush?assist?credit?to?the?global?pool.?This?gives//?better?information?to?pacing?if?the?application?is//?rapidly?creating?an?exiting?goroutines.scanCredit?:=?int64(gcController.assistWorkPerByte?*?float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit,?scanCredit)gp.gcAssistBytes?=?0}dropg()if?GOARCH?==?"wasm"?{?//?no?threads?yet?on?wasmgfput(_g_.m.p.ptr(),?gp)schedule()?//?never?returns}if?_g_.m.lockedInt?!=?0?{print("invalid?m->lockedInt?=?",?_g_.m.lockedInt,?"\n")throw("internal?lockOSThread?error")}gfput(_g_.m.p.ptr(),?gp)if?locked?{//?The?goroutine?may?have?locked?this?thread?because//?it?put?it?in?an?unusual?kernel?state.?Kill?it//?rather?than?returning?it?to?the?thread?pool.//?Return?to?mstart,?which?will?release?the?P?and?exit//?the?thread.if?GOOS?!=?"plan9"?{?//?See?golang.org/issue/22227.gogo(&_g_.m.g0.sched)}?else?{//?Clear?lockedExt?on?plan9?since?we?may?end?up?re-using//?this?thread._g_.m.lockedExt?=?0}}schedule() }

    runtime.goexit0 的主要工作是就是

  • 利用 CAS 操作把 g 的狀態(tài)從 _Grunning 更新為 _Gdead;

  • 對(duì) g 做一些清理操作,把一些字段值置空;

  • 調(diào)用 runtime.dropg 解綁 g 和 m;

  • 把 g 放入 p 存儲(chǔ) g 的 gfree 鏈表作為緩存,后續(xù)如果需要啟動(dòng)新的 goroutine 則可以直接從鏈表里取而不用重新初始化分配內(nèi)存。

  • 最后,調(diào)用 runtime.schedule() 再次進(jìn)入調(diào)度循環(huán)去調(diào)度新的 goroutines,永不停歇。

  • 另一方面,如果 goroutine 處于 I/O 不可用狀態(tài),我們前面已經(jīng)分析過 netpoller 利用非阻塞 I/O + I/O 多路復(fù)用避免了陷入系統(tǒng)調(diào)用,所以此時(shí)會(huì)調(diào)用 runtime.gopark 并把 goroutine 暫時(shí)封存在用戶態(tài)空間,并休眠當(dāng)前的 goroutine,因此不會(huì)阻塞 runtime.gogo 的匯編執(zhí)行,而是通過 runtime.mcall 調(diào)用 runtime.park_m:

    func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.mcall(park_m) }func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }

    runtime.mcall 方法我們?cè)谇懊嬉呀?jīng)介紹過,它主要的工作就是是從當(dāng)前 goroutine 切換回 g0 的系統(tǒng)堆棧,然后調(diào)用 fn(g),而此時(shí) runtime.mcall 調(diào)用執(zhí)行的是 runtime.park_m,這個(gè)方法里會(huì)利用 CAS 把當(dāng)前運(yùn)行的 goroutine -- gp 的狀態(tài) 從 _Grunning 切換到 _Gwaiting,表明該 goroutine 已進(jìn)入到等待喚醒狀態(tài),此時(shí)封存和休眠 G 的操作就完成了,只需等待就緒之后被重新喚醒執(zhí)行即可。最后調(diào)用 runtime.schedule() 再次進(jìn)入調(diào)度循環(huán),去執(zhí)行下一個(gè) goroutine,充分利用 CPU。

    至此,我們完成了對(duì) Go netpoller 原理剖析的整個(gè)閉環(huán)。

    Go netpoller 的問題

    Go netpoller 的設(shè)計(jì)不可謂不精巧、性能也不可謂不高,配合 goroutine 開發(fā)網(wǎng)絡(luò)應(yīng)用的時(shí)候就一個(gè)字:爽。因此 Go 的網(wǎng)絡(luò)編程模式是及其簡(jiǎn)潔高效的,然而,沒有任何一種設(shè)計(jì)和架構(gòu)是完美的, goroutine-per-connection 這種模式雖然簡(jiǎn)單高效,但是在某些極端的場(chǎng)景下也會(huì)暴露出問題:goroutine 雖然非常輕量,它的自定義棧內(nèi)存初始值僅為 2KB,后面按需擴(kuò)容;海量連接的業(yè)務(wù)場(chǎng)景下, goroutine-per-connection ,此時(shí) goroutine 數(shù)量以及消耗的資源就會(huì)呈線性趨勢(shì)暴漲,雖然 Go scheduler 內(nèi)部做了 g 的緩存鏈表,可以一定程度上緩解高頻創(chuàng)建銷毀 goroutine 的壓力,但是對(duì)于瞬時(shí)性暴漲的長(zhǎng)連接場(chǎng)景就無(wú)能為力了,大量的 goroutines 會(huì)被不斷創(chuàng)建出來(lái),從而對(duì) Go runtime scheduler 造成極大的調(diào)度壓力和侵占系統(tǒng)資源,然后資源被侵占又反過來(lái)影響 Go scheduler 的調(diào)度,進(jìn)而導(dǎo)致性能下降。

    Reactor 網(wǎng)絡(luò)模型

    目前 Linux 平臺(tái)上主流的高性能網(wǎng)絡(luò)庫(kù)/框架中,大都采用 Reactor 模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。

    Reactor 模式本質(zhì)上指的是使用 I/O 多路復(fù)用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O) 的模式。

    通常設(shè)置一個(gè)主線程負(fù)責(zé)做 event-loop 事件循環(huán)和 I/O 讀寫,通過 select/poll/epoll_wait 等系統(tǒng)調(diào)用監(jiān)聽 I/O 事件,業(yè)務(wù)邏輯提交給其他工作線程去做。而所謂『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系統(tǒng)調(diào)用上,這樣可以最大限度的復(fù)用 event-loop 線程,讓一個(gè)線程能服務(wù)于多個(gè) sockets。在 Reactor 模式中,I/O 線程只能阻塞在 I/O multiplexing 函數(shù)上(select/poll/epoll_wait)。

    Reactor 模式的基本工作流程如下:

    • Server 端完成在 bind&listen 之后,將 listenfd 注冊(cè)到 epollfd 中,最后進(jìn)入 event-loop 事件循環(huán)。循環(huán)過程中會(huì)調(diào)用 select/poll/epoll_wait 阻塞等待,若有在 listenfd 上的新連接事件則解除阻塞返回,并調(diào)用 socket.accept 接收新連接 connfd,并將 connfd 加入到 epollfd 的 I/O 復(fù)用(監(jiān)聽)隊(duì)列。

    • 當(dāng) connfd 上發(fā)生可讀/可寫事件也會(huì)解除 select/poll/epoll_wait 的阻塞等待,然后進(jìn)行 I/O 讀寫操作,這里讀寫 I/O 都是非阻塞 I/O,這樣才不會(huì)阻塞 event-loop 的下一個(gè)循環(huán)。然而,這樣容易割裂業(yè)務(wù)邏輯,不易理解和維護(hù)。

    • 調(diào)用 read 讀取數(shù)據(jù)之后進(jìn)行解碼并放入隊(duì)列中,等待工作線程處理。

    • 工作線程處理完數(shù)據(jù)之后,返回到 event-loop 線程,由這個(gè)線程負(fù)責(zé)調(diào)用 write 把數(shù)據(jù)寫回 client。

    accept 連接以及 conn 上的讀寫操作若是在主線程完成,則要求是非阻塞 I/O,因?yàn)?Reactor 模式一條最重要的原則就是:I/O 操作不能阻塞 event-loop 事件循環(huán)。實(shí)際上 event loop 可能也可以是多線程的,只是一個(gè)線程里只有一個(gè) select/poll/epoll_wait

    上面提到了 Go netpoller 在某些場(chǎng)景下可能因?yàn)閯?chuàng)建太多的 goroutine 而過多地消耗系統(tǒng)資源,而在現(xiàn)實(shí)世界的網(wǎng)絡(luò)業(yè)務(wù)中,服務(wù)器持有的海量連接中在極短的時(shí)間窗口內(nèi)只有極少數(shù)是 active 而大多數(shù)則是 idle,就像這樣(非真實(shí)數(shù)據(jù),僅僅是為了比喻):

    那么為每一個(gè)連接指派一個(gè) goroutine 就顯得太過奢侈了,而 Reactor 模式這種利用 I/O 多路復(fù)用進(jìn)而只需要使用少量線程即可管理海量連接的設(shè)計(jì)就可以在這樣網(wǎng)絡(luò)業(yè)務(wù)中大顯身手了:

    MultiReactors.png

    在絕大部分應(yīng)用場(chǎng)景下,我推薦大家還是遵循 Go 的 best practices,使用原生的 Go 網(wǎng)絡(luò)庫(kù)來(lái)構(gòu)建自己的網(wǎng)絡(luò)應(yīng)用。然而,在某些極度追求性能、壓榨系統(tǒng)資源以及技術(shù)棧必須是原生 Go (不考慮 C/C++ 寫中間層而 Go 寫業(yè)務(wù)層)的業(yè)務(wù)場(chǎng)景下,我們可以考慮自己構(gòu)建 Reactor 網(wǎng)絡(luò)模型。

    gnet

    gnet 是一個(gè)基于事件驅(qū)動(dòng)的高性能和輕量級(jí)網(wǎng)絡(luò)框架。它直接使用 epoll 和 kqueue 系統(tǒng)調(diào)用而非標(biāo)準(zhǔn) Go 網(wǎng)絡(luò)包:net 來(lái)構(gòu)建網(wǎng)絡(luò)應(yīng)用,它的工作原理類似兩個(gè)開源的網(wǎng)絡(luò)庫(kù):netty 和 libuv,這也使得gnet 達(dá)到了一個(gè)遠(yuǎn)超 Go net 的性能表現(xiàn)。

    gnet 設(shè)計(jì)開發(fā)的初衷不是為了取代 Go 的標(biāo)準(zhǔn)網(wǎng)絡(luò)庫(kù):net,而是為了創(chuàng)造出一個(gè)類似于 Redis、Haproxy 能高效處理網(wǎng)絡(luò)包的 Go 語(yǔ)言網(wǎng)絡(luò)服務(wù)器框架。

    gnet 的賣點(diǎn)在于它是一個(gè)高性能、輕量級(jí)、非阻塞的純 Go 實(shí)現(xiàn)的傳輸層(TCP/UDP/Unix Domain Socket)網(wǎng)絡(luò)框架,開發(fā)者可以使用 gnet 來(lái)實(shí)現(xiàn)自己的應(yīng)用層網(wǎng)絡(luò)協(xié)議(HTTP、RPC、Redis、WebSocket 等等),從而構(gòu)建出自己的應(yīng)用層網(wǎng)絡(luò)應(yīng)用:比如在 gnet 上實(shí)現(xiàn) HTTP 協(xié)議就可以創(chuàng)建出一個(gè) HTTP 服務(wù)器 或者 Web 開發(fā)框架,實(shí)現(xiàn) Redis 協(xié)議就可以創(chuàng)建出自己的 Redis 服務(wù)器等等。

    gnet,在某些極端的網(wǎng)絡(luò)業(yè)務(wù)場(chǎng)景,比如海量連接、高頻短連接、網(wǎng)絡(luò)小包等等場(chǎng)景,gnet 在性能和資源占用上都遠(yuǎn)超 Go 原生的 net 包(基于 netpoller)。

    gnet 已經(jīng)實(shí)現(xiàn)了 Multi-Reactors 和 Multi-Reactors + Goroutine Pool 兩種網(wǎng)絡(luò)模型,也得益于這些網(wǎng)絡(luò)模型,使得 gnet 成為一個(gè)高性能和低損耗的 Go 網(wǎng)絡(luò)框架:

    MultiReactors.png

    multireactorsthreadpool.png

    ???? 功能

    • [x] 高性能 的基于多線程/Go程網(wǎng)絡(luò)模型的 event-loop 事件驅(qū)動(dòng)

    • [x] 內(nèi)置 goroutine 池,由開源庫(kù) ants 提供支持

    • [x] 內(nèi)置 bytes 內(nèi)存池,由開源庫(kù) bytebufferpool 提供支持

    • [x] 整個(gè)生命周期是無(wú)鎖的

    • [x] 簡(jiǎn)單易用的 APIs

    • [x] 基于 Ring-Buffer 的高效且可重用的內(nèi)存 buffer

    • [x] 支持多種網(wǎng)絡(luò)協(xié)議/IPC 機(jī)制:TCP、UDP 和 Unix Domain Socket

    • [x] 支持多種負(fù)載均衡算法:Round-Robin(輪詢)、Source-Addr-Hash(源地址哈希) 和 Least-Connections(最少連接數(shù))

    • [x] 支持兩種事件驅(qū)動(dòng)機(jī)制:Linux 里的 epoll 以及 FreeBSD/DragonFly/Darwin 里的 kqueue

    • [x] 支持異步寫操作

    • [x] 靈活的事件定時(shí)器

    • [x] SO_REUSEPORT 端口重用

    • [x] 內(nèi)置多種編解碼器,支持對(duì) TCP 數(shù)據(jù)流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,參考自 netty codec,而且支持自定制編解碼器

    • [x] 支持 Windows 平臺(tái),基于 IOCP 事件驅(qū)動(dòng)機(jī)制 Go 標(biāo)準(zhǔn)網(wǎng)絡(luò)庫(kù)

    • [ ] 實(shí)現(xiàn) gnet 客戶端

    參考&延伸閱讀

    • The Go netpoller

    • Nonblocking I/O

    • epoll(7) — Linux manual page

    • I/O Multiplexing: The select and poll Functions

    • The method to epoll’s madness

    • Scalable Go Scheduler Design Doc

    • Scheduling In Go : Part I - OS Scheduler

    • Scheduling In Go : Part II - Go Scheduler

    • Scheduling In Go : Part III - Concurrency

    • Goroutines, Nonblocking I/O, And Memory Usage

    • IO多路復(fù)用與Go網(wǎng)絡(luò)庫(kù)的實(shí)現(xiàn)

    • 關(guān)于select函數(shù)中timeval和fd_set重新設(shè)置的問題

    • A Million WebSockets and Go

    • Going Infinite, handling 1M websockets connections in Go

    • 字節(jié)跳動(dòng)在 Go 網(wǎng)絡(luò)庫(kù)上的實(shí)踐

    總結(jié)

    以上是生活随笔為你收集整理的Go netpoller 网络模型之源码全面解析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

    亚洲 成人 欧美 | 人人狠狠综合久久亚洲婷 | 免费福利在线 | 手机成人在线 | 免费看的国产视频网站 | 天天草天天色 | 国产在线色视频 | 欧美日韩99 | 在线观看av片 | 欧美一区二区伦理片 | 综合久久婷婷 | 日韩大片免费在线观看 | 国产精品白丝av | 国产亲近乱来精品 | 亚洲少妇激情 | 精品a视频 | 久久精品一区二区三区中文字幕 | 国产录像在线观看 | www成人精品 | 欧美色图一区 | 蜜臀av麻豆 | 亚洲经典精品 | 色播五月激情综合网 | 色老板在线视频 | 精品国产欧美一区二区三区不卡 | 色婷久久 | 99产精品成人啪免费网站 | 欧美另类巨大 | 欧美美女一级片 | 伊人黄色网 | 国产流白浆高潮在线观看 | 91精品国产一区 | 黄色亚洲精品 | 91成人网在线 | 久久久久久综合网天天 | 黄色av成人在线 | 欧美一区二区三区免费看 | 日韩一区二区三区高清在线观看 | 九九99靖品 | 天天操天天操天天 | 日本中文字幕在线看 | 久精品在线观看 | www国产亚洲精品久久麻豆 | 久久久精品福利视频 | 成人网444ppp| 亚洲黄色app | 成人网色| 五月天婷亚洲天综合网鲁鲁鲁 | 伊人日日干 | 国产黄网站在线观看 | 免费av网址大全 | 亚洲高清资源 | 久久高清国产 | 伊人宗合| 国产免费a| 伊人久操 | 激情网综合 | 亚洲成人资源在线 | 91av视频在线观看免费 | 国产精品男女视频 | 国产精品午夜av | 天天做天天看 | 五月天亚洲婷婷 | 偷拍久久久| 日韩视频免费播放 | 国产中文欧美日韩在线 | 一区二区三区在线观看免费视频 | 日韩精品三区四区 | 国产精品一区二区三区在线播放 | 日韩理论在线 | 成人羞羞免费 | 亚洲精品国偷拍自产在线观看 | 天天干天天天天 | 精品婷婷 | 91精品推荐 | 午夜精品一区二区三区视频免费看 | 久久久久99精品成人片三人毛片 | 九九亚洲精品 | 欧美日韩国产精品爽爽 | 人人玩人人添人人澡97 | 中文字幕欧美激情 | 在线视频你懂得 | 97人人超 | 久久人人添人人爽添人人88v | 免费观看成年人视频 | av日韩中文 | 2021国产视频 | 精品亚洲va在线va天堂资源站 | 日韩一二区在线观看 | 黄色中文字幕在线 | 欧美极品少妇xxxx | 亚洲干视频在线观看 | 亚洲精选久久 | 久久久精品亚洲 | 四虎www | 中文字幕av网站 | 成人在线视频在线观看 | 久久艹免费 | www99精品 | 免费在线一区二区 | 国产精品国产三级在线专区 | 精品在线免费视频 | 国产在线观看 | 亚洲成人午夜av | 日韩视频免费观看高清完整版在线 | 久久久资源网 | 婷婷久久一区 | 日本少妇高清做爰视频 | 福利电影一区二区 | 97激情影院 | 亚洲热久久 | 99久久精品免费看 | 久久久久亚洲精品男人的天堂 | 天天干天天操天天入 | 亚洲一区二区三区miaa149 | 热久在线 | 国产成人免费高清 | 国产99在线播放 | 成人久久免费视频 | 日韩欧美在线视频一区二区 | 一区二区三区在线免费观看 | 久久av在线| 欧美日韩精品在线播放 | 中文字幕在线观看视频一区二区三区 | 国产欧美精品在线观看 | aaa毛片视频 | 久热久草 | 日韩精品免费一区二区三区 | 青青河边草观看完整版高清 | 日日干 天天干 | 天天草天天色 | 国产欧美精品xxxx另类 | 91av视频网| 99久久er热在这里只有精品15 | 国产一区视频在线播放 | 精品麻豆入口免费 | 97色se| 最新日韩在线 | 国产小视频网站 | 99色免费 | 午夜电影久久久 | 亚洲 在线 | 91在线网站| 看片一区二区三区 | www久久 | 久视频在线播放 | 麻豆视频免费看 | 欧美日韩一区二区久久 | 韩国av一区二区三区在线观看 | 中文字幕在线一区观看 | 中文字幕二区在线观看 | 日韩在线视频不卡 | 久久成人国产精品一区二区 | 久久综合激情 | 麻豆91精品91久久久 | 日韩激情视频在线观看 | 国产精品国产亚洲精品看不卡15 | 色姑娘综合 | 久久无码av一区二区三区电影网 | 日韩精品一区在线观看 | 久久激情视频 久久 | 国内精品久久久久 | 91完整视频| 婷婷狠狠操| 久久国产品 | 精品自拍sae8—视频 | 欧美成人精品在线 | 91热这里只有精品 | 亚洲精品伦理在线 | 日日夜夜网 | 99在线高清视频在线播放 | 成人黄色小说视频 | 亚洲精品综合欧美二区变态 | 亚洲国产中文字幕 | 又黄又爽又刺激 | 日韩成人黄色 | 91综合视频在线观看 | 亚洲a资源 | 亚州欧美视频 | 日韩在线视频观看 | 国产字幕在线播放 | 国产中文字幕视频在线观看 | 黄色大片日本 | 日韩电影在线视频 | 免费国产黄线在线观看视频 | 亚洲精品在线播放视频 | 久久久精品综合 | av夜夜操 | 色com网 | 精品国产福利在线 | 国产xxxx性hd极品 | 国产在线国偷精品产拍免费yy | 亚洲黄a| 欧美网站黄色 | 国产精品91一区 | 一区二区三区中文字幕在线观看 | 久插视频 | 国产精品入口麻豆www | 亚洲成av人片在线观看www | 最新av免费在线观看 | 久久天天躁狠狠躁夜夜不卡公司 | 麻豆一二三精选视频 | 夜夜躁日日躁狠狠久久av | 亚洲涩涩网站 | 国产精品福利一区 | 激情影院在线 | 久久99久国产精品黄毛片入口 | 久久人人爽人人爽人人片 | av黄色大片| 狠狠干综合 | 77国产精品 | 成年人视频在线免费观看 | 国产视频中文字幕在线观看 | 一本一道波多野毛片中文在线 | 久久精品www人人爽人人 | 欧美激情va永久在线播放 | 99久热在线精品视频成人一区 | 亚洲一级片在线观看 | 91高清免费观看 | 国产高清在线观看 | 国产馆在线播放 | 免费a一级| 精品国偷自产国产一区 | 日本中文字幕观看 | 中文免费在线观看 | 91亚洲精品久久久蜜桃借种 | a视频免费| 国产污视频在线观看 | 成人av在线直播 | 免费激情在线电影 | 久久国产成人午夜av影院宅 | 久久精品123 | 久久久资源| 美女久久99| 国产黄视频在线观看 | 香蕉精品视频在线观看 | 婷婷伊人综合亚洲综合网 | 日韩av二区| 99视频99| 日日干天天插 | 人人涩| 国产福利免费看 | 国产精品手机在线 | 国产一级片视频 | 午夜影院日本 | 日本中文字幕在线看 | 色综合人人 | 99精品欧美一区二区蜜桃免费 | 亚洲 欧美 另类人妖 | 97在线资源| 超碰国产在线 | 日韩av图片 | 色吊丝在线永久观看最新版本 | 在线免费色 | 欧美色操 | 亚洲午夜av久久乱码 | 国产不卡毛片 | a在线观看免费视频 | 久久国色夜色精品国产 | 久久婷婷一区二区三区 | 国产精品久免费的黄网站 | 天天看天天干 | 日本性生活免费看 | 一级一片免费观看 | 亚洲激情av | 99久久99久久综合 | 激情视频一区 | 久久久久女教师免费一区 | 国产中文字幕一区二区三区 | 色偷偷888欧美精品久久久 | 麻豆一精品传二传媒短视频 | 久久久久久国产精品久久 | 国产成人免费高清 | 在线免费av网站 | 色狠狠干| 天天摸日日摸人人看 | 日本三级香港三级人妇99 | 最近在线中文字幕 | 亚洲视频在线观看免费 | 久久久久久久看片 | 精产嫩模国品一二三区 | 国内丰满少妇猛烈精品播放 | 国产精品人成电影在线观看 | av网址在线播放 | 欧美亚洲三级 | 久久久久亚洲国产 | 午夜12点 | 久久久国产高清 | 男女激情片在线观看 | 日日夜夜综合网 | 免费福利在线 | 亚洲人在线7777777精品 | 亚洲免费永久精品国产 | 九九热免费精品视频 | 亚洲国产一区二区精品专区 | 91资源在线观看 | 91视频黄色| www色片| 成人免费在线视频观看 | 日韩成人精品 | 日韩一区二区三区免费视频 | 深爱激情五月婷婷 | 伊人手机在线 | 国产高清不卡av | 国产拍揄自揄精品视频麻豆 | 亚洲视频在线看 | 久久久免费精品国产一区二区 | 久久有精品 | 99视| 国产精品久久久久久久久久三级 | 玖玖在线看 | 国产色一区 | 免费视频资源 | 国产精品h在线观看 | 成人久久18免费网站图片 | 一区久久久 | 伊人狠狠色丁香婷婷综合 | 亚洲 欧美 变态 国产 另类 | 在线观看视频黄 | 91社区国产高清 | 国产特级毛片 | 天堂va欧美va亚洲va老司机 | 亚洲欧洲日韩在线观看 | www国产在线| 欧美日韩在线观看一区 | 美女精品在线 | 欧美成人按摩 | 亚洲国产精品视频在线观看 | 免费观看黄 | 色丁香久久 | 波多野结衣在线播放视频 | 久久国产视频网站 | 国产精品午夜久久 | 91免费黄视频 | 久久久久日本精品一区二区三区 | 日韩精品黄 | 黄色小网站在线观看 | 成 人 黄 色 免费播放 | 人人干人人做 | 最近中文字幕在线中文高清版 | 美女黄色网在线播放 | 99精品国产一区二区三区麻豆 | 国产成人在线综合 | 国产精品一二三 | 97碰在线 | 国产在线精品一区二区三区 | 毛片无卡免费无播放器 | 国产成人精品一区二区三区福利 | 亚洲精品乱码久久久久久写真 | 日韩免费在线观看网站 | 中文字幕高清有码 | 99爱在线| 成年人免费在线看 | 丁香婷婷色综合亚洲电影 | 国产97在线看| 四虎永久视频 | 亚洲久草网 | 日日爱夜夜爱 | 色91av| 亚洲乱码精品久久久 | 国内精品久久久久久久久 | 91豆花在线观看 | 国产精品青草综合久久久久99 | 国产亚洲人成网站在线观看 | 在线99热 | 久久精品这里都是精品 | av片一区 | 麻豆国产在线视频 | 国产精品网站一区二区三区 | 91亚色在线观看 | 91人人澡人人爽人人精品 | 久久不见久久见免费影院 | 午夜色大片在线观看 | 国产欧美日韩视频 | 91精品久久久久久久久久入口 | 亚洲毛片视频 | 午夜av激情 | 高清国产在线一区 | 日韩精品免费在线视频 | 五月天婷亚洲天综合网鲁鲁鲁 | 91精品1区 | 日韩性片 | 美女免费视频观看网站 | 波多野结衣在线视频免费观看 | 9在线观看免费高清完整版 玖玖爱免费视频 | 99综合影院在线 | 国产成人精品亚洲日本在线观看 | www.日日日.com | 亚洲天堂激情 | 在线观看成人一级片 | 91网在线看| 日本在线视频网址 | 日本一区二区不卡高清 | 国产视频一区二区在线观看 | 免费在线激情视频 | 伊人狠狠操| 亚洲欧美成aⅴ人在线观看 四虎在线观看 | 欧美极品少妇xxxxⅹ欧美极品少妇xxxx亚洲精品 | 一级c片| 久久精选| 久久视频99 | 精品亚洲欧美无人区乱码 | 成人午夜电影在线观看 | 久久成人国产精品免费软件 | 久久艹在线 | 亚洲综合小说电影qvod | 天天色天天综合网 | 国产又粗又猛又黄又爽 | 国产黄色在线 | 亚洲五月六月 | 一区二区三区久久精品 | 伊色综合久久之综合久久 | 99久久99热这里只有精品 | 日韩v在线 | 色视频在线观看 | 激情久久小说 | 国产 字幕 制服 中文 在线 | 免费国产视频 | 在线观看日韩专区 | 亚洲精品乱码久久久久久9色 | 欧美 国产 视频 | 六月激情婷婷 | 国产破处在线播放 | 精品专区一区二区 | 久久久久久国产精品亚洲78 | 国产精品日韩在线播放 | 狠狠色噜噜狠狠狠 | bbbb操bbbb| 久久久免费 | 国产精品视频专区 | 亚洲免费精品一区二区 | 四虎在线免费观看 | 激情欧美一区二区三区免费看 | a天堂最新版中文在线地址 久久99久久精品国产 | 日韩欧美网址 | 亚洲一级片免费观看 | 国产精品3 | 91麻豆看国产在线紧急地址 | 久草在线视频免费资源观看 | 精品国产精品久久一区免费式 | 亚洲免费在线视频 | 久久亚洲美女 | 国产成人精品一区一区一区 | 亚洲精品乱码久久久久久久久久 | 成人黄色av免费在线观看 | 国产无区一区二区三麻豆 | 青青草国产精品视频 | 日韩免费成人av | 午夜久久美女 | 奇米影视在线99精品 | 欧美日韩精品免费观看视频 | 911亚洲精品第一 | 麻豆超碰 | 久久国产免费视频 | 国产亚洲成人网 | 亚洲国产精品一区二区久久,亚洲午夜 | 国产精品久久久久999 | 国产在线播放一区二区三区 | 国语黄色片 | 四虎亚洲精品 | 亚洲麻豆精品 | 狠狠成人| 福利av影院 | 天天躁天天躁天天躁婷 | 午夜精品久久久久久久久久久 | 中文字幕国产精品一区二区 | 国产 日韩 在线 亚洲 字幕 中文 | 91超碰免费在线 | 婷婷综合伊人 | 九七人人干 | 8090yy亚洲精品久久 | 狠狠狠狠狠狠操 | 在线免费视| 狠狠躁夜夜躁人人爽超碰97香蕉 | 免费一级片视频 | 国产精品美女www爽爽爽视频 | 中文字幕在线看视频国产 | 日本中文字幕高清 | 亚洲国产午夜视频 | 日韩高清精品一区二区 | 日本在线中文 | 超碰999 | 99精品视频在线观看视频 | 久久精品一区二区 | 99精品毛片 | av在线播放快速免费阴 | 911国产| 国产日产欧美在线观看 | 久草网在线 | 麻豆一区二区 | 毛片网在线观看 | 色综合久久久网 | 黄色的片子 | 黄色亚洲在线 | 青草视频在线 | 色在线视频网 | 日韩成年视频 | 韩日电影在线观看 | 亚洲一区视频免费观看 | 国产视频精品在线 | 手机在线黄色网址 | 欧美日韩国产一区 | 久久久久国产精品免费 | 中文字幕乱视频 | 亚洲另类久久 | 中文字幕久久精品亚洲乱码 | 射久久 | 中文字幕久久精品亚洲乱码 | 天天草天天色 | 国产成人av网站 | 2022国产精品视频 | 日韩欧美一区二区在线观看 | 91 中文字幕 | 国产精品美女在线观看 | www.69xx| 日本久久成人中文字幕电影 | 午夜精品久久久久久久99婷婷 | 激情欧美日韩一区二区 | 国产精品久久久久久五月尺 | 久久久99精品免费观看 | 超碰在线官网 | 在线精品观看 | 免费精品视频在线 | 伊人久久国产精品 | 韩国一区视频 | 人人干人人超 | 色婷婷视频在线观看 | 欧美亚洲国产一卡 | 日韩三级视频 | 久久不卡国产精品一区二区 | 91九色蝌蚪| 久久精品中文字幕 | 亚一亚二国产专区 | 亚洲在线视频网站 | 五月天激情在线 | 四虎影视成人精品 | 亚洲婷婷在线视频 | 五月天激情在线 | 久久精选| 午夜精品一区二区三区在线 | 国产高清专区 | 亚洲一区久久 | 久久婷婷国产色一区二区三区 | 免费高清在线视频一区· | 国产精品成人国产乱一区 | 欧美综合色在线图区 | 国产精品永久久久久久久久久 | 黄色一级网 | 国产无遮挡又黄又爽馒头漫画 | 国产区欧美 | 中文字幕色播 | 天天操天天摸天天射 | 国产三级午夜理伦三级 | 深爱激情五月综合 | 婷婷中文字幕综合 | 91精品网站在线观看 | 亚洲精品国产精品乱码不99热 | a级国产乱理论片在线观看 特级毛片在线观看 | 成人免费电影 | 天堂av在线中文在线 | 黄色影院在线播放 | 久草香蕉在线视频 | 色婷五月天 | 久久99精品久久久久久三级 | 国产网红在线观看 | 1024手机基地在线观看 | 精品二区久久 | 日韩欧美在线观看一区二区 | 久久久久欧美精品 | 日日夜色| 精品国产一二区 | 在线视频 91 | 成人一区二区三区在线观看 | 日本黄色大片免费看 | 国产成人精品一二三区 | 色中色亚洲 | 国产精品久久久久久久久久ktv | 在线视频亚洲 | 欧美一二三区在线播放 | 欧美激情综合色 | 天天色天天操天天爽 | 欧美成a人片在线观看久 | 国产精品系列在线 | 99视频精品| 亚洲va欧美va国产va黑人 | 亚洲人成免费网站 | 国产高清av| 日韩视频免费 | 日本mv大片欧洲mv大片 | 日韩视频中文 | 狠狠色伊人亚洲综合网站野外 | 久久综合九色综合欧美就去吻 | 五月开心色 | 久草在线手机观看 | 日韩高清无线码2023 | 欧美精品久久久久久久久久久 | 天天天天色射综合 | 8090yy亚洲精品久久 | 国产精品入口麻豆www | 久久精品中文 | 亚洲国产精品99久久久久久久久 | 在线观看亚洲国产 | 成年人在线播放视频 | 丁香花在线观看免费完整版视频 | 精品综合久久 | 日韩乱色精品一区二区 | 亚洲精品在线电影 | 国产自偷自拍 | 亚洲电影院 | 精品综合久久久 | 国产精品久久久电影 | 黄色国产成人 | 探花视频在线观看免费 | 天天操天天爽天天干 | 午夜三级理论 | 国色天香在线观看 | 特黄特色特刺激视频免费播放 | 狠狠综合久久 | 久久伊99综合婷婷久久伊 | 国产精品久久精品 | 高清av在线免费观看 | 国产日韩亚洲 | 亚洲最大av网 | 成人久久18免费网站 | 狠狠插狠狠操 | 国产精品99在线观看 | 特黄特色特刺激视频免费播放 | 伊人小视频 | 久久不射电影网 | 久久视频在线观看 | 免费在线电影网址大全 | 日韩在线字幕 | a级国产乱理论片在线观看 伊人宗合网 | 国产亚洲一级高清 | 国产无限资源在线观看 | 成人99免费视频 | 性色在线视频 | www视频在线播放 | 日本在线视频一区二区三区 | 日韩丝袜 | 国产精品久久久久一区二区三区共 | 午夜视频播放 | 欧洲精品久久久久毛片完整版 | 三级黄色理论片 | 久久影视精品 | 深夜视频久久 | 麻豆影视网| 波多野结衣综合网 | 99久久夜色精品国产亚洲 | 久久久久久久久久久久电影 | 91精品办公室少妇高潮对白 | 日韩欧美v | 国产成人一区二区三区在线观看 | 国产偷v国产偷∨精品视频 在线草 | 久久久精品影视 | 伊人www22综合色 | 中文字幕精品在线 | 成人四虎影院 | 特级毛片网站 | 91成人精品一区在线播放69 | 日韩国产精品久久久久久亚洲 | 中文字幕无吗 | 亚洲一级片 | 九九99靖品 | 精品一区二区三区在线播放 | 亚洲精品一区二区三区高潮 | 亚洲精品成人在线 | 国产精品永久在线观看 | 亚洲激情视频在线 | 亚州黄色一级 | 久久成人免费 | 在线观看黄网 | 色综合天天色综合 | 人成在线免费视频 | 久久手机视频 | 91热爆在线观看 | 激情久久综合网 | 久久免费美女视频 | 中文字幕在线观看视频一区二区三区 | 超碰在线色| 日韩欧美在线中文字幕 | 日日夜夜综合网 | 丝袜美腿在线 | 六月丁香在线视频 | 国产精品久久99综合免费观看尤物 | 97在线视频免费 | 丁香五婷| 午夜精品久久久久久久久久久 | 亚洲精品9 | 亚洲专区路线二 | 国产精品毛片网 | 97精品视频在线播放 | 久久精品视频国产 | 精品国产三级a∨在线欧美 免费一级片在线观看 | 三级在线视频观看 | 欧美在线久久 | 国语对白少妇爽91 | 亚洲第一区在线观看 | 69国产成人综合久久精品欧美 | 日韩精品不卡在线 | 久久久久久蜜av免费网站 | 国产一区二区影院 | 成年人视频在线免费观看 | 色老板在线 | 日日夜夜天天综合 | 免费在线一区二区 | 国产成人精品av在线 | 成人av在线网 | www.xxx.性狂虐 | 九九免费在线看完整版 | 久久久片 | 国产麻豆剧果冻传媒视频播放量 | 色黄www小说 | 久久精品99视频 | 成人免费观看电影 | 丁香六月婷婷综合 | 99福利片 | 国产精品美女久久 | 热久久国产精品 | 久久精品国产亚洲精品 | 婷婷午夜 | 欧美一级电影在线观看 | 亚洲一区天堂 | 免费欧美高清视频 | 国产免费资源 | 菠萝菠萝在线精品视频 | 中文乱幕日产无线码1区 | 九九热免费观看 | 国产精品99久久久 | 亚洲综合一区二区精品导航 | 一 级 黄 色 片免费看的 | 在线观看日本高清mv视频 | 久草97| 天天操天天射天天舔 | 精品久久久久久久 | 91av电影网 | 色综合久久88色综合天天6 | 91av电影网| 久久久精品在线观看 | 我爱av激情网 | 亚洲精品国产高清 | 一区二区三区国 | 国产中文字幕免费 | 九九在线视频免费观看 | 国产无限资源在线观看 | 在线国产专区 | 色香蕉在线视频 | 免费看黄的 | 成人免费av电影 | 日日日天天天 | 午夜影院一级片 | 久久精品国产免费看久久精品 | 六月婷婷网| .国产精品成人自产拍在线观看6 | 国产一区国产二区在线观看 | 日日爽| 91香蕉视频好色先生 | 在线播放 日韩专区 | 久久99九九99精品 | 国产精品久久久久久久久费观看 | a级成人毛片 | 黄色三级在线 | 国产精品一区二区三区在线免费观看 | 日韩成人精品在线观看 | 狠狠撸电影 | 日韩一级片观看 | 日日精品 | 97人人艹| 欧美日韩在线看 | 天天拍天天草 | 亚洲乱码在线 | 国产xx视频 | 日韩在线视频免费观看 | 国产精品私人影院 | 国产精品福利午夜在线观看 | 91麻豆高清视频 | 久久久免费国产 | 丁香激情五月婷婷 | 精品一区在线 | 欧美在线1区 | 久久麻豆视频 | 人人射人人爱 | 免费精品国产 | 国产精品一区二区在线看 | 久久久一本精品99久久精品 | 久久综合免费 | 日本aaaa级毛片在线看 | www.激情五月.com | av在线精品 | 国产免费人成xvideos视频 | 五月婷婷在线观看视频 | 成人av视屏 | 五月婷激情 | 久久精品国产一区二区 | 欧美色噜噜 | 成人小视频在线免费观看 | 日韩精品你懂的 | 丁香 久久 综合 | 欧美男同视频网站 | 日韩久久精品一区二区 | 91资源在线 | 国产亚洲欧美在线视频 | 婷婷丁香激情 | 99色资源 | 啪嗒啪嗒免费观看完整版 | 欧美性护士 | 在线观看成人小视频 | 久久a级片 | 亚洲天堂香蕉 | 亚洲精品视频在线 | 婷婷新五月 | 免费一级日韩欧美性大片 | 欧美日韩精品在线观看 | 最近2019中文免费高清视频观看www99 | 又黄又爽的视频在线观看网站 | 国产精品免费小视频 | 97热在线观看 | 在线激情网| 久av在线| 国产精品久久久免费 | av免费片| 日韩视频欧美视频 | 亚洲第一中文字幕 | 九草在线视频 | 国产成人av一区二区三区在线观看 | 在线观看的av | 丰满少妇一级 | 日韩a免费 | 欧美 日韩 国产 成人 在线 | 在线小视频国产 | 狠狠狠色丁香婷婷综合久久五月 | 黄色aa久久 | 国产亚洲免费的视频看 | 亚洲精品国产区 | www.激情五月.com | 福利视频一区二区 | 亚洲三级性片 | 天天操天天干天天摸 | 中文字幕在线观看视频一区 | 久久综合九色综合久久久精品综合 | 深夜免费福利网站 | 97中文字幕 | 亚洲视频www | 91传媒在线播放 | 久久久久国产精品午夜一区 | 久久毛片高清国产 | 国产精品日韩在线观看 | 日本69hd | 激情欧美网 | 狠狠的日| 色婷婷久久久综合中文字幕 | 日本电影黄色 | 成人av网址大全 | 人人爽人人爽人人爽学生一级 | 欧美伦理电影一区二区 | 69av久久| 黄a网 | 日本福利视频在线 | 808电影 | 欧美精品一区二区性色 | 成年人免费观看国产 | 在线视频99 | 欧美亚洲成人xxx | 激情综合五月网 | 最新真实国产在线视频 | 欧美日韩国产精品一区二区 | 三三级黄色片之日韩 | 成人免费看视频 | adc在线观看 | 黄色小网站免费看 | 精品一区久久 | 午夜影视av | 91看成人 | 国产一区高清在线 | 91污污视频在线观看 | 日韩一区二区免费播放 | 国产一区二区三区免费在线 | 少妇bbb好爽 | 国产精品女人网站 | 六月天综合网 | 一区二区精品国产 | 成人一级黄色片 | 国产高清在线 | 日韩欧美一区二区三区在线观看 | 欧美日本日韩aⅴ在线视频 插插插色综合 | 美女黄频| 在线小视频 | 中文字幕av免费观看 | 国产成人无码AⅤ片在线观 日韩av不卡在线 | 中文字幕电影高清在线观看 | 99日韩精品| 国产主播99 | 麻豆成人网 | 午夜性盈盈 | 四虎影视成人 | 四虎影视欧美 | 日本在线中文 | 天天操天天摸天天干 | 国产中文a | 国产91国语对白在线 | 国产99一区视频免费 | 91在线看视频 | 91视频在线免费观看 | 天天爽夜夜爽人人爽一区二区 | 国产女教师精品久久av | 一区中文字幕在线观看 | 欧美日韩三区二区 | 在线观看黄色免费视频 | 久久国产美女视频 | 精品国偷自产国产一区 | 99婷婷狠狠成为人免费视频 | 91av中文字幕 | 日韩在线观看视频网站 | 亚洲成人av电影在线 | 在线日韩视频 | 国产精品福利午夜在线观看 | 久久综合九色九九 | 国产视频在线观看一区二区 | 久草在线免费看视频 | 国产精品视频区 | 2019av在线视频 | 亚洲欧美一区二区三区孕妇写真 | 91成人免费电影 | 亚洲精品观看 | 在线观看中文av | 99久久99久久精品免费 | 成人免费视频免费观看 | 精品久久1 | 日韩成人看片 | 国产精品福利久久久 | 久久久精品国产免费观看同学 | 欧美久久成人 | 视频成人永久免费视频 | 看片黄网站 | 国产手机在线观看 | 久久久久国产精品免费免费搜索 | 99久久精品无码一区二区毛片 | 国产视频在线免费 | 国产高清在线免费视频 | 在线一级片 | 亚洲伦理一区二区 | 午夜三级福利 | 久久久久久久久久网 | 久久国产亚洲 | 手机看片国产日韩 | 日韩在线视频免费看 | 亚洲爱视频| 99久久精品国产一区 | 中文字幕亚洲综合久久五月天色无吗'' | 久久不射影院 | 最近中文字幕免费av | 狠狠色狠狠色综合系列 | 精品国产一区二区三区久久久蜜月 | 在线导航福利 | 国产精品成人一区二区三区 | 免费看一级特黄a大片 | 黄色av免费看 | 久久久国产毛片 | 日本中文字幕在线播放 | 中文字幕一区二区三区在线观看 | 欧美不卡视频在线 | 99视频偷窥在线精品国自产拍 | 不卡的av电影在线观看 | 视频 国产区 | 亚洲午夜精品一区 | 欧美国产亚洲精品久久久8v | 天天操天天射天天爱 | 国产精品中文字幕在线播放 | 99国产成+人+综合+亚洲 欧美 | 色wwwww | 国产精品亚洲成人 | 欧美亚洲一区二区在线 | 国产精品入口久久 | 一区二区三区精品久久久 | 日日夜夜狠狠操 | 欧美91精品久久久久国产性生爱 | 最新av观看 | 国产精品免费视频网站 | 国产精品久久久99 | 91久久久久久久 | 久久久久在线 | 91在线中文| 九九亚洲视频 | 日韩欧美视频免费观看 | 国产精品一区二区美女视频免费看 | 在线看成人av | 久草在线最新免费 | 在线观看av网站 | 久久69精品久久久久久久电影好 | 午夜av片| 国产一区二区不卡视频 | 麻豆影视在线播放 | 日本精品一二区 | 狠狠色丁香婷婷综合基地 | 天天射天天做 | 91传媒在线观看 | 中文字幕亚洲精品日韩 | 亚洲高清免费在线 |