聊一聊 Zookeeper 客戶端之 Curator

原文連接:ZooKeeper 客戶端之 Curatorhtml

ZooKeeper 是一個分佈式的、開放源碼的分佈式應用程序協調服務,是 Google 的 Chubby 一個開源的實現。它是集羣的管理者,監視着集羣中各個節點的狀態,根據節點提交的反饋進行下一步合理操做。最終,將簡單易用的接口和性能高效、功能穩定的系統提供給用戶。java

Curator 是 Netflix 公司開源的一套 Zookeeper 客戶端框架,解決了不少 Zookeeper 客戶端很是底層的細節開發工做,包括鏈接重連、反覆註冊 Watcher 和 NodeExistsException 異常等等。Curator 包含了幾個包:node

  • curator-framework:對 Zookeeper 的底層 api 的一些封裝
  • curator-client:提供一些客戶端的操做,例如重試策略等
  • curator-recipes:封裝了一些高級特性,如:Cache 事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式Barrier 等

Curator 和 zookeeper 的版本問題

目前 Curator 有 2.x.x 和 3.x.x 兩個系列的版本,支持不一樣版本的 Zookeeper。其中Curator 2.x.x 兼容 Zookeeper的 3.4.x 和 3.5.x。而 Curator 3.x.x 只兼容 Zookeeper 3.5.x,而且提供了一些諸如動態從新配置、watch刪除等新特性。linux

Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new
複製代碼

若是跨版本會有兼容性問題,頗有可能致使節點操做失敗,當時在使用的時候就踩了這個坑,拋了以下的異常:git

KeeperErrorCode = Unimplemented for /***
複製代碼

Curator API

這裏就不對比與原生 API 的區別了,Curator 的 API 直接經過 org.apache.curator.framework.CuratorFramework 接口來看,並結合相應的案例進行使用,以備後用。github

爲了能夠直觀的看到 Zookeeper 的節點信息,能夠考慮弄一個 zk 的管控界面,常見的有 zkui 和 zkweb。web

zkui:github.com/DeemOpen/zk…apache

zkweb:github.com/zhitom/zkwe…api

我用的 zkweb ,雖然界面上看起來沒有 zkui 精簡,可是在層次展現和一些細節上感受比 zkui 好一點緩存

環境準備

以前寫的一個在 Linux 上安裝部署 Zookeeper 的筆記,其餘操做系統請自行谷歌教程吧。

本文案例工程已經同步到了 github,傳送門

PS : 目前尚未看過Curator的具體源碼,因此不會涉及到任何源碼解析、實現原理的東西;本篇主要是實際使用時的一些記錄,以備後用。若是文中錯誤之處,但願各位指出。

Curator 客戶端的初始化和初始化時機

在實際的工程中,Zookeeper 客戶端的初始化會在程序啓動期間完成。

初始化時機

在 Spring 或者 SpringBoot 工程中最多見的就是綁定到容器啓動的生命週期或者應用啓動的生命週期中:

  • 監聽 ContextRefreshedEvent 事件,在容器刷新完成以後初始化 Zookeeper
  • 監聽 ApplicationReadyEvent/ApplicationStartedEvent 事件,初始化 Zookeeper 客戶端

除了上面的方式以外,還有一種常見的是綁定到 bean 的生命週期中

  • 實現 InitializingBean 接口 ,在 afterPropertiesSet 中完成 Zookeeper 客戶端初始化

關於 SpringBoot中的事件機制能夠參考以前寫過的一篇文章:SpringBoot-SpringBoot中的事件機制

Curator 初始化

這裏使用 InitializingBean 的這種方式,代碼以下:

public class ZookeeperCuratorClient implements InitializingBean {
    private CuratorFramework curatorClient;
    @Value("${glmapper.zookeeper.address:localhost:2181}")
    private String           connectString;
    @Value("${glmapper.zookeeper.baseSleepTimeMs:1000}")
    private int              baseSleepTimeMs;
    @Value("${glmapper.zookeeper.maxRetries:3}")
    private int              maxRetries;
    @Value("${glmapper.zookeeper.sessionTimeoutMs:6000}")
    private int              sessionTimeoutMs;
    @Value("${glmapper.zookeeper.connectionTimeoutMs:6000}")
    private int              connectionTimeoutMs;
  
    @Override
    public void afterPropertiesSet() throws Exception {
        // custom policy
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        // to build curatorClient
        curatorClient = CuratorFrameworkFactory.builder().connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy).build();
        curatorClient.start();
    }

    public CuratorFramework getCuratorClient() {
        return curatorClient;
    }
}
複製代碼

glmapper.zookeeper.xxx 是本例中須要在配置文件中配置的 zookeeper 的一些參數,參數解釋以下:

  • baseSleepTimeMs:重試之間等待的初始時間
  • maxRetries:最大重試次數
  • connectString:要鏈接的服務器列表
  • sessionTimeoutMs:session 超時時間
  • connectionTimeoutMs:鏈接超時時間

另外,Curator 客戶端初始化時還須要指定重試策略,RetryPolicy 接口是 Curator 中重試鏈接(當zookeeper失去鏈接時使用)策略的頂級接口,其類繼承體系以下圖所示:

  • RetryOneTime:只重連一次
  • RetryNTime:指定重連的次數N
  • RetryUtilElapsed:指定最大重連超時時間和重連時間間隔,間歇性重連直到超時或者連接成功
  • ExponentialBackoffRetry:基於 "backoff"方式重連,和 RetryUtilElapsed 的區別是重連的時間間隔是動態的。
  • BoundedExponentialBackoffRetry: 同 ExponentialBackoffRetry的區別是增長了最大重試次數的控制

除上述以外,在一些場景中,須要對不一樣的業務進行隔離,這種狀況下,能夠經過設置 namespace 來解決,namespace 實際上就是指定zookeeper的根路徑,設置以後,後面的全部操做都會基於該根目錄。

Curator 基礎 API 使用

檢查節點是否存在

checkExists 方法返回的是一個 ExistsBuilder 構造器,這個構建器將返回一個 Stat 對象,就像調用了 org.apache.zookeeper.ZooKeeper.exists()同樣。null 表示它不存在,而實際的 Stat 對象表示存在。

public void checkNodeExist(String path) throws Exception {
    Stat stat = curatorClient.checkExists().forPath(path);
    if (stat != null){
        throw new RuntimeException("path = "+path +" has bean exist.");
    }
}
複製代碼

建議在實際的應用中,操做節點時對所需操做的節點進行 checkExists。

新增節點

  • 非遞歸方式建立節點

    curatorClient.create().forPath("/glmapper");
    curatorClient.create().forPath("/glmapper/test");
    複製代碼

    先建立/glmapper,而後再在/glmapper 下面建立 /test ,若是直接使用 /glmapper/test 沒有先建立 /glmapper 時,會拋出異常:

    org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /glmapper/test
    複製代碼

    若是須要在建立節點時指定節點中數據,則能夠這樣:

    curatorClient.create().forPath("/glmapper","data".getBytes());
    複製代碼

    指定節點類型(EPHEMERAL 臨時節點)

    curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper","data".getBytes());
    複製代碼
  • 遞歸方式建立節點

    遞歸方式建立節點有兩個方法,creatingParentsIfNeeded 和 creatingParentContainersIfNeeded。在新版本的 zookeeper 這兩個遞歸建立方法會有區別; creatingParentContainersIfNeeded() 以容器模式遞歸建立節點,若是舊版本 zookeeper,此方法等於creatingParentsIfNeeded()。

    在非遞歸方式狀況下,若是直接建立 /glmapper/test 會報錯,那麼在遞歸的方式下則是能夠的

    curatorClient.create().creatingParentContainersIfNeeded().forPath("/glmapper/test");
    複製代碼

    在遞歸調用中,若是不指定 CreateMode,則默認PERSISTENT,若是指定爲臨時節點,則最終節點會是臨時節點,父節點仍舊是PERSISTENT

刪除節點

  • 非遞歸刪除節點

    curatorClient.delete().forPath("/glmapper/test");
    複製代碼

    指定具體版本

    curatorClient.delete().withVersion(-1).forPath("/glmapper/test");
    複製代碼

    使用 guaranteed 方式刪除,guaranteed 會保證在session有效的狀況下,後臺持續進行該節點的刪除操做,直到刪除掉

    curatorClient.delete().guaranteed().withVersion(-1).forPath("/glmapper/test");
    複製代碼
  • 遞歸刪除當前節點及其子節點

    curatorClient.delete().deletingChildrenIfNeeded().forPath("/glmapper/test");
    複製代碼

獲取節點數據

獲取節點數據

byte[] data = curatorClient.getData().forPath("/glmapper/test");
複製代碼

根據配置的壓縮提供程序對數據進行解壓縮處理

byte[] data = curatorClient.getData().decompressed().forPath("/glmapper/test");
複製代碼

讀取數據並得到Stat信息

Stat stat = new Stat();
byte[] data = curatorClient.getData().storingStatIn(stat).forPath("/glmapper/test");
複製代碼

更新節點數據

設置指定值

curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
複製代碼

設置數據並使用配置的壓縮提供程序壓縮數據

curatorClient.setData().compressed().forPath("/glmapper/test","newData".getBytes());
複製代碼

設置數據,並指定版本

curatorClient.setData().withVersion(-1).forPath("/glmapper/test","newData".getBytes());
複製代碼

獲取子列表

List<String> childrenList = curatorClient.getChildren().forPath("/glmapper");
複製代碼

事件

Curator 也對 Zookeeper 典型場景之事件監聽進行封裝,這部分能力實在 curator-recipes 包下的。

事件類型

在使用不一樣的方法時會有不一樣的事件發生

public enum CuratorEventType
{
    //Corresponds to {@link CuratorFramework#create()}
    CREATE,
    //Corresponds to {@link CuratorFramework#delete()}
    DELETE,
		//Corresponds to {@link CuratorFramework#checkExists()}
    EXISTS,
		//Corresponds to {@link CuratorFramework#getData()}
    GET_DATA,
		//Corresponds to {@link CuratorFramework#setData()}
    SET_DATA,
		//Corresponds to {@link CuratorFramework#getChildren()}
    CHILDREN,
		//Corresponds to {@link CuratorFramework#sync(String, Object)}
    SYNC,
		//Corresponds to {@link CuratorFramework#getACL()}
    GET_ACL,
		//Corresponds to {@link CuratorFramework#setACL()}
    SET_ACL,
		//Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}
    WATCHED,
		//Event sent when client is being closed
    CLOSING
}
複製代碼

事件監聽

一次性監聽方式:Watcher

利用 Watcher 來對節點進行監聽操做,能夠典型業務場景須要使用可考慮,但通常狀況不推薦使用。

byte[] data = curatorClient.getData().usingWatcher(new Watcher() {
     @Override
     public void process(WatchedEvent watchedEvent) {
       System.out.println("監聽器 watchedEvent:" + watchedEvent);
     }
 }).forPath("/glmapper/test");
System.out.println("監聽節點內容:" + new String(data));
// 第一次變動節點數據
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
// 第二次變動節點數據
curatorClient.setData().forPath("/glmapper/test","newChangedData".getBytes());
複製代碼

上面這段代碼對 /glmapper/test 節點註冊了一個 Watcher 監聽事件,而且返回當前節點的內容。後面進行兩次數據變動,實際上第二次變動時,監聽已經失效,沒法再次得到節點變更事件了。測試中控制檯輸出的信息以下:

監聽節點內容:data
watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/glmapper/test
複製代碼

CuratorListener 方式

CuratorListener 監聽,此監聽主要針對 background 通知和錯誤通知。使用此監聽器以後,調用inBackground 方法會異步得到監聽,對於節點的建立或修改則不會觸發監聽事件。

CuratorListener listener = new CuratorListener(){
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
      System.out.println("event : " + event);
    }
 };
// 綁定監聽器
curatorClient.getCuratorListenable().addListener(listener);
// 異步獲取節點數據
curatorClient.getData().inBackground().forPath("/glmapper/test");
// 更新節點數據
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
複製代碼

測試中控制檯輸出的信息以下:

event : CuratorEventImpl{type=GET_DATA, resultCode=0, path='/glmapper/test', name='null', children=null, context=null, stat=5867,5867,1555140974671,1555140974671,0,0,0,0,4,0,5867
, data=[100, 97, 116, 97], watchedEvent=null, aclList=null}
複製代碼

這裏只觸發了一次監聽回調,就是 getData 。

Curator 引入的 Cache 事件監聽機制

Curator 引入了 Cache 來實現對 Zookeeper 服務端事件監聽,Cache 事件監聽能夠理解爲一個本地緩存視圖與遠程 Zookeeper 視圖的對比過程。Cache 提供了反覆註冊的功能。Cache 分爲兩類註冊類型:節點監聽和子節點監聽。

  • NodeCache

    監聽數據節點自己的變化。對節點的監聽須要配合回調函數來進行處理接收到監聽事件以後的業務處理。NodeCache 經過 NodeCacheListener 來完成後續處理。

    String path = "/glmapper/test";
    final NodeCache nodeCache = new NodeCache(curatorClient,path);
    //若是設置爲true則在首次啓動時就會緩存節點內容到Cache中。 nodeCache.start(true);
    nodeCache.start();
    nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
    System.out.println("觸發監聽回調,當前節點數據爲:" + new String(nodeCache.getCurrentData().getData()));
    }
    });
    curatorClient.setData().forPath(path,"1".getBytes());
    curatorClient.setData().forPath(path,"2".getBytes());
    curatorClient.setData().forPath(path,"3".getBytes());
    curatorClient.setData().forPath(path,"4".getBytes());
    curatorClient.setData().forPath(path,"5".getBytes());
    curatorClient.setData().forPath(path,"6".getBytes());
    複製代碼

    注意:在測試過程當中,nodeCache.start(),NodeCache 在前後屢次修改監聽節點的內容時,出現了丟失事件現象,在用例執行的5次中,僅一次監聽到了所有事件;若是 nodeCache.start(true),NodeCache 在前後屢次修改監聽節點的內容時,不會出現丟失現象。

    NodeCache不只能夠監聽節點內容變化,還能夠監聽指定節點是否存在。若是本來節點不存在,那麼Cache就會在節點被建立時觸發監聽事件,若是該節點被刪除,就沒法再觸發監聽事件。

  • PathChildrenCache

    PathChildrenCache 不會對二級子節點進行監聽,只會對子節點進行監聽。

    String path = "/glmapper";
    PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorClient,path,true);
    // 若是設置爲true則在首次啓動時就會緩存節點內容到Cache中。 nodeCache.start(true);
    pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
      @Override
      public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
        System.out.println("-----------------------------");
        System.out.println("event:"  + event.getType());
        if (event.getData()!=null){
          System.out.println("path:" + event.getData().getPath());
        }
        System.out.println("-----------------------------");
      }
    });
    zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT);
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test","1".getBytes());
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test","2".getBytes());
    Thread.sleep(1000);
    zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT);
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test/second","1".getBytes());
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test/second","2".getBytes());
    Thread.sleep(1000);
    複製代碼

    注意:在測試過程當中發現,若是連續兩個操做之間不進行必定時間的間隔,會致使沒法監聽到下一次事件。所以只會監聽子節點,因此對二級子節點 /second 下面的操做是監聽不到的。測試中控制檯輸出的信息以下:

    -----------------------------
    event:CHILD_ADDED
    path:/glmapper/test
    -----------------------------
    -----------------------------
    event:INITIALIZED
    -----------------------------
    -----------------------------
    event:CHILD_UPDATED
    path:/glmapper/test
    -----------------------------
    -----------------------------
    event:CHILD_UPDATED
    path:/glmapper/test
    -----------------------------
    複製代碼
  • TreeCache

    TreeCache 使用一個內部類TreeNode來維護這個一個樹結構。並將這個樹結構與ZK節點進行了映射。因此TreeCache 能夠監聽當前節點下全部節點的事件。

    String path = "/glmapper";
    TreeCache treeCache = new TreeCache(curatorClient,path);
    treeCache.getListenable().addListener((client,event)-> {
        System.out.println("-----------------------------");
        System.out.println("event:"  + event.getType());
        if (event.getData()!=null){
          System.out.println("path:" + event.getData().getPath());
        }
        System.out.println("-----------------------------");
    });
    treeCache.start();
    zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT);
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test","1".getBytes());
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test","2".getBytes());
    Thread.sleep(1000);
    zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT);
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test/second","1".getBytes());
    Thread.sleep(1000);
    curatorClient.setData().forPath("/glmapper/test/second","2".getBytes());
    Thread.sleep(1000);
    複製代碼

    測試中控制檯輸出的信息以下:

    -----------------------------
    event:NODE_ADDED
    path:/glmapper
    -----------------------------
    -----------------------------
    event:NODE_ADDED
    path:/glmapper/test
    -----------------------------
    -----------------------------
    event:NODE_UPDATED
    path:/glmapper/test
    -----------------------------
    -----------------------------
    event:NODE_UPDATED
    path:/glmapper/test
    -----------------------------
    -----------------------------
    event:NODE_ADDED
    path:/glmapper/test/second
    -----------------------------
    -----------------------------
    event:NODE_UPDATED
    path:/glmapper/test/second
    -----------------------------
    -----------------------------
    event:NODE_UPDATED
    path:/glmapper/test/second
    -----------------------------
    複製代碼

事務操做

CuratorFramework 的實例包含 inTransaction( ) 接口方法,調用此方法開啓一個 ZooKeeper 事務。 能夠複合create、 setData、 check、and/or delete 等操做而後調用 commit() 做爲一個原子操做提交。

// 開啓事務 
CuratorTransaction curatorTransaction = curatorClient.inTransaction();
Collection<CuratorTransactionResult> commit = 
  // 操做1 
curatorTransaction.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper/transaction")
  .and()
  // 操做2 
  .delete().forPath("/glmapper/test")
  .and()
  // 操做3
  .setData().forPath("/glmapper/transaction", "data".getBytes())
  .and()
  // 提交事務
  .commit();
Iterator<CuratorTransactionResult> iterator = commit.iterator();
while (iterator.hasNext()){
  CuratorTransactionResult next = iterator.next();
  System.out.println(next.getForPath());
  System.out.println(next.getResultPath());
  System.out.println(next.getType());
}
複製代碼

這裏debug看了下Collection信息,面板以下:

異步操做

前面提到的增刪改查都是同步的,可是 Curator 也提供了異步接口,引入了 BackgroundCallback 接口用於處理異步接口調用以後服務端返回的結果信息。BackgroundCallback 接口中一個重要的回調值爲 CuratorEvent,裏面包含事件類型、響應嗎和節點的詳細信息。

在使用上也是很是簡單的,只須要帶上 inBackground() 就行,以下:

curatorClient.getData().inBackground().forPath("/glmapper/test");
複製代碼

經過查看 inBackground 方法定義能夠看到,inBackground 支持自定義線程池來處理返回結果以後的業務邏輯。

public T inBackground(BackgroundCallback callback, Executor executor);
複製代碼

這裏就不貼代碼了。

小結

本文主要圍繞 Curator 的基本 API 進行了學習記錄,對於原理及源碼部分沒有涉及。這部分若是有時間在慢慢研究吧。另外像分佈式鎖、分佈式自增序列等實現停留在理論階段,沒有實踐,不敢妄論,用到再碼吧。

參考

相關文章
相關標籤/搜索