Hadoop中RPC机制
Hadoop中RPC機(jī)制
RPC(Remote Procedure Call Protocol)遠(yuǎn)程過程調(diào)用協(xié)議,它是一種通過網(wǎng)絡(luò)從遠(yuǎn)程計算機(jī)程序上請求服務(wù),而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議。Hadoop底層的交互都是通過rpc進(jìn)行的。例如:datanode和namenode 、tasktracker和jobtracker、secondary namenode和namenode之間的通信都是通過rpc實現(xiàn)的。下面是rpc交互過程圖:?
1.客服端調(diào)用的總過程:
Hadoop的RPC客戶端代碼其實就一個類:org.apache.hadoop.ipc.Client。這個類使用Java的動態(tài)代理技術(shù),生成服務(wù)器的業(yè)務(wù)接口的代理,通過socket將調(diào)用的業(yè)務(wù)方法和參數(shù)傳送到服務(wù)器端,并且等待服務(wù)器端的響應(yīng)。
客戶端調(diào)用的序列圖如下:
?
?例如:TaskTracker請求與JobTracker的通信:
TaskTracker通過:
????? this.jobClient = (InterTrackerProtocol)
????????? RPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID,
?????????????????????? jobTrackAddr, this.fConf);
中的InterTrackerProtocol 去和JobTracker通信。
在RPC中通過:
?????? VersionedProtocol proxy =
????????????(VersionedProtocol) Proxy.newProxyInstance(
??????????? protocol.getClassLoader(), new Class[] { protocol },
??????????? new Invoker(addr, ticket, conf, factory));
產(chǎn)生一個動態(tài)代理完成JobTracker和TaskTracker之間的心跳交流。
2.客服端向服務(wù)器發(fā)送連接
??? 客戶端(C)要發(fā)起對服務(wù)端(S)方法的調(diào)用主要通過:
????public Writable call(Writable param, InetSocketAddress addr,
?????????????????????? Class<?> protocol, UserGroupInformation ticket)?
?????????????????????? throws InterruptedException, IOException 實現(xiàn)
2.1首先在該方法調(diào)用中:
??? Call call = new Call(param); //將param轉(zhuǎn)換成Call對象? 其實就是將Invocation(用來序列化和反序列化RPC客戶端的調(diào)用信息,包括方法名和參數(shù)信息)轉(zhuǎn)化為Call 。
2.2 客服端創(chuàng)建一個通向服務(wù)端的連接connection,Connection connection = getConnection(addr, protocol, ticket, call);然后將此次調(diào)用放入CallList里,這樣客戶端就可以同時發(fā)生很多調(diào)用,每個調(diào)用用ID來識別。
(1) 根據(jù)RPC服務(wù)端的地址和接口從連接池中獲取一個,如果取到Connection則直接返回。
(2) 否則新建一個Connection,并將它放入到連接池中
(3) 然后通過SocketFactory創(chuàng)建一個Socket,并建立到RPC服務(wù)端的連接,如果連接不成功,則重試。
(4) 創(chuàng)建和關(guān)聯(lián)輸入和輸出流對象。
2.3 發(fā)送調(diào)用參數(shù)connection.sendParam(call)。調(diào)用參數(shù)是Client的調(diào)用方(比如NameNode,DataNode等)指定的,一般就是一個Invocation對象,里面包含要調(diào)用的方法和參數(shù)。
2.4等待調(diào)用結(jié)果.Client.Connection是個線程類,啟動了之后唯一做的事情就是等待調(diào)用結(jié)果。
??? synchronized (call) {???
??????????while (!call.done){?? //done就是服務(wù)器端返回該call的結(jié)果,判斷該call是否處理
????????? ……………………???????????
????????? ………………
?????? }
??? }
3.服務(wù)器端對客服端的call請求處理
對于服務(wù)器端,其有一個方法start指定了啟動服務(wù)器開始監(jiān)聽,這個start被四個類調(diào)用,分別是TaskTracker.initialize,Namenode.initialize,Jobtracker.offerService,Datanode.startDatanode,顯然,任何兩者之間的通信都是考這個client-server模型實現(xiàn)的。
3.1 server start后,干了三件事,就是啟動三個線程
1.啟動listener,監(jiān)聽客戶端Call
2.啟動response,隨時準(zhǔn)備將處理結(jié)果發(fā)回client
3.啟動10個handler,處理具體的請求。
3.2上面三個線程的具體工作過程
3.2.1. Listener線程
該線程負(fù)責(zé)監(jiān)聽客戶端請求以及數(shù)據(jù)的接收,然后將接收到的數(shù)據(jù)組成一個Call實例,放到請求隊列里面。具體做法如下:
1) Listener線程循環(huán)等待RPC客戶端的發(fā)送數(shù)據(jù)過來
2) 當(dāng)有數(shù)據(jù)可以接收時,調(diào)用Connection的readAndProcess()方法。readAndProces()又調(diào)用processData()方法
3) Connection邊接收邊對數(shù)據(jù)進(jìn)行處理,如果接收到一個完整的Call包,則構(gòu)建一個Call對象。readAndProcess()調(diào)用processData()方法把Call對象加入到Call隊列的,并將這個Call對象PUSH到Call隊列中,由Handler線程來處理Call隊列中的所有Call。
3.2.2.Handler線程
該線程負(fù)責(zé)從請求隊列中取出調(diào)用請求,通過調(diào)用抽象方法
1) Handler線程循環(huán)監(jiān)聽Call隊列,如果Call隊列為空,則進(jìn)入wait狀態(tài),否則按FIFO規(guī)則從Call隊列取出Call
2) 將Call交給RPC.Server處理(調(diào)用RPC.Server的public Writable call(Class<?> protocol, Writable param, long receivedTime)
??? throws IOException ),因為RPC對Server的一些功能進(jìn)行了實現(xiàn)
3) 借助java里的反射機(jī)制,完成對目標(biāo)方法的調(diào)用
4) 返回響應(yīng)。由于響應(yīng)需要通過SOCKET返回給RPC客戶端,所以響應(yīng)的類型必須是Writable。
3.2.2. Response線程???
?Response也監(jiān)視responselist,如果responselist中某個call需要將結(jié)果寫入客戶端就寫出,當(dāng)某個call的結(jié)果被發(fā)送完畢,從responselist中刪除該call對象。
注意:handler完成call之后就開始向客戶端寫call結(jié)果,但是結(jié)果可能太多,無法通過一次性發(fā)送完畢,而發(fā)送之后還要等待client接受完畢才能再發(fā),如果現(xiàn)在handler在那里等待客戶端接受完畢,然后再發(fā),效率不高。解決辦法是handler處理完畢之后,只向client發(fā)送一次處理結(jié)果。如果這一次將處理結(jié)果發(fā)送完畢,接下來就沒有response的事情了,如果沒有發(fā)送完畢,接下來response負(fù)責(zé)將剩下的處理結(jié)果發(fā)送給客戶端。這樣handler的并發(fā)量會大一些。
服務(wù)器實現(xiàn)中大量利用監(jiān)視隊列,比如handler就直觀堅持calllist,一旦發(fā)現(xiàn)數(shù)據(jù)就開始處理,而response就監(jiān)視responselist,發(fā)現(xiàn)數(shù)據(jù)需要發(fā)送就開始發(fā)送。
參考:
1.http://bbs.hadoopor.com/thread-329-1-2.html
2.http://jackosn-liao.iteye.com/blog/851914?
3. http://blog.csdn.net/wuixiaobao/article/details/6549781
4.http://bbs.hadoopor.com/thread-329-1-2.html
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/MGGOON/archive/2012/02/24/2367231.html
總結(jié)
以上是生活随笔為你收集整理的Hadoop中RPC机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手游服务器源码 https,python
- 下一篇: win7 ie11版本安装报此更新不适