ZooKeeper管理分布式环境中的数据
Reference:?http://www.cnblogs.com/wuxl360/p/5817549.html
?
本節(jié)本來是要介紹ZooKeeper的實(shí)現(xiàn)原理,但是ZooKeeper的原理比較復(fù)雜,它涉及到了paxos算法、Zab協(xié)議、通信協(xié)議等相關(guān)知 識,理解起來比較抽象所以還需要借助一些應(yīng)用場景,來幫我們理解。由于內(nèi)容比較多,一口氣吃不成胖子,得慢慢來一步一個(gè)腳印,因此我對后期 ZooKeeper的學(xué)習(xí)規(guī)劃如下:
第一階段:
|---理解ZooKeeper的應(yīng)用
????|---ZooKeeper是什么
????|---ZooKeeper能干什么
????|---ZooKeeper 怎么使用
第二階段:
|---理解ZooKeeper原理準(zhǔn)備
????|---了解paxos
????|---理解 zab原理
????|---理解選舉/同步流程
第三階段:
????|---深入ZooKeeper原理
????????|---分析源碼
????????|---嘗試開發(fā)分布式應(yīng)用
由于內(nèi)容較多,而且理解較為復(fù)雜,所以每個(gè)階段分開來學(xué)習(xí)和介紹,那么本文主要介紹的的是第一階段,該階段一般 應(yīng)該放在前面介紹,但感覺像一些ZooKeeper應(yīng)用案例,如果沒有一定的ZooKeeper基礎(chǔ),理解起來也比較抽象, 所以放在這介紹。大家可以對比一下前面的應(yīng)用程序,來對比理解一下前面的那些應(yīng)用到底用到ZooKeeper的那些功能,來進(jìn)一步理解ZooKeeper 的實(shí)現(xiàn)理念,由于網(wǎng)上關(guān)于這方面的介紹比較多,如果一些可愛的博友對該內(nèi)容已經(jīng)比較了解,那么您可以不用往下看了,繼續(xù)下一步學(xué)習(xí)。
一、ZooKeeper產(chǎn)生背景
1.1 分布式的發(fā)展
分布式這個(gè)概念我想大家并不陌生,但真正實(shí)戰(zhàn)開始還要從google說起,很早以前在實(shí)驗(yàn)室中分布式被人提出,可是說是計(jì)算機(jī)內(nèi)入行較為復(fù)雜學(xué)習(xí)較為困難的技術(shù),并且市場也并不成熟,因此大規(guī)模的商業(yè)應(yīng)用一直未成出現(xiàn),但從Google 發(fā)布了MapReduce 和DFS 以及Bigtable的論文之后,分布式在計(jì)算機(jī)界的格局就發(fā)生了變化,從架構(gòu)上實(shí)現(xiàn)了分布式的難題,并且成熟的應(yīng)用在了海量數(shù)據(jù)存儲和計(jì)算上,其集群的規(guī)模也是當(dāng)前世界上最為龐大的。
以DFS 為基礎(chǔ)的分布式計(jì)算框架和key、value?數(shù)據(jù)高效的解決運(yùn)算的瓶頸, 而且開發(fā)人員不用再寫復(fù)雜的分布式程序,只要底層框架完備開發(fā)人員只要用較少的代碼就可以完成分布式程序的開發(fā),這使得開發(fā)人員只需要關(guān)注業(yè)務(wù)邏輯的即 可。Google 在業(yè)界技術(shù)上的領(lǐng)軍地位,讓業(yè)界望塵莫及的技術(shù)實(shí)力,IT 因此也是對Google 所退出的技術(shù)十分推崇。在最近幾年中分布式則是成為了海量數(shù)據(jù)存儲以及計(jì)算、高并發(fā)、高可靠性、高可用性的解決方案。
1.2 ZooKeeper的產(chǎn)生
眾所周知通常分布式架構(gòu)都是中心化的設(shè)計(jì),就是一個(gè)主控機(jī)連接多個(gè)處理節(jié)點(diǎn)。 問題可以從這里考慮,當(dāng)主控機(jī)失效時(shí),整個(gè)系統(tǒng)則就無法訪問了,所以保證系統(tǒng)的高可用性是非常關(guān)鍵之處,也就是要保證主控機(jī)的高可用性。分布式鎖就是一個(gè) 解決該問題的較好方案,多主控機(jī)搶一把鎖。在這里我們就涉及到了我們的重點(diǎn)Zookeeper。
ZooKeeper是什么,chubby 我想大家都不會陌生的,chubby 是實(shí)現(xiàn)Google 的一個(gè)分布式鎖的實(shí)現(xiàn),運(yùn)用到了paxos 算法解決的一個(gè)分布式事務(wù)管理的系統(tǒng)。Zookeeper 就是雅虎模仿強(qiáng)大的Google chubby 實(shí)現(xiàn)的一套分布式鎖管理系統(tǒng)。同時(shí),Zookeeper 分布式服務(wù)框架是Apache Hadoop的一個(gè)子項(xiàng)目,它是一個(gè)針對大型分布式系統(tǒng)的可靠協(xié)調(diào)系統(tǒng),它主要是用來解決分布式應(yīng)用中經(jīng)常遇到的一些數(shù)據(jù)管理問題,可以高可靠的維護(hù)元數(shù)據(jù)。提供的功能包括:配置維護(hù)、名字服務(wù)、分布式同步、組服務(wù)等。ZooKeeper的設(shè)計(jì)目標(biāo)就是封裝好復(fù)雜易出錯(cuò)的關(guān)鍵服務(wù),將簡單易用的接口和性能高效、功能穩(wěn)定的系統(tǒng)提供給用戶。
1.3 ZooKeeper的使用
Zookeeper 作為一個(gè)分布式的服務(wù)框架,主要用來解決分布式集群中應(yīng)用系統(tǒng)的一致性問題,它能提供基于類似于文件系統(tǒng)的目錄節(jié)點(diǎn)樹方式的數(shù)據(jù)存儲,但是 Zookeeper 并不是用來專門存儲數(shù)據(jù)的,它的作用主要是用來維護(hù)和監(jiān)控你存儲的數(shù)據(jù)的狀態(tài)變化。通過監(jiān)控這些數(shù)據(jù)狀態(tài)的變化,從而可以達(dá)到基于數(shù)據(jù)的集群管理,后面將 會詳細(xì)介紹 Zookeeper 能夠解決的一些典型問題。
注意一下這里的"數(shù)據(jù)"是有限制的:
(1)?從數(shù)據(jù)大小來看:我們知道ZooKeeper的數(shù)據(jù)存儲在一個(gè)叫ReplicatedDataBase 的 數(shù)據(jù)庫中,該數(shù)據(jù)是一個(gè)內(nèi)存數(shù)據(jù)庫,既然是在內(nèi)存當(dāng)中,我就應(yīng)該知道該數(shù)據(jù)量就應(yīng)該不會太大,這一點(diǎn)上就與hadoop的HDFS有了很大的區(qū) 別,HDFS的數(shù)據(jù)主要存儲在磁盤上,因此數(shù)據(jù)存儲主要是HDFS的事,而ZooKeeper主要是協(xié)調(diào)功能,并不是用來存儲數(shù)據(jù)的。
(2) 從數(shù)據(jù)類型來看:正如前面所說的,ZooKeeper的數(shù)據(jù)在內(nèi)存中,由于內(nèi)存空間的限制,那么我們就不能在上面隨心所欲的存儲數(shù)據(jù),所以ZooKeeper存儲的數(shù)據(jù)都是我們所關(guān)心的數(shù)據(jù)而且數(shù)據(jù)量還不能太大,而且還會根據(jù)我們要以實(shí)現(xiàn)的功能來選擇相應(yīng)的數(shù)據(jù)。簡單來說,干什么事存什么數(shù)據(jù),ZooKeeper所實(shí)現(xiàn)的一切功能,都是由ZK節(jié)點(diǎn)的性質(zhì)和該節(jié)點(diǎn)所關(guān)聯(lián)的數(shù)據(jù)實(shí)現(xiàn)的,至于關(guān)聯(lián)什么數(shù)據(jù)那就要看你干什么事了。
例如:
①?集群管理:利用臨時(shí)節(jié)點(diǎn)特性,節(jié)點(diǎn)關(guān)聯(lián)的是機(jī)器的主機(jī)名、IP地址等相關(guān)信息,集群單點(diǎn)故障也屬于該范疇。
② 統(tǒng)一命名:主要利用節(jié)點(diǎn)的唯一性和目錄節(jié)點(diǎn)樹結(jié)構(gòu)。
③ 配置管理:節(jié)點(diǎn)關(guān)聯(lián)的是配置信息。
④ 分布式鎖:節(jié)點(diǎn)關(guān)聯(lián)的是要競爭的資源。
二、ZooKeeper應(yīng)用場景
ZooKeeper是一個(gè)高可用的分布式數(shù)據(jù)管理與系統(tǒng)協(xié)調(diào)框架。基于對Paxos算法的實(shí)現(xiàn),使該框架保證了分布式環(huán)境中數(shù)據(jù)的強(qiáng)一致性,也正是 基于這樣的特性,使得zookeeper能夠應(yīng)用于很多場景。需要注意的是,ZK并不是生來就為這些場景設(shè)計(jì),都是后來眾多開發(fā)者根據(jù)框架的特性,摸索出 來的典型使用方法。因此,我們也可以根據(jù)自己的需要來設(shè)計(jì)相應(yīng)的場景實(shí)現(xiàn)。正如前文所提到的,ZooKeeper 實(shí)現(xiàn)的任何功能都離不開ZooKeeper的數(shù)據(jù)結(jié)構(gòu),任何功能的實(shí)現(xiàn)都是利用"Znode結(jié)構(gòu)特性+節(jié)點(diǎn)關(guān)聯(lián)的數(shù)據(jù)"來實(shí)現(xiàn)的,好吧那么我們就看一下ZooKeeper數(shù)據(jù)結(jié)構(gòu)有哪些特性。ZooKeeper數(shù)據(jù)結(jié)構(gòu)如下圖所示:
圖2.1 ZooKeeper數(shù)據(jù)結(jié)構(gòu)
Zookeeper 這種數(shù)據(jù)結(jié)構(gòu)有如下這些特點(diǎn):
①?每個(gè)子目錄項(xiàng)如 NameService 都被稱作為 znode,這個(gè) znode 是被它所在的路徑唯一標(biāo)識,如 Server1 這個(gè) znode 的標(biāo)識為?/NameService/Server1;
②?znode 可以有子節(jié)點(diǎn)目錄,并且每個(gè) znode 可以存儲數(shù)據(jù),注意EPHEMERAL 類型的目錄節(jié)點(diǎn)不能有子節(jié)點(diǎn)目錄;
③?znode 是有版本的,每個(gè) znode 中存儲的數(shù)據(jù)可以有多個(gè)版本,也就是一個(gè)訪問路徑中可以存儲多份數(shù)據(jù);
④?znode 可以是臨時(shí)節(jié)點(diǎn),一旦創(chuàng)建這個(gè) znode 的客戶端與服務(wù)器失去聯(lián)系,這個(gè) znode 也將自動刪除,Zookeeper 的客戶端和服務(wù)器通信采用長連接方式,每個(gè)客戶端和服務(wù)器通過心跳來保持連接,這個(gè)連接狀態(tài)稱為 session,如果 znode 是臨時(shí)節(jié)點(diǎn),這個(gè) session 失效,znode 也就刪除了;
⑤?znode 的目錄名可以自動編號,如 App1 已經(jīng)存在,再創(chuàng)建的話,將會自動命名為 App2;
⑥?znode 可以被監(jiān)控,包括這個(gè)目錄節(jié)點(diǎn)中存儲的數(shù)據(jù)的修改,子節(jié)點(diǎn)目錄的變化等,一旦變化可以通知設(shè)置監(jiān)控的客戶端,這個(gè)是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基于這個(gè)特性實(shí)現(xiàn)的。
2.1數(shù)據(jù)發(fā)布與訂閱
(1) 典型場景描述
發(fā)布與訂閱即所謂的配置管理,顧名思義就是將數(shù)據(jù)發(fā)布到ZK節(jié)點(diǎn)上,供訂閱者動態(tài)獲取數(shù)據(jù),實(shí)現(xiàn)配置信息的集中式管理和動態(tài)更新。例如全局的配置信息,地址列表等就非常適合使用。集中式的配置管理在應(yīng)用集群中是非常常見的,一般商業(yè)公司內(nèi)部都會實(shí)現(xiàn)一套集中的配置管理中心,應(yīng)對不同的應(yīng)用集群對于共享各自配置的需求,并且在配置變更時(shí)能夠通知到集群中的每一個(gè)機(jī)器。
(2) 應(yīng)用
①?索引信息和集群中機(jī)器節(jié)點(diǎn)狀態(tài)存放在ZK的一些指定節(jié)點(diǎn),供各個(gè)客戶端訂閱使用。
②?系統(tǒng)日志(經(jīng)過處理后的)存儲,這些日志通常2-3天后被清除。
③?應(yīng)用中用到的一些配置信息集中管理,在應(yīng)用啟動的時(shí)候主動來獲取一次,并且在節(jié)點(diǎn)上注冊一個(gè)Watcher,以后每次配置有更新,實(shí)時(shí)通知到應(yīng)用,獲取最新配置信息。
④?業(yè)務(wù)邏輯中需要用到的一些全局變量,比如一些消息中間件的消息隊(duì)列通常有個(gè)offset,這個(gè)offset存放在zk上,這樣集群中每個(gè)發(fā)送者都能知道當(dāng)前的發(fā)送進(jìn)度。
⑤?系統(tǒng)中有些信息需要?jiǎng)討B(tài)獲取,并且還會存在人工手動去修改這個(gè)信息。以前通常是暴露出接口,例如JMX接口,有了ZK后,只要將這些信息存放到ZK節(jié)點(diǎn)上即可。
(3) 應(yīng)用舉例
例如:同一個(gè)應(yīng)用系統(tǒng)需要多臺 PC Server 運(yùn)行,但是它們運(yùn)行的應(yīng)用系統(tǒng)的某些配置項(xiàng)是相同的,如果要修改這些相同的配置項(xiàng),那么就必須同時(shí)修改每臺運(yùn)行這個(gè)應(yīng)用系統(tǒng)的 PC Server,這樣非常麻煩而且容易出錯(cuò)。將配置信息保存在 Zookeeper 的某個(gè)目錄節(jié)點(diǎn)中,然后將所有需要修改的應(yīng)用機(jī)器監(jiān)控配置信息的狀態(tài),一旦配置信息發(fā)生變化,每臺應(yīng)用機(jī)器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應(yīng)用到系統(tǒng)中。ZooKeeper配置管理服務(wù)如下圖所示:
圖2.2 配置管理結(jié)構(gòu)圖
Zookeeper很容易實(shí)現(xiàn)這種集中式的配置管理,比如將所需要的配置信息放到/Configuration?節(jié)點(diǎn)上,集群中所有機(jī)器一啟動就會通過Client對/Configuration這個(gè)節(jié)點(diǎn)進(jìn)行監(jiān)控【zk.exist("/Configuration″,true)】,并且實(shí)現(xiàn)Watcher回調(diào)方法process(),那么在zookeeper上/Configuration節(jié)點(diǎn)下數(shù)據(jù)發(fā)生變化的時(shí)候,每個(gè)機(jī)器都會收到通知,Watcher回調(diào)方法將會被執(zhí)行,那么應(yīng)用再取下數(shù)據(jù)即可【zk.getData("/Configuration″,false,null)】。
2.2統(tǒng)一命名服務(wù)(Name Service)
(1) 場景描述
分布式應(yīng)用中,通常需要有一套完整的命名規(guī)則,既能夠產(chǎn)生唯一的名稱又便于人識別和記住,通常情況下用樹形的名稱結(jié)構(gòu)是一個(gè)理想的選擇,樹形的名稱 結(jié)構(gòu)是一個(gè)有層次的目錄結(jié)構(gòu),既對人友好又不會重復(fù)。說到這里你可能想到了 JNDI,沒錯(cuò) Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結(jié)構(gòu)關(guān)聯(lián)到一定資源上,但是Zookeeper的Name Service 更加是廣泛意義上的關(guān)聯(lián),也許你并不需要將名稱關(guān)聯(lián)到特定資源上,你可能只需要一個(gè)不會重復(fù)名稱,就像數(shù)據(jù)庫中產(chǎn)生一個(gè)唯一的數(shù)字主鍵一樣。
(2) 應(yīng)用
在分布式系統(tǒng)中,通過使用命名服務(wù),客戶端應(yīng)用能夠根據(jù)指定的名字來獲取資源服務(wù)的地址,提供者等信息。被命名的實(shí)體通常可以是集群中的機(jī)器,提供的服務(wù)地址,進(jìn)程對象等等,這些我們都可以統(tǒng)稱他們?yōu)槊?#xff08;Name)。其中較為常見的就是一些分布式服務(wù)框架中的服務(wù)地址列表。 通過調(diào)用ZK提供的創(chuàng)建節(jié)點(diǎn)的API,能夠很容易創(chuàng)建一個(gè)全局唯一的path,這個(gè)path就可以作為一個(gè)名稱。Name Service 已經(jīng)是Zookeeper 內(nèi)置的功能,你只要調(diào)用 Zookeeper 的 API 就能實(shí)現(xiàn)。如調(diào)用 create 接口就可以很容易創(chuàng)建一個(gè)目錄節(jié)點(diǎn)。
(3) 應(yīng)用舉例
阿里開源的分布式服務(wù)框架Dubbo中使用ZooKeeper來作為其命名服務(wù),維護(hù)全局的服務(wù)地址列表。在Dubbo實(shí)現(xiàn)中:?服務(wù)提供者在啟動的時(shí)候,向ZK上的指定節(jié)點(diǎn)/dubbo/${serviceName}/providers目錄下寫入自己的URL地址,這個(gè)操作就完成了服務(wù)的發(fā)布。?服務(wù)消費(fèi)者啟 動的時(shí)候,訂閱/dubbo/${serviceName}/providers目錄下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目錄下寫入自己的URL地址。 注意,所有向ZK上注冊的地址都是臨時(shí)節(jié)點(diǎn),這樣就能夠保證服務(wù)提供者和消費(fèi)者能夠自動感應(yīng)資源的變化。 另外,Dubbo還有針對服務(wù)粒度的監(jiān)控,方法是訂閱/dubbo/${serviceName}目錄下所有提供者和消費(fèi)者的信息。
2.3分布通知/協(xié)調(diào)(Distribution of notification/coordination)
(1) 典型場景描述
ZooKeeper中特有watcher注冊與異步通知機(jī)制,能夠很好的實(shí)現(xiàn)分布式環(huán)境下不同系統(tǒng)之間的通知與協(xié)調(diào),實(shí)現(xiàn)對數(shù)據(jù)變更的實(shí)時(shí)處理。使用方法通常是不同系統(tǒng)都對ZK上同一個(gè)znode進(jìn)行注冊,監(jiān)聽znode的變化(包括znode本身內(nèi)容及子節(jié)點(diǎn)的),其中一個(gè)系統(tǒng)update了znode,那么另一個(gè)系統(tǒng)能夠收到通知,并作出相應(yīng)處理。
(2) 應(yīng)用
①?另一種心跳檢測機(jī)制:檢測系統(tǒng)和被檢測系統(tǒng)之間并不直接關(guān)聯(lián)起來,而是通過ZK上某個(gè)節(jié)點(diǎn)關(guān)聯(lián),大大減少系統(tǒng)耦合。
②?另一種系統(tǒng)調(diào)度模式:某系統(tǒng)由控制臺和推送系統(tǒng)兩部分組成,控制臺的職責(zé)是控制推送系統(tǒng)進(jìn)行相應(yīng)的推送工作。管理人員在控制臺作的一些操作,實(shí)際上是修改了ZK上某些節(jié)點(diǎn)的狀態(tài),而ZK就把這些變化通知給他們注冊Watcher的客戶端,即推送系統(tǒng),于是,作出相應(yīng)的推送任務(wù)。
③?另一種工作匯報(bào)模式:一些類似于任務(wù)分發(fā)系統(tǒng),子任務(wù)啟動后,到ZK來注冊一個(gè)臨時(shí)節(jié)點(diǎn),并且定時(shí)將自己的進(jìn)度進(jìn)行匯報(bào)(將進(jìn)度寫回這個(gè)臨時(shí)節(jié)點(diǎn)),這樣任務(wù)管理者就能夠?qū)崟r(shí)知道任務(wù)進(jìn)度。
總之,使用zookeeper來進(jìn)行分布式通知和協(xié)調(diào)能夠大大降低系統(tǒng)之間的耦合。
2.4分布式鎖(Distribute Lock)
(1) 場景描述
分布式鎖,這個(gè)主要得益于ZooKeeper為我們保證了數(shù)據(jù)的強(qiáng)一致性,即用戶只要完全相信每時(shí)每刻,zk集群中任意節(jié)點(diǎn)(一個(gè)zk server)上的相同znode的數(shù)據(jù)是一定是相同的。鎖服務(wù)可以分為兩類,一個(gè)是保持獨(dú)占,另一個(gè)是控制時(shí)序。
保持獨(dú)占,就是所有試圖來獲取這個(gè)鎖的客戶端,最終只有一個(gè)可以成功獲得這把 鎖。通常的做法是把ZK上的一個(gè)znode看作是一把鎖,通過create znode的方式來實(shí)現(xiàn)。所有客戶端都去創(chuàng)建 /distribute_lock 節(jié)點(diǎn),最終成功創(chuàng)建的那個(gè)客戶端也即擁有了這把鎖。
控制時(shí)序,就是所有試圖來獲取這個(gè)鎖的客戶端,最終都是會被安排執(zhí)行,只是有 個(gè)全局時(shí)序了。做法和上面基本類似,只是這里 /distribute_lock 已經(jīng)預(yù)先存在,客戶端在它下面創(chuàng)建臨時(shí)有序節(jié)點(diǎn)。Zk的父節(jié)點(diǎn)(/distribute_lock)維持一份sequence,保證子節(jié)點(diǎn)創(chuàng)建的時(shí)序性, 從而也形成了每個(gè)客戶端的全局時(shí)序。
(2) 應(yīng)用
共享鎖在同一個(gè)進(jìn)程中很容易實(shí)現(xiàn),但是在跨進(jìn)程或者在不同 Server 之間就不好實(shí)現(xiàn)了。Zookeeper 卻很容易實(shí)現(xiàn)這個(gè)功能,實(shí)現(xiàn)方式也是需要獲得鎖的 Server 創(chuàng)建一個(gè)?EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn),然后調(diào)用?getChildren方法獲取當(dāng)前的目錄節(jié)點(diǎn)列表中最小的目錄節(jié)點(diǎn)是不是就是自己創(chuàng)建的目錄節(jié)點(diǎn),如果正是自己創(chuàng)建的,那么它就獲得了這個(gè)鎖,如果不是那么它就調(diào)用?exists(String path, boolean watch) 方法并監(jiān)控 Zookeeper 上目錄節(jié)點(diǎn)列表的變化,一直到自己創(chuàng)建的節(jié)點(diǎn)是列表中最小編號的目錄節(jié)點(diǎn),從而獲得鎖,釋放鎖很簡單,只要?jiǎng)h除前面它自己所創(chuàng)建的目錄節(jié)點(diǎn)就行了。
圖 2.3 ZooKeeper實(shí)現(xiàn)Locks的流程圖
代碼清單1 TestMainClient 代碼
?
package org.zk.leader.election;import org.apache.log4j.xml.DOMConfigurator;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;/*** TestMainClient* <p/>* Author By: sunddenly工作室* Created Date: 2014-11-13*/public class TestMainClient implements Watcher {protected static ZooKeeper zk = null;protected static Integer mutex;int sessionTimeout = 10000;protected String root;public TestMainClient(String connectString) {if(zk == null){try {String configFile = this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml";DOMConfigurator.configure(configFile);System.out.println("創(chuàng)建一個(gè)新的連接:");zk = new ZooKeeper(connectString, sessionTimeout, this);mutex = new Integer(-1);} catch (IOException e) {zk = null;}}}synchronized public void process(WatchedEvent event) {synchronized (mutex) {mutex.notify();}}}?
清單 2 Locks 代碼
?
?
package org.zk.locks;import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.data.Stat;import org.zk.leader.election.TestMainClient;import java.util.Arrays;import java.util.List;/*** locks* <p/>* Author By: sunddenly工作室* Created Date: 2014-11-13 16:49:40*/public class Locks extends TestMainClient {public static final Logger logger = Logger.getLogger(Locks.class);String myZnode;public Locks(String connectString, String root) {super(connectString);this.root = root;if (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (KeeperException e) {logger.error(e);} catch (InterruptedException e) {logger.error(e);}}}void getLock() throws KeeperException, InterruptedException{List<String> list = zk.getChildren(root, false);String[] nodes = list.toArray(new String[list.size()]);Arrays.sort(nodes);if(myZnode.equals(root+"/"+nodes[0])){doAction();}else{waitForLock(nodes[0]);}}void check() throws InterruptedException, KeeperException {myZnode = zk.create(root + "/lock_" , new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);getLock();}void waitForLock(String lower) throws InterruptedException, KeeperException {Stat stat = zk.exists(root + "/" + lower,true);if(stat != null){mutex.wait();}else{getLock();}}@Overridepublic void process(WatchedEvent event) {if(event.getType() == Event.EventType.NodeDeleted){System.out.println("得到通知");super.process(event);doAction();}}/*** 執(zhí)行其他任務(wù)*/private void doAction(){System.out.println("同步隊(duì)列已經(jīng)得到同步,可以開始執(zhí)行后面的任務(wù)了");}public static void main(String[] args) {String connectString = "localhost:2181";Locks lk = new Locks(connectString, "/locks");try {lk.check();} catch (InterruptedException e) {logger.error(e);} catch (KeeperException e) {logger.error(e);}}}?
2.5 集群管理(Cluster Management)
(1) 典型場景描述
集群機(jī)器監(jiān)控:
這通常用于那種對集群中機(jī)器狀態(tài),機(jī)器在線率有較高要求的場景,能夠快速對集群中機(jī)器變化作出響應(yīng)。這樣的場景中,往往有一個(gè)監(jiān)控系統(tǒng),實(shí)時(shí)檢測集 群機(jī)器是否存活。過去的做法通常是:監(jiān)控系統(tǒng)通過某種手段(比如ping)定時(shí)檢測每個(gè)機(jī)器,或者每個(gè)機(jī)器自己定時(shí)向監(jiān)控系統(tǒng)匯報(bào)"我還活著"。 這種做法可行,但是存在兩個(gè)比較明顯的問題:
①?集群中機(jī)器有變動的時(shí)候,牽連修改的東西比較多。
②?有一定的延時(shí)。
利用ZooKeeper中兩個(gè)特性,就可以實(shí)施另一種集群機(jī)器存活性監(jiān)控系統(tǒng):
①?客戶端在節(jié)點(diǎn) x 上注冊一個(gè)Watcher,那么如果 x 的子節(jié)點(diǎn)變化了,會通知該客戶端。
②?創(chuàng)建EPHEMERAL類型的節(jié)點(diǎn),一旦客戶端和服務(wù)器的會話結(jié)束或過期,那么該節(jié)點(diǎn)就會消失。
Master選舉:
Master選舉則是zookeeper中最為經(jīng)典的使用場景了,在分布式環(huán)境中,相同的業(yè)務(wù)應(yīng)用分布在不同的機(jī)器上,有些業(yè)務(wù)邏輯,例如一些耗時(shí)的計(jì)算,網(wǎng)絡(luò)I/O處,往往只需要讓整個(gè)集群中的某一臺機(jī)器進(jìn)行執(zhí)行,其余機(jī)器可以共享這個(gè)結(jié)果,這樣可以大大減少重復(fù)勞動,提高性能,于是這個(gè)master選舉便是這種場景下的碰到的主要問題。
利用ZooKeeper中兩個(gè)特性,就可以實(shí)施另一種集群中Master選舉:
①?利用ZooKeeper的強(qiáng)一致性,能夠保證在分布式高并發(fā)情況下節(jié)點(diǎn)創(chuàng)建的全局唯一性,即:同時(shí)有多個(gè)客戶端請求創(chuàng)建 /Master 節(jié)點(diǎn),最終一定只有一個(gè)客戶端請求能夠創(chuàng)建成功。利用這個(gè)特性,就能很輕易的在分布式環(huán)境中進(jìn)行集群選舉了。
②另外,這種場景演化一下,就是動態(tài)Master選舉。這就要用到?EPHEMERAL_SEQUENTIAL類型節(jié)點(diǎn)的特性了,這樣每個(gè)節(jié)點(diǎn)會自動被編號。允許所有請求都能夠創(chuàng)建成功,但是得有個(gè)創(chuàng)建順序,每次選取序列號最小的那個(gè)機(jī)器作為Master 。
(2) 應(yīng)用
在搜索系統(tǒng)中,如果集群中每個(gè)機(jī)器都生成一份全量索引,不僅耗時(shí),而且不能保證彼此間索引數(shù)據(jù)一致。因此讓集群中的Master來迚行全量索引的生 成,然后同步到集群中其它機(jī)器。另外,Master選丼的容災(zāi)措施是,可以隨時(shí)迚行手動挃定master,就是說應(yīng)用在zk在無法獲取master信息 時(shí),可以通過比如http方式,向一個(gè)地方獲取master。 ? 在Hbase中,也是使用ZooKeeper來實(shí)現(xiàn)動態(tài)HMaster的選舉。在Hbase實(shí)現(xiàn)中,會在ZK上存儲一些ROOT表的地址和HMaster 的地址,HRegionServer也會把自己以臨時(shí)節(jié)點(diǎn)(Ephemeral)的方式注冊到Zookeeper中,使得HMaster可以隨時(shí)感知到各 個(gè)HRegionServer的存活狀態(tài),同時(shí),一旦HMaster出現(xiàn)問題,會重新選丼出一個(gè)HMaster來運(yùn)行,從而避免了HMaster的單點(diǎn)問 題的存活狀態(tài),同時(shí),一旦HMaster出現(xiàn)問題,會重新選丼出一個(gè)HMaster來運(yùn)行,從而避免了HMaster的單點(diǎn)問題。
(3) 應(yīng)用舉例
集群監(jiān)控:
應(yīng)用集群中,我們常常需要讓每一個(gè)機(jī)器知道集群中或依賴的其他某一個(gè)集群中哪些機(jī)器是活著的,并且在集群機(jī)器因?yàn)殄礄C(jī),網(wǎng)絡(luò)斷鏈等原因能夠不在人工 介入的情況下迅速通知到每一個(gè)機(jī)器,Zookeeper 能夠很容易的實(shí)現(xiàn)集群管理的功能,如有多臺 Server 組成一個(gè)服務(wù)集群,那么必須要一個(gè)"總管"知道當(dāng)前集群中每臺機(jī)器的服務(wù)狀態(tài),一旦有機(jī)器不能提供服務(wù),集群中其它集群必須知道,從而做出調(diào)整重新分配服 務(wù)策略。同樣當(dāng)增加集群的服務(wù)能力時(shí),就會增加一臺或多臺 Server,同樣也必須讓"總管"知道,這就是ZooKeeper的集群監(jiān)控功能。
圖2.4 集群管理結(jié)構(gòu)圖
比如我在zookeeper服務(wù)器端有一個(gè)znode叫/Configuration,那么集群中每一個(gè)機(jī)器啟動的時(shí)候都去這個(gè)節(jié)點(diǎn)下創(chuàng)建一個(gè)EPHEMERAL類型的節(jié)點(diǎn),比如server1創(chuàng)建/Configuration?/Server1,server2創(chuàng)建/Configuration?/Server1,然后Server1和Server2都watch /Configuration 這個(gè)父節(jié)點(diǎn),那么也就是這個(gè)父節(jié)點(diǎn)下數(shù)據(jù)或者子節(jié)點(diǎn)變化都會通知對該節(jié)點(diǎn)進(jìn)行watch的客戶端。因?yàn)镋PHEMERAL類型節(jié)點(diǎn)有一個(gè)很重要的特性,就 是客戶端和服務(wù)器端連接斷掉或者session過期就會使節(jié)點(diǎn)消失,那么在某一個(gè)機(jī)器掛掉或者斷鏈的時(shí)候,其對應(yīng)的節(jié)點(diǎn)就會消 失,然后集群中所有對/Configuration進(jìn)行watch的客戶端都會收到通知,然后取得最新列表即可。
Master選舉:
Zookeeper 不僅能夠維護(hù)當(dāng)前的集群中機(jī)器的服務(wù)狀態(tài),而且能夠選出一個(gè)"總管",讓這個(gè)總管來管理集群,這就是 Zookeeper 的另一個(gè)功能 Leader Election。Zookeeper 如何實(shí)現(xiàn) Leader Election,也就是選出一個(gè) Master Server。和前面的一樣每臺 Server 創(chuàng)建一個(gè) EPHEMERAL 目錄節(jié)點(diǎn),不同的是它還是一個(gè) SEQUENTIAL 目錄節(jié)點(diǎn),所以它是個(gè) EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn)。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節(jié)點(diǎn),是因?yàn)槲覀兛梢越o每臺 Server 編號,我們可以選擇當(dāng)前是最小編號的 Server 為 Master,假如這個(gè)最小編號的 Server 死去,由于是 EPHEMERAL 節(jié)點(diǎn),死去的 Server 對應(yīng)的節(jié)點(diǎn)也被刪除,所以當(dāng)前的節(jié)點(diǎn)列表中又出現(xiàn)一個(gè)最小編號的節(jié)點(diǎn),我們就選擇這個(gè)節(jié)點(diǎn)為當(dāng)前 Master。這樣就實(shí)現(xiàn)了動態(tài)選擇 Master,避免了傳統(tǒng)意義上單 Master 容易出現(xiàn)單點(diǎn)故障的問題。
清單 3 Leader Election代碼
package org.zk.leader.election;import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.data.Stat;import java.net.InetAddress;import java.net.UnknownHostException;/*** LeaderElection* <p/>* Author By: sunddenly工作室* Created Date: 2014-11-13*/public class LeaderElection extends TestMainClient {public static final Logger logger = Logger.getLogger(LeaderElection.class);public LeaderElection(String connectString, String root) {super(connectString);this.root = root;if (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (KeeperException e) {logger.error(e);} catch (InterruptedException e) {logger.error(e);}}}void findLeader() throws InterruptedException, UnknownHostException, KeeperException {byte[] leader = null;try {leader = zk.getData(root + "/leader", true, null);} catch (KeeperException e) {if (e instanceof KeeperException.NoNodeException) {logger.error(e);} else {throw e;}}if (leader != null) {following();} else {String newLeader = null;byte[] localhost = InetAddress.getLocalHost().getAddress();try {newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (KeeperException e) {if (e instanceof KeeperException.NodeExistsException) {logger.error(e);} else {throw e;}}if (newLeader != null) {leading();} else {mutex.wait();}}}@Overridepublic void process(WatchedEvent event) {if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) {System.out.println("得到通知");super.process(event);following();}}void leading() {System.out.println("成為領(lǐng)導(dǎo)者");}void following() {System.out.println("成為組成員");}public static void main(String[] args) {String connectString = "localhost:2181";LeaderElection le = new LeaderElection(connectString, "/GroupMembers");try {le.findLeader();} catch (Exception e) {logger.error(e);}}}2.6 隊(duì)列管理
Zookeeper 可以處理兩種類型的隊(duì)列:
①?當(dāng)一個(gè)隊(duì)列的成員都聚齊時(shí),這個(gè)隊(duì)列才可用,否則一直等待所有成員到達(dá),這種是同步隊(duì)列。
②?隊(duì)列按照 FIFO 方式進(jìn)行入隊(duì)和出隊(duì)操作,例如實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型。
(1)?同步隊(duì)列用 Zookeeper 實(shí)現(xiàn)的實(shí)現(xiàn)思路如下:
創(chuàng)建一個(gè)父目錄 /synchronizing,每個(gè)成員都監(jiān)控標(biāo)志(Set Watch)位目錄 /synchronizing/start 是否存在,然后每個(gè)成員都加入這個(gè)隊(duì)列,加入隊(duì)列的方式就是創(chuàng)建 /synchronizing/member_i 的臨時(shí)目錄節(jié)點(diǎn),然后每個(gè)成員獲取 / synchronizing 目錄的所有目錄節(jié)點(diǎn),也就是 member_i。判斷 i 的值是否已經(jīng)是成員的個(gè)數(shù),如果小于成員個(gè)數(shù)等待 /synchronizing/start 的出現(xiàn),如果已經(jīng)相等就創(chuàng)建 /synchronizing/start。
用下面的流程圖更容易理解:
圖 2.5 同步隊(duì)列流程圖
?
清單 4 Synchronizing 代碼
package org.zk.queue;import java.net.InetAddress;import java.net.UnknownHostException;import java.util.List;import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat;import org.zk.leader.election.TestMainClient;/*** Synchronizing* <p/>* Author By: sunddenly工作室* Created Date: 2014-11-13*/public class Synchronizing extends TestMainClient {int size;String name;public static final Logger logger = Logger.getLogger(Synchronizing.class);/*** 構(gòu)造函數(shù)** @param connectString 服務(wù)器連接* @param root 根目錄* @param size 隊(duì)列大小*/Synchronizing(String connectString, String root, int size) {super(connectString);this.root = root;this.size = size;if (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}} catch (KeeperException e) {logger.error(e);} catch (InterruptedException e) {logger.error(e);}}try {name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());} catch (UnknownHostException e) {logger.error(e);}}/*** 加入隊(duì)列** @return* @throws KeeperException* @throws InterruptedException*/void addQueue() throws KeeperException, InterruptedException{zk.exists(root + "/start",true);zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);synchronized (mutex) {List<String> list = zk.getChildren(root, false);if (list.size() < size) {mutex.wait();} else {zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}}@Overridepublic void process(WatchedEvent event) {if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){System.out.println("得到通知");super.process(event);doAction();}}/*** 執(zhí)行其他任務(wù)*/private void doAction(){System.out.println("同步隊(duì)列已經(jīng)得到同步,可以開始執(zhí)行后面的任務(wù)了");}public static void main(String args[]) {//啟動ServerString connectString = "localhost:2181";int size = 1;Synchronizing b = new Synchronizing(connectString, "/synchronizing", size);try{b.addQueue();} catch (KeeperException e){logger.error(e);} catch (InterruptedException e){logger.error(e);}}}(2)?FIFO 隊(duì)列用 Zookeeper 實(shí)現(xiàn)思路如下:
實(shí)現(xiàn)的思路也非常簡單,就是在特定的目錄下創(chuàng)建 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證所有成員加入隊(duì)列時(shí)都是有編號的,出隊(duì)列時(shí)通過 getChildren( ) 方法可以返回當(dāng)前所有的隊(duì)列中的元素,然后消費(fèi)其中最小的一個(gè),這樣就能保證 FIFO。
下面是生產(chǎn)者和消費(fèi)者這種隊(duì)列形式的示例代碼
清單 5 FIFOQueue 代碼
import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.data.Stat;import java.nio.ByteBuffer;import java.util.List;/*** FIFOQueue* <p/>* Author By: sunddenly工作室* Created Date: 2014-11-13*/public class FIFOQueue extends TestMainClient{public static final Logger logger = Logger.getLogger(FIFOQueue.class);/*** Constructor** @param connectString* @param root*/FIFOQueue(String connectString, String root) {super(connectString);this.root = root;if (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}} catch (KeeperException e) {logger.error(e);} catch (InterruptedException e) {logger.error(e);}}}/*** 生產(chǎn)者** @param i* @return*/boolean produce(int i) throws KeeperException, InterruptedException{ByteBuffer b = ByteBuffer.allocate(4);byte[] value;b.putInt(i);value = b.array();zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);return true;}/*** 消費(fèi)者** @return* @throws KeeperException* @throws InterruptedException*/int consume() throws KeeperException, InterruptedException{int retvalue = -1;Stat stat = null;while (true) {synchronized (mutex) {List<String> list = zk.getChildren(root, true);if (list.size() == 0) {mutex.wait();} else {Integer min = new Integer(list.get(0).substring(7));for(String s : list){Integer tempValue = new Integer(s.substring(7));if(tempValue < min) min = tempValue;}byte[] b = zk.getData(root + "/element" + min,false, stat);zk.delete(root + "/element" + min, 0);ByteBuffer buffer = ByteBuffer.wrap(b);retvalue = buffer.getInt();return retvalue;}}}}@Overridepublic void process(WatchedEvent event) {super.process(event);}public static void main(String args[]) {//啟動ServerTestMainServer.start();String connectString = "localhost:"+TestMainServer.CLIENT_PORT;FIFOQueue q = new FIFOQueue(connectString, "/app1");int i;Integer max = new Integer(5);System.out.println("Producer");for (i = 0; i < max; i++)try{q.produce(10 + i);} catch (KeeperException e){logger.error(e);} catch (InterruptedException e){logger.error(e);}for (i = 0; i < max; i++) {try{int r = q.consume();System.out.println("Item: " + r);} catch (KeeperException e){i--;logger.error(e);} catch (InterruptedException e){logger.error(e);}}}}三、ZooKeeper實(shí)際應(yīng)用
假設(shè)我們的集群有:
(1)?20個(gè)搜索引擎的服務(wù)器:每個(gè)負(fù)責(zé)總索引中的一部分的搜索任務(wù)。
①?搜索引擎的服務(wù)器中的15個(gè)服務(wù)器現(xiàn)在提供搜索服務(wù)。
②?5個(gè)服務(wù)器正在生成索引。
這20個(gè)搜索引擎的服務(wù)器,經(jīng)常要讓正在提供搜索服務(wù)的服務(wù)器停止提供服務(wù)開始生成索引,或生成索引的服務(wù)器已經(jīng)把索引生成完成可以搜索提供服務(wù)了。
(2)?一個(gè)總服務(wù)器:負(fù)責(zé)向這20個(gè)搜索引擎的服務(wù)器發(fā)出搜索請求并合并結(jié)果集。
(3)?一個(gè)備用的總服務(wù)器:負(fù)責(zé)當(dāng)總服務(wù)器宕機(jī)時(shí)替換總服務(wù)器。
(4)?一個(gè)web的cgi:向總服務(wù)器發(fā)出搜索請求。
使用Zookeeper可以保證:
(1)?總服務(wù)器:自動感知有多少提供搜索引擎的服務(wù)器,并向這些服務(wù)器發(fā)出搜索請求。
(2)?備用的總服務(wù)器:宕機(jī)時(shí)自動啟用備用的總服務(wù)器。
(3)?web的cgi:能夠自動地獲知總服務(wù)器的網(wǎng)絡(luò)地址變化。
(4) 實(shí)現(xiàn)如下:
①?提供搜索引擎的服務(wù)器都在Zookeeper中創(chuàng)建znode,zk.create("/search/nodes/node1",?"hostname".getBytes(),?Ids.OPEN_ACL_UNSAFE,?CreateFlags.EPHEMERAL);
② 總服務(wù)器可以從Zookeeper中獲取一個(gè)znode的子節(jié)點(diǎn)的列表,zk.getChildren("/search/nodes", true);
③?總服務(wù)器遍歷這些子節(jié)點(diǎn),并獲取子節(jié)點(diǎn)的數(shù)據(jù)生成提供搜索引擎的服務(wù)器列表;
④?當(dāng)總服務(wù)器接收到子節(jié)點(diǎn)改變的事件信息,重新返回第二步;
⑤?總服務(wù)器在Zookeeper中創(chuàng)建節(jié)點(diǎn),zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);
⑥ 備用的總服務(wù)器監(jiān)控Zookeeper中的"/search/master"節(jié)點(diǎn)。當(dāng)這個(gè)znode的節(jié)點(diǎn)數(shù)據(jù)改變時(shí),把自己啟動變成總服務(wù)器,并把自己的網(wǎng)絡(luò)地址數(shù)據(jù)放進(jìn)這個(gè)節(jié)點(diǎn)。
⑦?web的cgi從Zookeeper中"/search/master"節(jié)點(diǎn)獲取總服務(wù)器的網(wǎng)絡(luò)地址數(shù)據(jù),并向其發(fā)送搜索請求。
⑧?web的cgi監(jiān)控Zookeeper中的"/search/master"節(jié)點(diǎn),當(dāng)這個(gè)znode的節(jié)點(diǎn)數(shù)據(jù)改變時(shí),從這個(gè)節(jié)點(diǎn)獲取總服務(wù)器的網(wǎng)絡(luò)地址數(shù)據(jù),并改變當(dāng)前的總服務(wù)器的網(wǎng)絡(luò)地址。
轉(zhuǎn)載于:https://www.cnblogs.com/skying555/p/7873345.html
總結(jié)
以上是生活随笔為你收集整理的ZooKeeper管理分布式环境中的数据的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 梦到洗葡萄预示着什么意思
- 下一篇: sqlite insert数据要用“?”