手写带注册中心的rpc框架(Netty版和Socket版)
之前使用socket實現了一個簡單的RPC框架調用,不了解RPC的實現原理的可以看下那篇文章
手寫實現RPC框架基礎功能?
之前的客戶端里是寫死了服務端的ip和端口號,這里代碼做了個優化,使用zookeeper實現注冊中心,服務端把自己的ip和端口注冊到zk上,客戶端從服務端獲取到對應的地址信息之后,再通過通訊協議連接到服務端實現遠程調用
Zookeeper介紹
ZooKeeper(后面稱為zk)是一種用于分布式應用程序的分布式開源協調服務
zookeeper官網下載地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
解壓后將conf下的zoo_sample.cfg重命名為zoo.cfg,在cfg配置文件上添加對應的log位置
dataDir=D:\opt\zookeeper\data ?
dataLogDir=D:\software\zookeeper\log?
進入bin目錄,分別運行zkServer.cmd和zkCli.cmd即可啟動zk服務端和客戶端
zk的結構和linux系統的文件目錄比較相似,整體是一個樹狀結構,zk允許各分布式應用通過一個共享的命名空間相互聯系,該命名空間類似于一個標準的文件系統:由若干注冊了的數據節點構成(每一個節點稱為ZNode,每個節點不單需要有目錄名稱,還必須要有值,類似于鍵值對) ?圖上沒有畫出value
每個節點擁有唯一的路徑path,客戶端基于唯一的path新增和修改節點數據,zookeeper 收到后會實時通知對該路徑進行監聽的客戶端。
技術選型
序列化協議 :使用了Netty自帶的編解碼器
IOC框架:這里使用注解自動注冊服務到zookeeper,所以引入了Spring來實現自動注入
底層通信框架:使用netty,為了方便不懂Netty的同學,也實現了Socket的版本
注冊中心:使用ZooKeeper 提供服務注冊與發現功能(zk還具有實現負載均衡的功能,不過這里沒有實現)
實現思路
服務端啟動和服務發布
- ? ?定義接口服務(register-rpc-server-api)
- ? ?根據 配置的 zookeeper 信息, 初始化 zookeeper 的連接
- ???獲取 容器中 帶有 RpcService 注解的 bean,將 服務名 和 對應的服務實例?存放在beanMappings
- ???遍歷beanMappings, 向 zookeeper 中注冊 服務節點信息
- ? ?初始化 netty服務端監聽指定端口,處理接收到的RpcRequest信息,然后通過反射調用
客戶端調用
- ?初始化 zookeeper 的連接,用于獲取zk上的服務器信息,并實現監聽
- ?創建代理類,當執行代理對象的方法的時候實際會執行我們實現的InvocationHandler接口的實現方法,通過服務名從zk上 ?獲取服務的ip和端口
- ?使用 netty 發送請求到指定服務器,并獲取返回值
更具體的步驟都寫在代碼注釋里
服務端代碼
public interface IServiceRegister {/*** 注冊服務* @param serviceName 服務名稱* @param serviceIp 服務IP* @param port 端口號*/void register(String serviceName,String serviceIp,int port); }定義注冊接口,并實現根據serviceName注冊指定ip,端口號到zk(這里直接使用基于Curator實現的ZK注冊中心,它幫我們封裝了很多zk客戶端底層的操作)
/*** @Description* @Author: chenpp* @Date: 2020/3/10 18:52*/ @Component public class ServiceRegisterCenter implements IServiceRegister {private CuratorFramework curatorFramework;{ // 通過curator連接zkcuratorFramework = CuratorFrameworkFactory.builder().//定義連接串connectString(ZKConfig.ZK_CONNECTION).// 定義session超時時間sessionTimeoutMs(ZKConfig.SESSION_TIMEOUT).//定義重試策略retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();//啟動curatorFramework.start();}//實現注冊方法,將對應的服務名稱和服務地址注冊到zk上 serviceAddress--- ip : portpublic void register(String serviceName, String serviceIp, int port) {//注冊相應的服務 注意 zk注冊的節點名稱需要以/開頭String servicePath = ZKConfig.REGISTER_NAMESPACE + "/" + serviceName;try {//判斷 /${registerPath}/${serviceName}節點是否存在,不存在則創建對應的持久節點if (curatorFramework.checkExists().forPath(servicePath) == null) {curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(servicePath, "0".getBytes());}//設置節點的value為對應的服務地址信息(臨時節點)String serviceAddress = servicePath + "/" + serviceIp + ":" + port;String zkNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(serviceAddress, "0".getBytes());System.out.println(serviceName + "服務,地址:" + serviceAddress + " 注冊成功:" + zkNode);} catch (Exception e) {e.printStackTrace();}} }核心類,在spring啟動時自動將添加了RpcService注解的服務注冊到zk上,并在完成初始化之后啟動netty服務端監聽
/*** 實現了InitializingBean接口,那么會在對應的AutoRpcServer實例化之后調用afterPropertiesSet方法* 而實現了ApplicationContextAware接口,當spring容器初始化的時候,會自動的將ApplicationContext注入進來,* 使得當前bean可以獲得ApplicationContext上下文*/ @Component public class ZkRpcServer implements ApplicationContextAware, InitializingBean {private static final ExecutorService executor = Executors.newCachedThreadPool();@Autowiredprivate IServiceRegister registerCenter;//key 為對應的接口類名,valeu 為具體的實例private Map<String, Object> beanMappings = new HashMap<String, Object>();//當rpc server端初始化完成后,就會開啟監聽 這里也可以改成Socket調用public void afterPropertiesSet() throws Exception {nettyRpc();//socketRpc();}private void nettyRpc() throws InterruptedException {//定義主線程池EventLoopGroup bossGroup = new NioEventLoopGroup();//定義工作線程池EventLoopGroup workerGroup = new NioEventLoopGroup();//類似于ServerSocketServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup).channel(NioServerSocketChannel.class)//定義工作線程的處理函數.childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel socketChannel) throws Exception {//添加編碼/解碼器 用于轉化對應的傳輸數據 從字節流到目標對象稱之為解碼 反之則為編碼ChannelPipeline pipeline = socketChannel.pipeline();//自定義協議解碼器/*** 入參有5個,分別解釋如下* maxFrameLength:框架的最大長度。如果幀的長度大于此值,則將拋出TooLongFrameException。* lengthFieldOffset:長度字段的偏移量:即對應的長度字段在整個消息數據中得位置* lengthFieldLength:長度字段的長度。如:長度字段是int型表示,那么這個值就是4(long型就是8)* lengthAdjustment:要添加到長度字段值的補償值* initialBytesToStrip:從解碼幀中去除的第一個字節數*/pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定義協議編碼器.addLast(new LengthFieldPrepender(4))//對象參數類型編碼器.addLast(new ObjectEncoder())//對象參數類型解碼器.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(new RpcServerHandler(beanMappings));}})//boss線程池的最大線程數.option(ChannelOption.SO_BACKLOG, 128)//工作線程保持長連接.childOption(ChannelOption.SO_KEEPALIVE, true);//綁定端口啟動netty服務端ChannelFuture future = serverBootstrap.bind(ZKConfig.SERVER_PORT).sync();System.out.println("netty服務端啟動,端口為:" + ZKConfig.SERVER_PORT + "....");future.channel().closeFuture().sync();}private void socketRpc(){ServerSocket serverSocket = null;try {//創建socketserverSocket = new ServerSocket(ZKConfig.SERVER_PORT);while(true){//監聽端口,是個阻塞的方法Socket socket = serverSocket.accept();//處理rpc請求,這里使用線程池來處理executor.submit(new SpringHandleThread(beanMappings,socket));}} catch (Exception e) {e.printStackTrace();}finally {if(serverSocket != null){try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}}public void setApplicationContext(ApplicationContext context) throws BeansException {//從spring上下文中獲取添加了RegisterService的注解的beanString[] beanNames = context.getBeanNamesForAnnotation(RpcService.class);for (String beanName : beanNames) {Object bean = context.getBean(beanName);RpcService annotation = bean.getClass().getAnnotation(RpcService.class);Class interfaceClass = annotation.interfaceClass();String serviceName = annotation.serviceName();//將接口的類名和對應的實例bean的映射關系保存起來beanMappings.put(interfaceClass.getName(), bean);//注冊實例到zkregisterCenter.register(serviceName, IpUtils.getLocalHost(), ZKConfig.SERVER_PORT);}} }netty處理服務端接收到的消息handler,通過反射調用后將返回值返回給客戶端
/*** @Description* @Author: chenpp* @Date: 2020/3/10 20:21*/ public class RpcServerHandler extends ChannelInboundHandlerAdapter {private Map<String,Object> serviceMap;public RpcServerHandler(Map<String,Object> serviceMap){this.serviceMap = serviceMap;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;String className = rpcRequest.getClassName();Object result = null;ChannelFuture future = null;try {Class<?> clazz = Class.forName(className);//這里無法實例化,因為傳入的是接口類型,接口無法實例化,所以需要從注冊的serviceMap獲取到Object[] parameters = rpcRequest.getArgs();Object serviceInstance = serviceMap.get(clazz.getName());if (parameters == null) {Method method = clazz.getMethod(rpcRequest.getMethodName());result = method.invoke(serviceInstance);} else {Class[] types = new Class[parameters.length];for (int i = 0; i < parameters.length; i++) {types[i] = parameters[i].getClass();}Method method = clazz.getMethod(rpcRequest.getMethodName(), types);result = method.invoke(serviceInstance, parameters);}if (result == null) {// 如果方法結果為空,將一個默認的OK結果給客戶端future = ctx.writeAndFlush(ZKConfig.DEFAULT_MSG);} else {// 將返回值寫給客戶端寫給客戶端結果future = ctx.writeAndFlush(result);}// 釋放通道,不是關閉連接future.addListener(ChannelFutureListener.CLOSE);} catch (Exception e) {e.printStackTrace();}finally {if( future != null) {future.addListener(ChannelFutureListener.CLOSE);}}} }啟動服務端,可以看到zk注冊成功的日志和對應的netty服務端啟動日志
客戶端代碼
實現服務發送的接口,通過serviceName獲取對應的服務地址,因為這里只做了單機,所以只返回一個服務地址
@Component public class ServiceDiscoveryImpl implements IServiceDiscovery {private Map<String,String> serviceMap = new HashMap<String,String>();private List<String> serviceAddresses;private CuratorFramework curatorFramework;{ // 通過curator連接zkcuratorFramework = CuratorFrameworkFactory.builder().connectString(ZKConfig.ZK_CONNECTION).sessionTimeoutMs(ZKConfig.SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();//啟動curatorFramework.start();}public String discover(String serviceName) {//根據serviceName獲取對應的pathString nodePath = ZKConfig.REGISTER_NAMESPACE + "/" + serviceName;try {//這里不考慮集群,一個服務只發布一個實例serviceAddresses = curatorFramework.getChildren().forPath(nodePath);addServiceAddress(serviceAddresses,serviceName);//動態發現服務節點變化,需要注冊監聽registerWatcher(nodePath,serviceName);System.out.println("獲取服務:"+serviceName +"的服務地址:"+serviceMap.get(serviceName));} catch (Exception e) {throw new RuntimeException("服務發現獲取子節點異常!", e);}return serviceMap.get(serviceName);}/*** 監聽節點變化,更新serviceAddresses** @param path*/private void registerWatcher(final String path,final String serviceName) {PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {serviceAddresses = curatorFramework.getChildren().forPath(path);addServiceAddress(serviceAddresses,serviceName);System.out.println("監聽到節點:" + path + "變化為:" + serviceAddresses + "....");}});try {pathChildrenCache.start();} catch (Exception e) {throw new RuntimeException("監聽節點變化異常!", e);}}private void addServiceAddress(List<String> serviceAddresses,String serviceName){if (!CollectionUtils.isEmpty(serviceAddresses)) {String serviceAddress = serviceAddresses.get(0);serviceMap.put(serviceName,serviceAddress);}}}客戶端核心類,實現了InvocationHandler接口,用于把創建代理類,invoke方法里就是創建netty客戶端并發送請求給從注冊中心獲取到的服務端
public class RpcInvocationHandler implements InvocationHandler {private String serviceName;private IServiceDiscovery serviceDiscovery;public RpcInvocationHandler(String serviceName, IServiceDiscovery serviceDiscovery) {this.serviceName = serviceName;this.serviceDiscovery = serviceDiscovery;}/*** 增強的InvocationHandler,接口調用方法的時候實際是調用socket進行傳輸*/public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//將遠程調用需要的接口類、方法名、參數信息封裝成RPCRequestRpcRequest rpcRequest = new RpcRequest();rpcRequest.setArgs(args);rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());return handleNetty(rpcRequest);//return handleSocket(rpcRequest);}private Object handleNetty(RpcRequest rpcRequest){//創建客戶端線程池EventLoopGroup group = null;final RpcClientHandler handler = new RpcClientHandler();try{group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class);//添加客戶端的處理器bootstrap.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()/** 入參有5個,如下maxFrameLength:框架的最大長度。如果幀的長度大于此值,則將拋出TooLongFrameException。lengthFieldOffset:長度字段的偏移量:即對應的長度字段在整個消息數據中的位置lengthFieldLength:長度字段的長度:如:長度字段是int型表示,那么這個值就是4(long型就是8)lengthAdjustment:要添加到長度字段值的補償值initialBytesToStrip:從解碼幀中去除的第一個字節數*///自定義協議解碼器.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))//自定義協議編碼器.addLast("frameEncoder", new LengthFieldPrepender(4))//對象參數類型編碼器.addLast("encoder", new ObjectEncoder())// 對象參數類型解碼器.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))).addLast(handler);}});//通過service從zk獲取服務端地址String address = serviceDiscovery.discover(serviceName);//綁定端口啟動netty客戶端String[] add = address.split(":");ChannelFuture future = bootstrap.connect(add[0], Integer.parseInt(add[1])).sync();//通過Netty發送 RPCRequest給服務端future.channel().writeAndFlush(rpcRequest).sync();future.channel().closeFuture().sync();}catch (Exception e){e.printStackTrace();}finally {group.shutdownGracefully();}//返回客戶端獲取的服務端輸出return handler.getResponse();}private Object handleSocket(RpcRequest rpcRequest) throws IOException, ClassNotFoundException {String address = serviceDiscovery.discover(serviceName);//綁定端口啟動netty客戶端String[] add = address.split(":");//通過socket發送RPCRequest給服務端并獲取結果返回Socket socket= new Socket(add[0],Integer.parseInt(add[1]));ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(rpcRequest);ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());Object result = ois.readObject();return result;} }客戶端接收到服務端返回結果的處理類
public class RpcClientHandler extends ChannelInboundHandlerAdapter {private Object response;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response = msg;}public Object getResponse(){return response;} }客戶端執行結果:
至此,簡易版的帶注冊中心的基本rpc框架就完成了
源碼地址:
https://github.com/dearfulan/cp-rpc/tree/master/register-rpc-client
https://github.com/dearfulan/cp-rpc/tree/master/register-rpc-server
參考:https://blog.csdn.net/dongguabai/article/details/83625362
?
總結
以上是生活随笔為你收集整理的手写带注册中心的rpc框架(Netty版和Socket版)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IntelliJ IDEA不好用?那是因
- 下一篇: Netty学习笔记(二)Netty服务端