日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

zookeeper源码分析之三客户端发送请求流程

發布時間:2025/4/5 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 zookeeper源码分析之三客户端发送请求流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  znode?可以被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化可以通知設置監控的客戶端,這個功能是zookeeper對于應用最重要的特性,通過這個特性可以實現的功能包括配置的集中管理,集群管理,分布式鎖等等。

知識準備:

zookeeper定義的狀態有:

Unknown (-1),Disconnected (0),NoSyncConnected (1),SyncConnected (3),AuthFailed (4),ConnectedReadOnly (5),SaslAuthenticated(6),Expired (-112);

事件定義的的類型有:None (-1),NodeCreated (1),NodeDeleted (2),NodeDataChanged (3),NodeChildrenChanged (4),DataWatchRemoved (5),ChildWatchRemoved (6);

watcher定義的的類型有Children(1), Data(2), Any(3);

在上一篇

zookeeper源碼分析之一客戶端

中,我們連接zookeeper時,啟動了一個MyWatcher

protected void connectToZK(String newHost) throws InterruptedException, IOException {if (zk != null && zk.getState().isAlive()) {zk.close();}host = newHost;boolean readOnly = cl.getOption("readonly") != null;if (cl.getOption("secure") != null) {System.setProperty(ZooKeeper.SECURE_CLIENT, "true");System.out.println("Secure connection is enabled");} zk = new ZooKeeper(host,Integer.parseInt(cl.getOption("timeout")),new MyWatcher(), readOnly);}

創建zookeeper示例時,使用到watchManager:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider)throws IOException {LOG.info("Initiating client connection, connectString=" + connectString+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);watchManager = defaultWatchManager();watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);hostProvider = aHostProvider; cnxn = new ClientCnxn(connectStringParser.getChrootPath(),hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly);cnxn.start();}

將傳進來的MyWatcher作為默認watcher,存入watchManager,然后通過ClientCnxn包裝后,啟動線程。

  那我們先了解一下ClientCnxn吧,ClientCnxn管理客戶端socket的io,它維護了一組可以連接上的server及當需要轉換時可以透明的轉換到的一組server。

先了解一下如何獲取socket的吧:

private static ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();}try {return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).newInstance();} catch (Exception e) {IOException ioe = new IOException("Couldn't instantiate "+ clientCnxnSocketName);ioe.initCause(e);throw ioe;}}

  接著啟動ClientCnxn的start()方法,在此方法中啟動了兩個線程:

public void start() {sendThread.start();eventThread.start();}

其中SendThread類為發送的請求隊列提供服務,并且產生心跳。它同時也產生ReadThread。

我們看一下SendThread的run方法的主體:

if (!clientCnxnSocket.isConnected()) {// don't re-establish connection if we are closingif (closing) {break;}startConnect();clientCnxnSocket.updateLastSendAndHeard();}if (state.isConnected()) {// determine whether we need to send an AuthFailed event.if (zooKeeperSaslClient != null) {boolean sendAuthEvent = false;if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {try {zooKeeperSaslClient.initialize(ClientCnxn.this);} catch (SaslException e) {LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);state = States.AUTH_FAILED;sendAuthEvent = true;}}KeeperState authState = zooKeeperSaslClient.getKeeperState();if (authState != null) {if (authState == KeeperState.AuthFailed) {// An authentication error occurred during authentication with the Zookeeper Server.state = States.AUTH_FAILED;sendAuthEvent = true;} else {if (authState == KeeperState.SaslAuthenticated) {sendAuthEvent = true;}}}if (sendAuthEvent == true) {eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,authState,null));}}to = readTimeout - clientCnxnSocket.getIdleRecv();} else {to = connectTimeout - clientCnxnSocket.getIdleRecv();}if (to <= 0) {String warnInfo;warnInfo = "Client session timed out, have not heard from server in "+ clientCnxnSocket.getIdleRecv()+ "ms"+ " for sessionid 0x"+ Long.toHexString(sessionId);LOG.warn(warnInfo);throw new SessionTimeoutException(warnInfo);}if (state.isConnected()) {//1000(1 second) is to prevent race condition missing to send the second ping//also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVALif (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {sendPing();clientCnxnSocket.updateLastSend();} else {if (timeToNextPing < to) {to = timeToNextPing;}}}// If we are in read-only mode, seek for read/write serverif (state == States.CONNECTEDREADONLY) {long now = Time.currentElapsedTime();int idlePingRwServer = (int) (now - lastPingRwServer);if (idlePingRwServer >= pingRwTimeout) {lastPingRwServer = now;idlePingRwServer = 0;pingRwTimeout =Math.min(2*pingRwTimeout, maxPingRwTimeout);pingRwServer();}to = Math.min(to, pingRwTimeout - idlePingRwServer);} clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);

ClientCnxnSocketNetty實現了ClientCnxnSocket的抽象方法,它負責連接到server,讀取/寫入網絡流量,并作為網絡數據層和更高packet層的中間層。其生命周期如下:

loop:- try:- - !isConnected()- - - connect()- - doTransport()- catch:- - cleanup()close()

從上述描述中,我們可以看到ClientCnxnSocket的工作流程,先判斷是否連接,沒有連接則調用connect方法進行連接,有連接則直接使用;然后調用doTransport方法進行通信,若連接過程中出現異常,則調用cleanup()方法;最后關閉連接。故最主要的流程為doTransport()方法:

@Overridevoid doTransport(int waitTimeOut,List<Packet> pendingQueue,ClientCnxn cnxn)throws IOException, InterruptedException {try {if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {return;}Packet head = null;if (needSasl.get()) {if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {return;}} else {if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {return;}}// check if being waken up on closing.if (!sendThread.getZkState().isAlive()) {// adding back the patck to notify of failure in conLossPacket(). addBack(head);return;}// channel disconnection happenedif (disconnected.get()) {addBack(head);throw new EndOfStreamException("channel for sessionid 0x"+ Long.toHexString(sessionId)+ " is lost");}if (head != null) {doWrite(pendingQueue, head, cnxn);}} finally {updateNow();}}

我們簡化一下上面的程序,一個是異常處理addBack(head),另一個正常流程處理doWrite(pendingQueue, head, cnxn),我們先拋掉異常,走正常流程看看:

先獲取Packet:

Packet head = null;if (needSasl.get()) {if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {return;}} else {if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {return;}}

其中,protected LinkedBlockingDeque<Packet> outgoingQueue是一個鏈表阻塞隊列,保存發出的請求;

然后執行doWrite方法:

/*** doWrite handles writing the packets from outgoingQueue via network to server.*/private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {updateNow();while (true) {if (p != WakeupPacket.getInstance()) {if ((p.requestHeader != null) &&(p.requestHeader.getType() != ZooDefs.OpCode.ping) &&(p.requestHeader.getType() != ZooDefs.OpCode.auth)) {p.requestHeader.setXid(cnxn.getXid());synchronized (pendingQueue) {pendingQueue.add(p);}} sendPkt(p);}if (outgoingQueue.isEmpty()) {break;}p = outgoingQueue.remove();}}

dowrite方法負責將outgoingQueue的報文通過網絡寫到服務器上。發送報文程序如上紅色所示:

private void sendPkt(Packet p) {// Assuming the packet will be sent out successfully. Because if it fails,// the channel will close and clean up queues. p.createBB();updateLastSend();sentCount++;channel.write(ChannelBuffers.wrappedBuffer(p.bb));}

1. Packet報文的結構如下:

/*** This class allows us to pass the headers and the relevant records around.*/static class Packet {RequestHeader requestHeader;ReplyHeader replyHeader;Record request;Record response;ByteBuffer bb;/** Client's view of the path (may differ due to chroot) **/String clientPath;/** Servers's view of the path (may differ due to chroot) **/String serverPath;boolean finished;AsyncCallback cb;Object ctx;WatchRegistration watchRegistration;public boolean readOnly;WatchDeregistration watchDeregistration;/** Convenience ctor */Packet(RequestHeader requestHeader, ReplyHeader replyHeader,Record request, Record response,WatchRegistration watchRegistration) {this(requestHeader, replyHeader, request, response,watchRegistration, false);}Packet(RequestHeader requestHeader, ReplyHeader replyHeader,Record request, Record response,WatchRegistration watchRegistration, boolean readOnly) {this.requestHeader = requestHeader;this.replyHeader = replyHeader;this.request = request;this.response = response;this.readOnly = readOnly;this.watchRegistration = watchRegistration;}public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) { requestHeader.serialize(boa, "header");}if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append("clientPath:" + clientPath);sb.append(" serverPath:" + serverPath);sb.append(" finished:" + finished);sb.append(" header:: " + requestHeader);sb.append(" replyHeader:: " + replyHeader);sb.append(" request:: " + request);sb.append(" response:: " + response);// jute toString is horrible, remove unnecessary newlinesreturn sb.toString().replaceAll("\r*\n+", " ");}}

從createBB方法中,我們看到在底層實際的網絡傳輸序列化中,zookeeper只會講requestHeader和request兩個屬性進行序列化,即只有這兩個會被序列化到底層字節數組中去進行網絡傳輸,不會將watchRegistration相關的信息進行網絡傳輸。

2. 更新最后一次發送updateLastSend

void updateLastSend() {this.lastSend = now;}

3. 使用nio channel 發送字節緩存到server

channel.write(ChannelBuffers.wrappedBuffer(p.bb));

其中,bb的類型為ByteBuffer,在packet中進行了初始化。

this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();

?

小結:

zookeeper客戶端和服務器的連接主要是通過ClientCnxnSocket來實現的,有兩個具體的實現類ClientCnxnSocketNetty和ClientCnxnSocketNIO,其工作流程如下:

  先判斷是否連接,沒有連接則調用connect方法進行連接,有連接則進入下一步;

  然后調用doTransport方法進行通信,若連接過程中出現異常,則調用cleanup()方法;

  最后關閉連接。

上述的發現可以在SendThread的run方法中體現。

?

另:Zookeeper的特性--》順序一致性:按照客戶端發送請求的順序更新數據。我們再sendThread里可以看到多次更新時間戳來保證順序一致性,如下:

?

轉載于:https://www.cnblogs.com/davidwang456/p/5000927.html

總結

以上是生活随笔為你收集整理的zookeeper源码分析之三客户端发送请求流程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。