手写实现RPC框架基础功能
隨著微服務、分布式的流行,基本復雜點的項目都會涉及到遠程調(diào)用,最基礎的可以使用http來實現(xiàn)調(diào)用,也可以通過一些RPC框架來實現(xiàn),比如Ailiaba 的dubbo,Thrift等。那么作為一個rpc框架,需要具備哪些基本的元素呢?
- 網(wǎng)絡通訊 在動態(tài)代理的實現(xiàn)過程中肯定是需要進行遠程調(diào)用來獲取相應的執(zhí)行結(jié)果的,可以使用Java中的socket或者使用Netty。
- 序列化和反序列化 ?數(shù)據(jù)需要通過網(wǎng)絡傳輸,所以對象都要求可以被序列化和反序列化,常用的有java的Serializable,json(Gson,fastjson等),Protobuf等 這里就直接使用java自帶的序列化接口
不了解序列化的可以參考這篇博客?Java中的序列化和反序列化
當然,如果功能更加完善,一個優(yōu)秀的rpc框架往往還會具備注冊中心,負載均衡等功能,這里就先不做要求了
調(diào)用方式 : 在客戶端引入服務端接口的jar包,直接調(diào)用接口就可以實現(xiàn)遠程調(diào)用
分析:
因為客戶端調(diào)用服務端api后,最后需要調(diào)用到服務端具體接口的具體方法,那么
1)需要通過動態(tài)代理來創(chuàng)建接口的代理類(接口無法實例化直接調(diào)用);
2)請求參數(shù)里就要求有類信息,方法名,參數(shù)集合,這樣服務端才能通過反射來調(diào)用
3)InvocationHandler的實現(xiàn)類里的invoke具體實現(xiàn)應該是socket調(diào)用,這樣才能在執(zhí)行接口方法的時候進行socket通訊
我把這個rpc實現(xiàn)分為兩部分,一部分是rpc-server服務端實現(xiàn),另一部分是rpc-client實現(xiàn)
rpc-server又分為兩部分 rpc-server-api(提供jar包給rpc-client調(diào)用)和rpc-server-provider
rpc-server-api
/*** 2020/3/7* created by chenpp* 定義接口,將rpc-server-api打包提供給client端使用,* client只要調(diào)用對應的接口方法就可以遠程調(diào)用服務端的具體實現(xiàn)* */
public interface IUserService {public void saveUser(User user);public User getUserById(Integer id);
}
/*** 2020/3/7* created by chenpp* 請求rpc時需要的參數(shù)*/
public class RpcRequest implements Serializable {private String className;private String methodName;private Object[] args;public String getClassName() {return className;}public void setClassName(String className) {this.className = className;}...
}
rpc-server-provider
/*** 2020/3/7* created by chenpp* 遠程調(diào)用的服務端入口,使用socket監(jiān)聽*/ public class RpcServer {private static final ExecutorService executor = Executors.newCachedThreadPool();/*** 注冊服務實例,服務注冊后,其他客戶端通過接口調(diào)用就可以調(diào)用服務端的實現(xiàn)* */public void register(Object service ,int port) {ServerSocket serverSocket = null;try {//創(chuàng)建socketserverSocket = new ServerSocket(port);while(true){//監(jiān)聽端口,是個阻塞的方法Socket socket = serverSocket.accept();//處理rpc請求,這里使用線程池來處理executor.submit(new HandleThread(service,socket));}} catch (Exception e) {e.printStackTrace();}finally {if(serverSocket != null){try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}} } package com.chenpp.server;import com.chenpp.request.RpcRequest;import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.Socket;/*** 2020/3/7* created by chenpp* 處理RPC請求的線程*/ public class HandleThread implements Runnable {private Socket socket;private Object serviceInstance;public HandleThread(Object serviceInstance, Socket socket) {this.socket = socket;this.serviceInstance = serviceInstance;}public void run() {ObjectOutputStream oos = null;ObjectInputStream ois = null;try {//從socket中獲取RPC請求ois = new ObjectInputStream(socket.getInputStream());RpcRequest rpcRequest = (RpcRequest) ois.readObject();Object result = invoke(rpcRequest);oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(result);oos.flush();} catch (Exception e) {e.printStackTrace();} finally {if (oos != null) {try {ois.close();oos.close();} catch (IOException e) {e.printStackTrace();}}}}private Object invoke(RpcRequest rpcRequest) {String className = rpcRequest.getClassName();Object result = null;try {Class<?> clazz = Class.forName(className);//這里無法實例化,因為傳入的是接口類型,接口無法實力哈Object[] parameters = rpcRequest.getArgs();if (parameters == null) {Method method = clazz.getMethod(rpcRequest.getMethodName());result = method.invoke(serviceInstance);} else {Class[] types = new Class[parameters.length];for (int i = 0; i < parameters.length; i++) {types[i] = parameters[i].getClass();}Method method = clazz.getMethod(rpcRequest.getMethodName(), types);result = method.invoke(serviceInstance, parameters);}} catch (Exception e) {e.printStackTrace();}return result;} } /*** 2020/3/7* created by chenpp* 啟動類,啟動rpc服務端*/ public class StartServer {public static void main(String[] args) {UserServiceImpl userService = new UserServiceImpl();RpcServer rpcServer = new RpcServer();rpcServer.register(userService,8080);} }rpc-client
public class StartClient {public static void main(String[] args) {//由于rpc-server-api里只有實體類和接口類,想要實例化只能通過代理來實現(xiàn)IUserService userService = RpcProxy.getInstance(IUserService.class,"localhost",8080);User user = new User();user.setAge(12);user.setName("chenpp");userService.saveUser(user);User user1 = userService.getUserById(1);System.out.println(user1.getName()+","+user1.getAge());} } /*** 2020/3/7* created by chenpp* 創(chuàng)建代理對象*/ public class RpcProxy<T> {public static <T> T getInstance(Class<T> classInterface, String host, int port) {return (T) Proxy.newProxyInstance(classInterface.getClassLoader(), new Class[]{classInterface}, new RpcInvocationHandler(host, port));} } /*** 2020/3/7* created by chenpp*/ public class RpcInvocationHandler implements InvocationHandler {private String host;private int port;public RpcInvocationHandler(String host,int port){this.host = host;this.port = port;}/*** 增強的InvocationHandler,接口調(diào)用方法的時候?qū)嶋H是調(diào)用socket進行傳輸*/public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //將遠程調(diào)用需要的接口類、方法名、參數(shù)信息封裝成RPCRequestRpcRequest rpcRequest = new RpcRequest();rpcRequest.setArgs(args);rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());//通過socket發(fā)送RPCRequest給服務端并獲取結(jié)果返回Socket socket= new Socket(host,port);ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());oos.writeObject(rpcRequest);ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());Object result = ois.readObject();return result;} }因為接口無法實例化,所以對于每個實現(xiàn)類我都自己手動創(chuàng)建了一個實例對象并發(fā)布,可以考慮引入spring來優(yōu)化對bean的管理,并通過注解來實現(xiàn)服務的發(fā)布
/*** 2020/3/7* created by chenpp* 引入Component注解,加了RegisterService注解的類都會被Spring容器管理*/ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RegisterService {String interfaceClass() ; } /*** 實現(xiàn)了InitializingBean接口,那么會在對應的AutoRpcServer實例化之后調(diào)用afterPropertiesSet方法* 而實現(xiàn)了ApplicationContextAware接口,當spring容器初始化的時候,會自動的將ApplicationContext注入進來,* 使得當前bean可以獲得ApplicationContext上下文* */ @Component public class AutoRpcServer implements ApplicationContextAware, InitializingBean {private int port;public AutoRpcServer(int port){this.port = port;}private static final ExecutorService executor = Executors.newCachedThreadPool();//key 為對應的接口類名,valeu 為具體的實例private Map<String,Object> map = new HashMap<String, Object>();public void afterPropertiesSet() throws Exception {ServerSocket serverSocket = null;try {//創(chuàng)建socketserverSocket = new ServerSocket(port);while(true){//監(jiān)聽端口,是個阻塞的方法Socket socket = serverSocket.accept();//處理rpc請求,這里使用線程池來處理executor.submit(new HandleThread(map,socket));}} catch (Exception e) {e.printStackTrace();}finally {if(serverSocket != null){try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}}public void setApplicationContext(ApplicationContext context) throws BeansException {//從spring上下文中獲取添加了RegisterService的注解的beanString[] beanNames = context.getBeanNamesForAnnotation(RegisterService.class);for(String beanName : beanNames){Object bean = context.getBean(beanName);RegisterService annotation = bean.getClass().getAnnotation(RegisterService.class);Class interfaceClass = annotation.interfaceClass();//將接口的類名和對應的實例bean對應起來map.put(interfaceClass.getName(),bean);}} } /*** 2020/3/7* created by chenpp* 掃描com.chenpp.impl包下的類,注入spring容器*/ @Component @ComponentScan("com.chenpp.impl") public class SpringConfig {//把autoRpcServer bean注入spring容器,初始化完成后會觸發(fā)InitializingBean接口的afterPropertiesSet調(diào)用@Beanpublic AutoRpcServer autoRpcServer(){//這里直接寫死啟動的端口 : 8080return new AutoRpcServer(8080);} } /*** 2020/3/7* created by chenpp* 啟動類,通過注解方式啟動rpc服務端*/ public class StartServer {public static void main(String[] args) {AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(SpringConfig.class);((AnnotationConfigApplicationContext) applicationContext).start();} }GitHub源碼地址:https://github.com/dearfulan/cp-rpc
參考:https://www.jianshu.com/p/775d49b30567
總結(jié)
以上是生活随笔為你收集整理的手写实现RPC框架基础功能的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决IDEA本地仓库有jar包却无法引用
- 下一篇: IO中的阻塞、非阻塞、同步、异步概念分析