前面講了zookeeper的API使用和zookeeper之ZkClient的使用,如今看看看看curator的使用。node
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency>
before,建立會話用,後面不在貼這部分代碼apache
CuratorFramework client; @Before public void before() { client = CuratorConnect.getCuratorClient2(); }
CuratorConnect,這邊給出2個方式,一個是直接new一個,一個是Fluent風格的,須要調用start方法來開啓會話。segmentfault
public class CuratorConnect { static final String CONNECT_STRING = "172.17.0.2:2181,172.17.0.3:2181,172.17.0.4:2181"; static ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 5); public static CuratorFramework getCuratorClient() { CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STRING, 5000, 5000, retryPolicy); client.start(); return client; } public static CuratorFramework getCuratorClient2() { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(CONNECT_STRING) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); return client; } }
跟以前不同的是,這邊須要設置重試策略,其餘參數雷同。
重試策略RetryPolicy以下:緩存
RetryPolicy的allowRetry有三個參數,分別是已重試的次數、從第一次重試開始到當前話費的時間、用於sleep的時間。網絡
測試代碼:session
@Test public void testCreate() throws Exception { // 默認持久性節點 client.create().forPath("/node1"); client.create().forPath("/node2","data2".getBytes()); // 須要指定類型,用withMode建立臨時節點 client.create().withMode(CreateMode.EPHEMERAL).forPath("/node3"); // 建立多層級用creatingParentsIfNeeded client.create().creatingParentsIfNeeded().forPath("/node4/node4_1"); //方便查看臨時節點 TimeUnit.SECONDS.sleep(100); }
客戶端查詢結果以下:異步
[zk: localhost:2181(CONNECTED) 75] ls / [node1, node2, node3, node4, zookeeper] [zk: localhost:2181(CONNECTED) 76] get /node2 data2 [zk: localhost:2181(CONNECTED) 77] ls /node4 [node4_1]
測試代碼:maven
@Test public void testDelete() throws Exception { // 刪除節點 client.delete().forPath("/node1"); // guaranteed保證能夠刪除節點,即使在網絡可能波動的狀況下 client.delete().guaranteed().forPath("/node5"); // 指定版本號刪除 client.delete().withVersion(-1).forPath("/node2"); // 刪除遞歸子節點 client.delete().deletingChildrenIfNeeded().forPath("/node4"); }
客戶端查詢結果以下:測試
[zk: localhost:2181(CONNECTED) 87] ls / [node1, node2, node4, node5, zookeeper] [zk: localhost:2181(CONNECTED) 88] ls / [zookeeper]
測試代碼:ui
@Test public void testGetChildren() throws Exception { List<String> children = client.getChildren().forPath("/node4"); System.out.println(children); }
運行結果以下:
測試代碼:
@Test public void testGetData() throws Exception { byte[] bytes = client.getData().forPath("/node2"); System.out.println(new String(bytes)); }
運行結果以下:
測試代碼:
@Test public void testWriteData() throws Exception { client.setData().forPath("/node2", "new_data".getBytes()); byte[] bytes = client.getData().forPath("/node2"); System.out.println(new String(bytes)); }
運行結果以下:
測試代碼:
@Test public void testExists() throws Exception { Stat stat = client.checkExists().forPath("/node"); System.out.println(stat); // 不存在父節點會建立,但不會建立當前節點 client.checkExists().creatingParentsIfNeeded().forPath("/node/node_1"); stat = client.checkExists().forPath("/node"); System.out.println(stat); }
運行結果以下:
在curator中,BackgroundCallback接口,用來異步接口調用後,處理服務端返回的接口。
接口中的processResult方法有兩個參數,一個是客戶端實例CuratorFramework,一個是服務端事件CuratorEvent。
在MyBackgroundCallback中,打印了事件類型和響應碼,響應碼和AsyncCallback的processResult方法的rc是同樣的,在原生API的建立方法有提過。
public class MyBackgroundCallback implements BackgroundCallback { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(curatorEvent.getType() + "-" + curatorEvent.getResultCode()); } }
測試代碼:
@Test public void testASyn() throws Exception { BackgroundCallback callback = new MyBackgroundCallback(); client.create().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); client.getData().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); client.setData().inBackground(callback).forPath("/node", "new_data".getBytes()); TimeUnit.SECONDS.sleep(3); client.getData().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); client.delete().inBackground(callback).forPath("/node"); TimeUnit.SECONDS.sleep(3); }
運行結果以下:
在inBackground方法中,除了傳遞BackgroundCallback,還能夠傳線程池對象,這樣業務邏輯就會該線程池處理,若是沒有傳,就使用默認的EventThread處理,這邊就不作演示了。
POM文件須要導入:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>
在curator中是經過NodeCache來監聽節點的變化的。
測試代碼:
@Test public void testNodeCache4Listener() throws Exception { // 第一個參數,是監聽的客戶端 // 第二個參數,是監聽的節點 final NodeCache nodeCache = new NodeCache(client, "/node1"); // 啓動的時候從zookeeper讀取對應的數據 nodeCache.start(true); nodeCache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { System.out.println(nodeCache.getPath() + ":" + new String(nodeCache.getCurrentData().getData())); } }); client.create().forPath("/node1","data".getBytes()); TimeUnit.MILLISECONDS.sleep(200); client.setData().forPath("/node1", "node1_data".getBytes()); TimeUnit.MILLISECONDS.sleep(200); client.delete().forPath("/node1"); TimeUnit.MILLISECONDS.sleep(200); }
測試結果以下:
節點的建立、刪除,會觸發監聽。
測試代碼
@Test public void testPathChildrenCache4Listener() throws Exception { // 第一個參數,是監聽的客戶端 // 第二個參數,是監聽的節點 // 第三個參數,是否緩存節點內容 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/node1", true); pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println(pathChildrenCacheEvent); } }); System.out.println("--------------1--------------"); client.create().forPath("/node1"); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------2--------------"); client.create().forPath("/node1/node1_1"); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------3--------------"); client.setData().forPath("/node1/node1_1", "new_data".getBytes()); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------4--------------"); client.delete().forPath("/node1/node1_1"); TimeUnit.MILLISECONDS.sleep(200); System.out.println("--------------5--------------"); client.delete().forPath("/node1"); TimeUnit.MILLISECONDS.sleep(2000); }
運行結果以下:子節點的新增、修改、刪除,都會獲得監聽,當前節點的建立也會獲得監聽。