1、前言java
上一篇博客已經介紹瞭如何使用Zookeeper提供的原生態Java API進行操做,本篇博文主要講解如何經過開源客戶端來進行操做。node
2、ZkClientgit
ZkClient是在Zookeeper原聲API接口之上進行了包裝,是一個更易用的Zookeeper客戶端,其內部還實現了諸如Session超時重連、Watcher反覆註冊等功能。github
2.1 添加依賴apache
在pom.xml文件中添加以下內容便可。 api
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
2.2 建立會話session
使用ZkClient能夠輕鬆的建立會話,鏈接到服務端。 併發
package com.hust.grid.leesf.zkclient.examples; import java.io.IOException; import org.I0Itec.zkclient.ZkClient; public class Create_Session_Sample { public static void main(String[] args) throws IOException, InterruptedException { ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); System.out.println("ZooKeeper session established."); } }
運行結果: 異步
ZooKeeper session established.
結果代表已經成功建立會話。分佈式
2.3 建立節點
ZkClient提供了遞歸建立節點的接口,即其幫助開發者完成父節點的建立,再建立子節點。
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Create_Node_Sample { public static void main(String[] args) throws Exception { ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); String path = "/zk-book/c1"; zkClient.createPersistent(path, true); System.out.println("success create znode."); } }
運行結果:
success create znode.
結果代表已經成功建立了節點,值得注意的是,在原生態接口中是沒法建立成功的(父節點不存在),可是經過ZkClient能夠遞歸的先建立父節點,再建立子節點。
能夠看到確實成功建立了/zk-book和/zk-book/c1兩個節點。
2.4 刪除節點
ZkClient提供了遞歸刪除節點的接口,即其幫助開發者先刪除全部子節點(存在),再刪除父節點。
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Del_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); zkClient.createPersistent(path, ""); zkClient.createPersistent(path+"/c1", ""); zkClient.deleteRecursive(path); System.out.println("success delete znode."); } }
運行結果:
success delete znode.
結果代表ZkClient可直接刪除帶子節點的父節點,由於其底層先刪除其全部子節點,而後再刪除父節點。
2.5 獲取子節點
package com.hust.grid.leesf.zkclient.examples; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; public class Get_Children_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); zkClient.subscribeChildChanges(path, new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds); } }); zkClient.createPersistent(path); Thread.sleep(1000); zkClient.createPersistent(path + "/c1"); Thread.sleep(1000); zkClient.delete(path + "/c1"); Thread.sleep(1000); zkClient.delete(path); Thread.sleep(Integer.MAX_VALUE); } }
運行結果:
/zk-book 's child changed, currentChilds:[] /zk-book 's child changed, currentChilds:[c1] /zk-book 's child changed, currentChilds:[] /zk-book 's child changed, currentChilds:null
結果代表:
客戶端能夠對一個不存在的節點進行子節點變動的監聽。
一旦客戶端對一個節點註冊了子節點列表變動監聽以後,那麼當該節點的子節點列表發生變動時,服務端都會通知客戶端,並將最新的子節點列表發送給客戶端
該節點自己的建立或刪除也會通知到客戶端。
2.6 獲取數據
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; public class Get_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); zkClient.createEphemeral(path, "123"); zkClient.subscribeDataChanges(path, new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { System.out.println("Node " + dataPath + " deleted."); } public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("Node " + dataPath + " changed, new data: " + data); } }); System.out.println(zkClient.readData(path)); zkClient.writeData(path, "456"); Thread.sleep(1000); zkClient.delete(path); Thread.sleep(Integer.MAX_VALUE); } }
運行結果:
123 Node /zk-book changed, new data: 456 Node /zk-book deleted.
結果代表能夠成功監聽節點數據變化或刪除事件。
2.7 檢測節點是否存在
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Exist_Node_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000); System.out.println("Node " + path + " exists " + zkClient.exists(path)); } }
運行結果:
Node /zk-book exists false
結果代表,能夠經過ZkClient輕易檢測節點是否存在,其相比於原生態的接口更易於理解。
3、Curator客戶端
Curator解決了不少Zookeeper客戶端很是底層的細節開發工做,包括鏈接重連,反覆註冊Watcher和NodeExistsException異常等,現已成爲Apache的頂級項目。
3.1 添加依賴
在pom.xml文件中添加以下內容便可。
<!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.4.2</version> </dependency>
3.2 建立會話
Curator除了使用通常方法建立會話外,還可使用fluent風格進行建立。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; public class Create_Session_Sample { public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy); client.start(); System.out.println("Zookeeper session1 established. "); CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build(); client1.start(); System.out.println("Zookeeper session2 established. "); } }
運行結果:
Zookeeper session1 established.
Zookeeper session2 established.
值得注意的是session2會話含有隔離命名空間,即客戶端對Zookeeper上數據節點的任何操做都是相對/base目錄進行的,這有利於實現不一樣的Zookeeper的業務之間的隔離。
3.3 建立節點
經過使用Fluent風格的接口,開發人員能夠進行自由組合來完成各類類型節點的建立。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Create_Node_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book/c1"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); System.out.println("success create znode: " + path); } }
運行結果:
success create znode: /zk-book/c1
其中,也建立了/zk-book/c1的父節點/zk-book節點。
3.4 刪除節點
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Del_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book/c1"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); Stat stat = new Stat(); System.out.println(new String(client.getData().storingStatIn(stat).forPath(path))); client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path); System.out.println("success delete znode " + path); Thread.sleep(Integer.MAX_VALUE); } }
運行結果:
init
success delete znode /zk-book/c1
結果代表成功刪除/zk-book/c1節點。
3.5 獲取數據
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Get_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); Stat stat = new Stat(); System.out.println(new String(client.getData().storingStatIn(stat).forPath(path))); } }
運行結果:
init
結果代表成功獲取了節點的數據。
3.6 更新數據
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Set_Data_Sample { public static void main(String[] args) throws Exception { String path = "/zk-book"; CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path); System.out.println("Success set node for : " + path + ", new version: " + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion()); try { client.setData().withVersion(stat.getVersion()).forPath(path); } catch (Exception e) { System.out.println("Fail set node due to " + e.getMessage()); } } }
運行結果:
Success set node for : /zk-book, new version: 1 Fail set node due to KeeperErrorCode = BadVersion for /zk-book
結果代表當攜帶數據版本不一致時,沒法完成更新操做。
3.7 異步接口
如同Zookeeper原生API提供了異步接口,Curator也提供了異步接口。在Zookeeper中,全部的異步通知事件處理都是由EventThread這個線程來處理的,EventThread線程用於串行處理全部的事件通知,其能夠保證對事件處理的順序性,可是一旦碰上覆雜的處理單元,會消耗過長的處理時間,從而影響其餘事件的處理,Curator容許用戶傳入Executor實例,這樣能夠將比較複雜的事件處理放到一個專門的線程池中去。
package com.hust.grid.leesf.curator.examples; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Create_Node_Background_Sample { static String path = "/zk-book"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); static CountDownLatch semaphore = new CountDownLatch(2); static ExecutorService tp = Executors.newFixedThreadPool(2); public static void main(String[] args) throws Exception { client.start(); System.out.println("Main thread: " + Thread.currentThread().getName()); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName()); System.out.println(); semaphore.countDown(); } }, tp).forPath(path, "init".getBytes()); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() { public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName()); semaphore.countDown(); } }).forPath(path, "init".getBytes()); semaphore.await(); tp.shutdown(); } }
運行結果:
Main thread: main event[code: -110, type: CREATE], Thread of processResult: main-EventThread event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1
其中,建立節點的事件由線程池本身處理,而非默認線程處理。
Curator除了提供很便利的API,還提供了一些典型的應用場景,開發人員可使用參考更好的理解如何使用Zookeeper客戶端,全部的都在recipes包中,只須要在pom.xml中添加以下依賴便可
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.4.2</version>
</dependency>
3.8 節點監聽
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class NodeCache_Sample { static String path = "/zk-book/nodecache"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes()); final NodeCache cache = new NodeCache(client, path, false); cache.start(true); cache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData())); } }); client.setData().forPath(path, "u".getBytes()); Thread.sleep(1000); client.delete().deletingChildrenIfNeeded().forPath(path); Thread.sleep(Integer.MAX_VALUE); } }
運行結果:
Node data update, new data: u
當節點數據變動後收到了通知。NodeCache不只能夠監聽數據節點的內容變動,也能監聽指定節點是否存在。
3.9 子節點監聽
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class PathChildrenCache_Sample { static String path = "/zk-book"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(5000).build(); public static void main(String[] args) throws Exception { client.start(); PathChildrenCache cache = new PathChildrenCache(client, path, true); cache.start(StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED," + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED," + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED," + event.getData().getPath()); break; default: break; } } }); client.create().withMode(CreateMode.PERSISTENT).forPath(path); Thread.sleep(1000); client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1"); Thread.sleep(1000); client.delete().forPath(path + "/c1"); Thread.sleep(1000); client.delete().forPath(path); Thread.sleep(Integer.MAX_VALUE); } }
運行結果:
CHILD_ADDED,/zk-book/c1
CHILD_REMOVED,/zk-book/c1
監聽節點的子節點,包括新增、數據變化、刪除三類事件。
3.10 Master選舉
藉助Zookeeper,開發者能夠很方便地實現Master選舉功能,其大致思路以下:選擇一個根節點,如/master_select,多臺機器同時向該節點建立一個子節點/master_select/lock,利用Zookeeper特性,最終只有一臺機器可以成功建立,成功的那臺機器就是Master。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.retry.ExponentialBackoffRetry; public class Recipes_MasterSelect { static String master_path = "/curator_recipes_master_path"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() { public void takeLeadership(CuratorFramework client) throws Exception { System.out.println("成爲Master角色"); Thread.sleep(3000); System.out.println("完成Master操做,釋放Master權利"); } }); selector.autoRequeue(); selector.start(); Thread.sleep(Integer.MAX_VALUE); } }
運行結果:
成爲Master角色
完成Master操做,釋放Master權利
成爲Master角色
以上結果會反覆循環,而且當一個應用程序完成Master邏輯後,另一個應用程序的相應方法纔會被調用,即當一個應用實例成爲Master後,其餘應用實例會進入等待,直到當前Master掛了或者推出後纔會開始選舉Master。
3.11 分佈式鎖
爲了保證數據的一致性,常常在程序的某個運行點須要進行同步控制。以流水號生成場景爲例,普通的後臺應用一般採用時間戳方式來生成流水號,可是在用戶量很是大的狀況下,可能會出現併發問題。
package com.hust.grid.leesf.curator.examples; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; public class Recipes_NoLock { public static void main(String[] args) throws Exception { final CountDownLatch down = new CountDownLatch(1); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { try { down.await(); } catch (Exception e) { } SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.err.println("生成的訂單號是 : " + orderNo); } }).start(); } down.countDown(); } }
運行結果:
生成的訂單號是 : 16:29:10|590 生成的訂單號是 : 16:29:10|590 生成的訂單號是 : 16:29:10|591 生成的訂單號是 : 16:29:10|591 生成的訂單號是 : 16:29:10|590 生成的訂單號是 : 16:29:10|590 生成的訂單號是 : 16:29:10|591 生成的訂單號是 : 16:29:10|590 生成的訂單號是 : 16:29:10|592 生成的訂單號是 : 16:29:10|591
結果表示訂單號出現了重複,即普通的方法沒法知足業務須要,由於其未進行正確的同步。可使用Curator來實現分佈式鎖功能。
package com.hust.grid.leesf.curator.examples; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; public class Recipes_Lock { static String lock_path = "/curator_recipes_lock_path"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); final InterProcessMutex lock = new InterProcessMutex(client, lock_path); final CountDownLatch down = new CountDownLatch(1); for (int i = 0; i < 30; i++) { new Thread(new Runnable() { public void run() { try { down.await(); lock.acquire(); } catch (Exception e) { } SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(new Date()); System.out.println("生成的訂單號是 : " + orderNo); try { lock.release(); } catch (Exception e) { } } }).start(); } down.countDown(); } }
運行結果:
生成的訂單號是 : 16:31:50|293 生成的訂單號是 : 16:31:50|319 生成的訂單號是 : 16:31:51|278 生成的訂單號是 : 16:31:51|326 生成的訂單號是 : 16:31:51|402 生成的訂單號是 : 16:31:51|420 生成的訂單號是 : 16:31:51|546 生成的訂單號是 : 16:31:51|602 生成的訂單號是 : 16:31:51|626 生成的訂單號是 : 16:31:51|656 生成的訂單號是 : 16:31:51|675 生成的訂單號是 : 16:31:51|701 生成的訂單號是 : 16:31:51|708 生成的訂單號是 : 16:31:51|732 生成的訂單號是 : 16:31:51|763 生成的訂單號是 : 16:31:51|785 生成的訂單號是 : 16:31:51|805 生成的訂單號是 : 16:31:51|823 生成的訂單號是 : 16:31:51|839 生成的訂單號是 : 16:31:51|853 生成的訂單號是 : 16:31:51|868 生成的訂單號是 : 16:31:51|884 生成的訂單號是 : 16:31:51|897 生成的訂單號是 : 16:31:51|910 生成的訂單號是 : 16:31:51|926 生成的訂單號是 : 16:31:51|939 生成的訂單號是 : 16:31:51|951 生成的訂單號是 : 16:31:51|965 生成的訂單號是 : 16:31:51|972 生成的訂單號是 : 16:31:51|983
結果代表此時已經不存在重複的流水號。
3.12 分佈式計數器
分佈式計數器的典型應用是統計系統的在線人數,藉助Zookeeper也能夠很方便實現分佈式計數器功能:指定一個Zookeeper數據節點做爲計數器,多個應用實例在分佈式鎖的控制下,經過更新節點的內容來實現計數功能。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes; public class Recipes_DistAtomicInt { static String distatomicint_path = "/curator_recipes_distatomicint_path"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distatomicint_path, new RetryNTimes(3, 1000)); AtomicValue<Integer> rc = atomicInteger.add(8); System.out.println("Result: " + rc.succeeded()); } }
運行結果:
Result: true
結果代表已經將數據成功寫入數據節點中。
3.13 分佈式Barrier
如同JDK的CyclicBarrier,Curator提供了DistributedBarrier來實現分佈式Barrier。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.ExponentialBackoffRetry; public class Recipes_Barrier { static String barrier_path = "/curator_recipes_barrier_path"; static DistributedBarrier barrier; public static void main(String[] args) throws Exception { for (int i = 0; i < 5; i++) { new Thread(new Runnable() { public void run() { try { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); barrier = new DistributedBarrier(client, barrier_path); System.out.println(Thread.currentThread().getName() + "號barrier設置"); barrier.setBarrier(); barrier.waitOnBarrier(); System.err.println("啓動..."); } catch (Exception e) { } } }).start(); } Thread.sleep(2000); barrier.removeBarrier(); } }
運行結果:
Thread-1號barrier設置 Thread-2號barrier設置 Thread-4號barrier設置 Thread-3號barrier設置 Thread-0號barrier設置 啓動... 啓動... 啓動... 啓動... 啓動...
結果代表經過DistributedBarrier能夠實現相似於CyclicBarrier的分佈式Barrier功能。
4、Curator工具類
4.1 ZKPaths
其提供了簡單的API來構建znode路徑、遞歸建立、刪除節點等。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.ZKPaths; import org.apache.curator.utils.ZKPaths.PathAndNode; import org.apache.zookeeper.ZooKeeper; public class ZKPaths_Sample { static String path = "/curator_zkpath_sample"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper(); System.out.println(ZKPaths.fixForNamespace(path, "sub")); System.out.println(ZKPaths.makePath(path, "sub")); System.out.println(ZKPaths.getNodeFromPath("/curator_zkpath_sample/sub1")); PathAndNode pn = ZKPaths.getPathAndNode("/curator_zkpath_sample/sub1"); System.out.println(pn.getPath()); System.out.println(pn.getNode()); String dir1 = path + "/child1"; String dir2 = path + "/child2"; ZKPaths.mkdirs(zookeeper, dir1); ZKPaths.mkdirs(zookeeper, dir2); System.out.println(ZKPaths.getSortedChildren(zookeeper, path)); ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true); } }
運行結果:
/curator_zkpath_sample/sub /curator_zkpath_sample/sub sub1 /curator_zkpath_sample sub1 [child1, child2]
藉助ZKPaths可快速方便的完成節點的建立等操做。
4.2 EnsurePath
其提供了一種可以確保數據節點存在的機制,當上層業務但願對一個數據節點進行操做時,操做前須要確保該節點存在。
package com.hust.grid.leesf.curator.examples; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.EnsurePath; public class EnsurePathDemo { static String path = "/zk-book/c1"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181") .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); client.usingNamespace("zk-book"); EnsurePath ensurePath = new EnsurePath(path); ensurePath.ensure(client.getZookeeperClient()); ensurePath.ensure(client.getZookeeperClient()); EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/c1"); ensurePath2.ensure(client.getZookeeperClient()); } }
EnsurePath採起了以下節點建立方式,試圖建立指定節點,若是節點已經存在,那麼就不進行任何操做,也不對外拋出異常,不然正常建立數據節點。
5、總結
本篇介紹了使用Zookeeper的開源客戶端如何操做Zookeeper的方法,相應的源碼也已經上傳至github,謝謝各位園友的觀看~