ZooKeeper系列(四)—— Java 客戶端 Apache Curator

1、基本依賴

Curator 是 Netflix 公司開源的一個 Zookeeper 客戶端,目前由 Apache 進行維護。與 Zookeeper 原生客戶端相比,Curator 的抽象層次更高,功能也更加豐富,是目前 Zookeeper 使用範圍最廣的 Java 客戶端。本篇文章主要講解其基本使用,項目採用 Maven 構建,以單元測試的方法進行講解,相關依賴以下:java

<dependencies>
    <!--Curator 相關依賴-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.13</version>
    </dependency>
    <!--單元測試相關依賴-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

完整源碼見本倉庫: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curatornode

2、客戶端相關操做

2.1 建立客戶端實例

這裏使用 @Before 在單元測試執行前建立客戶端實例,並使用 @After 在單元測試後關閉客戶端鏈接。git

public class BasicOperation {

    private CuratorFramework client = null;
    private static final String zkServerPath = "192.168.0.226:2181";
    private static final String nodePath = "/hadoop/yarn";

    @Before
    public void prepare() {
        // 重試策略
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
        .connectString(zkServerPath)
        .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
        .namespace("workspace").build();  //指定命名空間後,client 的全部路徑操做都會以/workspace 開頭
        client.start();
    }

    @After
    public void destroy() {
        if (client != null) {
            client.close();
        }
    }
}

2.2 重試策略

在鏈接 Zookeeper 時,Curator 提供了多種重試策略以知足各類需求,全部重試策略均繼承自 RetryPolicy 接口,以下圖:github

這些重試策略類主要分爲如下兩類:apache

  • RetryForever :表明一直重試,直到鏈接成功;
  • SleepingRetry : 基於必定間隔時間的重試。這裏以其子類 ExponentialBackoffRetry 爲例說明,其構造器以下:
/**
 * @param baseSleepTimeMs 重試之間等待的初始時間
 * @param maxRetries 最大重試次數
 * @param maxSleepMs 每次重試間隔的最長睡眠時間(毫秒)
 */
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

2.3 判斷服務狀態

@Test
public void getStatus() {
    CuratorFrameworkState state = client.getState();
    System.out.println("服務是否已經啓動:" + (state == CuratorFrameworkState.STARTED));
}

3、節點增刪改查

3.1 建立節點

@Test
public void createNodes() throws Exception {
    byte[] data = "abc".getBytes();
    client.create().creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)      //節點類型
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
            .forPath(nodePath, data);
}

建立時能夠指定節點類型,這裏的節點類型和 Zookeeper 原生的一致,所有類型定義在枚舉類 CreateMode 中:緩存

public enum CreateMode {
    // 永久節點
    PERSISTENT (0, false, false),
    //永久有序節點
    PERSISTENT_SEQUENTIAL (2, false, true),
    // 臨時節點
    EPHEMERAL (1, true, false),
    // 臨時有序節點
    EPHEMERAL_SEQUENTIAL (3, true, true);
    ....
}

2.2 獲取節點信息

@Test
public void getNode() throws Exception {
    Stat stat = new Stat();
    byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);
    System.out.println("節點數據:" + new String(data));
    System.out.println("節點信息:" + stat.toString());
}

如上所示,節點信息被封裝在 Stat 類中,其主要屬性以下:session

public class Stat implements Record {
    private long czxid;
    private long mzxid;
    private long ctime;
    private long mtime;
    private int version;
    private int cversion;
    private int aversion;
    private long ephemeralOwner;
    private int dataLength;
    private int numChildren;
    private long pzxid;
    ...
}

每一個屬性的含義以下:異步

狀態屬性 說明
czxid 數據節點建立時的事務 ID
ctime 數據節點建立時的時間
mzxid 數據節點最後一次更新時的事務 ID
mtime 數據節點最後一次更新時的時間
pzxid 數據節點的子節點最後一次被修改時的事務 ID
cversion 子節點的更改次數
version 節點數據的更改次數
aversion 節點的 ACL 的更改次數
ephemeralOwner 若是節點是臨時節點,則表示建立該節點的會話的 SessionID;若是節點是持久節點,則該屬性值爲 0
dataLength 數據內容的長度
numChildren 數據節點當前的子節點個數

2.3 獲取子節點列表

@Test
public void getChildrenNodes() throws Exception {
    List<String> childNodes = client.getChildren().forPath("/hadoop");
    for (String s : childNodes) {
        System.out.println(s);
    }
}

2.4 更新節點

更新時能夠傳入版本號也能夠不傳入,若是傳入則相似於樂觀鎖機制,只有在版本號正確的時候纔會被更新。oop

@Test
public void updateNode() throws Exception {
    byte[] newData = "defg".getBytes();
    client.setData().withVersion(0)     // 傳入版本號,若是版本號錯誤則拒絕更新操做,並拋出 BadVersion 異常
            .forPath(nodePath, newData);
}

2.5 刪除節點

@Test
public void deleteNodes() throws Exception {
    client.delete()
            .guaranteed()                // 若是刪除失敗,那麼在會繼續執行,直到成功
            .deletingChildrenIfNeeded()  // 若是有子節點,則遞歸刪除
            .withVersion(0)              // 傳入版本號,若是版本號錯誤則拒絕刪除操做,並拋出 BadVersion 異常
            .forPath(nodePath);
}

2.6 判斷節點是否存在

@Test
public void existNode() throws Exception {
    // 若是節點存在則返回其狀態信息若是不存在則爲 null
    Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");
    System.out.println("節點是否存在:" + !(stat == null));
}

3、監聽事件

3.1 建立一次性監聽

和 Zookeeper 原生監聽同樣,使用 usingWatcher 註冊的監聽是一次性的,即監聽只會觸發一次,觸發後就銷燬。示例以下:單元測試

@Test
public void DisposableWatch() throws Exception {
    client.getData().usingWatcher(new CuratorWatcher() {
        public void process(WatchedEvent event) {
            System.out.println("節點" + event.getPath() + "發生了事件:" + event.getType());
        }
    }).forPath(nodePath);
    Thread.sleep(1000 * 1000);  //休眠以觀察測試效果
}

3.2 建立永久監聽

Curator 還提供了建立永久監聽的 API,其使用方式以下:

@Test
public void permanentWatch() throws Exception {
    // 使用 NodeCache 包裝節點,對其註冊的監聽做用於節點,且是永久性的
    NodeCache nodeCache = new NodeCache(client, nodePath);
    // 一般設置爲 true, 表明建立 nodeCache 時,就去獲取對應節點的值並緩存
    nodeCache.start(true);
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        public void nodeChanged() {
            ChildData currentData = nodeCache.getCurrentData();
            if (currentData != null) {
                System.out.println("節點路徑:" + currentData.getPath() +
                        "數據:" + new String(currentData.getData()));
            }
        }
    });
    Thread.sleep(1000 * 1000);  //休眠以觀察測試效果
}

3.3 監聽子節點

這裏以監聽 /hadoop 下全部子節點爲例,實現方式以下:

@Test
public void permanentChildrenNodesWatch() throws Exception {

    // 第三個參數表明除了節點狀態外,是否還緩存節點內容
    PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);
    /*
         * StartMode 表明初始化方式:
         *    NORMAL: 異步初始化
         *    BUILD_INITIAL_CACHE: 同步初始化
         *    POST_INITIALIZED_EVENT: 異步並通知,初始化以後會觸發 INITIALIZED 事件
         */
    childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

    List<ChildData> childDataList = childrenCache.getCurrentData();
    System.out.println("當前數據節點的子節點列表:");
    childDataList.forEach(x -> System.out.println(x.getPath()));

    childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            switch (event.getType()) {
                case INITIALIZED:
                System.out.println("childrenCache 初始化完成");
                break;
                case CHILD_ADDED:
                // 須要注意的是: 即便是以前已經存在的子節點,也會觸發該監聽,由於會把該子節點加入 childrenCache 緩存中
                System.out.println("增長子節點:" + event.getData().getPath());
                break;
                case CHILD_REMOVED:
                System.out.println("刪除子節點:" + event.getData().getPath());
                break;
                case CHILD_UPDATED:
                System.out.println("被修改的子節點的路徑:" + event.getData().getPath());
                System.out.println("修改後的數據:" + new String(event.getData().getData()));
                break;
            }
        }
    });
    Thread.sleep(1000 * 1000); //休眠以觀察測試效果
}

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索