【zookeeper】第4篇:使用客戶端API來操做ZK

Java 客戶端

pom.xml 文件中引入相關api

<dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.5.4-beta</version>
</dependency>

建立鏈接

/**
 * 建立 zookeeper 會話
 * <p>
 * <p>
 * zookeeper 客戶端 和 服務端建立會話的過程是異步的。也就是客戶度經過構造方法建立會話後當即返回,此時的鏈接並無徹底創建。
 * 當真正的會話創建完成後,zk服務端會給客戶端通知一個事件,客戶端獲取通知以後在代表鏈接正在創建。
 */
public class ZooKeeperClientSession implements Watcher {
    //用於等待zk服務端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2183", 5000, new ZooKeeperClientSession());
        System.out.println(zooKeeper.getState());
        latch.await();

        long sessionId = zooKeeper.getSessionId();
        byte[] sessionPasswd = zooKeeper.getSessionPasswd();
        System.out.println(zooKeeper.getSessionId());

        /**
         *  利用 sessionId 和 sessionPasswd 複用會話鏈接
         */
        ZooKeeper zooKeeper1 = new ZooKeeper("127.0.0.1:2183",
                5000,
                new ZooKeeperClientSession(),
                sessionId,
                sessionPasswd);
        System.out.println(zooKeeper1.getSessionId());
    }
    /**
     * 處理 zookeeper 服務端的 Watcher 通知
     * @param watchedEvent
     */
    public void process(WatchedEvent watchedEvent) {
        System.out.println("receive watch event : " + watchedEvent);
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            latch.countDown();
        }
    }
}
1.構造函數說明
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

ZooKeeper(String connectString, 
          int sessionTimeout, 
          Watcher watcher,
          boolean canBeReadOnly)

ZooKeeper(String connectString,
          int sessionTimeout, 
          Watcher watcher, 
          long sessionId, 
          byte[] sessionPasswd)

ZooKeeper(String connectString, 
          int sessionTimeout, 
          Watcher watcher,
          long sessionId, 
          byte[] sessionPasswd, 
          boolean canBeReadOnly)
參數 做用
connectString zk服務器列表,由英文逗號分開的字符串,例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183;也能夠是帶有目錄的字符:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/zk-book
sessionTimeout 會話超時時間,以毫秒爲單位。在一個會話週期內,zk客戶端和服務端經過心跳來檢查鏈接的有效性,一旦在sessionTimeout時間內沒有進行心跳檢測,則會話失效
watcher zk容許客戶端在構造方法中傳入一個Watcher接口實現類做爲事件通知處理器
sessionId、sessionPasswd 利用sessionId 和 sessionPasswd 確保複用會話鏈接
canBeReadOnly 用於標識當前會話是否支付只讀模式。在zk集羣模式中,若是一臺集羣和集羣中過半以上的機器都都失去了網絡鏈接,那麼這個機器將再也不處理客戶端請求,包括讀寫請求。但在某些狀況下出現相似問題,咱們但願該臺機器可以處理讀請求,此時爲 read-only 模式
  • 建立節點
1.同步建立臨時節點
/**
 * 同步 api 建立臨時節點
 */
public class CreatedSyncNode implements Watcher {
    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                5000, new CreatedSyncNode());
        latch.await();

        //臨時節點:建立接口返回該節點路徑L,例如返回值 /zk-test
        String path = zooKeeper.create("/zk-test", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("created path success : " + path);

        // 臨時順序節點:會自動的在節點路徑後加一個數字,例如返回值:/zk-test0000000001
        String path1 = zooKeeper.create("/zk-test", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println("created path success : " + path1);
    }

    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            latch.countDown();
        }
    }
}
2.異步建立節點
/**
 * 異步 api 建立持久化節點
 */
public class CreatedAsyncNode implements Watcher {
    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {

        ZooKeeper zooKeeper = new ZooKeeper(
                "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                5000,
                new CreatedAsyncNode());
        latch.await();

        //持久化節點:建立接口返回該節點路徑,無返回值;異步建立的節點:/zk-test
        zooKeeper.create("/zk-test", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CallBack(), "PERSISTENT");

        //持久化順序節點:會自動的在節點路徑後加一個數字,該方法無返回值,建立後節點:/zk-test-seq0000000002
        zooKeeper.create("/zk-test-seq",
                "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL,
                new CallBack(),
                "PERSISTENT_SEQUENTIAL");

        Thread.sleep(1000);
    }

    public void process(WatchedEvent watchedEvent) {
        if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            latch.countDown();
        }
    }

    static class CallBack implements AsyncCallback.StringCallback {
        /**
         * 服務端回調方法
         *
         * @param code 服務端響應碼:0 接口調用成功;-4 客戶端和服務端鏈接斷開;-110 節點已存在 ;-112 會話過時
         * @param path 建立節點傳入的路徑參數
         * @param ctx  異步建立api傳入的ctx參數
         * @param name 服務端真正建立節點的名稱,業務邏輯應該以該值爲準
         */
        public void processResult(int code, String path, Object ctx, String name) {
            System.out.println("created success : " + code + "," + path + "," + ctx + "," + name);
        }
    }
}

讀取數據

讀取數據包括獲取子節點列表和節點的數據內容。zookeeper 中分別提供了 getChildren 和 getData 方法。html

getChildren

  • 方法參數說明
參數名 參數類型 做用
path String 指定節點的路徑
watcher Watcher 註冊一個Watcher,一旦在本次子節點獲取到以後,子節點列表發送變動時,那麼會向客戶端發送通知
watch boolean 代表是否須要註冊一個
stat org.apache.zookeeper.data.Stat 節點的狀態信息,有時候咱們不只須要最新的子節點列表,還要獲取這個節點的最新狀態信息,咱們能夠將一箇舊的 stat 傳入到api方法中,在方法執行過程當中 stat 會被來自服務的新的 stat 替換掉
cb AsyncCallback.ChildrenCallback 異步回調函數
ctx Object 用於傳遞上下文信息
  • 同步獲取節點列表
/**
 * 同步 api 建立節點,而後同步獲取節點列表
 */
public class CreatedSyncNode implements Watcher {
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                5000, new CreatedSyncNode());
        latch.await();

        String nodePath = "/zk-data";
        zooKeeper.create(nodePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        List<String> list = zooKeeper.getChildren(nodePath, true);
        System.out.println("get children : " + list);

        //給節點 /zk-data 再添加子節點 /conf
        zooKeeper.create(nodePath + "/conf", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Thread.sleep(1000);
    }

    /**
     * 監聽子節點的變化
     *
     * @param watchedEvent
     */
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
                latch.countDown();
            } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                //監聽到子節點變化後,主動的去獲取節點列表
                try {
                    List<String> list = zooKeeper.getChildren(watchedEvent.getPath(), true);
                    System.out.println("get children node changed : " + list);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
  • 異步獲取節點列表

重點是在獲取節點列表 api 中傳入回調對象,該對象實現 Children2Callback 接口java

/**
 * 異步查詢節點列表
 */
public class AsyncQueryNode implements Watcher {
    private static CountDownLatch latch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        //建立異步會話
        zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                5000, new AsyncQueryNode());
        latch.await();

        String nodePath = "/zk-book";
        zooKeeper.create(nodePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        //表示異步獲取子節點列表,在 ChildrenCallback 中對節點列表進行處理
        zooKeeper.getChildren(nodePath, true, new ChildrenCallback(), "query children");

        //給節點 /zk-data 再添加子節點 /conf
        zooKeeper.create(nodePath + "/src", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Thread.sleep(1000);
    }

    /**
     * 監聽子節點的變化
     *
     * @param watchedEvent
     */
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
                latch.countDown();
            } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
                //監聽到子節點變化後,主動的去獲取節點列表
                try {
                    List<String> list = zooKeeper.getChildren(watchedEvent.getPath(), true);
                    System.out.println("get children node changed : " + list);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class ChildrenCallback implements AsyncCallback.Children2Callback {
        /**
         * @param code 服務端響應碼
         * @param path 建立節點傳入的路徑參數
         * @param ctx  上下文信息
         * @param list 子節點列表
         * @param stat 節點的狀態信息
         */
        public void processResult(int code, String path, Object ctx, List<String> list, Stat stat) {
            System.out.println("code : " + code
                    + ";path : " + path
                    + ";ctx : " + ctx
                    + ";children list : " + list
                    + ";stat : " + stat);
        }
    }
}

getData

獲取節點數據內容方法,getData的用法和getChildren差很少,getData 方法的參數中也有一個 Watcher 參數,該 Watcher 的做用是客戶端拿到節點的數據以後,能夠進行 Watcher 註冊,一旦該節點的狀態發送變化,服務端會發送 NoteDataChanged 事件告訴客戶端。node

  • 同步獲取
/**
 * 同步獲取節點數據
 */
public class SyncGetData implements Watcher {

    private static CountDownLatch latch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;
    private static Stat stat = new Stat();

    public static void main(String[] args) throws Exception {
        String path = "/zk-demo";
        zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                5000,
                new SyncGetData());
        latch.await();

        zooKeeper.create(path, "123".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //直接獲取節點數據
        byte[] datas = zooKeeper.getData(path, true, stat);
        System.out.println("結果:" + new String(datas));
        System.out.println(stat);

        //修改節點數據
        zooKeeper.setData(path, "123".getBytes(), -1);
        Thread.sleep(1000);
    }

    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
                latch.countDown();
                //節點變化時進行通知,EventType.NodeDataChanged 事件
            } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                try {
                    byte[] datas = zooKeeper.getData(watchedEvent.getPath(), true, stat);
                    System.out.println("回調通知:" + new String(datas));
                    System.out.println(stat);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

運行結果:apache

clipboard.png

  • 異步獲取

實現異步回調接口 DataCallback,接收回調通知api

public class AsyncGetData implements Watcher {
    private static CountDownLatch latch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        String path = "/zk-async";
        zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",
                5000,
                new AsyncGetData());
        latch.await();
        zooKeeper.create(path, "456".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        //設置異步獲取節點數據
        zooKeeper.getData(path, true, new DataCallback(), null);
        //再次更新節點數據
        zooKeeper.setData(path, "789".getBytes(), -1);

        Thread.sleep(1000);
    }

    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
                latch.countDown();
                //節點變化時進行通知,EventType.NodeDataChanged 事件
            } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                try {
                    //再次異步獲取變化後的節點數據
                    zooKeeper.getData(watchedEvent.getPath(), true, new DataCallback(), null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //接收服務端回調
    static class DataCallback implements AsyncCallback.DataCallback {
        public void processResult(int code, String path, Object ctx, byte[] data, Stat stat) {

            System.out.println("回調通知的節點數據:" + new String(data));
            System.out.println("code : " + code + ";path : " + path + ";stat : " + stat);
        }
    }
}

測試結果:服務器

clipboard.png

相關文章
相關標籤/搜索