構建操做包裝類(Builder): GetDataBuilder getData() ---- CuratorFrameworknode
GetDataBuilderapache
storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服務器端獲取的狀態數據存儲到stat對象api
Byte[] forPath (String path) // 節點路勁緩存
public void readNode(String path) throws Exception { Stat stat = new Stat(); byte[] data = client.getData().storingStatIn(stat).forPath(path); System.out.println("讀取節點" + path + "的數據:" + new String(data)); System.out.println(stat.toString()); }
構建操做包裝類 (Builder) SetDataBuilder setData() ---- CuratorFramework服務器
SetDataBuilder異步
withVersion (int version) // 特定版本號函數
forPath (String path, byte[] data) // 節點路勁ui
forPath (String path) // 節點路勁this
public void updateNode(String path, byte[] data, int version) throws Exception { client.setData().withVersion(version).forPath(path, data); }
構建操做包裝類(Builder):GetChildrenBuilder getChildren() --- CuratorFramework線程
GetChildrenBuilder
storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服務器端獲取的狀態數據存儲到stat對象
Byte[] forPath(String path) // 節點路徑
usingWatcher(org.apache.zookeeper.Watcher watcher) // 設置watcher ,相似zk自己api ,也只能使用一次
usingWatcher(CuratorWatcher watcher) // 設置watcher ,相似於zk自己的api,也只能使用一次
public void getChildren(String path) throws Exception { List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator"); for (String pth : children) { System.out.println("child=" + pth); } }
監聽數據節點的內容變動
監聽節點的建立,即若是指定的節點不存在,則節點建立後,會觸發這個監聽
監聽指定節點的子節點變化狀況
包括:新增子節點 子節點數據變動和子節點刪除
構造函數
NodeCache(CuratorFramework client, String path)
NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
client : 客戶端實例
path : 數據節點路徑
dataIsCompressed : 是否進行數據壓縮
public interface NodeCacheListener { // 沒有參數,怎麼獲取事件信息以及節點數據 void nodeChanged() throws Exception; }
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) { this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true)); } public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) { this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); } public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) { this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true)); } public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) { this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService)); }
public interface PathChildrenCacheListener { void childEvent(CuratorFramework var1, PathChildrenCacheEvent var2) throws Exception; }
client : 客戶端實例
path : 數據節點路徑
dataIsCompressed : 是否進行數據壓縮
cacheData : 用於配置是否把節點內容緩存起來,若是配置true,那麼客戶端在接收到節點列表變動時,也
可以獲取到節點的數據內容,若是爲false則沒法取到數據內容
threadFactory : 經過這兩個參數構造專門的線程池來處理事件通知
ExecutorService
public static enum Type { CHILD_ADDED, // 新增子節點(CHILD_ADDED) CHILD_UPDATED, // 子節點數據變動(CHILD_UPDATED) CHILD_REMOVED, // 子節點刪除(CHILD_REMOVED) CONNECTION_SUSPENDED, CONNECTION_RECONNECTED, CONNECTION_LOST, INITIALIZED; private Type() { } }
public static enum StartMode { // 異步初始化cache NORMAL, // 同步初始化客戶端的cache,及建立cache後,就從服務器端拉入對應的數據 BUILD_INITIAL_CACHE, // 異步初始化,初始化完成觸發事件, PathChildrenCacheEvent.Type.INITIALIZED POST_INITIALIZED_EVENT; private StartMode() { } }
public void addChildWatcher(String path) throws Exception { final PathChildrenCache cache = new PathChildrenCache(this.client, path, true); cache.start(StartMode.POST_INITIALIZED_EVENT); System.out.println(cache.getCurrentData().size()); //byte childone[] = cache.getCurrentData().get(0).getData(); // System.out.println("childone:" // + cache.getCurrentData().get(0).getPath() + ";data=" // + new String(childone)); cache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){ System.out.println("客戶端子節點cache初始化數據完成"); System.out.println("size="+cache.getCurrentData().size()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ System.out.println("添加子節點:"+event.getData().getPath()); System.out.println("修改子節點數據:"+new String(event.getData().getData())); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ System.out.println("刪除子節點:"+event.getData().getPath()); }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("修改子節點數據:"+event.getData().getPath()); System.out.println("修改子節點數據:"+new String(event.getData().getData())); } } }); }