Curator是Netflix公司開源的一套zookeeper客戶端框架。java
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, 5000,3000,new RetryNTimes(10, 5000));
一、ZK_ADDRESS 鏈接zk的地址 格式host1:port1,host2:port2,。。。
二、sessionTimeoutMs 會話超時時間,單位是毫秒,可不填測此參數,默認值爲60000ms
三、connectionTimeoutMs
四、retryPolicy 重試策略,內建有四種重試策略,也能夠自行實現RetryPolicy接口。
client.start()
四種節點
PERSISTENT:持久化
PERSISTENT_SEQUENTIAL:持久化而且帶序列號
EPHEMERAL:臨時
EPHEMERAL_SEQUENTIAL:臨時而且帶序列號node
一、默認持久化節點
client.create().forPath("/data1");
二、建立持久化節點並賦值
client.create().forPath("/data2", "this is data2".getBytes());
三、建立臨時空節點
client.create().withMode(CreateMode.EPHEMERAL).forPath("/data3");
四、建立臨時節點並賦值
client.create().withMode(CreateMode.EPHEMERAL).forPath("/data4", "this is data4".getBytes());
五、建立臨時有序節點並賦值並遞歸建立父節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPH_SEQ).forPath("/patent/data", "children".getBytes());git
流式風格,拼接順序能夠調整。
一、只能刪除葉子節點,不然拋出異常
client.delete().forPath("/data1");
二、刪除一個節點並遞歸刪除其全部的子節點
client.delete().deletingChildrenIfNeeded().forPath("/patent/data");
三、保證強制刪除一個節點,只要客戶端會話有效就會持續進行刪除直到刪除成功。
client.delete().guaranteed().forPath("/data2");github
一、讀節點的數據內容,返回byte數組
byte[] data=client.getData().forPath("/data2");
二、讀取一個節點的數據內容,同時獲取到該節點的stat
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("data4");
三、獲取一個路徑下全部的子節點
List<String> childrens = client.getChildren().forPath("/");
四、檢查節點是否存在,返回一個stat
stat= client.checkExists().forPath("/data3");apache
一、更新一個節點的數據內容,返回一個stat
Stat stat = client.setData().forPath("/data1","update data1 data".getBytes());api
CuratorFramework的實例包含inTransaction()接口方法,調用此方法開啓一個ZooKeeper事務. 能夠複合create, setData, check, and/or delete 等操做而後調用commit()做爲一個原子操做提交。 數組
client.inTransaction()
.and().create().withMode(CreateMode.PERSISTENT).forPath("/data5", "this is data5".getBytes())
.and().setData().forPath("/data5", "update data5".getBytes())
.and().commit();緩存
Zookeeper原生支持經過註冊Watcher來進行事件監聽,可是開發者須要反覆註冊(Watcher只能單次註冊單次使用)。Cache是Curator中對事件監聽的包裝,能夠看做是對事件監聽的本地緩存視圖,可以自動爲開發者處理反覆註冊監聽。Curator提供了三種Watcher(Cache)來監聽結點的變化。session
一、Path Cache框架
Path Cache用來監控一個ZNode的子節點. 當一個子節點增長, 更新,刪除時, Path Cache會改變它的狀態, 會包含最新的子節點, 子節點的數據和狀態,而狀態的更變將經過PathChildrenCacheListener通知。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.zookeeper.CreateMode; import java.util.List; public class CuratorWatchTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //建立客戶端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { PathChildrenCache cache = pathCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes()); Thread.sleep(500); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes()); Thread.sleep(500); //獲取全部子節點 List<ChildData> datas =cache.getCurrentData(); for(ChildData childData : datas){ System.out.println("節點路徑:"+childData.getPath() + ",節點值:" + new String(childData.getData())); } client.setData().forPath(PATH + "/data1", "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(500); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static PathChildrenCache pathCacheTest(CuratorFramework client) throws Exception { //第三個參數不爲true時,不會緩存data數據 PathChildrenCache cache = new PathChildrenCache(client, PATH, true); /* 三種啓動方式 NORMAL:正常初始化。 BUILD_INITIAL_CACHE:在調用start()以前會調用rebuild()。 POST_INITIALIZED_EVENT: 當Cache初始化數據後發送一個 cache.start(PathChildrenCache.StartMode.NORMAL); */ cache.start(); //增長監聽事件 cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("事件爲:" + event.getType() + ",數據爲:" + new String(event.getData().getData())); } }); return cache; } }
2)Node Cache
Node Cache與Path Cache相似,Node Cache只能監聽某一個特定的節點。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.zookeeper.CreateMode; public class CuratorNodeCacheTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //建立客戶端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { NodeCache cache = nodeCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "first data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH, "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(500); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static NodeCache nodeCacheTest(CuratorFramework client) throws Exception { //只能監控一個節點 NodeCache cache = new NodeCache(client, PATH); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData childData = cache.getCurrentData(); if (childData == null) { System.out.println("節點刪除!"); } else { System.out.println("節點數據:" + new String(cache.getCurrentData().getData())); } } }); cache.start(); return cache; } }
3)Tree Cache
Tree Cache能夠監控整個樹上的全部節點,相似於PathCache和NodeCache的組合。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.zookeeper.CreateMode; public class CuratorTreeCacheTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //建立客戶端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { TreeCache cache = treeCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes()); Thread.sleep(500); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH , "update path data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH + "/data1", "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(1000); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static TreeCache treeCacheTest(CuratorFramework client) throws Exception { //只能監控一個節點 TreeCache cache = new TreeCache(client, PATH); cache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("事件爲:" + event.getType() + ",節點路徑爲:" + event.getData().getPath() + ",數據爲:" + new String(event.getData().getData())); } }); cache.start(); return cache; } }
curator-recipes中有一些高級特性可使用,在後面的章節會具體介紹。
源碼地址: https://github.com/binary-vi/binary.github.io/tree/master/zk-curator