zookeeper之curator

前面講了zookeeper的API使用zookeeper之ZkClient的使用,如今看看看看curator的使用。node

maven依賴

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>

建立會話

before,建立會話用,後面不在貼這部分代碼apache

CuratorFramework client;

@Before
public void before() {
    client = CuratorConnect.getCuratorClient2();
}

CuratorConnect,這邊給出2個方式,一個是直接new一個,一個是Fluent風格的,須要調用start方法來開啓會話。segmentfault

public class CuratorConnect {
    static final String CONNECT_STRING = "172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181";
    static ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 5);

    public static CuratorFramework getCuratorClient() {
        CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STRING, 5000, 5000, retryPolicy);
        client.start();
        return client;
    }

    public static CuratorFramework getCuratorClient2() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        return client;
    }
}

跟以前不同的是,這邊須要設置重試策略,其餘參數雷同。
重試策略RetryPolicy以下:
image.png緩存

  • BoundedExponentialBackoffRetry:繼承ExponentialBackoffRetry,可是有設置最大sleep時間
  • ExponentialBackoffRetry:每次重試後,休眠時間會愈來愈長,直到重試次數減爲0
  • RetryForever:一直重試
  • RetryNTimes:重試N次
  • RetryOneTime:僅重試一次
  • RetryUntilElapsed:一直重試,直到規定的時間

RetryPolicy的allowRetry有三個參數,分別是已重試的次數、從第一次重試開始到當前話費的時間、用於sleep的時間。網絡

增刪改查

建立節點

測試代碼:session

@Test
public void testCreate() throws Exception {
    // 默認持久性節點
    client.create().forPath("/node1");
    client.create().forPath("/node2","data2".getBytes());

    // 須要指定類型,用withMode建立臨時節點
    client.create().withMode(CreateMode.EPHEMERAL).forPath("/node3");
    // 建立多層級用creatingParentsIfNeeded
    client.create().creatingParentsIfNeeded().forPath("/node4/node4_1");
    //方便查看臨時節點
    TimeUnit.SECONDS.sleep(100);
}

客戶端查詢結果以下:異步

[zk: localhost:2181(CONNECTED) 75] ls /
[node1, node2, node3, node4, zookeeper]
[zk: localhost:2181(CONNECTED) 76] get /node2
data2
[zk: localhost:2181(CONNECTED) 77] ls /node4
[node4_1]

刪除節點

測試代碼:maven

@Test
public void testDelete() throws Exception {
    // 刪除節點
    client.delete().forPath("/node1");
    // guaranteed保證能夠刪除節點,即使在網絡可能波動的狀況下
    client.delete().guaranteed().forPath("/node5");
    // 指定版本號刪除
    client.delete().withVersion(-1).forPath("/node2");
    // 刪除遞歸子節點
    client.delete().deletingChildrenIfNeeded().forPath("/node4");
}

客戶端查詢結果以下:測試

[zk: localhost:2181(CONNECTED) 87] ls /
[node1, node2, node4, node5, zookeeper]
[zk: localhost:2181(CONNECTED) 88] ls /
[zookeeper]

節點列表

測試代碼:ui

@Test
public void testGetChildren() throws Exception {
    List<String> children = client.getChildren().forPath("/node4");
    System.out.println(children);
}

運行結果以下:
image.png

獲取數據

測試代碼:

@Test
public void testGetData() throws Exception {
    byte[] bytes = client.getData().forPath("/node2");
    System.out.println(new String(bytes));
}

運行結果以下:
image.png

更新數據

測試代碼:

@Test
public void testWriteData() throws Exception {
    client.setData().forPath("/node2", "new_data".getBytes());
    byte[] bytes = client.getData().forPath("/node2");
    System.out.println(new String(bytes));
}

運行結果以下:
image.png

節點是否存在

測試代碼:

@Test
public void testExists() throws Exception {
    Stat stat = client.checkExists().forPath("/node");
    System.out.println(stat);
    // 不存在父節點會建立,但不會建立當前節點
    client.checkExists().creatingParentsIfNeeded().forPath("/node/node_1");
    stat = client.checkExists().forPath("/node");
    System.out.println(stat);
}

運行結果以下:
image.png

異步接口

在curator中,BackgroundCallback接口,用來異步接口調用後,處理服務端返回的接口。
接口中的processResult方法有兩個參數,一個是客戶端實例CuratorFramework,一個是服務端事件CuratorEvent。
在MyBackgroundCallback中,打印了事件類型和響應碼,響應碼和AsyncCallback的processResult方法的rc是同樣的,在原生API的建立方法有提過

public class MyBackgroundCallback implements BackgroundCallback {
    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
        System.out.println(curatorEvent.getType() + "-" + curatorEvent.getResultCode());
    }
}

測試代碼:

@Test
public void testASyn() throws Exception {
    BackgroundCallback callback = new MyBackgroundCallback();
    client.create().inBackground(callback).forPath("/node");
    TimeUnit.SECONDS.sleep(3);
    client.getData().inBackground(callback).forPath("/node");
    TimeUnit.SECONDS.sleep(3);
    client.setData().inBackground(callback).forPath("/node", "new_data".getBytes());
    TimeUnit.SECONDS.sleep(3);
    client.getData().inBackground(callback).forPath("/node");
    TimeUnit.SECONDS.sleep(3);
    client.delete().inBackground(callback).forPath("/node");
    TimeUnit.SECONDS.sleep(3);
}

運行結果以下:
image.png
在inBackground方法中,除了傳遞BackgroundCallback,還能夠傳線程池對象,這樣業務邏輯就會該線程池處理,若是沒有傳,就使用默認的EventThread處理,這邊就不作演示了。

Listener

POM文件須要導入:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

在curator中是經過NodeCache來監聽節點的變化的。

NodeCache

測試代碼:

@Test
public void testNodeCache4Listener() throws Exception {
    // 第一個參數,是監聽的客戶端
    // 第二個參數,是監聽的節點
    final NodeCache nodeCache = new NodeCache(client, "/node1");
    // 啓動的時候從zookeeper讀取對應的數據
    nodeCache.start(true);
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        public void nodeChanged() throws Exception {
            System.out.println(nodeCache.getPath() + ":" + new String(nodeCache.getCurrentData().getData()));
        }
    });
    client.create().forPath("/node1","data".getBytes());
    TimeUnit.MILLISECONDS.sleep(200);
    client.setData().forPath("/node1", "node1_data".getBytes());
    TimeUnit.MILLISECONDS.sleep(200);
    client.delete().forPath("/node1");
    TimeUnit.MILLISECONDS.sleep(200);
}

測試結果以下:
image.png
節點的建立、刪除,會觸發監聽。

PathChildrenCache

測試代碼

@Test
public void testPathChildrenCache4Listener() throws Exception {
    // 第一個參數,是監聽的客戶端
    // 第二個參數,是監聽的節點
    // 第三個參數,是否緩存節點內容
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node1", true);
    pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            System.out.println(pathChildrenCacheEvent);
        }
    });
    System.out.println("--------------1--------------");
    client.create().forPath("/node1");
    TimeUnit.MILLISECONDS.sleep(200);

    System.out.println("--------------2--------------");
    client.create().forPath("/node1/node1_1");
    TimeUnit.MILLISECONDS.sleep(200);

    System.out.println("--------------3--------------");
    client.setData().forPath("/node1/node1_1", "new_data".getBytes());
    TimeUnit.MILLISECONDS.sleep(200);

    System.out.println("--------------4--------------");
    client.delete().forPath("/node1/node1_1");
    TimeUnit.MILLISECONDS.sleep(200);

    System.out.println("--------------5--------------");
    client.delete().forPath("/node1");
    TimeUnit.MILLISECONDS.sleep(2000);
}

運行結果以下:
image.png子節點的新增、修改、刪除,都會獲得監聽,當前節點的建立也會獲得監聽。

相關文章
相關標籤/搜索