Apache Curator是一個比較完善的ZooKeeper客戶端框架,經過封裝的一套高級API 簡化了ZooKeeper的操做。經過查看官方文檔,能夠發現Curator主要解決了三類問題:java
1 public class CuratorBase { 2 //會話超時時間 3 private final int SESSION_TIMEOUT = 30 * 1000; 4 5 //鏈接超時時間 6 private final int CONNECTION_TIMEOUT = 3 * 1000; 7 8 //ZooKeeper服務地址 9 private static final String CONNECT_ADDR = "192.168.1.1:2100,192.168.1.1:2101,192.168.1.:2102"; 10 11 //建立鏈接實例 12 private CuratorFramework client = null; 13 14 public static void main(String[] args) throws Exception { 15 //1 重試策略:初試時間爲1s 重試10次 16 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); 17 //2 經過工廠建立鏈接 18 CuratorFramework client = CuratorFrameworkFactory.builder() 19 .connectString(CONNECT_ADDR).connectionTimeoutMs(CONNECTION_TIMEOUT) 20 .sessionTimeoutMs(SESSION_TIMEOUT) 21 .retryPolicy(retryPolicy) 22 //命名空間 .namespace("super") 23 .build(); 24 //3 開啓鏈接 25 cf.start(); 26 27 System.out.println(States.CONNECTED); 28 System.out.println(cf.getState()); 29 30 //建立永久節點 31 client.create().forPath("/curator","/curator data".getBytes()); 32 33 //建立永久有序節點 34 client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes()); 35 36 //建立臨時節點 37 client.create().withMode(CreateMode.EPHEMERAL) 38 .forPath("/curator/ephemeral","/curator/ephemeral data".getBytes()); 39 40 //建立臨時有序節點 41 client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes()); 42 43 client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator/ephemeral_path2","/curator/ephemeral_path2 data".getBytes()); 44 45 //測試檢查某個節點是否存在 46 Stat stat1 = client.checkExists().forPath("/curator"); 47 Stat stat2 = client.checkExists().forPath("/curator2"); 48 49 System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false)); 50 System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false)); 51 52 //獲取某個節點的全部子節點 53 System.out.println(client.getChildren().forPath("/")); 54 55 //獲取某個節點數據 56 System.out.println(new String(client.getData().forPath("/curator"))); 57 58 //設置某個節點數據 59 client.setData().forPath("/curator","/curator modified data".getBytes()); 60 61 //建立測試節點 62 client.create().orSetData().creatingParentContainersIfNeeded() 63 .forPath("/curator/del_key1","/curator/del_key1 data".getBytes()); 64 65 client.create().orSetData().creatingParentContainersIfNeeded() 66 .forPath("/curator/del_key2","/curator/del_key2 data".getBytes()); 67 68 client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes()); 69 70 //刪除該節點 71 client.delete().forPath("/curator/del_key1"); 72 73 //級聯刪除子節點 74 client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2"); 75 } 76 77 }
事務管理:node
* 事務管理:碰到異常,事務會回滾 * @throws Exception */ @Test public void testTransaction() throws Exception{ //定義幾個基本操做 CuratorOp createOp = client.transactionOp().create() .forPath("/curator/one_path","some data".getBytes()); CuratorOp setDataOp = client.transactionOp().setData() .forPath("/curator","other data".getBytes()); CuratorOp deleteOp = client.transactionOp().delete() .forPath("/curator"); //事務執行結果 List<CuratorTransactionResult> results = client.transaction() .forOperations(createOp,setDataOp,deleteOp); //遍歷輸出結果 for(CuratorTransactionResult result : results){ System.out.println("執行結果是: " + result.getForPath() + "--" + result.getType()); } } //由於節點「/curator」存在子節點,因此在刪除的時候將會報錯,事務回滾
Curator提供了三種Watcher(Cache)來監聽結點的變化:apache
/** * 在註冊監聽器的時候,若是傳入此參數,當事件觸發時,邏輯由線程池處理 */ ExecutorService pool = Executors.newFixedThreadPool(2); /** * 監聽數據節點的變化狀況 */ final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false); nodeCache.start(true); nodeCache.getListenable().addListener( new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("Node data is changed, new data: " + new String(nodeCache.getCurrentData().getData())); } }, pool ); /** * 監聽子節點的變化狀況 */ final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true); childrenCache.start(StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener( new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED: " + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED: " + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED: " + event.getData().getPath()); break; default: break; } } }, pool ); client.setData().forPath("/zk-huey/cnode", "world".getBytes()); Thread.sleep(10 * 1000); pool.shutdown(); client.close();
分佈式編程時,好比最容易碰到的狀況就是應用程序在線上多機部署,因而當多個應用同時訪問某一資源時,就須要某種機制去協調它們。例如,如今一臺應用正在rebuild緩存內容,要臨時鎖住某個區域暫時不讓訪問;又好比調度程序每次只想一個任務被一臺應用執行等等。編程
下面的程序會啓動兩個線程t1和t2去爭奪鎖,拿到鎖的線程會佔用5秒。運行屢次能夠觀察到,有時是t1先拿到鎖而t2等待,有時又會反過來。Curator會用咱們提供的lock路徑的結點做爲全局鎖,這個結點的數據相似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次得到鎖時會生成這種串,釋放鎖時清空數據。緩存
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; /** * Curator framework's distributed lock test. */ public class CuratorDistrLockTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
當集羣裏的某個服務down機時,咱們可能要從slave結點裏選出一個做爲新的master,這時就須要一套能在分佈式環境中自動協調的Leader選舉方法。Curator提供了LeaderSelector監聽器實現Leader選舉功能。同一時刻,只有一個Listener會進入takeLeadership()方法,說明它是當前的Leader。注意:當Listener從takeLeadership()退出時就說明它放棄了「Leader身份」,這時Curator會利用Zookeeper再從剩餘的Listener中選出一個新的Leader。autoRequeue()方法使放棄Leadership的Listener有機會從新得到Leadership,若是不設置的話放棄了的Listener是不會再變成Leader的。session
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; /** * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */ public class CuratorLeaderTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }