<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.4-beta</version> </dependency>
/** * 建立 zookeeper 會話 * <p> * <p> * zookeeper 客戶端 和 服務端建立會話的過程是異步的。也就是客戶度經過構造方法建立會話後當即返回,此時的鏈接並無徹底創建。 * 當真正的會話創建完成後,zk服務端會給客戶端通知一個事件,客戶端獲取通知以後在代表鏈接正在創建。 */ public class ZooKeeperClientSession implements Watcher { //用於等待zk服務端通知 private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2183", 5000, new ZooKeeperClientSession()); System.out.println(zooKeeper.getState()); latch.await(); long sessionId = zooKeeper.getSessionId(); byte[] sessionPasswd = zooKeeper.getSessionPasswd(); System.out.println(zooKeeper.getSessionId()); /** * 利用 sessionId 和 sessionPasswd 複用會話鏈接 */ ZooKeeper zooKeeper1 = new ZooKeeper("127.0.0.1:2183", 5000, new ZooKeeperClientSession(), sessionId, sessionPasswd); System.out.println(zooKeeper1.getSessionId()); } /** * 處理 zookeeper 服務端的 Watcher 通知 * @param watchedEvent */ public void process(WatchedEvent watchedEvent) { System.out.println("receive watch event : " + watchedEvent); if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { latch.countDown(); } } }
1.構造函數說明
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
參數 | 做用 |
---|---|
connectString | zk服務器列表,由英文逗號分開的字符串,例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183;也能夠是帶有目錄的字符:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/zk-book |
sessionTimeout | 會話超時時間,以毫秒爲單位。在一個會話週期內,zk客戶端和服務端經過心跳來檢查鏈接的有效性,一旦在sessionTimeout時間內沒有進行心跳檢測,則會話失效 |
watcher | zk容許客戶端在構造方法中傳入一個Watcher接口實現類做爲事件通知處理器 |
sessionId、sessionPasswd | 利用sessionId 和 sessionPasswd 確保複用會話鏈接 |
canBeReadOnly | 用於標識當前會話是否支付只讀模式。在zk集羣模式中,若是一臺集羣和集羣中過半以上的機器都都失去了網絡鏈接,那麼這個機器將再也不處理客戶端請求,包括讀寫請求。但在某些狀況下出現相似問題,咱們但願該臺機器可以處理讀請求,此時爲 read-only 模式 |
1.同步建立臨時節點
/** * 同步 api 建立臨時節點 */ public class CreatedSyncNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreatedSyncNode()); latch.await(); //臨時節點:建立接口返回該節點路徑L,例如返回值 /zk-test String path = zooKeeper.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("created path success : " + path); // 臨時順序節點:會自動的在節點路徑後加一個數字,例如返回值:/zk-test0000000001 String path1 = zooKeeper.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("created path success : " + path1); } public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { latch.countDown(); } } }
2.異步建立節點
/** * 異步 api 建立持久化節點 */ public class CreatedAsyncNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception { ZooKeeper zooKeeper = new ZooKeeper( "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreatedAsyncNode()); latch.await(); //持久化節點:建立接口返回該節點路徑,無返回值;異步建立的節點:/zk-test zooKeeper.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CallBack(), "PERSISTENT"); //持久化順序節點:會自動的在節點路徑後加一個數字,該方法無返回值,建立後節點:/zk-test-seq0000000002 zooKeeper.create("/zk-test-seq", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new CallBack(), "PERSISTENT_SEQUENTIAL"); Thread.sleep(1000); } public void process(WatchedEvent watchedEvent) { if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) { latch.countDown(); } } static class CallBack implements AsyncCallback.StringCallback { /** * 服務端回調方法 * * @param code 服務端響應碼:0 接口調用成功;-4 客戶端和服務端鏈接斷開;-110 節點已存在 ;-112 會話過時 * @param path 建立節點傳入的路徑參數 * @param ctx 異步建立api傳入的ctx參數 * @param name 服務端真正建立節點的名稱,業務邏輯應該以該值爲準 */ public void processResult(int code, String path, Object ctx, String name) { System.out.println("created success : " + code + "," + path + "," + ctx + "," + name); } } }
讀取數據包括獲取子節點列表和節點的數據內容。zookeeper 中分別提供了 getChildren 和 getData 方法。html
參數名 | 參數類型 | 做用 |
---|---|---|
path | String | 指定節點的路徑 |
watcher | Watcher | 註冊一個Watcher,一旦在本次子節點獲取到以後,子節點列表發送變動時,那麼會向客戶端發送通知 |
watch | boolean | 代表是否須要註冊一個 |
stat | org.apache.zookeeper.data.Stat | 節點的狀態信息,有時候咱們不只須要最新的子節點列表,還要獲取這個節點的最新狀態信息,咱們能夠將一箇舊的 stat 傳入到api方法中,在方法執行過程當中 stat 會被來自服務的新的 stat 替換掉 |
cb | AsyncCallback.ChildrenCallback | 異步回調函數 |
ctx | Object | 用於傳遞上下文信息 |
/** * 同步 api 建立節點,而後同步獲取節點列表 */ public class CreatedSyncNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new CreatedSyncNode()); latch.await(); String nodePath = "/zk-data"; zooKeeper.create(nodePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> list = zooKeeper.getChildren(nodePath, true); System.out.println("get children : " + list); //給節點 /zk-data 再添加子節點 /conf zooKeeper.create(nodePath + "/conf", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Thread.sleep(1000); } /** * 監聽子節點的變化 * * @param watchedEvent */ public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { //監聽到子節點變化後,主動的去獲取節點列表 try { List<String> list = zooKeeper.getChildren(watchedEvent.getPath(), true); System.out.println("get children node changed : " + list); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
重點是在獲取節點列表 api 中傳入回調對象,該對象實現 Children2Callback
接口java
/** * 異步查詢節點列表 */ public class AsyncQueryNode implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { //建立異步會話 zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncQueryNode()); latch.await(); String nodePath = "/zk-book"; zooKeeper.create(nodePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //表示異步獲取子節點列表,在 ChildrenCallback 中對節點列表進行處理 zooKeeper.getChildren(nodePath, true, new ChildrenCallback(), "query children"); //給節點 /zk-data 再添加子節點 /conf zooKeeper.create(nodePath + "/src", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Thread.sleep(1000); } /** * 監聽子節點的變化 * * @param watchedEvent */ public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { //監聽到子節點變化後,主動的去獲取節點列表 try { List<String> list = zooKeeper.getChildren(watchedEvent.getPath(), true); System.out.println("get children node changed : " + list); } catch (Exception e) { e.printStackTrace(); } } } } static class ChildrenCallback implements AsyncCallback.Children2Callback { /** * @param code 服務端響應碼 * @param path 建立節點傳入的路徑參數 * @param ctx 上下文信息 * @param list 子節點列表 * @param stat 節點的狀態信息 */ public void processResult(int code, String path, Object ctx, List<String> list, Stat stat) { System.out.println("code : " + code + ";path : " + path + ";ctx : " + ctx + ";children list : " + list + ";stat : " + stat); } } }
獲取節點數據內容方法,getData的用法和getChildren差很少,getData 方法的參數中也有一個 Watcher 參數,該 Watcher 的做用是客戶端拿到節點的數據以後,能夠進行 Watcher 註冊,一旦該節點的狀態發送變化,服務端會發送 NoteDataChanged
事件告訴客戶端。node
/** * 同步獲取節點數據 */ public class SyncGetData implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; private static Stat stat = new Stat(); public static void main(String[] args) throws Exception { String path = "/zk-demo"; zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new SyncGetData()); latch.await(); zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //直接獲取節點數據 byte[] datas = zooKeeper.getData(path, true, stat); System.out.println("結果:" + new String(datas)); System.out.println(stat); //修改節點數據 zooKeeper.setData(path, "123".getBytes(), -1); Thread.sleep(1000); } public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); //節點變化時進行通知,EventType.NodeDataChanged 事件 } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { try { byte[] datas = zooKeeper.getData(watchedEvent.getPath(), true, stat); System.out.println("回調通知:" + new String(datas)); System.out.println(stat); } catch (Exception e) { e.printStackTrace(); } } } } }
運行結果:apache
實現異步回調接口 DataCallback
,接收回調通知api
public class AsyncGetData implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static ZooKeeper zooKeeper; public static void main(String[] args) throws Exception { String path = "/zk-async"; zooKeeper = new ZooKeeper("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", 5000, new AsyncGetData()); latch.await(); zooKeeper.create(path, "456".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //設置異步獲取節點數據 zooKeeper.getData(path, true, new DataCallback(), null); //再次更新節點數據 zooKeeper.setData(path, "789".getBytes(), -1); Thread.sleep(1000); } public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) { latch.countDown(); //節點變化時進行通知,EventType.NodeDataChanged 事件 } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) { try { //再次異步獲取變化後的節點數據 zooKeeper.getData(watchedEvent.getPath(), true, new DataCallback(), null); } catch (Exception e) { e.printStackTrace(); } } } } //接收服務端回調 static class DataCallback implements AsyncCallback.DataCallback { public void processResult(int code, String path, Object ctx, byte[] data, Stat stat) { System.out.println("回調通知的節點數據:" + new String(data)); System.out.println("code : " + code + ";path : " + path + ";stat : " + stat); } } }
測試結果:服務器