日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

【手撸RPC框架】SpringBoot+Netty4实现RPC框架

發(fā)布時(shí)間:2025/3/19 javascript 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【手撸RPC框架】SpringBoot+Netty4实现RPC框架 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

【手?jǐn)]RPC框架】SpringBoot+Netty4實(shí)現(xiàn)RPC框架

線程模型1:傳統(tǒng)阻塞 I/O 服務(wù)模型

模型特點(diǎn):

采用阻塞IO模式獲取輸入的數(shù)據(jù)
每個(gè)鏈接都需要獨(dú)立的線程完成數(shù)據(jù)的輸入,業(yè)務(wù)處理、數(shù)據(jù)返回。
問題分析:

當(dāng)并發(fā)數(shù)很大,就會創(chuàng)建大量的線程,占用很大系統(tǒng)資源
連接創(chuàng)建后,如果當(dāng)前線程暫時(shí)沒有數(shù)據(jù)可讀,該線程會阻塞在read操作,造成線程資源浪費(fèi)。
線程模型2:Reactor 模式
針對傳統(tǒng)阻塞I/O服務(wù)模型的2個(gè)缺點(diǎn),解決方案如下:

基于 I/O 復(fù)用模型:多個(gè)連接共用一個(gè)阻塞對象,應(yīng)用程序只需要在一個(gè)阻塞對象等待,無需阻塞等待所有連接。當(dāng)某個(gè)連接有新的數(shù)據(jù)可以處理時(shí),操作系統(tǒng)通知應(yīng)用程序,線程從阻塞狀態(tài)返回,開始進(jìn)行業(yè)務(wù)處理。Reactor對應(yīng)的叫法: 1. 反應(yīng)器模式 2. 分發(fā)者模式(Dispatcher) 3. 通知者模式(notifier)
基于線程池復(fù)用線程資源:不必再為每個(gè)連接創(chuàng)建線程,將連接完成后的業(yè)務(wù)處理任務(wù)分配給線程進(jìn)行處理,一個(gè)線程可以處理多個(gè)連接的業(yè)務(wù)。

單 Reactor 單線程

模型分析

優(yōu)點(diǎn):模型簡單,沒有多線程、進(jìn)程通信、競爭的問題,全部都在一個(gè)線程中完成
缺點(diǎn):性能問題,只有一個(gè)線程,無法完全發(fā)揮多核 CPU 的性能。Handler 在處理某個(gè)連接上的業(yè)務(wù)時(shí),整個(gè)進(jìn)程無法處理其他連接事件,很容易導(dǎo)致性能瓶頸

缺點(diǎn):可靠性問題,線程意外終止,或者進(jìn)入死循環(huán),會導(dǎo)致整個(gè)系統(tǒng)通信模塊不可用,不能接收和處理外部消息,造成節(jié)點(diǎn)故障
使用場景:客戶端的數(shù)量有限,業(yè)務(wù)處理非常快速,比如 Redis在業(yè)務(wù)處理的時(shí)間復(fù)雜度 O(1) 的情況
單 Reactor 多線程

模型分析

優(yōu)點(diǎn):可以充分的利用多核cpu 的處理能力
缺點(diǎn):多線程數(shù)據(jù)共享和訪問比較復(fù)雜, reactor 處理所有的事件的監(jiān)聽和響應(yīng),在單線程運(yùn)行, 在高并發(fā)場景容易出現(xiàn)性能瓶頸.
主從 Reactor 多線程

模型分析

優(yōu)點(diǎn):父線程與子線程的數(shù)據(jù)交互簡單職責(zé)明確,父線程只需要接收新連接,子線程完成后續(xù)的業(yè)務(wù)處理。
優(yōu)點(diǎn):父線程與子線程的數(shù)據(jù)交互簡單,Reactor 主線程只需要把新連接傳給子線程,子線程無需返回?cái)?shù)據(jù)
缺點(diǎn):編程復(fù)雜度較高
結(jié)合實(shí)例:這種模型在許多項(xiàng)目中廣泛使用,包括 Nginx 主從 Reactor 多進(jìn)程模型,Memcached 主從多線程,Netty 主從多線程模型的支持

先實(shí)現(xiàn)簡單的Netty通信

服務(wù)端示例

public static void main(String[] args) {//創(chuàng)建連接線程組,線程數(shù)為1。只負(fù)責(zé)處理連接請求NioEventLoopGroup boss = new NioEventLoopGroup(1);//創(chuàng)建工作線程組,線程數(shù)默認(rèn)為cpu核數(shù)*2。處理與客戶端的業(yè)務(wù)處理NioEventLoopGroup worker = new NioEventLoopGroup();//創(chuàng)建Server端的啟動對象ServerBootstrap serverBootstrap = new ServerBootstrap();//配置線程組serverBootstrap.group(boss, worker)//使用 NioServerSocketChannel 作為服務(wù)器的通道實(shí)現(xiàn).channel(NioServerSocketChannel.class)//給worker線程組初始化處理器.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()//添加字符串的編解碼器.addLast(new StringDecoder()).addLast(new StringEncoder())//添加對象的編解碼器,ClassResolvers.weakCachingConcurrentResolver設(shè)置弱引用WeakReferenceMap緩存類加載器,防止內(nèi)存溢出.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))).addLast(new ObjectEncoder())//添加自定義的業(yè)務(wù)處理器.addLast(new SimpleChannelInboundHandler<Object>() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客戶端連接啦。。。客戶端地址:{}", ctx.channel().remoteAddress());}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {log.info("服務(wù)端接收到的數(shù)據(jù):{}", o.toString());//價(jià)值1個(gè)億的AI代碼String str = o.toString();str = str.replace("嗎", "");str = str.replace("?", "!");str = str.replace("? ", "! ");channelHandlerContext.writeAndFlush(str);}});}});//啟動并且監(jiān)聽ChannelFuture channelFuture = serverBootstrap.bind(8888).syncUninterruptibly();//監(jiān)聽關(guān)閉通道channelFuture.channel().closeFuture(); }

客戶端示例

public static void main(String[] args) {//設(shè)置客戶端工作線程NioEventLoopGroup worker = new NioEventLoopGroup();//創(chuàng)建客戶端啟動對象Bootstrap bootstrap = new Bootstrap();bootstrap.group(worker)//通道連接者.channel(NioSocketChannel.class)//給worker線程組初始化處理器.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()//添加字符串的編解碼器.addLast(new StringDecoder()).addLast(new StringEncoder())//添加對象的編解碼器,ClassResolvers.weakCachingConcurrentResolver設(shè)置弱引用WeakReferenceMap緩存類加載器,防止內(nèi)存溢出.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))).addLast(new ObjectEncoder())//添加自定義的業(yè)務(wù)處理器.addLast(new SimpleChannelInboundHandler<Object>() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush("哈哈哈");}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {log.info("客戶端接收到的數(shù)據(jù):{}", o.toString());}});}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).syncUninterruptibly();//客戶端需要輸入信息,創(chuàng)建一個(gè)掃描器Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String msg = scanner.nextLine();//通過channel發(fā)送到服務(wù)器端channel.writeAndFlush(msg + "\r\n");}channelFuture.channel().closeFuture(); }

快啟動試試看把,不過需要注意的是,得先啟動服務(wù)端哦~

SpringBoot + Netty4實(shí)現(xiàn)rpc框架
好了,接下來就讓我們進(jìn)入正題,讓我們利用我們所學(xué)的知識去實(shí)現(xiàn)自己一個(gè)簡單的rpc框架吧

簡單說下RPC(Remote Procedure Call)遠(yuǎn)程過程調(diào)用,簡單的理解是一個(gè)節(jié)點(diǎn)請求另一個(gè)節(jié)點(diǎn)提供的服務(wù)。讓兩個(gè)服務(wù)之間調(diào)用就像調(diào)用本地方法一樣。

RPC時(shí)序圖:

RPC流程:

【客戶端】發(fā)起調(diào)用 【客戶端】數(shù)據(jù)編碼 【客戶端】發(fā)送編碼后的數(shù)據(jù)到服務(wù)端 【服務(wù)端】接收客戶端發(fā)送的數(shù)據(jù) 【服務(wù)端】對數(shù)據(jù)進(jìn)行解碼 【服務(wù)端】處理消息業(yè)務(wù)并返回結(jié)果值 【服務(wù)端】對結(jié)果值編碼 【服務(wù)端】將編碼后的結(jié)果值回傳給客戶端 【客戶端】接收結(jié)果值 【客戶端】解碼結(jié)果值 【客戶端】處理返回?cái)?shù)據(jù)業(yè)務(wù)

引入依賴

<dependencies><!-- SpringBoot依賴 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring容器上下文 --><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId></dependency><!-- Spring配置 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- Netty4 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.58.Final</version></dependency><!-- 工具 --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.5.8</version></dependency> </dependencies>

編寫服務(wù)端
自定義消息協(xié)議:

/*** @author zc* @date 2021/3/1 17:43*/ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class RpcMessage implements Serializable {private static final long serialVersionUID = 430507739718447406L;/*** interface接口名*/private String name;/*** 方法名*/private String methodName;/*** 參數(shù)類型*/private Class<?>[] parTypes;/*** 參數(shù)*/private Object[] pars;/*** 結(jié)果值*/private Object result; }

自定義Rpc注解:

/*** @author zc* @date 2021/3/2 15:36*/ @Target(value = {ElementType.TYPE, ElementType.FIELD}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface RpcServer { }

定義ServerHandle業(yè)務(wù)處理器:

/*** Netty Server端Handle處理類,消息體RpcMessage* 實(shí)現(xiàn)ApplicationContextAware接口:該接口可以加載獲取到所有的 spring bean。* 實(shí)現(xiàn)了這個(gè)接口的bean,當(dāng)spring容器初始化的時(shí)候,會自動的將ApplicationContext注入進(jìn)來** @author ZC* @date 2021/3/1 22:15*/ @Slf4j @ChannelHandler.Sharable public class ServerHandle extends SimpleChannelInboundHandler<RpcMessage> implements ApplicationContextAware {private Map<String, Object> serviceMap;/*** 在類被Spring容器加載時(shí)會自動執(zhí)行setApplicationAware** @param applicationContext Spring上下文* @throws BeansException 異常信息*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//從Spring容器中獲取到所有擁有@RpcServer注解的Beans集合,Map<Name(對象類型,對象全路徑名),實(shí)例對象>Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcServer.class);log.info("被@RpcServer注解加載的Bean: {}", beansWithAnnotation);if (beansWithAnnotation.size() > 0) {Map<String, Object> map = new ConcurrentHashMap<>(16);for (Object o : beansWithAnnotation.values()) {//獲取該實(shí)例對象實(shí)現(xiàn)的接口ClassClass<?> anInterface = o.getClass().getInterfaces()[0];//獲取該接口類名,作為Key,實(shí)例對象作為Valuemap.put(anInterface.getName(), o);}//使用變量接住mapserviceMap = map;}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客戶端連接了: {}", ctx.channel().remoteAddress());super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("異常信息");cause.printStackTrace();super.exceptionCaught(ctx, cause);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {log.info("客戶端發(fā)送的消息:{}", rpcMessage);//從Map中獲取實(shí)例對象Object service = serviceMap.get(rpcMessage.getName());//獲取調(diào)用方法Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParTypes());method.setAccessible(true);//反射調(diào)用實(shí)例對象方法,獲取返回值Object result = method.invoke(service, rpcMessage.getPars());rpcMessage.setResult(JSONUtil.toJsonStr(result));log.info("回給客戶端的消息:{}", rpcMessage);//Netty服務(wù)端將數(shù)據(jù)寫會Channel并發(fā)送給客戶端,同時(shí)添加一個(gè)監(jiān)聽器,當(dāng)所有數(shù)據(jù)包發(fā)送完成后,關(guān)閉通道channelHandlerContext.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE);} }

定義NettyServer端:

/*** Netty服務(wù)端** @author zc* @date 2021/2/24 13:23**/ @Slf4j public class NettyServer {/*** server端處理器*/private final ServerHandle serverHandle;/*** 服務(wù)端通道*/private Channel channel;/*** 構(gòu)造器** @param serverHandle server處理器*/public NettyServer(ServerHandle serverHandle) {this.serverHandle = serverHandle;}/*** 啟動** @param port 啟動端口*/public void start(int port) {EventLoopGroup boss = new NioEventLoopGroup(1);EventLoopGroup worker = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))).addLast(new ObjectEncoder()).addLast(serverHandle);}});final ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();log.info("服務(wù)端啟動-端口: {}", port);channel = channelFuture.channel();channel.closeFuture().syncUninterruptibly();} catch (Exception e) {boss.shutdownGracefully();worker.shutdownGracefully();}}/*** 關(guān)閉當(dāng)前通道*/public void stop() {channel.close();} }

自定義rpc配置屬性類:

/*** @author zc* @date 2021/3/4 23:38*/ @Component @ConfigurationProperties(prefix = "netty") @Data public class NettyRpcProperties {private int serverPort; }`

創(chuàng)建Server端啟動配置類:

/*** NettyServer服務(wù)端配置類** @author zc* @date 2021/3/1 18:24*/ @Slf4j @Configuration @EnableConfigurationProperties(NettyRpcProperties.class) public class ServerBeanConfig {private final NettyRpcProperties nettyRpcProperties;@Autowiredpublic ServerBeanConfig(NettyRpcProperties nettyRpcProperties) {this.nettyRpcProperties = nettyRpcProperties;}/*** 配置ServerHandle** @return ServerHandle處理類*/@Beanpublic ServerHandle serverHandle() {return new ServerHandle();}/*** 配置NettyServer** @param handle ServerHandle處理類* @return NettyServer*/@Beanpublic NettyServer nettyServer(ServerHandle handle) {NettyServer nettyServer = new NettyServer(handle); // nettyServer.start(nettyRpcProperties.getServerPort());return nettyServer;}/*** 解決SpringBoot端口無法監(jiān)聽問題*/@Componentstatic class NettyServerStart implements ApplicationRunner {private final NettyServer nettyServer;private final NettyRpcProperties properties;@AutowiredNettyServerStart(NettyServer nettyServer, NettyRpcProperties properties) {this.nettyServer = nettyServer;this.properties = properties;}@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("===============ApplicationRunner");if (nettyServer != null) {nettyServer.start(properties.getServerPort());}}} }

注入Spring容器

此時(shí)有兩種方式讓該配置自動注入Spring容器生效:

自動注入

在resource目錄下創(chuàng)建META-INF目錄,創(chuàng)建spring.factories文件在該文件里寫上org.springframework.boot.autoconfigure.EnableAutoConfiguration=${包路徑:xxx.xxx.xxx}.${配置類:ServerBeanConfig}配置好之后,在SpringBoot啟動時(shí)會自動加載該配置類。

通過注解注入

/*** 自定義SpringBoot啟動注解* 注入ServerBeanConfig配置類** @author ZC* @date 2021/3/1 23:48*/ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ImportAutoConfiguration({ServerBeanConfig.class}) public @interface EnableNettyServer { }

編寫客戶端

創(chuàng)建客戶端處理器`ClientHandle

/*** @author zc* @date 2021/3/2 15:19*/ @Slf4j @ChannelHandler.Sharable public class ClientHandle extends SimpleChannelInboundHandler<RpcMessage> {/*** 定義消息Map,將連接通道Channel作為key,消息返回值作為value*/private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap;public ClientHandle(ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap) {this.rpcMessageConcurrentMap = rpcMessageConcurrentMap;}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {log.info("客戶端收到服務(wù)端消息:{}", rpcMessage);rpcMessageConcurrentMap.put(channelHandlerContext.channel(), rpcMessage);} }

創(chuàng)建客戶端啟動類NettyClient

/*** @author ZC* @date 2021/3/1 23:30*/ @Slf4j public class NettyClient {private Channel channel;/*** 存放請求編號與響應(yīng)對象的映射關(guān)系*/private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap = new ConcurrentHashMap<>();public RpcMessage send(int port, final RpcMessage rpcMessage) {//客戶端需要一個(gè)事件循環(huán)組EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))).addLast(new ObjectEncoder()).addLast(new ClientHandle(rpcMessageConcurrentMap));}});final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();log.info("連接服務(wù)端成功: " + channelFuture.channel().remoteAddress());channel = channelFuture.channel();channel.writeAndFlush(rpcMessage);log.info("發(fā)送數(shù)據(jù)成功:{}", rpcMessage);channel.closeFuture().syncUninterruptibly();return rpcMessageConcurrentMap.get(channel);} catch (Exception e) {log.error("client exception", e);return null;} finally {group.shutdownGracefully();//移除請求編號和響應(yīng)對象直接的映射關(guān)系rpcMessageConcurrentMap.remove(channel);}}public void stop() {channel.close();} }

定義Netty客戶端Bean后置處理器

/*** Netty客戶端Bean后置處理器* 實(shí)現(xiàn)Spring后置處理器接口:BeanPostProcessor* 在Bean對象在實(shí)例化和依賴注入完畢后,在顯示調(diào)用初始化方法的前后添加我們自己的邏輯。注意是Bean實(shí)例化完畢后及依賴注入完成后觸發(fā)的** @author ZC* @date 2021/3/2 23:00*/ @Slf4j public class NettyClientBeanPostProcessor implements BeanPostProcessor {private final NettyClient nettyClient;public NettyClientBeanPostProcessor(NettyClient nettyClient) {this.nettyClient = nettyClient;}/*** 實(shí)例化、依賴注入完畢,在調(diào)用顯示的初始化之前完成一些定制的初始化任務(wù)* 注意:方法返回值不能為null* 如果返回null那么在后續(xù)初始化方法將報(bào)空指針異常或者通過getBean()方法獲取不到Bean實(shí)例對象* 因?yàn)楹笾锰幚砥鲝腟pring IoC容器中取出bean實(shí)例對象沒有再次放回IoC容器中*/@Overridepublic Object postProcessBeforeInitialization(Object bean, @Nullable String beanName) throws BeansException {//獲取實(shí)例ClassClass<?> beanClass = bean.getClass();do {//獲取該類所有字段Field[] fields = beanClass.getDeclaredFields();for (Field field : fields) {//判斷該字段是否擁有@RpcServerif (field.getAnnotation(RpcServer.class) != null) {field.setAccessible(true);try {//通過JDK動態(tài)代理獲取該類的代理對象Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, new ClientInvocationHandle(nettyClient));//將代理類注入該字段field.set(bean, o);log.info("創(chuàng)建代理類 ===>>> {}", beanName);} catch (IllegalAccessException e) {log.error(e.getMessage());}}}} while ((beanClass = beanClass.getSuperclass()) != null);return bean;}/*** 實(shí)例化、依賴注入、初始化完畢時(shí)執(zhí)行* 注意:方法返回值不能為null* 如果返回null那么在后續(xù)初始化方法將報(bào)空指針異常或者通過getBean()方法獲取不到Bean實(shí)例對象* 因?yàn)楹笾锰幚砥鲝腟pring IoC容器中取出bean實(shí)例對象沒有再次放回IoC容器中*/@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 可以根據(jù)beanName不同執(zhí)行不同的處理操作return bean;}/*** JDK動態(tài)代理處理器*/static class ClientInvocationHandle implements InvocationHandler {private final NettyClient nettyClient;public ClientInvocationHandle(NettyClient nettyClient) {this.nettyClient = nettyClient;}/*** 代理方法調(diào)用** @param proxy 代理類* @param method 方法* @param args 參數(shù)* @return 返回值*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) {//組裝Netty參數(shù)RpcMessage rpcMessage = RpcMessage.builder().name(method.getDeclaringClass().getName()).methodName(method.getName()).parTypes(method.getParameterTypes()).pars(args).build();//調(diào)用Netty,發(fā)送數(shù)據(jù)RpcMessage send = nettyClient.send(1111, rpcMessage);log.info("接收到服務(wù)端數(shù)據(jù):{}, 返回結(jié)果值 ====》》》》{}", send, send.getResult());return send.getResult();}} }

定義客戶端配置類

/*** @author zc* @date 2021/3/1 18:24*/ @Configuration public class ClientBeanConfig {@Beanpublic NettyClient nettyClient() {return new NettyClient();}@Beanpublic NettyClientBeanPostProcessor nettyClientBeanPostProcessor(NettyClient nettyClient) {return new NettyClientBeanPostProcessor(nettyClient);} }

最后和服務(wù)端一樣,注入Spring容器

/*** @author ZC* @date 2021/3/1 23:48*/ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ImportAutoConfiguration({ClientBeanConfig.class}) public @interface EnableNettyClient { }

至此我們的SpringBoot + Netty4的就已經(jīng)實(shí)現(xiàn)了最最簡單的rpc框架模式了;然后我們就可以引用我們自己的rpc依賴了。

最后再執(zhí)行一下maven命令

mvn install

netty-rpc-examples例子
接口服務(wù)
pom里啥也沒有。。。

定義一個(gè)接口

/*** @author zc* @date 2021/3/1 17:55*/ public interface Test1Api {void test();void test(int id, String name);String testStr(int id);Object testObj(); }

rpc-server服務(wù)端

正常的SpringBoot工程

引入pom

<!-- 自定義rpc依賴 --> <dependency><groupId>cn.happyloves.rpc</groupId><artifactId>netty-rpc</artifactId><version>0.0.1</version> </dependency> <!-- 接口依賴 --> <dependency><groupId>cn.happyloves.netty.rpc.examples.api</groupId><artifactId>rpc-api</artifactId><version>0.0.1-SNAPSHOT</version> </dependency>

配置屬性

# 應(yīng)用名稱 spring.application.name=rpc-server # 應(yīng)用服務(wù) WEB 訪問端口 server.port=8080 netty.server-port=1111

創(chuàng)建一個(gè)實(shí)體類

/*** @author ZC* @date 2021/3/2 23:59*/ @Data public class Account implements Serializable {private static final long serialVersionUID = 667178018106218163L;private Integer id;private String name;private String username;private String password; }

創(chuàng)建Server實(shí)現(xiàn)Test1Api接口

/*** @author ZC* @date 2021/3/2 23:59*/ @Slf4j @Service @RpcServer public class TestServiceImpl implements Test1Api {@Overridepublic void test() {log.info("111111111");}@Overridepublic void test(int id, String name) {log.info("222222222,{},{}", id, name);}@Overridepublic String testStr(int id) {log.info("33333333333333333,{}", id);return "33333333333333333 " + id;}@Overridepublic Object testObj() {log.info("444444444444444444");Account account = new Account();account.setName("張三");return account;} }

最后在SpringBoot啟動類上加上@EnableNettyServer

/*** @author ZC* @date 2021/3/2 23:55*/ @EnableNettyServer @SpringBootApplication public class RpcServerApplication {public static void main(String[] args) {SpringApplication.run(RpcServerApplication.class, args);} }

rpc-server客戶端
引入pom依賴

<dependency><groupId>cn.happyloves.rpc</groupId><artifactId>netty-rpc</artifactId><version>0.0.1</version> </dependency> <dependency><groupId>cn.happyloves.netty.rpc.examples.api</groupId><artifactId>rpc-api</artifactId><version>0.0.1-SNAPSHOT</version> </dependency>

創(chuàng)建Controller

/*** @author ZC* @date 2021/3/3 0:04*/ @RestController public class ClientController {@RpcServerprivate Test1Api testServiceImpl;@GetMapping("/test1")public void test() {testServiceImpl.test();}@GetMapping("/test2")public void test(int id, String name) {testServiceImpl.test(id, name);}@GetMapping("/test3")public String testStr(int id) {return testServiceImpl.testStr(id);}@GetMapping("/test4")public Object testObj() {return testServiceImpl.testObj();} }

最后在啟動類上加上注解@EnableNettyClient

@EnableNettyClient @SpringBootApplication public class RpcClientApplication {public static void main(String[] args) {SpringApplication.run(RpcClientApplication.class, args);} }

先運(yùn)行服務(wù)端,在運(yùn)行客戶端,然后在調(diào)用客戶端接口就可以看到服務(wù)端能夠接收到客戶端發(fā)來的消息,然后服務(wù)端處理并返回,客戶端接收并返回。。。

至此,一個(gè)小demo就完成了。

當(dāng)然啦,后續(xù)還有很多需求需要處理的,比方說當(dāng)前demo中客戶端每次通信都需要創(chuàng)建一個(gè)實(shí)例去連接、服務(wù)的注冊、客戶端和服務(wù)端是同一個(gè)應(yīng)用等等,這個(gè)后面再慢慢完善吧

繼續(xù)支持Remi醬呀~~

文章來源:https://www.jianshu.com/p/f94a0d971b7b

總結(jié)

以上是生活随笔為你收集整理的【手撸RPC框架】SpringBoot+Netty4实现RPC框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。