第三章 zookeeper客戶端-curator詳解

1、簡介

  Curator是Netflix公司開源的一套zookeeper客戶端框架。java

  Curator包含了幾個包:
   curator-framework:對zookeeper的底層api的一些封裝
   curator-client:提供一些客戶端的操做,例如重試策略等
   curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式Barrier等。
  咱們須要根據ZK的版原本選擇對應的curator版本,不然會出現兼容性問題

2、環境

  jdk 1.8
  zk 3.4.12
  curator-recipes 2.8.0

3、經常使用api介紹

  1)客戶端建立

  CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, 5000,3000,new RetryNTimes(10, 5000));
  一、ZK_ADDRESS 鏈接zk的地址 格式host1:port1,host2:port2,。。。
  二、sessionTimeoutMs 會話超時時間,單位是毫秒,可不填測此參數,默認值爲60000ms
  三、connectionTimeoutMs
  四、retryPolicy 重試策略,內建有四種重試策略,也能夠自行實現RetryPolicy接口。

  2)客戶端建立

  client.start()

  3)建立數據節點

   四種節點
   PERSISTENT:持久化
   PERSISTENT_SEQUENTIAL:持久化而且帶序列號
   EPHEMERAL:臨時
   EPHEMERAL_SEQUENTIAL:臨時而且帶序列號node

  一、默認持久化節點
  client.create().forPath("/data1");
  二、建立持久化節點並賦值
  client.create().forPath("/data2", "this is data2".getBytes());
  三、建立臨時空節點
  client.create().withMode(CreateMode.EPHEMERAL).forPath("/data3");
  四、建立臨時節點並賦值
  client.create().withMode(CreateMode.EPHEMERAL).forPath("/data4", "this is data4".getBytes());
  五、建立臨時有序節點並賦值並遞歸建立父節點
  client.create().creatingParentsIfNeeded().withMode(CreateMode.EPH_SEQ).forPath("/patent/data", "children".getBytes());git

   4)刪除數據節點

  流式風格,拼接順序能夠調整。
  一、只能刪除葉子節點,不然拋出異常
  client.delete().forPath("/data1");
  二、刪除一個節點並遞歸刪除其全部的子節點
  client.delete().deletingChildrenIfNeeded().forPath("/patent/data");
  三、保證強制刪除一個節點,只要客戶端會話有效就會持續進行刪除直到刪除成功。
  client.delete().guaranteed().forPath("/data2");github

   5)查詢數據節點

  一、讀節點的數據內容,返回byte數組
  byte[] data=client.getData().forPath("/data2");
  二、讀取一個節點的數據內容,同時獲取到該節點的stat
  Stat stat = new Stat();
  client.getData().storingStatIn(stat).forPath("data4");
  三、獲取一個路徑下全部的子節點
  List<String> childrens = client.getChildren().forPath("/");
  四、檢查節點是否存在,返回一個stat
  stat= client.checkExists().forPath("/data3");apache

  6)數據節點更新

  一、更新一個節點的數據內容,返回一個stat
  Stat stat = client.setData().forPath("/data1","update data1 data".getBytes());api

  7)事務

    CuratorFramework的實例包含inTransaction()接口方法,調用此方法開啓一個ZooKeeper事務. 能夠複合create, setData, check, and/or delete 等操做而後調用commit()做爲一個原子操做提交。    數組

    client.inTransaction()
    .and().create().withMode(CreateMode.PERSISTENT).forPath("/data5", "this is data5".getBytes())
    .and().setData().forPath("/data5", "update data5".getBytes())
    .and().commit();緩存

     8)監聽watch

   Zookeeper原生支持經過註冊Watcher來進行事件監聽,可是開發者須要反覆註冊(Watcher只能單次註冊單次使用)。Cache是Curator中對事件監聽的包裝,能夠看做是對事件監聽的本地緩存視圖,可以自動爲開發者處理反覆註冊監聽。Curator提供了三種Watcher(Cache)來監聽結點的變化。session

    一、Path Cache框架

     Path Cache用來監控一個ZNode的子節點. 當一個子節點增長, 更新,刪除時, Path Cache會改變它的狀態, 會包含最新的子節點, 子節點的數據和狀態,而狀態的更變將經過PathChildrenCacheListener通知。

   

package com.vi.test;

import com.vi.util.CuratorClientUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.zookeeper.CreateMode;

import java.util.List;

public class CuratorWatchTest {
    private static final String PATH = "/path/cache";

    public static void main(String[] args) {
        //建立客戶端
        CuratorFramework client = CuratorClientUtil.getClient();
        client.start();
        try {
            PathChildrenCache cache = pathCacheTest(client);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes());
            Thread.sleep(500);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes());
            Thread.sleep(500);
            //獲取全部子節點
            List<ChildData> datas =cache.getCurrentData();
            for(ChildData childData : datas){
                System.out.println("節點路徑:"+childData.getPath() + ",節點值:" + new String(childData.getData()));
            }
            client.setData().forPath(PATH + "/data1", "update data".getBytes());
            Thread.sleep(500);
            client.delete().deletingChildrenIfNeeded().forPath("/path");
            Thread.sleep(500);
            cache.close();
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static PathChildrenCache pathCacheTest(CuratorFramework client) throws Exception {
        //第三個參數不爲true時,不會緩存data數據
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        /*
            三種啓動方式
            NORMAL:正常初始化。
            BUILD_INITIAL_CACHE:在調用start()以前會調用rebuild()。
            POST_INITIALIZED_EVENT: 當Cache初始化數據後發送一個
            cache.start(PathChildrenCache.StartMode.NORMAL);
         */
        cache.start();
        //增長監聽事件
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("事件爲:" + event.getType() + ",數據爲:" + new String(event.getData().getData()));
            }
        });
        return cache;
    }

}

 

  2)Node Cache

    Node Cache與Path Cache相似,Node Cache只能監聽某一個特定的節點。

  

package com.vi.test;

import com.vi.util.CuratorClientUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.zookeeper.CreateMode;

public class CuratorNodeCacheTest {
    private static final String PATH = "/path/cache";

    public static void main(String[] args) {
        //建立客戶端
        CuratorFramework client = CuratorClientUtil.getClient();
        client.start();
        try {
            NodeCache cache = nodeCacheTest(client);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "first data".getBytes());
            Thread.sleep(500);
            client.setData().forPath(PATH, "update data".getBytes());
            Thread.sleep(500);
            client.delete().deletingChildrenIfNeeded().forPath("/path");
            Thread.sleep(500);
            cache.close();
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static NodeCache nodeCacheTest(CuratorFramework client) throws Exception {
        //只能監控一個節點
        NodeCache cache = new NodeCache(client, PATH);
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData childData = cache.getCurrentData();
                if (childData == null) {
                    System.out.println("節點刪除!");
                } else {
                    System.out.println("節點數據:" + new String(cache.getCurrentData().getData()));
                }
            }
        });
        cache.start();
        return cache;
    }
}

   3)Tree Cache

   Tree Cache能夠監控整個樹上的全部節點,相似於PathCache和NodeCache的組合。

package com.vi.test;

import com.vi.util.CuratorClientUtil;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.zookeeper.CreateMode;

public class CuratorTreeCacheTest {
    private static final String PATH = "/path/cache";

    public static void main(String[] args) {
        //建立客戶端
        CuratorFramework client = CuratorClientUtil.getClient();
        client.start();
        try {
            TreeCache cache = treeCacheTest(client);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes());
            Thread.sleep(500);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes());
            Thread.sleep(500);
            client.setData().forPath(PATH , "update path data".getBytes());
            Thread.sleep(500);
            client.setData().forPath(PATH + "/data1", "update data".getBytes());
            Thread.sleep(500);
            client.delete().deletingChildrenIfNeeded().forPath("/path");
            Thread.sleep(1000);
            cache.close();
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static TreeCache treeCacheTest(CuratorFramework client) throws Exception {
        //只能監控一個節點
        TreeCache cache = new TreeCache(client, PATH);
        cache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("事件爲:" + event.getType() + ",節點路徑爲:" + event.getData().getPath() + ",數據爲:" +  new String(event.getData().getData()));
            }
        });
        cache.start();
        return cache;
    }
}

 4、總結

 curator-recipes中有一些高級特性可使用,在後面的章節會具體介紹。

 源碼地址:  https://github.com/binary-vi/binary.github.io/tree/master/zk-curator

相關文章
相關標籤/搜索