日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)

發布時間:2024/9/27 编程问答 57 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

9.1 基本使用

org.apache.zookeeper.Zookeeper是客戶端入口主類,負責建立與server的會話

它提供以下幾類主要方法? :

功能

描述

create

在本地目錄樹中創建一個節點

delete

刪除一個節點

exists

測試本地是否存在目標節點

get/set data

從目標節點上讀取?/?寫數據

get/set ACL

獲取?/?設置目標節點訪問控制列表信息

get children

檢索一個子節點上的列表

sync

等待要被傳送的數據

?

?

?

?

?

?

?

?

添加pom的maven依賴:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>zookeeper-demo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.1</version></dependency></dependencies></project>

表 1 : ZooKeeper API 描述

9.2 增刪改查znode數據

package cn.com.toto.zk;

?

import java.io.IOException;

?

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

?

public class SimpleDemo {

??? //回話超時時間,設置為與系統默認時間一致

??? private static final int SESSION_TIMEOUT = 30000;

??? //創建ZooKeeper實例

??? ZooKeeper zk;

??? //創建Watcher實例

??? Watcher wh = new Watcher() {

???

?????? @Override

?????? public void process(WatchedEvent event) {

?????????? System.out.println(event.toString());

?????? }

??? };

???

??? //初始化ZooKeeper實例

??? private void createZKInstance() throws IOException {

?????? zk = new ZooKeeper("hadoop:2181,hadoop2:2181,hadoop3:2181",SimpleDemo.SESSION_TIMEOUT,this.wh);

??? }

???

??? private void ZKOperations() throws KeeperException, InterruptedException {

?????? System.out.println("/n1. 創建 ZooKeeper 節點 (znode zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent");

?????? zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

?????? System.out.println("/n2.查看是否創建成功:");

?????? System.out.println(new String(zk.getData("/zoo2", false, null)));

?????? System.out.println("/n3.修改節點數據");

?????? zk.setData("/zoo2", "toto".getBytes(), -1);

?????? System.out.println("/n4.查看是否修改成功:");

?????? System.out.println(new String(zk.getData("/zoo2", false, null)));

?????? System.out.println("/n5.刪除節點");

?????? zk.delete("/zoo2", -1);

?????? System.out.println("/n6.查看節點是否被刪除:");

?????? System.out.println("節點狀態:[" + zk.exists("/zoo2", false) + "]");

??? }

???

??? private void ZKClose() throws InterruptedException {

?????? zk.close();

??? }

???

??? public static void main(String[] args) throws KeeperException, InterruptedException, IOException {

?????? SimpleDemo dm = new SimpleDemo();

?????? dm.createZKInstance();

?????? dm.ZKOperations();

?????? dm.ZKClose();

??? }

}

運行結果:

/n1. 創建 ZooKeeper 節點 (znode : zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent

一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection

信息: Socket connection established to hadoop3/192.168.106.82:2181, initiating session

一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread onConnected

信息: Session establishment complete on server hadoop3/192.168.106.82:2181, sessionid = 0x35983c177d00007, negotiated timeout = 30000

WatchedEvent state:SyncConnected type:None path:null

/n2.查看是否創建成功:

myData2

/n3.修改節點數據

/n4.查看是否修改成功:

toto

/n5.刪除節點

/n6.查看節點是否被刪除:

節點狀態:[null]

?

9.3 監聽znode

Zookeeper的監聽器工作機制

?

監聽器是一個接口,我們的代碼中可以實現Wather這個接口,實現其中的process方法,方法中即我們自己的業務邏輯

?

?

?

?

監聽器的注冊是在獲取數據的操作中實現:

getData(path,watch?)監聽的事件是:節點數據變化事件

getChildren(path,watch?)監聽的事件是:節點下的子節點增減變化事件

?

9.4其它案例

所需jar包:

?

圖1 項目包結構

package cn.com.toto.zk;

?

import java.util.List;

import java.util.concurrent.CountDownLatch;

?

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.Watcher.Event.KeeperState;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

import org.junit.Before;

import org.junit.Test;

?

public class SimpleZkClient {

???

??? private static final String connectString = "192.168.106.80:2181,192.168.106.81:2181,192.168.106.82:2181";

??? private static final int sessionTimeout = 2000;

???

??? // latch就相當于一個對象鎖,當latch.await()方法執行時,方法所在的線程會等待

??? //latchcount減為0時,將會喚醒等待的線程

??? CountDownLatch latch = new CountDownLatch(1);

??? ZooKeeper zkClient = null;

???

??? @Before

??? public void init() throws Exception {

?????? zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

??????????

?????????? //事件監聽回調方法

?????????? @Override

?????????? public void process(WatchedEvent event) {

????????????? if (latch.getCount() > 0 && event.getState() == KeeperState.SyncConnected) {

????????????????? System.out.println("countdown");

????????????????? latch.countDown();

????????????? }

?????????????

????????????? //收到事件通知后的回調函數(應該是我們自己的事件處理邏輯)

????????????? System.out.println(event.getType() + "---" + event.getPath());

????????????? System.out.println(event.getState());

?????????? }

?????? });

?????? latch.await();

??? }

???

??? //創建數據節點到zk

??? @Test

??? public void testCreate() throws KeeperException, InterruptedException {

?????? //參數1:要創建的節點的路徑? 參數2:節點大數據參數3:節點的權限? 參數4:節點的類型

?????? String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

??? ??? //上傳的數據可以是任何類型,但都要轉成byte

?????? zkClient.close();

??? }

???

??? //判斷znode是否存在

??? @Test

??? public void testExist() throws KeeperException, InterruptedException {

?????? Stat stat = zkClient.exists("/eclipse", false);

?????? System.out.println(stat == null ? "not exist" : "exist");

??? }

???

??? //獲取znode下的孩子節點

??? @Test

??? public void getChildren() throws KeeperException, InterruptedException {

?????? ?List<String> children = zkClient.getChildren("/", true);

?????? ?for(String child : children) {

?????????? ?System.out.println(child);

?????? ?}

?????? ?Thread.sleep(Long.MAX_VALUE);

??? }

?

??? //獲取參數

??? @Test

??? public void getData() throws KeeperException, InterruptedException {

?????? byte[] data = zkClient.getData("/eclipse", true, null);

?????? System.out.println(new String(data));

?????? Thread.sleep(Long.MAX_VALUE);

??? }

???

??? //刪除znode

??? @Test

??? public void deleteZnode() throws InterruptedException, KeeperException {

??? ??? //參數2:指定要刪除的版本,-1表示刪除所有版本

?????? zkClient.delete("/eclipse", -1);

??? }

???

??? //設置參數

??? @Test

??? public void setData() throws Exception {

?????? //要注意,這里的/zookeeper 要在zookeeper中的節點中有

?????? zkClient.setData("/zookeeper", "imissyou angelababy".getBytes(), -1);

?

?????? byte[] data = zkClient.getData("/zookeeper", false, null);

?????? System.out.println(new String(data));

??? }

}

案例二

package cn.com.toto.zk;

?

import java.util.List;

import java.util.concurrent.CountDownLatch;

?

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.Watcher.Event.KeeperState;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

?

import com.sun.org.apache.bcel.internal.generic.NEW;

?

public class TestZKclient {

??? static ZooKeeper zk = null;

???

??? public static void main(String[] args) throws Exception {

??? ??? final CountDownLatch countDownLatch = new CountDownLatch(1);

??? ??? zk = new ZooKeeper("hadoop:2181",2000,new Watcher() {

??????????

?????????? @Override

?????????? public void process(WatchedEvent event) {

????????????? if (event.getState() == KeeperState.SyncConnected) {

????????????????? countDownLatch.countDown();

????????????? }

????????????? System.out.println(event.getPath());

????????????? System.out.println(event.getType());

????????????? try {

????????????????? zk.getChildren("/zookeeper", true);

????????????? } catch (Exception e) {

????????????????? e.printStackTrace();

????????????? }

?????????? }

?????? });

??? ???

??? ??? countDownLatch.await();

??? ???

??? ??? /**

??? ??? zk.create("/myboys", "丑陋型".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

??? ??? zk.close();

??? ??? **/

??? ???

??? ??? /**

??? ??? byte[] data = zk.getData("/myboys", true, null);

??? ??? System.out.println(new String(data,"UTF-8"));

??? ??? Thread.sleep(Long.MAX_VALUE);

??? ??? **/

??? ???

??? ??? /**

??? ??? zk.create("/myboys/wangkai", "測試型".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

??? ??? zk.close();

??? ??? **/

??? ???

??? ??? /**

??? ??? List<String> children = zk.getChildren("/myboys", true);

??? ??? for(String child : children) {

??? ?????? System.out.println(child);

??? ??? }

??? ??? **/

??? ???

??? ??? /**zk.delete("/myboys/wangkai", -1);**/

??? ???

??? ??? /**zk.setData("/myboys", "fasdfasdf".getBytes(), -1);**/

??? ??? /**

??? ??? byte[] data = zk.getData("/myboys", true, null);

??? ??? System.out.println(new String(data,"UTF-8"));

??? ??? **/

??? ???

??? ??? Stat stat = zk.exists("/mywives", true);

??? ??? System.out.println(stat == null ? "確實不存在" : "存在");

??? ??? zk.close();

??? }

}

?

9.5 其它網絡參考資料

準備工作

拷貝ZooKeeper安裝目錄下的zookeeper.x.x.x.jar文件到項目的classpath路徑下.

創建連接和回調接口

首先需要創建ZooKeeper對象, 后續的一切操作都是基于該對象進行的.

1.? ZooKeeper(String?connectString,?int?sessionTimeout,?Watcher?watcher)?throws?IOException??

以下為各個參數的詳細說明:

·???????? connectString. zookeeper server列表, 以逗號隔開. ZooKeeper對象初始化后, 將從server列表中選擇一個server, 并嘗試與其建立連接. 如果連接建立失敗, 則會從列表的剩余項中選擇一個server, 并再次嘗試建立連接.

·???????? sessionTimeout. 指定連接的超時時間.

·???????? watcher. 事件回調接口.

注意, 創建ZooKeeper對象時, 只要對象完成初始化便立刻返回. 建立連接是以異步的形式進行的, 當連接成功建立后, 會回調watcherprocess方法. 如果想要同步建立與server的連接, 需要自己進一步封裝.

1.? public?class?ZKConnection?{??

2.? ????/**?

3.? ?????*?server列表,?以逗號分割?

4.? ?????*/??

5.? ????protected?String?hosts?=?"localhost:4180,localhost:4181,localhost:4182";??

6.? ????/**?

7.? ?????*?連接的超時時間,?毫秒?

8.? ?????*/??

9.? ????private?static?final?int?SESSION_TIMEOUT?=?5000;??

10. ????private?CountDownLatch?connectedSignal?=?new?CountDownLatch(1);??

11. ????protected?ZooKeeper?zk;??

12. ??

13. ????/**?

14. ?????*?連接zookeeper?server?

15. ?????*/??

16. ????public?void?connect()?throws?Exception?{??

17. ????????zk?=?new?ZooKeeper(hosts,?SESSION_TIMEOUT,?new?ConnWatcher());??

18. ????????//?等待連接完成??

19. ????????connectedSignal.await();??

20. ????}??

21. ??

22. ????public?class?ConnWatcher?implements?Watcher?{??

23. ????????public?void?process(WatchedEvent?event)?{??

24. ????????????//?連接建立,?回調process接口時,?event.getState()KeeperState.SyncConnected??

25. ????????????if?(event.getState()?==?KeeperState.SyncConnected)?{??

26. ????????????????//?放開閘門,?waitconnect方法上的線程將被喚醒??

27. ????????????????connectedSignal.countDown();??

28. ????????????}??

29. ????????}??

30. ????}??

31. }??

?

創建znode

ZooKeeper對象的create方法用于創建znode.

1.? String?create(String?path,?byte[]?data,?List?acl,?CreateMode?createMode);??

以下為各個參數的詳細說明:

·???????? path. znode的路徑.

·???????? data. znode關聯的數據.

·???????? acl. 指定權限信息, 如果不想指定權限, 可以傳入Ids.OPEN_ACL_UNSAFE.

·???????? 指定znode類型. CreateMode是一個枚舉類, 從中選擇一個成員傳入即可. 關于znode類型的詳細說明, 可參考本人的上一篇博文.

1.? /**?

2.? ?*?創建臨時節點?

3.? ?*/??

4.? public?void?create(String?nodePath,?byte[]?data)?throws?Exception?{??

5.? ????zk.create(nodePath,?data,?Ids.OPEN_ACL_UNSAFE,?CreateMode.EPHEMERAL);??

6.? }??

?

獲取子node列表

ZooKeeper對象的getChildren方法用于獲取子node列表.

1.? List?getChildren(String?path,?boolean?watch);??

watch參數用于指定是否監聽path node的子node的增加和刪除事件, 以及path node本身的刪除事件.

判斷znode是否存在

ZooKeeper對象的exists方法用于判斷指定znode是否存在.

1.? Stat?exists(String?path,?boolean?watch);??

watch參數用于指定是否監聽path node的創建, 刪除事件, 以及數據更新事件. 如果該node存在, 則返回該node的狀態信息, 否則返回null.

獲取node中關聯的數據

ZooKeeper對象的getData方法用于獲取node關聯的數據.

1.? byte[]?getData(String?path,?boolean?watch,?Stat?stat);??

watch參數用于指定是否監聽path node的刪除事件, 以及數據更新事件, 注意, 不監聽path node的創建事件, 因為如果path node不存在, 該方法將拋出KeeperException.NoNodeException異常.
stat
參數是個傳出參數, getData方法會將path node的狀態信息設置到該參數中.

更新node中關聯的數據

ZooKeeper對象的setData方法用于更新node關聯的數據.

1.? Stat?setData(final?String?path,?byte?data[],?int?version);??

data為待更新的數據.
version
參數指定要更新的數據的版本, 如果version和真實的版本不同, 更新操作將失敗. 指定version-1則忽略版本檢查.
返回path node的狀態信息.

刪除znode

ZooKeeper對象的delete方法用于刪除znode.

1.? void?delete(final?String?path,?int?version);??

version參數的作用同setData方法.

其余接口

請查看ZooKeeper對象的API文檔.

需要注意的幾個地方

·???????? znode中關聯的數據不能超過1M. zookeeper的使命是分布式協作, 而不是數據存儲.

·???????? getChildren, getData, exists方法可指定是否監聽相應的事件. create, delete, setData方法則會觸發相應的事件的發生.

·???????? 以上介紹的幾個方法大多存在其異步的重載方法, 具體請查看API說明.

?

總結

以上是生活随笔為你收集整理的Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。