上篇博客記錄了shell命令操做zookeeper集羣的方式,此次嘗試採用java代碼來操做。經過查閱API,發現並不困難。java
//zookeeper客戶端 private ZooKeeper zkCli; //鏈接地址 private static final String CONNECT_STRING = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; //session過時時間 private static final int SESSION_TIMEOUT = 2000; /** * 建立客戶端實例對象 * * @throws IOException */ @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, (event) -> { System.out.println("默認的回調函數"); }); }
@Test public void ls() throws KeeperException, InterruptedException { List<String> children = zkCli.getChildren("/", e -> { System.out.println("自定義回調函數"); }); System.out.println("---------------------"); for (String child : children) { System.out.println(child); } System.out.println("+++++++++++++++++++++"); Thread.sleep(Long.MAX_VALUE); }
@Test public void create() throws KeeperException, InterruptedException { //parameter1:建立的節點路徑 parameter2:節點的數據 parameter3:節點權限 parameter4:節點類型 String str = zkCli.create("/idea", "idea2019".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(str); Thread.sleep(Long.MAX_VALUE); }
@Test public void get() throws KeeperException, InterruptedException { byte[] data = zkCli.getData("/simon", true, new Stat()); String str = new String(data); System.out.println(str); }
@Test public void set() throws KeeperException, InterruptedException { Stat stat = zkCli.setData("/simon", "abcd".getBytes(), 1); System.out.println(stat.getDataLength()); }
@Test public void stat() throws KeeperException, InterruptedException { Stat exists = zkCli.exists("/ideaa", false); if (exists == null) { System.out.println("節點不存在"); } else { System.out.println(exists.getDataLength()); } }
@Test public void delete() throws KeeperException, InterruptedException { Stat exists = zkCli.exists("/idea", false); if (exists != null) { zkCli.delete("/idea", exists.getVersion()); } }
也就是當節點有任何變化時,就會調用回調函數,動態的顯示節點的變化信息。shell
@Test public void register() throws KeeperException, InterruptedException { byte[] data = zkCli.getData("/simon2", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { register();//發生事件,通知的時候在註冊一遍,實現了循環調用 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }, null); System.out.println(new String(data)); } @Test public void testRegister() { try { register(); //調用register()方法,將進程卡住,循環執行register Thread.sleep(Long.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } }
代碼很簡單易懂,只是有些內部原理仍是有些模糊,例如:Stat是什麼?zookeeper是怎麼監聽到節點發生變化的?又是如何將變化通知給客戶端的?在以後的博客中將會詳細記錄。~服務器