HDFS2.x之RPC流程分析
HDFS2.x之RPC流程分析
1 概述
??? Hadoop提供了一個統(tǒng)一的RPC機制來處理client-namenode, namenode-dataname,client-dataname之間的通信。RPC是整個Hadoop中通信框架的核心,目前采用ProtocolBuf作為RPC的默認(rèn)實現(xiàn)。RPC的整體調(diào)用流程如下:
?
2 Protobuf
??? Protocol buffer(以下簡稱PB),PB是Google開源的一種輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲格式,可以用于結(jié)構(gòu)化數(shù)據(jù)的序列化和反序列化,很適合做數(shù)據(jù)存儲或 RPC 數(shù)據(jù)交換格式,目前提供了 C++、Java、Python 三種語言的 API。序列化/反序列化速度快,網(wǎng)絡(luò)或者磁盤IO傳輸?shù)臄?shù)據(jù)少。
RPC就是一臺機器上的某個進程要調(diào)用另外一臺機器上的某個進程的方法,中間通信傳輸?shù)木褪穷愃朴凇胺椒?shù)1、參數(shù)2……”這樣的信息,是結(jié)構(gòu)化的。
我們要定義一種PB實現(xiàn)的RPC傳輸格式,首先要定義相應(yīng)的.proto文件,在Hadoop common工程里,這些文件放在hadoop-common\src\main\proto目錄下;在Hadoop HDFS工程里這些文件放在hadoop-hdfs\src\main\proto目錄下,以此類推。Hadoop編譯腳本會調(diào)用相應(yīng)的protoc二進制程序來編譯這些以.proto結(jié)尾的文件,生成相應(yīng)的.java文件。
?
由proto文件生成的類,均提供了讀寫二進制數(shù)據(jù)的方法:
(1)byte[] toByteArray():序列化message并且返回一個原始字節(jié)類型的字節(jié)數(shù)組;
(2)static Person parseFrom(byte[] data): 將給定的字節(jié)數(shù)組解析為message;
(3)void writeTo(OutputStream output): 將序列化后的message寫入到輸出流;
(4)static Person parseFrom(InputStream input): 讀入并且將輸入流解析為一個message;
??? 另外,PB類中都有一些Builder子類,利用其中的build方法,可以完成對象的創(chuàng)建。PB的具體應(yīng)用會在下面的RPC的Client和Server的分析中說明。
3 RPC Client端
以create方法為例,來說明RPC的具體執(zhí)行流程。首先看下在Client端的執(zhí)行過程。
?
??? 由HDFS客戶端發(fā)起的create操作,在經(jīng)過一系列的前置步驟之后,會通過DFSClient類中的namenode代理來完成,其定義如下:
final ClientProtocol namenode; ……NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);this.dtService = proxyInfo.getDelegationTokenService();this.namenode = proxyInfo.getProxy();?
這說明此處的namenode實現(xiàn)的接口是ClientProtocol,也就是Client與NameNode之間RPC通信的協(xié)議。
HDFS2.x引入了NameNode的HA,這就使得Client端的底層代理是有多個的,分別連接Active NN和Standby NN。但是在實際運行過程中需要對Client調(diào)用呈現(xiàn)統(tǒng)一的接口,那么就出現(xiàn)了一個上層代理來統(tǒng)一上述這兩個底層代理。所有由Clientfa來的方法調(diào)用都是先到達上層代理,通過上層代理轉(zhuǎn)發(fā)到下層代理。并且,上層代理還會根據(jù)底層代理返回的Exception來決定是否進行Failover或者Retry等操作。
在使用HA模式時,客戶端創(chuàng)建代理的總體流程是:
?
其中,
(1)RetryProxy.create方法會創(chuàng)建上層代理,用于接收客戶端的請求,并根據(jù)情況調(diào)用連接到當(dāng)前兩個NameNode的底層代理。
Proxy.newProxyInstance(proxyProvider.getInterface().getClassLoader(),new Class<?>[] { iface },new RetryInvocationHandler(proxyProvider, retryPolicy) );?
生成的這個代理對象實現(xiàn)了ClientProtocol接口,Client可以通過這個代理對象調(diào)用ClientProtocol接口中相應(yīng)的方法。根據(jù)Java的動態(tài)代理機制,用戶對這個代理對象的方法調(diào)用都轉(zhuǎn)換為對RetryInvocationHandler(proxyProvider, retryPolicy)對象中invoke()方法的調(diào)用了。RetryInvocationHandler是與FailoverProxyProvider密切相關(guān)的,因為它需要FailoverProxyProvider提供底層代理的支持。
(2)當(dāng)代理對象接收到請求的時候,會調(diào)用invoke方法來進行處理,這里的invoke方法是上層代理中的RetryInvocationHanlder.invoke方法。
首先要獲取一個RetryPolicy,默認(rèn)的策略是在構(gòu)造RetryInvocationHandler時的參數(shù)。在Client與NameNode之間的ClientProtocol的RetryPolicy是:
RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.failoverSleepBaseMillis,
????????????? config.failoverSleepMaxMillis)
??? 接著,會調(diào)用invokeMethod方法調(diào)用底層的代理進行實際的處理:
Object ret = invokeMethod(method, args);
? ? ?????????? -> method.invoke(currentProxy, args);
currentProxy是現(xiàn)在正在使用的底層代理。當(dāng)NN發(fā)生主從切換的時候,這個currentProxy也會發(fā)生相應(yīng)的變化。
??? 如果在調(diào)用過程中出現(xiàn)了異常,則針對不同的異常會做出不同的處理,這里的判斷是根據(jù)生成動態(tài)代理(上層代理)的時候給定的RetryPolicy策略,默認(rèn)的RetryPolicy是FailoverOnNetworkExceptionRetry,所以調(diào)用對應(yīng)的shouldRetry()函數(shù)。
(2.1)如果Retry的次數(shù)已經(jīng)超過最大嘗試的次數(shù)了,那么就返回一個
RetryAction.RetryDecision.FAIL的RetryAction。
(2.2) 如果拋出的異常是ConnectionException、NoRouteToHostException、UnKnownHostException、StandbyException、RemoteException中的一個,說明底層代理在RPC過程中Active NN連不上或者宕機或者已經(jīng)發(fā)生主從切換了,那么就需要返回一個RetryAction.RetryDecision.FAILOVER_AND_RETRY的RetryAction,需要執(zhí)行performFailover()操作,然后用另外一個NN的底層代理重試。
(2.3)如果拋出的異常是SocketException、 IOException或者其他非RemoteException的異常,那么就無法判斷這個RPC命令到底是不是執(zhí)行成功了。可能是本地的Socket或者IO出問題,也可能是NN端的Socket或者IO問題。那就進行進一步的判斷:如果被調(diào)用的方法是idempotent的,也就是多次執(zhí)行是沒有副作用的,那么就連接另外的一個底層代理重試;否則直接返回RetryAction.RetryDecision.FAIL。
(3)FailoverProxyProvider類的當(dāng)前實現(xiàn)類為ConfiguredFailoverProxyProvider。它負責(zé)管理那兩個activeNN和standbyNN的代理,當(dāng)上層代理接收到來自用戶的一個RPC命令之后,轉(zhuǎn)發(fā)給當(dāng)前正在使用的底層代理(由ConfiguredFailoverProxyProvider.currentProxyIndex決定,表示當(dāng)前的代理對象的序號)執(zhí)行,然后看是否拋出異常。如果拋出了異常,根據(jù)異常的種類來判斷是執(zhí)行failover,還是retry,或者兩者都不做。如果需要切換NameNode代理的話,則會執(zhí)行:
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
??? 底層代理的實現(xiàn)是用的非HA模式:
current.namenode = NameNodeProxies.createNonHAProxy(conf,
??????????? current.address, xface, ugi, false).getProxy();
進一步調(diào)用->NameNodeProxies.createNNProxyWithClientProtocol
??????????? ->RPC.getProtocolProxy
方法,并把生成的ClientNamenodeProtocolPB類型的代理對象proxy封裝成ClientNamenodeProtocolTranslatorPB類型。
??? 這里又會涉及到Java的動態(tài)代理,是在RPC.getProtocolProxy方法生成proxy對象的時候,RPC.getProtocolProxy的實現(xiàn)代碼為:
return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
這里的引擎就是protocolbuf,所以,所有的RPC請求最終都會調(diào)用ProtobufRpcEngine類中的invoke方法進行和RPC的Server端通信以及數(shù)據(jù)的序列化和反序列化操作。
??? 把Client的請求封裝成call的操作返回也是在invoke中進行的:
val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
??????????? new RpcRequestWritable(rpcRequest), remoteId);
??? 封裝的具體實現(xiàn)是調(diào)用的Client類中的call方法:
//封裝成call
Call call = new Call(rpcKind, rpcRequest);
//建立和NameNode的連接
Connection connection = getConnection(remoteId, call);
//向NameNode發(fā)送數(shù)據(jù)
connection.sendParam(call);
RPC客戶端的執(zhí)行流程(HA模式)為:
?
4 RPC Server端
RPC的Server端的初始化方法是NameNode中被調(diào)用的:
rpcServer = createRpcServer(conf);
實際上初始化NameNodeRpcServer對象,調(diào)用其構(gòu)造函數(shù):
return new NameNodeRpcServer(conf, this);
??? 在構(gòu)造方法中,會初始化兩個RPCServer,一個是serviceRpcServer,用來處理數(shù)據(jù)節(jié)點發(fā)來的RPC請求,另一個是clientRpcServer,用于處理客戶端發(fā)來的RPC請求。
??? NameNodeRpcServer的構(gòu)造方法會初始化RPC的Server端所需要的handler的數(shù)目(默認(rèn)為10個),設(shè)置好處理引擎為Protocolbuf,初始化ClientNamenodeProtocolServerSideTranslatorPB類型的對象clientProtocolServerTranslator用來對傳來的數(shù)據(jù)進行反序列化,對發(fā)送的數(shù)據(jù)進行序列化。
??? 另外,會初始化提供不同RPC服務(wù)的對象BlockingService,針對客戶端、數(shù)據(jù)節(jié)點端的有:
BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);
BlockingService dnProtoPbService = DatanodeProtocolService.newReflectiveBlockingService(dnProtoPbTranslator);
??? 緊接著,會獲取RPC的Server對象:
this.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount,false,
conf,
namesystem.getDelegationTokenSecretManager());
?
此對象主要負責(zé)接收網(wǎng)絡(luò)連接,讀取數(shù)據(jù),調(diào)用處理數(shù)據(jù)函數(shù),返回結(jié)果。前兩個參數(shù)表示如果RPC發(fā)送過來的是ClientNamenodeProtocolPB協(xié)議,那么負責(zé)處理這個協(xié)議的服務(wù)(com.google.protobuf.BlockingService類型的對象)就是clientNNPbService。
這個Server對象里有Listener, Handler, Responder內(nèi)部類:
(1) Listener Thread:Server端會啟一個Listener線程主要用于監(jiān)聽Client發(fā)送過來的Request,Listene會啟動一個Reader的線程組,并把客戶端發(fā)來的Connection對象通過NIO的SelectionKey傳遞給Reader, Listener相當(dāng)于只作了一層轉(zhuǎn)發(fā);
(2) Reader Thread Pool:主要用于讀取Listener傳過來的Connection,并調(diào)用Connection的readAndProcess方法來讀取Request,并封裝成一個Call放到Call Queue中;
(3) Hanlder Thread Pool:Server會啟動一組線程組來處理Call Queue中Call,并把處理的Respone中放到response queue中;
(4) Responder Thread:主要處理response queue中的response,并把response發(fā)送給client,如果當(dāng)前response queue為空,則第一個新增的response會馬上發(fā)送給client端,不會通過responer thread來發(fā)送。
這個RPC.getServer()會經(jīng)過層層調(diào)用,因為現(xiàn)在默認(rèn)的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就會調(diào)用到ProtobufRpcEngine.getServer這個函數(shù),在這生成了一個Server對象,就是用于接收client端RPC請求,處理,回復(fù)的Server。這個Server對象是一個純粹的網(wǎng)絡(luò)服務(wù)的Server,在RPC中起到基礎(chǔ)網(wǎng)絡(luò)IO服務(wù)的作用。
RPC的Server端創(chuàng)建的總體流程是:
?
4.1 Reader處理
Server里的Reader線程也是基于Selector的異步IO模式,每次Select選出一個SelectionKey之后,會調(diào)用SelectionKey.attachment()把這個SelectionKey所attach的Connection對象獲取(在Listener的run方法中進行的attatch),然后執(zhí)行對應(yīng)的readAndProcess()方法,把這個SelectionKey所對應(yīng)的管道上的網(wǎng)絡(luò)IO數(shù)據(jù)讀入緩沖區(qū)。readAndProcess()方法會層層調(diào)用到Server.processData()方法,在這個方法內(nèi)部,會把剛才從網(wǎng)絡(luò)IO中讀取的數(shù)據(jù)反序列化成對象rpcRequest對象。
rpcRequest對象的類型是繼承自Writable類型的子類的對象,也就是說可以序列化/反序列化的類。這里rpcRequest對象里包含的RPC請求的內(nèi)容對象是由.proto文件中Message生成的類,也就是說PB框架自動編譯出來的類,后面可以通過調(diào)用這個類的get方法獲取RPC中真正傳輸?shù)臄?shù)據(jù)。之后把生成的rpcRequest對象放到一個Call對象里面,再把Call對象放到隊列Server.callQueue里面。
Reader的處理流程圖如下:
?
4.2 Handler處理
Handler線程默認(rèn)有10個,所以處理邏輯是多線程的。每個Handler線程會從剛才提到的callQueue中取一個Call對象,然后調(diào)用Server.call()方法執(zhí)行這個Call對象中蘊含的RPC請求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最后這個call()函數(shù)里面真正執(zhí)行。
call方法會首先校驗這個請求發(fā)過來的數(shù)據(jù)是不是合理的。然后就是獲取實現(xiàn)這個協(xié)議的服務(wù)。實現(xiàn)協(xié)議的服務(wù)在初始化的時候已經(jīng)注冊過了,就是前面說的那個com.google.protobuf.BlockingService類型的對象clientNNPbService。
這個就是實現(xiàn)Client和NameNode之間的ClientNamenodeProtocol協(xié)議的服務(wù),通過調(diào)用這句代碼:
result = service.callBlockingMethod(methodDescriptor, null, param);
就會執(zhí)行這個RPC請求的邏輯。service對象會把相應(yīng)的方法調(diào)用轉(zhuǎn)移到一個繼承自BlockingInterface接口的實現(xiàn)類上。Service的真正實現(xiàn)類就是clientProtocolServerTranslator,是newReflectiveBlockingService()這個函數(shù)的參數(shù)。并且此類是ClientNamenodeProtocolProtos中的子類,是在HDFS編譯的時候根據(jù)proto文件創(chuàng)建的。由于clientProtocolServerTranslator的構(gòu)造方法中傳遞的參數(shù)是NameNodeRpcServer,因此進一步的方法調(diào)用都在NameNodeRpcServer中實現(xiàn)的。
??? Handler處理流程如下:
?
如果元數(shù)據(jù)操作邏輯NameNodeRpcServer里面拋出IOException,那么它都會把它封裝成ServiceException,然后一路傳遞給client端。在client端,會通過ProtobufHelper.getRemoteException()把封裝在ServiceException中的IOException獲取出來。
??? RPC的Server端總體處理流程如下:
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/Scott007/p/3273352.html
總結(jié)
以上是生活随笔為你收集整理的HDFS2.x之RPC流程分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 通过 ANE(Adobe Native
- 下一篇: NSWindow上添加NSView