手撕 RPC 1
rpc要素
rpc 最 low 的描述:調(diào)用遠(yuǎn)程服務(wù)像調(diào)用本地方法一樣,也就是面向 interface 開發(fā),最基本的,要做到像本地方法一樣調(diào)用,意味著 consumer 要知道 provider 有什么服務(wù)( interface 名字)是什么,方法( 接口里的方法)是什么,參數(shù)(方法的傳參)是什么,返回(方法的返回類型)什么。要解決這些問題,需要的知識(shí)點(diǎn)有:
- provider 和 consumer 的通信
- provider(至少一個(gè))和 consumer(一般是多個(gè))之間連接的數(shù)量,管理(多個(gè) consumer 不能相互打擾),
- 拆包:連接建立后,怎么拆成正確的對(duì)象以獲取正確的信息
- 動(dòng)態(tài)代理:服務(wù)要用代理通過數(shù)據(jù)包分發(fā)
- 協(xié)議封裝:不同的服務(wù)可以自己封裝不同的協(xié)議
- 連接池
- 其他 ....
摘要
這是第一版,所有代碼寫在一個(gè)類里。單機(jī)模擬客戶端和服務(wù)器,只有一個(gè) consumer 和一個(gè) provider。線程池管理只有一個(gè)線程,沒有服務(wù)分發(fā),注冊(cè)和發(fā)現(xiàn),通信用netty,代理用jdk的動(dòng)態(tài)代理,序列化用jdk的Serializable,用header模擬協(xié)議,拆包寫死。
本來想用門閂模擬線程阻塞,進(jìn)而控制客戶端的連接,但是 netty 的事件是異步的,無論在獲取客戶端之前還是之后聲明門閂,程序總能正確解開門閂,模擬失敗,不知道原因,所以啊,在寫多線程的程序時(shí),線程的調(diào)度要千萬千萬小心啊,程序很可能不按自己預(yù)想的運(yùn)行.
package rpc;import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.junit.Test;import java.io.*; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch;/*** 1.假設(shè)一個(gè)需求,寫一個(gè) rpc* 2.來回通信,連接數(shù)量,拆包* 3.動(dòng)態(tài)代理,序列化,協(xié)議封裝* 4.連接池*/ public class MyRpcTest {public void startServer() {NioEventLoopGroup eventExecutors = new NioEventLoopGroup(1);NioEventLoopGroup worker = eventExecutors;ServerBootstrap serverBootstrap = new ServerBootstrap();ChannelFuture localhost = serverBootstrap.group(eventExecutors, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new ServerRequestHandler());}}).bind(new InetSocketAddress("localhost", 9090));try {localhost.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void get() {// 開啟一個(gè)線程模擬服務(wù)器new Thread(() -> {startServer();}).start();// 線程等待一下,讓服務(wù)端啟動(dòng)起來先try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("server start ....");// 獲取服務(wù)Car car = ProxyGet(Car.class);// 使用服務(wù)car.race("BMW 530Li");}private static <T> T ProxyGet(Class<T> interfaceInfo) {// 實(shí)現(xiàn)各個(gè)版本的動(dòng)態(tài)代理ClassLoader classLoader = interfaceInfo.getClassLoader();Class<?>[] interfaces = {interfaceInfo};// 用 jdk 的動(dòng)態(tài)代理實(shí)現(xiàn)return (T) Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 1.調(diào)用,服務(wù)、方法、參數(shù),封裝成 contentString name = interfaceInfo.getName(); // 服務(wù)名String methodName = method.getName(); // 方法名Class<?>[] parameterTypes = method.getParameterTypes(); // 方法的返回類型// 2.把調(diào)用服務(wù)的信息封裝成一個(gè)可以序列化的對(duì)象// 先封裝 bodyMyContent content = new MyContent();content.setName(name);content.setMethodName(methodName);content.setParameterTypes(parameterTypes);content.setArgs(args);// 把 content 做成字節(jié)數(shù)組準(zhǔn)備寫出去ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(content);byte[] msgBody = bos.toByteArray();// 再封裝 header,header 需要 body 信息,MyHeader myHeader = createHeader(msgBody);bos.reset();oos = new ObjectOutputStream(bos);oos.writeObject(myHeader);byte[] msgHeader = bos.toByteArray();// 3.服務(wù)準(zhǔn)備好了,接下來準(zhǔn)備連接,模擬一個(gè) size 是1的連接池ClientFactory factory = ClientFactory.getFactory();// 想模擬主線程不指令沖排序執(zhí)行,但是 netty 的時(shí)間是異步的,試了好多次,無論門閂在獲取客戶端之前創(chuàng)建還是之后創(chuàng)建,不知道為什么每次都能正確解開門閂,模擬不成功synchronized (this) {NioSocketChannel client = factory.getClient(new InetSocketAddress("localhost", 9090));// 加個(gè)門閂,阻塞住,一個(gè) client 一個(gè) client 的處理,在處理返回的后再打開門閂,使程序繼續(xù)運(yùn)行,門閂和創(chuàng)建CountDownLatch countDownLatch = new CountDownLatch(1);// 4.發(fā)送,走 IO,走 nettylong requestId = myHeader.getRequestId();ResponseHandler.addCallBack(requestId, () -> {countDownLatch.countDown();});System.out.println("shuan zhu ....");ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);byteBuf.writeBytes(msgHeader);byteBuf.writeBytes(msgBody);ChannelFuture channelFuture = client.writeAndFlush(byteBuf);channelFuture.sync();System.out.println("before await ....");countDownLatch.await();System.out.println("after await ....");}return null;}});}static MyHeader createHeader(byte[] msgBytes) {MyHeader header = new MyHeader();int size = msgBytes.length;// 用16進(jìn)制的,32 位可以做很多事情int f = 0x14141414;long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());header.setFlag(f);header.setRequestId(requestId);header.setDataLen(size);return header;} }/*** 模擬服務(wù)*/ interface Car {void race(String msg); }/*** 頭部定義三個(gè)標(biāo)志* 1.方法的標(biāo)記,用32位的位標(biāo)記* 2.請(qǐng)求的 id* 3.請(qǐng)求體的長度*/ class MyHeader implements Serializable {int flag;long requestId;long dataLen;public int getFlag() {return flag;}public void setFlag(int flag) {this.flag = flag;}public long getRequestId() {return requestId;}public void setRequestId(long requestId) {this.requestId = requestId;}public long getDataLen() {return dataLen;}public void setDataLen(long dataLen) {this.dataLen = dataLen;} }/*** 模擬請(qǐng)求體*/ class MyContent implements Serializable {// 服務(wù)名String name;// 方法名String methodName;// 返回值類型Class<?>[] parameterTypes;// 參數(shù)Object[] args;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getArgs() {return args;}public void setArgs(Object[] args) {this.args = args;} }/*** 模擬客戶端的創(chuàng)建,用單例*/ class ClientFactory {int pollSize = 1;NioEventLoopGroup clientWorker;Random random = new Random();private static final ClientFactory factory;private ClientFactory() {}static {factory = new ClientFactory();}public static ClientFactory getFactory() {return factory;}ConcurrentHashMap<InetSocketAddress, ClientPool> outboxes = new ConcurrentHashMap<InetSocketAddress, ClientPool>();public synchronized NioSocketChannel getClient(InetSocketAddress address) {ClientPool clientPool = outboxes.get(address);if (clientPool == null) {outboxes.putIfAbsent(address, new ClientPool(pollSize));clientPool = outboxes.get(address);}int i = random.nextInt(pollSize);// 如果有就返回if (clientPool.clients[i] != null && clientPool.clients[i].isActive()) {return clientPool.clients[i];}// 沒有就創(chuàng)建synchronized (clientPool.locks[i]) {return clientPool.clients[i] = create(address);}}private NioSocketChannel create(InetSocketAddress address) {// 基于 netty 的客戶端創(chuàng)建方式clientWorker = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(clientWorker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new ClientResponses()); // 根據(jù) requestId 找到給誰}}).connect(address);try {NioSocketChannel client = (NioSocketChannel) connect.sync().channel();return client;} catch (InterruptedException e) {e.printStackTrace();}return null;} }/*** 模擬線連接池*/ class ClientPool {NioSocketChannel[] clients;Object[] locks;ClientPool(int size) {clients = new NioSocketChannel[size];locks = new Object[size];for (int i = 0; i < size; i++) {locks[i] = new Object();}} }/*** 客戶端注冊(cè),連接成功后放開門閂,讓主線程繼續(xù)運(yùn)行*/ class ClientResponses extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;if (byteBuf.readableBytes() >= 82) {byte[] bytes = new byte[82];byteBuf.readBytes(bytes); // 指針偏移到 msgHeader 的末尾ByteArrayInputStream bos = new ByteArrayInputStream(bytes);ObjectInputStream oin = new ObjectInputStream(bos);MyHeader header = (MyHeader) oin.readObject();System.out.println("client reponse id : " + header.getRequestId());System.out.println("channelRead ....");// 然后放開門閂ResponseHandler.runCallBack(header.getRequestId());}} }/*** 用于主線程的阻塞的控制*/ class ResponseHandler {static ConcurrentHashMap<Long, Runnable> mapping = new ConcurrentHashMap<>();public static void addCallBack(long requestId, Runnable cb) {mapping.putIfAbsent(requestId, cb);}public static void runCallBack(long requestId) {Runnable runnable = mapping.get(requestId);runnable.run();removeCallBack(requestId);}public static void removeCallBack(long requestId) {mapping.remove(requestId);} }/*** 服務(wù)端注冊(cè)的事件* 沒有具體的業(yè)務(wù)邏輯,只接收客戶端連接并打印出客戶端要請(qǐng)求的服務(wù)(方法都不打印)*/ class ServerRequestHandler extends ChannelInboundHandlerAdapter {// provider@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf b = (ByteBuf) msg;ByteBuf sendBuf = b.copy();// 82 是打斷點(diǎn)跟蹤得出來的 header 的長度if (sendBuf.readableBytes() >= 82) {byte[] bytes = new byte[82];// 指針移動(dòng)到 header 的末尾b.readBytes(bytes);ByteArrayInputStream bis = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bis);MyHeader myHeader = (MyHeader) objectInputStream.readObject();System.out.println("server response id : " + myHeader.getRequestId());// 讀取請(qǐng)求體,并答應(yīng)服務(wù)名if (b.readableBytes() >= myHeader.getDataLen()) {byte[] data = new byte[(int) myHeader.getDataLen()];b.readBytes(data);ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);MyContent myContent = (MyContent) ois.readObject();System.out.println("server response name : " + myContent.getName());}}// 把數(shù)據(jù)包打印回客戶端ChannelFuture channelFuture = ctx.writeAndFlush(sendBuf);channelFuture.sync();} }很明顯這是不能用的,第一,客戶端只有一個(gè),那么客戶端變成多個(gè)呢?緊接著手撕 RPC 2
總結(jié)
- 上一篇: 系统调用回答为什么要用buffer写
- 下一篇: 手撕 RPC 2