日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Zookeeper客户端Curator使用详解

發(fā)布時(shí)間:2023/12/18 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Zookeeper客户端Curator使用详解 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
http://www.jianshu.com/p/70151fc0ef5d

Zookeeper客戶端Curator使用詳解

簡(jiǎn)介

Curator是Netflix公司開(kāi)源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開(kāi)發(fā)工作,包括連接重連、反復(fù)注冊(cè)Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評(píng)價(jià)。
引子和趣聞:
Zookeeper名字的由來(lái)是比較有趣的,下面的片段摘抄自《從PAXOS到ZOOKEEPER分布式一致性原理與實(shí)踐》一書(shū):
Zookeeper最早起源于雅虎的研究院的一個(gè)研究小組。在當(dāng)時(shí),研究人員發(fā)現(xiàn),在雅虎內(nèi)部很多大型的系統(tǒng)需要依賴一個(gè)類(lèi)似的系統(tǒng)進(jìn)行分布式協(xié)調(diào),但是這些系統(tǒng)往往存在分布式單點(diǎn)問(wèn)題。所以雅虎的開(kāi)發(fā)人員就試圖開(kāi)發(fā)一個(gè)通用的無(wú)單點(diǎn)問(wèn)題的分布式協(xié)調(diào)框架。在立項(xiàng)初期,考慮到很多項(xiàng)目都是用動(dòng)物的名字來(lái)命名的(例如著名的Pig項(xiàng)目),雅虎的工程師希望給這個(gè)項(xiàng)目也取一個(gè)動(dòng)物的名字。時(shí)任研究院的首席科學(xué)家Raghu Ramakrishnan開(kāi)玩笑說(shuō):再這樣下去,我們這兒就變成動(dòng)物園了。此話一出,大家紛紛表示就叫動(dòng)物園管理員吧——因?yàn)楦鱾€(gè)以動(dòng)物命名的分布式組件放在一起,雅虎的整個(gè)分布式系統(tǒng)看上去就像一個(gè)大型的動(dòng)物園了,而Zookeeper正好用來(lái)進(jìn)行分布式環(huán)境的協(xié)調(diào)——于是,Zookeeper的名字由此誕生了。

Curator無(wú)疑是Zookeeper客戶端中的瑞士軍刀,它譯作"館長(zhǎng)"或者''管理者'',不知道是不是開(kāi)發(fā)小組有意而為之,筆者猜測(cè)有可能這樣命名的原因是說(shuō)明Curator就是Zookeeper的館長(zhǎng)(腦洞有點(diǎn)大:Curator就是動(dòng)物園的園長(zhǎng))。
Curator包含了幾個(gè)包:
curator-framework:對(duì)zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操作,例如重試策略等
curator-recipes:封裝了一些高級(jí)特性,如:Cache事件監(jiān)聽(tīng)、選舉、分布式鎖、分布式計(jì)數(shù)器、分布式Barrier等
Maven依賴(使用curator的版本:2.12.0,對(duì)應(yīng)Zookeeper的版本為:3.4.x,如果跨版本會(huì)有兼容性問(wèn)題,很有可能導(dǎo)致節(jié)點(diǎn)操作失敗):

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>

Curator的基本Api

創(chuàng)建會(huì)話

1.使用靜態(tài)工程方法創(chuàng)建客戶端

一個(gè)例子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(connectionInfo,5000,3000,retryPolicy);

newClient靜態(tài)工廠方法包含四個(gè)主要參數(shù):

參數(shù)名說(shuō)明
connectionString服務(wù)器列表,格式host1:port1,host2:port2,...
retryPolicy重試策略,內(nèi)建有四種重試策略,也可以自行實(shí)現(xiàn)RetryPolicy接口
sessionTimeoutMs會(huì)話超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms
connectionTimeoutMs連接創(chuàng)建超時(shí)時(shí)間,單位毫秒,默認(rèn)60000ms

2.使用Fluent風(fēng)格的Api創(chuàng)建會(huì)話

核心參數(shù)變?yōu)榱魇皆O(shè)置,一個(gè)列子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client =CuratorFrameworkFactory.builder().connectString(connectionInfo).sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();

3.創(chuàng)建包含隔離命名空間的會(huì)話

為了實(shí)現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,需要為每個(gè)業(yè)務(wù)分配一個(gè)獨(dú)立的命名空間(NameSpace),即指定一個(gè)Zookeeper的根路徑(官方術(shù)語(yǔ):為Zookeeper添加“Chroot”特性)。例如(下面的例子)當(dāng)客戶端指定了獨(dú)立命名空間為“/base”,那么該客戶端對(duì)Zookeeper上的數(shù)據(jù)節(jié)點(diǎn)的操作都是基于該目錄進(jìn)行的。通過(guò)設(shè)置Chroot可以將客戶端應(yīng)用與Zookeeper服務(wù)端的一課子樹(shù)相對(duì)應(yīng),在多個(gè)應(yīng)用共用一個(gè)Zookeeper集群的場(chǎng)景下,這對(duì)于實(shí)現(xiàn)不同應(yīng)用之間的相互隔離十分有意義。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client =CuratorFrameworkFactory.builder().connectString(connectionInfo).sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();

啟動(dòng)客戶端

當(dāng)創(chuàng)建會(huì)話成功,得到client的實(shí)例然后可以直接調(diào)用其start( )方法:

client.start();

數(shù)據(jù)節(jié)點(diǎn)操作

創(chuàng)建數(shù)據(jù)節(jié)點(diǎn)

Zookeeper的節(jié)點(diǎn)創(chuàng)建模式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且?guī)蛄刑?hào)
  • EPHEMERAL:臨時(shí)
  • EPHEMERAL_SEQUENTIAL:臨時(shí)并且?guī)蛄刑?hào)

創(chuàng)建一個(gè)節(jié)點(diǎn),初始內(nèi)容為空

client.create().forPath("path");

注意:如果沒(méi)有設(shè)置節(jié)點(diǎn)屬性,節(jié)點(diǎn)創(chuàng)建模式默認(rèn)為持久化節(jié)點(diǎn),內(nèi)容默認(rèn)為空

創(chuàng)建一個(gè)節(jié)點(diǎn),附帶初始化內(nèi)容

client.create().forPath("path","init".getBytes());

創(chuàng)建一個(gè)節(jié)點(diǎn),指定創(chuàng)建模式(臨時(shí)節(jié)點(diǎn)),內(nèi)容為空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

創(chuàng)建一個(gè)節(jié)點(diǎn),指定創(chuàng)建模式(臨時(shí)節(jié)點(diǎn)),附帶初始化內(nèi)容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

創(chuàng)建一個(gè)節(jié)點(diǎn),指定創(chuàng)建模式(臨時(shí)節(jié)點(diǎn)),附帶初始化內(nèi)容,并且自動(dòng)遞歸創(chuàng)建父節(jié)點(diǎn)

client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

這個(gè)creatingParentContainersIfNeeded()接口非常有用,因?yàn)橐话闱闆r開(kāi)發(fā)人員在創(chuàng)建一個(gè)子節(jié)點(diǎn)必須判斷它的父節(jié)點(diǎn)是否存在,如果不存在直接創(chuàng)建會(huì)拋出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能夠自動(dòng)遞歸創(chuàng)建所有所需的父節(jié)點(diǎn)。

刪除數(shù)據(jù)節(jié)點(diǎn)

刪除一個(gè)節(jié)點(diǎn)

client.delete().forPath("path");

注意,此方法只能刪除葉子節(jié)點(diǎn),否則會(huì)拋出異常。

刪除一個(gè)節(jié)點(diǎn),并且遞歸刪除其所有的子節(jié)點(diǎn)

client.delete().deletingChildrenIfNeeded().forPath("path");

刪除一個(gè)節(jié)點(diǎn),強(qiáng)制指定版本進(jìn)行刪除

client.delete().withVersion(10086).forPath("path");

刪除一個(gè)節(jié)點(diǎn),強(qiáng)制保證刪除

client.delete().guaranteed().forPath("path");

guaranteed()接口是一個(gè)保障措施,只要客戶端會(huì)話有效,那么Curator會(huì)在后臺(tái)持續(xù)進(jìn)行刪除操作,直到刪除節(jié)點(diǎn)成功。

注意:上面的多個(gè)流式接口是可以自由組合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

讀取數(shù)據(jù)節(jié)點(diǎn)數(shù)據(jù)

讀取一個(gè)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容

client.getData().forPath("path");

注意,此方法返的返回值是byte[ ];

讀取一個(gè)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容,同時(shí)獲取到該節(jié)點(diǎn)的stat

Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("path");

更新數(shù)據(jù)節(jié)點(diǎn)數(shù)據(jù)

更新一個(gè)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容

client.setData().forPath("path","data".getBytes());

注意:該接口會(huì)返回一個(gè)Stat實(shí)例

更新一個(gè)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容,強(qiáng)制指定版本進(jìn)行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());

檢查節(jié)點(diǎn)是否存在

client.checkExists().forPath("path");

注意:該方法返回一個(gè)Stat實(shí)例,用于檢查ZNode是否存在的操作. 可以調(diào)用額外的方法(監(jiān)控或者后臺(tái)處理)并在最后調(diào)用forPath( )指定要操作的ZNode

獲取某個(gè)節(jié)點(diǎn)的所有子節(jié)點(diǎn)路徑

client.getChildren().forPath("path");

注意:該方法的返回值為L(zhǎng)ist<String>,獲得ZNode的子節(jié)點(diǎn)Path列表。 可以調(diào)用額外的方法(監(jiān)控、后臺(tái)處理或者獲取狀態(tài)watch, background or get stat) 并在最后調(diào)用forPath()指定要操作的父ZNode

事務(wù)

CuratorFramework的實(shí)例包含inTransaction( )接口方法,調(diào)用此方法開(kāi)啟一個(gè)ZooKeeper事務(wù). 可以復(fù)合create, setData, check, and/or delete 等操作然后調(diào)用commit()作為一個(gè)原子操作提交。一個(gè)例子如下:

client.inTransaction().check().forPath("path").and().create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes()).and().setData().withVersion(10086).forPath("path","data2".getBytes()).and().commit();

異步接口

上面提到的創(chuàng)建、刪除、更新、讀取等方法都是同步的,Curator提供異步接口,引入了BackgroundCallback接口用于處理異步接口調(diào)用之后服務(wù)端返回的結(jié)果信息。BackgroundCallback接口中一個(gè)重要的回調(diào)值為CuratorEvent,里面包含事件類(lèi)型、響應(yīng)嗎和節(jié)點(diǎn)的詳細(xì)信息。

CuratorEventType

事件類(lèi)型對(duì)應(yīng)CuratorFramework實(shí)例的方法
CREATE#create()
DELETE#delete()
EXISTS#checkExists()
GET_DATA#getData()
SET_DATA#setData()
CHILDREN#getChildren()
SYNC#sync(String,Object)
GET_ACL#getACL()
SET_ACL#setACL()
WATCHED#Watcher(Watcher)
CLOSING#close()

響應(yīng)碼(#getResultCode())

響應(yīng)碼意義
0OK,即調(diào)用成功
-4ConnectionLoss,即客戶端與服務(wù)端斷開(kāi)連接
-110NodeExists,即節(jié)點(diǎn)已經(jīng)存在
-112SessionExpired,即會(huì)話過(guò)期

一個(gè)異步創(chuàng)建節(jié)點(diǎn)的例子如下:

Executor executor = Executors.newFixedThreadPool(2); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));},executor).forPath("path");

注意:如果#inBackground()方法不指定executor,那么會(huì)默認(rèn)使用Curator的EventThread去進(jìn)行異步處理。

Curator食譜(高級(jí)特性)

提醒:首先你必須添加curator-recipes依賴,下文僅僅對(duì)recipes一些特性的使用進(jìn)行解釋和舉例,不打算進(jìn)行源碼級(jí)別的探討

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>

重要提醒:強(qiáng)烈推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),當(dāng)連接狀態(tài)為L(zhǎng)OST,curator-recipes下的所有Api將會(huì)失效或者過(guò)期,盡管后面所有的例子都沒(méi)有使用到ConnectionStateListener。

緩存

Zookeeper原生支持通過(guò)注冊(cè)Watcher來(lái)進(jìn)行事件監(jiān)聽(tīng),但是開(kāi)發(fā)者需要反復(fù)注冊(cè)(Watcher只能單次注冊(cè)單次使用)。Cache是Curator中對(duì)事件監(jiān)聽(tīng)的包裝,可以看作是對(duì)事件監(jiān)聽(tīng)的本地緩存視圖,能夠自動(dòng)為開(kāi)發(fā)者處理反復(fù)注冊(cè)監(jiān)聽(tīng)。Curator提供了三種Watcher(Cache)來(lái)監(jiān)聽(tīng)結(jié)點(diǎn)的變化。

Path Cache

Path Cache用來(lái)監(jiān)控一個(gè)ZNode的子節(jié)點(diǎn). 當(dāng)一個(gè)子節(jié)點(diǎn)增加, 更新,刪除時(shí), Path Cache會(huì)改變它的狀態(tài), 會(huì)包含最新的子節(jié)點(diǎn), 子節(jié)點(diǎn)的數(shù)據(jù)和狀態(tài),而狀態(tài)的更變將通過(guò)PathChildrenCacheListener通知。

實(shí)際使用時(shí)會(huì)涉及到四個(gè)類(lèi):

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

通過(guò)下面的構(gòu)造函數(shù)創(chuàng)建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必須調(diào)用它的start方法,使用完后調(diào)用close方法。 可以設(shè)置StartMode來(lái)實(shí)現(xiàn)啟動(dòng)的模式,

StartMode有下面幾種:

  • NORMAL:正常初始化。
  • BUILD_INITIAL_CACHE:在調(diào)用start()之前會(huì)調(diào)用rebuild()。
  • POST_INITIALIZED_EVENT: 當(dāng)Cache初始化數(shù)據(jù)后發(fā)送一個(gè)PathChildrenCacheEvent.Type#INITIALIZED事件
  • public void addListener(PathChildrenCacheListener listener)可以增加listener監(jiān)聽(tīng)緩存的變化。

    getCurrentData()方法返回一個(gè)List<ChildData>對(duì)象,可以遍歷所有的子節(jié)點(diǎn)。

    設(shè)置/更新、移除其實(shí)是使用client (CuratorFramework)來(lái)操作, 不通過(guò)PathChildrenCache操作:

    public class PathCacheDemo {private static final String PATH = "/example/pathCache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();PathChildrenCache cache = new PathChildrenCache(client, PATH, true);cache.start();PathChildrenCacheListener cacheListener = (client1, event) -> {System.out.println("事件類(lèi)型:" + event.getType());if (null != event.getData()) {System.out.println("節(jié)點(diǎn)數(shù)據(jù):" + event.getData().getPath() + " = " + new String(event.getData().getData()));}};cache.getListenable().addListener(cacheListener);client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());Thread.sleep(10);client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());Thread.sleep(10);client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());Thread.sleep(10);for (ChildData data : cache.getCurrentData()) {System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));}client.delete().forPath("/example/pathCache/test01");Thread.sleep(10);client.delete().forPath("/example/pathCache/test02");Thread.sleep(1000 * 5);cache.close();client.close();System.out.println("OK!");} }

    注意:如果new PathChildrenCache(client, PATH, true)中的參數(shù)cacheData值設(shè)置為false,則示例中的event.getData().getData()、data.getData()將返回null,cache將不會(huì)緩存節(jié)點(diǎn)數(shù)據(jù)。

    注意:示例中的Thread.sleep(10)可以注釋掉,但是注釋后事件監(jiān)聽(tīng)的觸發(fā)次數(shù)會(huì)不全,這可能與PathCache的實(shí)現(xiàn)原理有關(guān),不能太過(guò)頻繁的觸發(fā)事件!

    Node Cache

    Node Cache與Path Cache類(lèi)似,Node Cache只是監(jiān)聽(tīng)某一個(gè)特定的節(jié)點(diǎn)。它涉及到下面的三個(gè)類(lèi):

    • NodeCache - Node Cache實(shí)現(xiàn)類(lèi)
    • NodeCacheListener - 節(jié)點(diǎn)監(jiān)聽(tīng)器
    • ChildData - 節(jié)點(diǎn)數(shù)據(jù)

    注意:使用cache,依然要調(diào)用它的start()方法,使用完后調(diào)用close()方法。

    getCurrentData()將得到節(jié)點(diǎn)當(dāng)前的狀態(tài),通過(guò)它的狀態(tài)可以得到當(dāng)前的值。

    public class NodeCacheDemo {private static final String PATH = "/example/cache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH);final NodeCache cache = new NodeCache(client, PATH);NodeCacheListener listener = () -> {ChildData data = cache.getCurrentData();if (null != data) {System.out.println("節(jié)點(diǎn)數(shù)據(jù):" + new String(cache.getCurrentData().getData()));} else {System.out.println("節(jié)點(diǎn)被刪除!");}};cache.getListenable().addListener(listener);cache.start();client.setData().forPath(PATH, "01".getBytes());Thread.sleep(100);client.setData().forPath(PATH, "02".getBytes());Thread.sleep(100);client.delete().deletingChildrenIfNeeded().forPath(PATH);Thread.sleep(1000 * 2);cache.close();client.close();System.out.println("OK!");} }

    注意:示例中的Thread.sleep(10)可以注釋,但是注釋后事件監(jiān)聽(tīng)的觸發(fā)次數(shù)會(huì)不全,這可能與NodeCache的實(shí)現(xiàn)原理有關(guān),不能太過(guò)頻繁的觸發(fā)事件!

    注意:NodeCache只能監(jiān)聽(tīng)一個(gè)節(jié)點(diǎn)的狀態(tài)變化。

    Tree Cache

    Tree Cache可以監(jiān)控整個(gè)樹(shù)上的所有節(jié)點(diǎn),類(lèi)似于PathCache和NodeCache的組合,主要涉及到下面四個(gè)類(lèi):

    • TreeCache - Tree Cache實(shí)現(xiàn)類(lèi)
    • TreeCacheListener - 監(jiān)聽(tīng)器類(lèi)
    • TreeCacheEvent - 觸發(fā)的事件類(lèi)
    • ChildData - 節(jié)點(diǎn)數(shù)據(jù)
    public class TreeCacheDemo {private static final String PATH = "/example/cache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH);TreeCache cache = new TreeCache(client, PATH);TreeCacheListener listener = (client1, event) ->System.out.println("事件類(lèi)型:" + event.getType() +" | 路徑:" + (null != event.getData() ? event.getData().getPath() : null));cache.getListenable().addListener(listener);cache.start();client.setData().forPath(PATH, "01".getBytes());Thread.sleep(100);client.setData().forPath(PATH, "02".getBytes());Thread.sleep(100);client.delete().deletingChildrenIfNeeded().forPath(PATH);Thread.sleep(1000 * 2);cache.close();client.close();System.out.println("OK!");} }

    注意:在此示例中沒(méi)有使用Thread.sleep(10),但是事件觸發(fā)次數(shù)也是正常的。

    注意:TreeCache在初始化(調(diào)用start()方法)的時(shí)候會(huì)回調(diào)TreeCacheListener實(shí)例一個(gè)事TreeCacheEvent,而回調(diào)的TreeCacheEvent對(duì)象的Type為INITIALIZED,ChildData為null,此時(shí)event.getData().getPath()很有可能導(dǎo)致空指針異常,這里應(yīng)該主動(dòng)處理并避免這種情況。

    Leader選舉

    在分布式計(jì)算中, leader elections是很重要的一個(gè)功能, 這個(gè)選舉過(guò)程是這樣子的: 指派一個(gè)進(jìn)程作為組織者,將任務(wù)分發(fā)給各節(jié)點(diǎn)。 在任務(wù)開(kāi)始前, 哪個(gè)節(jié)點(diǎn)都不知道誰(shuí)是leader(領(lǐng)導(dǎo)者)或者coordinator(協(xié)調(diào)者). 當(dāng)選舉算法開(kāi)始執(zhí)行后, 每個(gè)節(jié)點(diǎn)最終會(huì)得到一個(gè)唯一的節(jié)點(diǎn)作為任務(wù)leader. 除此之外, 選舉還經(jīng)常會(huì)發(fā)生在leader意外宕機(jī)的情況下,新的leader要被選舉出來(lái)。

    在zookeeper集群中,leader負(fù)責(zé)寫(xiě)操作,然后通過(guò)Zab協(xié)議實(shí)現(xiàn)follower的同步,leader或者follower都可以處理讀操作。

    Curator 有兩種leader選舉的recipe,分別是LeaderSelectorLeaderLatch

    前者是所有存活的客戶端不間斷的輪流做Leader,大同社會(huì)。后者是一旦選舉出Leader,除非有客戶端掛掉重新觸發(fā)選舉,否則不會(huì)交出領(lǐng)導(dǎo)權(quán)。某黨?

    LeaderLatch

    LeaderLatch有兩個(gè)構(gòu)造函數(shù):

    public LeaderLatch(CuratorFramework client, String latchPath) public LeaderLatch(CuratorFramework client, String latchPath, String id)

    LeaderLatch的啟動(dòng):

    leaderLatch.start( );

    一旦啟動(dòng),LeaderLatch會(huì)和其它使用相同latch path的其它LeaderLatch交涉,然后其中一個(gè)最終會(huì)被選舉為leader,可以通過(guò)hasLeadership方法查看LeaderLatch實(shí)例是否leader:

    leaderLatch.hasLeadership( ); //返回true說(shuō)明當(dāng)前實(shí)例是leader

    類(lèi)似JDK的CountDownLatch, LeaderLatch在請(qǐng)求成為leadership會(huì)block(阻塞),一旦不使用LeaderLatch了,必須調(diào)用close方法。 如果它是leader,會(huì)釋放leadership, 其它的參與者將會(huì)選舉一個(gè)leader。

    public void await() throws InterruptedException,EOFException /*Causes the current thread to wait until this instance acquires leadership unless the thread is interrupted or closed.*/ public boolean await(long timeout,TimeUnit unit)throws InterruptedException

    異常處理: LeaderLatch實(shí)例可以增加ConnectionStateListener來(lái)監(jiān)聽(tīng)網(wǎng)絡(luò)連接問(wèn)題。 當(dāng) SUSPENDED 或 LOST 時(shí), leader不再認(rèn)為自己還是leader。當(dāng)LOST后連接重連后RECONNECTED,LeaderLatch會(huì)刪除先前的ZNode然后重新創(chuàng)建一個(gè)。LeaderLatch用戶必須考慮導(dǎo)致leadership丟失的連接問(wèn)題。 強(qiáng)烈推薦你使用ConnectionStateListener。

    一個(gè)LeaderLatch的使用例子:

    public class LeaderLatchDemo extends BaseConnectionInfo {protected static String PATH = "/francis/leader";private static final int CLIENT_QTY = 10;public static void main(String[] args) throws Exception {List<CuratorFramework> clients = Lists.newArrayList();List<LeaderLatch> examples = Lists.newArrayList();TestingServer server=new TestingServer();try {for (int i = 0; i < CLIENT_QTY; i++) {CuratorFramework client= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));clients.add(client);LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);latch.addListener(new LeaderLatchListener() {@Overridepublic void isLeader() {// TODO Auto-generated method stubSystem.out.println("I am Leader");}@Overridepublic void notLeader() {// TODO Auto-generated method stubSystem.out.println("I am not Leader");}});examples.add(latch);client.start();latch.start();}Thread.sleep(10000);LeaderLatch currentLeader = null;for (LeaderLatch latch : examples) {if (latch.hasLeadership()) {currentLeader = latch;}}System.out.println("current leader is " + currentLeader.getId());System.out.println("release the leader " + currentLeader.getId());currentLeader.close();Thread.sleep(5000);for (LeaderLatch latch : examples) {if (latch.hasLeadership()) {currentLeader = latch;}}System.out.println("current leader is " + currentLeader.getId());System.out.println("release the leader " + currentLeader.getId());} finally {for (LeaderLatch latch : examples) {if (null != latch.getState())CloseableUtils.closeQuietly(latch);}for (CuratorFramework client : clients) {CloseableUtils.closeQuietly(client);}}} }

    可以添加test module的依賴方便進(jìn)行測(cè)試,不需要啟動(dòng)真實(shí)的zookeeper服務(wù)端:

    <dependency><groupId>org.apache.curator</groupId><artifactId>curator-test</artifactId><version>2.12.0</version></dependency>

    首先我們創(chuàng)建了10個(gè)LeaderLatch,啟動(dòng)后它們中的一個(gè)會(huì)被選舉為leader。 因?yàn)檫x舉會(huì)花費(fèi)一些時(shí)間,start后并不能馬上就得到leader。
    通過(guò)hasLeadership查看自己是否是leader, 如果是的話返回true。
    可以通過(guò).getLeader().getId()可以得到當(dāng)前的leader的ID。
    只能通過(guò)close釋放當(dāng)前的領(lǐng)導(dǎo)權(quán)。
    await是一個(gè)阻塞方法, 嘗試獲取leader地位,但是未必能上位。

    LeaderSelector

    LeaderSelector使用的時(shí)候主要涉及下面幾個(gè)類(lèi):

    • LeaderSelector
    • LeaderSelectorListener
    • LeaderSelectorListenerAdapter
    • CancelLeadershipException

    核心類(lèi)是LeaderSelector,它的構(gòu)造函數(shù)如下:

    public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener) public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

    類(lèi)似LeaderLatch,LeaderSelector必須start: leaderSelector.start(); 一旦啟動(dòng),當(dāng)實(shí)例取得領(lǐng)導(dǎo)權(quán)時(shí)你的listener的takeLeadership()方法被調(diào)用。而takeLeadership()方法只有領(lǐng)導(dǎo)權(quán)被釋放時(shí)才返回。 當(dāng)你不再使用LeaderSelector實(shí)例時(shí),應(yīng)該調(diào)用它的close方法。

    異常處理 LeaderSelectorListener類(lèi)繼承ConnectionStateListener。LeaderSelector必須小心連接狀態(tài)的改變。如果實(shí)例成為leader, 它應(yīng)該響應(yīng)SUSPENDED 或 LOST。 當(dāng) SUSPENDED 狀態(tài)出現(xiàn)時(shí), 實(shí)例必須假定在重新連接成功之前它可能不再是leader了。 如果LOST狀態(tài)出現(xiàn), 實(shí)例不再是leader, takeLeadership方法返回。

    重要: 推薦處理方式是當(dāng)收到SUSPENDED 或 LOST時(shí)拋出CancelLeadershipException異常.。這會(huì)導(dǎo)致LeaderSelector實(shí)例中斷并取消執(zhí)行takeLeadership方法的異常.。這非常重要, 你必須考慮擴(kuò)展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推薦的處理邏輯。

    下面的一個(gè)例子摘抄自官方:

    public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {private final String name;private final LeaderSelector leaderSelector;private final AtomicInteger leaderCount = new AtomicInteger();public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {this.name = name;leaderSelector = new LeaderSelector(client, path, this);leaderSelector.autoRequeue();}public void start() throws IOException {leaderSelector.start();}@Overridepublic void close() throws IOException {leaderSelector.close();}@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {final int waitSeconds = (int) (5 * Math.random()) + 1;System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");try {Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));} catch (InterruptedException e) {System.err.println(name + " was interrupted.");Thread.currentThread().interrupt();} finally {System.out.println(name + " relinquishing leadership.\n");}} }

    你可以在takeLeadership進(jìn)行任務(wù)的分配等等,并且不要返回,如果你想要要此實(shí)例一直是leader的話可以加一個(gè)死循環(huán)。調(diào)用 leaderSelector.autoRequeue();保證在此實(shí)例釋放領(lǐng)導(dǎo)權(quán)之后還可能獲得領(lǐng)導(dǎo)權(quán)。 在這里我們使用AtomicInteger來(lái)記錄此client獲得領(lǐng)導(dǎo)權(quán)的次數(shù), 它是”fair”, 每個(gè)client有平等的機(jī)會(huì)獲得領(lǐng)導(dǎo)權(quán)。

    public class LeaderSelectorDemo {protected static String PATH = "/francis/leader";private static final int CLIENT_QTY = 10;public static void main(String[] args) throws Exception {List<CuratorFramework> clients = Lists.newArrayList();List<LeaderSelectorAdapter> examples = Lists.newArrayList();TestingServer server = new TestingServer();try {for (int i = 0; i < CLIENT_QTY; i++) {CuratorFramework client= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));clients.add(client);LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);examples.add(selectorAdapter);client.start();selectorAdapter.start();}System.out.println("Press enter/return to quit\n");new BufferedReader(new InputStreamReader(System.in)).readLine();} finally {System.out.println("Shutting down...");for (LeaderSelectorAdapter exampleClient : examples) {CloseableUtils.closeQuietly(exampleClient);}for (CuratorFramework client : clients) {CloseableUtils.closeQuietly(client);}CloseableUtils.closeQuietly(server);}} }

    對(duì)比可知,LeaderLatch必須調(diào)用close()方法才會(huì)釋放領(lǐng)導(dǎo)權(quán),而對(duì)于LeaderSelector,通過(guò)LeaderSelectorListener可以對(duì)領(lǐng)導(dǎo)權(quán)進(jìn)行控制, 在適當(dāng)?shù)臅r(shí)候釋放領(lǐng)導(dǎo)權(quán),這樣每個(gè)節(jié)點(diǎn)都有可能獲得領(lǐng)導(dǎo)權(quán)。從而,LeaderSelector具有更好的靈活性和可控性,建議有LeaderElection應(yīng)用場(chǎng)景下優(yōu)先使用LeaderSelector。

    分布式鎖

    提醒:

    1.推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),因?yàn)楫?dāng)連接LOST時(shí)你不再擁有鎖

    2.分布式的鎖全局同步, 這意味著任何一個(gè)時(shí)間點(diǎn)不會(huì)有兩個(gè)客戶端都擁有相同的鎖。

    可重入共享鎖—Shared Reentrant Lock

    Shared意味著鎖是全局可見(jiàn)的, 客戶端都可以請(qǐng)求鎖。 Reentrant和JDK的ReentrantLock類(lèi)似,即可重入, 意味著同一個(gè)客戶端在擁有鎖的同時(shí),可以多次獲取,不會(huì)被阻塞。 它是由類(lèi)InterProcessMutex來(lái)實(shí)現(xiàn)。 它的構(gòu)造函數(shù)為:

    public InterProcessMutex(CuratorFramework client, String path)

    通過(guò)acquire()獲得鎖,并提供超時(shí)機(jī)制:

    public void acquire() Acquire the mutex - blocking until it's available. Note: the same thread can call acquire re-entrantly. Each call to acquire must be balanced by a call to release()public boolean acquire(long time,TimeUnit unit) Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()Parameters: time - time to wait unit - time unit Returns: true if the mutex was acquired, false if not

    通過(guò)release()方法釋放鎖。 InterProcessMutex 實(shí)例可以重用。

    Revoking ZooKeeper recipes wiki定義了可協(xié)商的撤銷(xiāo)機(jī)制。 為了撤銷(xiāo)mutex, 調(diào)用下面的方法:

    public void makeRevocable(RevocationListener<T> listener) 將鎖設(shè)為可撤銷(xiāo)的. 當(dāng)別的進(jìn)程或線程想讓你釋放鎖時(shí)Listener會(huì)被調(diào)用。 Parameters: listener - the listener

    如果你請(qǐng)求撤銷(xiāo)當(dāng)前的鎖, 調(diào)用attemptRevoke()方法,注意鎖釋放時(shí)RevocationListener將會(huì)回調(diào)。

    public static void attemptRevoke(CuratorFramework client,String path) throws Exception Utility to mark a lock for revocation. Assuming that the lock has been registered with a RevocationListener, it will get called and the lock should be released. Note, however, that revocation is cooperative. Parameters: client - the client path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

    二次提醒:錯(cuò)誤處理 還是強(qiáng)烈推薦你使用ConnectionStateListener處理連接狀態(tài)的改變。 當(dāng)連接LOST時(shí)你不再擁有鎖。

    首先讓我們創(chuàng)建一個(gè)模擬的共享資源, 這個(gè)資源期望只能單線程的訪問(wèn),否則會(huì)有并發(fā)問(wèn)題。

    public class FakeLimitedResource {private final AtomicBoolean inUse = new AtomicBoolean(false);public void use() throws InterruptedException {// 真實(shí)環(huán)境中我們會(huì)在這里訪問(wèn)/維護(hù)一個(gè)共享的資源//這個(gè)例子在使用鎖的情況下不會(huì)非法并發(fā)異常IllegalStateException//但是在無(wú)鎖的情況由于sleep了一段時(shí)間,很容易拋出異常if (!inUse.compareAndSet(false, true)) {throw new IllegalStateException("Needs to be used by one client at a time");}try {Thread.sleep((long) (3 * Math.random()));} finally {inUse.set(false);}} }

    然后創(chuàng)建一個(gè)InterProcessMutexDemo類(lèi), 它負(fù)責(zé)請(qǐng)求鎖, 使用資源,釋放鎖這樣一個(gè)完整的訪問(wèn)過(guò)程。

    public class InterProcessMutexDemo {private InterProcessMutex lock;private final FakeLimitedResource resource;private final String clientName;public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {this.resource = resource;this.clientName = clientName;this.lock = new InterProcessMutex(client, lockPath);}public void doWork(long time, TimeUnit unit) throws Exception {if (!lock.acquire(time, unit)) {throw new IllegalStateException(clientName + " could not acquire the lock");}try {System.out.println(clientName + " get the lock");resource.use(); //access resource exclusively} finally {System.out.println(clientName + " releasing the lock");lock.release(); // always release the lock in a finally block}}private static final int QTY = 5;private static final int REPETITIONS = QTY * 10;private static final String PATH = "/examples/locks";public static void main(String[] args) throws Exception {final FakeLimitedResource resource = new FakeLimitedResource();ExecutorService service = Executors.newFixedThreadPool(QTY);final TestingServer server = new TestingServer();try {for (int i = 0; i < QTY; ++i) {final int index = i;Callable<Void> task = new Callable<Void>() {@Overridepublic Void call() throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));try {client.start();final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);for (int j = 0; j < REPETITIONS; ++j) {example.doWork(10, TimeUnit.SECONDS);}} catch (Throwable e) {e.printStackTrace();} finally {CloseableUtils.closeQuietly(client);}return null;}};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);} finally {CloseableUtils.closeQuietly(server);}} }

    代碼也很簡(jiǎn)單,生成10個(gè)client, 每個(gè)client重復(fù)執(zhí)行10次 請(qǐng)求鎖–訪問(wèn)資源–釋放鎖的過(guò)程。每個(gè)client都在獨(dú)立的線程中。 結(jié)果可以看到,鎖是隨機(jī)的被每個(gè)實(shí)例排他性的使用。

    既然是可重用的,你可以在一個(gè)線程中多次調(diào)用acquire(),在線程擁有鎖時(shí)它總是返回true。

    你不應(yīng)該在多個(gè)線程中用同一個(gè)InterProcessMutex, 你可以在每個(gè)線程中都生成一個(gè)新的InterProcessMutex實(shí)例,它們的path都一樣,這樣它們可以共享同一個(gè)鎖。

    不可重入共享鎖—Shared Lock

    這個(gè)鎖和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味著它不能在同一個(gè)線程中重入。這個(gè)類(lèi)是InterProcessSemaphoreMutex,使用方法和InterProcessMutex類(lèi)似

    public class InterProcessSemaphoreMutexDemo {private InterProcessSemaphoreMutex lock;private final FakeLimitedResource resource;private final String clientName;public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {this.resource = resource;this.clientName = clientName;this.lock = new InterProcessSemaphoreMutex(client, lockPath);}public void doWork(long time, TimeUnit unit) throws Exception {if (!lock.acquire(time, unit)){throw new IllegalStateException(clientName + " 不能得到互斥鎖");}System.out.println(clientName + " 已獲取到互斥鎖");if (!lock.acquire(time, unit)){throw new IllegalStateException(clientName + " 不能得到互斥鎖");}System.out.println(clientName + " 再次獲取到互斥鎖");try {System.out.println(clientName + " get the lock");resource.use(); //access resource exclusively} finally {System.out.println(clientName + " releasing the lock");lock.release(); // always release the lock in a finally blocklock.release(); // 獲取鎖幾次 釋放鎖也要幾次}}private static final int QTY = 5;private static final int REPETITIONS = QTY * 10;private static final String PATH = "/examples/locks";public static void main(String[] args) throws Exception {final FakeLimitedResource resource = new FakeLimitedResource();ExecutorService service = Executors.newFixedThreadPool(QTY);final TestingServer server = new TestingServer();try {for (int i = 0; i < QTY; ++i) {final int index = i;Callable<Void> task = new Callable<Void>() {@Overridepublic Void call() throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));try {client.start();final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);for (int j = 0; j < REPETITIONS; ++j) {example.doWork(10, TimeUnit.SECONDS);}} catch (Throwable e) {e.printStackTrace();} finally {CloseableUtils.closeQuietly(client);}return null;}};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);} finally {CloseableUtils.closeQuietly(server);}Thread.sleep(Integer.MAX_VALUE);} }

    運(yùn)行后發(fā)現(xiàn),有且只有一個(gè)client成功獲取第一個(gè)鎖(第一個(gè)acquire()方法返回true),然后它自己阻塞在第二個(gè)acquire()方法,獲取第二個(gè)鎖超時(shí);其他所有的客戶端都阻塞在第一個(gè)acquire()方法超時(shí)并且拋出異常。

    這樣也就驗(yàn)證了InterProcessSemaphoreMutex實(shí)現(xiàn)的鎖是不可重入的。

    可重入讀寫(xiě)鎖—Shared Reentrant Read Write Lock

    類(lèi)似JDK的ReentrantReadWriteLock。一個(gè)讀寫(xiě)鎖管理一對(duì)相關(guān)的鎖。一個(gè)負(fù)責(zé)讀操作,另外一個(gè)負(fù)責(zé)寫(xiě)操作。讀操作在寫(xiě)鎖沒(méi)被使用時(shí)可同時(shí)由多個(gè)進(jìn)程使用,而寫(xiě)鎖在使用時(shí)不允許讀(阻塞)。

    此鎖是可重入的。一個(gè)擁有寫(xiě)鎖的線程可重入讀鎖,但是讀鎖卻不能進(jìn)入寫(xiě)鎖。這也意味著寫(xiě)鎖可以降級(jí)成讀鎖, 比如請(qǐng)求寫(xiě)鎖 --->請(qǐng)求讀鎖--->釋放讀鎖 ---->釋放寫(xiě)鎖。從讀鎖升級(jí)成寫(xiě)鎖是不行的。

    可重入讀寫(xiě)鎖主要由兩個(gè)類(lèi)實(shí)現(xiàn):InterProcessReadWriteLock、InterProcessMutex。使用時(shí)首先創(chuàng)建一個(gè)InterProcessReadWriteLock實(shí)例,然后再根據(jù)你的需求得到讀鎖或者寫(xiě)鎖,讀寫(xiě)鎖的類(lèi)型是InterProcessMutex。

    public class ReentrantReadWriteLockDemo {private final InterProcessReadWriteLock lock;private final InterProcessMutex readLock;private final InterProcessMutex writeLock;private final FakeLimitedResource resource;private final String clientName;public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {this.resource = resource;this.clientName = clientName;lock = new InterProcessReadWriteLock(client, lockPath);readLock = lock.readLock();writeLock = lock.writeLock();}public void doWork(long time, TimeUnit unit) throws Exception {// 注意只能先得到寫(xiě)鎖再得到讀鎖,不能反過(guò)來(lái)!!!if (!writeLock.acquire(time, unit)) {throw new IllegalStateException(clientName + " 不能得到寫(xiě)鎖");}System.out.println(clientName + " 已得到寫(xiě)鎖");if (!readLock.acquire(time, unit)) {throw new IllegalStateException(clientName + " 不能得到讀鎖");}System.out.println(clientName + " 已得到讀鎖");try {resource.use(); // 使用資源Thread.sleep(1000);} finally {System.out.println(clientName + " 釋放讀寫(xiě)鎖");readLock.release();writeLock.release();}}private static final int QTY = 5;private static final int REPETITIONS = QTY ;private static final String PATH = "/examples/locks";public static void main(String[] args) throws Exception {final FakeLimitedResource resource = new FakeLimitedResource();ExecutorService service = Executors.newFixedThreadPool(QTY);final TestingServer server = new TestingServer();try {for (int i = 0; i < QTY; ++i) {final int index = i;Callable<Void> task = new Callable<Void>() {@Overridepublic Void call() throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));try {client.start();final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);for (int j = 0; j < REPETITIONS; ++j) {example.doWork(10, TimeUnit.SECONDS);}} catch (Throwable e) {e.printStackTrace();} finally {CloseableUtils.closeQuietly(client);}return null;}};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);} finally {CloseableUtils.closeQuietly(server);}} }

    信號(hào)量—Shared Semaphore

    一個(gè)計(jì)數(shù)的信號(hào)量類(lèi)似JDK的Semaphore。 JDK中Semaphore維護(hù)的一組許可(permits),而Curator中稱之為租約(Lease)。 有兩種方式可以決定semaphore的最大租約數(shù)。第一種方式是用戶給定path并且指定最大LeaseSize。第二種方式用戶給定path并且使用SharedCountReader類(lèi)。如果不使用SharedCountReader, 必須保證所有實(shí)例在多進(jìn)程中使用相同的(最大)租約數(shù)量,否則有可能出現(xiàn)A進(jìn)程中的實(shí)例持有最大租約數(shù)量為10,但是在B進(jìn)程中持有的最大租約數(shù)量為20,此時(shí)租約的意義就失效了。

    這次調(diào)用acquire()會(huì)返回一個(gè)租約對(duì)象。 客戶端必須在finally中close這些租約對(duì)象,否則這些租約會(huì)丟失掉。 但是, 但是,如果客戶端session由于某種原因比如crash丟掉, 那么這些客戶端持有的租約會(huì)自動(dòng)close, 這樣其它客戶端可以繼續(xù)使用這些租約。 租約還可以通過(guò)下面的方式返還:

    public void returnAll(Collection<Lease> leases) public void returnLease(Lease lease)

    注意你可以一次性請(qǐng)求多個(gè)租約,如果Semaphore當(dāng)前的租約不夠,則請(qǐng)求線程會(huì)被阻塞。 同時(shí)還提供了超時(shí)的重載方法。

    public Lease acquire() public Collection<Lease> acquire(int qty) public Lease acquire(long time, TimeUnit unit) public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

    Shared Semaphore使用的主要類(lèi)包括下面幾個(gè):

    • InterProcessSemaphoreV2
    • Lease
    • SharedCountReader
    public class InterProcessSemaphoreDemo {private static final int MAX_LEASE = 10;private static final String PATH = "/examples/locks";public static void main(String[] args) throws Exception {FakeLimitedResource resource = new FakeLimitedResource();try (TestingServer server = new TestingServer()) {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);Collection<Lease> leases = semaphore.acquire(5);System.out.println("get " + leases.size() + " leases");Lease lease = semaphore.acquire();System.out.println("get another lease");resource.use();Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);System.out.println("Should timeout and acquire return " + leases2);System.out.println("return one lease");semaphore.returnLease(lease);System.out.println("return another 5 leases");semaphore.returnAll(leases);}} }

    首先我們先獲得了5個(gè)租約, 最后我們把它還給了semaphore。 接著請(qǐng)求了一個(gè)租約,因?yàn)閟emaphore還有5個(gè)租約,所以請(qǐng)求可以滿足,返回一個(gè)租約,還剩4個(gè)租約。 然后再請(qǐng)求一個(gè)租約,因?yàn)樽饧s不夠,阻塞到超時(shí),還是沒(méi)能滿足,返回結(jié)果為null(租約不足會(huì)阻塞到超時(shí),然后返回null,不會(huì)主動(dòng)拋出異常;如果不設(shè)置超時(shí)時(shí)間,會(huì)一致阻塞)。

    上面說(shuō)講的鎖都是公平鎖(fair)。 總ZooKeeper的角度看, 每個(gè)客戶端都按照請(qǐng)求的順序獲得鎖,不存在非公平的搶占的情況。

    多共享鎖對(duì)象 —Multi Shared Lock

    Multi Shared Lock是一個(gè)鎖的容器。 當(dāng)調(diào)用acquire(), 所有的鎖都會(huì)被acquire(),如果請(qǐng)求失敗,所有的鎖都會(huì)被release。 同樣調(diào)用release時(shí)所有的鎖都被release(失敗被忽略)。 基本上,它就是組鎖的代表,在它上面的請(qǐng)求釋放操作都會(huì)傳遞給它包含的所有的鎖。

    主要涉及兩個(gè)類(lèi):

    • InterProcessMultiLock
    • InterProcessLock

    它的構(gòu)造函數(shù)需要包含的鎖的集合,或者一組ZooKeeper的path。

    public InterProcessMultiLock(List<InterProcessLock> locks) public InterProcessMultiLock(CuratorFramework client, List<String> paths)

    用法和Shared Lock相同。

    public class MultiSharedLockDemo {private static final String PATH1 = "/examples/locks1";private static final String PATH2 = "/examples/locks2";public static void main(String[] args) throws Exception {FakeLimitedResource resource = new FakeLimitedResource();try (TestingServer server = new TestingServer()) {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();InterProcessLock lock1 = new InterProcessMutex(client, PATH1);InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));if (!lock.acquire(10, TimeUnit.SECONDS)) {throw new IllegalStateException("could not acquire the lock");}System.out.println("has got all lock");System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());try {resource.use(); //access resource exclusively} finally {System.out.println("releasing the lock");lock.release(); // always release the lock in a finally block}System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());}} }

    新建一個(gè)InterProcessMultiLock, 包含一個(gè)重入鎖和一個(gè)非重入鎖。 調(diào)用acquire()后可以看到線程同時(shí)擁有了這兩個(gè)鎖。 調(diào)用release()看到這兩個(gè)鎖都被釋放了。

    最后再重申一次, 強(qiáng)烈推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),當(dāng)連接狀態(tài)為L(zhǎng)OST,鎖將會(huì)丟失。

    分布式計(jì)數(shù)器

    顧名思義,計(jì)數(shù)器是用來(lái)計(jì)數(shù)的, 利用ZooKeeper可以實(shí)現(xiàn)一個(gè)集群共享的計(jì)數(shù)器。 只要使用相同的path就可以得到最新的計(jì)數(shù)器值, 這是由ZooKeeper的一致性保證的。Curator有兩個(gè)計(jì)數(shù)器, 一個(gè)是用int來(lái)計(jì)數(shù)(SharedCount),一個(gè)用long來(lái)計(jì)數(shù)(DistributedAtomicLong)。

    分布式int計(jì)數(shù)器—SharedCount

    這個(gè)類(lèi)使用int類(lèi)型來(lái)計(jì)數(shù)。 主要涉及三個(gè)類(lèi)。

    • SharedCount
    • SharedCountReader
    • SharedCountListener

    SharedCount代表計(jì)數(shù)器, 可以為它增加一個(gè)SharedCountListener,當(dāng)計(jì)數(shù)器改變時(shí)此Listener可以監(jiān)聽(tīng)到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本信息的值VersionedValue。

    public class SharedCounterDemo implements SharedCountListener {private static final int QTY = 5;private static final String PATH = "/examples/counter";public static void main(String[] args) throws IOException, Exception {final Random rand = new Random();SharedCounterDemo example = new SharedCounterDemo();try (TestingServer server = new TestingServer()) {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();SharedCount baseCount = new SharedCount(client, PATH, 0);baseCount.addListener(example);baseCount.start();List<SharedCount> examples = Lists.newArrayList();ExecutorService service = Executors.newFixedThreadPool(QTY);for (int i = 0; i < QTY; ++i) {final SharedCount count = new SharedCount(client, PATH, 0);examples.add(count);Callable<Void> task = () -> {count.start();Thread.sleep(rand.nextInt(10000));System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));return null;};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);for (int i = 0; i < QTY; ++i) {examples.get(i).close();}baseCount.close();}Thread.sleep(Integer.MAX_VALUE);}@Overridepublic void stateChanged(CuratorFramework arg0, ConnectionState arg1) {System.out.println("State changed: " + arg1.toString());}@Overridepublic void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {System.out.println("Counter's value is changed to " + newCount);} }

    在這個(gè)例子中,我們使用baseCount來(lái)監(jiān)聽(tīng)計(jì)數(shù)值(addListener方法來(lái)添加SharedCountListener )。 任意的SharedCount, 只要使用相同的path,都可以得到這個(gè)計(jì)數(shù)值。 然后我們使用5個(gè)線程為計(jì)數(shù)值增加一個(gè)10以內(nèi)的隨機(jī)數(shù)。相同的path的SharedCount對(duì)計(jì)數(shù)值進(jìn)行更改,將會(huì)回調(diào)給baseCount的SharedCountListener。

    count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

    這里我們使用trySetCount去設(shè)置計(jì)數(shù)器。 第一個(gè)參數(shù)提供當(dāng)前的VersionedValue,如果期間其它c(diǎn)lient更新了此計(jì)數(shù)值, 你的更新可能不成功, 但是這時(shí)你的client更新了最新的值,所以失敗了你可以嘗試再更新一次。 而setCount是強(qiáng)制更新計(jì)數(shù)器的值

    注意計(jì)數(shù)器必須start,使用完之后必須調(diào)用close關(guān)閉它。

    強(qiáng)烈推薦使用ConnectionStateListener。 在本例中SharedCountListener擴(kuò)展ConnectionStateListener。

    分布式long計(jì)數(shù)器—DistributedAtomicLong

    再看一個(gè)Long類(lèi)型的計(jì)數(shù)器。 除了計(jì)數(shù)的范圍比SharedCount大了之外, 它首先嘗試使用樂(lè)觀鎖的方式設(shè)置計(jì)數(shù)器, 如果不成功(比如期間計(jì)數(shù)器已經(jīng)被其它c(diǎn)lient更新了), 它使用InterProcessMutex方式來(lái)更新計(jì)數(shù)值。

    可以從它的內(nèi)部實(shí)現(xiàn)DistributedAtomicValue.trySet()中看出:

    AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception{MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);tryOptimistic(result, makeValue);if ( !result.succeeded() && (mutex != null) ){tryWithMutex(result, makeValue);}return result;}

    此計(jì)數(shù)器有一系列的操作:

    • get(): 獲取當(dāng)前值
    • increment(): 加一
    • decrement(): 減一
    • add(): 增加特定的值
    • subtract(): 減去特定的值
    • trySet(): 嘗試設(shè)置計(jì)數(shù)值
    • forceSet(): 強(qiáng)制設(shè)置計(jì)數(shù)值

    必須檢查返回結(jié)果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

    public class DistributedAtomicLongDemo {private static final int QTY = 5;private static final String PATH = "/examples/counter";public static void main(String[] args) throws IOException, Exception {List<DistributedAtomicLong> examples = Lists.newArrayList();try (TestingServer server = new TestingServer()) {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();ExecutorService service = Executors.newFixedThreadPool(QTY);for (int i = 0; i < QTY; ++i) {final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));examples.add(count);Callable<Void> task = () -> {try {AtomicValue<Long> value = count.increment();System.out.println("succeed: " + value.succeeded());if (value.succeeded())System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());} catch (Exception e) {e.printStackTrace();}return null;};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);Thread.sleep(Integer.MAX_VALUE);}} }

    分布式隊(duì)列

    使用Curator也可以簡(jiǎn)化Ephemeral Node (臨時(shí)節(jié)點(diǎn))的操作。Curator也提供ZK Recipe的分布式隊(duì)列實(shí)現(xiàn)。 利用ZK的 PERSISTENTS_EQUENTIAL節(jié)點(diǎn), 可以保證放入到隊(duì)列中的項(xiàng)目是按照順序排隊(duì)的。 如果單一的消費(fèi)者從隊(duì)列中取數(shù)據(jù), 那么它是先入先出的,這也是隊(duì)列的特點(diǎn)。 如果你嚴(yán)格要求順序,你就的使用單一的消費(fèi)者,可以使用Leader選舉只讓Leader作為唯一的消費(fèi)者。

    但是, 根據(jù)Netflix的Curator作者所說(shuō), ZooKeeper真心不適合做Queue,或者說(shuō)ZK沒(méi)有實(shí)現(xiàn)一個(gè)好的Queue,詳細(xì)內(nèi)容可以看 Tech Note 4, 原因有五:

  • ZK有1MB 的傳輸限制。 實(shí)踐中ZNode必須相對(duì)較小,而隊(duì)列包含成千上萬(wàn)的消息,非常的大。
  • 如果有很多節(jié)點(diǎn),ZK啟動(dòng)時(shí)相當(dāng)?shù)穆?而使用queue會(huì)導(dǎo)致好多ZNode. 你需要顯著增大 initLimit 和 syncLimit.
  • ZNode很大的時(shí)候很難清理。Netflix不得不創(chuàng)建了一個(gè)專門(mén)的程序做這事。
  • 當(dāng)很大量的包含成千上萬(wàn)的子節(jié)點(diǎn)的ZNode時(shí), ZK的性能變得不好
  • ZK的數(shù)據(jù)庫(kù)完全放在內(nèi)存中。 大量的Queue意味著會(huì)占用很多的內(nèi)存空間。
  • 盡管如此, Curator還是創(chuàng)建了各種Queue的實(shí)現(xiàn)。 如果Queue的數(shù)據(jù)量不太多,數(shù)據(jù)量不太大的情況下,酌情考慮,還是可以使用的。

    分布式隊(duì)列—DistributedQueue

    DistributedQueue是最普通的一種隊(duì)列。 它設(shè)計(jì)以下四個(gè)類(lèi):

    • QueueBuilder - 創(chuàng)建隊(duì)列使用QueueBuilder,它也是其它隊(duì)列的創(chuàng)建類(lèi)
    • QueueConsumer - 隊(duì)列中的消息消費(fèi)者接口
    • QueueSerializer - 隊(duì)列消息序列化和反序列化接口,提供了對(duì)隊(duì)列中的對(duì)象的序列化和反序列化
    • DistributedQueue - 隊(duì)列實(shí)現(xiàn)類(lèi)

    QueueConsumer是消費(fèi)者,它可以接收隊(duì)列的數(shù)據(jù)。處理隊(duì)列中的數(shù)據(jù)的代碼邏輯可以放在QueueConsumer.consumeMessage()中。

    正常情況下先將消息從隊(duì)列中移除,再交給消費(fèi)者消費(fèi)。但這是兩個(gè)步驟,不是原子的。可以調(diào)用Builder的lockPath()消費(fèi)者加鎖,當(dāng)消費(fèi)者消費(fèi)數(shù)據(jù)時(shí)持有鎖,這樣其它消費(fèi)者不能消費(fèi)此消息。如果消費(fèi)失敗或者進(jìn)程死掉,消息可以交給其它進(jìn)程。這會(huì)帶來(lái)一點(diǎn)性能的損失。最好還是單消費(fèi)者模式使用隊(duì)列。

    public class DistributedQueueDemo {private static final String PATH = "/example/queue";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));clientA.start();CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));clientB.start();DistributedQueue<String> queueA;QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);queueA = builderA.buildQueue();queueA.start();DistributedQueue<String> queueB;QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);queueB = builderB.buildQueue();queueB.start();for (int i = 0; i < 100; i++) {queueA.put(" test-A-" + i);Thread.sleep(10);queueB.put(" test-B-" + i);}Thread.sleep(1000 * 10);// 等待消息消費(fèi)完成queueB.close();queueA.close();clientB.close();clientA.close();System.out.println("OK!");}/*** 隊(duì)列消息序列化實(shí)現(xiàn)類(lèi)*/private static QueueSerializer<String> createQueueSerializer() {return new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};}/*** 定義隊(duì)列消費(fèi)者*/private static QueueConsumer<String> createQueueConsumer(final String name) {return new QueueConsumer<String>() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {System.out.println("連接狀態(tài)改變: " + newState.name());}@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消費(fèi)消息(" + name + "): " + message);}};} }

    例子中定義了兩個(gè)分布式隊(duì)列和兩個(gè)消費(fèi)者,因?yàn)镻ATH是相同的,會(huì)存在消費(fèi)者搶占消費(fèi)消息的情況。

    帶Id的分布式隊(duì)列—DistributedIdQueue

    DistributedIdQueue和上面的隊(duì)列類(lèi)似,但是可以為隊(duì)列中的每一個(gè)元素設(shè)置一個(gè)ID。 可以通過(guò)ID把隊(duì)列中任意的元素移除。 它涉及幾個(gè)類(lèi):

    • QueueBuilder
    • QueueConsumer
    • QueueSerializer
    • DistributedQueue

    通過(guò)下面方法創(chuàng)建:

    builder.buildIdQueue()

    放入元素時(shí):

    queue.put(aMessage, messageId);

    移除元素時(shí):

    int numberRemoved = queue.remove(messageId);

    在這個(gè)例子中, 有些元素還沒(méi)有被消費(fèi)者消費(fèi)前就移除了,這樣消費(fèi)者不會(huì)收到刪除的消息。

    public class DistributedIdQueueDemo {private static final String PATH = "/example/queue";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework client = null;DistributedIdQueue<String> queue = null;try {client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));client.start();QueueConsumer<String> consumer = createQueueConsumer();QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);queue = builder.buildIdQueue();queue.start();for (int i = 0; i < 10; i++) {queue.put(" test-" + i, "Id" + i);Thread.sleep((long) (15 * Math.random()));queue.remove("Id" + i);}Thread.sleep(20000);} catch (Exception ex) {} finally {CloseableUtils.closeQuietly(queue);CloseableUtils.closeQuietly(client);CloseableUtils.closeQuietly(server);}}private static QueueSerializer<String> createQueueSerializer() {return new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};}private static QueueConsumer<String> createQueueConsumer() {return new QueueConsumer<String>() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {System.out.println("connection new state: " + newState.name());}@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("consume one message: " + message);}};} }

    優(yōu)先級(jí)分布式隊(duì)列—DistributedPriorityQueue

    優(yōu)先級(jí)隊(duì)列對(duì)隊(duì)列中的元素按照優(yōu)先級(jí)進(jìn)行排序。 Priority越小, 元素越靠前, 越先被消費(fèi)掉。 它涉及下面幾個(gè)類(lèi):

    • QueueBuilder
    • QueueConsumer
    • QueueSerializer
    • DistributedPriorityQueue

    通過(guò)builder.buildPriorityQueue(minItemsBeforeRefresh)方法創(chuàng)建。 當(dāng)優(yōu)先級(jí)隊(duì)列得到元素增刪消息時(shí),它會(huì)暫停處理當(dāng)前的元素隊(duì)列,然后刷新隊(duì)列。minItemsBeforeRefresh指定刷新前當(dāng)前活動(dòng)的隊(duì)列的最小數(shù)量。 主要設(shè)置你的程序可以容忍的不排序的最小值。

    放入隊(duì)列時(shí)需要指定優(yōu)先級(jí):

    queue.put(aMessage, priority);

    例子:

    public class DistributedPriorityQueueDemo {private static final String PATH = "/example/queue";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework client = null;DistributedPriorityQueue<String> queue = null;try {client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));client.start();QueueConsumer<String> consumer = createQueueConsumer();QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);queue = builder.buildPriorityQueue(0);queue.start();for (int i = 0; i < 10; i++) {int priority = (int) (Math.random() * 100);System.out.println("test-" + i + " priority:" + priority);queue.put("test-" + i, priority);Thread.sleep((long) (50 * Math.random()));}Thread.sleep(20000);} catch (Exception ex) {} finally {CloseableUtils.closeQuietly(queue);CloseableUtils.closeQuietly(client);CloseableUtils.closeQuietly(server);}}private static QueueSerializer<String> createQueueSerializer() {return new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};}private static QueueConsumer<String> createQueueConsumer() {return new QueueConsumer<String>() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {System.out.println("connection new state: " + newState.name());}@Overridepublic void consumeMessage(String message) throws Exception {Thread.sleep(1000);System.out.println("consume one message: " + message);}};}}

    有時(shí)候你可能會(huì)有錯(cuò)覺(jué),優(yōu)先級(jí)設(shè)置并沒(méi)有起效。那是因?yàn)閮?yōu)先級(jí)是對(duì)于隊(duì)列積壓的元素而言,如果消費(fèi)速度過(guò)快有可能出現(xiàn)在后一個(gè)元素入隊(duì)操作之前前一個(gè)元素已經(jīng)被消費(fèi),這種情況下DistributedPriorityQueue會(huì)退化為DistributedQueue。

    分布式延遲隊(duì)列—DistributedDelayQueue

    JDK中也有DelayQueue,不知道你是否熟悉。 DistributedDelayQueue也提供了類(lèi)似的功能, 元素有個(gè)delay值, 消費(fèi)者隔一段時(shí)間才能收到元素。 涉及到下面四個(gè)類(lèi)。

    • QueueBuilder
    • QueueConsumer
    • QueueSerializer
    • DistributedDelayQueue

    通過(guò)下面的語(yǔ)句創(chuàng)建:

    QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path); ... more builder method calls as needed ... DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

    放入元素時(shí)可以指定delayUntilEpoch:

    queue.put(aMessage, delayUntilEpoch);

    注意delayUntilEpoch不是離現(xiàn)在的一個(gè)時(shí)間間隔, 比如20毫秒,而是未來(lái)的一個(gè)時(shí)間戳,如 System.currentTimeMillis() + 10秒。 如果delayUntilEpoch的時(shí)間已經(jīng)過(guò)去,消息會(huì)立刻被消費(fèi)者接收。

    public class DistributedDelayQueueDemo {private static final String PATH = "/example/queue";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework client = null;DistributedDelayQueue<String> queue = null;try {client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));client.start();QueueConsumer<String> consumer = createQueueConsumer();QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);queue = builder.buildDelayQueue();queue.start();for (int i = 0; i < 10; i++) {queue.put("test-" + i, System.currentTimeMillis() + 10000);}System.out.println(new Date().getTime() + ": already put all items");Thread.sleep(20000);} catch (Exception ex) {} finally {CloseableUtils.closeQuietly(queue);CloseableUtils.closeQuietly(client);CloseableUtils.closeQuietly(server);}}private static QueueSerializer<String> createQueueSerializer() {return new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};}private static QueueConsumer<String> createQueueConsumer() {return new QueueConsumer<String>() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState) {System.out.println("connection new state: " + newState.name());}@Overridepublic void consumeMessage(String message) throws Exception {System.out.println(new Date().getTime() + ": consume one message: " + message);}};} }

    SimpleDistributedQueue

    前面雖然實(shí)現(xiàn)了各種隊(duì)列,但是你注意到?jīng)]有,這些隊(duì)列并沒(méi)有實(shí)現(xiàn)類(lèi)似JDK一樣的接口。 SimpleDistributedQueue提供了和JDK基本一致的接口(但是沒(méi)有實(shí)現(xiàn)Queue接口)。 創(chuàng)建很簡(jiǎn)單:

    public SimpleDistributedQueue(CuratorFramework client,String path)

    增加元素:

    public boolean offer(byte[] data) throws Exception

    刪除元素:

    public byte[] take() throws Exception

    另外還提供了其它方法:

    public byte[] peek() throws Exception public byte[] poll(long timeout, TimeUnit unit) throws Exception public byte[] poll() throws Exception public byte[] remove() throws Exception public byte[] element() throws Exception

    沒(méi)有add方法, 多了take方法。

    take方法在成功返回之前會(huì)被阻塞。 而poll方法在隊(duì)列為空時(shí)直接返回null。

    public class SimpleDistributedQueueDemo {private static final String PATH = "/example/queue";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();CuratorFramework client = null;SimpleDistributedQueue queue;try {client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));client.start();queue = new SimpleDistributedQueue(client, PATH);Producer producer = new Producer(queue);Consumer consumer = new Consumer(queue);new Thread(producer, "producer").start();new Thread(consumer, "consumer").start();Thread.sleep(20000);} catch (Exception ex) {} finally {CloseableUtils.closeQuietly(client);CloseableUtils.closeQuietly(server);}}public static class Producer implements Runnable {private SimpleDistributedQueue queue;public Producer(SimpleDistributedQueue queue) {this.queue = queue;}@Overridepublic void run() {for (int i = 0; i < 100; i++) {try {boolean flag = queue.offer(("zjc-" + i).getBytes());if (flag) {System.out.println("發(fā)送一條消息成功:" + "zjc-" + i);} else {System.out.println("發(fā)送一條消息失敗:" + "zjc-" + i);}} catch (Exception e) {e.printStackTrace();}}}}public static class Consumer implements Runnable {private SimpleDistributedQueue queue;public Consumer(SimpleDistributedQueue queue) {this.queue = queue;}@Overridepublic void run() {try {byte[] datas = queue.take();System.out.println("消費(fèi)一條消息成功:" + new String(datas, "UTF-8"));} catch (Exception e) {e.printStackTrace();}}}}

    但是實(shí)際上發(fā)送了100條消息,消費(fèi)完第一條之后,后面的消息無(wú)法消費(fèi),目前沒(méi)找到原因。查看一下官方文檔推薦的demo使用下面幾個(gè)Api:

    Creating a SimpleDistributedQueuepublic SimpleDistributedQueue(CuratorFramework client,String path) Parameters: client - the client path - path to store queue nodes Add to the queuepublic boolean offer(byte[] data)throws Exception Inserts data into queue. Parameters: data - the data Returns: true if data was successfully added Take from the queuepublic byte[] take()throws Exception Removes the head of the queue and returns it, blocks until it succeeds. Returns: The former head of the queue NOTE: see the Javadoc for additional methods

    但是實(shí)際使用發(fā)現(xiàn)還是存在消費(fèi)阻塞問(wèn)題。

    分布式屏障—Barrier

    分布式Barrier是這樣一個(gè)類(lèi): 它會(huì)阻塞所有節(jié)點(diǎn)上的等待進(jìn)程,直到某一個(gè)被滿足, 然后所有的節(jié)點(diǎn)繼續(xù)進(jìn)行。

    比如賽馬比賽中, 等賽馬陸續(xù)來(lái)到起跑線前。 一聲令下,所有的賽馬都飛奔而出。

    DistributedBarrier

    DistributedBarrier類(lèi)實(shí)現(xiàn)了柵欄的功能。 它的構(gòu)造函數(shù)如下:

    public DistributedBarrier(CuratorFramework client, String barrierPath) Parameters: client - client barrierPath - path to use as the barrier

    首先你需要設(shè)置柵欄,它將阻塞在它上面等待的線程:

    setBarrier();

    然后需要阻塞的線程調(diào)用方法等待放行條件:

    public void waitOnBarrier()

    當(dāng)條件滿足時(shí),移除柵欄,所有等待的線程將繼續(xù)執(zhí)行:

    removeBarrier();

    異常處理 DistributedBarrier 會(huì)監(jiān)控連接狀態(tài),當(dāng)連接斷掉時(shí)waitOnBarrier()方法會(huì)拋出異常。

    public class DistributedBarrierDemo {private static final int QTY = 5;private static final String PATH = "/examples/barrier";public static void main(String[] args) throws Exception {try (TestingServer server = new TestingServer()) {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();ExecutorService service = Executors.newFixedThreadPool(QTY);DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);controlBarrier.setBarrier();for (int i = 0; i < QTY; ++i) {final DistributedBarrier barrier = new DistributedBarrier(client, PATH);final int index = i;Callable<Void> task = () -> {Thread.sleep((long) (3 * Math.random()));System.out.println("Client #" + index + " waits on Barrier");barrier.waitOnBarrier();System.out.println("Client #" + index + " begins");return null;};service.submit(task);}Thread.sleep(10000);System.out.println("all Barrier instances should wait the condition");controlBarrier.removeBarrier();service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);Thread.sleep(20000);}} }

    這個(gè)例子創(chuàng)建了controlBarrier來(lái)設(shè)置柵欄和移除柵欄。 我們創(chuàng)建了5個(gè)線程,在此Barrier上等待。 最后移除柵欄后所有的線程才繼續(xù)執(zhí)行。

    如果你開(kāi)始不設(shè)置柵欄,所有的線程就不會(huì)阻塞住。

    雙柵欄—DistributedDoubleBarrier

    雙柵欄允許客戶端在計(jì)算的開(kāi)始和結(jié)束時(shí)同步。當(dāng)足夠的進(jìn)程加入到雙柵欄時(shí),進(jìn)程開(kāi)始計(jì)算, 當(dāng)計(jì)算完成時(shí),離開(kāi)柵欄。 雙柵欄類(lèi)是DistributedDoubleBarrier。 構(gòu)造函數(shù)為:

    public DistributedDoubleBarrier(CuratorFramework client,String barrierPath,int memberQty) Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until all members have entered. When leave() is called, it blocks until all members have left.Parameters: client - the client barrierPath - path to use memberQty - the number of members in the barrier

    memberQty是成員數(shù)量,當(dāng)enter()方法被調(diào)用時(shí),成員被阻塞,直到所有的成員都調(diào)用了enter()。 當(dāng)leave()方法被調(diào)用時(shí),它也阻塞調(diào)用線程,直到所有的成員都調(diào)用了leave()。 就像百米賽跑比賽, 發(fā)令槍響, 所有的運(yùn)動(dòng)員開(kāi)始跑,等所有的運(yùn)動(dòng)員跑過(guò)終點(diǎn)線,比賽才結(jié)束。

    DistributedDoubleBarrier會(huì)監(jiān)控連接狀態(tài),當(dāng)連接斷掉時(shí)enter()和leave()方法會(huì)拋出異常。

    public class DistributedDoubleBarrierDemo {private static final int QTY = 5;private static final String PATH = "/examples/barrier";public static void main(String[] args) throws Exception {try (TestingServer server = new TestingServer()) {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();ExecutorService service = Executors.newFixedThreadPool(QTY);for (int i = 0; i < QTY; ++i) {final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);final int index = i;Callable<Void> task = () -> {Thread.sleep((long) (3 * Math.random()));System.out.println("Client #" + index + " enters");barrier.enter();System.out.println("Client #" + index + " begins");Thread.sleep((long) (3000 * Math.random()));barrier.leave();System.out.println("Client #" + index + " left");return null;};service.submit(task);}service.shutdown();service.awaitTermination(10, TimeUnit.MINUTES);Thread.sleep(Integer.MAX_VALUE);}}}

    參考資料:
    《從PAXOS到ZOOKEEPER分布式一致性原理與實(shí)踐》
    《 跟著實(shí)例學(xué)習(xí)ZooKeeper的用法》博客系列

    項(xiàng)目倉(cāng)庫(kù):
    https://github.com/zjcscut/curator-seed
    (其實(shí)本文的MD是帶導(dǎo)航目錄[toc]的,比較方便導(dǎo)航到每個(gè)章節(jié),只是簡(jiǎn)書(shū)不支持,本文的MD原文放在項(xiàng)目的/resources/md目錄下,有愛(ài)自取,文章用Typora編寫(xiě),建議用Typora打開(kāi))

    End on 2017-5-13 13:10.
    Help yourselves!
    我是throwable,在廣州奮斗,白天上班,晚上和雙休不定時(shí)加班,晚上有空?qǐng)?jiān)持寫(xiě)下博客。
    希望我的文章能夠給你帶來(lái)收獲,共勉。



    作者:zhrowable
    鏈接:http://www.jianshu.com/p/70151fc0ef5d
    來(lái)源:簡(jiǎn)書(shū)
    著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。pasting

    ?

    轉(zhuǎn)載于:https://www.cnblogs.com/jing1617/p/7875914.html

    總結(jié)

    以上是生活随笔為你收集整理的Zookeeper客户端Curator使用详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。