日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Zookeeper 客户端源码吐血总结

發(fā)布時(shí)間:2024/2/28 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Zookeeper 客户端源码吐血总结 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄
一、幾個(gè)重要的類
二、JAVA的基礎(chǔ)知識(shí)
三、大致了解
四 從入門到放棄的講解
Code1:ZK
Code2:創(chuàng)建 Zookeeper實(shí)例,實(shí)例化ClientCnxn,實(shí)例化ClientCnxnSocketNIO
Code3:實(shí)例化ClientCnxnSocketNIO (which extends ClientCnxnSocket)
Code4:ClientCnxn的具體實(shí)例化
Code5:SendThread的具體實(shí)例化
Code6:EventThread的具體實(shí)例化
Code 7:SendThread核心run流程
Code 8:startConnect()
Code 9: clientCnxnSocket.connect
Code10:registerAndConnect()
Code11:primeConnection()
Code 12:doTransport()
Code13:findSendablePacket()
Code14:IO write
Code15:createBB()
Code 16:IO read
Code 17: readResponse
Code 18:EventThread run:

源碼: Zookeeper 3.4.6.jar(吐血總結(jié))

一、幾個(gè)重要的類

1) ZookeeperMain: main函數(shù)為入口,由zkCli.sh腳本調(diào)用啟動(dòng)2) ZooKeeper:客戶端入口3) ZooKeeper.SendThread: IO線程4) ZooKeeper.EventThread: 事件處理線程,處理各類消息callback5) ClientCnxn: 客戶端與服務(wù)器端交互的主要類6) ClientCnxnSocketNIO:繼承自ClientCnxnSocket,專門處理IO, 利用JAVANIO7) Watcher: 用于監(jiān)控Znode節(jié)點(diǎn)9) WatcherManager:用來管理Watcher,管理了ZK Client綁定的所有Watcher。

二、JAVA的基礎(chǔ)知識(shí)

1)JAVA多線程 2)JAVANIO: 可參考:http://blog.csdn.net/cnh294141800/article/details/52996819 3)Socket編程(稍微了解即可) 4)JLine: 是一個(gè)用來處理控制臺(tái)輸入的Java類庫

三、大致了解

上圖就是對(duì)Zookeeper源碼一個(gè)最好的解釋,
(1) Client端發(fā)送Request(封裝成Packet)請(qǐng)求到Zookeeper
(2) Zookeeper處理Request并將該請(qǐng)求放入Outgoing Queue(顧名思義,外出隊(duì)列,就是讓Zookeeper服務(wù)器處理的隊(duì)列),
(3) Zookeeper端處理Outgoing Queue,并將該事件移到Pending Queue中
(4) Zookeeper端消費(fèi)Pending Queue,并調(diào)用finishPacket(),生成Event
(5) EventThread線程消費(fèi)Event事件,并且處理Watcher.

四 從入門到放棄的講解
(1)應(yīng)用 提供watch實(shí)例(new MyWatcher(null))

private class MyWatcher implements Watcher { // 默認(rèn)Watcher public void process(WatchedEvent event) {if (getPrintWatches()) {ZooKeeperMain.printMessage("WATCHER::");ZooKeeperMain.printMessage(event.toString());}}}

(2)實(shí)例化zookeeper
? 實(shí)例化socket,默認(rèn)使用ClientCnxnSocketNIO
? 實(shí)例化ClientCnxn
? 實(shí)例化SendThread
? 實(shí)例化EventThread
Code1:ZK

zk = new ZooKeeper(host Integer.parseInt(cl.getOption("timeout")),new MyWatcher(), readOnly); // 初始化ZK

Code2:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly)throws IOException{…watchManager.defaultWatcher = watcher; // 設(shè)置defaultWatcher 為 MyWatcherConnectStringParser connectStringParser = new ConnectStringParser(connectString); // 解析-server 獲取 IP以及PORTHostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());cnxn = new ClientCnxn(connectStringParser.getChrootPath(),hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly); // 創(chuàng)建 ClientCnxn實(shí)例cnxn.start(); // 啟動(dòng)cnxn中的SendThread and EventThread進(jìn)程}

Code3:實(shí)例化ClientCnxnSocketNIO (which extends ClientCnxnSocket)

private static ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();}try {return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).newInstance();} catch (Exception e) {IOException ioe = new IOException("Couldn't instantiate "+ clientCnxnSocketName);ioe.initCause(e);throw ioe;}}

Code4:ClientCnxn的具體實(shí)例化
/* 另一個(gè)ClientCnxn構(gòu)造函數(shù), 可見時(shí)sessionId=0

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)throws IOException {this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,clientCnxnSocket, 0, new byte[16], canBeReadOnly); } */ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {this.zooKeeper = zooKeeper; this.watcher = watcher;this.sessionId = sessionId;this.sessionPasswd = sessionPasswd;this.sessionTimeout = sessionTimeout;//主機(jī)列表this.hostProvider = hostProvider;this.chrootPath = chrootPath;//連接超時(shí)connectTimeout = sessionTimeout / hostProvider.size();//讀超時(shí) readTimeout = sessionTimeout * 2 / 3;readOnly = canBeReadOnly;//初始化client2個(gè)核心線程,SendThread是client的IO核心線程,EventThread從SendThread里拿到eventsendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();}

Code5:SendThread的具體實(shí)例化

SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));state = States.CONNECTING; // 將狀態(tài)設(shè)置為連接狀態(tài)(此時(shí)還未連接)this.clientCnxnSocket = clientCnxnSocket;setUncaughtExceptionHandler(uncaughtExceptionHandler);setDaemon(true); //設(shè)為守護(hù)線程}

Code6:EventThread的具體實(shí)例化

EventThread() {super(makeThreadName("-EventThread"));setUncaughtExceptionHandler(uncaughtExceptionHandler);setDaemon(true);}

至此所有對(duì)象實(shí)例化完成,然后啟動(dòng)SendThread、EventThread進(jìn)程
(3)啟動(dòng)zookeeper
? 啟動(dòng)SendThread
? 連接服務(wù)器
? 產(chǎn)生真正的socket,見ClientCnxnSocketNIO.createSock
? 向select注冊(cè)一個(gè)OP_CONNECT事件并連接服務(wù)器,由于是非阻塞連接,此時(shí)有可能并不會(huì)立即連上,如果連上就會(huì)調(diào)用SendThread.primeConnection初始化連接來注冊(cè)讀寫事件,否則會(huì)在接下來的輪詢select獲取連接事件中處理
? 復(fù)位socket的incomingBuffer
? 連接成功后會(huì)產(chǎn)生一個(gè)connect型的請(qǐng)求發(fā)給服務(wù),用于獲取本次連接的sessionid
? 進(jìn)入循環(huán)等待來自應(yīng)用的請(qǐng)求,如果沒有就根據(jù)時(shí)間來ping 服務(wù)器
? 啟動(dòng)EventThread
開始進(jìn)入無限循環(huán),從隊(duì)列waitingEvents中獲取事件,如果沒有就阻塞等待

Code 7:SendThread核心run流程
可以對(duì)run進(jìn)行抽象看待,流程如下

loop:- try: - - !isConnected() - - - connect() - - doTransport() - catch: - - cleanup() close()

先判斷是否連接,沒有連接則調(diào)用connect方法進(jìn)行連接,有連接則直接使用;然后調(diào)用doTransport方法進(jìn)行通信,若連接過程中出現(xiàn)異常,則調(diào)用cleanup()方法;最后關(guān)閉連接。

public void run() {while (state.isAlive()) { // this != CLOSED && this != AUTH_FAILED; 剛才設(shè)置了首次狀態(tài)為連接狀態(tài)try {//如果還沒連上,則啟動(dòng)連接程序if (!clientCnxnSocket.isConnected()) { //所有的clientCnxnSocket都是clientCnxnSocketDIO實(shí)例//不是首次連接則休息1Sif(!isFirstConnect){ try {Thread.sleep(r.nextInt(1000));} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);}}// don't re-establish connection if we are closingif (closing || !state.isAlive()) {break;}startConnect();// 啟動(dòng)連接clientCnxnSocket.updateLastSendAndHeard(); //更新Socket最后一次發(fā)送以及聽到消息的時(shí)間}if (state.isConnected()) {// determine whether we need to send an AuthFailed event.if (zooKeeperSaslClient != null) {...... }// 下一次超時(shí)時(shí)間to = readTimeout - clientCnxnSocket.getIdleRecv();} else {// 如果還沒連接上 重置當(dāng)前剩余可連接時(shí)間to = connectTimeout - clientCnxnSocket.getIdleRecv();}// 連接超時(shí)if (to <= 0) {}// 判斷是否 需要發(fā)送Ping心跳包if (state.isConnected()) {sendPing();}// If we are in read-only mode, seek for read/write serverif (state == States.CONNECTEDREADONLY) {}// The most important step. Do real IOclientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);} catch (Throwable e) {}}cleanup();...} }

Code 8:startConnect()
// 具體實(shí)際連接部分

private void startConnect() throws IOException {state = States.CONNECTING; //state 狀態(tài)設(shè)置為連接InetSocketAddress addr; if (rwServerAddress != null) {addr = rwServerAddress;rwServerAddress = null;} else {addr = hostProvider.next(1000);}setName(getName().replaceAll("\\(.*\\)","(" + addr.getHostName() + ":" + addr.getPort() + ")"));if (ZooKeeperSaslClient.isEnabled()) {}logStartConnect(addr); //寫連接日志clientCnxnSocket.connect(addr); //連接Socket}

Code 9: clientCnxnSocket.connect

void connect(InetSocketAddress addr) throws IOException {SocketChannel sock = createSock(); // 創(chuàng)建一個(gè)非阻塞空SocketChanneltry {registerAndConnect(sock, addr); //注冊(cè)并且連接sock到辣個(gè)addr } catch (IOException e) {….}initialized = false;/* Reset incomingBuffer*/lenBuffer.clear();incomingBuffer = lenBuffer;} }

Code10:registerAndConnect()

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {sockKey = sock.register(selector, SelectionKey.OP_CONNECT); //將socket注冊(cè)到selector中boolean immediateConnect = sock.connect(addr); //socket連接服務(wù)器if (immediateConnect) {sendThread.primeConnection(); //初始化連接事件} }

Code11:primeConnection()

void primeConnection() IOException {LOG.info("Socket connection established to "+ clientCnxnSocket.getRemoteSocketAddress()+ ", initiating session");isFirstConnect = false; // 設(shè)置為非首次連接long sessId = (seenRwServerBefore) ? sessionId : 0; // 客戶端默認(rèn)sessionid為0// 創(chuàng)建連接request lastZxid 代表最新一次的節(jié)點(diǎn)ZXIDConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd); // 線程安全占用outgoing synchronized (outgoingQueue) {…//組合成通訊層的Packet對(duì)象,添加到發(fā)送隊(duì)列,對(duì)于ConnectRequest其requestHeader為null outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));}//確保讀寫事件都監(jiān)聽,也就是設(shè)置成可讀可寫clientCnxnSocket.enableReadWriteOnly();if (LOG.isDebugEnabled()) {LOG.debug("Session establishment request sent on "+ clientCnxnSocket.getRemoteSocketAddress());}}

至此Channelsocket已經(jīng)成功連接,并且已將連接請(qǐng)求做為隊(duì)列放到Outgoing中。此時(shí),需要再回頭看Code7, 也就是一直在循環(huán)的SendThread部分。可以看到連接部分成功完成,接下來需要做doTransport()。// CnxnClientSocketNio

Code 12:doTransport()

void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,ClientCnxn cnxn)throws IOException, InterruptedException {//selectselector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}// Everything below and until we get back to the select is// non blocking, so time is effectively a constant. That is// Why we just have to do this once, hereupdateNow();for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());//如果之前連接沒有立馬連上,則在這里處理OP_CONNECT事件if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {if (sc.finishConnect()) {updateLastSendAndHeard();sendThread.primeConnection();}} //如果讀寫就位,則處理之else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {doIO(pendingQueue, outgoingQueue, cnxn);}}if (sendThread.getZkState().isConnected()) {synchronized(outgoingQueue) {//找到連接Packet并且將他放到隊(duì)列頭if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {// 將要Channecl設(shè)置為可讀enableWrite();}}}selected.clear(); }

Code13:findSendablePacket()

private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,boolean clientTunneledAuthenticationInProgress) {synchronized (outgoingQueue) {..// Since client's authentication with server is in progress,// send only the null-header packet queued by primeConnection().// This packet must be sent so that the SASL authentication process// can proceed, but all other packets should wait until// SASL authentication completes.//因?yàn)镃onn Packet需要發(fā)送到SASL authentication進(jìn)行處理,其他Packet都需要等待直到該處理完成,//Conn Packet必須第一個(gè)處理,所以找出它并且把它放到OutgoingQueue頭,也就是requestheader=null的辣個(gè)ListIterator<Packet> iter = outgoingQueue.listIterator();while (iter.hasNext()) {Packet p = iter.next();if (p.requestHeader == null) {// We've found the priming-packet. Move it to the beginning of the queue.iter.remove(); outgoingQueue.add(0, p); // 將連接放到outgogingQueue第一個(gè)return p;} else {// Non-priming packet: defer it until later, leaving it in the queue// until authentication completes.if (LOG.isDebugEnabled()) {LOG.debug("deferring non-priming packet: " + p +"until SASL authentication completes.");}}}// no sendable packet found.return null;} }

然后就是最重要的IO部分:
? 需要處理兩類網(wǎng)絡(luò)事件(讀、寫)

Code14:IO write

if (sockKey.isWritable()) {synchronized(outgoingQueue) {// 獲得packetPacket p = findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress());if (p != null) {updateLastSend();// If we already started writing p, p.bb will already existif (p.bb == null) {if ((p.requestHeader != null) &&(p.requestHeader.getType() != OpCode.ping) &&(p.requestHeader.getType() != OpCode.auth)) {//如果不是 連接事件,不是ping 事件,不是 認(rèn)證時(shí)間 p.requestHeader.setXid(cnxn.getXid());}// 序列化p.createBB();}//將數(shù)據(jù)寫入Channelsock.write(p.bb);// p.bb中如果沒有內(nèi)容 則表示發(fā)送成功if (!p.bb.hasRemaining()) {//發(fā)送數(shù)+1sentCount++;//將該P(yáng)從隊(duì)列中移除outgoingQueue.removeFirstOccurrence(p);//如果該事件不是連接事件,不是ping事件,不是認(rèn)證事件, 則將他加入pending隊(duì)列中if (p.requestHeader != null&& p.requestHeader.getType() != OpCode.ping&& p.requestHeader.getType() != OpCode.auth) {synchronized (pendingQueue) {pendingQueue.add(p);}}}}if (outgoingQueue.isEmpty()) {// No more packets to send: turn off write interest flag.// Will be turned on later by a later call to enableWrite(),// from within ZooKeeperSaslClient (if client is configured// to attempt SASL authentication), or in either doIO() or// in doTransport() if not.disableWrite();} else if (!initialized && p != null && !p.bb.hasRemaining()) {// On initial connection, write the complete connect request// packet, but then disable further writes until after// receiving a successful connection response. If the// session is expired, then the server sends the expiration// response and immediately closes its end of the socket. If// the client is simultaneously writing on its end, then the// TCP stack may choose to abort with RST, in which case the// client would never receive the session expired event. See// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.htmldisableWrite();} else {// Just in caseenableWrite();}}}

Code15:createBB()

public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in later// 如果不是連接事件則設(shè)置協(xié)議頭if (requestHeader != null) {requestHeader.serialize(boa, "header");}//設(shè)置協(xié)議體if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();//生成ByteBufferthis.bb = ByteBuffer.wrap(baos.toByteArray());//將bytebuffer的前4個(gè)字節(jié)修改成真正的長(zhǎng)度,總長(zhǎng)度減去一個(gè)int的長(zhǎng)度頭 this.bb.putInt(this.bb.capacity() - 4);//準(zhǔn)備給后續(xù)讀 讓buffer position = 0this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}

Code 16:IO read

if (sockKey.isReadable()) {//先從Channel讀4個(gè)字節(jié),代表頭 int rc = sock.read(incomingBuffer);if (rc < 0) {throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"+ Long.toHexString(sessionId)+ ", likely server has closed socket");}if (!incomingBuffer.hasRemaining()) {incomingBuffer.flip();if (incomingBuffer == lenBuffer) {recvCount++;readLength();} //初始化else if (!initialized) {readConnectResult(); // 讀取連接結(jié)果enableRead(); // Channel 可讀if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {// Since SASL authentication has completed (if client is configured to do so),// outgoing packets waiting in the outgoingQueue can now be sent.enableWrite();}lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();initialized = true;} else { // 處理其他請(qǐng)求sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}

還有一個(gè)比較關(guān)鍵的函數(shù)就是readResponse函數(shù),用來消費(fèi)PendingQueue,處理的消息分為三類
? ping 消息 XID=-2
? auth認(rèn)證消息 XID=-4
? 訂閱的消息,即各種變化的通知,比如子節(jié)點(diǎn)變化、節(jié)點(diǎn)內(nèi)容變化,由服務(wù)器推過來的消息 ,獲取到這類消息或通過eventThread.queueEvent將消息推入事件隊(duì)列
XID=-1

Code 17: readResponse

void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}//將事件加入到 event隊(duì)列中eventThread.queueEvent( we );return;}

結(jié)束了IO之后就是對(duì)于事件的消費(fèi),也就是一開始圖示的右半部分也是接近最后部分啦

Code 18:EventThread run:

public void run() {try {isRunning = true;while (true) {// 獲取事件Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {//處理事件processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}} catch (InterruptedException e) {LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down");}} }

最后就是processEvent了,這個(gè)就不貼代碼了(代碼備注的累死了),寫思路。

ProcessEvent:

processEvent 是 EventThread 處理事件核心函數(shù),核心邏輯如下:

1、如果 event instanceof WatcherSetEventPair ,取出 pair 中的 Watchers ,逐個(gè)調(diào)用 watcher.process(pair.event)
2、否則 event 為 AsyncCallback ,根據(jù) p.response 判斷為哪種響應(yīng)類型,執(zhí)行響應(yīng)的回調(diào) processResult 。

Watcher 和 AsyncCallback 的區(qū)別
Watcher: Watcher 是用于監(jiān)聽節(jié)點(diǎn),session 狀態(tài)的,比如 getData 對(duì)數(shù)據(jù)節(jié)點(diǎn) a 設(shè)置了 watcher ,那么當(dāng) a 的數(shù)據(jù)內(nèi)容發(fā)生改變時(shí),客戶端會(huì)收到 NodeDataChanged 通知,然后進(jìn)行 watcher 的回調(diào)。
AsyncCallback : AsyncCallback 是在以異步方式使用 ZooKeeper API 時(shí),用于處理返回結(jié)果的。例如:getData 同步調(diào)用的版本是: byte[] getData(String path, boolean watch,Stat stat) ,異步調(diào)用的版本是: void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx) ,可以看到,前者是直接返回獲取的結(jié)果,后者是通過 AsyncCallback 回調(diào)處理結(jié)果的。

**接下來就是客戶端發(fā)送指令與負(fù)責(zé)端進(jìn)行交互比如:
Ls、getChildren、getData等**

參考文獻(xiàn):
[1] http://www.cnblogs.com/davidwang456/p/5000927.html
[2] http://www.verydemo.com/demo_c89_i33659.html
[3] http://blog.csdn.net/pwlazy/article/details/8000566
[4] http://www.cnblogs.com/ggjucheng/p/3376548.html
[5] http://zookeeper.apache.org/doc/r3.3.6/api/index.html
[6] http://www.tuicool.com/articles/i6vMVze
[7]http://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/

總結(jié)

以上是生活随笔為你收集整理的Zookeeper 客户端源码吐血总结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。