日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载

發(fā)布時間:2025/4/5 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

 原文地址:http://yanbohappy.sinaapp.com/?p=110 

  最新版本的Hadoop代碼中已經(jīng)默認(rèn)了Protocol buffer(以下簡稱PB,http://code.google.com/p/protobuf/)作為RPC的默認(rèn)實(shí)現(xiàn),原來的WritableRpcEngine已經(jīng)被淘汰了。來自cloudera的Aaron T. Myers在郵件中這樣說的“since PB can provide support for evolving protocols in a compatible fashion.”

首先要明白PB是什么,PB是Google開源的一種輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲格式,可以用于結(jié)構(gòu)化數(shù)據(jù)序列化/反序列化,很適合做數(shù)據(jù)存儲或 RPC 數(shù)據(jù)交換格式。它可用于通訊協(xié)議、數(shù)據(jù)存儲等領(lǐng)域的語言無關(guān)、平臺無關(guān)、可擴(kuò)展的序列化結(jié)構(gòu)數(shù)據(jù)格式。目前提供了 C++、Java、Python 三種語言的 API。簡單理解就是某個進(jìn)程把一些結(jié)構(gòu)化數(shù)據(jù)通過網(wǎng)絡(luò)通信的形式傳遞給另外一個進(jìn)程(典型應(yīng)用就是RPC);或者某個進(jìn)程要把某些結(jié)構(gòu)化數(shù)據(jù)持久化存儲到磁盤上(這個有點(diǎn)類似于在Mongodb中的BSON格式)。對于存儲的這個例子來說,使用PB和XML,JSON相比的缺點(diǎn)就是存儲在磁盤上的數(shù)據(jù)用戶是無法理解的,除非用PB反序列化之后才行,這個有點(diǎn)類似于IDL。優(yōu)點(diǎn)就是序列化/反序列化速度快,網(wǎng)絡(luò)或者磁盤IO傳輸?shù)臄?shù)據(jù)少,這個在Data-Intensive Scalable Computing中是非常重要的。

Hadoop使用PB作為RPC實(shí)現(xiàn)的另外一個原因是PB的語言、平臺無關(guān)性。在mailing list里聽說過社區(qū)的人有這樣的考慮:就是現(xiàn)在每個MapReduce task都是在一個JVM虛擬機(jī)上運(yùn)行的(即使是Streaming的模式,MR任務(wù)的數(shù)據(jù)流也是通過JVM與NN或者DN進(jìn)行RPC交換的),JVM最嚴(yán)重的問題就是內(nèi)存,例如OOM。我看社區(qū)里有人討論說如果用PB這樣的RPC實(shí)現(xiàn),那么每個MR task都可以直接與NN或者DN進(jìn)行RPC交換了,這樣就可以用C/C++來實(shí)現(xiàn)每一個MR task了。百度做的HCE(https://issues.apache.org/jira/browse/MAPREDUCE-1270)和這種思路有點(diǎn)類似,但是由于當(dāng)時的Hadoop RPC通信還是通過WritableRpcEngine來實(shí)現(xiàn)的,所以MR task還是沒有擺脫通過本地的JVM代理與NN或者DN通信的束縛,因?yàn)镃hild JVM Process還是存在的,還是由它來設(shè)置運(yùn)行時環(huán)境和RPC交互。

關(guān)于PB的原理和實(shí)現(xiàn),請大家參考http://code.google.com/p/protobuf/或者h(yuǎn)ttp://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608,本文不再贅述。

下面來看看Hadoop代碼中的RPC是如何實(shí)現(xiàn)的。RPC就是一臺機(jī)器上的某個進(jìn)程要調(diào)用另外一臺機(jī)器上的某個進(jìn)程的方法,中間通信傳輸?shù)木褪穷愃朴凇胺椒?shù)1、參數(shù)2……”這樣的信息,是結(jié)構(gòu)化的。同時通信除了這些RPC實(shí)體以外,還要有header等。

我們要定義一種PB實(shí)現(xiàn)的RPC傳輸格式,首先要定義相應(yīng)的.proto文件,在Hadoop common工程里,這些文件放在D:\Hadoop-trunk\hadoop-common-project\hadoop-common\src\main\proto目錄下;在Hadoop HDFS工程里這些文件放在D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下,以此類推。Hadoop編譯腳本會調(diào)用相應(yīng)的protoc二進(jìn)制程序來編譯這些以.proto結(jié)尾的文件,生成相應(yīng)的.java文件。

以D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目錄下的ClientNamenodeProtocol.proto為例說明。文件最開始定義了一些參數(shù):

option java_package = "org.apache.hadoop.hdfs.protocol.proto";option java_outer_classname = "ClientNamenodeProtocolProtos";option java_generic_services = true;option java_generate_equals_and_hash = true;

這個表示這個.proto文件經(jīng)過protoc編譯之后會生成org.apache.hadoop.hdfs.protocol.proto這個包下面的ClientNamenodeProtocolProtos.java類文件,那么在Hadoop源碼里就可以調(diào)用這個類里的方法了。

這個文件的主體主要是兩種數(shù)據(jù)類型message和rpc,仔細(xì)看下這個文件就知道了,message就是這個ClientNamenodeProtocol協(xié)議中傳輸?shù)慕Y(jié)構(gòu)體,rpc就是調(diào)用的方法。那么這兩種類型在經(jīng)過編譯之后會生成什么呢?

編譯之后,在Hadoop-trunk/hadoop-hdfs-project/hadoop-hdfs/target/generated-sources/java/org/apache/hadoop/hdfs/protocol/proto目錄里生成了ClientNamenodeProtocolProtos.java文件,里面把message都包裝成了類,而把rpc都包裝成了方法。這個文件是由PB編譯器自動生成的,所以不能修改。

有了這些java類之后,我們就可以看看在Server端是怎么實(shí)現(xiàn)RPC的了。首先還是NameNode初始化的流程,會調(diào)用到rpcServer = createRpcServer(conf)來創(chuàng)建RPC server。下面看看NameNodeRpcServer的構(gòu)造函數(shù)里都做了哪些工作:

public NameNodeRpcServer(Configuration conf, NameNode nn)throws IOException {this.nn = nn;this.namesystem = nn.getNamesystem();this.metrics = NameNode.getNameNodeMetrics();int handlerCount =conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,DFS_NAMENODE_HANDLER_COUNT_DEFAULT);InetSocketAddress socAddr = nn.getRpcServerAddress(conf);//設(shè)置ProtolEngine,目前只支持PB協(xié)議。表示接收到的RPC協(xié)議如果是ClientNamenodeProtocolPB,//那么處理這個RPC協(xié)議的引擎是ProtobufRpcEngineRPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);//聲明一個ClientNamenodeProtocolServerSideTranslatorPB,//這個類負(fù)責(zé)把Server接收到的PB格式對象的數(shù)據(jù),拼裝成NameNode內(nèi)村中的數(shù)據(jù)類型,//調(diào)用NameNodeRpcServer類中相應(yīng)的邏輯,然后再把執(zhí)行結(jié)果拼裝成PB格式。 ClientNamenodeProtocolServerSideTranslatorPBclientProtocolServerTranslator =new ClientNamenodeProtocolServerSideTranslatorPB(this);BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =new DatanodeProtocolServerSideTranslatorPB(this);BlockingService dnProtoPbService = DatanodeProtocolService.newReflectiveBlockingService(dnProtoPbTranslator);NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =new NamenodeProtocolServerSideTranslatorPB(this);BlockingService NNPbService = NamenodeProtocolService.newReflectiveBlockingService(namenodeProtocolXlator);RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService.newReflectiveBlockingService(refreshAuthPolicyXlator);RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =new RefreshUserMappingsProtocolServerSideTranslatorPB(this);BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService.newReflectiveBlockingService(refreshUserMappingXlator);GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =new GetUserMappingsProtocolServerSideTranslatorPB(this);BlockingService getUserMappingService = GetUserMappingsProtocolService.newReflectiveBlockingService(getUserMappingXlator);HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =new HAServiceProtocolServerSideTranslatorPB(this);BlockingService haPbService = HAServiceProtocolService.newReflectiveBlockingService(haServiceProtocolXlator);WritableRpcEngine.ensureInitialized();InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);if (dnSocketAddr != null) {int serviceHandlerCount =conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);// Add all the RPC protocols that the namenode implementsthis.serviceRpcServer =RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class, clientNNPbService,dnSocketAddr.getHostName(), dnSocketAddr.getPort(),serviceHandlerCount,false, conf, namesystem.getDelegationTokenSecretManager());DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,serviceRpcServer);DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,refreshAuthService, serviceRpcServer);DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,refreshUserMappingService, serviceRpcServer);DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,getUserMappingService, serviceRpcServer);this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();nn.setRpcServiceServerAddress(conf, serviceRPCAddress);} else {serviceRpcServer = null;serviceRPCAddress = null;}// Add all the RPC protocols that the namenode implementsthis.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf,namesystem.getDelegationTokenSecretManager());DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,clientRpcServer);DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,refreshAuthService, clientRpcServer);DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,refreshUserMappingService, clientRpcServer);DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,getUserMappingService, clientRpcServer);// set service-level authorization security policyif (serviceAuthEnabled =conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());if (this.serviceRpcServer != null) {this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());}}// The rpc-server port can be ephemeral... ensure we have the correct infothis.clientRpcAddress = this.clientRpcServer.getListenerAddress();nn.setRpcServerAddress(conf, clientRpcAddress);this.minimumDataNodeVersion = conf.get(DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);}

ClientNamenodeProtocol是protoc編譯生成的ClientNamenodeProtocolProtos類中的inner class。

public static com.google.protobuf.BlockingServicenewReflectiveBlockingService(final BlockingInterface impl) {……}

這個方法也是由protoc編譯器自動生成的。這個方法會返回一個com.google.protobuf.BlockingService類型的對象,這種類型的對象定義了RPC的各種服務(wù),后面會講。

this.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf,namesystem.getDelegationTokenSecretManager());

這個RPC.getServer()函數(shù)生成一個Server對象,負(fù)責(zé)接收網(wǎng)絡(luò)連接,讀取數(shù)據(jù),調(diào)用處理數(shù)據(jù)函數(shù),返回結(jié)果。這個Server對象里有Listener, Handler, Responder內(nèi)部類,分別開啟多個線程負(fù)責(zé)監(jiān)聽、讀取、處理和返回結(jié)果。前兩個參數(shù)表示如果RPC發(fā)送過來的是ClientNamenodeProtocolPB協(xié)議,那么負(fù)責(zé)處理這個協(xié)議的服務(wù)(com.google.protobuf.BlockingService類型的對象)就是clientNNPbService。

這個RPC.getServer()會經(jīng)過層層調(diào)用,因?yàn)楝F(xiàn)在默認(rèn)的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就會調(diào)用到下面這個函數(shù),在這生成了一個Server對象,就是用于接收client端RPC請求,處理,回復(fù)的Server。這個Server對象是一個純粹的網(wǎng)絡(luò)服務(wù)的Server,在RPC中起到基礎(chǔ)網(wǎng)絡(luò)IO服務(wù)的作用。

public RPC.Server getServer(Class<?> protocol, Object protocolImpl,String bindAddress, int port, int numHandlers, int numReaders,int queueSizePerHandler, boolean verbose, Configuration conf,SecretManager<? extends TokenIdentifier> secretManager,String portRangeConfig)throws IOException {return new Server(protocol, protocolImpl, conf, bindAddress, port,numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,portRangeConfig);}

現(xiàn)在該用到的東西都生成好了,就要看看client端來了一個RPC請求之后,Server端是怎么處理的呢?

Server里的Reader線程也是基于Selector的異步IO模式,每次Select選出一個SelectionKey之后,會調(diào)用SelectionKey.attachment()把這個SelectionKey所attach的Connection對象獲取,然后執(zhí)行對應(yīng)的readAndProcess()方法,把這個SelectionKey所對應(yīng)的管道上的網(wǎng)絡(luò)IO數(shù)據(jù)讀入緩沖區(qū)。readAndProcess()方法會層層調(diào)用到Server.processData()方法,在這個方法內(nèi)部,會把剛才從網(wǎng)絡(luò)IO中讀取的數(shù)據(jù)反序列化成對象rpcRequest對象。rpcRequest對象的類型是繼承自Writable類型的子類的對象,也就是說可以序列化/反序列化的類。這里rpcRequest對象里包含的RPC請求的內(nèi)容對象是由.proto文件中Message生成的類,也就是說PB框架自動編譯出來的類,后面可以通過調(diào)用這個類的get方法獲取RPC中真正傳輸?shù)臄?shù)據(jù)。之后把生成的rpcRequest對象放到一個Call對象里面,再把Call對象放到隊(duì)列Server.callQueue里面。至此網(wǎng)絡(luò)服務(wù)器的Reader線程做的工作就OK了。

下面看看Handler線程是怎么處理的。Handler線程默認(rèn)有10個,所以處理邏輯是多線程的。每個Handler線程會從剛才提到的callQueue中取一個Call對象,然后調(diào)用Server.call()方法執(zhí)行這個Call對象中蘊(yùn)含的RPC請求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最后這個call()函數(shù)里面真正執(zhí)行嘍。。。。重點(diǎn)看這個函數(shù),首先校驗(yàn)這個請求發(fā)過來的數(shù)據(jù)是不是合理的。然后就是獲取實(shí)現(xiàn)這個協(xié)議的服務(wù)。實(shí)現(xiàn)協(xié)議的服務(wù)在初始化的時候已經(jīng)注冊過了,就是前面說的那個com.google.protobuf.BlockingService類型的對象,例如:

BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);

這個就是實(shí)現(xiàn)Client和NameNode之間的ClientNamenodeProtocol協(xié)議的服務(wù)。當(dāng)然還有dnProtoPbService, NNPbService, refreshAuthService, refreshUserMappingService, haPbService等等這些不同的服務(wù)。

這個Service獲取了之后,通過調(diào)用這句代碼

result = service.callBlockingMethod(methodDescriptor, null, param);

就會執(zhí)行這個RPC請求的邏輯。

再往深入執(zhí)行就要涉及到google protocol buffer內(nèi)部的東西了,這個service對象會把相應(yīng)的方法調(diào)用轉(zhuǎn)移到一個繼承自BlockingInterface接口的實(shí)現(xiàn)類上。Service的真正實(shí)現(xiàn)類就是clientProtocolServerTranslator,是newReflectiveBlockingService()這個函數(shù)的參數(shù)。

BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);

這個初始化過程中的參數(shù),也就是service.callBlockingMethod()真正調(diào)用的是clientProtocolServerTranslator中對應(yīng)的方法。這一點(diǎn)可以通過由protoc自動編譯生成的代碼中看出:

public static com.google.protobuf.BlockingServicenewReflectiveBlockingService(final BlockingInterface impl) {return new com.google.protobuf.BlockingService() {public final com.google.protobuf.Descriptors.ServiceDescriptorgetDescriptorForType() {return getDescriptor();}public final com.google.protobuf.Message callBlockingMethod(com.google.protobuf.Descriptors.MethodDescriptor method,com.google.protobuf.RpcController controller,com.google.protobuf.Message request)throws com.google.protobuf.ServiceException {if (method.getService() != getDescriptor()) {throw new java.lang.IllegalArgumentException("Service.callBlockingMethod() given method descriptor for " +"wrong service type.");}switch(method.getIndex()) {case 0:return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);case 1:return impl.getServerDefaults(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto)request);case 2:return impl.create(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto)request);case 3:return impl.append(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto)request);…… } …… }

上面就是proto編譯生成的ClientNamenodeProtocolProtos.java文件,從中可以看出對callBlockingMethod()方法的調(diào)用都是轉(zhuǎn)移到BlockingInterface impl上面了。

然后我們看看clientProtocolServerTranslator是怎么進(jìn)一步執(zhí)行的。下面以getBlockLocations()函數(shù)為例說明:

public GetBlockLocationsResponseProto getBlockLocations(RpcController controller, GetBlockLocationsRequestProto req)throws ServiceException {try {//下面這個server是由NameNodeRpcServer類生成的對象,定義了HDFS元數(shù)據(jù)操作邏輯。LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),req.getLength());//由于server返回的是NameNode內(nèi)存中的數(shù)據(jù)結(jié)構(gòu),要把這個結(jié)果通過RPC傳回client端,//那么我們需要利用PB框架提供的對應(yīng)Message的Builder類,把內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)通過這個接口序列化。Builder builder = GetBlockLocationsResponseProto.newBuilder();if (b != null) {builder.setLocations(PBHelper.convert(b)).build();}return builder.build();} catch (IOException e) {throw new ServiceException(e);}}

至此,Hadoop的RPC流程Server端已經(jīng)分析結(jié)束,不過這個是正確執(zhí)行的流程。如果中間拋出了異常呢?還是以上面這個getBlockLocations()函數(shù)為例,如果元數(shù)據(jù)操作邏輯NameNodeRpcServer里面拋出IOException,那么它都會把它封裝成ServiceException,然后一路傳遞給client端。在client端,會通過ProtobufHelper.getRemoteException()把封裝在ServiceException中的IOException獲取出來。

?

轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4773914.html

總結(jié)

以上是生活随笔為你收集整理的Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。