日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Netty之自定义RPC

發(fā)布時(shí)間:2024/7/5 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Netty之自定义RPC 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

需求分析

使用netty實(shí)現(xiàn)方法遠(yuǎn)程調(diào)用, 在client調(diào)用本地接口中方法時(shí), 使用反射進(jìn)行遠(yuǎn)程調(diào)用, server執(zhí)行完結(jié)果后, 將執(zhí)行結(jié)果進(jìn)行封裝, 發(fā)送到client

RPC調(diào)用模型:

1. 服務(wù)消費(fèi)方(client)以本地調(diào)用方式調(diào)用服務(wù) 2. client stub 接收到調(diào)用后負(fù)責(zé)將方法、參數(shù)等封裝成能夠進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw 3. client stub 將消息進(jìn)行編碼并發(fā)送到服務(wù)端 4. server stub 收到消息后進(jìn)行解碼 5. server stub 根據(jù)解碼結(jié)果調(diào)用本地的服務(wù) 6. 本地服務(wù)執(zhí)行并將結(jié)果返回給 server stub 7. server stub 將返回導(dǎo)入結(jié)果進(jìn)行編碼并發(fā)送至消費(fèi)方 8. client stub 接收到消息并進(jìn)行解碼 9. 服務(wù)消費(fèi)方(client)得到結(jié)果

自定義RPC結(jié)構(gòu)


根據(jù)上面執(zhí)行流程圖, 編寫(xiě)代碼
定義request消息結(jié)構(gòu):

/*** 封裝class信息, 用于反射過(guò)程, 封裝client的request*/ public class ClassInfo implements Serializable {private String className;private String methodName;//參數(shù)類(lèi)型private Class<?>[] type;//參數(shù)列表private Object[] objects;public ClassInfo(String className, String methodName, Class<?>[] type, Object[] objects) {this.className = className;this.methodName = methodName;this.type = type;this.objects = objects;}public String getClassName() {return className;}public String getMethodName() {return methodName;}public Class<?>[] getType() {return type;}public Object[] getObjects() {return objects;} }

Client

client接口:

public interface HelloRpc {String hello(String name); }

client的代理類(lèi):

public class NettyRpcProxy {/*** 創(chuàng)建代理對(duì)象* @param target* @return*/public static Object create(Class target) {//動(dòng)態(tài)代理, 在代理過(guò)程中執(zhí)行遠(yuǎn)程數(shù)據(jù)發(fā)送return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target},(proxy, method, args) -> {//定義要調(diào)用哪一個(gè)方法的信息ClassInfo classInfo = new ClassInfo(target.getName(), method.getName(), method.getParameterTypes(), args);NioEventLoopGroup workGroup = new NioEventLoopGroup();MyClientResultHandler myClientResultHandler = new MyClientResultHandler();Bootstrap bootstrap = new Bootstrap().group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast(myClientResultHandler);}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();//client進(jìn)行發(fā)送channelFuture.channel().writeAndFlush(classInfo).sync();channelFuture.channel().closeFuture().sync();return myClientResultHandler.getResponse();});} }

client的handler:

public class MyClientResultHandler extends ChannelInboundHandlerAdapter {private Object response;public Object getResponse() {return response;}//讀取從Server發(fā)送過(guò)來(lái)的執(zhí)行結(jié)果@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response = msg;ctx.close();} }

Server

server接口(該接口與client的接口是一樣的):

public interface HelloRpc {String hello(String name); }

server接口實(shí)現(xiàn)類(lèi):

public class HelloRpcImpl implements HelloRpc {@Overridepublic String hello(String name) {return "hello" + name;} }

server端:

/*** 網(wǎng)絡(luò)處理服務(wù)器*/ public class NettyRpcServer {private final int port;public NettyRpcServer(int port) {this.port = port;}public void start() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//對(duì)象編碼器, 底層使用Java序列化, 效率低下, 通常使用protobufpipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast(new MyInvokeHandler());}});try {ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("server is ready");channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}public static void main(String[] args) {new NettyRpcServer(9999).start();}}

server的MyInvokeHandler(解析從client發(fā)送的內(nèi)容):

/*** 封裝client調(diào)用方法后執(zhí)行結(jié)果*/ public class MyInvokeHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//使用反射機(jī)制, 調(diào)用本地方法實(shí)現(xiàn)類(lèi), 將方法執(zhí)行結(jié)果封裝, 發(fā)送至clientClassInfo classInfo = (ClassInfo) msg;Object clazz = Class.forName(this.getImplClassName(classInfo)).newInstance();Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getType());Object result = method.invoke(clazz, classInfo.getObjects());ctx.writeAndFlush(result);}//根據(jù)ClassInfo反射獲取對(duì)應(yīng)method執(zhí)行結(jié)果private String getImplClassName(ClassInfo classInfo) throws ClassNotFoundException {String interfacePath = "com.regotto.test.netty_test_rpc.server.fun";int lastIndexOf = classInfo.getClassName().lastIndexOf(".");String interfaceName = classInfo.getClassName().substring(lastIndexOf);Class superClass = Class.forName(interfacePath + interfaceName);Reflections reflections = new Reflections(interfacePath);//獲得該接口下的所有實(shí)現(xiàn)類(lèi)Set<Class<?>> implClassSet = reflections.getSubTypesOf(superClass);if (implClassSet.size() == 0) {System.out.println("未找到實(shí)現(xiàn)類(lèi), erro");return null;} else if (implClassSet.size() > 1) {System.out.println("實(shí)現(xiàn)類(lèi)存在多個(gè), 未指明使用哪一個(gè)");return null;}return (implClassSet.toArray(new Class[0]))[0].getName();} }

Client測(cè)試類(lèi)

/*** 測(cè)試*/ public class TestNettyRpc {public static void main(String[] args) {//反射調(diào)用HelloNetty helloNetty = (HelloNetty) NettyRpcProxy.create(HelloNetty.class);System.out.println(helloNetty.hello());} }

執(zhí)行結(jié)果:

總結(jié)

RPC機(jī)制:
1.Client使用動(dòng)態(tài)代理機(jī)制, 將本地接口信息編碼封裝發(fā)送至Server
2.Server將接收到的信息解碼, 根據(jù)反射機(jī)制獲得方法執(zhí)行結(jié)果, 然后將執(zhí)行結(jié)果編碼封裝發(fā)送至Client
3.Client解析Server發(fā)送的執(zhí)行結(jié)果

總結(jié)

以上是生活随笔為你收集整理的Netty之自定义RPC的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。