直接使用zk的api實現業務功能比較繁瑣。由於要處理session loss,session expire等異常,在發生這些異常後進行重連。又由於ZK的watcher是一次性的,若是要基於wather實現發佈/訂閱模式,還要本身包裝一下,將一次性訂閱包裝成持久訂閱。另外若是要使用抽象級別更高的功能,好比分佈式鎖,leader選舉等,還要本身額外作不少事情。這裏介紹下ZK的兩個第三方客戶端包裝小工具,能夠分別解決上述小問題。java
1、 zkClient
zkClient主要作了兩件事情。一件是在session loss和session expire時自動建立新的ZooKeeper實例進行重連。另外一件是將一次性watcher包裝爲持久watcher。後者的具體作法是簡單的在watcher回調中,從新讀取數據的同時再註冊相同的watcher實例。node
zkClient簡單的使用樣例以下:git
public static void testzkClient(final String serverList) { ZkClient zkClient4subChild = new ZkClient(serverList); zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List currentChilds) throws Exception { System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds); } });
上面是訂閱children變化,下面是訂閱數據變化github
ZkClient zkClient4subData = new ZkClient(serverList); zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println(prefix() + "Data of " + dataPath + " has changed"); } @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println(prefix() + dataPath + " has deleted"); } });
訂閱鏈接狀態的變化:api
ZkClient zkClient4subStat = new ZkClient(serverList); zkClient4subStat.subscribeStateChanges(new IZkStateListener() { @Override public void handleNewSession() throws Exception { System.out.println(prefix() + "handleNewSession()"); } @Override public void handleStateChanged(KeeperState stat) throws Exception { System.out.println(prefix() + "handleStateChanged,stat:" + stat); } });
下面表格列出了寫操做與ZK內部產生的事件的對應關係:session
**event For "/path"** | **event For "/path/child"** | |
---|---|---|
**create("/path")** | EventType.NodeCreated | NA |
**delete("/path")** | EventType.NodeDeleted | NA |
**setData("/path")** | EventType.NodeDataChanged | NA |
**create("/path/child")** | EventType.NodeChildrenChanged | EventType.NodeCreated |
**delete("/path/child")** | EventType.NodeChildrenChanged | EventType.NodeDeleted |
**setData("/path/child")** | NA | EventType.NodeDataChanged |
而ZK內部的寫事件與所觸發的watcher的對應關係以下:併發
**event For "/path"** | **defaultWatcher** | **exists ("/path")** | **getData ("/path")** | **getChildren ("/path")** |
---|---|---|---|---|
**EventType.None** | √ | √ | √ | √ |
**EventType.NodeCreated** | √ | √ | ||
**EventType.NodeDeleted** | √(不正常) | √ | ||
**EventType.NodeDataChanged** | √ | √ | ||
**EventType.NodeChildrenChanged** | √ |
綜合上面兩個表,咱們能夠總結出各類寫操做能夠觸發哪些watcher,以下表所示:分佈式
**"/path"** | **"/path/child"** | |||||
---|---|---|---|---|---|---|
**exists** | **getData** | **getChildren** | **exists** | **getData** | **getChildren** | |
**create("/path")** | **√** | **√** | ** ** | ** ** | ** ** | ** ** |
**delete("/path")** | **√** | **√** | **√** | ** ** | ** ** | ** ** |
**setData("/path")** | **√** | **√** | ** ** | ** ** | ** ** | ** ** |
**create("/path/child")** | ** ** | ** ** | **√** | **√** | **√** | ** ** |
**delete("/path/child")** | ** ** | ** ** | **√** | **√** | **√** | **√** |
**setData("/path/child")** | ** ** | ** ** | ** ** | **√** | **√** | ** ** |
若是發生session close、authFail和invalid,那麼全部類型的wather都會被觸發 zkClient除了作了一些便捷包裝以外,對watcher使用作了一點加強。好比subscribeChildChanges其實是經過exists和getChildren關注了兩個事件。這樣當create("/path")時,對應path上經過getChildren註冊的listener也會被調用。另外subscribeDataChanges實際上只是經過exists註冊了事件。由於從上表能夠看到,對於一個更新,經過exists和getData註冊的watcher要麼都會觸發,要麼都不會觸發。 zkClient地址:[https://github.com/sgroschupf/zkclient](https://github.com/sgroschupf/zkclient) Maven工程中使用zkClient須要加的依賴:ide
<dependency> <groupId>zkclient</groupId> <artifactId>zkclient</artifactId> <version>0.1</version> </dependency>
2、 menagerie工具
menagerie基於Zookeeper實現了java.util.concurrent包的一個分佈式版本。這個封裝是更大粒度上對各類分佈式一致性使用場景的抽象。其中最基礎和經常使用的是一個分佈式鎖的實現:
org.menagerie.locks.ReentrantZkLock,經過ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL類型znode的支持,實現了分佈式鎖。具體作法是:不一樣的client上每一個試圖得到鎖的線程,都在相同的basepath下面建立一個EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要建立的是臨時znode,建立鏈接斷開時會自動刪除; SEQUENTIAL表示要自動在傳入的path後面綴上一個自增的全局惟一後綴,做爲最終的path。所以對不一樣的請求ZK會生成不一樣的後綴,並分別返回帶了各自後綴的path給各個請求。由於ZK全局有序的特性,無論client請求怎樣前後到達,在ZKServer端都會最終排好一個順序,所以自增後綴最小的那個子節點,就對應第一個到達ZK的有效請求。而後client讀取basepath下的全部子節點和ZK返回給本身的path進行比較,當發現本身建立的sequential node的後綴序號排在第一個時,就認爲本身得到了鎖;不然的話,就認爲本身沒有得到鎖。這時確定是有其餘併發的而且是沒有斷開的client/線程先建立了node。
基於分佈式鎖,還實現了其餘業務場景,好比leader選舉:
public static void leaderElectionTest() { ZkSessionManager zksm = new DefaultZkSessionManager(「ZK-host-ip:2181」, 5000); LeaderElector elector = new ZkLeaderElector(「/leaderElectionTest」, zksm, Ids.OPEN_ACL_UNSAFE); if (elector.nominateSelfForLeader()) { System.out.println(「Try to become the leader success!」); } }
java.util.concurrent包下面的其餘接口實現,也主要是基於ReentrantZkLock的,好比ZkHashMap實現了ConcurrentMap。具體請參見menagerie的API文檔
menagerie地址:https://github.com/wxuedong/menagerie
Maven工程中使用menagerie須要加的依賴:
<dependency>
<groupId>org.menagerie</groupId>
<artifactId>menagerie</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>