日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop中RPC机制详解之Server端

發(fā)布時間:2024/1/17 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop中RPC机制详解之Server端 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

2019獨角獸企業(yè)重金招聘Python工程師標準>>>

Hadoop 中 RPC 機制詳解之 Client 端


1. Server.Listener

RPC Client 端的 RPC 請求發(fā)送到 Server 端后, 首先由 Server.Listener 接收

Server.Listener 類繼承自 Thread 類, 監(jiān)聽了 OP_READ 和 OP_ACCEPT 事件

Server.Listener 接收 RPC 請求, 在 Server.Listener.doRead() 方法中讀取數(shù)據(jù), 在 doRead() 方法中又調(diào)用了Server.Connection.readAndProcess() 方法,?

最后會調(diào)用 Server.Connection.processRpcRequest() 方法, 源碼如下:

private?void?processRpcRequest(RpcRequestHeaderProto?header,DataInputStream?dis)?throws?WrappedRpcServerException,InterruptedException?{...Writable?rpcRequest;//?從成員變量dis中反序列化出Client端發(fā)送來的RPC請求(?WritableRpcEngine.Invocation對象?)try?{?//Read?the?rpc?requestrpcRequest?=?ReflectionUtils.newInstance(rpcRequestClass,?conf);rpcRequest.readFields(dis);}?catch?(Throwable?t)?{?//?includes?runtime?exception?from?newInstance...}//?構(gòu)造Server端Server.Call實例對象Call?call?=?new?Call(header.getCallId(),?header.getRetryCount(),rpcRequest,?this,?ProtoUtil.convert(header.getRpcKind()),?header.getClientId().toByteArray());//?將Server.Call實例對象放入調(diào)用隊列中callQueue.put(call);??????????????//?queue?the?call;?maybe?blocked?hereincRpcCount();??//?Increment?the?rpc?count}

調(diào)用隊列 callQueue 是 Server 的成員變量, Server.Listener?和?Server.Handler 是典型的生產(chǎn)者, 消費者模型,?

Server.Listener( 生產(chǎn)者 )的doRead()方法最終調(diào)用Server.Connection.processRpcRequest() 方法,?

而Server.Handler( 消費者 )處理RPC請求


2. Server.Handler 繼承 Thread 類, 其主要工作是處理 callQueue 中的調(diào)用, 都在 run() 方法中完成. 在 run() 的主循環(huán)中, 每次處理一個從 callQueue 中出隊的請求, Server.call() 是一個抽象方法, 實際是調(diào)用了 RPC.Server.call()方法, 最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用

/**?Handles?queued?calls?.?*/private?class?Handler?extends?Thread?{...@Overridepublic?void?run()?{...ByteArrayOutputStream?buf?=?new?ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);while?(running)?{...final?Call?call?=?callQueue.take();????//?獲取一個RPC調(diào)用請求...Writable?value?=?null;value?=?call.connection.user.doAs(new?PrivilegedExceptionAction<Writable>()?{@Overridepublic?Writable?run()?throws?Exception?{//?調(diào)用RPC.Server.call()方法//?call.rpcKind?:?RPC調(diào)用請求的類型,?一般為Writable//?call.connection.protocolName?:?RPC協(xié)議接口的類名//?call.rpcRequest?:?Invocation實例對象,?包括方法名,?參數(shù)列表,?參數(shù)列表的Class對象數(shù)組//?call.timestamp?:?調(diào)用時間戳return?call(call.rpcKind,?call.connection.protocolName,?call.rpcRequest,?call.timestamp);}});}...} }

RPC.Server.call() 方法如下:

@Override public?Writable?call(RPC.RpcKind?rpcKind,?String?protocol,Writable?rpcRequest,?long?receiveTime)?throws?Exception?{return?getRpcInvoker(rpcKind).call(this,?protocol,?rpcRequest,receiveTime); }

最后通過 WritableRPCEngine.call() 方法完成 Server 端方法調(diào)用, 代碼如下:

@Override public?Writable?call(org.apache.hadoop.ipc.RPC.Server?server,String?protocolName,?Writable?rpcRequest,?long?receivedTime)throws?IOException,?RPC.VersionMismatch?{Invocation?call?=?(Invocation)rpcRequest; //?將RPC請求強制轉(zhuǎn)成WritableRpcEngine.Invocation對象...long?clientVersion?=?call.getProtocolVersion();final?String?protoName;ProtoClassProtoImpl?protocolImpl; //?Server端RPC協(xié)議接口的實現(xiàn)類的實例對象...//?Invoke?the?protocol?methodtry?{...//?獲取RPC請求中調(diào)用的方法對象MethodMethod?method?=?protocolImpl.protocolClass.getMethod(call.getMethodName(),call.getParameterClasses());method.setAccessible(true);...//?在Server端RPC協(xié)議接口的實現(xiàn)類的實例對象?protocolImpl?上調(diào)用具體的方法Object?value?=?method.invoke(protocolImpl.protocolImpl,?call.getParameters());...//?調(diào)用正常結(jié)束,?返回調(diào)用結(jié)果return?new?ObjectWritable(method.getReturnType(),?value);}?catch?(InvocationTargetException?e)?{ //?調(diào)用出現(xiàn)異常,?用IOException包裝異常,?最后拋出該異常Throwable?target?=?e.getTargetException();if?(target?instanceof?IOException)?{throw?(IOException)target;}?else?{IOException?ioe?=?new?IOException(target.toString());ioe.setStackTrace(target.getStackTrace());throw?ioe;}}?catch?(Throwable?e)?{...}} }

在 WritableRpcEngine.call() 方法中, 傳入的 rpcRequest 會被強制轉(zhuǎn)換成 WritableRpcEngine.Invocation 類型的對象 call , 并通過 call 這個對象包含的方法名(getMethodName()方法)和參數(shù)列表的 Class對象數(shù)組(getParameterClasses())獲取 Method 對象, 最終通過 Method 對象的invoke() 方法, 調(diào)用實現(xiàn)類的實例對象 protocolImpl 上的方法, 完成 Hadoop 的遠程過程調(diào)用


好了, 現(xiàn)在 Server 端的具體方法已經(jīng)被調(diào)用了, 調(diào)用結(jié)果分兩種情況:

????1) 調(diào)用正常結(jié)束, 則將方法的返回值和調(diào)用結(jié)果封裝成一個 ObjectWritable 類型的對象, 并返回

????2) 調(diào)用出現(xiàn)異常, 拋出 IOException 類型的異常


3. Server.Responder

這個類的功能: 發(fā)送 Hadoop 遠程過程調(diào)用的應答給 Client 端,?Server.Responder 類繼承自 Thread 類, 監(jiān)聽了 OP_WRITE 事件, 即通道可寫.? 具體細節(jié)寫不下去了


總結(jié):?

????Server.Responder 和 Server.Listener, Server.Handler 一起配合, 完成 Hadoop 中 RPC 的 Server 端處理:

Server.Listener 接收 Client 端的連接請求和請求數(shù)據(jù); Server.Handler 完成實際的過程調(diào)用; Server.Responder 則進行應答發(fā)送


轉(zhuǎn)載于:https://my.oschina.net/u/2503731/blog/670456

總結(jié)

以上是生活随笔為你收集整理的Hadoop中RPC机制详解之Server端的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。