Hadoop中RPC机制详解之Server端
2019獨角獸企業(yè)重金招聘Python工程師標準>>>
Hadoop 中 RPC 機制詳解之 Client 端
1. Server.Listener
RPC Client 端的 RPC 請求發(fā)送到 Server 端后, 首先由 Server.Listener 接收
Server.Listener 類繼承自 Thread 類, 監(jiān)聽了 OP_READ 和 OP_ACCEPT 事件
Server.Listener 接收 RPC 請求, 在 Server.Listener.doRead() 方法中讀取數(shù)據(jù), 在 doRead() 方法中又調(diào)用了Server.Connection.readAndProcess() 方法,?
最后會調(diào)用 Server.Connection.processRpcRequest() 方法, 源碼如下:
private?void?processRpcRequest(RpcRequestHeaderProto?header,DataInputStream?dis)?throws?WrappedRpcServerException,InterruptedException?{...Writable?rpcRequest;//?從成員變量dis中反序列化出Client端發(fā)送來的RPC請求(?WritableRpcEngine.Invocation對象?)try?{?//Read?the?rpc?requestrpcRequest?=?ReflectionUtils.newInstance(rpcRequestClass,?conf);rpcRequest.readFields(dis);}?catch?(Throwable?t)?{?//?includes?runtime?exception?from?newInstance...}//?構(gòu)造Server端Server.Call實例對象Call?call?=?new?Call(header.getCallId(),?header.getRetryCount(),rpcRequest,?this,?ProtoUtil.convert(header.getRpcKind()),?header.getClientId().toByteArray());//?將Server.Call實例對象放入調(diào)用隊列中callQueue.put(call);??????????????//?queue?the?call;?maybe?blocked?hereincRpcCount();??//?Increment?the?rpc?count}調(diào)用隊列 callQueue 是 Server 的成員變量, Server.Listener?和?Server.Handler 是典型的生產(chǎn)者, 消費者模型,?
Server.Listener( 生產(chǎn)者 )的doRead()方法最終調(diào)用Server.Connection.processRpcRequest() 方法,?
而Server.Handler( 消費者 )處理RPC請求
2. Server.Handler 繼承 Thread 類, 其主要工作是處理 callQueue 中的調(diào)用, 都在 run() 方法中完成. 在 run() 的主循環(huán)中, 每次處理一個從 callQueue 中出隊的請求, Server.call() 是一個抽象方法, 實際是調(diào)用了 RPC.Server.call()方法, 最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用
/**?Handles?queued?calls?.?*/private?class?Handler?extends?Thread?{...@Overridepublic?void?run()?{...ByteArrayOutputStream?buf?=?new?ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);while?(running)?{...final?Call?call?=?callQueue.take();????//?獲取一個RPC調(diào)用請求...Writable?value?=?null;value?=?call.connection.user.doAs(new?PrivilegedExceptionAction<Writable>()?{@Overridepublic?Writable?run()?throws?Exception?{//?調(diào)用RPC.Server.call()方法//?call.rpcKind?:?RPC調(diào)用請求的類型,?一般為Writable//?call.connection.protocolName?:?RPC協(xié)議接口的類名//?call.rpcRequest?:?Invocation實例對象,?包括方法名,?參數(shù)列表,?參數(shù)列表的Class對象數(shù)組//?call.timestamp?:?調(diào)用時間戳return?call(call.rpcKind,?call.connection.protocolName,?call.rpcRequest,?call.timestamp);}});}...} }
RPC.Server.call() 方法如下:
@Override public?Writable?call(RPC.RpcKind?rpcKind,?String?protocol,Writable?rpcRequest,?long?receiveTime)?throws?Exception?{return?getRpcInvoker(rpcKind).call(this,?protocol,?rpcRequest,receiveTime); }最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用, 代碼如下:
@Override public?Writable?call(org.apache.hadoop.ipc.RPC.Server?server,String?protocolName,?Writable?rpcRequest,?long?receivedTime)throws?IOException,?RPC.VersionMismatch?{Invocation?call?=?(Invocation)rpcRequest; //?將RPC請求強制轉(zhuǎn)成WritableRpcEngine.Invocation對象...long?clientVersion?=?call.getProtocolVersion();final?String?protoName;ProtoClassProtoImpl?protocolImpl; //?Server端RPC協(xié)議接口的實現(xiàn)類的實例對象...//?Invoke?the?protocol?methodtry?{...//?獲取RPC請求中調(diào)用的方法對象MethodMethod?method?=?protocolImpl.protocolClass.getMethod(call.getMethodName(),call.getParameterClasses());method.setAccessible(true);...//?在Server端RPC協(xié)議接口的實現(xiàn)類的實例對象?protocolImpl?上調(diào)用具體的方法Object?value?=?method.invoke(protocolImpl.protocolImpl,?call.getParameters());...//?調(diào)用正常結(jié)束,?返回調(diào)用結(jié)果return?new?ObjectWritable(method.getReturnType(),?value);}?catch?(InvocationTargetException?e)?{ //?調(diào)用出現(xiàn)異常,?用IOException包裝異常,?最后拋出該異常Throwable?target?=?e.getTargetException();if?(target?instanceof?IOException)?{throw?(IOException)target;}?else?{IOException?ioe?=?new?IOException(target.toString());ioe.setStackTrace(target.getStackTrace());throw?ioe;}}?catch?(Throwable?e)?{...}} }在 WritableRpcEngine.call() 方法中, 傳入的 rpcRequest 會被強制轉(zhuǎn)換成 WritableRpcEngine.Invocation 類型的對象 call , 并通過 call 這個對象包含的方法名(getMethodName()方法)和參數(shù)列表的 Class對象數(shù)組(getParameterClasses())獲取 Method 對象, 最終通過 Method 對象的invoke() 方法, 調(diào)用實現(xiàn)類的實例對象 protocolImpl 上的方法, 完成 Hadoop 的遠程過程調(diào)用
好了, 現(xiàn)在 Server 端的具體方法已經(jīng)被調(diào)用了, 調(diào)用結(jié)果分兩種情況:
????1) 調(diào)用正常結(jié)束, 則將方法的返回值和調(diào)用結(jié)果封裝成一個 ObjectWritable 類型的對象, 并返回
????2) 調(diào)用出現(xiàn)異常, 拋出 IOException 類型的異常
3. Server.Responder
這個類的功能: 發(fā)送 Hadoop 遠程過程調(diào)用的應答給 Client 端,?Server.Responder 類繼承自 Thread 類, 監(jiān)聽了 OP_WRITE 事件, 即通道可寫.? 具體細節(jié)寫不下去了
總結(jié):?
????Server.Responder 和 Server.Listener, Server.Handler 一起配合, 完成 Hadoop 中 RPC 的 Server 端處理:
Server.Listener 接收 Client 端的連接請求和請求數(shù)據(jù); Server.Handler 完成實際的過程調(diào)用; Server.Responder 則進行應答發(fā)送
轉(zhuǎn)載于:https://my.oschina.net/u/2503731/blog/670456
總結(jié)
以上是生活随笔為你收集整理的Hadoop中RPC机制详解之Server端的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 跟我一起数据挖掘(10)——HP Ver
- 下一篇: WCF学习之旅—WCF寄宿前的准备(八)