Apache ZooKeeper - 使用Apache Curator操作ZK
文章目錄
- 原生ZK API VS Curator
- Curator 概述
- Maven依賴
- 會話創建
- 靜態工廠方式創建會話
- 使用 fluent 風格創建會話
- 創建節點
- protection 模式 ,規避僵尸節點
- 獲取數據
- 修改數據
- 刪除數據 guaranteed()
- 獲取子節點
- 異步線程池
原生ZK API VS Curator
Apache ZooKeeper - 使用原生的API操作ZK
ZooKeeper原生Java API的不足之處:
- 連接zk超時時,不支持自動重連,需要手動操作
- Watch注冊一次就會失效,需手工反復注冊
- 不支持遞歸創建節點
- 異步支持,沒有線程池
- …
Apache curator:
- 解決Watch注冊一次就會失效的問題
- API 更加簡單易用、封裝了常用的ZooKeeper工具類
- 使用Curator實現比如分布式鎖等需求更簡單
- 異步執行,支持自定義線程池
- …
Curator是netflix公司開源的一套zookeeper客戶端,Apache的頂級項目
與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量
Curator解決了很多zookeeper客戶端非常底層的細節開發工作,包括連接重連、反復注冊wathcer和NodeExistsException 異常等
Curator 概述
Apache Curator : https://curator.apache.org/
看看模塊
-
curator-framework:對zookeeper的底層api的一些封裝
-
curator-client:提供一些客戶端的操作,例如重試策略等
-
curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等
Maven依賴
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency>會話創建
和客戶端/ 服務器交互,第一步就要創建會話
Curator 提供了多種方式創建會話
靜態工廠方式創建會話
RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);CuratorFramework client = CuratorFrameworkFactory.newClient(getConnectStr(), retryPolicy);client .start();使用 fluent 風格創建會話
RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr()).retryPolicy(retryPolicy).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).canBeReadOnly(true).build();curatorFramework.start();上述代碼采用了流式方式,最核心的類是 CuratorFramework 類,該類的作用是定義一個 ZooKeeper 客戶端對象,并在之后的上下文中使用。
在定義 CuratorFramework 對象實例的時候, 使用了 CuratorFrameworkFactory 工廠方法,并指定了 connectionString服務器地址列表、retryPolicy 重試策略 、sessionTimeoutMs 會話超時時間、connectionTimeoutMs 會話創建超時時間。
- connectionString:服務器地址列表,在指定服務器地址列表的時候可以是一個地址,也可以是多個地址。如果是多個地址,那么每個服務器地址列表用逗號分隔, 如 host1:port1,host2:port2,host3:port3
- retryPolicy:重試策略,當客戶端異常退出或者與服務端失去連接的時候,可以通過設置客戶端重新連接 ZooKeeper 服務端。而 Curator 提供了 一次重試、多次重試等不同種類的實現方式。在 Curator 內部,可以通過判斷服務器返回的 keeperException 的狀態代碼來判斷是否進行重試處理,如果返回的是 OK 表示一切操作都沒有問題,而 SYSTEMERROR 表示系統或服務端錯誤
| ExponentialBackoffRetry | 重試一組次數,重試之間的睡眠時間增加 |
| RetryNTimes | 重試最大次數 |
| RetryOneTime | 只重試一次 |
| RetryUntilElapsed | 在給定的時間結束之前重試 |
-
sessionTimeoutMs 超時時間:Curator 客戶端創建過程中,有兩個超時時間的設置。一個是 sessionTimeoutMs會話超時時間,用來設置該條會話在 ZooKeeper 服務端的失效時間。
-
另一個是 connectionTimeoutMs 客戶端創建會話的超時時間,用來限制客戶端發起一個會話連接到接收ZooKeeper 服務端應答的時間。sessionTimeoutMs 作用在服務端,而connectionTimeoutMs 作用在客戶端。
創建節點
/*** 遞歸創建子節點*/@SneakyThrows@Testpublic void testCreateWithParent() {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/artisan-node/artisan-node-sub1/artisan-node-sub1-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{} successfully.", path);}使用 create 函數創建數據節點,并通過 withMode 函數指定節點類型持久化節點,臨時節點,順序節點,臨時順序節點,持久化順序節點等),默認是持久化節點,之后調用?forPath?函數來指定節點的路徑和數據信息
protection 模式 ,規避僵尸節點
/*** protection 模式,防止由于異常原因,導致僵尸節點* @throws Exception*/@SneakyThrows@Testpublic void testCreate() {CuratorFramework curatorFramework = getCuratorFramework();String forPath = curatorFramework.create().withProtection() // 防止僵尸節點.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "data".getBytes());log.info("curator create node :{} successfully.", forPath);}看下zk中的數據
實現原理后面單獨開篇解讀,總體思想就是 隨機生成一個UUID, 再創建之前客戶端根據這個緩存的UUID去看ZK Server是否存在,存在則認為是成功的,否則就繼續創建。
獲取數據
@Testpublic void testGetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node :{} successfully.", new String(bytes));}通過客戶端實例的getData() 方法更新 ZooKeeper 服務上的數據節點,在getData 方法的后邊,通過 forPath 函數來指定查詢的節點名稱
修改數據
@Testpublic void testSetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from node /curator-node :{} successfully.", new String(bytes));}通過客戶端實例的 setData() 方法更新 ZooKeeper 服務上的數據節點,在setData 方法的后邊,通過 forPath 函數來指定更新的數據節點路徑以及要更新的數據。
刪除數據 guaranteed()
@Testpublic void testDelete() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}guaranteed:主要起到一個保障刪除成功的作用, 只要該客戶端的會話有效,就會在后臺持續發起刪除請求,直到該數據節點在ZooKeeper 服務端被刪除。
deletingChildrenIfNeeded:指定了該函數后,系統在刪除該數據節點的時候會以遞歸的方式直接刪除其子節點,以及子節點的子節點。
獲取子節點
@Testpublic void testListChildren() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/artisan-node";List<String> list = curatorFramework.getChildren().forPath(pathWithParent);list.forEach(System.out::println); }通過客戶端實例的 getChildren() 方法更新 ZooKeeper 服務上的數據節點,在getChildren方法的后邊,通過 forPath 函數來指定節點下的一級子節點的名稱
異步線程池
Curator 使用BackgroundCallback 接口處理服務器端返回來的信息。
如果在異步線程中調用,默認在 EventThread 線程中調用,支持自定義線程池
/*** 使用默認的 EventThread異步線程處理* @throws Exception*/@Testpublic void testThreadPoolByDefaultEventThread() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String ZK_NODE="/artisan-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", new String(event.getData()));}).forPath(ZK_NODE);;} /*** 使用自定義線程池* @throws Exception*/@Testpublic void testThreadPoolByCustomThreadPool() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();ExecutorService executorService = Executors.newSingleThreadExecutor();String ZK_NODE="/artisan-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", new String(event.getData()));},executorService).forPath(ZK_NODE);}總結
以上是生活随笔為你收集整理的Apache ZooKeeper - 使用Apache Curator操作ZK的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache ZooKeeper - 使
- 下一篇: Apache ZooKeeper - 高