TNonblockingServer 连接管理
很久就想總結(jié)一下Thrift的Cpp lib庫的學(xué)習(xí)情況,但是一直沒有狠下心來去寫。不是沒有時間,就是恐怕理解不透徹。一晃幾個月過去了,很多東西已經(jīng)慢慢遺忘。今天,下定決心重新拾起來,邊看邊總結(jié)。這個故事,就從最有意思的Server端lib開始。這里的Server選用TNonBlockingServer,和他搭配的Transport選用TFrameTransport。這一節(jié)主要講述該類包含的幾個輔助類。
TNonblockingServer::TConnection::Task
要注意,Thrift有很多類都包括Task內(nèi)部類,這里提到的Task類是TConnection的內(nèi)部類。Task是Server端處理業(yè)務(wù)邏輯的地方,他繼承自Runnable,遵守類似Java的線程模型。因此這個類必須實(shí)現(xiàn)run()方法。
run()本質(zhì)是無限循環(huán),每次循環(huán)都會首先調(diào)用綁在一起的connection對象的serverEventHandler的processContext方法。這個方法的實(shí)現(xiàn)由Thrift的使用者編寫。如果實(shí)現(xiàn)沒有實(shí)現(xiàn)這個方法,則跳過這一步驟。
然后,執(zhí)行processor->process方法。processor是業(yè)務(wù)執(zhí)行邏輯的封裝,相當(dāng)于Server的進(jìn)程。process是調(diào)用實(shí)際的執(zhí)行函數(shù),即進(jìn)程的執(zhí)行體。執(zhí)行結(jié)果將被序列化,存入輸出序列(oprot類似的東西),注意,上文提到的Context也會傳入這個執(zhí)行體中。
最后,調(diào)用傳輸層的peek()。如果傳輸層使用TFrameTransport,那么其行為是查看讀緩沖區(qū)里面有沒有未處理的幀,如果沒有,嘗試讀取下一個請求幀并且存入讀緩沖區(qū)。如果依然沒有可以讀取的東西,返回False。
不論process執(zhí)行發(fā)生了問題(可能不一定是故障,輸出序列滿也是有可能終止的,真正發(fā)生的問題會以異常的方式拋出并捕獲),還是peek()得不到新的幀請求,都會跳出循環(huán)。循環(huán)一旦跳出,則通知Server,實(shí)現(xiàn)是調(diào)用捆綁的connection對象的notifyIOThread()。該方法的作用在于講connection對象的指針寫入Server的通知管道中。因?yàn)檎麄€過程都發(fā)生在Server的一個進(jìn)程中,因此傳送指針是有意義的。
TNonblockingServer::TConnection
在TNonBlockingServer中負(fù)責(zé)處理連接管理,即數(shù)據(jù)的輸入和輸出。每個TConnection和一個TNonBlockingIOThread關(guān)聯(lián),通過其可以獲得所在Server的句柄。(但是一個TNonBlockingIOThread可能被多個TConnection關(guān)聯(lián)。)同時記錄一個套接字連接的基本信息(sockaddr)。套接字信息存放在內(nèi)部的TSocket類型的屬性中。其他重要屬性如下:
a. 輸入輸出分別為inputTransport和outputTransport屬性,他們兩個都是TMemoryBuffer的對象指針,實(shí)質(zhì)上就是輸入緩沖和輸出緩沖。factoryInputTransport_和factoryOutputTransport_是對輸入緩沖和輸出緩沖的封裝。這里,應(yīng)該是封裝為TFrameTransport。
b. 在序列化協(xié)議方面,inputProtocol_和outputProtocol_兩個屬性分別是輸入和輸出的序列化協(xié)議。一般使用TBinaryProtocol。不過,也支持自定義的特殊Transport(沒有繼承TTransport)的上面架設(shè)二進(jìn)制序列化協(xié)議,只是需要傳入特定的類型給模板。在這里沒有傳入類型,可見TNonblockingServer只支持繼承TTransport的傳輸協(xié)議,實(shí)際上,只支持TFrameTransport。
c. connectionContext。如果創(chuàng)建Server的時候指定了serverEventHandler,那么這里新建connectionContext。實(shí)現(xiàn)是調(diào)用handler的createContext方法。
d. processor。這個調(diào)用比較詭異,他傳入了三個參數(shù):inputProtocol_、outputProtocol_還有tSocket_,但是下層調(diào)用的時候根本就沒有使用這三個參數(shù),而是直接調(diào)用的TProcessorFactory的getProcessor。這個方法是個純虛方法,考察得到該類只有一個派生類,TSingletonProcessorFactory,這里的方法調(diào)用時候,根本就沒有使用到之前傳遞的三個參數(shù),而是直接返回的processor_。這是一個單例,可以推測,一個TSingletonProcessorFactory只會擁有一個processor,而且注釋要求,所有調(diào)用這個方法的線程不得同時出現(xiàn)兩個。
構(gòu)造方法會調(diào)用init方法,最終完成所有屬性的初始化。除了上面這些屬性,比較重要的初始化還有兩個,appState<--APP_INIT,socketState<--SOCKET_RECV_FRAMING。這兩個State主要是用于TConnection以狀態(tài)機(jī)的方式進(jìn)行工作預(yù)設(shè)。
TConnection工作的主要方法,是workSocket和transition。前者是收發(fā)數(shù)據(jù),后者狀態(tài)遷移??傊?#xff0c;狀態(tài)機(jī)。
workSocket使用socketState_字段記錄3個狀態(tài):SOCKET_RECV_FRAMING、SOCKET_RECV、SOCKET_SEND。前兩個是接收數(shù)據(jù),最后一個是發(fā)送數(shù)據(jù)。前兩個的狀態(tài)遷移順序是SOCKET_RECV_FRAMING--->SOCKET_RECV。
SOCKET_RECV_FRAMING:接收幀頭部。這里也可看出TNonBlockingServer和TFrameTransport是嚴(yán)格綁定在一起的。每個幀分為兩個部分:幀頭部和幀數(shù)據(jù)。幀頭部以字節(jié)為單位標(biāo)明幀數(shù)據(jù)的長度。由于整個過程是非阻塞的,每次盡力而為的從網(wǎng)絡(luò)套接字讀取字節(jié),如果頭部(目前在0.9.0是4Byte)沒有讀取結(jié)束,存起來,下次調(diào)用時候繼續(xù)。直到獲取到完整的幀頭部,調(diào)用transition。(這里不會多讀數(shù)據(jù)。如果對方斷開連接,讀取的長度會是0,此時關(guān)閉套接字。server規(guī)定幀數(shù)據(jù)部分的長度不得超過256MB,這已經(jīng)是相當(dāng)大的一個范圍了。如果幀頭部指明的長度超過了server預(yù)設(shè)的最大值,則認(rèn)為是讀出了非法的幀格式,也會關(guān)閉套接字。)
SOCKET_RECV:接收幀數(shù)據(jù)。依然是非阻塞,盡力而為的讀取。如果獲得了完整的幀數(shù)據(jù),調(diào)用transition。
SOCKET_SEND:發(fā)送幀頭部和幀數(shù)據(jù)。因?yàn)槭欠亲枞?#xff0c;所以要盡力而為的發(fā)送,并且不會用for循環(huán)。如果期間捕獲到EPIPE ECONNRESET ENOTCONN信號,那么拋出異常。如果沒有捕獲到EWOULDBLOCK或者EAGAIN,但是發(fā)送的響應(yīng)是0字節(jié),那么說明發(fā)生了錯誤,依然拋出異常。
transition使用appState_字段記錄狀態(tài),基本的狀態(tài)遷移:
| 1 2 3 4 5 6 7 8 | +--> APP_INIT -----> APP_READ_FRAME_SIZE ---> APP_READ_REQUEST ---+ |???????????????????????????????????????????????????????????????? | |???????????????????????????????????????????????????????????????? | |???????????????????????????????????????????????????????????????? | +------------------- APP_SEND_RESULT??? <---? APP_WAIT_TASK <-----+ |?????????????????????????????????????????????????? | |?????????????????????????????????????????????????? | +--------------------oneway-------------------------+ |
APP_INIT:server最開始的狀態(tài)。設(shè)置讀寫緩沖區(qū)等等基本工作。需要設(shè)置等待讀的標(biāo)記位,通過調(diào)用setFlags(EV_READ|EV_PERSIST)實(shí)現(xiàn),事件的注冊是采用libevent庫,回調(diào)方法是TConnection::eventHandler,由它調(diào)用workSocket方法。
APP_READ_FRAME_SIZE:server已經(jīng)讀到了幀長度。調(diào)整讀緩沖區(qū)大小,以適應(yīng)幀數(shù)據(jù)接收。因?yàn)闅埲钡膸瑪?shù)據(jù)是沒有任何意義的。成倍的增加。最后改變appState_。
APP_READ_REQUEST:server已經(jīng)獲得了完整的幀。將輸入緩沖區(qū)封裝為inputTransport結(jié)構(gòu),并且重置輸入緩沖以待將來使用。此時,滿足處理請求的條件了。如果后臺不是線程池模式,那么立即執(zhí)行。否則,構(gòu)造一個Task結(jié)構(gòu),丟給server調(diào)度。調(diào)用setIdle,禁止新的讀取消息到來。
APP_WAIT_TASK:server已經(jīng)處理好這個請求,準(zhǔn)備回送返回結(jié)果。一般的,計(jì)算回送幀大小填充到幀首部,然后調(diào)用setFlags(EV_WRITE | EV_PERSIST)聲明有數(shù)據(jù)需要寫入套接字。但是,特殊的,如果請求是oneway的,無需回送數(shù)據(jù)。這時,僅僅需要進(jìn)入APP_INIT的狀態(tài)即可。
APP_SEND_RESULT:如果設(shè)置了每隔若干請求就重新調(diào)整輸入輸出緩沖區(qū)大小,則執(zhí)行。其實(shí)對接口比較少的應(yīng)用意義并不大,只是之前一直是緩沖不夠的時候變大,在這里把緩沖區(qū)收回來。然后就是執(zhí)行APP_INIT邏輯了。
APP_CLOSE_CONNECTION:關(guān)閉連接,減少活躍Processor計(jì)數(shù)。不在基本狀態(tài)機(jī)里面。
其他的方法都比較簡單,比如close就是關(guān)閉套接字,關(guān)閉inputTransport和outputTransport,最后調(diào)用server_->returnConnection。倒是有個方法,notifyIOThread,調(diào)用ioThread_->notify,先標(biāo)記在這里。
總結(jié)
以上是生活随笔為你收集整理的TNonblockingServer 连接管理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [Python]网络爬虫(十):一个爬虫
- 下一篇: IOS本地通知实现