Analysis of the ZooKeeper Client Network Communication Model
Preface:
In earlier posts in my ZooKeeper series, I analyzed how the client sends concrete create/read/update/delete node requests.
Those analyses stayed at the business level. Looking back, I still could not answer the next questions:
What does the ZooKeeper client's network communication model look like? And how is each received response matched precisely to its request?
This post centers on those two questions.
For the details of how requests are built and sent, see my earlier posts, e.g. "Zookeeper源码解析-客户端创建节点过程分析" on my CSDN blog.
I will approach the problem from three angles: wrapping the request, sending the request, and receiving the response.
A single GET request serves as the running example for the whole process.
1. Wrapping the request
Everything starts in ZooKeeper.java, which wraps all client operations.
Since we are analyzing a GET request, we start from ZooKeeper.getData().
```java
public class ZooKeeper {
    public byte[] getData(final String path, Watcher watcher, Stat stat)
            throws KeeperException, InterruptedException {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        // 1. Build the request path
        final String serverPath = prependChroot(clientPath);

        // 2. Build the request header
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);

        // 3. Build the request body
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();

        // 4. Send the request here
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
}
```
Wrapping the request = wrapping the request header + wrapping the request body.
(The original post shows the resulting wire layout in a figure: the serialized request header followed by the serialized request body.)
2. Sending the request
Sending the request is handled by ClientCnxn.
```java
public class ClientCnxn {
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        // 1. Wrap both request and response into a Packet object (see 2.1)
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
        // Note: this keeps checking the packet's state until it is finished
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
}
```
Q: submitRequest() keeps waiting on packet.finished. We have not yet seen what finished means on Packet, but we can make a bold guess: is it waiting for the request to complete, with finished set to true once the matching response arrives? That would make the whole call blocking...
We will verify this guess against the code later.
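The block-until-finished handshake can be reduced to a few lines. The sketch below is not ZooKeeper source: `Packet`, `submit()`, and `finishPacket()` are simplified stand-ins for the real classes, and a helper thread plays the role of the I/O thread that completes the packet.

```java
public class WaitDemo {
    // Stand-in for ClientCnxn.Packet: only the flag we care about.
    static class Packet {
        boolean finished = false;
    }

    // Called by the "I/O thread" once the matching response has arrived.
    static void finishPacket(Packet p) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll(); // wake the caller blocked in submit()
        }
    }

    // Called by the application thread; blocks until finishPacket() runs.
    static boolean submit(final Packet p) throws InterruptedException {
        Thread io = new Thread(new Runnable() {
            public void run() {
                try { Thread.sleep(50); } catch (InterruptedException ignored) {}
                finishPacket(p); // simulate the response arriving later
            }
        });
        io.start();
        synchronized (p) {
            while (!p.finished) {
                p.wait(); // releases the monitor while waiting, no busy-spin
            }
        }
        io.join();
        return p.finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submit(new Packet())); // prints "true"
    }
}
```

The `while` loop (rather than a single `if`) guards against spurious wakeups, which is exactly the shape the real submitRequest() uses.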
2.1 ClientCnxn.queuePacket(): wrapping the Packet
```java
public class ClientCnxn {
    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration) {
        Packet packet = null;
        synchronized (outgoingQueue) {
            // 1. Simply create the Packet object
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                // 2. Finally place the request packet into outgoingQueue
                outgoingQueue.add(packet);
            }
        }
        // 3. Then wake up the Selector inside SendThread
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }
}
```
Wrapping the packet ends once the Packet has been placed into outgoingQueue.
Now it is SendThread's turn to perform.
2.2 SendThread: sending the request packet
SendThread is itself a Thread; the main logic lives in its run() method.
```java
class SendThread extends ZooKeeperThread {
    // the transport
    private final ClientCnxnSocket clientCnxnSocket;

    @Override
    public void run() {
        clientCnxnSocket.introduce(this, sessionId);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard();
        int to;
        long lastPingRwServer = Time.currentElapsedTime();
        final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
        InetSocketAddress serverAddress = null;
        while (state.isAlive()) {
            try {
                ...
                // Skipping the non-essential parts: the request packet is sent
                // here, delegated to clientCnxnSocket (see 2.3)
                clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            } catch (Throwable e) {
                if (closing) {
                    if (LOG.isDebugEnabled()) { ... }
                    break;
                } else {
                    ...
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }
            }
        }
        cleanup();
        clientCnxnSocket.close();
        if (state.isAlive()) {
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                    Event.KeeperState.Disconnected, null));
        }
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
    }
}
```
In its run() loop, SendThread keeps pulling packets from the outgoingQueue filled in 2.1 and sends them out.
This model decouples submitting a request from actually transmitting it.
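The decoupling can be shown in miniature: the caller only enqueues, and a single dedicated sender thread is the only one that touches the "wire". This is illustrative code, not ZooKeeper source; the real client uses a plain LinkedList guarded by synchronized plus a Selector wakeup, while the sketch uses a BlockingQueue to stay short.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class SendLoopDemo {
    // Callers enqueue into "outgoing"; the sender thread drains it and
    // puts packets onto "wire" (standing in for SocketChannel.write()).
    static String sendViaThread(final String packet) throws InterruptedException {
        final LinkedBlockingQueue<String> outgoing = new LinkedBlockingQueue<String>();
        final LinkedBlockingQueue<String> wire = new LinkedBlockingQueue<String>();
        Thread sendThread = new Thread(new Runnable() {
            public void run() {
                try {
                    // drain one packet and "send" it
                    wire.put(outgoing.take());
                } catch (InterruptedException ignored) {}
            }
        });
        sendThread.start();
        outgoing.put(packet); // the caller only enqueues; it never touches the socket
        sendThread.join();
        return wire.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendViaThread("getData:/a")); // prints "getData:/a"
    }
}
```

Because only one thread writes to the socket, no write on the channel ever needs cross-thread coordination beyond the queue itself.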
2.3 clientCnxnSocket.doTransport(): sending the Packet
```java
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
    void doTransport(int waitTimeOut, List<Packet> pendingQueue,
            LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
            throws IOException, InterruptedException {
        // Standard NIO
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            // Handle connect events
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            // 1. Handle read/write events
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // Delegate to doIO()
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized (outgoingQueue) {
                // Register OP_WRITE interest on the Selector
                // whenever outgoingQueue has data to send
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }

    // Where the request is actually sent
    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
            throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // Read events: not the focus here; analyzed later when we read responses
        if (sockKey.isReadable()) { ... }
        if (sockKey.isWritable()) {
            synchronized (outgoingQueue) {
                // 1. First come, first served: take the first sendable packet
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());
                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null)
                                && (p.requestHeader.getType() != OpCode.ping)
                                && (p.requestHeader.getType() != OpCode.auth)) {
                            // Note the xid here
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();
                    }
                    // 2. Write packet.bb out through the SocketChannel
                    sock.write(p.bb);
                    if (!p.bb.hasRemaining()) {
                        sentCount++;
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                // 3. A sent packet is not simply discarded;
                                // it is moved into pendingQueue
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                // 4. Re-arm the Selector: keep OP_WRITE enabled
                // only if outgoingQueue still has packets
                if (outgoingQueue.isEmpty()) {
                    disableWrite();
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
    }
}
```
Summary: four interesting details are worth noting here.
1) Setting the Selector's OP_WRITE interest
Since NIO is used to send and receive data, the Selector's OP_READ and OP_WRITE interests must be registered at the right times.
So when is OP_WRITE registered? As analyzed above: whenever outgoingQueue contains data, the Selector's OP_WRITE interest is enabled.
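The effect of this toggling can be modeled without a real Selector by tracking the interest set as a bitmask, the way enableWrite()/disableWrite() manipulate SelectionKey.interestOps(). The class below is a hypothetical sketch, not ZooKeeper source:

```java
import java.nio.channels.SelectionKey;
import java.util.LinkedList;

public class InterestOpsDemo {
    // Interest set tracked as a bitmask, as SelectionKey.interestOps() does.
    static int interestOps = SelectionKey.OP_READ;

    static void enableWrite()  { interestOps |= SelectionKey.OP_WRITE; }
    static void disableWrite() { interestOps &= ~SelectionKey.OP_WRITE; }

    // After each transport pass: stay registered for OP_WRITE only while
    // there is something to send, so select() does not spin forever on an
    // always-writable socket.
    static int updateFor(LinkedList<Object> outgoingQueue) {
        if (outgoingQueue.isEmpty()) {
            disableWrite();
        } else {
            enableWrite();
        }
        return interestOps;
    }

    public static void main(String[] args) {
        LinkedList<Object> queue = new LinkedList<Object>();
        queue.add("packet");
        System.out.println((updateFor(queue) & SelectionKey.OP_WRITE) != 0); // true: queue non-empty
        queue.clear();
        System.out.println((updateFor(queue) & SelectionKey.OP_WRITE) != 0); // false: queue drained
    }
}
```

Dropping OP_WRITE when the queue is empty matters because a connected TCP socket is almost always writable; leaving the interest on would make select() return immediately in a tight loop.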
2) Sending the Packet object
The data written to the server through the SocketChannel is not the entire Packet object; only the RequestHeader and GetDataRequest it carries are serialized and sent.
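In the real client, Packet.createBB() serializes the header and request body and prepends a 4-byte length. The sketch below mimics only the length-prefixed framing idea with raw byte arrays; it is an illustration, not the actual jute serialization code.

```java
import java.nio.ByteBuffer;

public class FrameDemo {
    // Build a length-prefixed frame: 4-byte big-endian length, then payload
    // (here "payload" stands in for serialized header + request body).
    static ByteBuffer createFrame(byte[] headerBytes, byte[] bodyBytes) {
        int len = headerBytes.length + bodyBytes.length;
        ByteBuffer bb = ByteBuffer.allocate(4 + len);
        bb.putInt(len);    // length prefix does not count itself
        bb.put(headerBytes);
        bb.put(bodyBytes);
        bb.flip();         // ready to hand to SocketChannel.write()
        return bb;
    }

    // Reader side: first decode the 4-byte length, then read exactly
    // that many bytes as one message.
    static byte[] readFrame(ByteBuffer wire) {
        int len = wire.getInt();
        byte[] payload = new byte[len];
        wire.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer frame = createFrame(new byte[]{1, 2}, new byte[]{3, 4, 5});
        byte[] payload = readFrame(frame);
        System.out.println(payload.length); // prints 5
    }
}
```

The length prefix is what lets the receiving side (the lenBuffer/incomingBuffer pair we will meet in section 3) know where one response ends and the next begins on a stream socket.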
3) The xid in RequestHeader
This xid is obtained from cnxn and is unique within the client. What is it for? We will keep that a mystery for now and come back to it later.
4) pendingQueue
After a packet has been written to the server through the SocketChannel, it is added to the pendingQueue collection. What is that for? Not clear yet, but it presumably relates to receiving responses later.
(A diagram in the original post summarizes the client's send phase at this point.)
3. Receiving the response
Responses are received in the same clientCnxnSocket.doTransport() method,
namely in the OP_READ handling we skipped above.
```java
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
    // doTransport() is the same method shown in 2.3: readable keys are
    // likewise delegated to doIO(). This time we look at the read branch.

    void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
            throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // Handle read events, i.e. response data
        if (sockKey.isReadable()) {
            // 1. Standard NIO read
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    ...
                } else {
                    // 2. Hand the received data to SendThread (see 3.1)
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        if (sockKey.isWritable()) { ... }
    }
}
```
3.1 SendThread.readResponse(): handling the response
```java
class SendThread extends ZooKeeperThread {
    private long lastPingSentNs;
    private final ClientCnxnSocket clientCnxnSocket;
    private Random r = new Random(System.nanoTime());
    private boolean isFirstConnect = true;

    // Read the response
    void readResponse(ByteBuffer incomingBuffer) throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ReplyHeader replyHdr = new ReplyHeader();
        // 1. Deserialize the reply header
        replyHdr.deserialize(bbia, "header");
        // Various special-case handling elided
        ...
        Packet packet;
        synchronized (pendingQueue) {
            if (pendingQueue.size() == 0) {
                throw new IOException("Nothing in the queue, but got "
                        + replyHdr.getXid());
            }
            // 2. Take the first packet from pendingQueue
            packet = pendingQueue.remove();
        }
        try {
            // If the request xid and reply xid differ, responses have arrived
            // out of order: throw immediately
            if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
                throw new IOException("Xid out of order. Got Xid "
                        + replyHdr.getXid() + " with err " + replyHdr.getErr()
                        + " expected Xid " + packet.requestHeader.getXid()
                        + " for a packet with details: " + packet);
            }
            packet.replyHeader.setXid(replyHdr.getXid());
            packet.replyHeader.setErr(replyHdr.getErr());
            packet.replyHeader.setZxid(replyHdr.getZxid());
            if (replyHdr.getZxid() > 0) {
                lastZxid = replyHdr.getZxid();
            }
            // 3. Deserialize the response body
            if (packet.response != null && replyHdr.getErr() == 0) {
                packet.response.deserialize(bbia, "response");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Reading reply sessionid:0x"
                        + Long.toHexString(sessionId) + ", packet:: " + packet);
            }
        } finally {
            // 4. Finally mark packet.finished = true
            finishPacket(packet);
        }
    }
}
```
Summary:
Two details are worth noting here; they answer the questions raised in 2.3.
1) What the xid is for
The xid acts as a unique identifier tying a request to its response. After the ZooKeeper server processes a request, it copies the xid from the request header into the response header. That is how each response is paired with its request.
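The matching rule from readResponse() can be sketched as follows: responses arrive in send order, so the reply's xid must equal the xid at the head of pendingQueue. The types below are illustrative stand-ins, not ZooKeeper source.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class XidMatchDemo {
    // Each in-flight request remembers the xid it was sent with.
    static class Pending {
        final int xid;
        Pending(int xid) { this.xid = xid; }
    }

    // readResponse() logic in miniature: pop the head of pendingQueue and
    // insist its xid matches the reply's xid, else the stream is out of order.
    static int match(Deque<Pending> pendingQueue, int replyXid) {
        Pending head = pendingQueue.remove();
        if (head.xid != replyXid) {
            throw new IllegalStateException(
                    "Xid out of order: got " + replyXid + ", expected " + head.xid);
        }
        return head.xid;
    }

    public static void main(String[] args) {
        Deque<Pending> pending = new ArrayDeque<Pending>();
        pending.add(new Pending(1));
        pending.add(new Pending(2));
        System.out.println(match(pending, 1)); // prints 1
        System.out.println(match(pending, 2)); // prints 2
    }
}
```

Because TCP preserves ordering and the server replies to a session's requests in order, a simple FIFO check is sufficient; the xid comparison is a sanity check that the invariant holds.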
2) The Packet.finished flag
Recall the code that sent the GetDataRequest through ClientCnxn:
```java
public class ClientCnxn {
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
}
```
The caller waits on the packet's monitor as long as Packet.finished is false, blocking until the response arrives; finishPacket() sets the flag and wakes the waiter.
(Again, a diagram in the original post illustrates the whole receive process.)
Summary:
Let us return to the opening questions.
1. The client's request-sending model is essentially a blocking, decoupled send model: requests are placed into a queue, and SendThread asynchronously takes Packets from that queue and sends them to the server.
2. Before its response arrives, a sent Packet sits in the pendingQueue; when a response is received, the xids in the request header and reply header are compared for consistency.
3. Once the response arrives, the response body is stored in the Packet and Packet.finished is set to true; the client stops blocking and can read the response.