Curator提供了對zookeeper客戶端的封裝,並監控鏈接狀態和會話session,特別是會話session過時後,curator可以從新鏈接zookeeper,而且建立一個新的session。html
對於zk的使用者來講,session的概念相當重要,若是想了解更多session的說明,請訪問:java
http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.htmlapache
zk客戶端和zk服務器間主要可能存在下面幾種異常狀況:api
特別提示:watcher僅僅是一次性的,zookeeper通知了watcher事件後,就會將這個watcher從session中刪除,所以,若是想繼續監控,就要添加新的watcher。服務器
下面提供了對persistent和ephemeral兩種類型節點的監控方法,其中get方法說明了persistent節點如何監控,而register方法說明了ephemeral類型的節點如何監控。session
package demo; import java.net.InetAddress; import java.nio.charset.Charset; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.RetryNTimes; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; public class CuratorTest { private CuratorFramework zkTools; private ConcurrentSkipListSet<String> watchers = new ConcurrentSkipListSet<String>(); private static Charset charset = Charset.forName("utf-8"); public CuratorTest() { zkTools = CuratorFrameworkFactory.builder() .connectString("192.168.0.216:3306") .namespace("zk/test") .retryPolicy(new RetryNTimes(2000, 20000)) .build(); zkTools.start(); } public void addReconnectionWatcher(final String path, final ZookeeperWatcherType watcherType, final CuratorWatcher watcher) { synchronized (this) { if (!watchers.contains(watcher.toString()))// 不要添加劇復的監聽事件 { watchers.add(watcher.toString()); System.out.println("add new watcher " + watcher); zkTools.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { System.out.println(newState); if (newState == ConnectionState.LOST) {// 處理session過時 try { if (watcherType == ZookeeperWatcherType.EXITS) { zkTools.checkExists().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_CHILDREN) { zkTools.getChildren().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.GET_DATA) { zkTools.getData().usingWatcher(watcher).forPath(path); } else if (watcherType == ZookeeperWatcherType.CREATE_ON_NO_EXITS) { // ephemeral類型的節點session過時了,須要從新建立節點,而且註冊監聽事件,以後監聽事件中, // 會處理create事件,將路徑值恢復到先前狀態 Stat stat = zkTools.checkExists().usingWatcher(watcher) .forPath(path); if (stat == null) { System.err.println("to create"); zkTools.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); } } } catch (Exception e) { e.printStackTrace(); } } } }); } } } public void create() throws Exception { zkTools.create()// 建立一個路徑 .creatingParentsIfNeeded()// 若是指定的節點的父節點不存在,遞歸建立父節點 .withMode(CreateMode.PERSISTENT)// 存儲類型(臨時的仍是持久的) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)// 訪問權限 .forPath("zk/test");// 建立的路徑 } public void put() throws Exception { // 對路徑節點賦值 zkTools.setData().forPath("zk/test", "hello world".getBytes(Charset.forName("utf-8"))); } public void get() throws Exception { String path = "zk/test"; ZKWatch watch = new ZKWatch(path); byte[] buffer = zkTools.getData().usingWatcher(watch).forPath(path); System.out.println(new String(buffer, charset)); // 添加session過時的監控 addReconnectionWatcher(path, ZookeeperWatcherType.GET_DATA, watch); } public void register() throws Exception { String ip = InetAddress.getLocalHost().getHostAddress(); String registeNode = "zk/register/" + ip;// 節點路徑 byte[] data = "disable".getBytes(charset);// 節點值 CuratorWatcher watcher = new ZKWatchRegister(registeNode, data); // 建立一個register watcher Stat stat = zkTools.checkExists().forPath(registeNode); if (stat != null) { zkTools.delete().forPath(registeNode); } zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(registeNode, data);// 建立的路徑和值 // 添加到session過時監控事件中 addReconnectionWatcher(registeNode, ZookeeperWatcherType.CREATE_ON_NO_EXITS, watcher); data = zkTools.getData().usingWatcher(watcher).forPath(registeNode); System.out.println("get path form zk : " + registeNode + ":" + new String(data, charset)); } public static void main(String[] args) throws Exception { CuratorTest test = new CuratorTest(); test.get(); test.register(); Thread.sleep(10000000000L); } public class ZKWatch implements CuratorWatcher { private final String path; public String getPath() { return path; } public ZKWatch(String path) { this.path = path; } @Override public void process(WatchedEvent event) throws Exception { System.out.println(event.getType()); if (event.getType() == EventType.NodeDataChanged) { byte[] data = zkTools.getData().usingWatcher(this).forPath(path); System.out.println(path + ":" + new String(data, Charset.forName("utf-8"))); } } } public class ZKWatchRegister implements CuratorWatcher { private final String path; private byte[] value; public String getPath() { return path; } public ZKWatchRegister(String path, byte[] value) { this.path = path; this.value = value; } @Override public void process(WatchedEvent event) throws Exception { System.out.println(event.getType()); if (event.getType() == EventType.NodeDataChanged) { // 節點數據改變了,須要記錄下來,以便session過時後,可以恢復到先前的數據狀態 byte[] data = zkTools.getData().usingWatcher(this).forPath(path); value = data; System.out.println(path + ":" + new String(data, charset)); } else if (event.getType() == EventType.NodeDeleted) { // 節點被刪除了,須要建立新的節點 System.out.println(path + ":" + path + " has been deleted."); Stat stat = zkTools.checkExists().usingWatcher(this).forPath(path); if (stat == null) { zkTools.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path); } } else if (event.getType() == EventType.NodeCreated) { // 節點被建立時,須要添加監聽事件(建立多是因爲session過時後,curator的狀態監聽部分觸發的) System.out.println(path + ":" + " has been created!" + "the current data is " + new String(value)); zkTools.setData().forPath(path, value); zkTools.getData().usingWatcher(this).forPath(path); } } } public enum ZookeeperWatcherType { GET_DATA, GET_CHILDREN, EXITS, CREATE_ON_NO_EXITS } }