日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Zookeeper--Watcher机制源码剖析一

發布時間:2023/12/4 编程问答 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Zookeeper--Watcher机制源码剖析一 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Watcher-- 數據變更通知

  • 我們知道Zookeeper提供來分布式數據的訂閱/發布功能,一個典型的發布/訂閱模型系統定義了一種一對多的訂閱關系,能讓多個訂閱者同時監聽某個主題對象,當這個被監聽對象自身狀態發生變化時候,會通知所有訂閱者,Zookeeper中引入了Watcher機制來實現這種分布式通知功能,Zookeeper允許客戶端向服務器節點注冊一個Watcher監聽,當服務器端節點發生指定觸發的事件就會觸發這個Watcher,之后服務端會向指定客戶端發送一個事件通知,這樣來實現一個分布式通知的功能,如下圖所示的一個流程:
  • 上圖中流程,Zookeeper的Watcher機制主要包括客戶端線程,客戶端WatcherManager,和Zookeeper服務器,流程上簡單的說:
    • 客戶端向Zookeeper服務器注冊成功Watcher同時,將Watcher對象存儲在客戶端的WatcherManager
    • 當Zookeeper服務器觸發Watcher事件后,向客戶端發送通知
    • 客戶端線程從WatcherManager中撈出對應的Watcher對象來執行回調邏輯

Watcher接口

  • 在Zookeeper中,接口Watcher表示一個標準的事件處理器,訂閱來通知相關的邏輯,我們可以看他的源碼:
    • EventType:事件類型
    • KeeperState:通知狀態
    • Process(WatchedEvent event):會調方法
  • 其中事件類型和通知狀態是有對應關系,如下表中所示
KeeperStateEventType觸發條件解釋說明
SyncConnectedNone客戶端與服務器成功建立連接客戶端和服務器處于連接狀態
SyncConnectedNodeCreatedWatcher 監聽的對應數據節點成功創建客戶端和服務器處于連接狀態
SyncConnectedNodeDeletedWatcher監聽的數據節點成功刪除客戶端和服務器處于連接狀態
SyncConnectedNodeDataChangedWatcher監聽的數據節點內容變更客戶端和服務器處于連接狀態
SyncConnectedNodeChildrenChangedWatcher監聽的對應數據節點列表發生變更客戶端和服務器處于連接狀態
DisconnectedNone客戶端與Zookeeper服務器斷開連接客戶端和服務器斷開了連接
ExpiredNone會話超時客戶端回話失效,通常同時也會收到SessionExpiredException異常
AuthFailedNone兩種情況:使用錯誤scheme進行權限檢查, SASL權限檢查失敗通常同時收到AuthFailedException異常
Unknown3.1.0后廢棄
NoSYncConnected3.1.0后廢棄
  • 如上列舉了Zookeeper中常見的幾個通知狀態和事件類型,其中針對NodeDateChange事件說明的節點的變更并不一定是內容變化,可能版本號DataVersion變化也是一樣會觸發。

  • 回調方法process 是Watcher接口中的一個回調方法,當Zookeeper服務器端向客戶端發送一個Watcher事件通知的時候,客戶端會對相應的Process方法進行回調,從而實現對事件的處理,Process方法定義如下

void process(WatchedEvent event);
  • 如上參數WatcherEvent包含了一個事件的基本屬性:
public class WatchedEvent {private final KeeperState keeperState; //通知狀態private final EventType eventType; // 事件類型private String path; // 節點路徑/*** Create a WatchedEvent with specified type, state and path*/public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {this.keeperState = keeperState;this.eventType = eventType;this.path = path;} ...... }
  • Zookeeper服務端生成WatchedEvent事件后會調用getWrapper方法將字節包裝成一個可序列化的WatcherEvent,其實這是一個事務,都是對服務端事件的一個封裝,不同的是WatchedEvent是我們邏輯事件中的一個對象,主要用來我們程序內部的事件容器,而WatcherEvent因為實現了序列化的接口,因此可以用于網絡傳輸
  • 在服務端得到WatcherEvent后,通過網絡傳到客戶端,還原成一個WatchedEvent,并傳遞給process,然后process方法根據入參就可以解析完整的服務端事件了。

工作機制

  • Zookeeper的Watcher機制可以有如下三個過程:
    • 客戶端注冊Watcher
    • 服務端處理watcher
    • 客戶端回調Watcher
  • 以下類圖說明各組件之間的關系:
客戶端注冊Watcher
  • 我們通過如下部分源碼來分析Watcher的客戶端注冊,我們創建一個Zookeeper的客戶端對象實例時,可以向構造方法中傳入一個默認的Watcher:
//我們調用的方法 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {this(connectString, sessionTimeout, watcher, false); } //實際上初始化的方法 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, Integer.valueOf(sessionTimeout), watcher});if(clientConfig == null) {clientConfig = new ZKClientConfig();}this.clientConfig = clientConfig;this.watchManager = this.defaultWatchManager();this.watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);this.hostProvider = aHostProvider;this.cnxn = this.createConnection(connectStringParser.getChrootPath(), this.hostProvider, sessionTimeout, this, this.watchManager, this.getClientCnxnSocket(), canBeReadOnly);this.cnxn.start();}
  • 如上源碼中我們給定的Watcher對象實際上被保存在客戶端ZKWatcherManager的defaultWatcher中,另外Zookeeper客戶端也可以通過getData,getChildren,exist三個接口來向Zookeeper服務器注冊Watcher,無論哪一種都一樣,我們用getData方法的源碼來分析:
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {PathUtils.validatePath(path); //校驗Path格式正確性ZooKeeper.DataWatchRegistration wcb = null;if(watcher != null) {wcb = new ZooKeeper.DataWatchRegistration(watcher, path);//封裝DataWatchRegistration}String serverPath = this.prependChroot(path);RequestHeader h = new RequestHeader();h.setType(4);GetDataRequest request = new GetDataRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetDataResponse response = new GetDataResponse();ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);if(r.getErr() != 0) {throw KeeperException.create(Code.get(r.getErr()), path);} else {if(stat != null) {DataTree.copyStat(response.getStat(), stat);}return response.getData();}}
  • 如上源碼中參數Path, Watcher對象,getData接口注冊Watcher后,做了兩件事情
    • 先用這兩個參數封裝來一個DataWatchRegistration,其實就是初始化來Zookeeper服務器中的WatchRegistration里面的 watcher,clientPath,這部分用來暫時存儲注冊信息保存節點和Watcher的對應關系
    • 接著會向客戶端請求request進行標記,將其設置為“使用watcher監聽”。
  • 接著繼續往下SubmitRequest方法:
ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb); public ReplyHeader submitRequest(RequestHeader h,Record request,Record response,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();Packet packet = queuePacket(h,r,request,response,null, null,null, null, watchRegistration, watchDeregistration);......return r;}
  • 這個步驟中又一次將ClientCnxn中的WatchRegistration封裝到Packet中,Zookeeper中,Packet可以被看作是一個最小通信協議單元,用于進行客戶端與服務器之間的網絡傳輸,任何需要傳輸的對象都需要包裝成一個Packet對象,接著他被放入發送隊列,如下queuePacket代碼:
public Packet queuePacket(RequestHeader h,ReplyHeader r,Record request,Record response,AsyncCallback cb,String clientPath,String serverPath,Object ctx,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) {Packet packet = null;// Note that we do not generate the Xid for the packet yet. It is// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),// where the packet is actually sent.packet = new Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;packet.watchDeregistration = watchDeregistration;// The synchronized block here is for two purpose:// 1. synchronize with the final cleanup() in SendThread.run() to avoid race// 2. synchronized against each packet. So if a closeSession packet is added,// later packet will be notified.synchronized (state) {......outgoingQueue.add(packet);....}
  • 我們繼續追這個outgoingQueue 隊列,可以看到隨后Zookeeper客戶端會向服務器端發送這個請求,同時等待請求的返回,王朝請求發送后,會由客戶端的SendThread線程的readResponse方法負責接受來自服務端的響應,finishPacket方法會從Packet中取出對于的Watcher并注冊到ZKWatcherManager中去。
protected void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}......
  • 如Packet中的Watchregistration就是我們剛才第一步getData中保存的節點對應的Watcher的注冊信息。現在他又從這部分中取出來封裝的Watcher,如下具體的register方法:
public void register(int rc) {if (shouldAddWatch(rc)) {Map<String, Set<Watcher>> watches = getWatches(rc);synchronized (watches) {Set<Watcher> watchers = watches.get(clientPath);if (watchers == null) {watchers = new HashSet<Watcher>();watches.put(clientPath, watchers);}watchers.add(watcher);}}}//getWatchesprotected final ZKWatchManager watchManager;protected Map<String, Set<Watcher>> getWatches(int rc) {return watchManager.dataWatches;}
  • 如上register方法中客戶端將之前暫時保存的Watcher取出來之后,放入到getWatcher獲取到的一個Map對象中,這個Mp對象就是ZkWatcherManager中的一個dataWatches,我們將剛才存入WatchRegistration中的臨時信息取出用來初始化ZKWatchManager.dataWatches,用于將數據節點的路徑和watcher對象進行一一映射,這樣就完成來客戶端Watcher的注冊,整個Watcher流程如下

  • 如上流程中我們每次調用getData都會注冊一個Watcher,如果這些Watcher都隨著請求發送到服務器的話肯定會內存緊張,現實是這樣的碼,我們可以看之前代碼中負責傳輸數據的對象Packet中,我們將WatchRegistration封裝進去,如下Packet中的序列化方法createBB:

public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");//封裝requestHeader}if (request instanceof ConnectRequest) {//封裝requestrequest.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Unexpected exception", e);}}
  • 如上源碼中可以看到并沒有整個對象完全序列化進去,zookeeper只是將requestHeader和request兩個屬性進行序列化,WatchRegistration并沒有被序列化到底層字節數組中,所以不會進行網絡傳輸

服務端處理Watcher

  • 上面講解了客戶端注冊Watcher的過程,并且已經了解了最終客戶端不會將Watcher對象真正床底到服務器,那么,服務端是怎么樣完成客戶端的Watcher注冊,一下我們對這部分文件進行解析。
ServerCnxn存儲
  • 我們先看下服務器接收Watcher并將其存儲起來的過程,如下Zookeeper服務端處理Watcher序列圖:
  • 我們先從源頭分析客戶端給了服務器那些信息,如下Zookeeper類中getData方法的源碼:
...... RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ......
  • 如上RequestHeader中type類型設置的 4 ,request中給定了節點path路徑,以及一個boolean類型的watcher標識是否天劍監聽。服務端收到來自客戶端的請求后,在FinalRequestProcessor.processRequest()中會判斷當前請求的類型type來做一個策略來決定處理不同類型的請求,如下源碼:
switch (request.type) { ......case OpCode.getData: {......Stat stat = new Stat();byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);//為tru則會傳遞一個 ServerCnxn cnxn = request.cnxn;對象到實際的注冊方法中,否則給nullrsp = new GetDataResponse(b, stat);break;} ......}
  • 如上,從getData請求的處理邏輯可以看到當getDataRequest.getwatch為true的時候,Zookeeper就認為當前客戶端請求需要進行Watcher注冊,于是將當前的ServerCnxn對象和數據節點路徑傳入getData方法
  • ServerCnxn是一個Zookeeper客戶端和服務器之間的鏈接接口,代表了一個客戶端和服務器的鏈接,ServerCnxn接口默認實現是NIOServerCNxn,同時3.4.0版本開始引入了Netty實現:NettyServerCnxn,都實現了Watcher接口并且實現process接口,所有把他看成一個Watcher對象,如下ServerCnxn對象以及兩種process實現
public abstract class ServerCnxn implements Stats, Watcher { ......public abstract void process(WatchedEvent event);...... }

  • 繼續追getData源碼,getZkDataBase獲取到的ZKDatabase 對象,其中DataTree 對象是現在Zookeeper現有的節點數據的樹形存儲,我們可以通過path來從這獲取到對應節點信息,如下獲取DataNode,初始化節點狀態,將DataNode天驕到WatchManager 對象中的WatchTable和watch2Paths中
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null); //如下getData實現 public byte[] getData(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {n.copyStat(stat);if (watcher != null) {dataWatches.addWatch(path, watcher);}return n.data;}}
  • Watchmanager是Zookeeper服務端Watcher的管理者,內部管理的WatcherTable和Watch2Paths,所以一個節點存儲了兩次,不過是從如下兩個未存存儲
    • watchTable是從數據節點路徑的粒度來托管Watcher
    • watch2Paths是從Watcher的粒度來空值時間觸發需要出發的數據節點。
*/ public class WatchManager {private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);private final HashMap<String, HashSet<Watcher>> watchTable =new HashMap<String, HashSet<Watcher>>();private final HashMap<Watcher, HashSet<String>> watch2Paths =new HashMap<Watcher, HashSet<String>>();......}
  • WatcherManager數據結構如下
WatcherManager
- watchTable: HashMap<String, HashSet>(); + watch2Paths :new HashMap<Watcher, HashSet>();
+ addwatch(String ,Watcher): void + removeWatcher(Watcher): void + triggerWatch(String, EventType):Set +Trigger

上一篇Zookeeper–ZAB與Paxos算法聯系與區別
下一篇Zookeeper–Watcher機制源碼剖析二

總結

以上是生活随笔為你收集整理的Zookeeper--Watcher机制源码剖析一的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。