日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Ⅵ:zookeeper的Watcher事件监听机制

發布時間:2024/10/5 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Ⅵ:zookeeper的Watcher事件监听机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2021最新zookeeper系列

?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ??

Ⅰ:zookeeper的單機安裝 - 詳細教程:https://blog.csdn.net/Kevinnsm/article/details/116134397?spm=1001.2014.3001.5501

Ⅱ:zookeeper的相關shell命令:https://blog.csdn.net/Kevinnsm/article/details/116137602?spm=1001.2014.3001.5501

Ⅲ:zookeeper之查看節點的狀態信息:https://blog.csdn.net/Kevinnsm/article/details/116143218?spm=1001.2014.3001.5501

Ⅳ:zookeeper的acl權限控制:https://blog.csdn.net/Kevinnsm/article/details/116167394?spm=1001.2014.3001.5501

Ⅴ:zookeeper的相關Java Api:https://blog.csdn.net/Kevinnsm/article/details/116462557?spm=1001.2014.3001.5501

Ⅵ:zookeeper的Watcher事件監聽機制:https://blog.csdn.net/Kevinnsm/article/details/116501842?spm=1001.2014.3001.5501

Ⅶ:教你一招利用zookeeper作為服務的配置中心:https://blog.csdn.net/Kevinnsm/article/details/116542974?spm=1001.2014.3001.5501

?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ??

文章目錄

  • 前置:--》把握住Watcher流程《--
  • 1、watcher的連接狀態判斷
  • 2、watcher機制下的exists
    • Ⅰ、連接對象的監聽器
    • Ⅱ、自定義watcher
    • Ⅲ、watcher的多次監聽
    • Ⅳ、多個watcher同時監聽一個節點
  • 3、watcher機制下的getData
    • Ⅰ、連接對象的監聽器
    • Ⅱ、自定義watcher監聽器
    • Ⅲ、多次watcher監聽
    • Ⅳ、多個watcher同時監聽一個節點
  • 4、watcher機制下的getChildren
    • Ⅰ、連接對象的監視器
    • Ⅱ、自定義watcher監聽器
    • Ⅲ、多次watcher監聽
    • Ⅳ、多個watcher同時監聽一個節點


xshell7連接云服務器演示結果,如果未知請看第一章

前置:–》把握住Watcher流程《–

1、連接zookeeper服務器
2、連接時必須使當前線程等待(等待其他線程創建連接zookeeper服務成功,使用計數器實現)
3、執行回調函數process
4、釋放當前線程

1、watcher的連接狀態判斷

package com.zookeeper.watcher;import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper;import java.io.IOException; import java.util.concurrent.CountDownLatch;/*** @author:抱著魚睡覺的喵喵* @date:2021/5/7* @description:*/ public class WatcherConnection implements Watcher { //計數器,使當前線程等待其他線程完成static CountDownLatch countDownLatch = new CountDownLatch(1);static ZooKeeper zooKeeper;public static void main(String[] args) {try {//連接zookeeper服務zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection());//使當前線程等待其他線程完成(其他線程也就是連接zookeeper服務的線程)countDownLatch.await();Thread.sleep(1000);zooKeeper.close();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}//回調函數,進性狀態的判斷@Overridepublic void process(WatchedEvent watchedEvent) {try {if (watchedEvent.getType() == Event.EventType.None) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {System.out.println("連接成功!");countDownLatch.countDown();} else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {System.out.println("斷開連接");} else if (watchedEvent.getState() == Event.KeeperState.Expired) {System.out.println("超時了");} else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {System.out.println("認證失敗!");}}} catch (Exception e) {e.printStackTrace();}}}

2、watcher機制下的exists

Ⅰ、連接對象的監聽器

public class WatcherExistsTest {private String IP = "8.140.37.103:2181";private ZooKeeper zookeeper;@Beforepublic void connection() throws IOException, InterruptedException {//計數器對象,使當前線程等待其他線程的完成final CountDownLatch downLatch = new CountDownLatch(1);zookeeper = new ZooKeeper(IP, 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//判斷是否連接成功if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {//使CountDownLatch減到0(初始為1),其他線程可以繼續執行(該處應該是主線程可以繼續執行了)downLatch.countDown();System.out.println("連接成功!");}System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});//主線程進入等待態downLatch.await();}@Testpublic void watcherExists() throws KeeperException, InterruptedException {//第一個參數是節點路徑//第二個參數為Boolean類型,true代表監聽path下的節點,false表示不進行監聽zookeeper.exists("/exists", true);Thread.sleep(10000);}@Afterpublic void close() {try {zookeeper.close();} catch (InterruptedException e) {e.printStackTrace();}} }

此時在zookeeper客戶端創建/exists節點

IDEA控制臺就會出現NodeCreated在這里插入代碼片

當然還有刪除節點的NodeDeleted等,不再演示

Ⅱ、自定義watcher

public class WatcherExistsTest {private String IP = "8.140.37.103:2181";private ZooKeeper zookeeper;@Beforepublic void connection() throws IOException, InterruptedException {//計數器對象,使當前線程等待其他線程的完成final CountDownLatch downLatch = new CountDownLatch(1);zookeeper = new ZooKeeper(IP, 6000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {//使CountDownLatch減到0(初始為1),其他線程可以繼續執行(該處應該是主線程可以繼續執行了)downLatch.countDown();}}});//主線程進入等待態downLatch.await();}@Testpublic void watcherExists2() throws KeeperException, InterruptedException {zookeeper.exists("/exists2", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("自定義watcher!");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});Thread.sleep(10000);System.out.println("--------------");}@Afterpublic void close() {try {zookeeper.close();} catch (InterruptedException e) {e.printStackTrace();}} }

執行@Test注解方法-》客戶端創建/exists2節點-》IDEA控制臺查看結果

當我修改/exists2節點的數據時,控制臺出現了NodeDataChanged

Ⅲ、watcher的多次監聽

本質上只能進性一次注冊,一次監聽;當然可以利用循環調用進行生命周期內的多次監聽

@Testpublic void watcherExists2() throws KeeperException, InterruptedException {zookeeper.exists("/exists2", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {System.out.println("自定義watcher!");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());zookeeper.exists("/exists2", this);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}});Thread.sleep(10000);System.out.println("--------------");}


Ⅳ、多個watcher同時監聽一個節點

一般來說這種多個監聽對象才比較符合發布-訂閱模式,當節點中的數據發生變化時,會通知所有的監聽對象。

@Testpublic void watcherExists3() throws KeeperException, InterruptedException {System.out.println("============================");zookeeper.exists("/exists3", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象1");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});zookeeper.exists("/exists3", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象2");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});zookeeper.exists("/exists3", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象3");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});Thread.sleep(10000);System.out.println("==========================");}



3、watcher機制下的getData

getData(String path, boolean b, Stat stat)連接對象的監聽器
getData(String path, watcher watcher, Stat stat) 自定義的監聽器

Ⅰ、連接對象的監聽器

public class WatcherGetDataTest {static CountDownLatch countDownLatch = new CountDownLatch(1);static ZooKeeper zooKeeper;final String IP = "8.140.37.103:2181";@Beforepublic void before() throws IOException, InterruptedException {zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {System.out.println("=================");countDownLatch.countDown();}System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});countDownLatch.await();}@Testpublic void test() throws KeeperException, InterruptedException {zooKeeper.getData("/data",true, null);Thread.sleep(10000);System.out.println("=======================");}@Afterpublic void after() throws InterruptedException {zooKeeper.close();}}

啟動測試-》修改data節點的數據-》查看idea控制臺結果

Ⅱ、自定義watcher監聽器

@Testpublic void test2() throws KeeperException, InterruptedException {System.out.println("========================");zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}}, null);Thread.sleep(10000);System.out.println("============================");}


Ⅲ、多次watcher監聽

@Testpublic void test3() throws KeeperException, InterruptedException {System.out.println("=========================");Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());zooKeeper.getData("/data", this, null);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}};zooKeeper.getData("/data", watcher, null);Thread.sleep(5000);System.out.println("=======================");}


Ⅳ、多個watcher同時監聽一個節點

@Testpublic void test4() throws KeeperException, InterruptedException {System.out.println("=======================");zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象1");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}}, null);zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象2");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}}, null);zooKeeper.getData("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象3");System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}}, null);Thread.sleep(5000);System.out.println("========================");}@Afterpublic void after() throws InterruptedException {zooKeeper.close();}


4、watcher機制下的getChildren

getChildren(String path, boolean b) //使用連接對象的監視器
getChildren(String path, watcher w) //自定義監視器
子節點的修改不會被監測到

Ⅰ、連接對象的監視器

public class WatcherGetChildrenTest {static CountDownLatch countDownLatch = new CountDownLatch(1);static ZooKeeper zooKeeper;final String IP = "8.140.37.103:2181";@Beforepublic void before() throws IOException, InterruptedException {zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {System.out.println("=================");countDownLatch.countDown();}System.out.println(watchedEvent.getPath());System.out.println(watchedEvent.getType());}});countDownLatch.await();}@Testpublic void test() throws KeeperException, InterruptedException {zooKeeper.getChildren("/data", true);Thread.sleep(5000);}@Afterpublic void after() throws InterruptedException {zooKeeper.close();} }


Ⅱ、自定義watcher監聽器

@Testpublic void test2() throws KeeperException, InterruptedException {zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("==================");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});Thread.sleep(10000);System.out.println("====================");}


Ⅲ、多次watcher監聽

@Testpublic void test3() throws KeeperException, InterruptedException {zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("================");if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {try {System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());zooKeeper.getChildren("/data", this);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}});Thread.sleep(5000);}


Ⅳ、多個watcher同時監聽一個節點

@Testpublic void test4() throws KeeperException, InterruptedException {System.out.println("==================================");zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象1");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getType());}});zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象2");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});zooKeeper.getChildren("/data", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("監聽對象3");System.out.println(watchedEvent.getType());System.out.println(watchedEvent.getPath());}});Thread.sleep(5000);System.out.println("================================");}@Afterpublic void after() throws InterruptedException {zooKeeper.close();}


總結

以上是生活随笔為你收集整理的Ⅵ:zookeeper的Watcher事件监听机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。