日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

RPC框架原理及从零实现系列博客(二):11个类实现简单RPC框架

發(fā)布時(shí)間:2025/6/15 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RPC框架原理及从零实现系列博客(二):11个类实现简单RPC框架 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

項(xiàng)目1.0版本源碼

https://github.com/wephone/Me...


在上一博文中 跟大家講了RPC的實(shí)現(xiàn)思路 思路畢竟只是思路 那么這篇就帶著源碼給大家講解下實(shí)現(xiàn)過(guò)程中的各個(gè)具體問(wèn)題

讀懂本篇需要的基本知識(shí) 若尚未清晰請(qǐng)自行了解后再閱讀本文

  • java動(dòng)態(tài)代理
  • netty框架的基本使用
  • spring的基本配置

最終項(xiàng)目的使用如下

/***調(diào)用端代碼及spring配置*/ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ClientContext.xml"}) public class Client {@Testpublic void start(){Service service= (Service) RPC.call(Service.class);System.out.println("測(cè)試Integer,Double類型傳參與返回String對(duì)象:"+service.stringMethodIntegerArgsTest(233,666.66));//輸出string233666.66}}/***Service抽象及其實(shí)現(xiàn)*調(diào)用與實(shí)現(xiàn)端共同依賴Service*/ public interface Service {String stringMethodIntegerArgsTest(Integer a,Double b); } /*** ServiceImpl實(shí)現(xiàn)端對(duì)接口的具體實(shí)現(xiàn) */ public class ServiceImpl implements Service {@Overridepublic String stringMethodIntegerArgsTest(Integer a, Double b) {return "String"+a+b;} }

1.0版本分3個(gè)包

  • Client 調(diào)用端
  • Server 實(shí)現(xiàn)端
  • Core 核心方法

首先看這句代碼

調(diào)用端只需如此調(diào)用
定義接口 傳入接口類類型 后面調(diào)用的接口內(nèi)的方法 全部是由實(shí)現(xiàn)端實(shí)現(xiàn)

Service service= (Service) RPC.call(Service.class);

這句的作用其實(shí)就是生成調(diào)用端的動(dòng)態(tài)代理

/*** 暴露調(diào)用端使用的靜態(tài)方法 為抽象接口生成動(dòng)態(tài)代理對(duì)象* TODO 考慮后面優(yōu)化不在使用時(shí)仍需強(qiáng)轉(zhuǎn)* @param cls 抽象接口的類類型* @return 接口生成的動(dòng)態(tài)代理對(duì)象*/public static Object call(Class cls){RPCProxyHandler handler=new RPCProxyHandler();Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},handler);return proxyObj;}

RPCProxyHandler為動(dòng)態(tài)代理的方法被調(diào)用后的回調(diào)方法 每個(gè)方法被調(diào)用時(shí)都會(huì)執(zhí)行這個(gè)invoke

/*** 代理抽象接口調(diào)用的方法* 發(fā)送方法信息給服務(wù)端 加鎖等待服務(wù)端返回* @param proxy* @param method* @param args* @return* @throws Throwable*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {RPCRequest request=new RPCRequest();request.setRequestID(buildRequestID(method.getName()));request.setClassName(method.getDeclaringClass().getName());//返回表示聲明由此 Method 對(duì)象表示的方法的類或接口的Class對(duì)象request.setMethodName(method.getName()); // request.setParameterTypes(method.getParameterTypes());//返回形參類型request.setParameters(args);//輸入的實(shí)參RPCRequestNet.requestLockMap.put(request.getRequestID(),request);RPCRequestNet.connect().send(request);//調(diào)用用結(jié)束后移除對(duì)應(yīng)的condition映射關(guān)系RPCRequestNet.requestLockMap.remove(request.getRequestID());return request.getResult();//目標(biāo)方法的返回結(jié)果}

也就是收集對(duì)應(yīng)調(diào)用的接口的信息 然后send給實(shí)現(xiàn)端
那么這個(gè)requestLockMap又是作何作用的呢

  • 由于我們的網(wǎng)絡(luò)調(diào)用都是異步
  • 但是RPC調(diào)用都要做到同步 等待這個(gè)遠(yuǎn)程調(diào)用方法完全返回后再繼續(xù)執(zhí)行
  • 所以將每個(gè)請(qǐng)求的request對(duì)象作為對(duì)象鎖 每個(gè)請(qǐng)求發(fā)送后加鎖 等到網(wǎng)絡(luò)異步調(diào)用返回后再釋放所
  • 生成每個(gè)請(qǐng)求的ID 這里我用隨機(jī)數(shù)加時(shí)間戳
  • 將請(qǐng)求ID和請(qǐng)求對(duì)象維護(hù)在靜態(tài)全局的一個(gè)map中 實(shí)現(xiàn)端通過(guò)ID來(lái)對(duì)應(yīng)是哪個(gè)請(qǐng)求
  • 異步調(diào)用返回后 通過(guò)ID notify喚醒對(duì)應(yīng)請(qǐng)求對(duì)象的線程

netty異步返回的調(diào)用 釋放對(duì)象鎖

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String responseJson= (String) msg;RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson);synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) {//喚醒在該對(duì)象鎖上wait的線程RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID());request.setResult(response.getResult());request.notifyAll();}}

接下來(lái)是RPCRequestNet.connect().send(request);方法
connect方法其實(shí)是單例模式返回RPCRequestNet實(shí)例
RPCRequestNet構(gòu)造方法是使用netty對(duì)實(shí)現(xiàn)端進(jìn)行TCP鏈接
send方法如下

try {//判斷連接是否已完成 只在連接啟動(dòng)時(shí)會(huì)產(chǎn)生阻塞if (RPCRequestHandler.channelCtx==null){connectlock.lock();//掛起等待連接成功System.out.println("正在等待連接實(shí)現(xiàn)端");connectCondition.await();connectlock.unlock();}//編解碼對(duì)象為json 發(fā)送請(qǐng)求String requestJson= null;try {requestJson = RPC.requestEncode(request);} catch (JsonProcessingException e) {e.printStackTrace();}ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes());RPCRequestHandler.channelCtx.writeAndFlush(requestBuf);System.out.println("調(diào)用"+request.getRequestID()+"已發(fā)送");//掛起等待實(shí)現(xiàn)端處理完畢返回 TODO 后續(xù)配置超時(shí)時(shí)間synchronized (request) {//放棄對(duì)象鎖 并阻塞等待notifyrequest.wait();}System.out.println("調(diào)用"+request.getRequestID()+"接收完畢");} catch (InterruptedException e) {e.printStackTrace();}

condition和lock同樣是為了同步等待異步IO返回用的
send方法基本是編解碼json后發(fā)送給實(shí)現(xiàn)端

調(diào)用端基本實(shí)現(xiàn)綜上所述 代理 發(fā)送 同步鎖


下面是服務(wù)端的使用和實(shí)現(xiàn)

/***實(shí)現(xiàn)端代碼及spring配置*/@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(locations={"file:src/test/java/rpcTest/ServerContext.xml"})public class Server {@Testpublic void start(){//啟動(dòng)spring后才可啟動(dòng) 防止容器尚未加載完畢RPC.start();}}

出了配置spring之外 實(shí)現(xiàn)端就一句 RPC.start()
其實(shí)就是啟動(dòng)netty服務(wù)器
服務(wù)端的處理客戶端信息回調(diào)如下

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {String requestJson= (String) msg;System.out.println("receive request:"+requestJson);RPCRequest request= RPC.requestDeocde(requestJson);Object result=InvokeServiceUtil.invoke(request);//netty的write方法并沒(méi)有直接寫入通道(為避免多次喚醒多路復(fù)用選擇器)//而是把待發(fā)送的消息放到緩沖數(shù)組中,flush方法再全部寫到通道中 // ctx.write(resp);//記得加分隔符 不然客戶端一直不會(huì)處理RPCResponse response=new RPCResponse();response.setRequestID(request.getRequestID());response.setResult(result);String respStr=RPC.responseEncode(response);ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes());ctx.writeAndFlush(responseBuf);}

主要是編解碼json 反射對(duì)應(yīng)的方法 我們看看反射的工具類

/*** 反射調(diào)用相應(yīng)實(shí)現(xiàn)類并結(jié)果* @param request* @return*/public static Object invoke(RPCRequest request){Object result=null;//內(nèi)部變量必須賦值 全局變量才不用//實(shí)現(xiàn)類名String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName());try {Class implClass=Class.forName(implClassName);Object[] parameters=request.getParameters();int parameterNums=request.getParameters().length;Class[] parameterTypes=new Class[parameterNums];for (int i = 0; i <parameterNums ; i++) {parameterTypes[i]=parameters[i].getClass();}Method method=implClass.getDeclaredMethod(request.getMethodName(),parameterTypes);Object implObj=implClass.newInstance();result=method.invoke(implObj,parameters);} catch (ClassNotFoundException e) {e.printStackTrace();} catch (NoSuchMethodException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}return result;}

解析Parameters getClass獲取他們的類類型 反射調(diào)用對(duì)應(yīng)的方法

這里需要注意一個(gè)點(diǎn)

  • 本文最初采用Gson處理json 但gson默認(rèn)會(huì)把int類型轉(zhuǎn)為double類型 例如2變?yōu)?.0 不適用本場(chǎng)景 我也不想去專門適配
  • 所以換用了jackson
  • 常見json處理框架 反序列化為對(duì)象時(shí) int,long等基本類型都會(huì)變成他們的包裝類Integer Long
  • 所以本例程中 遠(yuǎn)程調(diào)度接口方法的形參不可以使用int等基本類型
  • 否則method.invoke(implObj,parameters);會(huì)找不到對(duì)應(yīng)的方法報(bào)錯(cuò)
  • 因?yàn)閜arameters已經(jīng)是包裝類了 而method還是int這些基本類 所以找不到對(duì)應(yīng)方法

最后是借助spring配置基礎(chǔ)配置
我寫了兩個(gè)類 ServerConfig ClientConfig 作為調(diào)用端和服務(wù)端的配置
只需在spring中配置這兩個(gè)bean 并啟動(dòng)IOC容器即可

調(diào)用端

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><bean class="org.meizhuo.rpc.client.ClientConfig"><property name="host" value="127.0.0.1"></property><property name="port" value="9999"></property></bean> </beans>

實(shí)現(xiàn)端

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"><bean class="org.meizhuo.rpc.server.ServerConfig"><property name="port" value="9999"></property><property name="serverImplMap"><map><!--配置對(duì)應(yīng)的抽象接口及其實(shí)現(xiàn)--><entry key="rpcTest.Service" value="rpcTest.ServiceImpl"></entry></map></property></bean></beans>

最后有個(gè)小問(wèn)題

我們的框架是作為一個(gè)依賴包引入的 我們不可能在我們的框架中讀取對(duì)應(yīng)的spring xml
這樣完全是去了框架的靈活性
那我們?cè)趺丛?strong>運(yùn)行過(guò)程中獲得我們所處于的IOC容器 已獲得我們的正確配置信息呢
答案是spring提供的ApplicationContextAware接口

/*** Created by wephone on 17-12-26.*/ public class ClientConfig implements ApplicationContextAware {private String host;private int port;//調(diào)用超時(shí)時(shí)間private long overtime;public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public long getOvertime() {return overtime;}public void setOvertime(long overtime) {this.overtime = overtime;}/*** 加載Spring配置文件時(shí),如果Spring配置文件中所定義的Bean類* 如果該類實(shí)現(xiàn)了ApplicationContextAware接口* 那么在加載Spring配置文件時(shí),會(huì)自動(dòng)調(diào)用ApplicationContextAware接口中的* @param applicationContext* @throws BeansException*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RPC.clientContext=applicationContext;} }

這樣我們?cè)赗PC類內(nèi)部就維護(hù)了一個(gè)靜態(tài)IOC容器的context
只需如此獲取配置
RPC.getServerConfig().getPort()

public static ServerConfig getServerConfig(){return serverContext.getBean(ServerConfig.class);}

就這樣 這個(gè)RPC框架的核心部分 已經(jīng)講述完畢了

本例程僅為1.0版本
后續(xù)博客中 會(huì)加入異常處理 zookeeper支持 負(fù)載均衡策略等
博客:zookeeper支持
歡迎持續(xù)關(guān)注 歡迎star 提issue

總結(jié)

以上是生活随笔為你收集整理的RPC框架原理及从零实现系列博客(二):11个类实现简单RPC框架的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。