zookeeper java api介紹

《zookeeper介紹及環境搭建》《zookeeper客戶端的使用》兩篇文章中,我介紹了zookeeper實驗環境的搭建、zookeeper的數據結構和zookeeper的一些操做命令。本篇文章我將對zookeeper java api進行詳細的介紹。相關代碼java

開發環境搭建

zookeeper.jar中包含了zookeeper提供的java api,爲了在項目中引入zookeeper.jar,在工程的pom.xml文件中加入下面的依賴:node

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

鏈接zk服務器

構造ZooKeeper類對象的過程就是與ZK服務器創建鏈接的過程。git

ZooKeeper zooKeeper = new ZooKeeper("192.168.1.108:2181", 5000, watcher);

ZooKeeper類的構造函數一共有三個參數:第一個參數是服務器的地址,第二個參數是session超時時間,第三個參數是org.apache.zookeeper.Watcher類型的對象。zookeeper api與服務器創建鏈接是異步的,上面的調用會立刻從ZooKeeper構造函數返回,當與服務器創建好鏈接以後會調用Watcher中的process方法進行處理。process方法會接受一個WatchedEvent類型的參數,用於代表發生了什麼事件。github

public void process(WatchedEvent watchedEvent) {
    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //判斷是否已鏈接
        if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
            // 最初與zk服務器創建好鏈接
        } else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            // 子節點變化事件
        }
        // ...還能夠繼續監聽其它事件類型
    }
    System.out.println(watchedEvent.getState());
}

WatchedEvent包含兩方面重要信息:apache

  1. 與zk服務器鏈接的狀態信息
    能夠調用watchedEvent.getState()方法獲取與zk服務器鏈接的狀態信息,狀態信息取值主要包括SyncConnectedDisconnectedConnectedReadOnlyAuthFailed等等。
  2. 發生的具體事件類型信息
    watchedEvent.getState()方法只是獲取與zk服務器鏈接的狀態信息,但在同一個鏈接狀態下,還會發生不少事件的類型。例如在zk中,咱們能夠watch一個節點的數據內容,當這個節點的數據被改變時,咱們能夠獲取到這個事件。相似的還有子節點列表變化事件等等。
    這就須要咱們在SyncConnected同一種鏈接狀態下區分多個事件類型。能夠經過watchedEvent.getType()方法獲取具體的事件類型。事件類型的取值包括NoneNodeCreatedNodeDeletedNodeDataChangedNodeChildrenChanged

建立節點

下面要介紹的每種api操做均可以分爲兩種類型——同步和異步。同步操做通常會有返回值,而且會拋出相應的異常。異步操做沒有返回值,也不會拋出異常。此外異步方法參數在同步方法參數的基礎上,會增長Callback和context兩個參數。
如用同步方式建立一個節點的的代碼以下:segmentfault

private void createNodeSync() throws KeeperException, InterruptedException {
    String path = "/poype_node";
    String nodePath = zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    System.out.println(nodePath);
}

這裏的zooKeeper就是經過ZooKeeper構造函數構造的對象,能夠調用它的create()方法建立一個節點。同步版的create()方法一共有4個參數,第一個參數是要建立的節點路徑。第二個參數是建立節點的數據值,參數類型是字節數組。第三個參數是這個節點的訪問權限,咱們這裏指定該節點能夠被任何人訪問(關於節點的訪問權限,我將在下一篇文章進行詳細介紹)。第四個參數是建立節點的類型,在上一篇文章中,我提到過create命令能夠有-s和-e兩個參數,其中-s是順序節點,-e是臨時節點。這裏的CreateMode就是這兩個參數的組合,它有下面四種取值:PERSISTENTPERSISTENT_SEQUENTIALEPHEMERALEPHEMERAL_SEQUENTIAL
異步模式建立一個節點的代碼以下,注意異步模式方法沒有返回值,而且不會拋出任何異常:api

private void createNodeAsync() {
    String path = "/poype_node2";
    zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
        public void processResult(int resultCode, String path, Object ctx, String name) {
            System.out.println(resultCode);
            System.out.println(path);
            System.out.println(ctx);
            System.out.println(name);
        }
    }, "建立");
}

除了同步create方法中的四個參數之外,異步模式的create方法還增長了callback和context兩個參數。StringCallback接口中的processResult方法會在節點建立好以後被調用,它有四個參數。第一個是int類型的resultCode,做爲建立節點的結果碼,當成功建立節點時,resultCode的值爲0。第二個參數是建立節點的路徑。第三個參數是context,當一個StringCallback類型對象做爲多個create方法的參數時,這個參數就頗有用了。第四個參數是建立節點的名字,其實與path參數相同。數組

獲取節點的數據值

同步獲取一個節點數據值的代碼以下:服務器

private void getDataSync() throws KeeperException, InterruptedException {
    Stat stat = new Stat();
    // getData的返回值是該節點的數據值,節點的狀態信息會賦值給stat對象
    byte[] data = zooKeeper.getData("/node_1",true, stat);
    System.out.println(new String(data));
    System.out.println(stat);
}

zooKeeper.getData方法的返回值就是節點中存儲的數據值,它有三個參數,第一個參數是節點的路徑,用於表示要獲取哪一個節點中的數據。第三個參數stat用於存儲節點的狀態信息,在調用getData方法前,會先構造一個空的Stat類型對象做爲參數傳給getData方法,當getData方法調用返回後,節點的狀態信息會被填充到stat對象中。
第二個參數是一個bool類型的watch,這個參數比較重要。當watch爲true時,表示咱們想要監控這個節點的數據變化。當節點的數據發生變化時,咱們就能夠拿到zk服務器推送給咱們的通知。在process方法中會有相似下面的代碼:session

public void process(WatchedEvent watchedEvent) {
    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //與zk服務器處於鏈接狀態
        if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
            createNodeAsync();
        } else if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            // 節點的子節點列表發生變化
        } else if(watchedEvent.getType() == Event.EventType.NodeDataChanged) {
            // 節點的數據內容發生變化
        }
    }
}

當節點的數據內容發生變化時,咱們就會接收到NodeDataChanged這個事件。值得注意的是,zooKeeper設置的監聽只生效一次,若是在接收到NodeDataChanged事件後還想繼續對該節點的數據內容改變進行監聽,須要在事件處理邏輯中從新調用getData方法並將watch參數設置爲true。
異步獲取一個節點數據值的代碼以下:

private void getDataAsync() {
        zooKeeper.getData("/node_1", true, new AsyncCallback.DataCallback() {
            public void processResult(int resultCode, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println(resultCode);
                System.out.println(path);
                System.out.println(ctx);
                System.out.println(new String(data));
                System.out.println(stat);
            }
        }, "異步獲取節點的數據值");
    }

異步getData方法中的最後一個參數是context,我這裏將其設置爲"異步獲取節點的數據值"。

獲取節點的子節點列表

同步獲取一個節點的子節點列表:

private void getChildrenSync() throws KeeperException, InterruptedException {
    List<String> childrenNode = zooKeeper.getChildren("/",true);
    for(String child : childrenNode) {
        System.out.println(child);
    }
}

方法getChildren的第二個參數一樣爲watch,當節點的子節點列表發生變化時,zk服務器會向咱們推送類型爲NodeChildrenChanged的事件。
異步獲取一個節點的子節點列表:

private void getChildrenAsync() {
    zooKeeper.getChildren("/", true, new AsyncCallback.Children2Callback() {
        public void processResult(int resultCode, String path, Object ctx, List<String> children, Stat stat) {
            System.out.println(resultCode);  
            System.out.println(path);        
            System.out.println(ctx);
            for(String child : children) {
                System.out.println(child);
            }
            System.out.println(stat);
        }
    }, "獲取/下面的子節點");
}

查看一個節點是否存在

同步查看一個節點是否存在:

private void existSync() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.exists("/poype_node2", true);
    System.out.println(stat);
}

exists方法的watch參數比較特別,若是將其指定爲true,那麼表明你對該節點的建立事件、節點刪除事件和該節點的數據內容改變事件都感興趣,因此會同時響應三種事件類型。請看process方法中對事件的處理:

public void process(WatchedEvent watchedEvent) {
    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
        if(watchedEvent.getType() == Event.EventType.None && null == watchedEvent.getPath()) {
            existAsync();
        } else if(watchedEvent.getType() == Event.EventType.NodeCreated) { 
            System.out.println("監控到了該節點被建立");
            existAsync();
        } else if(watchedEvent.getType() == Event.EventType.NodeDataChanged) {
            System.out.println("監控到了該節點的數據內容發生變化");
            existAsync();
        } else if(watchedEvent.getType() == Event.EventType.NodeDeleted) {
            System.out.println("監控到了該節點被刪除");
            existAsync();
        }
    }
}

異步查看一個節點是否存在:

private void existAsync() {
    zooKeeper.exists("/poype_node2", true, new AsyncCallback.StatCallback() {
        public void processResult(int resultCode, String path, Object ctx, Stat stat) {
            System.out.println(resultCode);
            System.out.println(path);
            System.out.println(ctx);
            System.out.println(stat);
        }
    }, "異步查看一個節點是否存在");
}

修改節點的數據內容

同步修改節點的數據內容:

private void setDataSync() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.setData("/poype_node2", "poype5211314".getBytes(), 1);
    System.out.println(stat);
}

setData方法有三個參數,前兩個參數分別是節點的路徑和要修改的數據值,最後一個參數是version字段。在上一篇文章我提到過,在節點的狀態信息中包含dataVersion字段,是該節點的數據內容版本號。在調用setData方法修改節點數據內容時,只有當version參數的值與節點狀態信息中的dataVersion值相等時,數據修改才能成功,不然會拋出BadVersion異常。這是爲了防止丟失數據的更新,在ZooKeeper提供的API中,全部的寫操做(例如後面要提到的delete)都有version參數。
異步修改節點的數據內容:

private void setDataAsync() {
    zooKeeper.setData("/poype_node2", "poype5211314".getBytes(), 3, new AsyncCallback.StatCallback() {
        public void processResult(int resultCode, String path, Object ctx, Stat stat) {
            System.out.println(resultCode);
            System.out.println(path);
            System.out.println(ctx);
            System.out.println(stat.getVersion()); // 獲取數據節點版本號
        }
    }, "異步設置一個節點的數據");
}

刪除一個節點

同步方式刪除一個節點:

private void deleteSync() throws KeeperException, InterruptedException {
    zooKeeper.delete("/node_1", 12);
}

delete方法的第二個參數也是version,含義與setData方法中的version參數相似。
異步方式刪除一個節點:

private void deleteAsync() {
    zooKeeper.delete("/poype_node", 3, new AsyncCallback.VoidCallback() {
        public void processResult(int resultCode, String path, Object ctx) {
            System.out.println(resultCode);
            System.out.println(path);
            System.out.println(ctx);
        }
    }, "異步刪除一個節點");
}

小結

本文介紹了ZooKeeper Java API的基本使用方式,在下一篇文章中,我將詳細介紹ZooKeeper中節點權限的概念和使用方法。

相關文章
相關標籤/搜索