你真的明白RPC 吗?一起来探究 RPC 的实质
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
你真的明白R(shí)PC 嗎?一起來探究 RPC 的實(shí)質(zhì)
? 不論你是科班出身還是半路轉(zhuǎn)行,這么優(yōu)秀的你一定上過小學(xué)語文,那么對(duì)擴(kuò)句和縮句你一定不陌生。縮句就是去除各種修飾提煉出一句話的核心,而不失基本的語義。下面來實(shí)現(xiàn)一個(gè)簡(jiǎn)易的 rpc 程序探究其實(shí)質(zhì),進(jìn)而去理解復(fù)雜的 rpc 框架。所謂復(fù)雜的框架就是在簡(jiǎn)單的過程中加入了一些設(shè)計(jì)裝飾將rpc的功能豐富起來,如 dubbo 的 filter、router、loadblance、集群容錯(cuò)、多種 Invoker 、通訊協(xié)議等等,這就是一個(gè)擴(kuò)句的過程。
? RPC是指遠(yuǎn)程過程調(diào)用,也就是說兩臺(tái)服務(wù)器A、B,一個(gè)應(yīng)用部署在A服務(wù)器上,想要調(diào)用B服務(wù)器上應(yīng)用提供的函數(shù)/方法,由于不在一個(gè)內(nèi)存空間,不能直接調(diào)用,需要通過網(wǎng)絡(luò)去發(fā)起一次調(diào)用請(qǐng)求獲取結(jié)果。
? 無論是市面上主流的 rpc 框架還是小眾的 rpc 框架都實(shí)現(xiàn)了上述 rpc的語義。【服務(wù)治理型:dubbo、dubbox、motan;多語言型:grpc、thrift、avro、protocol buffers】
打一波廣告:【博主最近在寫一個(gè) java 實(shí)現(xiàn)的 rpc 框架 bridge 歡迎關(guān)注,考慮Mesh 化】
一、原理
首先用一幅圖來簡(jiǎn)單描述一下 rpc 的調(diào)用過程,從 dubbo 官網(wǎng)拿來的,不算是最簡(jiǎn)單的圖,但是也非常簡(jiǎn)單了,去掉上面的 Registry 和下面的 Monitor 剩下的就是最簡(jiǎn)單的 rpc 調(diào)用,說白了就是一個(gè)網(wǎng)絡(luò)請(qǐng)求。
過程描述:
OK,原理就是這么簡(jiǎn)單,接下來根據(jù)上面的描述逐步實(shí)現(xiàn)。
二、動(dòng)手實(shí)踐
下面基于 springboot 來實(shí)現(xiàn)上述的過程。
2.1 構(gòu)建模塊
搭建工程和子模塊,工程結(jié)構(gòu)如下:
2.2 實(shí)現(xiàn)服務(wù)端
看下服務(wù)端的內(nèi)容,貼圖
把接口定義在 api 模塊,consumer 和 provider 模塊都要引用到,接口HelloService代碼如下
package com.glmapper.simple.api;/*** service interface** @author: Jerry*/ public interface HelloService {/*** service function** @param name* @return*/String hello(String name); }然后在 provider 模塊實(shí)現(xiàn)接口,用自定注解 @SimpleProvider 標(biāo)識(shí),先看下注解內(nèi)容
package com.glmapper.simple.provider.annotation;/*** 自定義服務(wù)注解** @author Jerry*/ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) // 標(biāo)明可被 Spring 掃描 @Component public @interface SimpleProvider {Class<?> value(); }注解使用了@Component標(biāo)識(shí),所以可被 spring 掃描到,接下來看實(shí)現(xiàn)類HelloServiceImpl:
package com.glmapper.simple.provider.service;/*** service implement class** @author: Jerry*/ @SimpleProvider(HelloService.class) public class HelloServiceImpl implements HelloService {/*** service function** @param name* @return*/@Overridepublic String hello(String name) {return "Hello! " + name;} }在定義一個(gè)服務(wù)配置的類SimpleProviderProperties,方便通過 application.yml 文件配置,
package com.glmapper.simple.provider.property;/*** provider properties** @author: Jerry*/ public class SimpleProviderProperties {/*** 暴露服務(wù)的端口*/private Integer port;public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;} }到這里基礎(chǔ)的類文件就已經(jīng)結(jié)束了,下面開始服務(wù)初始化,入口 ProviderInitializer
package com.glmapper.simple.provider;/*** 啟動(dòng)并注冊(cè)服務(wù)** @author Jerry*/ public class ProviderInitializer implements ApplicationContextAware, InitializingBean {private static final Logger LOGGER = LoggerFactory.getLogger(ProviderInitializer.class);private SimpleProviderProperties providerProperties;/*** service registry*/private ServiceRegistry serviceRegistry;/*** store interface and service implement mapping*/private Map<String, Object> handlerMap = new HashMap<>();public ProviderInitializer(SimpleProviderProperties providerProperties, ServiceRegistry serviceRegistry) {this.providerProperties = providerProperties;this.serviceRegistry = serviceRegistry;}@Overridepublic void setApplicationContext(ApplicationContext ctx) throws BeansException {// 獲取被 SimpleProvider 注解的 BeanMap<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(SimpleProvider.class);if (MapUtils.isNotEmpty(serviceBeanMap)) {for (Object serviceBean : serviceBeanMap.values()) {String interfaceName = serviceBean.getClass().getAnnotation(SimpleProvider.class).value().getName();handlerMap.put(interfaceName, serviceBean);}}}@Overridepublic void afterPropertiesSet() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();ChannelHandler channelHandler = new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new SimpleDecoder(SimpleRequest.class)).addLast(new SimpleEncoder(SimpleResponse.class)).addLast(new SimpleHandler(handlerMap));}};bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(channelHandler).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);String host = getLocalHost();if (null == host) {LOGGER.error("can't get service address,because address is null");throw new SimpleException("can't get service address,because address is null");}int port = providerProperties.getPort();ChannelFuture future = bootstrap.bind(host, port).sync();LOGGER.debug("server started on port {}", port);if (serviceRegistry != null) {String serverAddress = host + ":" + port;serviceRegistry.register(serverAddress);}future.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}/*** get service host** @return*/private String getLocalHost() {Enumeration<NetworkInterface> allNetInterfaces;try {allNetInterfaces = NetworkInterface.getNetworkInterfaces();} catch (SocketException e) {LOGGER.error("get local address error,cause:", e);return null;}while (allNetInterfaces.hasMoreElements()) {NetworkInterface netInterface = allNetInterfaces.nextElement();Enumeration<InetAddress> addresses = netInterface.getInetAddresses();while (addresses.hasMoreElements()) {InetAddress ip = addresses.nextElement();if (ip instanceof Inet4Address && !ip.isLoopbackAddress() && !ip.getHostAddress().contains(":")) {return ip.getHostAddress();}}}return null;} }描述一下這個(gè)類做了什么工作:
- 首先他實(shí)現(xiàn)了ApplicationContextAware, InitializingBean這兩個(gè) spring 中接口,根據(jù)IOC容器初始化的順序,會(huì)依次回調(diào)用接口中的setApplicationContext 和 afterPropertiesSet 方法。
- setApplicationContext方法中獲取了容器中被@SimpleProvider標(biāo)注的類,并將服務(wù)接口名和服務(wù)實(shí)現(xiàn)類綁定,存放到handlerMap中,在@SimpleProvider中有一個(gè) value 屬性,是考慮到一個(gè)類可以實(shí)現(xiàn)多個(gè)接口,通過 value 可以指定哪個(gè)服務(wù)接口,當(dāng)然也可以定義為數(shù)組,處理多個(gè)接口
- afterPropertiesSet 方法中做了兩件事:
- 在服務(wù)端開啟了一個(gè)處理socket請(qǐng)求的線程池,監(jiān)聽和處理服務(wù)暴露端口上接受到的請(qǐng)求,指定了一個(gè)處理器SimpleHandler
- 調(diào)用ServiceRegistry類的registry方法向 zookeeper 注冊(cè)服務(wù)的地址和端口,這里沒有用到協(xié)議,只注冊(cè)了 ip:port
SimpleHandler是一個(gè)實(shí)現(xiàn)了 netty的SimpleChannelInboundHandler的請(qǐng)求處理器類
package com.glmapper.simple.provider.handler;/*** request handler** @author Jerry*/ public class SimpleHandler extends SimpleChannelInboundHandler<SimpleRequest> {private static final Logger LOGGER = LoggerFactory.getLogger(SimpleHandler.class);private final Map<String, Object> handlerMap;public SimpleHandler(Map<String, Object> handlerMap) {this.handlerMap = handlerMap;}@Overridepublic void channelRead0(final ChannelHandlerContext ctx, SimpleRequest request) throws Exception {SimpleResponse response = new SimpleResponse();response.setRequestId(request.getRequestId());try {Object result = handle(request);response.setResult(result);} catch (Throwable t) {response.setError(t);}ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}private Object handle(SimpleRequest request) throws Throwable {String className = request.getClassName();Object serviceBean = handlerMap.get(className);Class<?> serviceClass = serviceBean.getClass();String methodName = request.getMethodName();Class<?>[] parameterTypes = request.getParameterTypes();Object[] parameters = request.getParameters();FastClass serviceFastClass = FastClass.create(serviceClass);FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);return serviceFastMethod.invoke(serviceBean, parameters);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {LOGGER.error("server caught exception", cause);ctx.close();} }SimpleHandler基于 netty 的事件驅(qū)動(dòng)模型觸發(fā)對(duì)應(yīng)的方法,當(dāng)收到請(qǐng)求事件會(huì)調(diào)用channelRead0方法,這個(gè)方法的作用就是,根據(jù)請(qǐng)求參數(shù)中的接口名找到對(duì)應(yīng)的實(shí)現(xiàn)類調(diào)用指定的方法,然后把結(jié)果返回。
再瞅瞅ServiceRegistry,入口是ProviderInitializer調(diào)用了ServiceRegistry的 registry方法
package com.glmapper.simple.provider.registry;/*** connect zookeeper to registry service** @author Jerry*/ public class ServiceRegistry {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);private ZookeeperProperties zookeeperProperties;public ServiceRegistry(ZookeeperProperties zookeeperProperties) {this.zookeeperProperties = zookeeperProperties;}public void register(String data) {if (data != null) {ZooKeeper zk = ZookeeperUtils.connectServer(zookeeperProperties.getAddress(), zookeeperProperties.getTimeout());if (zk != null) {addRootNode(zk);createNode(zk, data);}}}/*** add one zookeeper root node** @param zk*/private void addRootNode(ZooKeeper zk) {try {String registryPath = zookeeperProperties.getRootPath();Stat s = zk.exists(registryPath, false);if (s == null) {zk.create(registryPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (KeeperException | InterruptedException e) {LOGGER.error("zookeeper add root node error,cause:", e);}}private void createNode(ZooKeeper zk, String data) {try {byte[] bytes = data.getBytes(Charset.forName("UTF-8"));String dataPath = zookeeperProperties.getRootPath() + zookeeperProperties.getDataPath();String path = zk.create(dataPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOGGER.debug("create zookeeper node ({} => {})", path, data);} catch (KeeperException | InterruptedException e) {LOGGER.error("create zookeeper node error,cause:", e);}} }ServiceRegistry類做的工作比較簡(jiǎn)單,就是把 服務(wù)ip:port注冊(cè)到 zk 的指定目錄下
- 創(chuàng)建根節(jié)點(diǎn),根節(jié)點(diǎn)是個(gè)永久節(jié)點(diǎn)
- 在根節(jié)點(diǎn)下創(chuàng)建臨時(shí)的子節(jié)點(diǎn),子節(jié)點(diǎn)存儲(chǔ)了服務(wù)的 ip:port,服務(wù)被掛掉對(duì)應(yīng)的子節(jié)點(diǎn)就會(huì)被干掉
2.3 消費(fèi)端
消費(fèi)端內(nèi)容:
消費(fèi)端的內(nèi)容比較少,核心就三個(gè)類:ServiceDiscovery、ConsumerHandler、ConsumerProxy
先看下ServiceDiscovery內(nèi)容:
package com.glmapper.simple.consumer.discovery;/*** 服務(wù)發(fā)現(xiàn):連接ZK,添加watch事件** @author Jerry*/ public class ServiceDiscovery {private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);private volatile List<String> nodes = new ArrayList<>();private ZookeeperProperties zookeeperProperties;public ServiceDiscovery(ZookeeperProperties zookeeperProperties) {this.zookeeperProperties = zookeeperProperties;String address = zookeeperProperties.getAddress();int timeout = zookeeperProperties.getTimeout();ZooKeeper zk = ZookeeperUtils.connectServer(address, timeout);if (zk != null) {watchNode(zk);}}public String discover() {String data = null;int size = nodes.size();if (size > 0) {if (size == 1) {data = nodes.get(0);LOGGER.debug("using only node: {}", data);} else {data = nodes.get(ThreadLocalRandom.current().nextInt(size));LOGGER.debug("using random node: {}", data);}}return data;}private void watchNode(final ZooKeeper zk) {try {Watcher childrenNodeChangeWatcher = event -> {if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {watchNode(zk);}};String rootPath = zookeeperProperties.getRootPath();List<String> nodeList = zk.getChildren(rootPath, childrenNodeChangeWatcher);List<String> nodes = new ArrayList<>();for (String node : nodeList) {byte[] bytes = zk.getData(rootPath + "/" + node, false, null);nodes.add(new String(bytes, Charset.forName("UTF-8")));}LOGGER.info("node data: {}", nodes);this.nodes = nodes;} catch (KeeperException | InterruptedException e) {LOGGER.error("節(jié)點(diǎn)監(jiān)控出錯(cuò),原因:", e);}} }這個(gè)類的入口是構(gòu)造器,作用是獲取 zk 的地址,然后獲取 zk 上的節(jié)點(diǎn)信息,這里沒有實(shí)現(xiàn)服務(wù)訂閱,也就是說如果 zk 上原本有兩個(gè)服務(wù),掛掉一個(gè),客戶端不會(huì)剔除掛掉的服務(wù)信息,導(dǎo)致調(diào)用失敗。
然后是ConsumerProxy,它是一個(gè)代理工廠:
package com.glmapper.simple.consumer.proxy;/*** ConsumerProxy** @author Jerry*/ public class ConsumerProxy {private ServiceDiscovery serviceDiscovery;public ConsumerProxy(ServiceDiscovery serviceDiscovery) {this.serviceDiscovery = serviceDiscovery;}@SuppressWarnings("unchecked")public <T> T create(Class<?> interfaceClass) {return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new SimpleInvocationHandler());}private class SimpleInvocationHandler implements InvocationHandler {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {SimpleRequest request = buildRequest(method, args);String serverAddress = getServerAddress();String[] array = serverAddress.split(":");String host = array[0];int port = Integer.parseInt(array[1]);ConsumerHandler consumerHandler = new ConsumerHandler(host, port);SimpleResponse response = consumerHandler.send(request);if (response.getError() != null) {throw new SimpleException("service invoker error,cause:", response.getError());} else {return response.getResult();}}private SimpleRequest buildRequest(Method method, Object[] args) {SimpleRequest request = new SimpleRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);return request;}private String getServerAddress() {String serverAddress = null;if (serviceDiscovery != null) {serverAddress = serviceDiscovery.discover();}if (null == serverAddress) {throw new SimpleException("no server address available");}return serverAddress;}} }這里有個(gè)內(nèi)部類SimpleInvocationHandler是生產(chǎn)代理的核心,方法的核心是在 SimpleInvocationHandler.invoke()中是調(diào)用這兩行代碼
ConsumerHandler consumerHandler = new ConsumerHandler(host, port); SimpleResponse response = consumerHandler.send(request);發(fā)起網(wǎng)絡(luò)請(qǐng)求,下面看下ConsumerHandler類
package com.glmapper.simple.consumer.handler;/*** RPC真正調(diào)用客戶端** @author Jerry*/ public class ConsumerHandler extends SimpleChannelInboundHandler<SimpleResponse> {private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);private int port;private String host;private SimpleResponse response;private CountDownLatch latch = new CountDownLatch(1);public ConsumerHandler(String host, int port) {this.host = host;this.port = port;}@Overridepublic void channelRead0(ChannelHandlerContext ctx, SimpleResponse response) throws Exception {this.response = response;latch.countDown();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {LOGGER.error("client caught exception", cause);ctx.close();}public SimpleResponse send(SimpleRequest request) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();ChannelInitializer<SocketChannel> channelHandler = new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline()// 將 RPC 請(qǐng)求進(jìn)行編碼(為了發(fā)送請(qǐng)求).addLast(new SimpleEncoder(SimpleRequest.class))// 將 RPC 響應(yīng)進(jìn)行解碼(為了處理響應(yīng)).addLast(new SimpleDecoder(SimpleResponse.class))// 使用 RpcClient 發(fā)送 RPC 請(qǐng)求.addLast(ConsumerHandler.this);}};bootstrap.group(group).channel(NioSocketChannel.class).handler(channelHandler).option(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.connect(host, port).sync();future.channel().writeAndFlush(request).sync();latch.await();if (response != null) {future.channel().closeFuture().sync();}return response;} finally {group.shutdownGracefully();}} }這個(gè)類和服務(wù)端的 ProviderHandler 的代碼差不多,也是netty通訊類
附一下 GitHub 地址 simple-rpc
轉(zhuǎn)載于:https://my.oschina.net/GinkGo/blog/1834620
總結(jié)
以上是生活随笔為你收集整理的你真的明白RPC 吗?一起来探究 RPC 的实质的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 刷题or源码链接
- 下一篇: npm script 的实践