爲了更好的實現Java操做zookeeper服務器,後來出現了Curator框架,很是的強大,目前已是Apache的頂級項目,裏面提供了更多豐富的操做,例如session超時重連、主從選舉、分佈式計數器、分佈式鎖等等適用於各類複雜的zookeeper場景的API封裝java |
Curator框架中使用鏈式編程風格,易讀性更強,使用工廠方法建立鏈接對象。apache 1.使用CuratorFrameworkFactory的兩個靜態工廠方法(參數不一樣)來實現編程 1.1 connectString:鏈接串緩存 1.2 retryPolicy:重試鏈接策略。有四種實現,分別是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed服務器 1.3sessionTimeoutMs:會話超時時間,默認爲60000mssession 1.4connectionTimeoutMs鏈接超時時間,默認爲15000ms併發
注意對於retryPolicy策略經過一個接口來讓用戶自定義實現框架 |
2.1建立鏈接dom
/** 重試策略: 初始時間爲1s, 重試10次 */分佈式 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
/** 經過工廠建立鏈接 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build();
/** 開啓鏈接 */ cf.start(); |
2.2 新增節點
/** * 新增節點:指定節點類型(不加withMode默認爲持久類型節點)、路徑、數據內容 * 1.creatingParentsIfNeeded() 遞歸建立父目錄 * 2.withMode() 節點類型(持久|臨時) * 3.forPath() 指定路徑 */ cf.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/super/c1", "c1內容".getBytes()); |
2.3 刪除節點
/** * 刪除節點 * 1.deletingChildrenIfNeeded() 遞歸刪除 * 2.guaranteed() 確保節點被刪除 * 3. withVersion(int version) //特定版本號 */ cf.delete().deletingChildrenIfNeeded().forPath("/super"); |
2.4 讀取和修改數據
/** * 讀取和修改數據 : getData()和setData() */ cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1內容".getBytes()); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2內容".getBytes());
/** 讀取節點內容 */ String c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("c2_data-->"+c2_data);
/** 修改節點內容 */ cf.setData().forPath("/super/c2", "修改c2的內容".getBytes()); String update_c2_data = new String(cf.getData().forPath("/super/c2")); System.out.println("update_c2_data-->"+update_c2_data); |
2.5 綁定回調函數
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void proce***esult(CuratorFramework cf, CuratorEvent event) throws Exception { System.out.println("code-->" + event.getResultCode()); System.out.println("type-->" + event.getType()); System.out.println("線程爲-->" + Thread.currentThread().getName()); } }, pool).forPath("/super/c3", "c2的內容".getBytes());
System.out.println("主線程-->" + Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE); |
2.6 讀取子節點和判斷節點是否存在
/** * 讀取子節點的方法: getChildren() * 判斷節點是否存在: checkExists() */ List<String> list = cf.getChildren().forPath("/super"); for (String p: list) { System.out.println(p); }
//若是爲null標識不存在 Stat stat = cf.checkExists().forPath("/super/c4"); System.out.println(stat); |
若是要使用相似Wather的監聽功能Curator必須依賴一個jar包,Maven依賴 <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.4.2</version> </dependency> 有了這個依賴包,使用NodeCache的方式去客戶端實例中註冊一個監聽緩存,而後實現對應的監聽方法便可,這裏主要有兩種監聽方式 NodeCacheListener:監聽節點的新增、修改操做 PathChildrenCacheListener:監聽子節點的新增、修改、刪除操做 |
4.1 分佈式鎖
在分佈式場景中,爲了保證數據的一致性,常常在程序運行的某一個點須要進行同步操做(java提供了synchronized或者Reentrantlock實現)好比看一個小示例,這個示例出現分佈式不一樣步的問題 好比:以前是在高併發下訪問一個程序,如今則是在高併發下訪問多個服務器節點(分佈式)
使用Curator基於zookeeper的特性提供的分佈式鎖來處理分佈式場景的數據一致性,zookeeper自己的分佈式是有寫問題的,以前實現的時候遇到過,這裏強烈推薦使用Curator分佈式鎖 public class Lock2 { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時時間 */ private static final int SESSION_TIMEOUT = 5000; //MS static int count = 10; public static CuratorFramework createCuratorFramework(){ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); return cf; } public static void main(String[] args) throws Exception { final CountDownLatch countDown = new CountDownLatch(1); for (int i =0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { CuratorFramework cf = createCuratorFramework(); cf.start(); //鎖對象 client 鎖節點 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); try { countDown.await(); lock.acquire(); //得到鎖 number(); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release();//釋放鎖 } catch (Exception e) { e.printStackTrace(); } } } },"t" + i).start();; } Thread.sleep(2000); countDown.countDown(); } public static void number() { count--; System.out.println(Thread.currentThread().getName() + "-->" + count); } } |
4.2 分佈式計數器功能
一說到分佈式計數器,可能腦海裏想到AtomicInteger(原子累加)這種經典方式,若是針對一個JVM的場景固然沒問題,可是如今是在分佈式場景下,就須要利用Curator框架的DistributedAtomicInteger了 public class CuratorAtomicInteger { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時時間 */ private static final int SESSION_TIMEOUT = 5000; //MS public static void main(String[] args) throws Exception { CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); //使用DistributedAtomicInteger DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000)); //atomicInteger.increment(); atomicInteger.add(1); AtomicValue<Integer> atomicValue = atomicInteger.get(); System.out.println("atomicValue.succeeded()-->" + atomicValue.succeeded()); System.out.println("atomicValue.postValue()-->" + atomicValue.postValue()); System.out.println("atomicValue.preValue()-->" + atomicValue.preValue()); } } |
4.3 Barrier
4.3.1 DistributedDoubleBarrier
分佈式Barrier 類DistributedDoubleBarrier: 它會阻塞全部節點上的等待進程,直到某一個被知足, 而後全部的節點同時開始,中間誰先運行完畢,誰後運行完畢不關心,可是最終必定是一塊退出運行的
public class CuratorBarrier { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時時間 */ private static final int SESSION_TIMEOUT = 5000; //MS public static void main(String[] args) throws Exception{ for (int i =0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { /** 實例化5個客戶端對象 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + " 已準備好!"); barrier.enter(); System.out.println("同時開始運行..."); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println("運行完畢..."); barrier.leave(); System.out.println("同時退出運行..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start();; } } } |
4.3.2 DistributedBarrier
分佈式Barrier 類DistributedBarrier: 它會阻塞全部節點上的等待進程(全部節點進入待執行狀態),直到「某一我的吹哨」說開始執行, 而後全部的節點同時開始 public class CuratorBarrier2 { /** zk地址 */ private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181"; /** session超時時間 */ private static final int SESSION_TIMEOUT = 5000; //MS static DistributedBarrier barrier = null; public static void main(String[] args) throws Exception{ for (int i =0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { try { /** 實例化5個客戶端對象 */ CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(ZK_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); cf.start(); barrier = new DistributedBarrier(cf, "/superBarrier"); System.out.println(Thread.currentThread().getName() + " 設置barrier"); barrier.setBarrier(); //設置 barrier.waitOnBarrier(); //等待 System.out.println("開始執行程序..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start();; } Thread.sleep(5000); barrier.removeBarrier(); //釋放 } } |
Curator內部實現的幾種重試策略: 1.ExponentialBackoffRetry:重試指定的次數, 且每一次重試之間停頓的時間逐漸增長. 2.RetryNTimes:指定最大重試次數的重試策略 3.RetryOneTime:僅重試一次 4.RetryUntilElapsed:一直重試直到達到規定的時間 |
5.1 ExponentialBackoffRetry
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
參數說明 1.baseSleepTimeMs 初始sleep時間 2.maxRetries 最大重試次數 3.maxSleepMs 最大重試時間 |
5.2 RetryNTimes
RetryNTimes(int n, int sleepMsBetweenRetries) 參數說明 1.n 最大重試次數 2.sleepMsBetweenRetries 每次重試的間隔時間 |
5.3 RetryOneTime
RetryOneTime(int sleepMsBetweenRetry)
參數說明 1.sleepMsBetweenRetry爲重試間隔的時間 |
5.4 RetryUntilElapsed
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
參數說明 1.maxElapsedTimeMs 最大重試時間 2.sleepMsBetweenRetries 每次重試的間隔時間 |