Apache ZooKeeper - Watch 机制的底层原理
文章目錄
- Watch 機(jī)制
- API 使用
- Watch 機(jī)制的底層原理
- 客戶(hù)端 Watch 注冊(cè)實(shí)現(xiàn)過(guò)程 ZKWatchManager
- 服務(wù)端 Watch 注冊(cè)實(shí)現(xiàn)過(guò)程 WatchManager
- 服務(wù)端 Watch 事件的觸發(fā)過(guò)程
- 客戶(hù)端回調(diào)的處理過(guò)程
- 小結(jié)
- 實(shí)現(xiàn)一個(gè)分布式的發(fā)布訂閱功能
Watch 機(jī)制
ZooKeeper 又一關(guān)鍵技術(shù)——Watch 監(jiān)控機(jī)制 。
API 使用
ZooKeeper 的客戶(hù)端可以通過(guò) Watch 機(jī)制來(lái)訂閱當(dāng)服務(wù)器上某一節(jié)點(diǎn)的數(shù)據(jù)或狀態(tài)發(fā)生變化時(shí)收到相應(yīng)的通知,我們可以通過(guò)向 ZooKeeper 客戶(hù)端的構(gòu)造方法中傳遞 Watcher 參數(shù)的方式實(shí)現(xiàn)
new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)-
connectString 服務(wù)端地址
-
sessionTimeout:超時(shí)時(shí)間
-
Watcher:監(jiān)控事件
Watcher 將作為整個(gè) ZooKeeper 會(huì)話(huà)期間的上下文 ,一直被保存在客戶(hù)端 ZKWatchManager 的 defaultWatcher 中 .
除此之外,ZooKeeper 客戶(hù)端也可以通過(guò) getData、exists 和 getChildren 三個(gè)接口來(lái)向 ZooKeeper 服務(wù)器注冊(cè) Watcher,從而方便地在不同的情況下添加 Watch 事件
getData(String path, Watcher watcher, Stat stat)知道了 ZooKeeper 添加服務(wù)器監(jiān)控事件的方式,下面我們來(lái)講解一下觸發(fā)通知的條件
上圖中列出了客戶(hù)端在不同會(huì)話(huà)狀態(tài)下,相應(yīng)的在服務(wù)器節(jié)點(diǎn)所能支持的事件類(lèi)型。
例如在客戶(hù)端連接服務(wù)端的時(shí)候,可以對(duì)數(shù)據(jù)節(jié)點(diǎn)的創(chuàng)建、刪除、數(shù)據(jù)變更、子節(jié)點(diǎn)的更新等操作進(jìn)行監(jiān)控。
Watch 機(jī)制的底層原理
其結(jié)構(gòu)很像設(shè)計(jì)模式中的”觀察者模式“,一個(gè)對(duì)象或者數(shù)據(jù)節(jié)點(diǎn)可能會(huì)被多個(gè)客戶(hù)端監(jiān)控,當(dāng)對(duì)應(yīng)事件被觸發(fā)時(shí),會(huì)通知這些對(duì)象或客戶(hù)端。
我們可以將 Watch 機(jī)制理解為是分布式環(huán)境下的觀察者模式。
所以接下來(lái)就以觀察者模式的角度點(diǎn)來(lái)看看 ZooKeeper 底層 Watch 是如何實(shí)現(xiàn)的。
通常我們?cè)趯?shí)現(xiàn)觀察者模式時(shí),最核心或者說(shuō)關(guān)鍵的代碼就是創(chuàng)建一個(gè)列表來(lái)存放觀察者。
而在 ZooKeeper 中則是在客戶(hù)端和服務(wù)器端分別實(shí)現(xiàn)兩個(gè)存放觀察者列表,即:ZKWatchManager 和 WatchManager。
其核心操作就是圍繞著這兩個(gè)展開(kāi)的。
客戶(hù)端 Watch 注冊(cè)實(shí)現(xiàn)過(guò)程 ZKWatchManager
在發(fā)送一個(gè) Watch 監(jiān)控事件的會(huì)話(huà)請(qǐng)求時(shí),ZooKeeper 客戶(hù)端主要做了兩個(gè)工作:
1. 標(biāo)記該會(huì)話(huà)是一個(gè)帶有 Watch 事件的請(qǐng)求
2. 將 Watch 事件存儲(chǔ)到 ZKWatchManager
我們以 getData 接口為例子
當(dāng)發(fā)送一個(gè)帶有 Watch 事件的請(qǐng)求時(shí),客戶(hù)端首先會(huì)把該會(huì)話(huà)標(biāo)記為帶有 Watch 監(jiān)控的事件請(qǐng)求,之后通過(guò) DataWatchRegistration 類(lèi)來(lái)保存 watcher 事件和節(jié)點(diǎn)的對(duì)應(yīng)關(guān)系:
public byte[] getData(final String path, Watcher watcher, Stat stat){...WatchRegistration wcb = null;if (watcher != null) {wcb = new DataWatchRegistration(watcher, clientPath);}RequestHeader h = new RequestHeader();request.setWatch(watcher != null);...GetDataResponse response = new GetDataResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);}之后客戶(hù)端向服務(wù)器發(fā)送請(qǐng)求時(shí),是將請(qǐng)求封裝成一個(gè) Packet 對(duì)象,并添加到一個(gè)等待發(fā)送隊(duì)列 outgoingQueue 中:
public Packet queuePacket(RequestHeader h, ReplyHeader r,...) {Packet packet = null;...packet = new Packet(h, r, request, response, watchRegistration);...outgoingQueue.add(packet); ...return packet;}最后,ZooKeeper 客戶(hù)端就會(huì)向服務(wù)器端發(fā)送這個(gè)請(qǐng)求,完成請(qǐng)求發(fā)送后。調(diào)用負(fù)責(zé)處理服務(wù)器響應(yīng)的 SendThread 線程類(lèi)中的 readResponse 方法接收服務(wù)端的回調(diào),并在最后執(zhí)行 finishPacket()方法將 Watch 注冊(cè)到 ZKWatchManager 中:
private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}...}服務(wù)端 Watch 注冊(cè)實(shí)現(xiàn)過(guò)程 WatchManager
下面我們來(lái)看一下服務(wù)端是如何處理一個(gè) Watch 事件。
Zookeeper 服務(wù)端處理 Watch 事件基本有 2 個(gè)過(guò)程:
1. 解析收到的請(qǐng)求是否帶有 Watch 注冊(cè)事件
2. 將對(duì)應(yīng)的 Watch 事件存儲(chǔ)到 WatchManager
當(dāng) ZooKeeper 服務(wù)器接收到一個(gè)客戶(hù)端請(qǐng)求后,首先會(huì)對(duì)請(qǐng)求進(jìn)行解析,判斷該請(qǐng)求是否包含 Watch 事件.
ZooKeeper 底層是通過(guò) FinalRequestProcessor 類(lèi)中的 processRequest 函數(shù)實(shí)現(xiàn)的。當(dāng) getDataRequest.getWatch() 值為 True 時(shí),表明該請(qǐng)求需要進(jìn)行 Watch 監(jiān)控注冊(cè)。并通過(guò) zks.getZKDatabase().getData 函數(shù)將 Watch 事件注冊(cè)到服務(wù)端的 WatchManager 中
public void processRequest(Request request) {...byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);rsp = new GetDataResponse(b, stat);..}服務(wù)端 Watch 事件的觸發(fā)過(guò)程
在客戶(hù)端和服務(wù)端都對(duì) watch 注冊(cè)完成后,我們接下來(lái)看一下在 ZooKeeper 中觸發(fā)一個(gè) Watch 事件的底層實(shí)現(xiàn)過(guò)程:
以 setData 接口即“節(jié)點(diǎn)數(shù)據(jù)內(nèi)容發(fā)生變更”事件為例。在 DataTree#setData 方法內(nèi)部執(zhí)行完對(duì)節(jié)點(diǎn)數(shù)據(jù)的變更后,會(huì)調(diào)用 WatchManager.triggerWatch 方法觸發(fā)數(shù)據(jù)變更事件。
public Stat setData(String path, byte data[], ...){Stat s = new Stat();DataNode n = nodes.get(path);...dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}那看下 triggerWatch
首先,封裝了一個(gè)具有會(huì)話(huà)狀態(tài)、事件類(lèi)型、數(shù)據(jù)節(jié)點(diǎn) 3 種屬性的 WatchedEvent 對(duì)象。之后查詢(xún)?cè)摴?jié)點(diǎn)注冊(cè)的 Watch 事件,如果為空說(shuō)明該節(jié)點(diǎn)沒(méi)有注冊(cè)過(guò) Watch 事件。如果存在 Watch 事件則添加到定義的 Wathcers 集合中,并在 WatchManager 管理中刪除。最后,通過(guò)調(diào)用 process 方法向客戶(hù)端發(fā)送通知。
Set<Watcher> triggerWatch(String path, EventType type...) {WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);Set<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);...for (Watcher w : watchers) {Set<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}w.process(e);}return watchers;}客戶(hù)端回調(diào)的處理過(guò)程
知道了服務(wù)器端 Watch 事件的觸發(fā)過(guò)程后,我們來(lái)看一下客戶(hù)端接收到通知后如何進(jìn)行操作的。
客戶(hù)端使用 SendThread.readResponse() 方法來(lái)統(tǒng)一處理服務(wù)端的相應(yīng)。
首先反序列化服務(wù)器發(fā)送請(qǐng)求頭信息 replyHdr.deserialize(bbia, “header”),并判斷相屬性字段 xid 的值為 -1,表示該請(qǐng)求響應(yīng)為通知類(lèi)型。在處理通知類(lèi)型時(shí),首先將己收到的字節(jié)流反序列化轉(zhuǎn)換成 WatcherEvent 對(duì)象。
接著判斷客戶(hù)端是否配置了 chrootPath 屬性,如果為 True 說(shuō)明客戶(hù)端配置了 chrootPath 屬性。需要對(duì)接收到的節(jié)點(diǎn)路徑進(jìn)行 chrootPath 處理。
最后調(diào)用 eventThread.queueEvent( )方法將接收到的事件交給 EventThread 線程進(jìn)行處理
if (replyHdr.getXid() == -1) {...WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");...if (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");...event.setPath(serverPath.substring(chrootPath.length()));...}WatchedEvent we = new WatchedEvent(event);...eventThread.queueEvent( we );}接下來(lái)我們來(lái)看一下 EventThread.queueEvent() 方法內(nèi)部的執(zhí)行邏輯。
其主要工作分為 2 點(diǎn):
第 1 步按照通知的事件類(lèi)型,從 ZKWatchManager 中查詢(xún)注冊(cè)過(guò)的客戶(hù)端 Watch 信息。客戶(hù)端在查詢(xún)到對(duì)應(yīng)的 Watch 信息后,會(huì)將其從 ZKWatchManager 的管理中刪除。因此這里也請(qǐng)你多注意,客戶(hù)端的 Watcher 機(jī)制是一次性的,觸發(fā)后就會(huì)被刪除。
完成了第 1 步工作獲取到對(duì)應(yīng)的 Watcher 信息后,將查詢(xún)到的 Watcher 存儲(chǔ)到 waitingEvents 隊(duì)列中,調(diào)用 EventThread 類(lèi)中的 run 方法會(huì)循環(huán)取出在 waitingEvents 隊(duì)列中等待的 Watcher 事件進(jìn)行處理。
public void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}...}最后調(diào)用 processEvent(event) 方法來(lái)最終執(zhí)行實(shí)現(xiàn)了 Watcher 接口的 process()方法。
private void processEvent(Object event) {...if (event instanceof WatcherSetEventPair) {WatcherSetEventPair pair = (WatcherSetEventPair) event;for (Watcher watcher : pair.watchers) {try {watcher.process(pair.event);} catch (Throwable t) {LOG.error("Error while calling watcher ", t);}}}}小結(jié)
ZooKeeper 中 Watch 機(jī)制的,大體上ZooKeeper 實(shí)現(xiàn)的方式是通過(guò)客服端和服務(wù)端分別創(chuàng)建有觀察者的信息列表。客戶(hù)端調(diào)用 getData、exist 等接口時(shí),首先將對(duì)應(yīng)的 Watch 事件放到本地的 ZKWatchManager 中進(jìn)行管理。服務(wù)端在接收到客戶(hù)端的請(qǐng)求后根據(jù)請(qǐng)求類(lèi)型判斷是否含有 Watch 事件,并將對(duì)應(yīng)事件放到 WatchManager 中進(jìn)行管理。
在事件觸發(fā)的時(shí)候服務(wù)端通過(guò)節(jié)點(diǎn)的路徑信息查詢(xún)相應(yīng)的 Watch 事件通知給客戶(hù)端,客戶(hù)端在接收到通知后,首先查詢(xún)本地的 ZKWatchManager 獲得對(duì)應(yīng)的 Watch 信息處理回調(diào)操作。
這種設(shè)計(jì)不但實(shí)現(xiàn)了一個(gè)分布式環(huán)境下的觀察者模式,而且通過(guò)將客戶(hù)端和服務(wù)端各自處理 Watch 事件所需要的額外信息分別保存在兩端,減少彼此通信的內(nèi)容,提升了服務(wù)的處理性能。
實(shí)現(xiàn)一個(gè)分布式的發(fā)布訂閱功能
來(lái)搞個(gè)實(shí)際應(yīng)用來(lái)加深我們對(duì) ZooKeeper 中 Watch 機(jī)制的理解。
提到 ZooKeeper 的應(yīng)用場(chǎng)景,可能第一時(shí)間會(huì)想到最為典型的發(fā)布訂閱功能。
發(fā)布訂閱功能可以看作是一個(gè)一對(duì)多的關(guān)系,即一個(gè)服務(wù)或數(shù)據(jù)的發(fā)布者可以被多個(gè)不同的消費(fèi)者調(diào)用。一般一個(gè)發(fā)布訂閱模式的數(shù)據(jù)交互可以分為消費(fèi)者主動(dòng)請(qǐng)求生產(chǎn)者信息的拉取模式,和生產(chǎn)者數(shù)據(jù)變更時(shí)主動(dòng)推送給消費(fèi)者的推送模式。ZooKeeper 采用了兩種模式結(jié)合的方式實(shí)現(xiàn)訂閱發(fā)布功能。
下面我們來(lái)分析一個(gè)具體案例:
在系統(tǒng)開(kāi)發(fā)的過(guò)程中會(huì)用到各種各樣的配置信息,如數(shù)據(jù)庫(kù)配置項(xiàng)、第三方接口、服務(wù)地址等,我們可以用配置管理功能自動(dòng)完成服務(wù)器配置信息的維護(hù),利用ZooKeeper 的發(fā)布訂閱功能就能解決這個(gè)問(wèn)題。
可以把諸如數(shù)據(jù)庫(kù)配置項(xiàng)這樣的信息存儲(chǔ)在 ZooKeeper 數(shù)據(jù)節(jié)點(diǎn)中。比如下圖中的 /confs/data_item1。
服務(wù)器集群客戶(hù)端對(duì)該節(jié)點(diǎn)添加 Watch 事件監(jiān)控,當(dāng)集群中的服務(wù)啟動(dòng)時(shí),會(huì)讀取該節(jié)點(diǎn)數(shù)據(jù)獲取數(shù)據(jù)配置信息。而當(dāng)該節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化時(shí),ZooKeeper 服務(wù)器會(huì)發(fā)送 Watch 事件給各個(gè)客戶(hù)端,集群中的客戶(hù)端在接收到該通知后,重新讀取節(jié)點(diǎn)的數(shù)據(jù)庫(kù)配置信息。
我們使用 Watch 機(jī)制實(shí)現(xiàn)了一個(gè)分布式環(huán)境下的配置管理功能,通過(guò)對(duì) ZooKeeper 服務(wù)器節(jié)點(diǎn)添加數(shù)據(jù)變更事件,實(shí)現(xiàn)當(dāng)數(shù)據(jù)庫(kù)配置項(xiàng)信息變更后,集群中的各個(gè)客戶(hù)端能接收到該變更事件的通知,并獲取最新的配置信息。要注意一點(diǎn)是,我們提到 Watch 具有一次性,所以當(dāng)我們獲得服務(wù)器通知后要再次添加 Watch 事件。
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專(zhuān)家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的Apache ZooKeeper - Watch 机制的底层原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Apache ZooKeeper - Z
- 下一篇: Apache ZooKeeper - 使