日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Zookeeper常用命令操作,javaAPI操作之Curator框架 API

發(fā)布時間:2024/4/15 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Zookeeper常用命令操作,javaAPI操作之Curator框架 API 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

淺談:Zookeeper

Zookeeper 概念

? Zookeeper 是 Apache Hadoop 項目下的一個子項目,是一個樹形目錄服務(wù)。
? Zookeeper 翻譯過來就是 動物園管理員,他是用來管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 豬)的管理員。簡稱zk
? Zookeeper 是一個分布式的、開源的分布式應(yīng)用程序的協(xié)調(diào)服務(wù)。
? Zookeeper 提供的主要功能包括:
? 配置管理:用于管理服務(wù)信息,ip,端口,提供服務(wù)功能接口等,
? 分布式鎖:用于控制服務(wù)資源,鎖住之后,只能有一個服務(wù)可以操作,其他服務(wù)等待
? 集群管理:用于管理服務(wù)節(jié)點(每個節(jié)點提供的服務(wù)相同,則組成一個集群),組成高可用

Zookeeper 數(shù)據(jù)模型

? ZooKeeper 是一個樹形目錄服務(wù),其數(shù)據(jù)模型和Unix的文件系統(tǒng)目錄樹很類似,擁有一個層次化結(jié)構(gòu)。
? 這里面的每一個節(jié)點都被稱為: ZNode,每個節(jié)點上都會保存自己的數(shù)據(jù)和節(jié)點信息。
? 節(jié)點可以擁有子節(jié)點,同時也允許少量(1MB)數(shù)據(jù)存儲在該節(jié)點之下。
? 節(jié)點可以分為四大類:

? PERSISTENT 持久化節(jié)點
? EPHEMERAL 臨時節(jié)點 :-e
? PERSISTENT_SEQUENTIAL 持久化順序節(jié)點 :-s
? EPHEMERAL_SEQUENTIAL 臨時順序節(jié)點 :-es

ZooKeeper 命令操作

Zookeeper 服務(wù)端常用命令

? 啟動 ZooKeeper 服務(wù): ./zkServer.sh start
? 查看 ZooKeeper 服務(wù)狀態(tài): ./zkServer.sh status
? 停止 ZooKeeper 服務(wù): ./zkServer.sh stop
? 重啟 ZooKeeper 服務(wù): ./zkServer.sh restart

Zookeeper 客戶端常用命令

? 連接ZooKeeper服務(wù)端
./zkCli.sh –server ip:port
? 斷開連接
quit
? 查看命令幫助
help
? 顯示指定目錄下節(jié)點
ls 目錄
? 創(chuàng)建節(jié)點
create /節(jié)點path value
? 獲取節(jié)點值
get /節(jié)點path

? 創(chuàng)建臨時節(jié)點
create -e /節(jié)點path value
? 創(chuàng)建順序節(jié)點
create -s /節(jié)點path value
? 查詢節(jié)點詳細信息
ls –s /節(jié)點path

節(jié)點詳細信息解釋
? czxid:節(jié)點被創(chuàng)建的事務(wù)ID
? ctime: 創(chuàng)建時間
? mzxid: 最后一次被更新的事務(wù)ID
? mtime: 修改時間
? pzxid:子節(jié)點列表最后一次被更新的事務(wù)ID
? cversion:子節(jié)點的版本號
? dataversion:數(shù)據(jù)版本號
? aclversion:權(quán)限版本號
? ephemeralOwner:用于臨時節(jié)點,代表臨時節(jié)點的事務(wù)ID,如果為持
久節(jié)點則為0
? dataLength:節(jié)點存儲的數(shù)據(jù)的長度
? numChildren:當前節(jié)點的子節(jié)點個數(shù)

ZooKeeper JavaAPI 操作

Curator 介紹

? Curator 是 Apache ZooKeeper 的Java客戶端庫。
? 常見的ZooKeeper Java API :
? 原生Java API
? ZkClient
? Curator
? Curator 項目的目標是簡化 ZooKeeper 客戶端的使用。
? Curator 最初是 Netfix 研發(fā)的,后來捐獻了 Apache 基金會,目前是 Apache 的頂級項目。
? 官網(wǎng):http://curator.apache.org/

Curator API 常用操作

? 建立連接
? 添加節(jié)點
? 刪除節(jié)點
? 修改節(jié)點
? 查詢節(jié)點
? Watch事件監(jiān)聽
? 分布式鎖實現(xiàn)

代碼演示(api詳細解釋在代碼注釋中)

增刪改查節(jié)點代碼

package com.fs.curator;import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test;import java.util.List;/* 若測試一直轉(zhuǎn)圈圈,請關(guān)閉Linux防火墻*/ public class CuratorTest {//聲明成員變量 提升作用域private CuratorFramework client;//建立連接@Before//在test方法執(zhí)行錢 // @Testpublic void testConnection(){//重試策略,這個類是RetryPolicy的實現(xiàn)類,然后我們指定時間重試次數(shù)ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000,10);//第一中方式獲得curator // CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.93.132:2181", 60 * 1000, 15 * 1000, exponentialBackoffRetry); // curatorFramework.start();//第二種方式,鏈式編程client = CuratorFrameworkFactory.builder()//zookeeper服務(wù)的ip和端口.connectString("192.168.93.132:2181")//會話超時時間(單位ms),若這個時間類,客戶端和服務(wù)的沒有進行會話連接,就會關(guān)閉會話.sessionTimeoutMs(60 * 1000)//連接超時時間.connectionTimeoutMs(15 * 1000)//給定重試策略.retryPolicy(exponentialBackoffRetry)//名稱空間,相當于操作當前CuratorFramework對象,默認zookeeper的根目錄不是/了,會默認在根目錄后加上/xf,而且若沒有這個節(jié)點會默認創(chuàng)建.namespace("xf")//build后就返回CuratorFramework對象.build();//開啟連接client.start();}/*//創(chuàng)建節(jié)點create 持久,臨時,順序,數(shù)據(jù)基本創(chuàng)建,創(chuàng)建節(jié)點,帶有數(shù)據(jù)設(shè)置節(jié)點的類型創(chuàng)建一個帶有多級節(jié)點 /app1/p1*/@Testpublic void createNode() throws Exception {//基本創(chuàng)建,默認持久化節(jié)點//如果創(chuàng)建節(jié)點,沒有指定數(shù)據(jù),則默認將當前客戶端的ip作為數(shù)據(jù)存儲String s = client.create().forPath("/app1");System.out.println(s);}@Testpublic void createNodeData() throws Exception {//創(chuàng)建節(jié)點,給定節(jié)點值String s = client.create().forPath("/app2","hehe".getBytes());System.out.println(s);}@Testpublic void createNodeNode() throws Exception {//默認類型,持久化節(jié)點,//創(chuàng)建臨時節(jié)點,創(chuàng)建后看不到,因為代碼執(zhí)行了就結(jié)束會話了String s = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","hehe".getBytes());System.out.println(s);//要讓方法不結(jié)束,才能看到臨時節(jié)點的效果Thread.sleep(10000);}@Testpublic void createNodeTrue() throws Exception {//創(chuàng)建多級節(jié)點//父節(jié)點不存在,是不能創(chuàng)建的,但是代碼可以creatingParentsIfNeeded()如果需要,創(chuàng)建父節(jié)點,如果父節(jié)點不存在,則創(chuàng)建父節(jié)點String s = client.create().creatingParentsIfNeeded().forPath("/app4/p1");System.out.println(s);}/*** 查詢節(jié)點--------------------------------------------*///查詢數(shù)據(jù)@Testpublic void findNode() throws Exception {//查詢數(shù)據(jù)byte[] bytes = client.getData().forPath("/app1");System.out.println(new String(bytes));}//查詢子節(jié)點@Testpublic void findNode02() throws Exception {//查詢子節(jié)點 /因為為我們上面設(shè)置的xf父節(jié)點,相當與查詢xf的子節(jié)點List<String> strings = client.getChildren().forPath("/");System.out.println(strings);}//查詢節(jié)點的狀態(tài)信息@Testpublic void findNode03() throws Exception {//查詢節(jié)點狀態(tài)信息Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println(stat);}/*** 修改節(jié)點的數(shù)據(jù)---------------------------------------------* 是沒有修改節(jié)點名稱的命令的,只能先刪除,在添加*///修改數(shù)據(jù)@Testpublic void setNode() throws Exception {client.setData().forPath("/app1","xiaofuhaha".getBytes());}//根據(jù)版本修改/*version是通過查詢出來的,保證我們這次修改和我查下的version版本要一致思想就是版本匹配才修改成功,假如有2個人同時對這個節(jié)點進行數(shù)據(jù)修改,假如第一個已經(jīng)修改了,第二個人去修改不會成功的,因為第二個獲取的是第一個人還未提交的版本,*/@Testpublic void setNodeForVersion() throws Exception {//查詢節(jié)點狀態(tài)信息Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println(stat.getVersion());//然后根據(jù)版本修改client.setData().withVersion(stat.getVersion()).forPath("/app1","xiaofubb".getBytes());}/*** 刪除節(jié)點----------------------------* 若要刪除節(jié)點的數(shù)據(jù),沒有對應(yīng)的命令,但是可以通過set節(jié)點信息為空就相當于刪除了節(jié)點數(shù)據(jù)*/@Testpublic void delNode() throws Exception {//刪除單個節(jié)點client.delete().forPath("/app1");}@Testpublic void delNodeAll() throws Exception {//刪除帶有子節(jié)點的節(jié)點deletingChildrenIfNeeded,如果有父節(jié)點,先刪除父client.delete().deletingChildrenIfNeeded().forPath("/app4");}@Testpublic void delNodeOK() throws Exception {//必須成功刪除guaranteed(),為了防止網(wǎng)絡(luò)抖動,若沒有刪除,會重試client.delete().guaranteed().forPath("/app2");}@Testpublic void delNodeBackground() throws Exception {//回調(diào),,刪除之后在 執(zhí)行什么代碼/*回調(diào)不光刪除有,增刪改都有..inBackground(new BackgroundCallback() {重新方法}*/client.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {//刪除后執(zhí)行的方法,System.out.println("我被刪除了~~~");System.out.println(curatorEvent);}}).forPath("/app1");}@After//@test執(zhí)行后public void close(){//關(guān)閉連接client.close();} }

Watch事件監(jiān)聽代碼

package com.fs.curator;import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test;import java.util.List;/* 若測試一直轉(zhuǎn)圈圈,請關(guān)閉Linux防火墻*/ public class CuratorWatcherTest {//聲明成員變量 提升作用域private CuratorFramework client;//建立連接@Before//在test方法執(zhí)行錢 // @Testpublic void testConnection(){//重試策略,這個類是RetryPolicy的實現(xiàn)類,然后我們指定時間重試次數(shù)ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000,10);client = CuratorFrameworkFactory.builder()//zookeeper服務(wù)的ip和端口.connectString("192.168.93.132:2181")//會話超時時間(單位ms),若這個時間類,客戶端和服務(wù)的沒有進行會話連接,就會關(guān)閉會話.sessionTimeoutMs(60 * 1000)//連接超時時間.connectionTimeoutMs(15 * 1000)//給定重試策略.retryPolicy(exponentialBackoffRetry)//名稱空間,相當于操作當前CuratorFramework對象,默認zookeeper的根目錄不是/了,會默認在根目錄后加上/xf,而且若沒有這個節(jié)點會默認創(chuàng)建.namespace("xf")//build后就返回CuratorFramework對象.build();//開啟連接client.start();}/*nodeCache:對某一個節(jié)點注冊監(jiān)聽器*/@Test//@test執(zhí)行后public void testNodeCache() throws Exception {//創(chuàng)建nodeCache對象 匿名內(nèi)部類訪問外部變量需要將外部變量進行final修飾,但是1.8可以不用final NodeCache nodeCache = new NodeCache(client,"/app1");//注冊監(jiān)聽nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {/*當被監(jiān)聽的節(jié)點發(fā)生了修改,創(chuàng)建,刪除等操作,就會進到這個方法*/System.out.println("節(jié)點變化了~~~");//獲取修改節(jié)點后的數(shù)據(jù)byte[] data = nodeCache.getCurrentData().getData();System.out.println(new String(data));}});//開啟監(jiān)聽,如果設(shè)置true,則開啟監(jiān)聽的時候,加載緩存數(shù)據(jù)nodeCache.start(true);//睡10秒讓連接不關(guān),去修改節(jié)點信息,看是否能夠監(jiān)聽到Thread.sleep(10000);}/*PathChildrenCache:監(jiān)聽某個節(jié)點的所有子節(jié)點們(不能監(jiān)聽自己節(jié)點的變化,只能感知子節(jié)點的變化只會監(jiān)聽當前節(jié)點的子節(jié)點,不會監(jiān)聽子節(jié)點的子節(jié)點*/@Test//@test執(zhí)行后public void testPathChildrenCache() throws Exception {//注冊監(jiān)聽傳遞的參數(shù)true是cacheData,是否開啟緩存,開啟后,下次啟動,會從緩存中去獲取上次緩存的數(shù)據(jù)PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);//綁定監(jiān)聽器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {/*只對子節(jié)點的變更才會進入這個方法*/System.out.println("子節(jié)點變化了~~~");//輸出下子節(jié)點信息System.out.println(pathChildrenCacheEvent);//監(jiān)聽子節(jié)點的數(shù)據(jù)變更,并且拿到變更后的數(shù)據(jù)PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();//判斷類型是否是updateif (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){//由輸出的pathChildrenCacheEvent得知:PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p1', stat=131,134,1595825223664,1595825401122,1,0,0,0,4,0,131, data=[104, 101, 104, 101]}}//所以就點兩次dataSystem.out.println("數(shù)據(jù)變化~~~");byte[] data = pathChildrenCacheEvent.getData().getData();System.out.println(new String(data));}}});//開啟pathChildrenCache.start();//睡20秒讓連接不關(guān)Thread.sleep(20000);}/*TreeCache:可以監(jiān)聽整個節(jié)點下的所有節(jié)點,包括節(jié)點本身以及子節(jié)點,不包括父節(jié)點*/@Test//@test執(zhí)行后public void testTreeCache() throws Exception {//創(chuàng)建監(jiān)聽器TreeCache treeCache = new TreeCache(client,"/app2");//注冊監(jiān)聽treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {/*所有的節(jié)點進行變更都會進入這個方法*/System.out.println("子節(jié)點變化了~~~");//輸出下子節(jié)點信息System.out.println(treeCacheEvent);//監(jiān)節(jié)點的數(shù)據(jù)變更,并且拿到變更后的數(shù)據(jù)TreeCacheEvent.Type type = treeCacheEvent.getType();//判斷變更類型是否是updateif (type.equals(TreeCacheEvent.Type.NODE_UPDATED)){//由輸出的pathChildrenCacheEvent得知:PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p1', stat=131,134,1595825223664,1595825401122,1,0,0,0,4,0,131, data=[104, 101, 104, 101]}}//所以就點兩次dataSystem.out.println("數(shù)據(jù)變化~~~");byte[] data = treeCacheEvent.getData().getData();System.out.println(new String(data));}}});//開啟treeCache.start();//睡20秒讓連接不關(guān)Thread.sleep(20000);}@After//@test執(zhí)行后public void close(){//關(guān)閉連接client.close();} }

分布式鎖實現(xiàn)(不是zookeeper實現(xiàn)的,是curator框架實現(xiàn)的)

為什么使用臨時順序節(jié)點:防止偶發(fā)情況發(fā)生的死鎖,順序就是來的進程排隊

Curator實現(xiàn)分布式鎖API

? 在Curator中有五種鎖方案:
? InterProcessSemaphoreMutex:分布式排它鎖(非可重入鎖)
? InterProcessMutex:分布式可重入排它鎖
? InterProcessReadWriteLock:分布式讀寫鎖
? InterProcessMultiLock:將多個鎖作為單個實體管理的容器
? InterProcessSemaphoreV2:共享信號量

買票代碼演示

買票
package com.fs.curator;import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;/* 模擬12306買票,使用分布式鎖*/ public class Ticket12306 implements Runnable {//數(shù)據(jù)庫票數(shù)private int tickets = 10;private InterProcessMutex lock;public Ticket12306() {//重試策略,這個類是RetryPolicy的實現(xiàn)類,然后我們指定時間重試次數(shù)RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);CuratorFramework client = CuratorFrameworkFactory.builder()//zookeeper服務(wù)的ip和端口.connectString("192.168.93.132:2181")//會話超時時間(單位ms),若這個時間類,客戶端和服務(wù)的沒有進行會話連接,就會關(guān)閉會話.sessionTimeoutMs(60 * 1000)//連接超時時間.connectionTimeoutMs(15 * 1000)//給定重試策略.retryPolicy(retryPolicy)//build后就返回CuratorFramework對象.build();//開啟連接client.start();//初始化lock,這是添加臨時順序的節(jié)點lock = new InterProcessMutex(client, "/lock");}@Overridepublic void run() {while (true) {try {//獲得鎖lock.acquire(3, TimeUnit.SECONDS);//如果有票就賣if (tickets > 0) {//模擬輸出就賣出System.out.println(Thread.currentThread() + ":" + tickets);//緩慢執(zhí)行0.1秒Thread.sleep(100);//賣出就--tickets--;}else {//買完了就停止循環(huán)break;}} catch (Exception e) {e.printStackTrace();} finally {try {//釋放鎖lock.release();} catch (Exception e) {e.printStackTrace();}}}} }
測試
package com.fs.curator;public class LockTest {public static void main(String[] args) {Ticket12306 ticket12306 = new Ticket12306();//創(chuàng)建客戶端來模擬買票Thread t1 = new Thread(ticket12306, "fz");Thread t2 = new Thread(ticket12306, "xc");t1.start();t2.start();} } 超強干貨來襲 云風(fēng)專訪:近40年碼齡,通宵達旦的技術(shù)人生

總結(jié)

以上是生活随笔為你收集整理的Zookeeper常用命令操作,javaAPI操作之Curator框架 API的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。