zookeeper -- 第八章 zk開源客戶端 Curator介紹 (下)

一、讀取數據

構建操做包裝類(Builder): GetDataBuilder getData() ---- CuratorFrameworknode

GetDataBuilderapache

  • storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服務器端獲取的狀態數據存儲到stat對象api

  • Byte[] forPath (String path) // 節點路勁緩存

public void readNode(String path) throws Exception {
		Stat stat = new Stat();
		byte[] data = client.getData().storingStatIn(stat).forPath(path);
		System.out.println("讀取節點" + path + "的數據:" + new String(data));
		System.out.println(stat.toString());
	}

二、更新數據

構建操做包裝類 (Builder) SetDataBuilder setData() ---- CuratorFramework服務器

SetDataBuilder異步

  • withVersion (int version) // 特定版本號函數

  • forPath (String path, byte[] data) // 節點路勁ui

  • forPath (String path) // 節點路勁this

public void updateNode(String path, byte[] data, int version)
			throws Exception {
		client.setData().withVersion(version).forPath(path, data);
	}

三、讀取子節點

構建操做包裝類(Builder):GetChildrenBuilder getChildren() --- CuratorFramework線程

GetChildrenBuilder

  • storingStatIn(org.apache.zookeeper.data.Stat stat) // 把服務器端獲取的狀態數據存儲到stat對象

  • Byte[] forPath(String path) // 節點路徑

  • usingWatcher(org.apache.zookeeper.Watcher watcher) // 設置watcher ,相似zk自己api ,也只能使用一次

  • usingWatcher(CuratorWatcher watcher) // 設置watcher ,相似於zk自己的api,也只能使用一次

public void getChildren(String path) throws Exception {
		List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/curator");
		for (String pth : children) {
			System.out.println("child=" + pth);
		}
	}

四、設置watcher

一、 NodeCache

  • 監聽數據節點的內容變動

  • 監聽節點的建立,即若是指定的節點不存在,則節點建立後,會觸發這個監聽

二、PathChildrenCache

  • 監聽指定節點的子節點變化狀況

  • 包括:新增子節點 子節點數據變動和子節點刪除

三、 NodeCache介紹

構造函數

  • NodeCache(CuratorFramework client, String path)

  • NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

client : 客戶端實例

path : 數據節點路徑

dataIsCompressed : 是否進行數據壓縮

  • 回調接口
public interface NodeCacheListener {
    // 沒有參數,怎麼獲取事件信息以及節點數據
    void nodeChanged() throws Exception;
}

四、PathChildrenCache介紹

一、 構造函數

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData) {
        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
    }

    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory) {
        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
    }

    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory) {
        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
    }

    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ExecutorService executorService) {
        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
    }

二、 回調接口

public interface PathChildrenCacheListener {
    void childEvent(CuratorFramework var1, PathChildrenCacheEvent var2) throws Exception;
}

三、 構造函數參數

client : 客戶端實例

path : 數據節點路徑

dataIsCompressed : 是否進行數據壓縮

cacheData : 用於配置是否把節點內容緩存起來,若是配置true,那麼客戶端在接收到節點列表變動時,也

可以獲取到節點的數據內容,若是爲false則沒法取到數據內容

threadFactory : 經過這兩個參數構造專門的線程池來處理事件通知

ExecutorService

四、 監聽接口

public static enum Type {
        CHILD_ADDED,                                // 新增子節點(CHILD_ADDED)
        CHILD_UPDATED,                           // 子節點數據變動(CHILD_UPDATED)
        CHILD_REMOVED,                          // 子節點刪除(CHILD_REMOVED)
        CONNECTION_SUSPENDED,
        CONNECTION_RECONNECTED,
        CONNECTION_LOST,
        INITIALIZED;

        private Type() {
        }
    }

五、 PathChildrenCache.StartMode

public static enum StartMode {
         // 異步初始化cache
        NORMAL,  
                          
        // 同步初始化客戶端的cache,及建立cache後,就從服務器端拉入對應的數據
        BUILD_INITIAL_CACHE,     

        // 異步初始化,初始化完成觸發事件, PathChildrenCacheEvent.Type.INITIALIZED
        POST_INITIALIZED_EVENT;

        private StartMode() {
        }
    }

六、 代碼演示

public void addChildWatcher(String path) throws Exception {
		final PathChildrenCache cache = new PathChildrenCache(this.client,
				path, true);
		cache.start(StartMode.POST_INITIALIZED_EVENT);
		System.out.println(cache.getCurrentData().size());
		//byte childone[] = cache.getCurrentData().get(0).getData();
//		System.out.println("childone:"
//				+ cache.getCurrentData().get(0).getPath() + ";data="
//				+ new String(childone));
		cache.getListenable().addListener(new PathChildrenCacheListener() {
			public void childEvent(CuratorFramework client,
					PathChildrenCacheEvent event) throws Exception {
				if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
					System.out.println("客戶端子節點cache初始化數據完成");
					System.out.println("size="+cache.getCurrentData().size());
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
					System.out.println("添加子節點:"+event.getData().getPath());
					System.out.println("修改子節點數據:"+new String(event.getData().getData()));
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
					System.out.println("刪除子節點:"+event.getData().getPath());
				}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
					System.out.println("修改子節點數據:"+event.getData().getPath());
					System.out.println("修改子節點數據:"+new String(event.getData().getData()));
				}
			}
		});
	}
相關文章
相關標籤/搜索