ZkClient簡單的操做demo,經過修改節點或節點數據,監控其數據的變化。java
一、對數據節點的修改操做session
public class ZKClientChanger { private static ZkClient zkClient; /*** * 測試修改path數據 */ private static class ChangerThread extends Thread { @Override public void run() { if(!zkClient.exists("/root")) { //一、建立root節點 二、初始化節點數據 三、節點爲永久性建立 zkClient.create("/root", "root".getBytes(), CreateMode.PERSISTENT); } for(int i = 0; i < 50; i++) { zkClient.writeData("/root", "root" + String.valueOf(i)); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { //對zookeeper封裝的client,zookeeper發佈訂閱是一次的性的,監控一次後會斷開鏈接; //client超時過時時貨自動建立zookeeper鏈接 zkClient = new ZkClient("192.168.52.128:2181", 1000); new ChangerThread().start(); } }
二、監控數據節點的變化狀況ide
public class ZKclientWatcher { private static ZkClient zkClient; private static int count = 0; /** * 測試監控path下的數據變化狀況 */ private static class WatcherThread extends Thread { @Override public void run() { //監控path下的數據的變化 zkClient.subscribeDataChanges("/root", new IZkDataListener() { @Override public void handleDataChange(String path, Object data) throws Exception { System.out.println("handleDataChange===========>path:" + path + " data: " + data); count++; } @Override public void handleDataDeleted(String path) throws Exception { System.out.println("handleDataDeleted=========>path:" + path); } }); //監控path下的節點的變化 zkClient.subscribeChildChanges("/root", new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { // TODO 節點變化處理邏輯 } }); //監控鏈接狀態的變化 zkClient.subscribeStateChanges(new IZkStateListener() { @Override public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { // TODO 監控zookeeper鏈接狀態變化邏輯 } @Override public void handleNewSession() throws Exception { // TODO 監控zookeeper Session過時並建立新Session時的邏輯 } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { // TODO 監控鏈接失敗,session沒法從新建立時的邏輯 } }); //取消全部的訂閱[unsubscribeChildChanges/unsubscribeDataChanges/unsubscribeStateChanges] // zkClient.unsubscribeAll(); } } public static void main(String[] args) throws InterruptedException { zkClient = new ZkClient("192.168.52.128:2181", 2000); new WatcherThread().start(); while (count < 10) { System.out.println("count : " + count); Thread.sleep(1000); } } }
主要依賴zkclient的包:
測試
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.7</version> </dependency>