Curator 是 Netflix 公司開源的一個 Zookeeper 客戶端,目前由 Apache 進行維護。與 Zookeeper 原生客戶端相比,Curator 的抽象層次更高,功能也更加豐富,是目前 Zookeeper 使用範圍最廣的 Java 客戶端。本篇文章主要講解其基本使用,項目採用 Maven 構建,以單元測試的方法進行講解,相關依賴以下:java
<dependencies> <!--Curator 相關依賴--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency> <!--單元測試相關依賴--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
完整源碼見本倉庫: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curatornode
這裏使用 @Before
在單元測試執行前建立客戶端實例,並使用 @After
在單元測試後關閉客戶端鏈接。git
public class BasicOperation { private CuratorFramework client = null; private static final String zkServerPath = "192.168.0.226:2181"; private static final String nodePath = "/hadoop/yarn"; @Before public void prepare() { // 重試策略 RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder() .connectString(zkServerPath) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace").build(); //指定命名空間後,client 的全部路徑操做都會以/workspace 開頭 client.start(); } @After public void destroy() { if (client != null) { client.close(); } } }
在鏈接 Zookeeper 時,Curator 提供了多種重試策略以知足各類需求,全部重試策略均繼承自 RetryPolicy
接口,以下圖:github
這些重試策略類主要分爲如下兩類:apache
ExponentialBackoffRetry
爲例說明,其構造器以下:/** * @param baseSleepTimeMs 重試之間等待的初始時間 * @param maxRetries 最大重試次數 * @param maxSleepMs 每次重試間隔的最長睡眠時間(毫秒) */ ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
@Test public void getStatus() { CuratorFrameworkState state = client.getState(); System.out.println("服務是否已經啓動:" + (state == CuratorFrameworkState.STARTED)); }
@Test public void createNodes() throws Exception { byte[] data = "abc".getBytes(); client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) //節點類型 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(nodePath, data); }
建立時能夠指定節點類型,這裏的節點類型和 Zookeeper 原生的一致,所有類型定義在枚舉類 CreateMode
中:緩存
public enum CreateMode { // 永久節點 PERSISTENT (0, false, false), //永久有序節點 PERSISTENT_SEQUENTIAL (2, false, true), // 臨時節點 EPHEMERAL (1, true, false), // 臨時有序節點 EPHEMERAL_SEQUENTIAL (3, true, true); .... }
@Test public void getNode() throws Exception { Stat stat = new Stat(); byte[] data = client.getData().storingStatIn(stat).forPath(nodePath); System.out.println("節點數據:" + new String(data)); System.out.println("節點信息:" + stat.toString()); }
如上所示,節點信息被封裝在 Stat
類中,其主要屬性以下:session
public class Stat implements Record { private long czxid; private long mzxid; private long ctime; private long mtime; private int version; private int cversion; private int aversion; private long ephemeralOwner; private int dataLength; private int numChildren; private long pzxid; ... }
每一個屬性的含義以下:異步
狀態屬性 | 說明 |
---|---|
czxid | 數據節點建立時的事務 ID |
ctime | 數據節點建立時的時間 |
mzxid | 數據節點最後一次更新時的事務 ID |
mtime | 數據節點最後一次更新時的時間 |
pzxid | 數據節點的子節點最後一次被修改時的事務 ID |
cversion | 子節點的更改次數 |
version | 節點數據的更改次數 |
aversion | 節點的 ACL 的更改次數 |
ephemeralOwner | 若是節點是臨時節點,則表示建立該節點的會話的 SessionID;若是節點是持久節點,則該屬性值爲 0 |
dataLength | 數據內容的長度 |
numChildren | 數據節點當前的子節點個數 |
@Test public void getChildrenNodes() throws Exception { List<String> childNodes = client.getChildren().forPath("/hadoop"); for (String s : childNodes) { System.out.println(s); } }
更新時能夠傳入版本號也能夠不傳入,若是傳入則相似於樂觀鎖機制,只有在版本號正確的時候纔會被更新。oop
@Test public void updateNode() throws Exception { byte[] newData = "defg".getBytes(); client.setData().withVersion(0) // 傳入版本號,若是版本號錯誤則拒絕更新操做,並拋出 BadVersion 異常 .forPath(nodePath, newData); }
@Test public void deleteNodes() throws Exception { client.delete() .guaranteed() // 若是刪除失敗,那麼在會繼續執行,直到成功 .deletingChildrenIfNeeded() // 若是有子節點,則遞歸刪除 .withVersion(0) // 傳入版本號,若是版本號錯誤則拒絕刪除操做,並拋出 BadVersion 異常 .forPath(nodePath); }
@Test public void existNode() throws Exception { // 若是節點存在則返回其狀態信息若是不存在則爲 null Stat stat = client.checkExists().forPath(nodePath + "aa/bb/cc"); System.out.println("節點是否存在:" + !(stat == null)); }
和 Zookeeper 原生監聽同樣,使用 usingWatcher
註冊的監聽是一次性的,即監聽只會觸發一次,觸發後就銷燬。示例以下:單元測試
@Test public void DisposableWatch() throws Exception { client.getData().usingWatcher(new CuratorWatcher() { public void process(WatchedEvent event) { System.out.println("節點" + event.getPath() + "發生了事件:" + event.getType()); } }).forPath(nodePath); Thread.sleep(1000 * 1000); //休眠以觀察測試效果 }
Curator 還提供了建立永久監聽的 API,其使用方式以下:
@Test public void permanentWatch() throws Exception { // 使用 NodeCache 包裝節點,對其註冊的監聽做用於節點,且是永久性的 NodeCache nodeCache = new NodeCache(client, nodePath); // 一般設置爲 true, 表明建立 nodeCache 時,就去獲取對應節點的值並緩存 nodeCache.start(true); nodeCache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() { ChildData currentData = nodeCache.getCurrentData(); if (currentData != null) { System.out.println("節點路徑:" + currentData.getPath() + "數據:" + new String(currentData.getData())); } } }); Thread.sleep(1000 * 1000); //休眠以觀察測試效果 }
這裏以監聽 /hadoop
下全部子節點爲例,實現方式以下:
@Test public void permanentChildrenNodesWatch() throws Exception { // 第三個參數表明除了節點狀態外,是否還緩存節點內容 PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true); /* * StartMode 表明初始化方式: * NORMAL: 異步初始化 * BUILD_INITIAL_CACHE: 同步初始化 * POST_INITIALIZED_EVENT: 異步並通知,初始化以後會觸發 INITIALIZED 事件 */ childrenCache.start(StartMode.POST_INITIALIZED_EVENT); List<ChildData> childDataList = childrenCache.getCurrentData(); System.out.println("當前數據節點的子節點列表:"); childDataList.forEach(x -> System.out.println(x.getPath())); childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { switch (event.getType()) { case INITIALIZED: System.out.println("childrenCache 初始化完成"); break; case CHILD_ADDED: // 須要注意的是: 即便是以前已經存在的子節點,也會觸發該監聽,由於會把該子節點加入 childrenCache 緩存中 System.out.println("增長子節點:" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("刪除子節點:" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("被修改的子節點的路徑:" + event.getData().getPath()); System.out.println("修改後的數據:" + new String(event.getData().getData())); break; } } }); Thread.sleep(1000 * 1000); //休眠以觀察測試效果 }
更多大數據系列文章能夠參見 GitHub 開源項目: 大數據入門指南