Curator 操做是zookeeper的優秀api(相對於原生api),知足大部分需求.並且是Fluent流式api風格.java
參考文獻:https://www.jianshu.com/p/70151fc0ef5d 感謝分享,動手敲一遍留個印象node
curator-framework:對zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操做,例如重試策略等
curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式Barrier等apache
環境:JDK1.8 、maven 、啓動三臺虛擬機作分部署環境 、 curator-recipes 4.0.1 、zookeeper3.4.8api
maven 依賴:不依賴zookeeper會報錯..session
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version> </dependency>
測試增刪改查:
package com.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import java.util.List; /** * Created by Administrator on 2018/6/25. * @see org.apache.zookeeper.CreateMode * PERSISTENT:持久化 PERSISTENT_SEQUENTIAL:持久化而且帶序列號 EPHEMERAL:臨時 EPHEMERAL_SEQUENTIAL:臨時而且帶序列號 */ public class curatorRecipesDemo { final static String zookeeperAddress = "192.168.149.133:2181,192.168.149.135:2181,192.168.149.134:2181"; public static void main(String[] args) throws Exception { CuratorFramework curatorClint = CuratorFrameworkFactory.builder(). connectString(zookeeperAddress)//zkClint鏈接地址 .connectionTimeoutMs(2000)//鏈接超時時間 .sessionTimeoutMs(10000)//會話超時時間 .retryPolicy(new ExponentialBackoffRetry(1000, 3)) //重試策略 .namespace("myZookeeperTest") //命名空間,默認節點 .build(); curatorClint.start(); curatorClint.create().forPath("/path");//默認持久化節點,以斜槓開頭 System.out.println(curatorClint.getChildren().forPath("/")); curatorClint.create().withMode(CreateMode.EPHEMERAL) .forPath("/secondPath","hello,word".getBytes()); System.out.println("節點secondPath的數據"+new String(curatorClint.getData().forPath("/secondPath"))); curatorClint.setData().forPath("/secondPath","hello,myWorld!".getBytes()); System.out.println("節點secondPath的數據"+new String(curatorClint.getData().forPath("/secondPath"))); curatorClint.create() .creatingParentContainersIfNeeded() .forPath("/secondPath/second2/second3");//遞歸建立 List<String> list= curatorClint.getChildren().forPath("/secondPath");//查詢節點的全部字節點 System.out.println(list); curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath/second2");//遞歸刪除 System.out.println(curatorClint.checkExists().forPath("/secondPath/second2"));//判斷節點是否存在 System.out.println(curatorClint.checkExists().forPath("/secondPath/second2/second3"));//判斷節點是否存在 System.out.println(curatorClint.getChildren().forPath("/secondPath")); curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath"); //todo guaranteed()若是刪除失敗,會記錄下來,只要會話有效,就會不斷的重試,直到刪除成功爲止 //todo Stat stat 對象包含版本id,事物id等信息 } }
建立的節點能夠經過 zookeeper 安裝下的bin目錄 鏈接客戶端 sh zkCli.sh ls / 分開斜槓命令進行查看(或./zkCli.sh -timeout 5000 -server 127.0.0.1:2181)maven
監聽watcer api分佈式
package com.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.List; import java.util.Objects; /** * Created by Administrator on 2018/6/26. * * PathChildCache 監聽一個節點下子節點的建立、刪除、更新 * NodeCache 監聽一個節點的更新和建立事件 * TreeCache 綜合PatchChildCache和NodeCache的特性 */ public class WatcherDemo { final static String zookeeperAddress = "192.168.149.133:2181,192.168.149.135:2181,192.168.149.134:2181"; public static void main(String[] args) throws Exception { CuratorFramework curatorClint = CuratorFrameworkFactory.builder(). connectString(zookeeperAddress)//zkClint鏈接地址 .connectionTimeoutMs(2000)//鏈接超時時間 .sessionTimeoutMs(10000)//會話超時時間 .retryPolicy(new ExponentialBackoffRetry(1000, 3)) //重試策略 .namespace("myZookeeperTest") //命名空間,默認節點 .build(); curatorClint.start(); List<String> list= curatorClint.getChildren().forPath("/"); if(Objects.nonNull(list)){ if( !list.contains("myWatch")){ curatorClint.delete().deletingChildrenIfNeeded().forPath("/myWatch"); } }else { curatorClint.create().forPath("/myWatch"); } PathChildrenCache pathChildrenCache= pathChildrenCache = new PathChildrenCache(curatorClint,"/myWatch",false); PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println("pathChildrenCacheListener::::->"+pathChildrenCacheEvent.getData()); } }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);//註冊監聽事件 pathChildrenCache.start(); NodeCache nodeCache=new NodeCache(curatorClint,"/myWatch",false); NodeCacheListener nodeCacheListener=new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("nodeCacheListener::::->:"+nodeCache.getCurrentData().getPath()); } }; nodeCache.getListenable().addListener(nodeCacheListener); nodeCache.start(); TreeCache treeCache=new TreeCache(curatorClint,"/myWatch"); TreeCacheListener treeCacheListener=new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println("treeCacheListener::::->"+treeCacheEvent.getData()); } }; treeCache.getListenable().addListener(treeCacheListener); treeCache.start(); curatorClint.create().forPath("/myWatch/child22","生個好孩子".getBytes()); curatorClint.create().creatingParentContainersIfNeeded().forPath("/myWatch/child22/child22","生個好孩子".getBytes()); curatorClint.setData().forPath("/myWatch/child222","生個好孩子aaaa".getBytes()); System.in.read();//阻塞否則啓動後clint就關掉了 } }