RPC实现Consumer 远程调用
生活随笔
收集整理的這篇文章主要介紹了
RPC实现Consumer 远程调用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
梳理一下基本的實現思路,主要完成一個這樣的功能:API 模塊中的接口功能在服務端實現(并沒有在客戶端實現)。因此,客戶端調用API 中定義的某一個接口方法時,實際上是要發起一次網絡請求去調用服務端的某一個服務。而這個網絡請求首先被注冊中心接收,由注冊中心先確定需要調用的服務的位置,再將請求轉發至真實的服務實現,最終調用服務端代碼,將返回值通過網絡傳輸給客戶端。整個過程對于客戶端而言是完全無感知的,就像調用本地方法一樣。具體調用過程如下圖所示:
下面來看代碼實現,創建RpcProxy 類:
import java.lang.reflect.Proxy; public class RpcProxy {public static <T> T create(Class<?> clazz){//clazz 傳進來本身就是interfaceMethodProxy proxy = new MethodProxy(clazz);Class<?> [] interfaces = clazz.isInterface() ?new Class[]{clazz} :clazz.getInterfaces();T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);return result;} }在RpcProxy 類的內部實現遠程方法調用的代理類,即由Netty 發送網絡請求,具體代碼如下:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class RpcProxy {public static <T> T create(Class<?> clazz){}private static class MethodProxy implements InvocationHandler {private Class<?> clazz;public MethodProxy(Class<?> clazz){this.clazz = clazz;}public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//如果傳進來是一個已實現的具體類(本次演示略過此邏輯)if (Object.class.equals(method.getDeclaringClass())) {try {return method.invoke(this, args);} catch (Throwable t) {t.printStackTrace();}//如果傳進來的是一個接口(核心)} else {return rpcInvoke(proxy,method, args);}return null;}/*** 實現接口的核心方法* @param method* @param args* @return*/public Object rpcInvoke(Object proxy,Method method,Object[] args){//傳輸協議封裝InvokerProtocol msg = new InvokerProtocol();msg.setClassName(this.clazz.getName());msg.setMethodName(method.getName());msg.setValues(args);msg.setParames(method.getParameterTypes());final RpcProxyHandler consumerHandler = new RpcProxyHandler();EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//自定義協議解碼器/** 入參有5 個,分別解釋如下maxFrameLength:框架的最大長度。如果幀的長度大于此值,則將拋出TooLongFrameException。lengthFieldOffset:長度字段的偏移量:即對應的長度字段在整個消息數據中得位置lengthFieldLength:長度字段的長度:如:長度字段是int 型表示,那么這個值就是4(long 型就是8)lengthAdjustment:要添加到長度字段值的補償值initialBytesToStrip:從解碼幀中去除的第一個字節數*/pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//自定義協議編碼器pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));//對象參數類型編碼器pipeline.addLast("encoder", new ObjectEncoder());//對象參數類型解碼器pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));pipeline.addLast("handler",consumerHandler);}});ChannelFuture future = b.connect("localhost", 8080).sync();future.channel().writeAndFlush(msg).sync();future.channel().closeFuture().sync();} catch(Exception e){e.printStackTrace();}finally {group.shutdownGracefully();}return consumerHandler.getResponse();}} }接收網絡調用的返回值
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class RpcProxyHandler extends ChannelInboundHandlerAdapter {private Object response;public Object getResponse() {return response;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response = msg;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("client exception is general");} }完成客戶端調用代碼:
public class RpcConsumer {public static void main(String [] args){IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);System.out.println(rpcHello.hello("Tom 老師"));IRpcService service = RpcProxy.create(IRpcService.class);System.out.println("8 + 2 = " + service.add(8, 2));System.out.println("8 - 2 = " + service.sub(8, 2));System.out.println("8 * 2 = " + service.mult(8, 2));System.out.println("8 / 2 = " + service.div(8, 2));} }?
總結
以上是生活随笔為你收集整理的RPC实现Consumer 远程调用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RPC实现Provider服务端业务逻辑
- 下一篇: Channel 与ChannelPipe