日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Zookeeper 客户端源码吐血总结

發(fā)布時間:2024/2/28 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Zookeeper 客户端源码吐血总结 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄
一、幾個重要的類
二、JAVA的基礎知識
三、大致了解
四 從入門到放棄的講解
Code1:ZK
Code2:創(chuàng)建 Zookeeper實例,實例化ClientCnxn,實例化ClientCnxnSocketNIO
Code3:實例化ClientCnxnSocketNIO (which extends ClientCnxnSocket)
Code4:ClientCnxn的具體實例化
Code5:SendThread的具體實例化
Code6:EventThread的具體實例化
Code 7:SendThread核心run流程
Code 8:startConnect()
Code 9: clientCnxnSocket.connect
Code10:registerAndConnect()
Code11:primeConnection()
Code 12:doTransport()
Code13:findSendablePacket()
Code14:IO write
Code15:createBB()
Code 16:IO read
Code 17: readResponse
Code 18:EventThread run:

源碼: Zookeeper 3.4.6.jar(吐血總結)

一、幾個重要的類

1) ZookeeperMain: main函數為入口,由zkCli.sh腳本調用啟動2) ZooKeeper:客戶端入口3) ZooKeeper.SendThread: IO線程4) ZooKeeper.EventThread: 事件處理線程,處理各類消息callback5) ClientCnxn: 客戶端與服務器端交互的主要類6) ClientCnxnSocketNIO:繼承自ClientCnxnSocket,專門處理IO, 利用JAVANIO7) Watcher: 用于監(jiān)控Znode節(jié)點9) WatcherManager:用來管理Watcher,管理了ZK Client綁定的所有Watcher。

二、JAVA的基礎知識

1)JAVA多線程 2)JAVANIO: 可參考:http://blog.csdn.net/cnh294141800/article/details/52996819 3)Socket編程(稍微了解即可) 4)JLine: 是一個用來處理控制臺輸入的Java類庫

三、大致了解

上圖就是對Zookeeper源碼一個最好的解釋,
(1) Client端發(fā)送Request(封裝成Packet)請求到Zookeeper
(2) Zookeeper處理Request并將該請求放入Outgoing Queue(顧名思義,外出隊列,就是讓Zookeeper服務器處理的隊列),
(3) Zookeeper端處理Outgoing Queue,并將該事件移到Pending Queue中
(4) Zookeeper端消費Pending Queue,并調用finishPacket(),生成Event
(5) EventThread線程消費Event事件,并且處理Watcher.

四 從入門到放棄的講解
(1)應用 提供watch實例(new MyWatcher(null))

private class MyWatcher implements Watcher { // 默認Watcher public void process(WatchedEvent event) {if (getPrintWatches()) {ZooKeeperMain.printMessage("WATCHER::");ZooKeeperMain.printMessage(event.toString());}}}

(2)實例化zookeeper
? 實例化socket,默認使用ClientCnxnSocketNIO
? 實例化ClientCnxn
? 實例化SendThread
? 實例化EventThread
Code1:ZK

zk = new ZooKeeper(host Integer.parseInt(cl.getOption("timeout")),new MyWatcher(), readOnly); // 初始化ZK

Code2:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly)throws IOException{…watchManager.defaultWatcher = watcher; // 設置defaultWatcher 為 MyWatcherConnectStringParser connectStringParser = new ConnectStringParser(connectString); // 解析-server 獲取 IP以及PORTHostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());cnxn = new ClientCnxn(connectStringParser.getChrootPath(),hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly); // 創(chuàng)建 ClientCnxn實例cnxn.start(); // 啟動cnxn中的SendThread and EventThread進程}

Code3:實例化ClientCnxnSocketNIO (which extends ClientCnxnSocket)

private static ClientCnxnSocket getClientCnxnSocket() throws IOException {String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);if (clientCnxnSocketName == null) {clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();}try {return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).newInstance();} catch (Exception e) {IOException ioe = new IOException("Couldn't instantiate "+ clientCnxnSocketName);ioe.initCause(e);throw ioe;}}

Code4:ClientCnxn的具體實例化
/* 另一個ClientCnxn構造函數, 可見時sessionId=0

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)throws IOException {this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,clientCnxnSocket, 0, new byte[16], canBeReadOnly); } */ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {this.zooKeeper = zooKeeper; this.watcher = watcher;this.sessionId = sessionId;this.sessionPasswd = sessionPasswd;this.sessionTimeout = sessionTimeout;//主機列表this.hostProvider = hostProvider;this.chrootPath = chrootPath;//連接超時connectTimeout = sessionTimeout / hostProvider.size();//讀超時 readTimeout = sessionTimeout * 2 / 3;readOnly = canBeReadOnly;//初始化client2個核心線程,SendThread是client的IO核心線程,EventThread從SendThread里拿到eventsendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();}

Code5:SendThread的具體實例化

SendThread(ClientCnxnSocket clientCnxnSocket) {super(makeThreadName("-SendThread()"));state = States.CONNECTING; // 將狀態(tài)設置為連接狀態(tài)(此時還未連接)this.clientCnxnSocket = clientCnxnSocket;setUncaughtExceptionHandler(uncaughtExceptionHandler);setDaemon(true); //設為守護線程}

Code6:EventThread的具體實例化

EventThread() {super(makeThreadName("-EventThread"));setUncaughtExceptionHandler(uncaughtExceptionHandler);setDaemon(true);}

至此所有對象實例化完成,然后啟動SendThread、EventThread進程
(3)啟動zookeeper
? 啟動SendThread
? 連接服務器
? 產生真正的socket,見ClientCnxnSocketNIO.createSock
? 向select注冊一個OP_CONNECT事件并連接服務器,由于是非阻塞連接,此時有可能并不會立即連上,如果連上就會調用SendThread.primeConnection初始化連接來注冊讀寫事件,否則會在接下來的輪詢select獲取連接事件中處理
? 復位socket的incomingBuffer
? 連接成功后會產生一個connect型的請求發(fā)給服務,用于獲取本次連接的sessionid
? 進入循環(huán)等待來自應用的請求,如果沒有就根據時間來ping 服務器
? 啟動EventThread
開始進入無限循環(huán),從隊列waitingEvents中獲取事件,如果沒有就阻塞等待

Code 7:SendThread核心run流程
可以對run進行抽象看待,流程如下

loop:- try: - - !isConnected() - - - connect() - - doTransport() - catch: - - cleanup() close()

先判斷是否連接,沒有連接則調用connect方法進行連接,有連接則直接使用;然后調用doTransport方法進行通信,若連接過程中出現異常,則調用cleanup()方法;最后關閉連接。

public void run() {while (state.isAlive()) { // this != CLOSED && this != AUTH_FAILED; 剛才設置了首次狀態(tài)為連接狀態(tài)try {//如果還沒連上,則啟動連接程序if (!clientCnxnSocket.isConnected()) { //所有的clientCnxnSocket都是clientCnxnSocketDIO實例//不是首次連接則休息1Sif(!isFirstConnect){ try {Thread.sleep(r.nextInt(1000));} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);}}// don't re-establish connection if we are closingif (closing || !state.isAlive()) {break;}startConnect();// 啟動連接clientCnxnSocket.updateLastSendAndHeard(); //更新Socket最后一次發(fā)送以及聽到消息的時間}if (state.isConnected()) {// determine whether we need to send an AuthFailed event.if (zooKeeperSaslClient != null) {...... }// 下一次超時時間to = readTimeout - clientCnxnSocket.getIdleRecv();} else {// 如果還沒連接上 重置當前剩余可連接時間to = connectTimeout - clientCnxnSocket.getIdleRecv();}// 連接超時if (to <= 0) {}// 判斷是否 需要發(fā)送Ping心跳包if (state.isConnected()) {sendPing();}// If we are in read-only mode, seek for read/write serverif (state == States.CONNECTEDREADONLY) {}// The most important step. Do real IOclientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);} catch (Throwable e) {}}cleanup();...} }

Code 8:startConnect()
// 具體實際連接部分

private void startConnect() throws IOException {state = States.CONNECTING; //state 狀態(tài)設置為連接InetSocketAddress addr; if (rwServerAddress != null) {addr = rwServerAddress;rwServerAddress = null;} else {addr = hostProvider.next(1000);}setName(getName().replaceAll("\\(.*\\)","(" + addr.getHostName() + ":" + addr.getPort() + ")"));if (ZooKeeperSaslClient.isEnabled()) {}logStartConnect(addr); //寫連接日志clientCnxnSocket.connect(addr); //連接Socket}

Code 9: clientCnxnSocket.connect

void connect(InetSocketAddress addr) throws IOException {SocketChannel sock = createSock(); // 創(chuàng)建一個非阻塞空SocketChanneltry {registerAndConnect(sock, addr); //注冊并且連接sock到辣個addr } catch (IOException e) {….}initialized = false;/* Reset incomingBuffer*/lenBuffer.clear();incomingBuffer = lenBuffer;} }

Code10:registerAndConnect()

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {sockKey = sock.register(selector, SelectionKey.OP_CONNECT); //將socket注冊到selector中boolean immediateConnect = sock.connect(addr); //socket連接服務器if (immediateConnect) {sendThread.primeConnection(); //初始化連接事件} }

Code11:primeConnection()

void primeConnection() IOException {LOG.info("Socket connection established to "+ clientCnxnSocket.getRemoteSocketAddress()+ ", initiating session");isFirstConnect = false; // 設置為非首次連接long sessId = (seenRwServerBefore) ? sessionId : 0; // 客戶端默認sessionid為0// 創(chuàng)建連接request lastZxid 代表最新一次的節(jié)點ZXIDConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd); // 線程安全占用outgoing synchronized (outgoingQueue) {…//組合成通訊層的Packet對象,添加到發(fā)送隊列,對于ConnectRequest其requestHeader為null outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));}//確保讀寫事件都監(jiān)聽,也就是設置成可讀可寫clientCnxnSocket.enableReadWriteOnly();if (LOG.isDebugEnabled()) {LOG.debug("Session establishment request sent on "+ clientCnxnSocket.getRemoteSocketAddress());}}

至此Channelsocket已經成功連接,并且已將連接請求做為隊列放到Outgoing中。此時,需要再回頭看Code7, 也就是一直在循環(huán)的SendThread部分??梢钥吹竭B接部分成功完成,接下來需要做doTransport()。// CnxnClientSocketNio

Code 12:doTransport()

void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,ClientCnxn cnxn)throws IOException, InterruptedException {//selectselector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}// Everything below and until we get back to the select is// non blocking, so time is effectively a constant. That is// Why we just have to do this once, hereupdateNow();for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());//如果之前連接沒有立馬連上,則在這里處理OP_CONNECT事件if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {if (sc.finishConnect()) {updateLastSendAndHeard();sendThread.primeConnection();}} //如果讀寫就位,則處理之else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {doIO(pendingQueue, outgoingQueue, cnxn);}}if (sendThread.getZkState().isConnected()) {synchronized(outgoingQueue) {//找到連接Packet并且將他放到隊列頭if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {// 將要Channecl設置為可讀enableWrite();}}}selected.clear(); }

Code13:findSendablePacket()

private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,boolean clientTunneledAuthenticationInProgress) {synchronized (outgoingQueue) {..// Since client's authentication with server is in progress,// send only the null-header packet queued by primeConnection().// This packet must be sent so that the SASL authentication process// can proceed, but all other packets should wait until// SASL authentication completes.//因為Conn Packet需要發(fā)送到SASL authentication進行處理,其他Packet都需要等待直到該處理完成,//Conn Packet必須第一個處理,所以找出它并且把它放到OutgoingQueue頭,也就是requestheader=null的辣個ListIterator<Packet> iter = outgoingQueue.listIterator();while (iter.hasNext()) {Packet p = iter.next();if (p.requestHeader == null) {// We've found the priming-packet. Move it to the beginning of the queue.iter.remove(); outgoingQueue.add(0, p); // 將連接放到outgogingQueue第一個return p;} else {// Non-priming packet: defer it until later, leaving it in the queue// until authentication completes.if (LOG.isDebugEnabled()) {LOG.debug("deferring non-priming packet: " + p +"until SASL authentication completes.");}}}// no sendable packet found.return null;} }

然后就是最重要的IO部分:
? 需要處理兩類網絡事件(讀、寫)

Code14:IO write

if (sockKey.isWritable()) {synchronized(outgoingQueue) {// 獲得packetPacket p = findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress());if (p != null) {updateLastSend();// If we already started writing p, p.bb will already existif (p.bb == null) {if ((p.requestHeader != null) &&(p.requestHeader.getType() != OpCode.ping) &&(p.requestHeader.getType() != OpCode.auth)) {//如果不是 連接事件,不是ping 事件,不是 認證時間 p.requestHeader.setXid(cnxn.getXid());}// 序列化p.createBB();}//將數據寫入Channelsock.write(p.bb);// p.bb中如果沒有內容 則表示發(fā)送成功if (!p.bb.hasRemaining()) {//發(fā)送數+1sentCount++;//將該P從隊列中移除outgoingQueue.removeFirstOccurrence(p);//如果該事件不是連接事件,不是ping事件,不是認證事件, 則將他加入pending隊列中if (p.requestHeader != null&& p.requestHeader.getType() != OpCode.ping&& p.requestHeader.getType() != OpCode.auth) {synchronized (pendingQueue) {pendingQueue.add(p);}}}}if (outgoingQueue.isEmpty()) {// No more packets to send: turn off write interest flag.// Will be turned on later by a later call to enableWrite(),// from within ZooKeeperSaslClient (if client is configured// to attempt SASL authentication), or in either doIO() or// in doTransport() if not.disableWrite();} else if (!initialized && p != null && !p.bb.hasRemaining()) {// On initial connection, write the complete connect request// packet, but then disable further writes until after// receiving a successful connection response. If the// session is expired, then the server sends the expiration// response and immediately closes its end of the socket. If// the client is simultaneously writing on its end, then the// TCP stack may choose to abort with RST, in which case the// client would never receive the session expired event. See// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.htmldisableWrite();} else {// Just in caseenableWrite();}}}

Code15:createBB()

public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in later// 如果不是連接事件則設置協議頭if (requestHeader != null) {requestHeader.serialize(boa, "header");}//設置協議體if (request instanceof ConnectRequest) {request.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();//生成ByteBufferthis.bb = ByteBuffer.wrap(baos.toByteArray());//將bytebuffer的前4個字節(jié)修改成真正的長度,總長度減去一個int的長度頭 this.bb.putInt(this.bb.capacity() - 4);//準備給后續(xù)讀 讓buffer position = 0this.bb.rewind();} catch (IOException e) {LOG.warn("Ignoring unexpected exception", e);}}

Code 16:IO read

if (sockKey.isReadable()) {//先從Channel讀4個字節(jié),代表頭 int rc = sock.read(incomingBuffer);if (rc < 0) {throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"+ Long.toHexString(sessionId)+ ", likely server has closed socket");}if (!incomingBuffer.hasRemaining()) {incomingBuffer.flip();if (incomingBuffer == lenBuffer) {recvCount++;readLength();} //初始化else if (!initialized) {readConnectResult(); // 讀取連接結果enableRead(); // Channel 可讀if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {// Since SASL authentication has completed (if client is configured to do so),// outgoing packets waiting in the outgoingQueue can now be sent.enableWrite();}lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();initialized = true;} else { // 處理其他請求sendThread.readResponse(incomingBuffer);lenBuffer.clear();incomingBuffer = lenBuffer;updateLastHeard();}}}

還有一個比較關鍵的函數就是readResponse函數,用來消費PendingQueue,處理的消息分為三類
? ping 消息 XID=-2
? auth認證消息 XID=-4
? 訂閱的消息,即各種變化的通知,比如子節(jié)點變化、節(jié)點內容變化,由服務器推過來的消息 ,獲取到這類消息或通過eventThread.queueEvent將消息推入事件隊列
XID=-1

Code 17: readResponse

void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}//將事件加入到 event隊列中eventThread.queueEvent( we );return;}

結束了IO之后就是對于事件的消費,也就是一開始圖示的右半部分也是接近最后部分啦

Code 18:EventThread run:

public void run() {try {isRunning = true;while (true) {// 獲取事件Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {//處理事件processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}} catch (InterruptedException e) {LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down");}} }

最后就是processEvent了,這個就不貼代碼了(代碼備注的累死了),寫思路。

ProcessEvent:

processEvent 是 EventThread 處理事件核心函數,核心邏輯如下:

1、如果 event instanceof WatcherSetEventPair ,取出 pair 中的 Watchers ,逐個調用 watcher.process(pair.event)
2、否則 event 為 AsyncCallback ,根據 p.response 判斷為哪種響應類型,執(zhí)行響應的回調 processResult 。

Watcher 和 AsyncCallback 的區(qū)別
Watcher: Watcher 是用于監(jiān)聽節(jié)點,session 狀態(tài)的,比如 getData 對數據節(jié)點 a 設置了 watcher ,那么當 a 的數據內容發(fā)生改變時,客戶端會收到 NodeDataChanged 通知,然后進行 watcher 的回調。
AsyncCallback : AsyncCallback 是在以異步方式使用 ZooKeeper API 時,用于處理返回結果的。例如:getData 同步調用的版本是: byte[] getData(String path, boolean watch,Stat stat) ,異步調用的版本是: void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx) ,可以看到,前者是直接返回獲取的結果,后者是通過 AsyncCallback 回調處理結果的。

**接下來就是客戶端發(fā)送指令與負責端進行交互比如:
Ls、getChildren、getData等**

參考文獻:
[1] http://www.cnblogs.com/davidwang456/p/5000927.html
[2] http://www.verydemo.com/demo_c89_i33659.html
[3] http://blog.csdn.net/pwlazy/article/details/8000566
[4] http://www.cnblogs.com/ggjucheng/p/3376548.html
[5] http://zookeeper.apache.org/doc/r3.3.6/api/index.html
[6] http://www.tuicool.com/articles/i6vMVze
[7]http://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/

總結

以上是生活随笔為你收集整理的Zookeeper 客户端源码吐血总结的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。