日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop2源码分析-RPC机制初识

發(fā)布時(shí)間:2025/3/15 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop2源码分析-RPC机制初识 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.概述

  上一篇博客,講述Hadoop V2的序列化機(jī)制,這為我們學(xué)習(xí)Hadoop V2的RPC機(jī)制奠定了基礎(chǔ)。RPC的內(nèi)容涵蓋的信息有點(diǎn)多,包含Hadoop的序列化機(jī)制,RPC,代理,NIO等。若對(duì)Hadoop序列化不了解的同學(xué),可以參考《Hadoop2源碼分析-序列化篇》。今天這篇博客為大家介紹的內(nèi)容目錄如下:

  • RPC概述
  • 第三方RPC
  • Hadoop V2的RPC簡(jiǎn)述

  那么,下面開(kāi)始今天的學(xué)習(xí)之路。

2.RPC概述

  首先,我們要弄明白,什么是RPC?RPC能用來(lái)做什么?

2.1什么是RPC

  RPC的全程是Remote Procedure Call,中文釋為遠(yuǎn)程過(guò)程調(diào)用。也就是說(shuō),調(diào)用的過(guò)程代碼(業(yè)務(wù)服務(wù)代碼)并不在調(diào)用者本地運(yùn)行,而是要實(shí)現(xiàn)調(diào)用著和被調(diào)用著之間的連接通信,有同學(xué)可能已經(jīng)發(fā)現(xiàn),這個(gè)和C/S模式很像。沒(méi)錯(cuò),RPC的基礎(chǔ)通信模式是基于C/S進(jìn)程間相互通信的模式來(lái)實(shí)現(xiàn)的,它對(duì)Client端提供遠(yuǎn)程接口服務(wù),其RPC原理圖如下所示:

2.2RPC的功能

  我們都知道,在過(guò)去的編程概念中,過(guò)程是由開(kāi)發(fā)人員在本地編譯完成的,并且只能局限在本地運(yùn)行的某一段代碼,即主程序和過(guò)程程序是一種本地調(diào)用關(guān)系。因此,這種結(jié)構(gòu)在如今網(wǎng)絡(luò)飛速發(fā)展的情況下已無(wú)法適應(yīng)實(shí)際的業(yè)務(wù)需求。而且,傳統(tǒng)過(guò)程調(diào)用模式無(wú)法充分利用網(wǎng)絡(luò)上其他主機(jī)的資源,如CPU,內(nèi)存等,也無(wú)法提高代碼在Bean之間的共享,使得資源浪費(fèi)較大。

  而RPC的出現(xiàn),正好有效的解決了傳統(tǒng)過(guò)程中存在的這些不足。通過(guò)RPC,我們可以充分利用非共享內(nèi)存的機(jī)器,可以簡(jiǎn)便的將應(yīng)用分布在多臺(tái)機(jī)器上,類似集群分布。這樣方便的實(shí)現(xiàn)過(guò)程代碼共享,提高系統(tǒng)資源的利用率。減少單個(gè)集群的壓力,實(shí)現(xiàn)負(fù)載均衡。

3.第三方RPC

  在學(xué)習(xí)Hadoop V2的RPC機(jī)制之前,我們先來(lái)熟悉第三方的RPC機(jī)制是如何工作的,下面我以Thrift框架為例子。

  Thrift是一個(gè)軟件框架,用來(lái)進(jìn)行可擴(kuò)展且跨語(yǔ)言的服務(wù)開(kāi)發(fā)協(xié)議。它擁有強(qiáng)大的代碼生成引擎,支持C++,Java,Python,PHP,Ruby等編程語(yǔ)言。Thrift允許定義一個(gè)簡(jiǎn)單的定義文件(以.thirft結(jié)尾),文件中包含數(shù)據(jù)類型和服務(wù)接口。用以作為輸入文件,編譯器生成代碼用來(lái)方便的生成RPC客戶端和服務(wù)端通信的編程語(yǔ)言。具體Thrift安裝過(guò)程請(qǐng)參考《Mac OS X 下搭建thrift環(huán)境》。

3.1Thrift原理圖

  下面給出Thrift的原理圖,如下所示:

  下面為大家解釋一下上面的原理圖,首先,我們編譯完thrift定義文件后(這里我使用的是Java語(yǔ)言),會(huì)生成對(duì)應(yīng)的Java類文件,該類的Iface接口定義了我們所規(guī)范的接口函數(shù)。在服務(wù)端,實(shí)現(xiàn)Iface接口,編寫對(duì)應(yīng)函數(shù)下的業(yè)務(wù)邏輯,啟動(dòng)服務(wù)。客戶端同樣需要生成的Java類文件,以供Client端調(diào)用相應(yīng)的接口函數(shù),監(jiān)聽(tīng)服務(wù)端的IP和PORT來(lái)獲取連接對(duì)象。

3.2代碼示例

  • Server端代碼:
package cn.rpc.main;import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import cn.rpc.service.StatQueryService; import cn.rpc.service.impl.StatQueryServiceImpl;/*** @Date Mar 23, 2015** @Author dengjie*/ public class StatsServer {private static Logger logger = LoggerFactory.getLogger(StatsServer.class);private final int PORT = 9090;@SuppressWarnings({ "rawtypes", "unchecked" })private void start() {try {TNonblockingServerSocket socket = new TNonblockingServerSocket(PORT);final StatQueryService.Processor processor = new StatQueryService.Processor(new StatQueryServiceImpl());THsHaServer.Args arg = new THsHaServer.Args(socket);/** Binary coded format efficient, intensive data transmission, The* use of non blocking mode of transmission, according to the size* of the block, similar to the Java of NIO*/arg.protocolFactory(new TCompactProtocol.Factory());arg.transportFactory(new TFramedTransport.Factory());arg.processorFactory(new TProcessorFactory(processor));TServer server = new THsHaServer(arg);server.serve();} catch (Exception ex) {ex.printStackTrace();}}public static void main(String[] args) {try {logger.info("start thrift server...");StatsServer stats = new StatsServer();stats.start();} catch (Exception ex) {ex.printStackTrace();logger.error(String.format("run thrift server has error,msg is %s", ex.getMessage()));}}}
  • Client端代碼:
package cn.rpc.test;import java.util.Map;import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport;import cn.rpc.service.StatQueryService;/*** @Date Mar 23, 2015** @Author dengjie* * @Note Test thrift client*/ public class StatsClient {public static final String ADDRESS = "127.0.0.1";public static final int PORT = 9090;public static final int TIMEOUT = 30000;public static void main(String[] args) {if (args.length < 4) {System.out.println("args length must >= 4,current length is " + args.length);System.out.println("<info>****************</info>");System.out.println("ADDRESS,beginDate,endDate,kpiCode,...");System.out.println("<info>****************</info>");return;}TTransport transport = new TFramedTransport(new TSocket(args[0], PORT, TIMEOUT));TProtocol protocol = new TCompactProtocol(transport);StatQueryService.Client client = new StatQueryService.Client(protocol);String beginDate = args[1]; // "20150308"String endDate = args[2]; // "20150312"String kpiCode = args[3]; // "login_times"String userName = "";int areaId = 0;String type = "";String fashion = "";try {transport.open();Map<String, String> map = client.queryConditionDayKPI(beginDate, endDate, kpiCode, userName, areaId, type,fashion);System.out.println(map.toString());} catch (Exception e) {e.printStackTrace();} finally {transport.close();}}}
  • StatQueryService類:

  這個(gè)類的代碼量太大,暫不貼出。需要的同學(xué)請(qǐng)到以下地址下載。

  下載地址:git@gitlab.com:dengjie/Resource.git

  • StatQueryServiceImpl類:

  下面實(shí)現(xiàn)其中一個(gè)函數(shù)的內(nèi)容,代碼如下所示:

package cn.rpc.service.impl;import java.util.HashMap; import java.util.List; import java.util.Map;import org.apache.thrift.TException;import cn.rpc.conf.ConfigureAPI; import cn.rpc.dao.KpiDao; import cn.rpc.domain.ReportParam; import cn.rpc.domain.ReportResult; import cn.rpc.service.StatQueryService; import cn.rpc.util.MapperFactory;/*** @Date Mar 23, 2015** @Author dengjie*/ public class StatQueryServiceImpl implements StatQueryService.Iface {public Map<String, String> queryDayKPI(String beginDate, String endDate, String kpiCode) throws TException {return null;}public Map<String, String> queryConditionDayKPI(String beginDate, String endDate, String kpiCode, String userName,int areaId, String type, String fashion) throws TException {Map<String, String> res = new HashMap<String, String>();ReportParam param = new ReportParam();param.setBeginDate(beginDate + "");param.setEndDate(endDate + "");param.setKpiCode(kpiCode);param.setUserName(userName == "" ? null : userName);param.setDistrictId(areaId < 0 ? 0 : areaId);param.setProductStyle(fashion == "" ? null : fashion);param.setCustomerProperty(type == "" ? null : type);List<ReportResult> chart = ((KpiDao) MapperFactory.createMapper(KpiDao.class)).getChartAmount(param);Map<String, Integer> title = ((KpiDao) MapperFactory.createMapper(KpiDao.class)).getTitleAmount(param);List<Map<String, Integer>> tableAmount = ((KpiDao) MapperFactory.createMapper(KpiDao.class)).getTableAmount(param);String avgTime = kpiCode.split("_")[0];param.setKpiCode(avgTime + "_avg_time");List<Map<String, Integer>> tableAvgTime = ((KpiDao) MapperFactory.createMapper(KpiDao.class)).getTableAmount(param);res.put(ConfigureAPI.RESMAPKEY.CHART, chart.toString());res.put(ConfigureAPI.RESMAPKEY.TITLE, title.toString());res.put(ConfigureAPI.RESMAPKEY.TABLEAMOUNT, tableAmount.toString());res.put(ConfigureAPI.RESMAPKEY.TABLEAVG, tableAvgTime.toString());return res;}public Map<String, String> queryDetail(String beginDate, String endDate, String userName) throws TException {// TODO Auto-generated method stubreturn null;}}

4.Hadoop V2的RPC簡(jiǎn)述

  Hadoop V2中的RPC采用的是自己獨(dú)立開(kāi)發(fā)的協(xié)議,其核心內(nèi)容包含服務(wù)端,客戶端,交互協(xié)議。源碼內(nèi)容都在hadoop-common-project項(xiàng)目的org.apache.hadoop.ipc包下面。

  • VersionedProtocol類:
package org.apache.hadoop.ipc;import java.io.IOException;/*** Superclass of all protocols that use Hadoop RPC.* Subclasses of this interface are also supposed to have* a static final long versionID field.*/ public interface VersionedProtocol {/*** Return protocol version corresponding to protocol interface.* @param protocol The classname of the protocol interface* @param clientVersion The version of the protocol that the client speaks* @return the version that the server will speak* @throws IOException if any IO error occurs*/public long getProtocolVersion(String protocol,long clientVersion) throws IOException;/*** Return protocol version corresponding to protocol interface.* @param protocol The classname of the protocol interface* @param clientVersion The version of the protocol that the client speaks* @param clientMethodsHash the hashcode of client protocol methods* @return the server protocol signature containing its version and* a list of its supported methods* @see ProtocolSignature#getProtocolSignature(VersionedProtocol, String, * long, int) for a default implementation*/public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,int clientMethodsHash) throws IOException; }

  該類中的兩個(gè)方法一個(gè)是作為版本,另一個(gè)作為簽名用。

  • RPC下的Server類:
/** An RPC Server. */public abstract static class Server extends org.apache.hadoop.ipc.Server {boolean verbose;static String classNameBase(String className) {String[] names = className.split("\\.", -1);if (names == null || names.length == 0) {return className;}return names[names.length-1];}

  對(duì)外提供服務(wù),處理Client端的請(qǐng)求,并返回處理結(jié)果。

  至于Client端,監(jiān)聽(tīng)Server端的IP和PORT,封裝請(qǐng)求數(shù)據(jù),并接受Response。

5.總結(jié)

  這篇博客贅述了RPC的相關(guān)內(nèi)容,讓大家先熟悉一下RPC的相關(guān)機(jī)制和流程,并簡(jiǎn)述了Hadoop V2的RPC機(jī)制,關(guān)于Hadoop V2的RPC詳細(xì)內(nèi)容會(huì)在下一篇博客中給大家分享。這里只是讓大家先對(duì)Hadoop V2的RPC機(jī)制有個(gè)初步的認(rèn)識(shí)。

6.結(jié)束語(yǔ)

  這篇博客就和大家分享到這里,如果大家在研究學(xué)習(xí)的過(guò)程當(dāng)中有什么問(wèn)題,可以加群進(jìn)行討論或發(fā)送郵件給我,我會(huì)盡我所能為您解答,與君共勉!

轉(zhuǎn)載于:https://www.cnblogs.com/smartloli/p/4459763.html

總結(jié)

以上是生活随笔為你收集整理的Hadoop2源码分析-RPC机制初识的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。