基於Apache Curator框架的ZooKeeper使用詳解

一 簡介

Apache Curator是一個比較完善的ZooKeeper客戶端框架,經過封裝的一套高級API 簡化了ZooKeeper的操做。經過查看官方文檔,能夠發現Curator主要解決了三類問題:java

  • 封裝ZooKeeper client與ZooKeeper server之間的鏈接處理
  • 提供了一套Fluent風格的操做API
  • 提供ZooKeeper各類應用場景(recipe, 好比:分佈式鎖服務、集羣領導選舉、共享計數器、緩存機制、分佈式隊列等)的抽象封裝

    Curator主要從如下幾個方面下降了zk使用的複雜性:

    • 重試機制:提供可插拔的重試機制, 它將給捕獲全部可恢復的異常配置一個重試策略,而且內部也提供了幾種標準的重試策略(好比指數補償)
    • 鏈接狀態監控: Curator初始化以後會一直對zk鏈接進行監聽,一旦發現鏈接狀態發生變化將會做出相應的處理
    • zk客戶端實例管理:Curator會對zk客戶端到server集羣的鏈接進行管理,並在須要的時候重建zk實例,保證與zk集羣鏈接的可靠性
    • 各類使用場景支持:Curator實現了zk支持的大部分使用場景(甚至包括zk自身不支持的場景),這些實現都遵循了zk的最佳實踐,並考慮了各類極端狀況

二 基於Curator的ZooKeeper基本用法

 1 public class CuratorBase {
 2     //會話超時時間
 3     private final int SESSION_TIMEOUT = 30 * 1000;
 4     
 5     //鏈接超時時間
 6     private final int CONNECTION_TIMEOUT = 3 * 1000;
 7     
 8     //ZooKeeper服務地址
 9     private static final String CONNECT_ADDR = "192.168.1.1:2100,192.168.1.1:2101,192.168.1.:2102";
10     
11     //建立鏈接實例
12     private CuratorFramework client = null;
13 
14     public static void main(String[] args) throws Exception {  
15         //1 重試策略:初試時間爲1s 重試10次
16         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
17         //2 經過工廠建立鏈接
18         CuratorFramework client = CuratorFrameworkFactory.builder()
19                     .connectString(CONNECT_ADDR).connectionTimeoutMs(CONNECTION_TIMEOUT)
20                     .sessionTimeoutMs(SESSION_TIMEOUT)
21                     .retryPolicy(retryPolicy)
22 //命名空間           .namespace("super")
23                     .build();
24         //3 開啓鏈接
25         cf.start();
26         
27         System.out.println(States.CONNECTED);
28         System.out.println(cf.getState());
29 
30         //建立永久節點
31         client.create().forPath("/curator","/curator data".getBytes());
32         
33         //建立永久有序節點
34         client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes());
35         
36         //建立臨時節點
37         client.create().withMode(CreateMode.EPHEMERAL)
38             .forPath("/curator/ephemeral","/curator/ephemeral data".getBytes());
39  
40         //建立臨時有序節點
41         client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes());
42         
43         client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator/ephemeral_path2","/curator/ephemeral_path2 data".getBytes());
44         
45         //測試檢查某個節點是否存在
46         Stat stat1 = client.checkExists().forPath("/curator");
47         Stat stat2 = client.checkExists().forPath("/curator2");
48         
49         System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false));
50         System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false));
51 
52         //獲取某個節點的全部子節點
53         System.out.println(client.getChildren().forPath("/"));
54         
55         //獲取某個節點數據
56         System.out.println(new String(client.getData().forPath("/curator")));
57         
58         //設置某個節點數據
59         client.setData().forPath("/curator","/curator modified data".getBytes());
60 
61         //建立測試節點
62         client.create().orSetData().creatingParentContainersIfNeeded()
63             .forPath("/curator/del_key1","/curator/del_key1 data".getBytes());
64  
65         client.create().orSetData().creatingParentContainersIfNeeded()
66         .forPath("/curator/del_key2","/curator/del_key2 data".getBytes());
67         
68         client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes());
69               
70         //刪除該節點
71         client.delete().forPath("/curator/del_key1");
72         
73         //級聯刪除子節點
74         client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");
75 76     
77
    • orSetData()方法:若是節點存在則Curator將會使用給出的數據設置這個節點的值,至關於 setData() 方法
    • creatingParentContainersIfNeeded()方法:若是指定節點的父節點不存在,則Curator將會自動級聯建立父節點
    • guaranteed()方法:若是服務端可能刪除成功,可是client沒有接收到刪除成功的提示,Curator將會在後臺持續嘗試刪除該節點
    • deletingChildrenIfNeeded()方法:若是待刪除節點存在子節點,則Curator將會級聯刪除該節點的子節點

    事務管理:node

     * 事務管理:碰到異常,事務會回滾
     * @throws Exception
     */
    @Test
    public void testTransaction() throws Exception{
        //定義幾個基本操做
        CuratorOp createOp = client.transactionOp().create()
                .forPath("/curator/one_path","some data".getBytes());
        
        CuratorOp setDataOp = client.transactionOp().setData()
                .forPath("/curator","other data".getBytes());
        
        CuratorOp deleteOp = client.transactionOp().delete()
                .forPath("/curator");
        
        //事務執行結果
        List<CuratorTransactionResult> results = client.transaction()
                .forOperations(createOp,setDataOp,deleteOp);
        
        //遍歷輸出結果
        for(CuratorTransactionResult result : results){
            System.out.println("執行結果是: " + result.getForPath() + "--" + result.getType());
        }
    }
//由於節點「/curator」存在子節點,因此在刪除的時候將會報錯,事務回滾

三 監聽器

Curator提供了三種Watcher(Cache)來監聽結點的變化:apache

  • Path Cache:監視一個路徑下1)孩子結點的建立、2)刪除,3)以及結點數據的更新。產生的事件會傳遞給註冊的PathChildrenCacheListener。
  • Node Cache:監視一個結點的建立、更新、刪除,並將結點的數據緩存在本地。
  • Tree Cache:Path Cache和Node Cache的「合體」,監視路徑下的建立、更新、刪除事件,並緩存路徑下全部孩子結點的數據。
        /**
         * 在註冊監聽器的時候,若是傳入此參數,當事件觸發時,邏輯由線程池處理
         */
        ExecutorService pool = Executors.newFixedThreadPool(2);
        
        /**
         * 監聽數據節點的變化狀況
         */
        final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false);
        nodeCache.start(true);
        nodeCache.getListenable().addListener(
            new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("Node data is changed, new data: " + 
                        new String(nodeCache.getCurrentData().getData()));
                }
            }, 
            pool
        );
        
        /**
         * 監聽子節點的變化狀況
         */
        final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true);
        childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
        childrenCache.getListenable().addListener(
            new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                        throws Exception {
                        switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("CHILD_ADDED: " + event.getData().getPath());
                            break;
                        case CHILD_REMOVED:
                            System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                            break;
                        case CHILD_UPDATED:
                            System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                            break;
                        default:
                            break;
                    }
                }
            },
            pool
        );
        
        client.setData().forPath("/zk-huey/cnode", "world".getBytes());
        
        Thread.sleep(10 * 1000);
        pool.shutdown();
        client.close();

四 分佈式鎖

分佈式編程時,好比最容易碰到的狀況就是應用程序在線上多機部署,因而當多個應用同時訪問某一資源時,就須要某種機制去協調它們。例如,如今一臺應用正在rebuild緩存內容,要臨時鎖住某個區域暫時不讓訪問;又好比調度程序每次只想一個任務被一臺應用執行等等。編程

下面的程序會啓動兩個線程t1和t2去爭奪鎖,拿到鎖的線程會佔用5秒。運行屢次能夠觀察到,有時是t1先拿到鎖而t2等待,有時又會反過來。Curator會用咱們提供的lock路徑的結點做爲全局鎖,這個結點的數據相似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次得到鎖時會生成這種串,釋放鎖時清空數據。緩存

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.TimeUnit;

/**
 * Curator framework's distributed lock test.
 */
public class CuratorDistrLockTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_LOCK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");

        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

五 Leader選舉

當集羣裏的某個服務down機時,咱們可能要從slave結點裏選出一個做爲新的master,這時就須要一套能在分佈式環境中自動協調的Leader選舉方法。Curator提供了LeaderSelector監聽器實現Leader選舉功能。同一時刻,只有一個Listener會進入takeLeadership()方法,說明它是當前的Leader。注意:當Listener從takeLeadership()退出時就說明它放棄了「Leader身份」,這時Curator會利用Zookeeper再從剩餘的Listener中選出一個新的Leader。autoRequeue()方法使放棄Leadership的Listener有機會從新得到Leadership,若是不設置的話放棄了的Listener是不會再變成Leader的。session

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;

/**
 * Curator framework's leader election test.
 * Output:
 *  LeaderSelector-2 take leadership!
 *  LeaderSelector-2 relinquish leadership!
 *  LeaderSelector-1 take leadership!
 *  LeaderSelector-1 relinquish leadership!
 *  LeaderSelector-0 take leadership!
 *  LeaderSelector-0 relinquish leadership! 
 *      ...
 */
public class CuratorLeaderTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println(Thread.currentThread().getName() + " take leadership!");

                // takeLeadership() method should only return when leadership is being relinquished.
                Thread.sleep(5000L);

                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
            }
        };

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void registerListener(LeaderSelectorListener listener) {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();

        // 2.Ensure path
        try {
            new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 3.Register listener
        LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
        selector.autoRequeue();
        selector.start();
    }
}
相關文章
相關標籤/搜索