Thrift异步IO服务器源码分析
http://yanyiwu.com/work/2014/12/06/thrift-tnonblockingserver-analysis.html
最近在使用?libevent?開發(fā)項目,想起之前寫?Thrift源碼剖析?的時候說到關(guān)于 TNonblockingServer 以后會單獨寫一篇解析, 現(xiàn)在是時候了,就這篇了。
以下內(nèi)容依然是基于?thrift-0.9.0?。
概述
現(xiàn)在隨著 Node.js 的興起,很多人著迷 eventloop , 經(jīng)常是不明真相就會各種追捧,其實 eventloop 只是 一種高并發(fā)的解決方案。Thrift 的 TNonblockingServer 就是該解決方案的典型實現(xiàn)之一。
而且,Thrift 的 TNonblockingServer 實現(xiàn)代碼干凈,注釋豐富, 并沒有用到什么奇淫巧計,核心就是使用libevent?進行異步 驅(qū)動和狀態(tài)的轉(zhuǎn)換,只要有些?libevent?經(jīng)驗的人就很容易 能看懂。
想進一步了解?libevent?可以看看?C1000K之Libevent源碼分析?。
事件注冊
Thrift 使用?libevent?作為服務(wù)的事件驅(qū)動器, libevent 其實就是 epoll 更高級的封裝而已(在linux下是epoll),而struct event?事件是?libevent?編程的最小單元,只要是使用?libevent?就會使用到它,或者是包裝它。
整個 TNonblockingServer 有三個關(guān)鍵的地方和 libevent 有關(guān)。
1. listener_event
第一種是服務(wù)的監(jiān)聽事件也就是服務(wù)負責 listen 和 accept 的 主 socket ,如下。
// Register the server event event_set(&serverEvent_,listenSocket_,EV_READ | EV_PERSIST,TNonblockingIOThread::listenHandler,server_);當新的連接請求進來的時候,TNonblockingIOThread::listenHandler 函數(shù)被 觸發(fā),在 TNonblockingIOThread::listenHandler 里主要負責 accept 新連接。
2. pipe_event
第二種比較有意思,這個事件對應(yīng)的文件描述符是 socket pair ,使用 evutil_socketpair 創(chuàng)建,其實也是調(diào)用linux接口 socketpair 搞出來的。這個東西不是之前理解的 網(wǎng)絡(luò)通信套接字,在這里可以把它理解成一個管道來使用,如下:
// Create an event to be notified when a task finishes event_set(¬ificationEvent_,getNotificationRecvFD(),EV_READ | EV_PERSIST,TNonblockingIOThread::notifyHandler,this);代碼里面的 getNotificationRecvFD 就是拿這個 socket pair 管道的讀文件描述符, 也就是當這個 socketpair 管道有數(shù)據(jù)可讀時,該事件就會被觸發(fā),也就是回調(diào)函數(shù) TNonblockingIOThread::notifyHandler 會被調(diào)用。
其實第二種事件非常好理解,可以類比多線程編程里面的任務(wù)隊列, 不同線程之間共享著同一個任務(wù)隊列來進行消息的傳遞。 而在 TNonblockingServer 里面,則通過該管道進行消息的傳遞。
3. connection_event
第三種是每個連接的狀態(tài)變化事件,每一個 TConnection 代表一個連接, 每一個 TConnection 含有一個 socket 文件描述符,并且當 TConnection 生成之后,會為它注冊一個事件,負責對該 socket 的異步讀寫。 如下:
event_set(&event_, tSocket_->getSocketFD(), eventFlags_,TConnection::eventHandler, this);注意到,每個連接都會注冊一個 第三種事件, 也就是說,程序的整個運行過程中,假設(shè)并發(fā)連接數(shù)為 n , 則第三種事件的數(shù)量也為 n,而第一種和第二種始終 只有一個事件。 所以真?zhèn)€程序運行過程中事件的數(shù)量是【2 + n】。
socket狀態(tài)轉(zhuǎn)移
因為是異步編程,每個socket都必須設(shè)置為非阻塞。 當可讀的事件發(fā)生時,則讀,可寫的事件發(fā)生時,則寫。 讀和寫兩種操作會互相交替進行,所以我們需要用 狀態(tài)值來進行不同的邏輯處理。
TNonblockingServer 里的狀態(tài)值有以下三種:
/// Three states for sockets: recv frame size, recv data, and send mode enum TSocketState {SOCKET_RECV_FRAMING,SOCKET_RECV,SOCKET_SEND };需要補充說明的是,要和 Thrift 的 TNonblockingServer 通信,則客戶端 需要使用
shared_ptr<TTransport> transport(new TFramedTransport(socket));來作為傳輸工具,就是因為 TNonblockingServer 的 socket recv 數(shù)據(jù)是 按 frame 來一幀幀的接受。所以第一個狀態(tài)值 SOCKET_RECV_FRAMING 代表進入該狀態(tài)就是有幀頭(數(shù)據(jù)包的大小)可以讀取, 而第二個狀態(tài)值 SOCKET_RECV 代表有數(shù)據(jù)可以讀取,先讀完幀頭才讀該數(shù)據(jù)。 第三個狀態(tài) SOCKET_SEND 代表有數(shù)據(jù)可以發(fā)送。
每次 rpc 調(diào)用的過程的狀態(tài)轉(zhuǎn)移先后過程如下:
SOCKET_RECV_FRAMING -> SOCKET_RECV -> SOCKET_SEND這三個狀態(tài)都有可能被重復(fù)調(diào)用,取決于數(shù)據(jù)包的大小。
每次 socket 狀態(tài)轉(zhuǎn)移靠 workSocket 函數(shù)完成:
/** * Libevent handler called (via our static wrapper) when the connection * socket had something happen. Rather than use the flags [libevent] passed, * we use the connection state to determine whether we need to read or * write the socket. */ void workSocket();app狀態(tài)轉(zhuǎn)移
上面的 socket 狀態(tài)轉(zhuǎn)移,是針對每個連接的數(shù)據(jù)收發(fā)狀態(tài)轉(zhuǎn)移, 和 socket 緊密相關(guān),而這里的 app狀態(tài)轉(zhuǎn)移則是針對整個 rpc 遠程函數(shù)調(diào)用(不過每次rpc調(diào)用其實也是建立在某個連接的基礎(chǔ)之上)。
app狀態(tài)的代碼如下:
enum TAppState {APP_INIT,APP_READ_FRAME_SIZE,APP_READ_REQUEST,APP_WAIT_TASK,APP_SEND_RESULT,APP_CLOSE_CONNECTION };狀態(tài)的轉(zhuǎn)移順序如下:
每次app狀態(tài)轉(zhuǎn)移由 TConnetion::transition 函數(shù)完成:
/** * This is called when the application transitions from one state into * another. This means that it has finished writing the data that it needed * to, or finished receiving the data that it needed to. */ void transition();狀態(tài)3 -> 狀態(tài)4 -> 狀態(tài)5 轉(zhuǎn)移很關(guān)鍵,涉及到線程池和主線程的交互。 請看下文。
任務(wù)的線程池
總所周知的是,異步服務(wù)器最適合的場景是高并發(fā),IO 密集型程序。 對于 CPU 密集型的應(yīng)用場景一般使用多線程服務(wù)來解決。 而對于 RPC 服務(wù),TNonblockingServer 想使用異步 IO 來應(yīng)對高并發(fā)。 但是對于 rpc 遠程函數(shù)調(diào)用,如果被方法的函數(shù)是 CPU 密集型的函數(shù), 則運行該函數(shù)的過程整個主線程就會被阻塞,也就是傳說中的 【block the whole world】, 對于此,TNonblockingServer 的解決方法是將該函數(shù)包裝成一個任務(wù), 然后扔進線程池,以此來避免主線程的阻塞。
線程池本身沒什么好說的,但是在 TNonblockingServer 里 面需要了解的就是 線程池和主線程的交互:
當 TConnetion 的 app狀態(tài) 進入 APP_READ_REQUEST 之后 讀取完請求數(shù)據(jù)之后,則將任務(wù)包裝好扔進線程池。 并且將狀態(tài)改變(APP_READ_REQUEST -> APP_WAIT_TASK):
// The application is now waiting on the task to finish appState_ = APP_WAIT_TASK;并且將該連接標識為 Idle ,如下函數(shù):
// Set this connection idle so that [libevent] doesn't process more // data on it while we're still waiting for the threadmanager to // finish this task setIdle();setIdle 的目的在于將該連接對應(yīng)的 socket事件標志位清空, 也就是在 Idle階段不再關(guān)心該 socket是否有數(shù)據(jù)可讀或者可寫。
而當線程池里的某個 Task 運行完畢后,則會觸發(fā)主線程的 pipe_event (上文中的已注冊事件種的第二種事件),告知主線程任務(wù)已完成。 如下:
// Signal completion back to the libevent thread via a pipe if (!connection_->notifyIOThread()) {throw TException("TNonblockingServer::Task::run: failed write on notify pipe"); }主線程收到通知之后,則會從 狀態(tài)4(APP_WAIT_TASK) 轉(zhuǎn) 移向 狀態(tài)5(APP_SEND_RESULT) ,進入向 客戶端發(fā)送函數(shù)調(diào)用結(jié)果的過程。
總結(jié)
Thrift 的 TNonblockingServer 注釋很豐富,原理清晰。 個人認為基本上是事件驅(qū)動服務(wù)器的入門教科書級代碼了, 事件驅(qū)動服務(wù)器核心在于狀態(tài)轉(zhuǎn)移, 因為事件驅(qū)動的原因,每次轉(zhuǎn)換 事件我們都需要保存當前的狀態(tài)。 沒啥,都是狀態(tài)而已。
哦對了,在下讀源碼的時候習(xí)慣加?cout?,然后跑起來看結(jié)果, 文末有一份運行示例可以幫助理解,有興趣的可以看看, 修改后的源碼在?MyTNonblockingServer?。
總結(jié)
以上是生活随笔為你收集整理的Thrift异步IO服务器源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 由浅入深了解Thrift(三)——Thr
- 下一篇: sar