日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Zookeeper-watcher机制源码分析(二)

發(fā)布時間:2024/10/12 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Zookeeper-watcher机制源码分析(二) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

服務端接收請求處理流程

服務端有一個NettyServerCnxn類,用來處理客戶端發(fā)送過來的請求?

NettyServerCnxn?

public void receiveMessage(ChannelBuffer message) {try {while(message.readable() && !throttled) { if (bb != null) { //ByteBuffer不為空if (LOG.isTraceEnabled()) {LOG.trace("message readable " + message.readableBytes()+ " bb len " + bb.remaining() + " " + bb);ByteBuffer dat = bb.duplicate();dat.flip();LOG.trace(Long.toHexString(sessionId)+ " bb 0x"+ ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(dat)));}//bb剩余空間大于message中可讀字節(jié)大小if (bb.remaining() > message.readableBytes()) {int newLimit = bb.position() + message.readableBytes();bb.limit(newLimit);}// 將message寫入bb中message.readBytes(bb);bb.limit(bb.capacity());if (LOG.isTraceEnabled()) {LOG.trace("after readBytes message readable "+ message.readableBytes()+ " bb len " + bb.remaining() + " " + bb);ByteBuffer dat = bb.duplicate();dat.flip();LOG.trace("after readbytes "+ Long.toHexString(sessionId)+ " bb 0x"+ ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(dat)));}if (bb.remaining() == 0) { // 已經(jīng)讀完message,表示內(nèi)容已經(jīng)全部接收packetReceived(); // 統(tǒng)計接收信息bb.flip();ZooKeeperServer zks = this.zkServer;if (zks == null || !zks.isRunning()) {//Zookeeper服務器為空 ,說明服務端掛了throw new IOException("ZK down");}if (initialized) {//處理客戶端傳過來的數(shù)據(jù)包zks.processPacket(this, bb);if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {disableRecvNoWait();}} else {LOG.debug("got conn req request from "+ getRemoteSocketAddress());zks.processConnectRequest(this, bb);initialized = true;}bb = null;}} else { //bb為null的情況,大家自己去看,我就不細講了if (LOG.isTraceEnabled()) {LOG.trace("message readable "+ message.readableBytes()+ " bblenrem " + bbLen.remaining());ByteBuffer dat = bbLen.duplicate();dat.flip();LOG.trace(Long.toHexString(sessionId)+ " bbLen 0x"+ ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(dat)));}if (message.readableBytes() < bbLen.remaining()) {bbLen.limit(bbLen.position() + message.readableBytes());}message.readBytes(bbLen);bbLen.limit(bbLen.capacity());if (bbLen.remaining() == 0) {bbLen.flip();if (LOG.isTraceEnabled()) {LOG.trace(Long.toHexString(sessionId)+ " bbLen 0x"+ ChannelBuffers.hexDump(ChannelBuffers.copiedBuffer(bbLen)));}int len = bbLen.getInt();if (LOG.isTraceEnabled()) {LOG.trace(Long.toHexString(sessionId)+ " bbLen len is " + len);}bbLen.clear();if (!initialized) {if (checkFourLetterWord(channel, message, len)) {return;}}if (len < 0 || len > BinaryInputArchive.maxBuffer) {throw new IOException("Len error " + len);}bb = ByteBuffer.allocate(len);}}}} catch(IOException e) {LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);close();}}

ZookeeperServer-zks.processPacket(this, bb);

處理客戶端傳送過來的數(shù)據(jù)包

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {// We have the request, now process and setup for nextInputStream bais = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);RequestHeader h = new RequestHeader();h.deserialize(bia, "header"); //反序列化客戶端header頭信息// Through the magic of byte buffers, txn will not be// pointing// to the start of the txnincomingBuffer = incomingBuffer.slice();if (h.getType() == OpCode.auth) { //判斷當前操作類型,如果是auth操作,則執(zhí)行下面的代碼LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());AuthPacket authPacket = new AuthPacket();ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);String scheme = authPacket.getScheme();ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);Code authReturn = KeeperException.Code.AUTHFAILED;if(ap != null) {try {authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());} catch(RuntimeException e) {LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);authReturn = KeeperException.Code.AUTHFAILED;}}if (authReturn == KeeperException.Code.OK) {if (LOG.isDebugEnabled()) {LOG.debug("Authentication succeeded for scheme: " + scheme);}LOG.info("auth success " + cnxn.getRemoteSocketAddress());ReplyHeader rh = new ReplyHeader(h.getXid(), 0,KeeperException.Code.OK.intValue());cnxn.sendResponse(rh, null, null);} else {if (ap == null) {LOG.warn("No authentication provider for scheme: "+ scheme + " has "+ ProviderRegistry.listProviders());} else {LOG.warn("Authentication failed for scheme: " + scheme);}// send a response...ReplyHeader rh = new ReplyHeader(h.getXid(), 0,KeeperException.Code.AUTHFAILED.intValue());cnxn.sendResponse(rh, null, null);// ... and close connectioncnxn.sendBuffer(ServerCnxnFactory.closeConn);cnxn.disableRecv();}return;} else { //如果不是授權(quán)操作,再判斷是否為sasl操作if (h.getType() == OpCode.sasl) {Record rsp = processSasl(incomingBuffer,cnxn);ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?return;}else {//最終進入這個代碼塊進行處理//封裝請求對象Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),h.getType(), incomingBuffer, cnxn.getAuthInfo());si.setOwner(ServerCnxn.me);// Always treat packet from the client as a possible// local request.setLocalSessionFlag(si);submitRequest(si); //提交請求}}cnxn.incrOutstandingRequests(h);}

  

submitRequest

負責在服務端提交當前請求

public void submitRequest(Request si) {if (firstProcessor == null) { //processor處理器,request過來以后會經(jīng)歷一系列處理器的處理過程synchronized (this) {try {// Since all requests are passed to the request// processor it should wait for setting up the request// processor chain. The state will be updated to RUNNING// after the setup.while (state == State.INITIAL) {wait(1000);}} catch (InterruptedException e) {LOG.warn("Unexpected interruption", e);}if (firstProcessor == null || state != State.RUNNING) {throw new RuntimeException("Not started");}}}try {touch(si.cnxn);boolean validpacket = Request.isValid(si.type); //判斷是否合法if (validpacket) {firstProcessor.processRequest(si); 調(diào)用firstProcessor發(fā)起請求,而這個firstProcess是一個接口,有多個實現(xiàn)類,具體的調(diào)用鏈是怎么樣的?往下看吧if (si.cnxn != null) {incInProcess();}} else {LOG.warn("Received packet at server of unknown type " + si.type);new UnimplementedRequestProcessor().processRequest(si);}} catch (MissingSessionException e) {if (LOG.isDebugEnabled()) {LOG.debug("Dropping request: " + e.getMessage());}} catch (RequestProcessorException e) {LOG.error("Unable to process request:" + e.getMessage(), e);}}

  

firstProcessor的請求鏈組成

1.firstProcessor的初始化是在ZookeeperServer的setupRequestProcessor中完成的,代碼如下

protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);((SyncRequestProcessor)syncProcessor).start();firstProcessor = new PrepRequestProcessor(this, syncProcessor);//需要注意的是,PrepRequestProcessor中傳遞的是一個syncProcessor((PrepRequestProcessor)firstProcessor).start();}

  

從上面我們可以看到firstProcessor的實例是一個PrepRequestProcessor,而這個構(gòu)造方法中又傳遞了一個Processor構(gòu)成了一個調(diào)用鏈。

RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);

syncProcessor的構(gòu)造方法傳遞的又是一個Processor,對應的是FinalRequestProcessor

2.所以整個調(diào)用鏈是PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor

PredRequestProcessor.processRequest(si);

通過上面了解到調(diào)用鏈關系以后,我們繼續(xù)再看firstProcessor.processRequest(si) 會調(diào)用到PrepRequestProcessor

?

public void processRequest(Request request) {submittedRequests.add(request);}

  

唉,很奇怪,processRequest只是把request添加到submittedRequests中,根據(jù)前面的經(jīng)驗,很自然的想到這里又是一個異步操作。而subittedRequests又是一個阻塞隊列

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

PrepRequestProcessor這個類又繼承了線程類,因此我們直接找到當前類中的run方法如下

public void run() {try {while (true) { Request request = submittedRequests.take(); //ok,從隊列中拿到請求進行處理long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;if (request.type == OpCode.ping) {traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;}if (LOG.isTraceEnabled()) {ZooTrace.logRequest(LOG, traceMask, 'P', request, "");}if (Request.requestOfDeath == request) {break;}pRequest(request); //調(diào)用pRequest進行預處理}} catch (RequestProcessorException e) {if (e.getCause() instanceof XidRolloverException) {LOG.info(e.getCause().getMessage());}handleException(this.getName(), e);} catch (Exception e) {handleException(this.getName(), e);}LOG.info("PrepRequestProcessor exited loop!");}

  

pRequest

預處理這塊的代碼太長,就不好貼了。前面的N行代碼都是根據(jù)當前的OP類型進行判斷和做相應的處理,在這個方法中的最后一行中,我們會看到如下代碼

nextProcessor.processRequest(request);

nextProcessor.processRequest(request);

很顯然,nextProcessor對應的應該是SyncRequestProcessor

SyncRequestProcessor. processRequest

public void processRequest(Request request) {// request.addRQRec(">sync");queuedRequests.add(request);}

這個方法的代碼也是一樣,基于異步化的操作,把請求添加到queuedRequets中,那么我們繼續(xù)在當前類找到run方法

public void run() {try {int logCount = 0;// we do this in an attempt to ensure that not all of the servers// in the ensemble take a snapshot at the same timeint randRoll = r.nextInt(snapCount/2);while (true) {Request si = null;//從阻塞隊列中獲取請求if (toFlush.isEmpty()) {si = queuedRequests.take(); } else {si = queuedRequests.poll();if (si == null) {flush(toFlush);continue;}}if (si == requestOfDeath) {break;}if (si != null) {// track the number of records written to the log//下面這塊代碼,粗略看來是觸發(fā)快照操作,啟動一個處理快照的線程if (zks.getZKDatabase().append(si)) {logCount++;if (logCount > (snapCount / 2 + randRoll)) {randRoll = r.nextInt(snapCount/2);// roll the logzks.getZKDatabase().rollLog();// take a snapshotif (snapInProcess != null && snapInProcess.isAlive()) {LOG.warn("Too busy to snap, skipping");} else {snapInProcess = new ZooKeeperThread("Snapshot Thread") {public void run() {try {zks.takeSnapshot();} catch(Exception e) {LOG.warn("Unexpected exception", e);}}};snapInProcess.start();}logCount = 0;}} else if (toFlush.isEmpty()) {// optimization for read heavy workloads// iff this is a read, and there are no pending// flushes (writes), then just pass this to the next// processorif (nextProcessor != null) {nextProcessor.processRequest(si); //繼續(xù)調(diào)用下一個處理器來處理請求if (nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}continue;}toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);}}}} catch (Throwable t) {handleException(this.getName(), t);} finally{running = false;}LOG.info("SyncRequestProcessor exited!");}

FinalRequestProcessor.?processRequest

這個方法就是我們在課堂上分析到的方法了,FinalRequestProcessor.processRequest方法并根據(jù)Request對象中的操作更新內(nèi)存中Session信息或者znode數(shù)據(jù)。

這塊代碼有小300多行,就不全部貼出來了,我們直接定位到關鍵代碼,根據(jù)客戶端的OP類型找到如下的代碼

case OpCode.exists: {lastOp = "EXIS";// TODO we need to figure out the security requirement for this!ExistsRequest existsRequest = new ExistsRequest();//反序列化 (將ByteBuffer反序列化成為ExitsRequest.這個就是我們在客戶端發(fā)起請求的時候傳遞過來的Request對象ByteBufferInputStream.byteBuffer2Record(request.request,existsRequest);String path = existsRequest.getPath(); //得到請求的路徑if (path.indexOf('\0') != -1) {throw new KeeperException.BadArgumentsException();}//終于找到一個很關鍵的代碼,判斷請求的getWatch是否存在,如果存在,則傳遞cnxn(servercnxn) //對于exists請求,需要監(jiān)聽data變化事件,添加watcher Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);rsp = new ExistsResponse(stat); //在服務端內(nèi)存數(shù)據(jù)庫中根據(jù)路徑得到結(jié)果進行組裝,設置為ExistsResponsebreak;}

  

statNode這個方法做了什么?

public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {return dataTree.statNode(path, serverCnxn);}

一路向下,在下面這個方法中,講ServerCnxn向上轉(zhuǎn)型為Watcher了。 因為ServerCnxn實現(xiàn)了Watcher接口

public Stat statNode(String path, Watcher watcher)throws KeeperException.NoNodeException {Stat stat = new Stat();DataNode n = nodes.get(path); //獲得節(jié)點數(shù)據(jù)if (watcher != null) { //如果watcher不為空,則講當前的watcher和path進行綁定dataWatches.addWatch(path, watcher);}if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {n.copyStat(stat);return stat;}}

WatchManager.addWatch(path, watcher);

synchronized void addWatch(String path, Watcher watcher) {HashSet<Watcher> list = watchTable.get(path); //判斷watcherTable中是否存在當前路徑對應的watcherif (list == null) { //不存在則主動添加// don't waste memory if there are few watches on a node// rehash when the 4th entry is added, doubling size thereafter// seems like a good compromiselist = new HashSet<Watcher>(4); // 新生成watcher集合watchTable.put(path, list);}list.add(watcher); //添加到watcher表HashSet<String> paths = watch2Paths.get(watcher);if (paths == null) {// cnxns typically have many watches, so use default cap herepaths = new HashSet<String>();watch2Paths.put(watcher, paths); // 設置watcher到節(jié)點路徑的映射}paths.add(path); // 將路徑添加至paths集合}

  

其大致流程如下

①通過傳入的路徑(節(jié)點路徑)從watchTable獲取相應的觀察者集合,進入②

②?判斷①中的觀察者是否為空,若為空,則進入③,否則,進入④

③?新生成觀察者集合,并將路徑路徑和此集合添加至watchTable中,進入④

④將傳輸?shù)挠^察者添加至觀察者集合,即完成了路徑和觀察者添加至watchTable的步驟,進入⑤

⑤通過傳入的觀察者從watch2Paths中獲取相應的路徑集合,進入⑥

⑥?判斷路徑集合是否為空,若為空,則進入⑦,否則,進入⑧

⑦?新生成路徑集合,并將觀察者和路徑添加至watch2Paths中,進入⑧

⑧將傳入的路徑(節(jié)點路徑)添加至路徑集合,即完成了路徑和觀察者添加至watch2Paths步驟的

總結(jié)
調(diào)用關系鏈如下

圖文里的技術(shù)如何學習,有沒有免費資料?

對Java技術(shù),架構(gòu)技術(shù)感興趣的同學,歡迎加QQ群619881427,一起學習,相互討論。

群內(nèi)已經(jīng)有小伙伴將知識體系整理好(源碼,筆記,PPT,學習視頻),歡迎加群免費領取。

分享給喜歡Java的,喜歡編程,有夢想成為架構(gòu)師的程序員們,希望能夠幫助到你們。

不是Java的程序員也沒關系,幫忙轉(zhuǎn)發(fā)給更多朋友!謝謝。

一個分享小技巧點擊閱讀原文也。可以輕松到電子雜志學習資料哦!

轉(zhuǎn)載于:https://www.cnblogs.com/xueSpring/p/9437359.html

總結(jié)

以上是生活随笔為你收集整理的Zookeeper-watcher机制源码分析(二)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。