ZooKeeper(五)-- Curator使用

前言

Curator是Netflix開源的一套ZooKeeper客戶端框架:java

  • 1.封裝ZooKeeper client與ZooKeeper server之間的鏈接處理;
  • 提供了一套Fluent風格的操做API;
  • 提供ZooKeeper各類應用場景(recipe, 好比共享鎖服務, 集羣領導選舉機制)的抽象封裝。

Curator幾個組成部分:node

  • Client:是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法
  • Framework: 用來簡化ZooKeeper高級功能的使用, 並增長了一些新的功能, 好比管理到ZooKeeper集羣的鏈接, 重試處理
  • Recipes:實現了通用ZooKeeper的recipe, 該組件創建在Framework的基礎之上
  • Utilities:各類ZooKeeper的工具類
  • Errors: 異常處理, 鏈接, 恢復等
  • Extensions: recipe擴展

Curator內部實現的幾種重試策略:apache

  • ExponentialBackoffRetry:重試指定的次數, 且每一次重試之間停頓的時間逐漸增長
  • RetryNTimes:指定最大重試次數的重試策略
  • RetryOneTime:僅重試一次
  • RetryUntilElapsed:一直重試直到達到規定的時間

正文

1.項目使用maven工程,在pom.xml中添加依賴api

&emsp;&emsp;<dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
        <scope>test</scope>
   </dependency>
   <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
   </dependency>
   <dependency>
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
       <version>2.5.0</version>
   </dependency>
複製代碼

2.下面代碼從增刪改查、事務、事件訂閱/監聽器來實現的。框架

package om.xbq.demo;

import java.util.Collection;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;

public class CuratorDemo {

    // 此demo使用的集羣,因此有多個ip和端口
    private static String CONNECT_SERVER = "192.168.242.129:2181,192.168.242.129:2182,192.168.242.129:2183";
    private static int SESSION_TIMEOUT = 3000;
    private static int CONNECTION_TIMEOUT = 3000;
    
    public static void main(String[] args) {
        // 鏈接 ZooKeeper 
        CuratorFramework framework = CuratorFrameworkFactory.
                newClient(CONNECT_SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, new ExponentialBackoffRetry(1000,10));
        // 啓動
        framework.start();
        
        Stat stat = ifExists(framework);
        
        if(stat != null){
// update(framework);
// delete(framework);
// query(framework);
            
// 監聽事件,只監聽一次,不推薦
// listener1(framework);
        }else {
// add(framework);
        }
        
// 事務
// transaction(framework);
        
// 持久監聽,推薦使用
        listener2(framework);
    }
    
    /** * 判斷節點是否存在 * @param cf * @return */
    public static Stat ifExists(CuratorFramework cf){
        Stat stat = null;
        try {
            stat = cf.checkExists().forPath("/node_curator/test");;
            System.out.println(stat);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return stat;
    }
    
    /** * @Title: add * @Description: TODO(增長節點 , 能夠增長 多級節點) * @param @param cf 設定文件 * @return void 返回類型 * @throws */
    public static void add(CuratorFramework cf){
        try {
            String rs = cf.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT).forPath("/node_curator/test","xbq".getBytes());
            System.out.println(rs);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            cf.close();
        }
    }
    
    /** * @Title: update * @Description: TODO(修改指定節點的值) * @param @param cf 設定文件 * @return void 返回類型 * @throws */
    public static void update(CuratorFramework cf){
        try {
            Stat stat = cf.setData().forPath("/node_curator/test", "javaCoder".getBytes());
            System.out.println(stat);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            cf.close();
        }
    }
    
    /** * @Title: delete * @Description: TODO(刪除節點或者刪除包括子節點在內的父節點) * @param @param cf 設定文件 * @return void 返回類型 * @throws */
    public static void delete(CuratorFramework cf){
        try {
            // 遞歸刪除的話,則輸入父節點
            cf.delete().deletingChildrenIfNeeded().forPath("/node_curator");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            cf.close();
        }
    }
    
    /** * @Title: query * @Description: TODO(查詢節點的值) * @param @param cf 設定文件 * @return void 返回類型 * @throws */
    public static void query(CuratorFramework cf){
        try {
            byte[] value = cf.getData().forPath("/node_curator/test");
            System.out.println(new String(value));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            cf.close();
        }
    }
    
    /** * @Title: transaction * @Description: TODO(一組crud操做同生同滅) * @param @param cf 設定文件 * @return void 返回類型 * @throws */
    public static void transaction(CuratorFramework cf){
        try {
            // 事務處理, 事務會自動回滾
            Collection<CuratorTransactionResult> results = cf.inTransaction()
                    .create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq1").and()
                    .create().withMode(CreateMode.PERSISTENT).forPath("/node_xbq2").and().commit();
            // 遍歷
            for(CuratorTransactionResult result:results){
                System.out.println(result.getResultStat() + "->" + result.getForPath() + "->" + result.getType());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            cf.close();
        }
    }
    
    /** * @Title: listener1 * @Description: TODO(監聽 事件 -- 經過 usingWatcher 方法) * 注意:經過CuratorWatcher 去監聽指定節點的事件, 只監聽一次 * @param @param cf 設定文件 * @return void 返回類型 * @throws */
    public static void listener1(CuratorFramework cf){
        try {
            cf.getData().usingWatcher(new CuratorWatcher() {
                @Override
                public void process(WatchedEvent event) throws Exception {
                    System.out.println("觸發事件:" + event.getType());
                }
            }).forPath("/javaCoder");
            
            System.in.read(); // 掛起,在控制檯上輸入 才中止
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            cf.close();
        }
    }
    
    /** * @Title: listener2 * @Description: TODO(監聽 子節點的事件,不監聽 本身 -- 經過 PathChildrenCacheListener 方法,推薦使用) * @param @param cf 設定文件 * @return void 返回類型 * @throws */
    public static void listener2(CuratorFramework cf) {
        // 節點node_xbq不存在 會新增
        PathChildrenCache cache = new PathChildrenCache(cf, "/node_xbq", true);
        try {
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    System.out.println("觸發事件:" + event.getType());
                }
            });
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            cf.close();
        }
    }
}
複製代碼

歡迎關注個人公衆號,第一時間接收最新文章~ 搜索公衆號: 碼咖 或者 掃描下方二維碼:

img
相關文章
相關標籤/搜索