ZkClient簡單操做

    ZkClient簡單的操做demo,經過修改節點或節點數據,監控其數據的變化。java

一、對數據節點的修改操做session

public class ZKClientChanger {

    private static ZkClient zkClient;

    /***
     * 測試修改path數據
     */
    private static class ChangerThread extends Thread {

        @Override
        public void run() {
            if(!zkClient.exists("/root")) {
                //一、建立root節點 二、初始化節點數據 三、節點爲永久性建立
                zkClient.create("/root", "root".getBytes(), CreateMode.PERSISTENT);
            }

            for(int i = 0; i < 50; i++) {
                zkClient.writeData("/root", "root" + String.valueOf(i));
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    public static void main(String[] args) {

        //對zookeeper封裝的client,zookeeper發佈訂閱是一次的性的,監控一次後會斷開鏈接;
        //client超時過時時貨自動建立zookeeper鏈接
        zkClient = new ZkClient("192.168.52.128:2181", 1000);

        new ChangerThread().start();
    }

}

二、監控數據節點的變化狀況ide

public class ZKclientWatcher {

    private static ZkClient zkClient;

    private static int count = 0;

    /**
     * 測試監控path下的數據變化狀況
     */
    private static class WatcherThread extends Thread {
        @Override
        public void run() {
            //監控path下的數據的變化
            zkClient.subscribeDataChanges("/root", new IZkDataListener() {
                @Override
                public void handleDataChange(String path, Object data) throws Exception {
                    System.out.println("handleDataChange===========>path:" + path + " data: " + data);
                    count++;
                }

                @Override
                public void handleDataDeleted(String path) throws Exception {
                    System.out.println("handleDataDeleted=========>path:" + path);
                }
            });

            //監控path下的節點的變化
            zkClient.subscribeChildChanges("/root", new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    // TODO 節點變化處理邏輯
                }
            });

            //監控鏈接狀態的變化
            zkClient.subscribeStateChanges(new IZkStateListener() {
                @Override
                public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                    // TODO 監控zookeeper鏈接狀態變化邏輯
                }

                @Override
                public void handleNewSession() throws Exception {
                    // TODO  監控zookeeper Session過時並建立新Session時的邏輯
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    // TODO 監控鏈接失敗,session沒法從新建立時的邏輯
                }
            });

            //取消全部的訂閱[unsubscribeChildChanges/unsubscribeDataChanges/unsubscribeStateChanges]
//            zkClient.unsubscribeAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        zkClient = new ZkClient("192.168.52.128:2181", 2000);
        new WatcherThread().start();

        while (count < 10) {
            System.out.println("count : " + count);
            Thread.sleep(1000);
        }
    }

}

    主要依賴zkclient的包:
測試

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>
相關文章
相關標籤/搜索