golang socket读写同时_epoll在Golang的应用
使用Golang可以輕松地為每一個(gè)TCP連接創(chuàng)建一個(gè)協(xié)程去服務(wù)而不用擔(dān)心性能問(wèn)題,這是因?yàn)镚o內(nèi)部使用goroutine結(jié)合IO多路復(fù)用實(shí)現(xiàn)了一個(gè)“異步”的IO模型,這使得開(kāi)發(fā)者不用過(guò)多的關(guān)注底層,而只需要按照需求編寫(xiě)上層業(yè)務(wù)邏輯。這種異步的IO是如何實(shí)現(xiàn)的呢?下面我會(huì)針對(duì)Linux系統(tǒng)進(jìn)行分析。
在Unix/Linux系統(tǒng)下,一切皆文件,每條TCP連接對(duì)應(yīng)了一個(gè)socket句柄,這個(gè)句柄也可以看做是一個(gè)文件,在socket上收發(fā)數(shù)據(jù),相當(dāng)于對(duì)一個(gè)文件進(jìn)行讀寫(xiě),所以一個(gè)socket句柄,通常也用表示文件描述符fd來(lái)表示。可以進(jìn)入/proc/PID/fd/查看進(jìn)程占用的fd。
系統(tǒng)內(nèi)核會(huì)為每個(gè)socket句柄分配一個(gè)讀(接收)緩沖區(qū)和一個(gè)寫(xiě)(發(fā)送)緩沖區(qū),發(fā)送數(shù)據(jù)就是在這個(gè)fd對(duì)應(yīng)的寫(xiě)緩沖區(qū)上寫(xiě)數(shù)據(jù),而接收數(shù)據(jù)就是在讀緩沖區(qū)上讀數(shù)據(jù),當(dāng)程序調(diào)用write或者send時(shí),并不代表數(shù)據(jù)發(fā)送出去,僅僅是把數(shù)據(jù)拷貝到了寫(xiě)緩沖區(qū),在時(shí)機(jī)恰當(dāng)時(shí)候(積累到一定數(shù)量),會(huì)將數(shù)據(jù)發(fā)送到目的端。
Golang runtime還是需要頻繁去檢查是否有fd就緒的,嚴(yán)格說(shuō)并不算真正的異步,算是一種非阻塞IO復(fù)用。
IO模型
借用教科書(shū)中幾張圖
阻塞式IO
程序想在緩沖區(qū)讀數(shù)據(jù)時(shí),緩沖區(qū)并不一定會(huì)有數(shù)據(jù),這會(huì)造成陷入系統(tǒng)調(diào)用,只能等待數(shù)據(jù)可以讀取,沒(méi)有數(shù)據(jù)讀取時(shí)則會(huì)阻塞住進(jìn)程,這就是阻塞式IO。當(dāng)需要為多個(gè)客戶端提供服務(wù)時(shí),可以使用線程方式,每個(gè)socket句柄使用一個(gè)線程來(lái)服務(wù),這樣阻塞住的則是某個(gè)線程。雖然如此可以解決進(jìn)程阻塞,但是還是會(huì)有相當(dāng)一部分CPU資源浪費(fèi)在了等待數(shù)據(jù)上,同時(shí),使用線程來(lái)服務(wù)fd有些浪費(fèi)資源,因?yàn)槿绻幚淼膄d較多,則又是一筆資源開(kāi)銷。
非阻塞式IO
與之對(duì)應(yīng)的是非阻塞IO,當(dāng)程序想要讀取數(shù)據(jù)時(shí),如果緩沖區(qū)不存在,則直接返回給用戶程序,但是需要用戶程序去頻繁檢查,直到有數(shù)據(jù)準(zhǔn)備好。這同樣也會(huì)造成空耗CPU。
IO多路復(fù)用
而IO多路復(fù)用則不同,他會(huì)使用一個(gè)線程去管理多個(gè)fd,可以將多個(gè)fd加入IO多路復(fù)用函數(shù)中,每次調(diào)用該函數(shù),傳入要檢查的fd,如果有就緒的fd,直接返回就緒的fd,再啟動(dòng)線程處理或者順序處理就緒的fd。這達(dá)到了一個(gè)線程管理多個(gè)fd任務(wù),相對(duì)來(lái)說(shuō)較為高效。常見(jiàn)的IO多路復(fù)用函數(shù)有select,poll,epoll。select與poll的最大缺點(diǎn)是每次調(diào)用時(shí)都需要傳入所有要監(jiān)聽(tīng)的fd集合,內(nèi)核再遍歷這個(gè)傳入的fd集合,當(dāng)并發(fā)量大時(shí)候,用戶態(tài)與內(nèi)核態(tài)之間的數(shù)據(jù)拷貝以及內(nèi)核輪詢fd又要浪費(fèi)一波系統(tǒng)資源(關(guān)于select與poll這里不展開(kāi))。
epoll介紹
接下來(lái)介紹一下epoll系統(tǒng)調(diào)用
epoll相比于select與poll相比要靈活且高效,他提供給用戶三個(gè)系統(tǒng)調(diào)用函數(shù)。Golang底層就是通過(guò)這三個(gè)系統(tǒng)調(diào)用結(jié)合goroutine完成的“異步”IO。
//用于創(chuàng)建并返回一個(gè)epfd句柄,后續(xù)關(guān)于fd的添加刪除等操作都依據(jù)這個(gè)句柄。
int epoll_create(int size);
//用于向epfd添加,刪除,修改要監(jiān)聽(tīng)的fd。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
//傳入創(chuàng)建返回的epfd句柄,以及超時(shí)時(shí)間,返回就緒的fd句柄。
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);- 調(diào)用epoll_create會(huì)在內(nèi)核創(chuàng)建一個(gè)eventpoll對(duì)象,這個(gè)對(duì)象會(huì)維護(hù)一個(gè)epitem集合,可簡(jiǎn)單理解為fd集合。
- 調(diào)用epoll_ctl函數(shù)用于將fd封裝成epitem加入這個(gè)eventpoll對(duì)象,并給這個(gè)epitem加了一個(gè)回調(diào)函數(shù)注冊(cè)到內(nèi)核,會(huì)在這個(gè)fd狀態(tài)改變時(shí)候觸發(fā),使得該epitem加入eventpoll的就緒列表rdlist。
- 當(dāng)相應(yīng)數(shù)據(jù)到來(lái),觸發(fā)中斷響應(yīng)程序,將數(shù)據(jù)拷貝到fd的socket緩沖區(qū),fd緩沖區(qū)狀態(tài)發(fā)生變化,回調(diào)函數(shù)將fd對(duì)應(yīng)的epitem加入rdlist就緒隊(duì)列中。
- 調(diào)用epoll_wait時(shí)無(wú)需遍歷,只是返回了這個(gè)就緒的rdlist隊(duì)列,如果rdlist隊(duì)列為空,則阻塞等待或等待超時(shí)時(shí)間的到來(lái)。
大致工作原理如圖
異步IO
當(dāng)用戶程序想要讀取fd數(shù)據(jù)時(shí),系統(tǒng)調(diào)用直接通知到內(nèi)核并返回處理其他的事情,內(nèi)核將數(shù)據(jù)準(zhǔn)備好之后,通知用戶程序,用戶程序再處理這個(gè)fd上的事件。
Golang異步IO實(shí)現(xiàn)思路
我們都知道,協(xié)程的資源占有量很小,而且協(xié)程也擁有多種狀態(tài)如阻塞,就緒,運(yùn)行等,可以使用一個(gè)協(xié)程服務(wù)一個(gè)fd不用擔(dān)心資源問(wèn)題。將監(jiān)聽(tīng)fd的事件交由runtime來(lái)管理,實(shí)現(xiàn)協(xié)程調(diào)度與依賴fd的事件。當(dāng)要協(xié)程讀取fd數(shù)據(jù)但是沒(méi)有數(shù)據(jù)時(shí),park住該協(xié)程(改為Gwaiting),調(diào)度其他協(xié)程執(zhí)行。
在執(zhí)行協(xié)程調(diào)度時(shí)候,去檢查fd是否就緒,如果就緒時(shí),調(diào)度器再通知該park住的協(xié)程fd可以處理了(改為Grunnable并加入執(zhí)行隊(duì)列),該協(xié)程處理fd數(shù)據(jù),這樣既減少了CPU的空耗,也實(shí)現(xiàn)了消息的通知,用戶層面上看實(shí)現(xiàn)了一個(gè)異步的IO模型。
Golang netpoll的大致思想就是這樣,接下來(lái)看一下具體代碼實(shí)現(xiàn),本文基于go1.14。
具體實(shí)現(xiàn)
接下來(lái)看下Golang netpoll對(duì)其的使用。
實(shí)驗(yàn)案例
跟隨一個(gè)很簡(jiǎn)單的demo探索一下。
func main() {fmt.Println("服務(wù)端進(jìn)程id:",os.Getpid())lister, err := net.Listen("tcp", "0.0.0.0:9009")if err != nil {fmt.Println("連接失敗", err)return}for {conn, err := lister.Accept() //等待建立連接if err != nil {fmt.Println("建立連接失敗", err)continue}//開(kāi)啟協(xié)程處理go func() {defer conn.Close()for {buf := make([]byte, 128)n, err := conn.Read(buf)if err != nil{fmt.Println("讀出錯(cuò)",err)return}fmt.Println("讀取到的數(shù)據(jù):",string(buf[:n]))}}()}
}net.Listen的內(nèi)部調(diào)用
net.Listen依次調(diào)用lc.Listen->sl.listenTCP->internetSocket->socket到fd.listenStream函數(shù)創(chuàng)建了一個(gè)監(jiān)聽(tīng)9009的tcp連接的socket接口,也就是創(chuàng)建了socket fd,
接下來(lái)為了監(jiān)聽(tīng)該socket對(duì)象就需要把這個(gè)socket fd加入到eventpoll中了。
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {......//綁定該socket接口if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {return os.NewSyscallError("bind", err)}//監(jiān)聽(tīng)該socketif err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {return os.NewSyscallError("listen", err)}//初始化fd,也就是把socket放入epoll中,進(jìn)入if err = fd.init(); err != nil {return err}lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)fd.setAddr(fd.addrFunc()(lsa), nil)return nil
}
func (fd *FD) Init(net string, pollable bool) error {......//將socket fd加到poll,進(jìn)入err := fd.pd.init(fd)......return err
}
//最終跳轉(zhuǎn)到該處,主要關(guān)注兩個(gè)函數(shù)runtime_pollServerInit,runtime_pollOpen,
//這兩個(gè)函數(shù)都是runtime實(shí)現(xiàn)的,將epoll交由runtime來(lái)管理
func (pd *pollDesc) init(fd *FD) error {//sync.once方法,調(diào)用epoll_create創(chuàng)建eventpoll對(duì)象serverInit.Do(runtime_pollServerInit)//將當(dāng)前的fd加到epoll中,底層調(diào)用epollctl函數(shù)ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))//如果出錯(cuò),處理相應(yīng)的fd,刪除epoll中fd以及解除狀態(tài)等操作if errno != 0 {if ctx != 0 {runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return errnoErr(syscall.Errno(errno))}pd.runtimeCtx = ctxreturn nil
}查看runtime_pollServerInit,是對(duì)epoll_create的封裝。
func poll_runtime_pollServerInit() {//初始化全局epoll對(duì)象netpollinit()/全局標(biāo)志位設(shè)置為1atomic.Store(&netpollInited, 1)
}
func netpollinit() {//系統(tǒng)調(diào)用,創(chuàng)建一個(gè)eventpoll對(duì)象epfd = epollcreate1(_EPOLL_CLOEXEC)if epfd >= 0 {return}......
}查看一下runtime_pollOpen方法,將當(dāng)前監(jiān)聽(tīng)的socket fd加入eventpoll對(duì)象中。實(shí)際上是對(duì)epoll_ctl的封裝。
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {//返回一個(gè)存儲(chǔ)在Go程序中的一個(gè)fd對(duì)應(yīng)的結(jié)構(gòu)體,算是用于記錄//goroutine與fd之間的關(guān)系,后面會(huì)分析到pd := pollcache.alloc()//加鎖,防止并發(fā)問(wèn)題lock(&pd.lock)if pd.wg != 0 && pd.wg != pdReady {throw("runtime: blocked write on free polldesc")}if pd.rg != 0 && pd.rg != pdReady {throw("runtime: blocked read on free polldesc")}pd.fd = fdpd.closing = falsepd.everr = falsepd.rseq++pd.rg = 0pd.rd = 0pd.wseq++pd.wg = 0pd.wd = 0unlock(&pd.lock)var errno int32//epoll_ctl系統(tǒng)調(diào)用errno = netpollopen(fd, pd)return pd, int(errno)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {var ev epollevent//注冊(cè)event事件,這里使用了epoll的ET模式,相對(duì)于ET,ET需要每次產(chǎn)生事件時(shí)候就要處理事件,//否則容易丟失事件。ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET//events記錄上pd的指針*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd//系統(tǒng)調(diào)用將該fd加到eventpoll對(duì)象中,交由內(nèi)核監(jiān)聽(tīng)return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}Accept的內(nèi)部調(diào)用
接下來(lái)返回到主函數(shù)。
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {......//檢查fd狀態(tài)是否變化if err := fd.pd.prepareRead(fd.isFile); err != nil {return -1, nil, "", err}for {//accept系統(tǒng)調(diào)用,如果有對(duì)監(jiān)聽(tīng)的socket的連接請(qǐng)求,則直接返回發(fā)起連接的socket文件描述符//,否則返回EAGAIN錯(cuò)誤,被下面捕獲到s, rsa, errcall, err := accept(fd.Sysfd)if err == nil {return s, rsa, "", err}switch err {case syscall.EAGAIN:if fd.pd.pollable() {//進(jìn)入waitRead方法,內(nèi)部if err = fd.pd.waitRead(fd.isFile); err == nil {continue}}case syscall.ECONNABORTED:continue}return -1, nil, errcall, err}
}
func (pd *pollDesc) wait(mode int, isFile bool) error {if pd.runtimeCtx == 0 {return errors.New("waiting for unsupported file type")}//進(jìn)入runtime_pollWait方法內(nèi)部,該方法會(huì)跳轉(zhuǎn)到runtime包下,條件滿足會(huì)park住goroutineres := runtime_pollWait(pd.runtimeCtx, mode)return convertErr(res, isFile)
}
func poll_runtime_pollWait(pd *pollDesc, mode int) int {......//進(jìn)入netpollblock函數(shù),該函數(shù)內(nèi)部會(huì)阻塞住該goroutinefor !netpollblock(pd, int32(mode), false) {err = netpollcheckerr(pd, int32(mode))if err != 0 {return err}}return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {gpp := &pd.rgif mode == 'w' {gpp = &pd.wg}......if waitio || netpollcheckerr(pd, mode) == 0 {//gark住該g,此時(shí)傳參主要關(guān)注前兩個(gè),一個(gè)netpollblockcommit函數(shù),一個(gè)gpp為當(dāng)前pd的rg或者wg,//用于后面記錄fd對(duì)應(yīng)的阻塞的goroutinegopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)}old := atomic.Xchguintptr(gpp, 0)if old > pdWait {throw("runtime: corrupted polldesc")}return old == pdReady
}
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {......//主要關(guān)注兩個(gè)傳參,lock是gpp指針mp.waitlock = lock//unlockf為netpollblockcommit函數(shù)mp.waitunlockf = unlockf......//切換到g0棧去執(zhí)行park_mmcall(park_m)
}
func park_m(gp *g) {//獲取當(dāng)前goroutine_g_ := getg()//修改狀態(tài)為Gwaiting,代表當(dāng)前的goroutine被park住了casgstatus(gp, _Grunning, _Gwaiting)//解除m和g關(guān)聯(lián)dropg()if fn := _g_.m.waitunlockf; fn != nil {//調(diào)用剛傳入的函數(shù)參數(shù),也就是netpollblockcommitok := fn(gp, _g_.m.waitlock)//調(diào)用完清除_g_.m.waitunlockf = nil_g_.m.waitlock = nilif !ok {if trace.enabled {traceGoUnpark(gp, 2)}casgstatus(gp, _Gwaiting, _Grunnable)execute(gp, true) // Schedule it back, never returns.}}//調(diào)度新的g到m上來(lái)schedule()
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {//把當(dāng)前g的指針存為gpp指針,gpp為pd的rg或wgr := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))if r {//將全局變量改為1,代表系統(tǒng)有netpoll的等待者atomic.Xadd(&netpollWaiters, 1)}return r
}到此時(shí),accept函數(shù)就被阻塞住了,系統(tǒng)會(huì)在這個(gè)監(jiān)聽(tīng)的socket fd事件(0.0.0.0:9009的這個(gè)fd)的狀態(tài)發(fā)生變化時(shí)候(也就是有新的客戶端請(qǐng)求連接的時(shí)候),將該park住的goroutine給ready。
//上面提到過(guò)的accept函數(shù),根據(jù)序號(hào)順序分析
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {......for {//2.使用accept系統(tǒng)調(diào)用能獲取到新的連接,linux會(huì)為新的連接分配一個(gè)新的fd,//這個(gè)函數(shù)會(huì)返回新的連接的socket fd對(duì)應(yīng)的進(jìn)程描述符s, rsa, errcall, err := accept(fd.Sysfd)if err == nil {//3.返回新的進(jìn)程描述符return s, rsa, "", err}switch err {case syscall.EAGAIN:if fd.pd.pollable() {//1.剛才阻塞到了這個(gè)goroutine,后來(lái)新的連接請(qǐng)求,該goroutine被喚醒if err = fd.pd.waitRead(fd.isFile); err == nil {continue}}......}......}
}
//返回上一層的函數(shù)
func (fd *netFD) accept() (netfd *netFD, err error) {//此時(shí)獲取到了新的fdd, rsa, errcall, err := fd.pfd.Accept()......//創(chuàng)建新的fd結(jié)構(gòu)體if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {poll.CloseFunc(d)return nil, err}//init函數(shù)又會(huì)進(jìn)入func (pd *pollDesc) init(fd *FD) error函數(shù),并將新的socket連接通過(guò)epoll_ctl傳入//epoll的監(jiān)聽(tīng)事件if err = netfd.init(); err != nil {fd.Close()return nil, err}//系統(tǒng)調(diào)用,可以獲得客戶端的socket的ip信息等lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))return netfd, nil
}喚醒park住的協(xié)程
go會(huì)在調(diào)度goroutine時(shí)候執(zhí)行epoll_wait系統(tǒng)調(diào)用,檢查是否有狀態(tài)發(fā)生改變的fd,有的話就把他取出,喚醒對(duì)應(yīng)的goroutine去處理。該部分對(duì)應(yīng)了runtime中的netpoll方法。
源碼調(diào)用runtime中的schedule() -> findrunnable() -> netpoll()
func findrunnable() (gp *g, inheritTime bool) {_g_ := getg()//分別從本地隊(duì)列和全局隊(duì)列尋找可執(zhí)行的g......//判斷是否滿足條件,初始化netpoll對(duì)象,是否等待者,以及上次調(diào)用時(shí)間if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {//netpoll底層調(diào)用epoll_wait,傳參代表epoll_wait時(shí)候是阻塞等待或者非阻塞直接返回//這里是非阻塞模式,會(huì)立即返回內(nèi)核eventpoll對(duì)象的rdlist列表if list := netpoll(false); !list.empty() {gp := list.pop()//將可運(yùn)行G的列表注入調(diào)度程序并清除glistinjectglist(&list)//修改gp狀態(tài)casgstatus(gp, _Gwaiting, _Grunnable)if trace.enabled {traceGoUnpark(gp, 0)}//返回可運(yùn)行的greturn gp, false}}.......stopm()goto top
}
//對(duì)epoll_wait的進(jìn)一步封裝
func netpoll(block bool) gList {if epfd == -1 {return gList{}}waitms := int32(-1)if !block {waitms = 0}//聲明一個(gè)epollevent事件,在epoll_wait系統(tǒng)調(diào)用時(shí)候,會(huì)給該數(shù)組賦值并返回一個(gè)索引位,/之后可以遍歷數(shù)組取出就緒的fd事件。var events [128]epollevent
retry://陷入系統(tǒng)調(diào)用,取出內(nèi)核eventpoll中的rdlist,返回就緒的事件n := epollwait(epfd, &events[0], int32(len(events)), waitms)if n < 0 {if n != -_EINTR {println("runtime: epollwait on fd", epfd, "failed with", -n)throw("runtime: netpoll failed")}goto retry}var toRun gList//遍歷event事件數(shù)組for i := int32(0); i < n; i++ {ev := &events[i]if ev.events == 0 {continue}var mode int32//是否有就緒的讀寫(xiě)事件,放入mode標(biāo)志位if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {mode += 'r'}if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {mode += 'w'}if mode != 0 {//取出存入的pollDesc的指針pd := *(**pollDesc)(unsafe.Pointer(&ev.data))pd.everr = falseif ev.events == _EPOLLERR {pd.everr = true}//取出pd中的rg或wg,后面放到運(yùn)行隊(duì)列netpollready(&toRun, pd, mode)}}if block && toRun.empty() {goto retry}return toRun
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) {var rg, wg *gif mode == 'r' || mode == 'r'+'w' {rg = netpollunblock(pd, 'r', true)}if mode == 'w' || mode == 'r'+'w' {wg = netpollunblock(pd, 'w', true)}//將阻塞的goroutine加入gList返回if rg != nil {toRun.push(rg)}if wg != nil {toRun.push(wg)}
}conn.Read的內(nèi)部調(diào)用
回到主函數(shù),我們使用go func形式使用一個(gè)協(xié)程去處理一個(gè)tcp連接,每個(gè)協(xié)程里面會(huì)有conn.Read,該函數(shù)在讀取時(shí)候如果緩沖區(qū)不可讀,該goroutine也會(huì)陪park住,等待socket fd可讀,調(diào)度器通過(guò)netpoll函數(shù)調(diào)度它。
func main() {......//開(kāi)啟處理go func() {defer conn.Close()for {buf := make([]byte, 128)//將緩沖區(qū)的數(shù)據(jù)讀出來(lái)放到buf中n, err := conn.Read(buf)......}}()}
}
func (fd *FD) Read(p []byte) (int, error) {......for {//系統(tǒng)調(diào)用讀取緩沖區(qū)數(shù)據(jù),這里沒(méi)有可讀會(huì)直接返回,不會(huì)阻塞n, err := syscall.Read(fd.Sysfd, p)if err != nil {n = 0if err == syscall.EAGAIN && fd.pd.pollable() {//不可讀,進(jìn)入waitRead方法,park住該goroutine,//并記錄goroutine到pd的rg中,等待喚醒if err = fd.pd.waitRead(fd.isFile); err == nil {continue}}}......}
}后面會(huì)等待緩沖區(qū)可讀寫(xiě),shchedule函數(shù)調(diào)用netpoll并進(jìn)一步調(diào)用epoll_wait檢測(cè)到并喚醒該goroutine。可以查看上面netpoll,這里不做重復(fù)工作了。
Golang也提供了對(duì)于epoll item節(jié)點(diǎn)的刪除操作,具體封裝函數(shù)poll_runtime_pollClose
//當(dāng)發(fā)生某些情況,如連接斷開(kāi),fd銷毀等,會(huì)調(diào)用到此處
func poll_runtime_pollClose(pd *pollDesc) {.......netpollclose(pd.fd)//釋放對(duì)應(yīng)的pdpollcache.free(pd)
}
//調(diào)用epoll_ctl系統(tǒng)調(diào)用,刪除該fd在eventpoll上對(duì)應(yīng)的epitem
func netpollclose(fd uintptr) int32 {var ev epolleventreturn -epollctl(epfd, _EPOLL_CTL_DEL, int32(fd), &ev)
}部分系統(tǒng)調(diào)用
抓了一部分系統(tǒng)調(diào)用分析一下上述程序與內(nèi)核交互的大致過(guò)程。
$ strace -f ./server部分系統(tǒng)調(diào)用函數(shù)如下。
#....省略內(nèi)存管理部分以及線程管理部分
#執(zhí)行到fmt.Println("服務(wù)端進(jìn)程id:",os.Getpid())
[pid 30307] getpid() = 30307
[pid 30307] write(1, "346234215345212241347253257350277233347250213id357274232 30307n", 27服務(wù)端進(jìn)程id:30307
) = 27
......由于過(guò)多,省略關(guān)于socket的系統(tǒng)調(diào)用
[pid 30308] <... nanosleep resumed> NULL) = 0
#打開(kāi)系統(tǒng)文件,該文件定義tcp最大連接數(shù),會(huì)被設(shè)置成pollable,并加入epoll節(jié)點(diǎn)中
[pid 30307] openat(AT_FDCWD, "/proc/sys/net/core/somaxconn", O_RDONLY|O_CLOEXEC <unfinished ...>
[pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...>
[pid 30307] <... openat resumed> ) = 4
#調(diào)用epoll_ctl,創(chuàng)建一個(gè)eventpoll
[pid 30307] epoll_create1(EPOLL_CLOEXEC) = 5
#將fd加到epoll事件
[pid 30307] epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189320, u64=139635855949576}}) = 0
[pid 30307] fcntl(4, F_GETFL) = 0x8000 (flags O_RDONLY|O_LARGEFILE)
[pid 30307] fcntl(4, F_SETFL, O_RDONLY|O_NONBLOCK|O_LARGEFILE) = 0
[pid 30308] <... nanosleep resumed> NULL) = 0
[pid 30307] read(4, <unfinished ...>
#執(zhí)行epoll_wait查看就緒事件
[pid 30308] epoll_pwait(5, <unfinished ...>
[pid 30307] <... read resumed> "512n", 65536) = 4
[pid 30308] <... epoll_pwait resumed> [{EPOLLIN|EPOLLOUT, {u32=2174189320, u64=139635855949576}}], 128, 0, NULL, 139635812673280) = 1
[pid 30307] read(4, <unfinished ...>
[pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...>
[pid 30307] <... read resumed> "", 65532) = 0
#將/proc/sys/net/core/somaxconn文件的fd從epoll中刪除
[pid 30307] epoll_ctl(5, EPOLL_CTL_DEL, 4, 0xc00005e8d4) = 0
#關(guān)掉打開(kāi)的somaxconn描述符
[pid 30307] close(4) = 0
#設(shè)置監(jiān)聽(tīng)的socket描述符
[pid 30307] setsockopt(3, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
[pid 30307] bind(3, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
[pid 30307] listen(3, 512 <unfinished ...>
[pid 30308] <... nanosleep resumed> NULL) = 0
[pid 30307] <... listen resumed> ) = 0
[pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...>
#將用于監(jiān)聽(tīng)的socket fd加入到epoll中
[pid 30307] epoll_ctl(5, EPOLL_CTL_ADD, 3, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189320, u64=139635855949576}}) = 0
[pid 30307] getsockname(3, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0
#執(zhí)行accept4發(fā)現(xiàn)沒(méi)有連接,返回EAGAIN錯(cuò)誤
[pid 30307] accept4(3, 0xc00005eb98, [112], SOCK_CLOEXEC|SOCK_NONBLOCK) = -1 EAGAIN (Resource temporarily unavailable)
#查看是否有就緒的fd,此次調(diào)用是非阻塞,立即返回
[pid 30307] epoll_pwait(5, [], 128, 0, NULL, 0) = 0
[pid 30308] <... nanosleep resumed> NULL) = 0
#查看是否有就緒的fd,此次會(huì)阻塞等待,直到有連接進(jìn)來(lái)
[pid 30307] epoll_pwait(5, <unfinished ...>
[pid 30308] futex(0x60dc70, FUTEX_WAIT_PRIVATE, 0, {tv_sec=60, tv_nsec=0} <unfinished ...>
[pid 30307] <... epoll_pwait resumed> [{EPOLLIN, {u32=2174189320, u64=139635855949576}}], 128, -1, NULL, 0) = 1
[pid 30307] futex(0x60dc70, FUTEX_WAKE_PRIVATE, 1) = 1
[pid 30308] <... futex resumed> ) = 0
#新的連接,代表收到了一個(gè)客戶端連接,分配了一個(gè)fd是4
[pid 30307] accept4(3, <unfinished ...>, <... accept4 resumed> {sa_family=AF_INET6, sin6_port=htons(52082), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28], SOCK_CLOEXEC|SOCK_NONBLOCK) = 4
#把4加入到epoll中管理
[pid 30307] epoll_ctl(5, EPOLL_CTL_ADD, 4, {EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLET, {u32=2174189112, u64=139635855949368}}) = 0
[pid 30307] getsockname(4, {sa_family=AF_INET6, sin6_port=htons(9009), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [112->28]) = 0
......
#后來(lái)將client端關(guān)掉,此時(shí)tcp連接斷掉了,將epoll中的fd移除
[pid 30309] epoll_ctl(5, EPOLL_CTL_DEL, 4, 0xc00005fdd4 <unfinished ...>
[pid 30308] nanosleep({tv_sec=0, tv_nsec=20000}, <unfinished ...>
[pid 30309] <... epoll_ctl resumed> ) = 0
[pid 30309] close(4) = 0
[pid 30309] epoll_pwait(5, [], 128, 0, NULL, 824634114048) = 0
#阻塞等待
[pid 30309] epoll_pwait(5, <unfinished ...>
........參考資料
- 《后臺(tái)開(kāi)發(fā)核心技術(shù)與應(yīng)用實(shí)踐》第七章:網(wǎng)絡(luò)IO模型
- 《Unix環(huán)境高級(jí)編程》第十四章:高級(jí)IO
- 《Go語(yǔ)言設(shè)計(jì)與實(shí)現(xiàn)》https://draveness.me/golang/d...
- 《Go netpoller 原生網(wǎng)絡(luò)模型之源碼全面揭秘》https://mp.weixin.qq.com/s/3kqVry3uV6BeMei8WjGN4g
總結(jié)
以上是生活随笔為你收集整理的golang socket读写同时_epoll在Golang的应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: mysql 判断日期是否在某范围内_判断
- 下一篇: tensorflow with求导_3.