Java客戶端API

添加依賴

<dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.10</version>
    </dependency>

建立會話

  • 構造器方法
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
  • 參數詳解
參數名 描述
connectString 形如ip:port,ip:port/path1/path2,表示zk服務器列表;帶path路徑(chroot)時,該客戶端的全部操作都相對於此path進行
sessionTimeout 客戶端會話超時時間,毫秒值,在sessionTimeout時間內沒有進行有效的心跳檢測,則認爲會話超時
watcher 默認監聽器,可以不設置,傳null即可
canBeReadOnly boolean值,true表示當服務器因網絡分區進入只讀模式時,允許客戶端連接到該只讀服務器並繼續提供讀服務,默認爲false
sessionId和sessionPasswd 會話id和會話的祕鑰,當一個會話建立後,會自動生成對應的id和祕鑰,主要用來恢復會話
  • 例子
package com.banary.base;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZookeeperFactory {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        createZk2();
    }

    public static void createZk1() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
        System.out.println("zk實例建立成功");
    }

    public static void createZk2() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();

        long sessionId = zooKeeper.getSessionId();
        byte[] sessionPasswd = zooKeeper.getSessionPasswd();

        //使用錯的sessionId和sessionPasswd
        zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher(), 1l, "test".getBytes());
        //使用對的sessionId和sessionPasswd
        zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher(), sessionId, sessionPasswd);
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DemoWatcher implements Watcher{
        public void process(WatchedEvent watchedEvent) {
            System.out.println("收到zk event:" + watchedEvent);
            countDownLatch.countDown();
        }
    }
}
  • 注意
    建立zk對象的方法是異步的,此處採用CountDownLatch實現同步

建立節點

  • 方法(不支持遞歸建立,如果該節點已存在,會拋出異常NodeExistsException)
#同步方法
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException
#異步方法
public void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
  • 參數詳解
參數名 描述
path 要建立的數據節點的路徑
data 字節數組,該節點的初始內容,需要自己序列化成字節數組
acl 節點的安全策略,詳見ZooDefs.Ids
createMode 枚舉類型,詳見CreateMode
cb 異步建立時的回調函數,需要開發者自己實現對應的AsyncCallback子接口,如StringCallback,重寫void processResult(int rc, String path, Object ctx, String name)方法,當zk服務器建立完節點,客戶端自動調用該方法
ctx 回調函數上下文,和回調函數一起使用
  • 例子
package com.banary.base;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

/**
 * Sample showing the blocking and the callback-based variants of {@code create}
 * against a ZooKeeper server at {@code localhost:2181}.
 */
public class CreateDemo {

    private static CountDownLatch connectedSignal = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncCreate();
    }

    /**
     * Creates one ephemeral and one ephemeral-sequential node with the blocking
     * API and prints the path returned for each.
     *
     * @throws Exception propagated from the ZooKeeper client or the latch wait
     */
    public static void syncCreate() throws Exception{
        ZooKeeper client = openSession();

        // Ephemeral node, open ACL.
        String plainPath = client.create(
                "/zk-test-ephemeral-",
                "1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        System.out.println("臨時節點1建立成功:"+ plainPath);

        // Ephemeral sequential node, open ACL.
        String sequentialPath = client.create(
                "/zk-test-ephemeral-",
                "1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("臨時節點2建立成功:" + sequentialPath);
    }

    /**
     * Issues the same two creations through the callback API, then keeps the
     * calling thread asleep so the callbacks have time to print.
     *
     * @throws Exception propagated from the ZooKeeper client, the latch wait or the sleep
     */
    public static void asyncCreate() throws Exception{
        ZooKeeper client = openSession();

        // Ephemeral node, open ACL; "path1" is handed back to the callback as ctx.
        client.create(
                "/zk-test-ephemeral-",
                "1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL,
                new DemoCallback(),
                "path1");

        // Ephemeral sequential node, open ACL; "path2" is handed back as ctx.
        client.create(
                "/zk-test-ephemeral-",
                "1".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL,
                new DemoCallback(),
                "path2");

        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Builds the client and blocks until the watcher signals a connected session. */
    private static ZooKeeper openSession() throws Exception {
        ZooKeeper client = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        connectedSignal.await();
        return client;
    }

    /** Releases the connection latch on the first {@code SyncConnected} state. */
    private static class DemoWatcher implements Watcher{
        public void process(WatchedEvent event) {
            if (Event.KeeperState.SyncConnected != event.getState()) {
                return;
            }
            connectedSignal.countDown();
        }
    }

    /** Prints the result code, requested path, context and actual node name. */
    private static class DemoCallback implements AsyncCallback.StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            StringBuilder line = new StringBuilder("建立節點:[");
            line.append(rc).append(", ");
            line.append(path).append(", ");
            line.append(ctx.toString());
            line.append(", 真實path:").append(name).append("]");
            System.out.println(line.toString());
        }
    }
}
  • 異步方法回調接口參數說明
參數名 描述
rc 服務端響應碼:0 建立成功;-4 客戶端和服務端的連接斷開;-110 節點已存在;-112 會話過期
path 對應create方法中節點路徑參數值
ctx 接口調用時傳入API的ctx,即對應異步create方法中的ctx參數
name 建立成功後,對應節點的真實路徑
  • 同步方法和異步方法比較
  1. 同步方法會阻塞線程,異步方法不會
  2. 同步方法會拋出異常,異步方法不會,異常信息是通過狀態碼(rc)的方式傳遞到回調函數中

刪除

  • 方法(刪除時不支持遞歸刪除,也就是說某個節點如果存在子節點,則不能刪除)
#同步刪除
public void delete(String path, int version) throws InterruptedException, KeeperException
#異步刪除
public void delete(String path, int version, VoidCallback cb, Object ctx)
  • 參數說明
參數名 描述
path 要刪除的節點對應的path
version 數據節點的版本號,樂觀鎖
cb 異步刪除時的回調函數
ctx 回調函數的參數
  • 例子
package com.banary.base;

import org.apache.zookeeper.*;

import java.util.concurrent.CountDownLatch;

public class DeleteDemo {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncDelete();
    }

    public static void syncDelete() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        String path = zooKeeper.create("/deleteDemo", "deleteDemo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Thread.sleep(10000);
        zooKeeper.delete(path, 0);
    }

    public static void asyncDelete() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();

        String path = zooKeeper.create("/deleteDemo", "deleteDemo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Thread.sleep(5000);
        zooKeeper.delete(path, 0, new DemoCallback(), "asyncDelete");
        Thread.sleep(5000);
    }

    private static class DemoWatcher implements Watcher{
        @Override
        public void process(WatchedEvent watchedEvent) {
            System.out.println("建立zk會話:" + watchedEvent.getState());
            if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
                System.out.println("zk會話建立成功");
                countDownLatch.countDown();
            }
        }
    }

    public static class DemoCallback implements AsyncCallback.VoidCallback{
        @Override
        public void processResult(int rc, String path, Object ctx) {
            System.out.println("刪除節點成功:[" + rc + ", " + path + ", " + ctx.toString() + "]");
        }
    }
}

查詢節點數據內容getData

  • 方法
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
  • 參數
參數名 描述
path 要獲取數據的節點的路徑
watcher 監聽器
watch true表示使用默認的監聽器,即建立zk對象時註冊的監聽器
stat 數據節點的狀態信息,傳入一個舊的stat變量,服務器響應後會用新的狀態信息填充該變量
cb 異步方法的回調函數
ctx 回調函數上下文參數
  • 例子
package com.banary.base;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class GetDataDemo {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncGetData();
    }

    /**
     * 同步獲取數據
     * @throws Exception
     */
    public static void syncGetData() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        String path = zooKeeper.create("/syncGetData", "syncGetData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(new String(zooKeeper.getData(path,true, new Stat())));
    }

    /**
     * 異步獲取數據
     */
    public static void asyncGetData() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        String path = zooKeeper.create("/asyncGetData", "asyncGetData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(new String(zooKeeper.getData(path,true, new Stat())));
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DemoWatcher implements Watcher{
        @Override
        public void process(WatchedEvent watchedEvent) {
            if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
                countDownLatch.countDown();
            }
        }
    }

    private static class DemoCallback implements AsyncCallback.DataCallback{
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] bytes, Stat stat) {
            System.out.println("異步獲取的數據內容爲:" + new String(bytes));
        }
    }
}

查詢子節點getChildren

  • 方法
public List<String> getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
public void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException 
public List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getChildren(String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
  • 參數
參數名 描述
path 要查詢的節點
watcher 註冊一個監聽器
watch 使用默認的監聽器
cb 異步方法的回調函數
ctx 回調函數上下文參數
stat 傳入一個舊的Stat對象,查詢後,會被服務端響應的新Stat信息覆蓋

修改

  • 方法
#同步方法
public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException
#異步方法
public void setData(String path, byte[] data, int version, StatCallback cb, Object ctx)
  • 參數
參數名 描述
path 要修改的數據節點的路徑
data 修改後的數據
version 數據修改時基於的版本號,即樂觀鎖
cb 異步方法的回調函數
ctx 回調函數上下文參數
  • 例子
package com.banary.base;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * Demonstrates updating a node's data with the blocking and the callback-based
 * {@code setData} variants, and observing the resulting data-changed event.
 */
public class UpdateDemo {

    /** Released by {@link DemoWatcher} once the session reports {@code SyncConnected}. */
    public static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncUpdate();
    }

    /**
     * Creates an ephemeral node, prints its data and stat, overwrites the data with
     * the blocking API, then prints the data and stat again.
     *
     * @throws Exception if the client fails or the thread is interrupted
     */
    public static void syncUpdate() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        try {
            countDownLatch.await();
            String path = zooKeeper.create("/syncUpdate", "syncUpdate".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            Stat stat = new Stat();
            System.out.println(new String(zooKeeper.getData(path, true, stat)));
            System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
            // -1 skips the version check (no optimistic locking).
            zooKeeper.setData(path, "32123".getBytes(), -1);
            System.out.println(new String(zooKeeper.getData(path, true, stat)));
            System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
        } finally {
            zooKeeper.close();
        }
    }

    /**
     * Creates an ephemeral node, prints its data and stat, then overwrites the data
     * through the callback API; the result is printed by {@link DemoCallback}.
     *
     * @throws Exception if the client fails or the thread is interrupted
     */
    public static void asyncUpdate() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();

        String path = zooKeeper.create("/asyncUpdate", "asyncUpdate".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Stat stat = new Stat();
        System.out.println(new String(zooKeeper.getData(path, true, stat)));
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());

        zooKeeper.setData(path, "dsadsa".getBytes(), -1, new DemoCallback(), "回調");
        // Keep the process alive so the callback and the watch event can be printed.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Releases the latch on connection and reports data-changed events. */
    private static class DemoWatcher implements Watcher{
        @Override
        public void process(WatchedEvent watchedEvent) {
            // Dispatch on the event type first: node events also arrive with state
            // SyncConnected, so testing the state first made the data-changed
            // branch unreachable.
            Event.EventType type = watchedEvent.getType();
            if (Event.EventType.None == type) {
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    countDownLatch.countDown();
                }
            } else if (Event.EventType.NodeDataChanged == type) {
                System.out.println("節點數據內容發生了變化");
            }
        }
    }

    /** Prints the outcome of an asynchronous update. */
    private static class DemoCallback implements AsyncCallback.StatCallback{
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            // String concatenation is null-safe; calling toString() directly would
            // throw if stat (or ctx) is null, e.g. when the update failed.
            System.out.println("修改節點:[" + rc + ", " + path + ", " + ctx +
                    ", stat:" + stat + "]");
        }
    }
}

判斷節點是否存在

  • 方法
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException
public void exists(String path, Watcher watcher, StatCallback cb, Object ctx)
public void exists(String path, boolean watch, StatCallback cb, Object ctx)
  • 參數
參數名 描述
path 要判斷的節點路徑
watcher 註冊一個監聽器,用來監聽節點被建立,刪除,更新
watch 是否使用默認的監聽器
cb 異步方法的回調函數
ctx 回調函數上下文參數
  • 例子
package com.banary.base;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

/**
 * Demonstrates {@code exists} in its blocking and callback-based variants, and
 * the node events delivered to the default watcher it registers.
 */
public class ExistsNodeDemo {

    /** Released by {@link DemoWatcher} once the session reports {@code SyncConnected}. */
    public static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        asyncExists();
    }

    /**
     * Creates an ephemeral node, checks it with the blocking API while registering
     * the default watcher, then updates and deletes it so the watcher can report
     * both changes.
     *
     * @throws Exception if the client fails or the thread is interrupted
     */
    public static void syncExists() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        // Create.
        String path = zooKeeper.create("/syncExists", "syncExists".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Stat stat = zooKeeper.exists(path, true);
        System.out.println(stat.toString());
        // Update.
        stat = zooKeeper.setData(path, "3213".getBytes(), -1);
        System.out.println(stat.toString());
        // Watches fire once: the update above consumed the one set by the first
        // exists() call, so register again or the deletion is never reported.
        zooKeeper.exists(path, true);
        // Delete.
        zooKeeper.delete(path, -1);
        // Keep the process alive so the watch events can be printed.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Creates an ephemeral node and checks it through the callback API; the result
     * is printed by {@link DemoCallback}.
     *
     * @throws Exception if the client fails or the thread is interrupted
     */
    public static void asyncExists() throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        // Create.
        String path = zooKeeper.create("/asyncExists", "asyncExists".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.exists(path, true, new DemoCallback(), "asyncExists");
        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Releases the latch on connection and reports node create/change/delete events. */
    private static class DemoWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
            // Dispatch on the event type first: node events also arrive with state
            // SyncConnected, so testing the state first made every node-event
            // branch unreachable.
            Event.EventType type = watchedEvent.getType();
            if (Event.EventType.None == type) {
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    countDownLatch.countDown();
                }
            } else if (Event.EventType.NodeCreated == type) {
                System.out.println("建立節點");
            } else if (Event.EventType.NodeDataChanged == type) {
                System.out.println("修改節點");
            } else if (Event.EventType.NodeDeleted == type) {
                System.out.println("刪除節點");
            }
        }
    }

    /** Prints the outcome of an asynchronous existence check. */
    public static class DemoCallback implements AsyncCallback.StatCallback{
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            // String concatenation is null-safe; calling toString() directly would
            // throw if stat (or ctx) is null, e.g. when the node does not exist.
            System.out.println("[" + rc + ", " + path + ", " + ctx +
                    ", stat:" + stat + "]");
        }
    }
}

權限控制

  • 方法
#zk會話對象的方法
public void addAuthInfo(String scheme, byte[] auth)
  • 參數
參數名 描述
scheme 權限控制模式,枚舉值:world,auth,digest,ip和super
auth 權限信息
  • 例子
package com.banary.base;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class AuthDemo {

    private static CountDownLatch countDownLatch = null;

    public static void main(String[] args) throws Exception{
        authCreate();
    }

    public static void authCreate() throws Exception{
        countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        zooKeeper.addAuthInfo("digest", "auth".getBytes());

        String path = zooKeeper.create("/auth", "auth".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);

        countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper2 = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        zooKeeper2.addAuthInfo("digest", "auth".getBytes());
        System.out.println(new String(zooKeeper2.getData(path, true, new Stat())));

        countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper1 = new ZooKeeper("localhost:2181", 5000, new DemoWatcher());
        countDownLatch.await();
        System.out.println(new String(zooKeeper1.getData(path, true, new Stat())));

    }

    private static class DemoWatcher implements Watcher {
        public void process(WatchedEvent watchedEvent) {
            if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
                countDownLatch.countDown();
            }
        }
    }
}

總結

  1. 除了建立會話是異步的,其他操作都存在同步和異步方法;同步方法會拋出異常,異步方法則通過回調的狀態碼(rc)傳遞錯誤
  2. 建立會話、查詢(包括查詢該節點的數據和子節點和判斷節點存在)都可以註冊監聽器,也可以使用會話的默認監聽器
相關文章
相關標籤/搜索