ZooKeeper Client Source Code: An Exhaustive Summary
Contents
1. Important classes
2. Java background knowledge
3. Overview
4. A walkthrough, from getting started to giving up
Code1: ZK
Code2: Creating the ZooKeeper instance (instantiates ClientCnxn and ClientCnxnSocketNIO)
Code3: Instantiating ClientCnxnSocketNIO (which extends ClientCnxnSocket)
Code4: Instantiating ClientCnxn
Code5: Instantiating SendThread
Code6: Instantiating EventThread
Code 7: The core run() loop of SendThread
Code 8: startConnect()
Code 9: clientCnxnSocket.connect
Code10: registerAndConnect()
Code11: primeConnection()
Code 12: doTransport()
Code13: findSendablePacket()
Code14: IO write
Code15: createBB()
Code 16: IO read
Code 17: readResponse
Code 18: EventThread run
Source: ZooKeeper 3.4.6.jar (an exhaustive summary)
1. Important classes
1) ZooKeeperMain: the entry point (its main() method is launched by the zkCli.sh script)
2) ZooKeeper: the client entry point
3) ClientCnxn.SendThread: the IO thread
4) ClientCnxn.EventThread: the event thread, which runs the various message callbacks
5) ClientCnxn: the main class handling client/server interaction
6) ClientCnxnSocketNIO: extends ClientCnxnSocket and does all the IO, using Java NIO
7) Watcher: used to watch znodes
8) ZKWatchManager: manages all the Watchers bound to the ZK client

2. Java background knowledge
1) Java multithreading
2) Java NIO; see http://blog.csdn.net/cnh294141800/article/details/52996819
3) Socket programming (a passing familiarity is enough)
4) JLine: a Java library for handling console input

3. Overview
The figure above is the best explanation of the ZooKeeper client code:
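(1) The client issues a request, which is wrapped in a Packet.
(2) The ZooKeeper client (ClientCnxn) puts the Packet into the outgoing queue, which, as the name says, holds the packets waiting to be sent to the server.
(3) The SendThread writes the packets in the outgoing queue to the server and moves each sent packet to the pending queue, where it waits for its reply.
(4) When replies arrive, the SendThread consumes the pending queue and calls finishPacket(). This hands the result to a waiting synchronous caller or, for asynchronous calls, queues an event.
(5) The EventThread consumes these events and invokes the Watchers (and async callbacks).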
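The five steps can be modeled with a few plain queues. The single-threaded Java sketch below is not ZooKeeper code. `QueuePipelineSketch`, `submit`, `sendAll` and `onReply` are hypothetical names, and the server is simulated by calling `onReply` directly. The sketch only shows how a packet moves from the outgoing queue to the pending queue and, once its reply arrives in request order, becomes an event for the event thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the client-side queues; names are illustrative only.
class QueuePipelineSketch {
    static final class Packet {
        final int xid;        // request id used to match the reply
        final String request; // stand-in for a serialized request
        String reply;         // filled in when the "server" answers
        Packet(int xid, String request) { this.xid = xid; this.request = request; }
    }

    final Deque<Packet> outgoingQueue = new ArrayDeque<>(); // waiting to be written
    final Deque<Packet> pendingQueue = new ArrayDeque<>();  // written, awaiting a reply
    final BlockingQueue<String> waitingEvents = new LinkedBlockingQueue<>(); // for the event thread
    private int nextXid = 1;

    // Steps (1)/(2): wrap the request in a Packet and queue it for sending.
    Packet submit(String request) {
        Packet p = new Packet(nextXid++, request);
        outgoingQueue.addLast(p);
        return p;
    }

    // Step (3): "write" every outgoing packet and move it to the pending queue.
    int sendAll() {
        int sent = 0;
        while (!outgoingQueue.isEmpty()) {
            pendingQueue.addLast(outgoingQueue.removeFirst());
            sent++;
        }
        return sent;
    }

    // Step (4): replies come back in request order, so the head of pendingQueue must match.
    void onReply(int xid, String reply) {
        Packet p = pendingQueue.removeFirst();
        if (p.xid != xid) {
            throw new IllegalStateException("expected xid " + p.xid + " but got " + xid);
        }
        p.reply = reply;
        waitingEvents.add(p.request + " -> " + reply); // plays the role of finishPacket()
    }

    public static void main(String[] args) throws InterruptedException {
        QueuePipelineSketch c = new QueuePipelineSketch();
        c.submit("getData /a");
        c.submit("exists /b");
        c.sendAll();
        c.onReply(1, "data-of-a");
        c.onReply(2, "stat-of-b");
        // Step (5): the event thread would take() these one at a time.
        System.out.println(c.waitingEvents.take());
        System.out.println(c.waitingEvents.take());
    }
}
```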
4. A walkthrough, from getting started to giving up
(1) The application supplies a Watcher instance (new MyWatcher(null))
(2) ZooKeeper is instantiated
- Instantiate the socket; ClientCnxnSocketNIO is used by default
- Instantiate ClientCnxn
- Instantiate SendThread
- Instantiate EventThread
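The first bullet, picking the socket implementation, is handled by getClientCnxnSocket() (Code3). It reads a system property and falls back to ClientCnxnSocketNIO. Below is a minimal JDK-only sketch of the same reflective-factory pattern. `Transport`, `NioTransport`, `OtherTransport` and the property name `demo.transport` are hypothetical. The sketch calls `getDeclaredConstructor().newInstance()` because `Class.newInstance()`, which the 3.4.6 code uses, has been deprecated since Java 9.

```java
import java.io.IOException;

// Hypothetical names throughout: Transport, NioTransport, OtherTransport, "demo.transport".
class SocketFactorySketch {
    interface Transport { String name(); }

    // Default implementation, playing the role of ClientCnxnSocketNIO.
    static class NioTransport implements Transport {
        public String name() { return "nio"; }
    }

    // An alternative that can be selected through the system property.
    static class OtherTransport implements Transport {
        public String name() { return "other"; }
    }

    static final String PROPERTY = "demo.transport";

    static Transport create() throws IOException {
        String className = System.getProperty(PROPERTY);
        if (className == null) {
            className = NioTransport.class.getName(); // fall back to the default
        }
        try {
            return (Transport) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // same idea as getClientCnxnSocket(): report failure as an IOException with the cause
            throw new IOException("Couldn't instantiate " + className, e);
        }
    }
}
```

To select a different implementation, set the property to its binary class name. For a nested class, the binary name has the form `Outer$Inner`.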
Code1:ZK
Code2: Creating the ZooKeeper instance (instantiates ClientCnxn and ClientCnxnSocketNIO)
```java
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    …
    watchManager.defaultWatcher = watcher; // set defaultWatcher to MyWatcher
    // parse the -server argument to get the IPs and ports
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    // create the ClientCnxn instance
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start(); // start the SendThread and EventThread threads inside cnxn
}
```

Code3: Instantiating ClientCnxnSocketNIO (which extends ClientCnxnSocket)
```java
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
    String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName(); // default implementation
    }
    try {
        // instantiate the configured class reflectively
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).newInstance();
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }
}
```

Code4: Instantiating ClientCnxn
```java
/* Another ClientCnxn constructor; note that sessionId = 0 at this point */
```
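Code2 hands ClientCnxn the chroot path and a StaticHostProvider built from ConnectStringParser's output. The sketch below is a simplified, hypothetical parser for strings like `host1:2181,host2:2182/app`. It assumes the chroot starts at the first `/`, hosts are comma-separated, and a missing port defaults to 2181. The real ConnectStringParser validates the chroot path more thoroughly than the crude check here, and IPv6 literals are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ConnectStringParser: no IPv6, minimal chroot checks.
class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed default client port

    final String chrootPath; // null when the connect string has no chroot suffix
    final List<InetSocketAddress> servers = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        String hosts = connectString;
        String chroot = null;
        int slash = connectString.indexOf('/');
        if (slash >= 0) {
            chroot = connectString.substring(slash);  // e.g. "/app"
            hosts = connectString.substring(0, slash);
            if (chroot.length() == 1) {
                chroot = null;                         // a bare "/" means no chroot
            } else if (chroot.endsWith("/")) {
                throw new IllegalArgumentException("invalid chroot path: " + chroot);
            }
        }
        this.chrootPath = chroot;
        for (String host : hosts.split(",")) {
            host = host.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // createUnresolved() avoids a DNS lookup in this sketch
            servers.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
}
```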
Code5: Instantiating SendThread
```java
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(makeThreadName("-SendThread()"));
    state = States.CONNECTING; // set the state to CONNECTING (not actually connected yet)
    this.clientCnxnSocket = clientCnxnSocket;
    setUncaughtExceptionHandler(uncaughtExceptionHandler);
    setDaemon(true); // run as a daemon thread
}
```

Code6: Instantiating EventThread
```java
EventThread() {
    super(makeThreadName("-EventThread"));
    setUncaughtExceptionHandler(uncaughtExceptionHandler);
    setDaemon(true);
}
```

At this point every object has been instantiated. Next, the SendThread and EventThread threads are started.
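Both constructors follow the same recipe: a name from makeThreadName(), an uncaught-exception handler, and setDaemon(true). The sketch below shows that recipe with a hypothetical `makeDaemon` helper. It assumes that makeThreadName() prefixes the name of the creating thread, which would explain log names such as `main-EventThread`. Because these threads are daemons, they do not keep the JVM alive, so an application must keep at least one non-daemon thread of its own running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper mirroring how SendThread/EventThread configure themselves.
class DaemonThreadSketch {
    static Thread makeDaemon(String suffix, Runnable body, Thread.UncaughtExceptionHandler handler) {
        // e.g. "main-EventThread" when called from the main thread
        Thread t = new Thread(body, Thread.currentThread().getName() + suffix);
        t.setUncaughtExceptionHandler(handler); // report failures instead of dying silently
        t.setDaemon(true);                      // does not keep the JVM alive on its own
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Thread t = makeDaemon("-EventThread",
                () -> { throw new IllegalStateException("boom"); },
                (thread, e) -> { seen.set(e); done.countDown(); });
        t.start();
        done.await(5, TimeUnit.SECONDS);
        System.out.println(t.getName() + " daemon=" + t.isDaemon() + " caught=" + seen.get());
    }
}
```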
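(3) ZooKeeper is started
- Start SendThread
- Connect to the server
- Create the actual socket; see ClientCnxnSocketNIO.createSock
- Register an OP_CONNECT event with the Selector and connect to the server. Because the connect is non-blocking, it may not complete immediately. If it does, SendThread.primeConnection is called to initialize the connection and register read/write interest. Otherwise the connection is completed when a later select() reports the connect event
- Reset the socket's incomingBuffer
- Once connected, send a connect request to the server to obtain the session ID for this connection
- Enter a loop that waits for requests from the application; when there are none, ping the server based on how much time has passed
- Start EventThread, which enters an infinite loop that takes events from the waitingEvents queue and blocks when the queue is empty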
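The non-blocking connect described above (Codes 9 to 11, plus the OP_CONNECT branch of Code 12) follows the standard java.nio pattern. The self-contained sketch below reproduces it against a local ServerSocketChannel. `prime()` is a hypothetical stand-in for primeConnection(). It only counts calls and switches the key to read/write interest.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Sketch of the registerAndConnect() / OP_CONNECT pattern with plain java.nio.
class NioConnectSketch {
    static int primed = 0;

    static void prime(SelectionKey key) {
        primed++;
        // Once connected, watch for reads and writes (compare enableReadWriteOnly()).
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    static boolean connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open(); SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false);                 // must be non-blocking before register()
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {                      // connected immediately
                prime(key);
                return true;
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) { // otherwise finish after select()
                selector.select(Math.max(1, deadline - System.currentTimeMillis()));
                for (SelectionKey k : selector.selectedKeys()) {
                    SocketChannel sc = (SocketChannel) k.channel();
                    if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0 && sc.finishConnect()) {
                        prime(k);
                        return true;
                    }
                }
                selector.selectedKeys().clear();
            }
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free local port
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            System.out.println("connected=" + connect(addr, 2000) + " primed=" + primed);
        }
    }
}
```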
Code 7: The core run() loop of SendThread
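Viewed abstractly, run() works as follows:
First it checks whether the socket is connected. If not, it calls startConnect() to connect; if it is, the existing connection is used. Then it calls doTransport() to do the actual communication. If an exception occurs along the way, cleanup() is called. When the loop finally exits, the connection is closed.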
```java
public void run() {
    while (state.isAlive()) { // this != CLOSED && this != AUTH_FAILED; the initial state was set to CONNECTING
        try {
            // not connected yet: start connecting
            if (!clientCnxnSocket.isConnected()) { // clientCnxnSocket is a ClientCnxnSocketNIO instance by default
                // if this is not the first attempt, sleep for a random 0-999 ms
                if (!isFirstConnect) {
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                if (closing || !state.isAlive()) {
                    break;
                }
                startConnect(); // start connecting
                clientCnxnSocket.updateLastSendAndHeard(); // reset the last send/receive timestamps
            }

            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    ......
                }
                // time left before the read timeout expires
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                // not connected yet: time left before the connect timeout expires
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            // timed out
            if (to <= 0) {
                ...
            }
            // decide whether a ping (heartbeat) must be sent
            // (simplified: the actual code first checks how long the connection has been idle)
            if (state.isConnected()) {
                sendPing();
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                ...
            }

            // The most important step. Do real IO
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            ...
        }
    }
    cleanup();
    ...
}
```

Code 8: startConnect()
```java
// The part that actually performs the connection
```
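In Code 7, `to` serves as both the timeout passed to select() and the remaining budget before the connection is considered dead. The sketch below models that arithmetic as pure functions with hypothetical method names. The formulas are assumptions taken from the 3.4.x source, not from the excerpt above:
- readTimeout = sessionTimeout * 2 / 3
- connectTimeout = sessionTimeout / number of servers
- a ping becomes due once about readTimeout / 2 has passed without sending anything

The real code also subtracts a small extra margin when computing the ping deadline; that margin is omitted here.

```java
// Hypothetical pure-function model of SendThread's timeout bookkeeping (assumed formulas).
class SendLoopTimeoutSketch {
    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    static int connectTimeout(int sessionTimeoutMs, int serverCount) {
        return sessionTimeoutMs / serverCount;
    }

    /** Value of `to`: time left before giving up; <= 0 means the timeout has expired. */
    static int remaining(boolean connected, int sessionTimeoutMs, int serverCount, int idleRecvMs) {
        int budget = connected ? readTimeout(sessionTimeoutMs)
                               : connectTimeout(sessionTimeoutMs, serverCount);
        return budget - idleRecvMs;
    }

    /** Time until the next ping is due; <= 0 means send a ping now. */
    static int timeToNextPing(int sessionTimeoutMs, int idleSendMs) {
        return readTimeout(sessionTimeoutMs) / 2 - idleSendMs;
    }

    public static void main(String[] args) {
        int session = 30_000; // a 30 s session against 3 servers
        System.out.println(readTimeout(session));               // 20000
        System.out.println(connectTimeout(session, 3));         // 10000
        System.out.println(remaining(true, session, 3, 5_000)); // 15000
        System.out.println(timeToNextPing(session, 12_000));    // -2000, so ping now
    }
}
```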
Code 9: clientCnxnSocket.connect
```java
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock(); // create an empty non-blocking SocketChannel
    try {
        registerAndConnect(sock, addr); // register sock and connect it to addr
    } catch (IOException e) {
        …
    }
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
```

Code10: registerAndConnect()
```java
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT); // register the socket with the selector
    boolean immediateConnect = sock.connect(addr); // connect the socket to the server
    if (immediateConnect) {
        sendThread.primeConnection(); // connected right away: initialize the connection
    }
}
```

Code11: primeConnection()
```java
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
            + clientCnxnSocket.getRemoteSocketAddress()
            + ", initiating session");
    isFirstConnect = false; // from now on this is no longer the first connection
    // reuse sessionId only if a read/write server has been seen before; otherwise start from 0
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    // build the connect request; lastZxid is the most recent zxid this client has seen
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
            sessionTimeout, sessId, sessionPasswd);
    // lock outgoingQueue for thread safety
    synchronized (outgoingQueue) {
        …
        // wrap the request in a transport-level Packet and put it at the head of the send queue;
        // for a ConnectRequest the requestHeader is null
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                null, null, readOnly));
    }
    // make sure both read and write events are watched
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                + clientCnxnSocket.getRemoteSocketAddress());
    }
}
```

At this point the socket channel is connected and the connect request has been queued in outgoingQueue. Now go back to Code 7, the SendThread loop that keeps running. The connection step is done, so next comes doTransport(), which lives in ClientCnxnSocketNIO.
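primeConnection() uses addFirst(), and the connect packet is the only packet whose requestHeader is null. That is what lets findSendablePacket() (Code13) always locate it. Below is a hypothetical model of that lookup in which a packet only records whether it has a header. The two early returns, for an empty queue and for plain FIFO order when no SASL authentication is in progress, are an assumption, since the excerpt of Code13 elides that part.

```java
import java.util.LinkedList;
import java.util.ListIterator;

// Hypothetical model of findSendablePacket(); Packet only knows whether it has a header.
class FindSendableSketch {
    static final class Packet {
        final String name;
        final boolean hasHeader; // false for the connect (priming) packet
        Packet(String name, boolean hasHeader) { this.name = name; this.hasHeader = hasHeader; }
    }

    static Packet findSendablePacket(LinkedList<Packet> outgoing, boolean authInProgress) {
        if (outgoing.isEmpty()) {
            return null;
        }
        if (!authInProgress) {
            return outgoing.getFirst(); // assumed fast path: normal FIFO order
        }
        ListIterator<Packet> iter = outgoing.listIterator();
        while (iter.hasNext()) {
            Packet p = iter.next();
            if (!p.hasHeader) {
                iter.remove();          // found the priming packet: move it to the front
                outgoing.addFirst(p);
                return p;
            }
            // other packets stay where they are until authentication completes
        }
        return null;
    }
}
```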
Code 12:doTransport()
```java
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
    // wait at most waitTimeOut ms for a channel to become ready
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // if the earlier connect did not complete immediately, handle OP_CONNECT here
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // ready for reading or writing: do the IO
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized (outgoingQueue) {
            // is there a packet that can be sent now? (the connect packet is moved to the head)
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite(); // register interest in OP_WRITE so the packet gets written
            }
        }
    }
    selected.clear();
}
```

Code13: findSendablePacket()
```java
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
        boolean clientTunneledAuthenticationInProgress) {
    synchronized (outgoingQueue) {
        ..
        // Since client's authentication with server is in progress,
        // send only the null-header packet queued by primeConnection().
        // This packet must be sent so that the SASL authentication process
        // can proceed, but all other packets should wait until
        // SASL authentication completes.
        // So the connect packet (requestHeader == null) has to go first:
        // find it and move it to the head of outgoingQueue.
        ListIterator<Packet> iter = outgoingQueue.listIterator();
        while (iter.hasNext()) {
            Packet p = iter.next();
            if (p.requestHeader == null) {
                // We've found the priming-packet. Move it to the beginning of the queue.
                iter.remove();
                outgoingQueue.add(0, p); // put the connect packet first in outgoingQueue
                return p;
            } else {
                // Non-priming packet: defer it until later, leaving it in the queue
                // until authentication completes.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("deferring non-priming packet: " + p +
                            "until SASL authentication completes.");
                }
            }
        }
        // no sendable packet found.
        return null;
    }
}
```

Next comes the most important part, the IO:
- Two kinds of network events have to be handled: reads and writes
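Both directions share one framing: a 4-byte big-endian length followed by the payload. On the write side, createBB() (Code15) builds it. On the read side, the lenBuffer/incomingBuffer switch (Code 16) takes it apart and copes with partial reads.

Below is a JDK-only sketch of both halves:
- `DataOutputStream` stands in for jute's BinaryOutputArchive; both write big-endian ints.
- `FramingSketch`, `encode`, `Decoder` and `MAX_LEN` are hypothetical names.
- `MAX_LEN` is an arbitrary limit that plays the role of the length check in readLength().

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the wire framing: [4-byte big-endian length][payload].
class FramingSketch {
    static final int MAX_LEN = 4 * 1024 * 1024; // arbitrary sanity limit for this sketch

    /** Write side, modeled on createBB(): write a -1 placeholder, then patch in the length. */
    static ByteBuffer encode(byte[] payload) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);             // placeholder for "len"
            out.write(payload);
            out.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // relative put at position 0; len excludes itself
            bb.rewind();                  // position back to 0, ready to be written out
            return bb;
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory stream
        }
    }

    /** Read side, modeled on lenBuffer/incomingBuffer: fill 4 bytes, then the body. */
    static final class Decoder {
        private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
        private ByteBuffer incomingBuffer = lenBuffer;
        final List<byte[]> frames = new ArrayList<>();

        /** Consumes chunk the way successive sock.read(incomingBuffer) calls would. */
        void feed(ByteBuffer chunk) throws IOException {
            while (true) {
                if (!incomingBuffer.hasRemaining()) {    // the current buffer is full
                    incomingBuffer.flip();
                    if (incomingBuffer == lenBuffer) {      // like readLength()
                        int len = lenBuffer.getInt();
                        if (len < 0 || len >= MAX_LEN) {
                            throw new IOException("Packet len " + len + " is out of range!");
                        }
                        incomingBuffer = ByteBuffer.allocate(len);
                        continue;                            // a zero-length body is already complete
                    }
                    byte[] body = new byte[incomingBuffer.remaining()];
                    incomingBuffer.get(body);
                    frames.add(body);                        // readResponse() would run here
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;              // go back to reading a length
                }
                if (!chunk.hasRemaining()) {
                    return;                                  // wait for more bytes
                }
                int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
                ByteBuffer slice = chunk.duplicate();
                slice.limit(slice.position() + n);
                incomingBuffer.put(slice);
                chunk.position(chunk.position() + n);
            }
        }
    }
}
```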
Code14:IO write
```java
if (sockKey.isWritable()) {
    synchronized (outgoingQueue) {
        // get the packet to send
        Packet p = findSendablePacket(outgoingQueue,
                cnxn.sendThread.clientTunneledAuthenticationInProgress());

        if (p != null) {
            updateLastSend();
            // If we already started writing p, p.bb will already exist
            if (p.bb == null) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != OpCode.ping) &&
                        (p.requestHeader.getType() != OpCode.auth)) {
                    // not a connect, ping or auth packet: assign the next xid
                    p.requestHeader.setXid(cnxn.getXid());
                }
                // serialize
                p.createBB();
            }
            // write the data to the channel (possibly only part of it)
            sock.write(p.bb);
            // nothing left in p.bb means the whole packet has been sent
            if (!p.bb.hasRemaining()) {
                // one more packet sent
                sentCount++;
                // remove p from the send queue
                outgoingQueue.removeFirstOccurrence(p);
                // unless it is a connect, ping or auth packet, move it to the pending queue
                if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
            // No more packets to send: turn off write interest flag.
            // Will be turned on later by a later call to enableWrite(),
            // from within ZooKeeperSaslClient (if client is configured
            // to attempt SASL authentication), or in either doIO() or
            // in doTransport() if not.
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
            // On initial connection, write the complete connect request
            // packet, but then disable further writes until after
            // receiving a successful connection response. If the
            // session is expired, then the server sends the expiration
            // response and immediately closes its end of the socket. If
            // the client is simultaneously writing on its end, then the
            // TCP stack may choose to abort with RST, in which case the
            // client would never receive the session expired event. See
            // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
            disableWrite();
        } else {
            // Just in case
            enableWrite();
        }
    }
}
```

Code15: createBB()
```java
public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len"); // We'll fill this in later
        // every packet except the connect packet gets a request header
        if (requestHeader != null) {
            requestHeader.serialize(boa, "header");
        }
        // then the request body
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            // append "am-I-allowed-to-be-readonly" flag
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            request.serialize(boa, "request");
        }
        baos.close();
        // wrap the bytes in a ByteBuffer
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        // overwrite the first 4 bytes with the real length: total length minus the 4-byte length field
        this.bb.putInt(this.bb.capacity() - 4);
        // rewind so that position = 0, ready for the buffer to be written out
        this.bb.rewind();
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}
```

Code 16: IO read
```java
if (sockKey.isReadable()) {
    // read into incomingBuffer: first the 4-byte length (lenBuffer), then the body
    int rc = sock.read(incomingBuffer);
    if (rc < 0) {
        throw new EndOfStreamException(
                "Unable to read additional data from server sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely server has closed socket");
    }
    if (!incomingBuffer.hasRemaining()) {
        incomingBuffer.flip();
        if (incomingBuffer == lenBuffer) {
            recvCount++;
            readLength(); // allocate a body buffer of the announced length
        } else if (!initialized) {
            // not initialized yet: this is the reply to the connect request
            readConnectResult(); // read the connect result
            enableRead(); // register interest in OP_READ
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                // Since SASL authentication has completed (if client is configured to do so),
                // outgoing packets waiting in the outgoingQueue can now be sent.
                enableWrite();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
            initialized = true;
        } else {
            // every other reply
            sendThread.readResponse(incomingBuffer);
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
            updateLastHeard();
        }
    }
}
```

Another key function is readResponse(), which processes the server's replies. Ordinary replies are matched against pendingQueue, and three special kinds of message are recognized by their XID:
- Ping replies: XID = -2
- Auth replies: XID = -4
- Subscribed notifications, i.e. change notifications pushed by the server, such as a change to a node's children or to its data: XID = -1. These are pushed onto the event queue with eventThread.queueEvent
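readResponse() (Code 17) first branches on the XID. For notifications, it also maps the server path back into the client's chroot. Below is a hypothetical sketch of just those two decisions. It assumes that every other XID is an ordinary reply matched against the head of pendingQueue; that part of readResponse() is not shown here.

```java
// Hypothetical helpers mirroring two decisions in readResponse().
class ReadResponseSketch {
    enum Kind { NOTIFICATION, PING, AUTH, RESPONSE }

    static final int NOTIFICATION_XID = -1;
    static final int PING_XID = -2;
    static final int AUTH_XID = -4;

    static Kind classify(int xid) {
        switch (xid) {
            case NOTIFICATION_XID: return Kind.NOTIFICATION; // watch event -> queueEvent(...)
            case PING_XID:         return Kind.PING;         // only logged
            case AUTH_XID:         return Kind.AUTH;         // AUTHFAILED -> AuthFailed event
            default:               return Kind.RESPONSE;     // assumed: matched against pendingQueue
        }
    }

    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) {
            return serverPath;                                // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                                       // the chroot is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length()); // strip the chroot prefix
        }
        return serverPath; // readResponse() only logs a warning in this case
    }
}
```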
Code 17: readResponse
```java
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();

    replyHdr.deserialize(bbia, "header");
    if (replyHdr.getXid() == -2) {
        // -2 is the xid for pings
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x"
                    + Long.toHexString(sessionId)
                    + " after "
                    + ((System.nanoTime() - lastPingSentNs) / 1000000)
                    + "ms");
        }
        return;
    }
    if (replyHdr.getXid() == -4) {
        // -4 is the xid for AuthPacket
        if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        return;
    }
    if (replyHdr.getXid() == -1) {
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                    + Long.toHexString(sessionId));
        }
        // put the event on the event queue
        eventThread.queueEvent(we);
        return;
    }
    ...
}
```

Once the IO is done, what remains is consuming the events: this is the right half of the figure shown at the beginning, and we are nearly at the end.
Code 18:EventThread run:
```java
public void run() {
    try {
        isRunning = true;
        while (true) {
            // take the next event (blocks while the queue is empty)
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // handle the event
                processEvent(event);
            }
            if (wasKilled)
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }

    LOG.info("EventThread shut down");
}
```

Last comes processEvent. I won't paste its code (annotating all the code above nearly killed me); here is the idea instead.
processEvent:
processEvent is the EventThread's core event-handling method. Its logic is:
1. If the event is a WatcherSetEventPair, take the watchers out of the pair and call watcher.process(pair.event) on each of them.
2. Otherwise the event is an AsyncCallback. The type of p.response determines which kind of response it is, and the corresponding callback's processResult is invoked.
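Code 18 and the two rules above can be combined into a small model of EventThread. All types here are hypothetical simplifications. `Watcher` receives a String instead of a WatchedEvent, and `CallbackEvent` collapses the many AsyncCallback variants into a single `Consumer<String>`. As in Code 18, events queued after the "event of death" are still drained before the loop exits.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, heavily simplified model of EventThread.run() and processEvent().
class EventThreadSketch {
    interface Watcher { void process(String event); }

    /** Plays the role of WatcherSetEventPair: one event plus the watchers it triggers. */
    static final class WatcherSetEventPair {
        final List<Watcher> watchers;
        final String event;
        WatcherSetEventPair(List<Watcher> watchers, String event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

    /** Plays the role of a finished async packet: a callback plus the result it receives. */
    static final class CallbackEvent {
        final Consumer<String> callback;
        final String result;
        CallbackEvent(Consumer<String> callback, String result) {
            this.callback = callback;
            this.result = result;
        }
    }

    static final Object EVENT_OF_DEATH = new Object();
    final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();

    void run() throws InterruptedException {
        boolean wasKilled = false;
        while (true) {
            Object event = waitingEvents.take(); // blocks while the queue is empty
            if (event == EVENT_OF_DEATH) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            if (wasKilled) {
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        break;                    // drained after the kill: exit
                    }
                }
            }
        }
    }

    static void processEvent(Object event) {
        if (event instanceof WatcherSetEventPair) {
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher w : pair.watchers) {
                w.process(pair.event);            // every triggered watcher sees the event
            }
        } else if (event instanceof CallbackEvent) {
            CallbackEvent cb = (CallbackEvent) event;
            cb.callback.accept(cb.result);        // stands in for cb.processResult(...)
        }
    }
}
```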
The difference between Watcher and AsyncCallback
Watcher: a Watcher watches nodes and session state. For example, if getData sets a watcher on data node a, then when a's data changes the client receives a NodeDataChanged notification and invokes the watcher's callback.
AsyncCallback: an AsyncCallback handles the result when the ZooKeeper API is used asynchronously. For example, the synchronous version of getData is byte[] getData(String path, boolean watch, Stat stat), while the asynchronous version is void getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx). The former returns the result directly; the latter delivers it through the AsyncCallback.
**Next up: how the client sends commands to interact with the server, e.g. ls, getChildren, getData.**