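1. Introduction
The previous post used the command line to operate the ZooKeeper client and server. This post shows how to operate ZooKeeper through its Java API.
2. Setting up the development environment
First start the ZooKeeper server (the previous post describes how) so that the client has something to connect to.
There are two ways to set up the development environment: ① download the required JAR files and add them to the IDE by hand, or ② create a Maven project and let Maven manage the dependencies.
① Adding dependencies to the IDE by hand
Step 1: Download the JAR files for the version you need (jar, javadoc.jar and sources.jar). I used ZooKeeper 3.4.6.
Step 2: Open the IDE (I use Eclipse) and create a Java project named zookeeper_examples_none_maven. Because the dependencies are added by hand, I created a jar folder inside the project to hold its JAR files, and put the three JARs from step 1 there.
Step 3: Add the dependencies in Eclipse.
Step 4: Create a package and a Java class to test the setup.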
Zookeeper_Constructor_Usage_Simple.java
package com.hust.grid.leesf.examples;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class Zookeeper_Constructor_Usage_Simple implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    @Override
    public void process(WatchedEvent event) {
        System.out.println("Receive watched event : " + event);
        // Release the latch once the session reaches SyncConnected.
        if (KeeperState.SyncConnected == event.getState()) {
            connectedSemaphore.countDown();
        }
    }

    public static void main(String[] args) throws IOException {
        // The constructor returns immediately; the session is established asynchronously.
        ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 5000,
                new Zookeeper_Constructor_Usage_Simple());
        System.out.println(zookeeper.getState());
        try {
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Zookeeper session established");
    }
}
The output is as follows:
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
    at org.apache.zookeeper.ZooKeeper.<clinit>(ZooKeeper.java:94)
    at com.hust.grid.leesf.examples.Zookeeper_Constructor_Usage_Simple.main(Zookeeper_Constructor_Usage_Simple.java:23)
Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 2 more
The LoggerFactory class is missing. After some searching, I found that it is enough to copy log4j-1.2.16.jar and slf4j-api-1.6.1.jar from ZooKeeper's lib folder into the jar folder of zookeeper_examples_none_maven and add them to the IDE as well.
Running it again gives:
CONNECTING
Receive watched event : WatchedEvent state:SyncConnected type:None path:null
Zookeeper session established
The client has connected to the server.
This first approach is fairly tedious because every dependency JAR has to be managed by hand. A more mature option is to let Maven manage the JARs.
② Managing dependencies with Maven
Step 1: Create a Maven project.
Step 2: Configure pom.xml as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hust.grid.leesf</groupId>
    <artifactId>zookeeper_examples</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>zookeeper_examples</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>
    </dependencies>
</project>
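Step 3: Create a Java class to test the setup.
Zookeeper_Constructor_Usage_Simple.java, with the same code as above.
The output is as follows:
CONNECTING
Receive watched event : WatchedEvent state:SyncConnected type:None path:null
Zookeeper session established
This also shows that the client has connected to the server.
3. Examples
3.1 Creating nodes
Nodes can be created synchronously or asynchronously. Either way, ZooKeeper does not create nodes recursively: a child cannot be created while its parent does not exist, for example /zk-ephemeral/ch1 when /zk-ephemeral is missing. In addition, creating a node whose path already exists throws NodeExistsException.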
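Because parents are not created automatically, a client that wants a deep path has to create each level itself, parents first. The sketch below only computes that order; ZnodePaths and ancestorsAndSelf are hypothetical names introduced for illustration and are not part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of the ZooKeeper API. */
final class ZnodePaths {
    private ZnodePaths() {}

    /** Returns every ancestor of an absolute znode path, then the path itself, parents first. */
    static List<String> ancestorsAndSelf(String path) {
        if (path == null || !path.startsWith("/") || path.length() < 2 || path.endsWith("/")) {
            throw new IllegalArgumentException("expected an absolute znode path: " + path);
        }
        List<String> result = new ArrayList<>();
        // Every '/' after the first one marks the end of an ancestor path.
        int slash = path.indexOf('/', 1);
        while (slash != -1) {
            result.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        result.add(path);
        return result;
    }

    public static void main(String[] args) {
        // prints [/zk-ephemeral, /zk-ephemeral/ch1]
        System.out.println(ancestorsAndSelf("/zk-ephemeral/ch1"));
    }
}
```

In a real client, each returned path would be passed to create in turn, treating NodeExistsException as "already there". Note that ZooKeeper does not allow children under ephemeral nodes, so the intermediate levels would need to be persistent.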
① Synchronous
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class Zookeeper_Create_API_Sync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_Create_API_Sync_Usage()); System.out.println(zookeeper.getState()); connectedSemaphore.await(); String path1 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create znode: " + path1); String path2 = zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Success create znode: " + path2); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } }
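The output is as follows:
CONNECTING
Success create znode: /zk-test-ephemeral-
Success create znode: /zk-test-ephemeral-0000000043
An ephemeral node and an ephemeral sequential node were both created. For a sequential node, the server appends a numeric suffix to the requested path.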
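The suffix in the output is a 10-digit, zero-padded decimal counter. The sketch below reproduces that format locally and parses it back, which is handy when sorting sequential nodes. SequentialNames is a hypothetical helper; in a real cluster the counter is chosen by the server, not by the client.

```java
/** Hypothetical helper for illustration; the real suffix is assigned by the server. */
final class SequentialNames {
    private SequentialNames() {}

    /** Formats prefix plus a 10-digit, zero-padded counter. */
    static String withSequence(String prefix, int counter) {
        return String.format("%s%010d", prefix, counter);
    }

    /** Extracts the counter from the last 10 characters of a sequential node name. */
    static int sequenceOf(String name) {
        return Integer.parseInt(name.substring(name.length() - 10));
    }

    public static void main(String[] args) {
        // prints /zk-test-ephemeral-0000000043
        System.out.println(withSequence("/zk-test-ephemeral-", 43));
        // prints 45
        System.out.println(sequenceOf("/zk-test-ephemeral-0000000045"));
    }
}
```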
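② Asynchronous
The asynchronous API differs from the synchronous one in that node creation (both the network round trip and the server-side creation) happens asynchronously. With the synchronous API the caller has to handle the exceptions the call may throw. The asynchronous call itself does not throw them; every failure is reported to the callback through the result code (rc).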
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; public class Zookeeper_Create_API_ASync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_Create_API_ASync_Usage()); System.out.println(zookeeper.getState()); connectedSemaphore.await(); zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), "I am context. "); zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(), "I am context. "); zookeeper.create("/zk-test-ephemeral-", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new IStringCallback(), "I am context. "); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } } class IStringCallback implements AsyncCallback.StringCallback { public void processResult(int rc, String path, Object ctx, String name) { System.out.println("Create path result: [" + rc + ", " + path + ", " + ctx + ", real path name: " + name); } }
The output is as follows:
CONNECTING
Create path result: [0, /zk-test-ephemeral-, I am context. , real path name: /zk-test-ephemeral-
Create path result: [-110, /zk-test-ephemeral-, I am context. , real path name: null
Create path result: [0, /zk-test-ephemeral-, I am context. , real path name: /zk-test-ephemeral-0000000045
The nodes were created asynchronously. The second call returned -110 (NodeExists) because the first call had already created that path.
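Since asynchronous callbacks report failures only as integers, a small lookup table makes the logs easier to read. The sketch below covers just the codes that appear in this post; ResultCodes is a hypothetical class. In code that already depends on the ZooKeeper client, KeeperException.Code.get(rc) should give the same mapping without a hand-written table.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup table for the result codes that show up in this post. */
final class ResultCodes {
    private static final Map<Integer, String> NAMES = new HashMap<>();

    static {
        NAMES.put(0, "OK");
        NAMES.put(-101, "NONODE");      // node does not exist
        NAMES.put(-102, "NOAUTH");      // session lacks permission
        NAMES.put(-103, "BADVERSION");  // version check failed
        NAMES.put(-110, "NODEEXISTS");  // node already exists
        NAMES.put(-111, "NOTEMPTY");    // node still has children
    }

    private ResultCodes() {}

    /** Returns the symbolic name for rc, or UNKNOWN(rc) for codes not listed here. */
    static String describe(int rc) {
        return NAMES.getOrDefault(rc, "UNKNOWN(" + rc + ")");
    }

    public static void main(String[] args) {
        // prints NODEEXISTS
        System.out.println(describe(-110));
    }
}
```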
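3.2 Deleting nodes
Only leaf nodes can be deleted: a node that has children cannot be deleted directly, and all of its children must be deleted first. Deletion also comes in synchronous and asynchronous forms.
① Synchronous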
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class Delete_API_Sync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk; public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new Delete_API_Sync_Usage()); connectedSemaphore.await(); zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path); zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path + "/c1"); try { zk.delete(path, -1); } catch (Exception e) { System.out.println("fail to delete znode: " + path); } zk.delete(path + "/c1", -1); System.out.println("success delete znode: " + path + "/c1"); zk.delete(path, -1); System.out.println("success delete znode: " + path); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } } } }
The output is as follows:
success create znode: /zk-book
success create znode: /zk-book/c1
fail to delete znode: /zk-book
success delete znode: /zk-book/c1
success delete znode: /zk-book
The result shows that a node with children cannot be deleted until all of its children have been deleted.
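Deleting a whole subtree therefore means visiting it children-first (post-order). The sketch below computes that order over an in-memory map standing in for what getChildren would return; DeleteOrder and its map argument are assumptions made for illustration, and no ZooKeeper call is made.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: computes a safe deletion order for a subtree. */
final class DeleteOrder {
    private DeleteOrder() {}

    /** children maps a path to the names of its direct children. */
    static List<String> postOrder(String path, Map<String, List<String>> children) {
        List<String> order = new ArrayList<>();
        collect(path, children, order);
        return order;
    }

    private static void collect(String path, Map<String, List<String>> children, List<String> order) {
        for (String child : children.getOrDefault(path, Collections.<String>emptyList())) {
            collect(path + "/" + child, children, order);
        }
        // A node is listed only after all of its descendants.
        order.add(path);
    }

    public static void main(String[] args) {
        Map<String, List<String>> tree = new HashMap<>();
        tree.put("/zk-book", Collections.singletonList("c1"));
        // prints [/zk-book/c1, /zk-book]
        System.out.println(postOrder("/zk-book", tree));
    }
}
```

Calling delete(path, -1) on each entry in the returned order mirrors the sequence that succeeded in the output above.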
② Asynchronous
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; public class Delete_API_ASync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk; public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new Delete_API_ASync_Usage()); connectedSemaphore.await(); zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path); zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path + "/c1"); zk.delete(path, -1, new IVoidCallback(), null); zk.delete(path + "/c1", -1, new IVoidCallback(), null); zk.delete(path, -1, new IVoidCallback(), null); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } } } } class IVoidCallback implements AsyncCallback.VoidCallback { public void processResult(int rc, String path, Object ctx) { System.out.println(rc + ", " + path + ", " + ctx); } }
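The output is as follows:
success create znode: /zk-book
success create znode: /zk-book/c1
-111, /zk-book, null
0, /zk-book/c1, null
0, /zk-book, null
The first attempt to delete /zk-book failed with result code -111 (NotEmpty).
3.3 Getting child nodes
The list of a node's children can likewise be read synchronously or asynchronously.
① Synchronous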
package com.hust.grid.leesf.examples; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; public class Zookeeper_GetChildren_API_Sync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws Exception { String path = "/zk-book-1"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_GetChildren_API_Sync_Usage()); connectedSemaphore.await(); zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path); zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path + "/c1"); List<String> childrenList = zk.getChildren(path, true); System.out.println(childrenList); zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path + "/c2"); Thread.sleep(1000); zk.create(path + "/c3", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path + "/c3"); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true)); } catch (Exception e) { } } } } }
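The output is as follows:
success create znode: /zk-book-1
success create znode: /zk-book-1/c1
[c1]
success create znode: /zk-book-1/c2
ReGet Child:[c1, c2]
success create znode: /zk-book-1/c3
ReGet Child:[c3, c1, c2]
Note that a watcher notification is one-shot: once it has fired, the watcher is no longer registered, so the client must register it again each time. That is why process calls getChildren with watch set to true again. Without that call, the child-change event caused by creating c3 would never arrive.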
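The one-shot behaviour can be modelled in a few lines of plain Java. This is a toy in-memory registry, not ZooKeeper code, and OneShotWatches is a hypothetical name; it only illustrates that firing a watch also removes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of one-shot watches; it is not ZooKeeper code. */
final class OneShotWatches {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    /** Registers a watcher that will be notified at most once. */
    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    /** Fires and removes every watcher on the path; returns how many were notified. */
    int fire(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired == null) {
            return 0;
        }
        for (Consumer<String> watcher : fired) {
            watcher.accept(path);
        }
        return fired.size();
    }

    public static void main(String[] args) {
        OneShotWatches registry = new OneShotWatches();
        registry.watch("/zk-book-1", p -> System.out.println("children changed: " + p));
        System.out.println(registry.fire("/zk-book-1")); // first change: 1 watcher notified
        System.out.println(registry.fire("/zk-book-1")); // second change: 0, nothing re-registered
    }
}
```

A watcher that calls watch again from inside its callback keeps receiving events, which is the pattern the process method in the example follows.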
② Asynchronous
package com.hust.grid.leesf.examples; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class Zookeeper_GetChildren_API_ASync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk = null; public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new Zookeeper_GetChildren_API_ASync_Usage()); connectedSemaphore.await(); zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path); zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path + "/c1"); zk.getChildren(path, true, new IChildren2Callback(), null); zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path + "/c2"); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeChildrenChanged) { try { System.out.println("ReGet Child:" + zk.getChildren(event.getPath(), true)); } catch (Exception e) { } } } } } class IChildren2Callback implements AsyncCallback.Children2Callback { public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { System.out.println("Get Children znode result: [response code: " + rc + 
", param path: " + path + ", ctx: " + ctx + ", children list: " + children + ", stat: " + stat); } }
The output is as follows:
success create znode: /zk-book
success create znode: /zk-book/c1
Get Children znode result: [response code: 0, param path: /zk-book, ctx: null, children list: [c1], stat: 2901,2901,1478226062843,1478226062843,0,1,0,0,0,1,2902
success create znode: /zk-book/c2
ReGet Child:[c1, c2]
The child list can also be obtained asynchronously.
3.4 Getting node data
Node data can also be read synchronously or asynchronously.
① Synchronous
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class GetData_API_Sync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk = null; private static Stat stat = new Stat(); public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new GetData_API_Sync_Usage()); connectedSemaphore.await(); zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path); System.out.println("the data of znode " + path + " is : " + new String(zk.getData(path, true, stat))); System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion()); zk.setData(path, "123".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeDataChanged) { try { System.out.println("the data of znode " + event.getPath() + " is : " + new String(zk.getData(event.getPath(), true, stat))); System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion()); } catch (Exception e) { } } } } }
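The output is as follows:
success create znode: /zk-book
the data of /zk-book is : 123
czxID: 2924, mzxID: 2924, version: 0
the data of /zk-book is : 123
czxID: 2924, mzxID: 2925, version: 1
The getData method returns the node's data. The second read was triggered by the setData call: the content is unchanged, but mzxID and version both advanced.
② Asynchronous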
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class GetData_API_ASync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk; public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new GetData_API_ASync_Usage()); connectedSemaphore.await(); zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path); zk.getData(path, true, new IDataCallback(), null); zk.setData(path, "123".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == EventType.NodeDataChanged) { try { zk.getData(event.getPath(), true, new IDataCallback(), null); } catch (Exception e) { } } } } } class IDataCallback implements AsyncCallback.DataCallback { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println("rc: " + rc + ", path: " + path + ", data: " + new String(data)); System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion()); } }
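The output is as follows:
success create znode: /zk-book
rc: 0, path: /zk-book, data: 123
czxID: 2932, mzxID: 2932, version: 0
rc: 0, path: /zk-book, data: 123
czxID: 2932, mzxID: 2933, version: 1
The asynchronous API reads node data just as easily.
3.5 Updating data
The setData method takes a version parameter that names the data version the update applies to. getData, however, offers no way to read a particular version, so why specify one when updating? It is easiest to think of it as CAS (compare and swap): for a value V, each update first checks whether V equals the expected value A, and only then atomically replaces it with the new value B. The version parameter of setData plays the role of the expected value. A client sends the version it last read along with its update. If another client has updated the node in the meantime, the node's data version has moved on, the client's version no longer matches, and the update fails. This avoids lost updates when several clients write concurrently.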
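The version check can be sketched with a small in-memory class. VersionedValue is a hypothetical stand-in for one znode's data and version, written only to illustrate the compare-and-swap rule described above; the real check happens on the ZooKeeper server and fails with a BadVersion error rather than the IllegalStateException used here.

```java
/** Toy in-memory stand-in for one znode's data and version; not ZooKeeper code. */
final class VersionedValue {
    private byte[] data;
    private int version = 0;

    VersionedValue(byte[] initial) {
        this.data = initial.clone();
    }

    /**
     * Applies the update only if expectedVersion is -1 (no check)
     * or equals the current version. Returns the new version.
     */
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "BadVersion: expected " + expectedVersion + " but was " + version);
        }
        data = newData.clone();
        version++;
        return version;
    }

    synchronized int version() {
        return version;
    }

    public static void main(String[] args) {
        VersionedValue node = new VersionedValue("123".getBytes());
        int v1 = node.setData("456".getBytes(), -1); // unconditional update
        int v2 = node.setData("456".getBytes(), v1); // matches the current version
        System.out.println(v1 + ", " + v2);          // prints 1, 2
        try {
            node.setData("456".getBytes(), v1);      // stale version is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```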
① Synchronous
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class SetData_API_Sync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk; public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new SetData_API_Sync_Usage()); connectedSemaphore.await(); zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path); zk.getData(path, true, null); Stat stat = zk.setData(path, "456".getBytes(), -1); System.out.println("czxID: " + stat.getCzxid() + ", mzxID: " + stat.getMzxid() + ", version: " + stat.getVersion()); Stat stat2 = zk.setData(path, "456".getBytes(), stat.getVersion()); System.out.println("czxID: " + stat2.getCzxid() + ", mzxID: " + stat2.getMzxid() + ", version: " + stat2.getVersion()); try { zk.setData(path, "456".getBytes(), stat.getVersion()); } catch (KeeperException e) { System.out.println("Error: " + e.code() + "," + e.getMessage()); } Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } } } }
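The output is as follows:
success create znode: /zk-book
czxID: 2936, mzxID: 2937, version: 1
czxID: 2936, mzxID: 2938, version: 2
Error: BADVERSION,KeeperErrorCode = BadVersion for /zk-book
The third update failed because it carried a stale data version. Passing -1 as the version to setData matches any version, so the update is applied to whatever the latest data is.
② Asynchronous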
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class SetData_API_ASync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk; public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new SetData_API_ASync_Usage()); connectedSemaphore.await(); zk.create(path, "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success create znode: " + path); zk.setData(path, "456".getBytes(), -1, new IStatCallback(), null); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } } } } class IStatCallback implements AsyncCallback.StatCallback { public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat); } }
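The output is as follows:
success create znode: /zk-book
rc: 0, path: /zk-book, stat: 2942,2943,1478228414526,1478228414545,1,0,0,96876700808708136,3,0,2942
An rc (result code) of 0 shows that the node's data was updated.
3.6 Checking whether a node exists
If a watcher is registered when calling exists, the client is also notified when the node is created, deleted, or has its data updated.
① Synchronous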
package com.hust.grid.leesf.examples;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class Exist_API_Sync_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        zk = new ZooKeeper("127.0.0.1:2181", 5000, new Exist_API_Sync_Usage());
        connectedSemaphore.await();

        // Register a watcher on a node that does not exist yet.
        zk.exists(path, true);

        zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.setData(path, "123".getBytes(), -1);

        zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode: " + path + "/c1");

        zk.delete(path + "/c1", -1);
        zk.delete(path, -1);

        Thread.sleep(Integer.MAX_VALUE);
    }

    public void process(WatchedEvent event) {
        try {
            if (KeeperState.SyncConnected == event.getState()) {
                if (EventType.None == event.getType() && null == event.getPath()) {
                    connectedSemaphore.countDown();
                } else if (EventType.NodeCreated == event.getType()) {
                    System.out.println("success create znode: " + event.getPath());
                    // Watches are one-shot, so register again after every event.
                    zk.exists(event.getPath(), true);
                } else if (EventType.NodeDeleted == event.getType()) {
                    System.out.println("success delete znode: " + event.getPath());
                    zk.exists(event.getPath(), true);
                } else if (EventType.NodeDataChanged == event.getType()) {
                    System.out.println("data changed of znode: " + event.getPath());
                    zk.exists(event.getPath(), true);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
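The output is as follows:
success create znode: /zk-book
data changed of znode: /zk-book
success create znode: /zk-book/c1
success delete znode: /zk-book
The result shows that:
· A watcher can be registered through exists whether or not the node exists.
· The registered watcher listens for creation, deletion and data-update events on the node.
· Changes to the node's children are not reported to the client.
② Asynchronous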
package com.hust.grid.leesf.examples; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class Exist_API_ASync_Usage implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private static ZooKeeper zk; public static void main(String[] args) throws Exception { String path = "/zk-book"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new Exist_API_ASync_Usage()); connectedSemaphore.await(); zk.exists(path, true, new IIStatCallback(), null); zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.setData(path, "123".getBytes(), -1); zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("success create znode: " + path + "/c1"); zk.delete(path + "/c1", -1); zk.delete(path, -1); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { try { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (EventType.NodeCreated == event.getType()) { System.out.println("success create znode: " + event.getPath()); zk.exists(event.getPath(), true, new IIStatCallback(), null); } else if (EventType.NodeDeleted == event.getType()) { System.out.println("success delete znode: " + event.getPath()); zk.exists(event.getPath(), true, new IIStatCallback(), null); } else if (EventType.NodeDataChanged == event.getType()) { System.out.println("data changed of znode: " + event.getPath()); zk.exists(event.getPath(), true, new IIStatCallback(), null); } } } catch (Exception e) { } } } 
class IIStatCallback implements AsyncCallback.StatCallback { public void processResult(int rc, String path, Object ctx, Stat stat) { System.out.println("rc: " + rc + ", path: " + path + ", stat: " + stat); } }
The output is as follows:
rc: -101, path: /zk-book, stat: null
success create znode: /zk-book
rc: 0, path: /zk-book, stat: 2974,2974,1478229717889,1478229717889,0,0,0,0,0,0,2974
data changed of znode: /zk-book
rc: 0, path: /zk-book, stat: 2974,2975,1478229717889,1478229717922,1,0,0,0,3,0,2974
success create znode: /zk-book/c1
success delete znode: /zk-book
rc: -101, path: /zk-book, stat: null
When the node does not exist, rc (the result code) is -101.
3.7 Access control
Setting an ACL on a data node on the ZooKeeper server controls which clients may access it: a client that satisfies the ACL can access the node, and any other client cannot.
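With the digest scheme, the id recorded in an ACL is not the plain "user:password" string but, to my understanding, "user:" followed by the Base64-encoded SHA-1 hash of "user:password". The sketch below reproduces that derivation with the JDK only. DigestIds is a hypothetical class, and the exact format should be treated as an assumption to check against the ZooKeeper version in use.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper showing how a digest ACL id is assumed to be derived. */
final class DigestIds {
    private DigestIds() {}

    /** Returns "user:base64(sha1(user:password))" for an input such as "foo:true". */
    static String generateDigest(String idPassword) {
        int colon = idPassword.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("expected user:password");
        }
        String user = idPassword.substring(0, colon);
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(idPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a standard JDK algorithm, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints "foo:" followed by a 28-character Base64 string.
        System.out.println(generateDigest("foo:true"));
    }
}
```

Such a value is only needed when building an explicit ACL entry for the digest scheme; the examples in this post rely on addAuthInfo with CREATOR_ALL_ACL instead.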
① Accessing a protected node from a ZooKeeper session without authentication info
package com.hust.grid.leesf.examples; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; public class AuthSample_Get { final static String PATH = "/zk-book-auth_test"; public static void main(String[] args) throws Exception { ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181", 5000, null); zookeeper1.addAuthInfo("digest", "foo:true".getBytes()); zookeeper1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); System.out.println("success create znode: " + PATH); ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, null); zookeeper2.getData(PATH, false, null); } }
The output is as follows:
success create znode: /zk-book-auth_test
Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-book-auth_test
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184)
    at com.hust.grid.leesf.examples.AuthSample_Get.main(AuthSample_Get.java:17)
The session lacks permission, so the operation is rejected.
② Deleting a node protected by an ACL
package com.hust.grid.leesf.examples;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class AuthSample_Delete {
    final static String PATH = "/zk-book-auth_test";
    final static String PATH2 = "/zk-book-auth_test/child";

    public static void main(String[] args) throws Exception {
        // Session 1 authenticates and creates both nodes with CREATOR_ALL_ACL.
        ZooKeeper zookeeper1 = new ZooKeeper("127.0.0.1:2181", 5000, null);
        zookeeper1.addAuthInfo("digest", "foo:true".getBytes());
        zookeeper1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
        zookeeper1.create(PATH2, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);

        // Session 2 has no auth info, so deleting the child fails.
        try {
            ZooKeeper zookeeper2 = new ZooKeeper("127.0.0.1:2181", 5000, null);
            zookeeper2.delete(PATH2, -1);
        } catch (Exception e) {
            System.out.println("fail to delete: " + e.getMessage());
        }

        // Session 3 authenticates with the same credentials and can delete the child.
        ZooKeeper zookeeper3 = new ZooKeeper("127.0.0.1:2181", 5000, null);
        zookeeper3.addAuthInfo("digest", "foo:true".getBytes());
        zookeeper3.delete(PATH2, -1);
        System.out.println("success delete znode: " + PATH2);

        // Session 4 has no auth info and deletes the parent node.
        ZooKeeper zookeeper4 = new ZooKeeper("127.0.0.1:2181", 5000, null);
        zookeeper4.delete(PATH, -1);
        System.out.println("success delete znode: " + PATH);
    }
}
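The output is as follows:
fail to delete: KeeperErrorCode = NoAuth for /zk-book-auth_test/child
success delete znode: /zk-book-auth_test/child
success delete znode: /zk-book-auth_test
The result shows that a node cannot be deleted without permission. Note, however, that the final delete of /zk-book-auth_test succeeded from a session with no auth info. ZooKeeper checks the DELETE permission against the parent's ACL, so the ACL on /zk-book-auth_test protects its children from deletion rather than the node itself.
4. Summary
Calling the native Java API is fairly straightforward. I will analyze the source code in later posts. All the code in this post is also on GitHub. Thanks for reading.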