Curator的使用


Curator



爲了更好的實現Java操做zookeeper服務器,後來出現了Curator框架,很是的強大,目前已是Apache的頂級項目,裏面提供了更多豐富的操做,例如session超時重連、主從選舉、分佈式計數器、分佈式鎖等等適用於各類複雜的zookeeper場景的API封裝java




1 Curator框架使用(一)

Curator框架中使用鏈式編程風格,易讀性更強,使用工廠方法建立鏈接對象。apache

1.使用CuratorFrameworkFactory的兩個靜態工廠方法(參數不一樣)來實現編程

1.1 connectString:鏈接串緩存

1.2 retryPolicy:重試鏈接策略。有四種實現,分別是:ExponentialBackoffRetry、RetryNTimes、RetryOneTimes、RetryUntilElapsed服務器

1.3sessionTimeoutMs:會話超時時間,默認爲60000mssession

1.4connectionTimeoutMs鏈接超時時間,默認爲15000ms併發

 

注意對於retryPolicy策略經過一個接口來讓用戶自定義實現框架




2 Curator框架使用(二)

2.1建立鏈接dom

/** 重試策略: 初始時間爲1s, 重試10次 */分佈式

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);

 

/** 經過工廠建立鏈接 */

CuratorFramework cf = CuratorFrameworkFactory.builder()

.connectString(ZK_ADDR)

.sessionTimeoutMs(SESSION_TIMEOUT)

.retryPolicy(retryPolicy)

.build();

 

/** 開啓鏈接 */

cf.start();



2.2 新增節點

/**

 * 新增節點:指定節點類型(不加withMode默認爲持久類型節點)、路徑、數據內容

 * 1.creatingParentsIfNeeded() 遞歸建立父目錄

 * 2.withMode() 節點類型(持久|臨時)

 * 3.forPath() 指定路徑

 */

cf.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.PERSISTENT)

.forPath("/super/c1", "c1內容".getBytes());



2.3 刪除節點

/**

 * 刪除節點

 * 1.deletingChildrenIfNeeded() 遞歸刪除

 * 2.guaranteed() 確保節點被刪除

 * 3. withVersion(int version) //特定版本號  

 */

cf.delete().deletingChildrenIfNeeded().forPath("/super");



2.4 讀取和修改數據

/**

 * 讀取和修改數據 : getData()和setData()

 */

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1內容".getBytes());

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2內容".getBytes());

 

/** 讀取節點內容 */

String c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("c2_data-->"+c2_data);

 

/** 修改節點內容 */

cf.setData().forPath("/super/c2", "修改c2的內容".getBytes());

String update_c2_data = new String(cf.getData().forPath("/super/c2"));

System.out.println("update_c2_data-->"+update_c2_data);



2.5 綁定回調函數

ExecutorService pool = Executors.newCachedThreadPool();

 

cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)

.inBackground(new BackgroundCallback() {


@Override

public void proce***esult(CuratorFramework cf, CuratorEvent event)

throws Exception {

System.out.println("code-->" + event.getResultCode());

System.out.println("type-->" + event.getType());

System.out.println("線程爲-->" + Thread.currentThread().getName());

}

}, pool).forPath("/super/c3", "c2的內容".getBytes());

 

System.out.println("主線程-->" + Thread.currentThread().getName());

 

Thread.sleep(Integer.MAX_VALUE);



2.6 讀取子節點和判斷節點是否存在

/**

 * 讀取子節點的方法: getChildren()

 * 判斷節點是否存在: checkExists()

 */

List<String> list = cf.getChildren().forPath("/super");

for (String p: list) {

System.out.println(p);

}

 

//若是爲null標識不存在

Stat stat = cf.checkExists().forPath("/super/c4");

System.out.println(stat);



3 Curator框架使用(三)

若是要使用相似Wather的監聽功能Curator必須依賴一個jar包,Maven依賴

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>2.4.2</version>

</dependency>

有了這個依賴包,使用NodeCache的方式去客戶端實例中註冊一個監聽緩存,而後實現對應的監聽方法便可,這裏主要有兩種監聽方式

NodeCacheListener:監聽節點的新增、修改操做

PathChildrenCacheListener:監聽子節點的新增、修改、刪除操做



4 Curator使用場景

4.1 分佈式鎖

在分佈式場景中,爲了保證數據的一致性,常常在程序運行的某一個點須要進行同步操做(java提供了synchronized或者Reentrantlock實現)好比看一個小示例,這個示例出現分佈式不一樣步的問題

好比:以前是在高併發下訪問一個程序,如今則是在高併發下訪問多個服務器節點(分佈式)

 

使用Curator基於zookeeper的特性提供的分佈式鎖來處理分佈式場景的數據一致性,zookeeper自己的分佈式是有寫問題的,以前實現的時候遇到過,這裏強烈推薦使用Curator分佈式鎖

public class Lock2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時時間 */
private static final int SESSION_TIMEOUT = 5000; //MS
static int count = 10;
public static CuratorFramework createCuratorFramework(){
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
return cf;
}
public static void main(String[] args) throws Exception {
final CountDownLatch countDown = new CountDownLatch(1);
for (int i =0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
CuratorFramework cf = createCuratorFramework();
cf.start();
//鎖對象 client 鎖節點
final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
try {
countDown.await();
lock.acquire(); //得到鎖
number();
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();//釋放鎖
} catch (Exception e) {
e.printStackTrace();
}
}
}
},"t" + i).start();;
}
Thread.sleep(2000);
countDown.countDown();
}
 
public static void number() {
count--;
System.out.println(Thread.currentThread().getName() + "-->" + count);
}
}




4.2 分佈式計數器功能

一說到分佈式計數器,可能腦海裏想到AtomicInteger(原子累加)這種經典方式,若是針對一個JVM的場景固然沒問題,可是如今是在分佈式場景下,就須要利用Curator框架的DistributedAtomicInteger了

public class CuratorAtomicInteger {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時時間 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception {
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
//使用DistributedAtomicInteger
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, "/superM", new RetryNTimes(3, 1000));
//atomicInteger.increment();
atomicInteger.add(1);
AtomicValue<Integer> atomicValue = atomicInteger.get();
System.out.println("atomicValue.succeeded()-->" + atomicValue.succeeded());
System.out.println("atomicValue.postValue()-->" + atomicValue.postValue());
System.out.println("atomicValue.preValue()-->" + atomicValue.preValue());
}
}





4.3 Barrier

4.3.1 DistributedDoubleBarrier

分佈式Barrier 類DistributedDoubleBarrier: 它會阻塞全部節點上的等待進程,直到某一個被知足, 而後全部的節點同時開始,中間誰先運行完畢,誰後運行完畢不關心,可是最終必定是一塊退出運行的

 

public class CuratorBarrier {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時時間 */
private static final int SESSION_TIMEOUT = 5000; //MS
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 實例化5個客戶端對象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/superBarrier", 5);
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println(Thread.currentThread().getName() + " 已準備好!");
barrier.enter();
System.out.println("同時開始運行...");
Thread.sleep(1000 * (new Random()).nextInt(3));
System.out.println("運行完畢...");
barrier.leave();
System.out.println("同時退出運行...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
}
}





4.3.2 DistributedBarrier

分佈式Barrier 類DistributedBarrier: 它會阻塞全部節點上的等待進程(全部節點進入待執行狀態),直到「某一我的吹哨」說開始執行, 而後全部的節點同時開始

public class CuratorBarrier2 {
/** zk地址 */
private static final String ZK_ADDR = "192.168.1.220:2181,192.168.1.127:2181,192.168.1.128:2181";
/** session超時時間 */
private static final int SESSION_TIMEOUT = 5000; //MS
static DistributedBarrier barrier = null;
public static void main(String[] args) throws Exception{
for (int i =0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
/** 實例化5個客戶端對象 */
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(ZK_ADDR)
.sessionTimeoutMs(SESSION_TIMEOUT)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
cf.start();
barrier = new DistributedBarrier(cf, "/superBarrier");
System.out.println(Thread.currentThread().getName() + " 設置barrier");
barrier.setBarrier(); //設置
barrier.waitOnBarrier(); //等待
System.out.println("開始執行程序...");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();;
}
Thread.sleep(5000);
barrier.removeBarrier(); //釋放
}
}





5 Curator重試策略

Curator內部實現的幾種重試策略:

1.ExponentialBackoffRetry:重試指定的次數, 且每一次重試之間停頓的時間逐漸增長.

2.RetryNTimes:指定最大重試次數的重試策略   

3.RetryOneTime:僅重試一次

4.RetryUntilElapsed:一直重試直到達到規定的時間



5.1 ExponentialBackoffRetry

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

 

參數說明

   1.baseSleepTimeMs 初始sleep時間

   2.maxRetries 最大重試次數

   3.maxSleepMs 最大重試時間 




5.2 RetryNTimes

RetryNTimes(int n, int sleepMsBetweenRetries)


參數說明

   1.n 最大重試次數

   2.sleepMsBetweenRetries 每次重試的間隔時間

 



5.3 RetryOneTime

RetryOneTime(int sleepMsBetweenRetry)

 

參數說明

   1.sleepMsBetweenRetry爲重試間隔的時間

 

5.4 RetryUntilElapsed

RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

 

參數說明

   1.maxElapsedTimeMs 最大重試時間

   2.sleepMsBetweenRetries 每次重試的間隔時間

相關文章
相關標籤/搜索