Zookeeper Client簡介

直接使用zk的api實現業務功能比較繁瑣。由於要處理session loss,session expire等異常,在發生這些異常後進行重連。又由於ZK的watcher是一次性的,若是要基於wather實現發佈/訂閱模式,還要本身包裝一下,將一次性訂閱包裝成持久訂閱。另外若是要使用抽象級別更高的功能,好比分佈式鎖,leader選舉等,還要本身額外作不少事情。這裏介紹下ZK的兩個第三方客戶端包裝小工具,能夠分別解決上述小問題。java

1、 zkClient
zkClient主要作了兩件事情。一件是在session loss和session expire時自動建立新的ZooKeeper實例進行重連。另外一件是將一次性watcher包裝爲持久watcher。後者的具體作法是簡單的在watcher回調中,從新讀取數據的同時再註冊相同的watcher實例。node

zkClient簡單的使用樣例以下:git

  public static void testzkClient(final String serverList) {
        ZkClient zkClient4subChild = new ZkClient(serverList);
        zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List currentChilds) throws Exception {
                System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
            }
        });

上面是訂閱children變化,下面是訂閱數據變化github

  ZkClient zkClient4subData = new ZkClient(serverList);
        zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println(prefix() + "Data of " + dataPath + " has changed");
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(prefix() + dataPath + " has deleted");
            }
        });

訂閱鏈接狀態的變化:api

 ZkClient zkClient4subStat = new ZkClient(serverList);
        zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleNewSession() throws Exception {
                System.out.println(prefix() + "handleNewSession()");
            }

            @Override
            public void handleStateChanged(KeeperState stat) throws Exception {
                System.out.println(prefix() + "handleStateChanged,stat:" + stat);
            }
        });

下面表格列出了寫操做與ZK內部產生的事件的對應關係:session

  **event For "/path"** **event For "/path/child"**
**create("/path")** EventType.NodeCreated NA
**delete("/path")** EventType.NodeDeleted NA
**setData("/path")** EventType.NodeDataChanged NA
**create("/path/child")** EventType.NodeChildrenChanged EventType.NodeCreated
**delete("/path/child")** EventType.NodeChildrenChanged EventType.NodeDeleted
**setData("/path/child")** NA EventType.NodeDataChanged

而ZK內部的寫事件與所觸發的watcher的對應關係以下:併發

**event For "/path"** **defaultWatcher** **exists ("/path")** **getData ("/path")** **getChildren ("/path")**
**EventType.None**
**EventType.NodeCreated**    
**EventType.NodeDeleted**   √(不正常)  
**EventType.NodeDataChanged**    
**EventType.NodeChildrenChanged**      

綜合上面兩個表,咱們能夠總結出各類寫操做能夠觸發哪些watcher,以下表所示:分佈式

  **"/path"** **"/path/child"**
  **exists** **getData** **getChildren** **exists** **getData** **getChildren**
**create("/path")** **√** **√** ** ** ** ** ** ** ** **
**delete("/path")** **√** **√** **√** ** ** ** ** ** **
**setData("/path")** **√** **√** ** ** ** ** ** ** ** **
**create("/path/child")** ** ** ** ** **√** **√** **√** ** **
**delete("/path/child")** ** ** ** ** **√** **√** **√** **√**
**setData("/path/child")** ** ** ** ** ** ** **√** **√** ** **

若是發生session close、authFail和invalid,那麼全部類型的wather都會被觸發 zkClient除了作了一些便捷包裝以外,對watcher使用作了一點加強。好比subscribeChildChanges其實是經過exists和getChildren關注了兩個事件。這樣當create("/path")時,對應path上經過getChildren註冊的listener也會被調用。另外subscribeDataChanges實際上只是經過exists註冊了事件。由於從上表能夠看到,對於一個更新,經過exists和getData註冊的watcher要麼都會觸發,要麼都不會觸發。 zkClient地址:[https://github.com/sgroschupf/zkclient](https://github.com/sgroschupf/zkclient) Maven工程中使用zkClient須要加的依賴:ide

    <dependency>
        <groupId>zkclient</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.1</version>
    </dependency>

2、 menagerie工具

menagerie基於Zookeeper實現了java.util.concurrent包的一個分佈式版本。這個封裝是更大粒度上對各類分佈式一致性使用場景的抽象。其中最基礎和經常使用的是一個分佈式鎖的實現:
org.menagerie.locks.ReentrantZkLock,經過ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL類型znode的支持,實現了分佈式鎖。具體作法是:不一樣的client上每一個試圖得到鎖的線程,都在相同的basepath下面建立一個EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要建立的是臨時znode,建立鏈接斷開時會自動刪除; SEQUENTIAL表示要自動在傳入的path後面綴上一個自增的全局惟一後綴,做爲最終的path。所以對不一樣的請求ZK會生成不一樣的後綴,並分別返回帶了各自後綴的path給各個請求。由於ZK全局有序的特性,無論client請求怎樣前後到達,在ZKServer端都會最終排好一個順序,所以自增後綴最小的那個子節點,就對應第一個到達ZK的有效請求。而後client讀取basepath下的全部子節點和ZK返回給本身的path進行比較,當發現本身建立的sequential node的後綴序號排在第一個時,就認爲本身得到了鎖;不然的話,就認爲本身沒有得到鎖。這時確定是有其餘併發的而且是沒有斷開的client/線程先建立了node。

基於分佈式鎖,還實現了其餘業務場景,好比leader選舉:

public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(「ZK-host-ip:2181」, 5000);
LeaderElector elector = new ZkLeaderElector(「/leaderElectionTest」, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(「Try to become the leader success!」);
}
}

java.util.concurrent包下面的其餘接口實現,也主要是基於ReentrantZkLock的,好比ZkHashMap實現了ConcurrentMap。具體請參見menagerie的API文檔

menagerie地址:https://github.com/wxuedong/menagerie
Maven工程中使用menagerie須要加的依賴:

    <dependency>
        <groupId>org.menagerie</groupId>
        <artifactId>menagerie</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>
相關文章
相關標籤/搜索