教你用 Netty 实现一个简单的 RPC!
眾所周知,dubbo 底層使用了 Netty 作為網(wǎng)絡通訊框架,而 Netty 的高性能我們之前也分析過源碼,對他也算還是比較了解了。
今天我們就自己用 Netty 實現(xiàn)一個簡單的 RPC 框架。
1?需求?
模仿 dubbo,消費者和提供者約定接口和協(xié)議,消費者遠程調(diào)用提供者,提供者返回一個字符串,消費者打印提供者返回的數(shù)據(jù)。底層網(wǎng)絡通信使用 Netty 4.1.16。
2?設計?
創(chuàng)建一個接口,定義抽象方法。用于消費者和提供者之間的約定。
創(chuàng)建一個提供者,該類需要監(jiān)聽消費者的請求,并按照約定返回數(shù)據(jù)。
創(chuàng)建一個消費者,該類需要透明的調(diào)用自己不存在的方法,內(nèi)部需要使用 Netty 請求提供者返回數(shù)據(jù)。
3?實現(xiàn)
1. 創(chuàng)建 maven 項目,導入 Netty 4.1.16。
<groupId>cn.thinkinjava</groupId><artifactId>rpc-demo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.16.Final</version></dependency>2. 項目目錄結(jié)構(gòu)如下:
3. 設計接口
一個簡單的 hello world:
public?interface?HelloService { ?String?hello(String?msg); }4. 提供者相關實現(xiàn)
4.1. 首先實現(xiàn)約定接口,用于返回客戶端數(shù)據(jù):
/** * 實現(xiàn)類 */public?class?HelloServiceImpl?implements?HelloService?{ ?public?String hello(String msg) { ? ?return?msg !=?null?? msg +?" -----> I am fine."?:?"I am fine.";} }4.2. 實現(xiàn) Netty 服務端和自定義 handler
啟動 Netty Server 代碼:
private?static?void?startServer0(String hostName,?int?port)?{ ? ?try?{ServerBootstrap bootstrap =?new?ServerBootstrap();NioEventLoopGroup eventLoopGroup =?new?NioEventLoopGroup();bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new?ChannelInitializer<SocketChannel>() { ? ? ? ? ? ?@Overrideprotected?void?initChannel(SocketChannel ch)?throws?Exception?{ChannelPipeline p = ch.pipeline();p.addLast(new?StringDecoder());p.addLast(new?StringEncoder());p.addLast(new?HelloServerHandler());}});bootstrap.bind(hostName, port).sync();}?catch?(InterruptedException e) {e.printStackTrace();}}上面的代碼中添加了 String類型的編解碼 handler,添加了一個自定義 handler。
自定義 handler 邏輯如下:
/** * 用于處理請求數(shù)據(jù) */public?class?HelloServerHandler?extends?ChannelInboundHandlerAdapter { ?@Overridepublic?void?channelRead(ChannelHandlerContext ctx,?Object?msg) { ? ?// 如何符合約定,則調(diào)用本地方法,返回數(shù)據(jù)if?(msg.toString().startsWith(ClientBootstrap.providerName)) {String?result =?new?HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") +?1));ctx.writeAndFlush(result);}} }這里顯示判斷了是否符合約定(并沒有使用復雜的協(xié)議,只是一個字符串判斷),然后創(chuàng)建一個具體實現(xiàn)類,并調(diào)用方法寫回客戶端。
還需要一個啟動類:
public?class?ServerBootstrap?{??public?static?void?main(String[] args)?{NettyServer.startServer("localhost",?8088);} }好,關于提供者的代碼就寫完了,主要就是創(chuàng)建一個 netty 服務端,實現(xiàn)一個自定義的 handler,自定義 handler 判斷是否符合之間的約定(算是協(xié)議吧),如果符合,就創(chuàng)建一個接口的實現(xiàn)類,并調(diào)用他的方法返回字符串。
5. 消費者相關實現(xiàn)
消費者有一個需要注意的地方,就是調(diào)用需要透明,也就是說,框架使用者不用關心底層的網(wǎng)絡實現(xiàn)。這里我們可以使用 JDK 的動態(tài)代理來實現(xiàn)這個目的。
思路:客戶端調(diào)用代理方法,返回一個實現(xiàn)了 HelloService 接口的代理對象,調(diào)用代理對象的方法,返回結(jié)果。
我們需要在代理中做手腳,當調(diào)用代理方法的時候,我們需要初始化 Netty 客戶端,還需要向服務端請求數(shù)據(jù),并返回數(shù)據(jù)。
5.1. 首先創(chuàng)建代理相關的類
public?class?RpcConsumer?{ ?private?static?ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ?private?static?HelloClientHandler client; ?/*** 創(chuàng)建一個代理對象*/public?Object?createProxy(final Class<?> serviceClass, ? ? ?final String providerName)?{ ? ?return?Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), ? ? ? ?new?Class<?>[]{serviceClass}, (proxy, method, args) -> { ? ? ? ? ?if?(client ==?null) {initClient();} ? ? ? ? ?// 設置參數(shù)client.setPara(providerName + args[0]); ? ? ? ? ?return?executor.submit(client).get();});} ?/*** 初始化客戶端*/private?static?void?initClient()?{client =?new?HelloClientHandler();EventLoopGroup?group?=?new?NioEventLoopGroup();Bootstrap b =?new?Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,?true).handler(new?ChannelInitializer<SocketChannel>() { ? ? ? ? ?@Overridepublic?void?initChannel(SocketChannel ch) throws Exception?{ChannelPipeline p = ch.pipeline();p.addLast(new?StringDecoder());p.addLast(new?StringEncoder());p.addLast(client);}}); ? ?try?{b.connect("localhost",?8088).sync();}?catch?(InterruptedException e) {e.printStackTrace();}} }該類有 2 個方法,創(chuàng)建代理和初始化客戶端。
初始化客戶端邏輯:創(chuàng)建一個 Netty 的客戶端,并連接提供者,并設置一個自定義 handler,和一些 String 類型的編解碼器。
創(chuàng)建代理邏輯:使用 JDK 的動態(tài)代理技術,代理對象中的 invoke 方法實現(xiàn)如下:如果 client 沒有初始化,則初始化 client,這個 client 既是 handler ,也是一個 Callback。將參數(shù)設置進 client ,使用線程池調(diào)用 client 的 call 方法并阻塞等待數(shù)據(jù)返回。
看看 HelloClientHandler 的實現(xiàn):
public?class?HelloClientHandler?extends?ChannelInboundHandlerAdapter?implements?Callable?{ ?private?ChannelHandlerContext context; ?private?String result; ?private?String para; ?@Overridepublic?void?channelActive(ChannelHandlerContext ctx)?{context = ctx;} ?/*** 收到服務端數(shù)據(jù),喚醒等待線程*/@Overridepublic?synchronized?void?channelRead(ChannelHandlerContext ctx, Object msg)?{result = msg.toString();notify();} ?/*** 寫出數(shù)據(jù),開始等待喚醒*/@Overridepublic?synchronized?Object?call()?throws?InterruptedException?{context.writeAndFlush(para);wait(); ? ?return?result;} ?void?setPara(String para)?{ ? ?this.para = para;} }該類緩存了 ChannelHandlerContext,用于下次使用,有兩個屬性:返回結(jié)果和請求參數(shù)。
當成功連接后,緩存 ChannelHandlerContext,當調(diào)用 call 方法的時候,將請求參數(shù)發(fā)送到服務端,等待。當服務端收到并返回數(shù)據(jù)后,調(diào)用 channelRead 方法,將返回值賦值個 result,并喚醒等待在 call 方法上的線程。此時,代理對象返回數(shù)據(jù)。
再看看設計的測試類:
public?class?ClientBootstrap?{ ?public?static?final?String providerName =?"HelloService#hello#"; ?public?static?void?main(String[] args)?throws?InterruptedException?{RpcConsumer consumer =?new?RpcConsumer(); ? ?// 創(chuàng)建一個代理對象HelloService service = (HelloService) consumer.createProxy(HelloService.class, providerName); ? ?for?(; ; ) {Thread.sleep(1000);System.out.println(service.hello("are you ok ?"));}} }測試類首先創(chuàng)建了一個代理對象,然后每隔一秒鐘調(diào)用代理的 hello 方法,并打印服務端返回的結(jié)果。
測試結(jié)果
成功打印。
4?總結(jié)
看了這么久的 Netty 源碼,我們終于實現(xiàn)了一個自己的 Netty 應用,雖然這個應用很簡單,甚至代碼寫的有些粗糙,但功能還是實現(xiàn)了,RPC 的目的就是允許像調(diào)用本地服務一樣調(diào)用遠程服務,需要對使用者透明,于是我們使用了動態(tài)代理。并使用 Netty 的 handler 發(fā)送數(shù)據(jù)和響應數(shù)據(jù),完成了一次簡單的 RPC 調(diào)用。
當然,還是那句話,代碼比較簡單,主要是思路,以及了解 RPC 底層的實現(xiàn)。
?
作者:莫那魯?shù)?/p>
https://www.cnblogs.com/stateis0/p/8960791.html
總結(jié)
以上是生活随笔為你收集整理的教你用 Netty 实现一个简单的 RPC!的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring的核心思想,总结得非常好!
- 下一篇: CPU 到底是怎么认识代码的?涨姿势了!