Ⅵ:zookeeper的Watcher事件监听机制
2021最新zookeeper系列
?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ??
Ⅰ:zookeeper的單機安裝 - 詳細教程:https://blog.csdn.net/Kevinnsm/article/details/116134397?spm=1001.2014.3001.5501
Ⅱ:zookeeper的相關shell命令:https://blog.csdn.net/Kevinnsm/article/details/116137602?spm=1001.2014.3001.5501
Ⅲ:zookeeper之查看節點的狀態信息:https://blog.csdn.net/Kevinnsm/article/details/116143218?spm=1001.2014.3001.5501
Ⅳ:zookeeper的acl權限控制:https://blog.csdn.net/Kevinnsm/article/details/116167394?spm=1001.2014.3001.5501
Ⅴ:zookeeper的相關Java Api:https://blog.csdn.net/Kevinnsm/article/details/116462557?spm=1001.2014.3001.5501
Ⅵ:zookeeper的Watcher事件監聽機制:https://blog.csdn.net/Kevinnsm/article/details/116501842?spm=1001.2014.3001.5501
Ⅶ:教你一招利用zookeeper作為服務的配置中心:https://blog.csdn.net/Kevinnsm/article/details/116542974?spm=1001.2014.3001.5501
?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ??
文章目錄
- 前置:--》把握住Watcher流程《--
- 1、watcher的連接狀態判斷
- 2、watcher機制下的exists
- Ⅰ、連接對象的監聽器
- Ⅱ、自定義watcher
- Ⅲ、watcher的多次監聽
- Ⅳ、多個watcher同時監聽一個節點
- 3、watcher機制下的getData
- Ⅰ、連接對象的監聽器
- Ⅱ、自定義watcher監聽器
- Ⅲ、多次watcher監聽
- Ⅳ、多個watcher同時監聽一個節點
- 4、watcher機制下的getChildren
- Ⅰ、連接對象的監視器
- Ⅱ、自定義watcher監聽器
- Ⅲ、多次watcher監聽
- Ⅳ、多個watcher同時監聽一個節點
xshell7連接云服務器演示結果,如果未知請看第一章
前置:–》把握住Watcher流程《–
1、連接zookeeper服務器
2、連接時必須使當前線程等待(等待其他線程創建連接zookeeper服務成功,使用計數器實現)
3、執行回調函數process
4、釋放當前線程
1、watcher的連接狀態判斷
package com.zookeeper.watcher;import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;import java.io.IOException; import java.util.concurrent.CountDownLatch;/*** @author:抱著魚睡覺的喵喵* @date:2021/5/7* @description:*/ public class WatcherConnection implements Watcher { //計數器,使當前線程等待其他線程完成static CountDownLatch countDownLatch = new CountDownLatch(1);static ZooKeeper zooKeeper;public static void main(String[] args) {try {//連接zookeeper服務zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection());//使當前線程等待其他線程完成(其他線程也就是連接zookeeper服務的線程)countDownLatch.await();Thread.sleep(1000);zooKeeper.close();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}//回調函數,進性狀態的判斷@Overridepublic void process(WatchedEvent watchedEvent) {try {if (watchedEvent.getType() == Event.EventType.None) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {System.out.println("連接成功!");countDownLatch.countDown();} else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {System.out.println("斷開連接");} else if (watchedEvent.getState() == Event.KeeperState.Expired) {System.out.println("超時了");} else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {System.out.println("認證失敗!");}}} catch (Exception e) {e.printStackTrace();}}}2、watcher機制下的exists
Ⅰ、連接對象的監聽器
public class WatcherExistsTest {private String IP = "8.140.37.103:2181";private ZooKeeper zookeeper;@Beforepublic void connection() throws IOException, InterruptedException {//計數器對象,使當前線程等待其他線程的完成final CountDownLatch downLatch = new CountDownLatch(1);zookeeper = new ZooKeeper(IP, 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//判斷是否連接成功if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {//使CountDownLatch減到0(初始為1),其他線程可以繼續執行(該處應該是主線程可以繼續執行了)downLatch.countDown();System.out.println("連接成功!");}System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});//主線程進入等待態downLatch.await();}@Testpublic void watcherExists() throws KeeperException, InterruptedException {//第一個參數是節點路徑//第二個參數為Boolean類型,true代表監聽path下的節點,false表示不進行監聽zookeeper.exists("/exists", true);Thread.sleep(10000);}@Afterpublic void close() {try {zookeeper.close();} catch (InterruptedException e) {e.printStackTrace();}} }此時在zookeeper客戶端創建/exists節點
IDEA控制臺就會出現NodeCreated在這里插入代碼片
當然還有刪除節點的NodeDeleted等,不再演示
Ⅱ、自定義watcher
public class WatcherExistsTest {private String IP = "8.140.37.103:2181";private ZooKeeper zookeeper;@Beforepublic void connection() throws IOException, InterruptedException {//計數器對象,使當前線程等待其他線程的完成final CountDownLatch downLatch = new CountDownLatch(1);zookeeper = new ZooKeeper(IP, 6000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {//使CountDownLatch減到0(初始為1),其他線程可以繼續執行(該處應該是主線程可以繼續執行了)downLatch.countDown();}}});//主線程進入等待態downLatch.await();}@Testpublic void watcherExists2() throws KeeperException, InterruptedException {zookeeper.exists("/exists2", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("自定義watcher!");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});Thread.sleep(10000);System.out.println("--------------");}@Afterpublic void close() {try {zookeeper.close();} catch (InterruptedException e) {e.printStackTrace();}} }執行@Test注解方法-》客戶端創建/exists2節點-》IDEA控制臺查看結果
當我修改/exists2節點的數據時,控制臺出現了NodeDataChanged
Ⅲ、watcher的多次監聽
本質上只能進性一次注冊,一次監聽;當然可以利用循環調用進行生命周期內的多次監聽
@Testpublic void watcherExists2() throws KeeperException, InterruptedException {zookeeper.exists("/exists2", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {System.out.println("自定義watcher!");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());zookeeper.exists("/exists2", this);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}});Thread.sleep(10000);System.out.println("--------------");}
Ⅳ、多個watcher同時監聽一個節點
一般來說這種多個監聽對象才比較符合發布-訂閱模式,當節點中的數據發生變化時,會通知所有的監聽對象。
@Testpublic void watcherExists3() throws KeeperException, InterruptedException {System.out.println("============================");zookeeper.exists("/exists3", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象1");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});zookeeper.exists("/exists3", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象2");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});zookeeper.exists("/exists3", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象3");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});Thread.sleep(10000);System.out.println("==========================");}
3、watcher機制下的getData
getData(String path, boolean b, Stat stat)連接對象的監聽器
getData(String path, watcher watcher, Stat stat) 自定義的監聽器
Ⅰ、連接對象的監聽器
public class WatcherGetDataTest {static CountDownLatch countDownLatch = new CountDownLatch(1);static ZooKeeper zooKeeper;final String IP = "8.140.37.103:2181";@Beforepublic void before() throws IOException, InterruptedException {zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {System.out.println("=================");countDownLatch.countDown();}System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});countDownLatch.await();}@Testpublic void test() throws KeeperException, InterruptedException {zooKeeper.getData("/data",true, null);Thread.sleep(10000);System.out.println("=======================");}@Afterpublic void after() throws InterruptedException {zooKeeper.close();}}啟動測試-》修改data節點的數據-》查看idea控制臺結果
Ⅱ、自定義watcher監聽器
@Testpublic void test2() throws KeeperException, InterruptedException {System.out.println("========================");zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}}, null);Thread.sleep(10000);System.out.println("============================");}
Ⅲ、多次watcher監聽
@Testpublic void test3() throws KeeperException, InterruptedException {System.out.println("=========================");Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());zooKeeper.getData("/data", this, null);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}};zooKeeper.getData("/data", watcher, null);Thread.sleep(5000);System.out.println("=======================");}
Ⅳ、多個watcher同時監聽一個節點
@Testpublic void test4() throws KeeperException, InterruptedException {System.out.println("=======================");zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象1");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}}, null);zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象2");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}}, null);zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象3");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}}, null);Thread.sleep(5000);System.out.println("========================");}@Afterpublic void after() throws InterruptedException {zooKeeper.close();}
4、watcher機制下的getChildren
getChildren(String path, boolean b) //使用連接對象的監視器
getChildren(String path, watcher w) //自定義監視器
子節點的修改不會被監測到
Ⅰ、連接對象的監視器
public class WatcherGetChildrenTest {static CountDownLatch countDownLatch = new CountDownLatch(1);static ZooKeeper zooKeeper;final String IP = "8.140.37.103:2181";@Beforepublic void before() throws IOException, InterruptedException {zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {System.out.println("=================");countDownLatch.countDown();}System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});countDownLatch.await();}@Testpublic void test() throws KeeperException, InterruptedException {zooKeeper.getChildren("/data", true);Thread.sleep(5000);}@Afterpublic void after() throws InterruptedException {zooKeeper.close();} }
Ⅱ、自定義watcher監聽器
@Testpublic void test2() throws KeeperException, InterruptedException {zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("==================");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});Thread.sleep(10000);System.out.println("====================");}
Ⅲ、多次watcher監聽
@Testpublic void test3() throws KeeperException, InterruptedException {zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("================");if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {try {System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());zooKeeper.getChildren("/data", this);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}});Thread.sleep(5000);}
Ⅳ、多個watcher同時監聽一個節點
@Testpublic void test4() throws KeeperException, InterruptedException {System.out.println("==================================");zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象1");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getType());}});zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象2");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象3");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});Thread.sleep(5000);System.out.println("================================");}@Afterpublic void after() throws InterruptedException {zooKeeper.close();}
總結
以上是生活随笔為你收集整理的Ⅵ:zookeeper的Watcher事件监听机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ⅴ:zookeeper的相关Java A
- 下一篇: IDEA主题设置(字体颜色背景)