ZooKeeper Java Example(官方例子)
為了向您介紹ZooKeeper Java API,我們在這里開發了一個非常簡單的觀看客戶端。該ZooKeeper客戶端通過啟動或停止程序來觀察ZooKeeper節點的更改并進行響應。
要求
?有四個要求:
? ? 1.它作為參數:
????????ZooKeeper服務的地址
????????那么znode的名字就是被觀看的
????????具有參數的可執行文件
? ? 2.它獲取與znode相關聯的數據,并啟動可執行文件。
? ? 3.如果znode更改,客戶端將重新啟動內容并重新啟動可執行文件。
? ? 4.如果znode消失,客戶端將殺死可執行文件。
程序設計
通常,ZooKeeper應用程序分為兩個單元,一個維護連接,另一個用于監視數據。在此應用程序中,名為Executor的類維護ZooKeeper連接,并且名為DataMonitor的類監視ZooKeeper樹中的數據。
此外,Executor包含主線程并包含執行邏輯。它負責什么樣的用戶交互,以及與您作為參數傳遞的可執行程序的交互以及根據znode的狀態關閉和重新啟動示例。
1.?Executor.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | package?com.hellojd.cloud; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.Watcher; import?org.apache.zookeeper.ZooKeeper; import?java.io.FileOutputStream; import?java.io.IOException; import?java.io.InputStream; import?java.io.OutputStream; /** ?*?A?simple?example?program?to?use?DataMonitor?to?start?and ?*?stop?executables?based?on?a?znode.?The?program?watches?the ?*?specified?znode?and?saves?the?data?that?corresponds?to?the ?*?znode?in?the?filesystem.?It?also?starts?the?specified?program ?*?with?the?specified?arguments?when?the?znode?exists?and?kills ?*?the?program?if?the?znode?goes?away. ?*/ public?class?Executor ????????implements?Watcher,?Runnable,?DataMonitor.DataMonitorListener { ????DataMonitor?dm; ????ZooKeeper?zk; ????String?filename; ????String?exec[]; ????Process?child; ????public?Executor(String?hostPort,?String?znode,?String?filename, ????????????????????String?exec[])?throws?KeeperException,?IOException?{ ????????this.filename?=?filename; ????????this.exec?=?exec; ????????zk?=?new?ZooKeeper(hostPort,?3000,?this); ????????dm?=?new?DataMonitor(zk,?znode,?null,?this); ????} ????/** ?????*?@param?args ?????*/ ????public?static?void?main(String[]?args)?{ ????????if?(args.length?<?4)?{ ????????????System.err ????????????????????.println("USAGE:?Executor?hostPort?znode?filename?program?[args?...]"); ????????????System.exit(2); ????????} ????????String?hostPort?=?args[0]; ????????String?znode?=?args[1]; ????????String?filename?=?args[2]; ????????String?exec[]?=?new?String[args.length?-?3]; ????????System.arraycopy(args,?3,?exec,?0,?exec.length); ????????try?{ ????????????new?Executor(hostPort,?znode,?filename,?exec).run(); ????????}?catch?(Exception?e)?{ ????????????e.printStackTrace(); ????????} ????} ????/*************************************************************************** ?????*?We?do?process?any?events?ourselves,?we?just?need?to?forward?them?on. ?????* ?????*?@see?org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) ?????*/ ????public?void?process(WatchedEvent?event)?{ ????????System.out.println("Watcher?process"); ????????dm.process(event); ????} ????public?void?run()?{ ????????try?{ ????????????synchronized?(this)?{ ????????????????while?(!dm.dead)?{ ????????????????????wait(); ????????????????} ????????????} ????????}?catch?(InterruptedException?e)?{ ????????} ????} ????//以響應ZooKeeper連接永久消失。 ????public?void?closing(int?rc)?{ ????????synchronized?(this)?{ ????????????notifyAll(); ????????} ????} ????static?class?StreamWriter?extends?Thread?{ ????????OutputStream?os; ????????InputStream?is; ????????StreamWriter(InputStream?is,?OutputStream?os)?{ ????????????this.is?=?is; ????????????this.os?=?os; ????????????start(); ????????} ????????public?void?run()?{ ????????????byte?b[]?=?new?byte[80]; ????????????int?rc; ????????????try?{ ????????????????while?((rc?=?is.read(b))?>?0)?{ ????????????????????os.write(b,?0,?rc); ????????????????} ????????????}?catch?(IOException?e)?{ ????????????} ????????} ????} ????public?void?exists(byte[]?data)?{ ????????if?(data?==?null)?{ ????????????if?(child?!=?null)?{ ????????????????System.out.println("Killing?process"); ????????????????child.destroy(); ????????????????try?{ ????????????????????child.waitFor(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????} ????????????} ????????????child?=?null; ????????}?else?{ ????????????if?(child?!=?null)?{ ????????????????System.out.println("Stopping?child"); ????????????????child.destroy(); ????????????????try?{ ????????????????????child.waitFor(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????????//保存znode數據至文件 ????????????try?{ ????????????????FileOutputStream?fos?=?new?FileOutputStream(filename); ????????????????fos.write(data); ????????????????fos.close(); ????????????}?catch?(IOException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????????try?{ ????????????????System.out.println("Starting?child"); ????????????????child?=?Runtime.getRuntime().exec(exec); ????????????????new?StreamWriter(child.getInputStream(),?System.out); ????????????????new?StreamWriter(child.getErrorStream(),?System.err); ????????????}?catch?(IOException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????} ????} } |
2.?DataMonitor.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 | /** ?*?A?simple?class?that?monitors?the?data?and?existence?of?a?ZooKeeper ?*?node.?It?uses?asynchronous?ZooKeeper?APIs. ?*/ package?com.hellojd.cloud; import?java.util.Arrays; import?org.apache.zookeeper.KeeperException; import?org.apache.zookeeper.WatchedEvent; import?org.apache.zookeeper.Watcher; import?org.apache.zookeeper.ZooKeeper; import?org.apache.zookeeper.AsyncCallback.StatCallback; import?org.apache.zookeeper.KeeperException.Code; import?org.apache.zookeeper.data.Stat; /** ?*?另一方面,DataMonitorListener接口不是ZooKeeper?API的一部分。?它是一個完全定制的界面,專為此示例應用程序而設計。 ?*?DataMonitor對象使用它來回傳給它的容器,它也是Executor對象。 ?*/ public?class?DataMonitor?implements??StatCallback?{ ????//Executor或一些類似Executor的對象“擁有”ZooKeeper連接,但可以將事件委托給其他事件到其他對象。 ????ZooKeeper?zk; ????String?znode; ????Watcher?chainedWatcher; ????boolean?dead; ????//簡單地將這些事件轉發到DataMonitor來決定如何處理它們 ????DataMonitorListener?listener; ????byte?prevData[]; ????//?它主要是異步和事件驅動 ????public?DataMonitor(ZooKeeper?zk,?String?znode,?Watcher?chainedWatcher, ???????????????????????DataMonitorListener?listener)?{ ????????this.zk?=?zk; ????????this.znode?=?znode; ????????this.chainedWatcher?=?chainedWatcher; ????????this.listener?=?listener; ????????//?Get?things?started?by?checking?if?the?node?exists.?We?are?going ????????//?to?be?completely?event?driven ????????zk.exists(znode,?true,?this,?null); ????} ????/** ?????*?該接口在DataMonitor類中定義,并在Executor類中實現。?當調用Executor.exists()時,執行器根據要求決定是啟動還是關閉。 ?????*?當znode不再存在時,需要說的是殺死可執行文件。 ?????*/ ????public?interface?DataMonitorListener?{ ????????/** ?????????*?The?existence?status?of?the?node?has?changed. ?????????*/ ????????void?exists(byte?data[]); ????????/** ?????????*?The?ZooKeeper?session?is?no?longer?valid. ?????????* ?????????*?@param?rc ?????????*????????????????the?ZooKeeper?reason?code ?????????*/ ????????void?closing(int?rc); ????} ????//響應ZooKeeper狀態的更改 ????public?void?process(WatchedEvent?event)?{ ????????String?path?=?event.getPath(); ????????if?(event.getType()?==?Watcher.Event.EventType.None)?{ ????????????//?We?are?are?being?told?that?the?state?of?the ????????????//?connection?has?changed ????????????switch?(event.getState())?{ ????????????????case?SyncConnected: ????????????????????//?In?this?particular?example?we?don't?need?to?do?anything ????????????????????//?here?-?watches?are?automatically?re-registered?with ????????????????????//?server?and?any?watches?triggered?while?the?client?was ????????????????????//?disconnected?will?be?delivered?(in?order?of?course) ????????????????????break; ????????????????case?Expired: ????????????????????//?It's?all?over ????????????????????dead?=?true; ????????????????????listener.closing(KeeperException.Code.SessionExpired); ????????????????????break; ????????????} ????????}?else?{ ????????????if?(path?!=?null?&&?path.equals(znode))?{ ????????????????//?Something?has?changed?on?the?node,?let's?find?out ????????????????zk.exists(znode,?true,?this,?null); ????????????} ????????} ????????if?(chainedWatcher?!=?null)?{ ????????????chainedWatcher.process(event); ????????} ????} ????public?void?processResult(int?rc,?String?path,?Object?ctx,?Stat?stat)?{ /** ?*?首先檢查znode存在,致命錯誤和可恢復錯誤的錯誤代碼。 ?*?如果文件(或znode)存在,它將從znode獲取數據,然后調用Executor的exists()回調, ?*?如果狀態已更改。?注意,它不必對getData調用執行異常處理,因為它具有掛起的任何可能導致錯誤的監視器: ?*?如果節點在調用ZooKeeper.getData()之前被刪除,則由ZooKeeper設置的監視事件?.exists()觸發回調; ?*如果發生通信錯誤,連接回顯將觸發連接監視事件。 ?*/ ????????boolean?exists; ????????switch?(rc)?{ ????????????case?Code.Ok: ????????????????exists?=?true; ????????????????break; ????????????case?Code.NoNode: ????????????????exists?=?false; ????????????????break; ????????????case?Code.SessionExpired: ????????????case?Code.NoAuth: ????????????????dead?=?true; ????????????????listener.closing(rc); ????????????????return; ????????????default: ????????????????//?Retry?errors ????????????????zk.exists(znode,?true,?this,?null); ????????????????return; ????????} ????????//文件(或znode)存在 ????????byte?b[]?=?null; ????????if?(exists)?{ ????????????try?{ ????????????????b?=?zk.getData(znode,?false,?null); ????????????}?catch?(KeeperException?e)?{ ????????????????//?We?don't?need?to?worry?about?recovering?now.?The?watch ????????????????//?callbacks?will?kick?off?any?exception?handling ????????????????e.printStackTrace(); ????????????}?catch?(InterruptedException?e)?{ ????????????????return; ????????????} ????????} ????????if?((b?==?null?&&?b?!=?prevData) ????????????????||?(b?!=?null?&&?!Arrays.equals(prevData,?b)))?{ ????????????listener.exists(b); ????????????prevData?=?b; ????????} ????} } |
調試:
參數列表:192.168.0.10:2181 /hellojd_node filename calc
192.168.0.10:2181:ZK地址
/hellojd_node :監視node?
filename :備份數據文件
calc:命令
本文轉自 randy_shandong 51CTO博客,原文鏈接:http://blog.51cto.com/dba10g/1975090,如需轉載請自行聯系原作者
總結
以上是生活随笔為你收集整理的ZooKeeper Java Example(官方例子)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hdfs web_ui深入讲解、服务启动
- 下一篇: Java iText PDF:用 iTe