日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

支持断线重连、永久watcher、递归操作 ZooKeeper 客户端

發(fā)布時(shí)間:2023/12/4 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 支持断线重连、永久watcher、递归操作 ZooKeeper 客户端 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

項(xiàng)目介紹

ZooKeeper本質(zhì)上是一個(gè)分布式的小文件存儲(chǔ)系統(tǒng)。原本是Apache Hadoop的一個(gè)組件,現(xiàn)在被拆分為一個(gè)Hadoop的獨(dú)立子項(xiàng)目。

Zookeeper 作為一個(gè)分布式的服務(wù)框架,主要用來解決分布式集群中應(yīng)用系統(tǒng)的一致性問題,它能提供基于類似于文件系統(tǒng)的目錄節(jié)點(diǎn)樹方式的數(shù)據(jù)存儲(chǔ),但是 Zookeeper 并不是用來專門存儲(chǔ)數(shù)據(jù)的,它的作用主要是用來維護(hù)和監(jiān)控你存儲(chǔ)的數(shù)據(jù)的狀態(tài)變化。通過監(jiān)控這些數(shù)據(jù)狀態(tài)的變化,從而可以達(dá)到基于數(shù)據(jù)的集群管理。

Zookeeper 在Windows安裝和使用,可參考http://www.cnblogs.com/shanyou/p/3221990.html

ZookeeperClient是在https://github.com/shayhatsor/zookeeper基礎(chǔ)上的再次封裝,使開發(fā)者更方便使用ZooKeeper相關(guān)的功能。

ZookeeperClient實(shí)現(xiàn)了斷線重連,會(huì)話過期重連,永久監(jiān)聽,子節(jié)點(diǎn)數(shù)據(jù)變化的監(jiān)聽。并且加入了常用功能,例如分布式鎖,Leader選舉,分布式隊(duì)列

,項(xiàng)目地址https://github.com/milanyangbo/ZooKeeper.Net

支持的平臺(tái)

.NET 4及以上,目前尚不支持.NET Core, .NET Core推薦用另外一個(gè)項(xiàng)目?支持?jǐn)嗑€重連、永久watcher、遞歸操作并且能跨平臺(tái)(.NET Core)的ZooKeeper異步客戶端?

使用說明

下面列一下常用的使用方法,不僅限于此哦!

一、創(chuàng)建ZKClient對象

創(chuàng)建ZKClient對象 有兩種方式可以方便的創(chuàng)建ZKClient對象

  • 使用構(gòu)造函數(shù)創(chuàng)建

  • ?string address = "localhost:2181";
    ? ?ZKClient zkClient1 = new ZKClient(address); ? ?
    ? ?ZKClient zkClient2 = new ZKClient(address, TimeSpan.FromMilliseconds(10000)); ?
    ? ?ZKClient zkClient3 = new ZKClient(address, TimeSpan.FromMilliseconds(10000), TimeSpan.FromMilliseconds(10000)); ?
    ? ?ZKClient zkClient4 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer()); ? ? ?
    ? ?ZKClient zkClient5 = new ZKClient(address, TimeSpan.FromMilliseconds(30000), TimeSpan.FromMilliseconds(10000), new SerializableSerializer(), TimeSpan.FromMilliseconds(60000));?


  • 使用輔助類創(chuàng)建

  • string address = "localhost:2181"; ?
    ? ?ZKClient zkClient = ZKClientBuilder.NewZKClient(address) ?
    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .SessionTimeout(30000)//可選 ?
    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .Serializer(new SerializableSerializer())//可選 ?
    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .RetryTimeout(60000)//可選 ?
    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .ConnectionTimeout(10000)//可選 ?
    ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .Build(); //創(chuàng)建實(shí)例


    二、節(jié)點(diǎn)的新增、更新、刪除和獲取

    新增節(jié)點(diǎn)

  • 常規(guī)新增節(jié)點(diǎn)

    父節(jié)點(diǎn)不存在會(huì)拋出異常

  • await zkClient.CreateAsync("/test1", "123", CreateMode.EPHEMERAL); await zkClient.CreateAsync("/test1-1", 123, CreateMode.EPHEMERAL_SEQUENTIAL); await zkClient.CreateAsync("/test1-2", 123, CreateMode.PERSISTENT); await zkClient.CreateAsync("/test1-3", 123, CreateMode.PERSISTENT_SEQUENTIAL);
  • 遞歸新增節(jié)點(diǎn)(新增節(jié)點(diǎn)及其父節(jié)點(diǎn))

    如果父節(jié)點(diǎn)不存在會(huì)被一并創(chuàng)建。
    對于PERSISTENT類型的節(jié)點(diǎn),遞歸創(chuàng)建,父節(jié)點(diǎn)和子節(jié)點(diǎn)都創(chuàng)建為PERSISTENT。
    對于EPHEMERAL類型的節(jié)點(diǎn),遞歸創(chuàng)建,父節(jié)點(diǎn)都是PERSISTENT類型,而最后一級節(jié)點(diǎn)才是EPHEMERAL類型。(因?yàn)镋PHEMERAL不能擁有子節(jié)點(diǎn))
    注意:第二個(gè)參數(shù)為節(jié)點(diǎn)的值,指的的最后一級節(jié)點(diǎn)的值。

  • ?string path = "/test8/1/2/3"; ? ? //遞歸創(chuàng)建節(jié)點(diǎn)及父節(jié)點(diǎn)
    ? ? await zkClient.CreateRecursiveAsync(path, "abc", CreateMode.PERSISTENT); ? ? await zkClient.CreateRecursiveAsync(path, "123", ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);

  • 特殊的EPHEMERAL類型節(jié)點(diǎn)

    特殊類型的EPHEMERAL節(jié)點(diǎn),該節(jié)點(diǎn)在會(huì)話失效被刪除后,重新連接會(huì)被自動(dòng)創(chuàng)建。


  • 更新節(jié)點(diǎn)數(shù)據(jù)

    string path = "/test"; await zkClient.SetDataAsync(path, "456"); //帶期望版本號的更新,如果真實(shí)的版本號與期望版本號不一致會(huì)更新失敗,拋出異常await zkClient.SetDataAsync(path, "123", 2);

    刪除節(jié)點(diǎn)

  • 常規(guī)刪除

  • bool flag = await zkClient.DeleteAsync("/test");//刪除任意版本bool flag = await zkClient.DeleteAsync("/test",1);//刪除指定版本
  • 遞歸刪除(刪除節(jié)點(diǎn)及子節(jié)點(diǎn))

  • string path = "/test"; await zkClient.DeleteRecursiveAsync(path);//如果/test下有多個(gè)子節(jié)點(diǎn),會(huì)被一并刪除

    獲取節(jié)點(diǎn)數(shù)據(jù)

    string path = "/test"; await zkClient.GetDataAsync<string>(path); //如果節(jié)點(diǎn)不存在拋出異常await zkClient.GetDataAsync<string>(path, true); //如果節(jié)點(diǎn)不存在返回nullStat stat = (await zkClient.GetZKDataAsync<string>(path)).stat; //獲得數(shù)據(jù)以及stat信息

    等待節(jié)點(diǎn)創(chuàng)建

    string path = "/test"; //等待直到超時(shí)或者節(jié)點(diǎn)創(chuàng)建成功。await zkClient.WaitUntilExistsAsync(path, TimeSpan.FromMilliseconds(5000));

    三、權(quán)限管理

    ZooKeeper的權(quán)限管理亦即ACL控制功能通過Server、Client兩端協(xié)調(diào)完成:
    Server端:
    一個(gè)ZooKeeper的節(jié)點(diǎn)(znode)存儲(chǔ)兩部分內(nèi)容:數(shù)據(jù)和狀態(tài),狀態(tài)中包含ACL信息。創(chuàng)建一個(gè)znode會(huì)產(chǎn)生一個(gè)ACL列表,列表中每個(gè)ACL包括:

    驗(yàn)證模式(scheme)
    具體內(nèi)容(Id)(當(dāng)scheme=“digest”時(shí),Id為用戶名密碼,例如“root:J0sTy9BCUKubtK1y8pkbL7qoxSw=”)
    權(quán)限(perms)

    ZooKeeper提供了如下幾種驗(yàn)證模式(scheme):

    digest:Client端由用戶名和密碼驗(yàn)證,譬如user:password,digest的密碼生成方式是Sha1摘要的base64形式
    auth:不使用任何id,代表任何已確認(rèn)用戶。
    ip:Client端由IP地址驗(yàn)證,譬如172.2.0.0/24
    world:固定用戶為anyone,為所有Client端開放權(quán)限
    super:在這種scheme情況下,對應(yīng)的id擁有超級權(quán)限,可以做任何事情(cdrwa)

    注意的是,exists操作和getAcl操作并不受ACL許可控制,因此任何客戶端可以查詢節(jié)點(diǎn)的狀態(tài)和節(jié)點(diǎn)的ACL。
    節(jié)點(diǎn)的權(quán)限(perms)主要有以下幾種:

    Create 允許對子節(jié)點(diǎn)Create操作
    Read 允許對本節(jié)點(diǎn)GetChildren和GetData操作
    Write 允許對本節(jié)點(diǎn)SetData操作
    Delete 允許對子節(jié)點(diǎn)Delete操作
    Admin 允許對本節(jié)點(diǎn)setAcl操作
    Znode ACL權(quán)限用一個(gè)int型數(shù)字perms表示,perms的5個(gè)二進(jìn)制位分別表示setacl、delete、create、write、read。比如0x1f=adcwr,0x1=----r,0x15=a-c-r。

    四、監(jiān)聽相關(guān)

    注意:對于斷開連接時(shí)間過長造成的會(huì)話過期,由于服務(wù)器端在會(huì)話過期后會(huì)刪除客戶端設(shè)置的監(jiān)聽。

    即便客戶端在會(huì)話過期后自動(dòng)連接成功,但是在會(huì)話過期到會(huì)話重建這段時(shí)間客戶端監(jiān)聽的節(jié)點(diǎn)仍可能發(fā)生了改變,

    而具體哪些變了或是沒變,客戶端是無法感知到的。

    為了避免丟掉任何數(shù)據(jù)改變的事件,所有的監(jiān)聽器的都有一個(gè)回調(diào)方法(SessionExpiredHandler),用來處理會(huì)話過期這種特殊情況。

    SessionExpiredHandler = async (path) =>{ await Task.Run(() =>{ Console.WriteLine(path);});};

    訂閱節(jié)點(diǎn)的信息改變(創(chuàng)建節(jié)點(diǎn),刪除節(jié)點(diǎn),添加子節(jié)點(diǎn))

    IZKChildListener childListener = new ZKChildListener(); //子節(jié)點(diǎn)內(nèi)容變化childListener.ChildChangeHandler = async (parentPath, currentChilds) =>{ await Task.Run(() =>{ Console.WriteLine(parentPath); Console.WriteLine(string.Join(".", currentChilds));});}; //子節(jié)點(diǎn)數(shù)量變化childListener.ChildCountChangedHandler = async (parentPath, currentChilds) =>{ await Task.Run(() =>{ Console.WriteLine(parentPath); Console.WriteLine(string.Join(".", currentChilds));});}; //"/testUserNode" 監(jiān)聽的節(jié)點(diǎn),可以是現(xiàn)在存在的也可以是不存在的 zkClient.SubscribeChildChanges("/testUserNode3", childListener);

    訂閱節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容的變化

    IZKDataListener dataListener = new ZKDataListener(); // 節(jié)點(diǎn)創(chuàng)建和節(jié)點(diǎn)內(nèi)容變化dataListener.DataCreatedOrChangeHandler = async (dataPath, data) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath + ":" + Convert.ToString(data));});}; // 節(jié)點(diǎn)刪除dataListener.DataDeletedHandler = async (dataPath) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath);});}; // 節(jié)點(diǎn)創(chuàng)建dataListener.DataCreatedHandler = async (dataPath, data) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath + ":" + Convert.ToString(data));});}; // 節(jié)點(diǎn)內(nèi)容變化dataListener.DataChangeHandler = async (dataPath, data) =>{ await Task.Run(() =>{ Console.WriteLine(dataPath);});}; zkClient.SubscribeDataChanges("/testUserNode", dataListener);

    客戶端狀態(tài)監(jiān)聽

    IZKStateListener stateListener = new ZKStateListener(); //狀態(tài)改變stateListener.StateChangedHandler = async (state) =>{ await Task.Run(() =>{ Console.WriteLine(state.ToString());});}; //會(huì)話失效stateListener.SessionExpiredHandler = async (path) =>{ await Task.Run(() =>{ Console.WriteLine(path);});}; //創(chuàng)建會(huì)話stateListener.NewSessionHandler = async () =>{ await Task.Run(() =>{});}; //會(huì)話失敗stateListener.SessionEstablishmentErrorHandler = async (ex) =>{ await Task.Run(() =>{ Console.WriteLine(ex.Message);});}; zkClient.SubscribeStateChanges(stateListener);

    五、擴(kuò)展功能


    分布式鎖

    using (var zkClient = new ZKClient(TestUtil.zkServers)){ await zkClient.CreateRecursiveAsync("/zk/lock", null, CreateMode.Persistent); //創(chuàng)建分布式鎖, 非線程安全類,每個(gè)線程請創(chuàng)建單獨(dú)實(shí)例。var _lock = new ZKDistributedLock(zkClient, "/zk/lock"); await _lock.LockAsync(); //獲得鎖//do sometingawait _lock.UnLockAsync();//釋放鎖}

    Leader選舉

    using (var zkClient = new ZKClient(TestUtil.zkServers)){ await zkClient.CreateRecursiveAsync("/zk/leader", null, CreateMode.Persistent); var listener = new ZKLeaderSelectorListener();
    listener.takeLeadership = async (client, selector) =>{ Console.WriteLine("I am the leader-" + await selector.GetLeaderAsync()); selector.Close();}; var selector = new ZKLeaderSelector("id", true, zkClient, "/zk/leader", listener); //啟動(dòng)并參與Leader選舉selector.Start(); //獲得當(dāng)前主服務(wù)的IDawait selector.GetLeaderAsync(); //如果要退出Leader選舉selector.Close(); }

    分布式隊(duì)列

    using (var zkClient = new ZKClient(TestUtil.zkServers)){
    ? ? ? ?await zkClient.CreateRecursiveAsync("/zk/queue", null, CreateMode.PERSISTENT); var queue = new ZKDistributedQueue<long>(new ZKClient(TestUtil.zkServers), "/zk/queue") await queue.OfferAsync("123");//放入元素var value = await queue.PollAsync();//刪除并獲取頂部元素var value = await queue.PeekAsync(); //獲取頂部元素,不會(huì)刪除 ? ?}


    總結(jié)

    以上是生活随笔為你收集整理的支持断线重连、永久watcher、递归操作 ZooKeeper 客户端的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。