api 创建zookeeper客户端_zookeeper分布式锁原理及实现
生活随笔
收集整理的這篇文章主要介紹了
api 创建zookeeper客户端_zookeeper分布式锁原理及实现
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
前言
本文介紹下 zookeeper方式 實(shí)現(xiàn)分布式鎖
原理簡(jiǎn)介
zookeeper實(shí)現(xiàn)分布式鎖的原理就是多個(gè)節(jié)點(diǎn)同時(shí)在一個(gè)指定的節(jié)點(diǎn)下面創(chuàng)建臨時(shí)會(huì)話順序節(jié)點(diǎn),誰創(chuàng)建的節(jié)點(diǎn)序號(hào)最小,誰就獲得了鎖,并且其他節(jié)點(diǎn)就會(huì)監(jiān)聽序號(hào)比自己小的節(jié)點(diǎn),一旦序號(hào)比自己小的節(jié)點(diǎn)被刪除了,其他節(jié)點(diǎn)就會(huì)得到相應(yīng)的事件,然后查看自己是否為序號(hào)最小的節(jié)點(diǎn),如果是,則獲取鎖
docker 安裝 zk
下載鏡像
docker pull zookeeper
啟動(dòng)鏡像
docker run --name zk -p 2181:2181 -p 2888:2888 -p 3888:3888 --restart always -d zookeeper-p 端口映射
--name 容器實(shí)例名稱
-d 后臺(tái)運(yùn)行
2181 Zookeeper客戶端交互端口
2888 Zookeeper集群端口
3888 Zookeeper選舉端口
查看容器
docker ps |grep zookeeper
zk簡(jiǎn)單的幾個(gè)操作命令
進(jìn)入docker容器
docker exec -it 942142604a46 bash
查看節(jié)點(diǎn)狀態(tài)
./bin/zkServer.sh status
開啟客戶端
./bin/zkCli.sh
創(chuàng)建臨時(shí)節(jié)點(diǎn)
create -e /node1 node1.1創(chuàng)建臨時(shí)節(jié)點(diǎn),當(dāng)客戶端關(guān)閉時(shí)候,該節(jié)點(diǎn)會(huì)隨之刪除。不加參數(shù)-e創(chuàng)建永久節(jié)點(diǎn)
獲取節(jié)點(diǎn)值
get /node
列出節(jié)點(diǎn)值
ls /node
刪除節(jié)點(diǎn)值
delete /node
查看節(jié)點(diǎn)信息
stat /test
先介紹下zk的客戶端框架Curator
簡(jiǎn)介
Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊(cè)Watcher和NodeExistsException異常等
Curator的maven依賴
介紹下Curator的基本API
- 使用靜態(tài)工程方法創(chuàng)建會(huì)話
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",5000, 5000, retryPolicy);RetryPolicy為重試策略第一個(gè)參數(shù)為baseSleepTimeMs初始的sleep時(shí)間,用于計(jì)算之后的每次重試的sleep時(shí)間。第二個(gè)參數(shù)為maxRetries,最大重試次數(shù)
- 使用Fluent風(fēng)格api創(chuàng)建
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(5000) // 會(huì)話超時(shí)時(shí)間.connectionTimeoutMs(5000) // 連接超時(shí)時(shí)間.retryPolicy(retryPolicy).namespace("base") // 包含隔離名稱.build();client.start();
- 創(chuàng)建數(shù)據(jù)節(jié)點(diǎn)
lient.create().creatingParentContainersIfNeeded() // 遞歸創(chuàng)建所需父節(jié)點(diǎn).withMode(CreateMode.PERSISTENT) // 創(chuàng)建類型為持久節(jié)點(diǎn).forPath("/nodeA", "init".getBytes()); // 目錄及內(nèi)容
- 刪除數(shù)據(jù)節(jié)點(diǎn)
client.delete().guaranteed() // 強(qiáng)制保證刪除.deletingChildrenIfNeeded() // 遞歸刪除子節(jié)點(diǎn).withVersion(10086) // 指定刪除的版本號(hào).forPath("/nodeA");
- 讀取數(shù)據(jù)節(jié)點(diǎn)
byte[] bytes = client.getData().forPath("/nodeA");System.out.println(new String(bytes));
- 讀stat
Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/nodeA");
- 修改數(shù)據(jù)節(jié)點(diǎn)
client.setData().withVersion(10086) // 指定版本修改.forPath("/nodeA", "data".getBytes());
- 事務(wù)
client.inTransaction().check().forPath("/nodeA").and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes()).and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes()).and().commit();
- 其他
client.checkExists() // 檢查是否存在.forPath("/nodeA");client.getChildren().forPath("/nodeA"); // 獲取子節(jié)點(diǎn)的路徑
- 異步回調(diào)
Executor executor = Executors.newFixedThreadPool(2);client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((curatorFramework, curatorEvent) -> {System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));},executor).forPath("path");
zk分布式實(shí)現(xiàn)的代碼分析
先說下這個(gè)test方法 描述了 獲取zk鎖的完整流程
再說下 如何通過訪問接口的方式的實(shí)現(xiàn)
目錄結(jié)構(gòu)
初始化zk客戶端連接
zk 客戶端申請(qǐng)、釋放鎖實(shí)現(xiàn)
- 實(shí)現(xiàn)了 InitializingBean, DisposableBean接口
應(yīng)用在啟動(dòng)的時(shí)候(client.start方法執(zhí)行的時(shí)候)zookeeper客戶端就會(huì)和zookeeper服務(wù)器時(shí)間建立會(huì)話,系統(tǒng)關(guān)閉時(shí),客戶端與zookeeper服務(wù)器的會(huì)話就關(guān)閉了
定義一個(gè)抽象的業(yè)務(wù)處理接口
單個(gè)線程獲取zk鎖
多個(gè)線程獲取zk鎖
創(chuàng)建線程池ExecutorService executorService = Executors.newFixedThreadPool(20);20個(gè)線程同時(shí)發(fā)起對(duì)同一個(gè)zk鎖的獲取申請(qǐng)
Curator 源碼分析
會(huì)話的建立與關(guān)閉
在client.start調(diào)用后,就會(huì)創(chuàng)建與zookeeper服務(wù)器之間的會(huì)話鏈接
系統(tǒng)關(guān)閉時(shí) 會(huì)話就會(huì)斷開
- client.start 源碼分析
- 啟動(dòng)日志
- 關(guān)閉日志
- 系統(tǒng)啟動(dòng)時(shí)zk的日志
- 系統(tǒng)關(guān)閉時(shí)zk的日志
- 訪問多線程獲取zk鎖接口
curl http://127.0.0.1:8080/batch-acquire-lock查看zk鎖情況查看日志
20個(gè)線程同時(shí)獲取鎖 會(huì)在/lock-path下面創(chuàng)建20個(gè)臨時(shí)節(jié)點(diǎn) 序號(hào)從0-19 只有創(chuàng)建序號(hào)0的臨時(shí)節(jié)點(diǎn)的那個(gè)線程才會(huì)成功獲取得鎖 其他的沒有獲取鎖的臨時(shí)節(jié)點(diǎn)會(huì)刪除
此時(shí)那個(gè)獲得zk鎖的線程如果使用鎖完畢之后如果不釋放鎖 這個(gè)鎖對(duì)應(yīng)的臨時(shí)節(jié)點(diǎn)還會(huì)存在
由此也會(huì)看出一個(gè)缺點(diǎn)臨時(shí)會(huì)話順序節(jié)點(diǎn)會(huì)被刪除,但是它們的父節(jié)點(diǎn)/lock-path不會(huì)被刪除。因此,高并發(fā)的業(yè)務(wù)場(chǎng)景下使用zookeeper分布式鎖時(shí),會(huì)留下很多的空節(jié)點(diǎn)節(jié)點(diǎn)創(chuàng)建
跟蹤lock.acquire(200, TimeUnit.MILLISECONDS)進(jìn)入到
org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver#createsTheLock
創(chuàng)建的節(jié)點(diǎn)為臨時(shí)會(huì)話順序節(jié)點(diǎn)(EPHEMERAL_SEQUENTIAL)
即該節(jié)點(diǎn)會(huì)在客戶端鏈接斷開時(shí)被刪除,還有,我們調(diào)用org.apache.curator.framework.recipes.locks.InterProcessMutex#release時(shí)也會(huì)刪除該節(jié)點(diǎn)
可重入性
跟蹤獲取鎖的代碼進(jìn)入到org.apache.curator.framework.recipes.locks.InterProcessMutex#internalLock
可以看見zookeeper的鎖是可重入的,即同一個(gè)線程可以多次獲取鎖,只有第一次真正的去創(chuàng)建臨時(shí)會(huì)話順序節(jié)點(diǎn),后面的獲取鎖都是對(duì)重入次數(shù)加1。相應(yīng)的,在釋放鎖的時(shí)候,前面都是對(duì)鎖的重入次數(shù)減1,只有最后一次才是真正的去刪除節(jié)點(diǎn)
客戶端故障檢測(cè):
正常情況下,客戶端會(huì)在會(huì)話的有效期內(nèi),向服務(wù)器端發(fā)送PING 請(qǐng)求,來進(jìn)行心跳檢查,說明自己還是存活的。服務(wù)器端接收到客戶端的請(qǐng)求后,會(huì)進(jìn)行對(duì)應(yīng)的客戶端的會(huì)話激活,會(huì)話激活就會(huì)延長(zhǎng)該會(huì)話的存活期。如果有會(huì)話一直沒有激活,那么說明該客戶端出問題了,服務(wù)器端的會(huì)話超時(shí)檢測(cè)任務(wù)就會(huì)檢查出那些一直沒有被激活的與客戶端的會(huì)話,然后進(jìn)行清理,清理中有一步就是刪除臨時(shí)會(huì)話節(jié)點(diǎn)(包括臨時(shí)會(huì)話順序節(jié)點(diǎn))。這就保證了zookeeper分布鎖的容錯(cuò)性,不會(huì)因?yàn)榭蛻舳说囊馔馔顺?#xff0c;導(dǎo)致鎖一直不釋放,其他客戶端獲取不到鎖。數(shù)據(jù)一致性:
zookeeper服務(wù)器集群一般由一個(gè)leader節(jié)點(diǎn)和其他的follower節(jié)點(diǎn)組成,數(shù)據(jù)的讀寫都是在leader節(jié)點(diǎn)上進(jìn)行。當(dāng)一個(gè)寫請(qǐng)求過來時(shí),leader節(jié)點(diǎn)會(huì)發(fā)起一個(gè)proposal,待大多數(shù)follower節(jié)點(diǎn)都返回ack之后,再發(fā)起commit,待大多數(shù)follower節(jié)點(diǎn)都對(duì)這個(gè)proposal進(jìn)行commit了,leader才會(huì)對(duì)客戶端返回請(qǐng)求成功;如果之后leader掛掉了,那么由于zookeeper集群的leader選舉算法采用zab協(xié)議保證數(shù)據(jù)最新的follower節(jié)點(diǎn)當(dāng)選為新的leader,所以,新的leader節(jié)點(diǎn)上都會(huì)有原來leader節(jié)點(diǎn)上提交的所有數(shù)據(jù)。這樣就保證了客戶端請(qǐng)求數(shù)據(jù)的一致性了。CAP:
任何分布式架構(gòu)都不能同時(shí)滿足C(一致性)、A(可用性)、P(分區(qū)耐受性),因此,zookeeper集群在保證一致性的同時(shí),在A和P之間做了取舍,最終選擇了P,因此可用性差一點(diǎn)。
綜上所述
zookeeper分布式鎖保證了鎖的容錯(cuò)性、一致性。但是會(huì)產(chǎn)生空閑節(jié)點(diǎn)(/lock-path),并且有些時(shí)候不可用。
源碼
https://gitee.com/pingfanrenbiji/distributed-lock/tree/master/zookeeper
引用文章
https://my.oschina.net/yangjianzhou/blog/1930493
https://www.jianshu.com/p/db65b64f38aa
總結(jié)
以上是生活随笔為你收集整理的api 创建zookeeper客户端_zookeeper分布式锁原理及实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 希腊神话作者是谁啊?
- 下一篇: pythontype函数使用_基础教程: