zookeeper curator CRUD

Curator客戶端的基本操做

瘋狂創客圈 Java 分佈式聊天室【 億級流量】實戰系列之 -24【 博客園 總入口node


寫在前面

​ 你們好,我是做者尼恩。目前和幾個小夥伴一塊兒,組織了一個高併發的實戰社羣【瘋狂創客圈】。正在開始高併發、億級流程的 IM 聊天程序 學習和實戰面試

​ 前面,已經完成一個高性能的 Java 聊天程序的四件大事:apache

接下來,須要進入到分佈式開發的環節了。 分佈式的中間件,瘋狂創客圈的小夥伴們,一致的選擇了zookeeper,不單單是因爲其在大數據領域,太有名了。更重要的是,不少的著名框架,都使用了zk。api

本篇介紹 Curator客戶端的基本操做服務器

1.1.1. Curator客戶端的依賴包

打開Curator的官網,咱們能夠看到,Curator包含了如下幾個包:session

curator-framework:對zookeeper的底層api的一些封裝;併發

curator-client:提供一些客戶端的操做,例如重試策略等;框架

curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式Barrier等。異步

Maven依賴(使用curator的版本:4.0.0,對應Zookeeper的版本爲:3.4.x,若是版本不匹配,就會有兼容性問題,頗有可能致使節點操做失敗。具體的版本對應關係,能夠去curator的官網查看。

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

1.1.2. Curator 建立會話

使用 curator-framework 包中的工廠類CuratorFrameworkFactory中的靜態方法newClient,來建立客戶端會話。

代碼以下:

/**
 * create by 尼恩 @ 瘋狂創客圈
 **/
public class ClientFactory {

    /**
     * @param connectionString zk的鏈接地址
     * @return CuratorFramework 實例
     */
    public static CuratorFramework createSimple(String connectionString) {
        // 重試策略:第一次重試等待1s,第二次重試等待2s,第三次重試等待4s
        // 第一個參數:等待時間的基礎單位,單位爲毫秒
        // 第二個參數:最大重試次數
        ExponentialBackoffRetry retryPolicy =
                new ExponentialBackoffRetry(1000, 3);

        // 獲取 CuratorFramework 實例的最簡單的方式
        // 第一個參數:zk的鏈接地址
        // 第二個參數:重試策略
        return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    }

    /**
     * @param connectionString    zk的鏈接地址
     * @param retryPolicy         重試策略
     * @param connectionTimeoutMs 鏈接
     * @param sessionTimeoutMs
     * @return CuratorFramework 實例
     */
    public static CuratorFramework createWithOptions(
            String connectionString, RetryPolicy retryPolicy,
            int connectionTimeoutMs, int sessionTimeoutMs) {

        // builder 模式建立 CuratorFramework 實例
        return CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // 其餘的建立選項
                .build();
    }
}

這裏用到兩個版本,前一個是簡化版本,只須要設置ZK集羣的鏈接地址和重試策略。

後一個是相對複雜的重載版本,能夠設置鏈接超時connectionTimeoutMs、會話超時sessionTimeoutMs 等其餘的會話建立選項。

具體請看瘋狂創客圈的Demo源碼。

1.1.3. CRUD 之 Create 建立節點

使用create()方法,最後使用forPath帶上須要建立的節點路徑。

/**
     * 建立節點
     */
    @Test
    public void createNode() {
        //建立客戶端
        CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
        try {
            //啓動客戶端實例,鏈接服務器
            client.start();

            // 建立一個 ZNode 節點
            // 節點的數據爲 payload

            String data = "hello";
            byte[] payload = data.getBytes("UTF-8");
            String zkPath = "/test/CRUD/node-1";
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(zkPath, payload);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

使用withMode()方法,設置節點的類型。zookeeper節點有四種類型:

(1)PERSISTENT 持久節點

(2)PERSISTENT_SEQUENTIAL 持久順序節點

(3)PHEMERAL 臨時節

(4)EPHEMERAL_SEQUENTIAL 臨時順序節點

下面詳細介紹一下四種節點的區別和聯繫。

(1)持久節點(PERSISTENT)

所謂持久節點,是指在節點建立後,就一直存在,直到有刪除操做來主動清除這個節點。持久節點的生命週期是永久有效,不會由於建立該節點的客戶端會話失效而消失。

(2)持久順序節點(PERSISTENT_SEQUENTIAL)

這類節點的生命週期和持久節點是一致的。額外的特性是,在ZK中,每一個父節點會爲他的第一級子節點維護一份次序,會記錄每一個子節點建立的前後順序。若是在建立子節點的時候,能夠設置這個屬性,那麼在建立節點過程當中,ZK會自動爲給定節點名加上一個表示次序的數字後綴,做爲新的節點名。這個次序後綴的範圍是整型的最大值。

好比,在建立節點的時候只須要傳入節點 「/test_」,這樣以後,zookeeper自動會給」test_」後面補充數字次序。

(3)臨時節點(EPHEMERAL)

和持久節點不一樣的是,臨時節點的生命週期和客戶端會話綁定。也就是說,若是客戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裏提到的是會話失效,而非鏈接斷開。這裏還要注意一件事,就是當你客戶端會話失效後,所產生的節點也不是一會兒就消失了,也要過一段時間,大概是10秒之內,能夠試一下,本機操做生成節點,在服務器端用命令來查看當前的節點數目,你會發現客戶端已經stop,可是產生的節點還在。

另外,在臨時節點下面不能建立子節點。

(4)臨時順序節點(EPHEMERAL_SEQUENTIAL)

此節點是屬於臨時節點,不過帶有順序,客戶端會話結束節點就消失。

1.1.4. CRUD 之Read獲取節點

與節點讀取的有關的方法,主要有三個:

(1)首先是判斷節點是否存在,使用checkExists方法。

(2)其次是獲取節點的數據,使用getData方法。

(3)最後是獲取子節點列表,使用getChildren方法。

演示代碼以下:

/**
     * 讀取節點
     */
    @Test
    public void readNode() {
        //建立客戶端
        CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
        try {
            //啓動客戶端實例,鏈接服務器
            client.start();

            String zkPath = "/test/CRUD/node-1";


            Stat stat = client.checkExists().forPath(zkPath);
            if (null != stat) {
                //讀取節點的數據
                byte[] payload = client.getData().forPath(zkPath);
                String data = new String(payload, "UTF-8");
                log.info("read data:", data);

                String parentPath = "/test";
                List<String> children = client.getChildren().forPath(parentPath);

                for (String child : children) {
                    log.info("child:", child);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

1.1.5. CRUD 之update更新節點

節點的更新,分爲同步更新與異步更新。

/**
     * 更新節點
     */
    @Test
    public void updateNode() {
        //建立客戶端
        CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
        try {
            //啓動客戶端實例,鏈接服務器
            client.start();


            String data = "hello world";
            byte[] payload = data.getBytes("UTF-8");
            String zkPath = "/test/node-1";
            client.setData()
                    .forPath(zkPath, payload);


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

異步更新,須要用到inBackground()方法,其做用是,讓更新操做異步執行。若是須要監聽到異步操做的結果,須要爲inBackground加上AsyncCallback回調實例。

異步更新的代碼以下:

/**
     * 更新節點 - 異步模式
     */
    @Test
    public void updateNodeAsync() {
        //建立客戶端
        CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
        try {

            //更新完成監聽器
            AsyncCallback.StringCallback callback = new AsyncCallback.StringCallback() {

                @Override
                public void processResult(int i, String s, Object o, String s1) {
                    System.out.println(
                            "i = " + i + " | " +
                                    "s = " + s + " | " +
                                    "o = " + o + " | " +
                                    "s1 = " + s1
                    );
                }
            };
            //啓動客戶端實例,鏈接服務器
            client.start();

            String data = "hello ,every body! ";
            byte[] payload = data.getBytes("UTF-8");
            String zkPath = "/test/CRUD/node-1";
            client.setData()
                    .inBackground(callback)
                    .forPath(zkPath, payload);

            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

1.1.6. CRUD 之delete刪除節點

刪除節點,使用delete 方法,代碼以下。

/**
     * 刪除節點
     */
    @Test
    public void deleteNode() {
        //建立客戶端
        CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
        try {
            //啓動客戶端實例,鏈接服務器
            client.start();

            //刪除節點
            String zkPath = "/test/CRUD/node-1";
            client.delete().forPath(zkPath);


            //刪除後查看結果
            String parentPath = "/test";
            List<String> children = client.getChildren().forPath(parentPath);

            for (String child : children) {
                log.info("child:", child);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
        }
    }

和更新同樣,也能夠進行異步刪除,一樣須要用到inBackground()方法。若是須要監聽異步操做的結果,須要爲inBackground方法加上一個參數:AsyncCallback回調實例。

寫在最後

​ 下一篇:開啓zk的客戶端開發。


瘋狂創客圈 億級流量 高併發IM 實戰 系列

  • Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰

  • Netty 源碼、原理、JAVA NIO 原理
  • Java 面試題 一網打盡
  • 瘋狂創客圈 【 博客園 總入口 】

相關文章
相關標籤/搜索