使用 Curator 框架簡單操作 ZooKeeper 學習筆記

Curator 是操作 ZooKeeper 的優秀 API(相對於原生 API),能滿足大部分需求,而且是 Fluent 流式 API 風格。

參考文獻:https://www.jianshu.com/p/70151fc0ef5d 感謝分享,動手敲一遍留個印象。

curator-framework:對zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操作,例如重試策略等
curator-recipes:封裝了一些高級特性,如:Cache 事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式 Barrier 等

環境:JDK 1.8 、Maven 、啟動三臺虛擬機作為分佈式部署環境 、 curator-recipes 4.0.1 、ZooKeeper 3.4.8

Maven 依賴:若不顯式依賴 zookeeper 會報錯。

<!-- Curator recipes artifact used by the demo classes below. -->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.0.1</version>
</dependency>
<!-- Explicit ZooKeeper client dependency; per the accompanying note the build/run fails without it.
     NOTE(review): version presumably pinned to match the 3.4.8 server - confirm. -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.8</version>
</dependency>
測試增刪改查:
package com.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import java.util.List;

/**
 * Demonstrates basic create / read / update / delete operations on ZooKeeper nodes
 * through Curator's fluent API.
 *
 * <p>Node types, see {@link org.apache.zookeeper.CreateMode}:
 * <ul>
 *   <li>PERSISTENT: persistent node</li>
 *   <li>PERSISTENT_SEQUENTIAL: persistent node with a sequence number</li>
 *   <li>EPHEMERAL: ephemeral (session-bound) node</li>
 *   <li>EPHEMERAL_SEQUENTIAL: ephemeral node with a sequence number</li>
 * </ul>
 *
 * Created by Administrator on 2018/6/25.
 */
public class curatorRecipesDemo {

    /** Comma-separated host:port list of the ZooKeeper ensemble. */
    final static String zookeeperAddress = "192.168.149.133:2181,192.168.149.135:2181,192.168.149.134:2181";

    /** Explicit charset so node payloads do not depend on the platform default encoding. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Connects to the ensemble, runs the CRUD walkthrough and closes the client.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources: the client is closed even when an operation throws.
        try (CuratorFramework curatorClint = CuratorFrameworkFactory.builder()
                .connectString(zookeeperAddress)                    // ZooKeeper connection string
                .connectionTimeoutMs(2000)                          // connection timeout
                .sessionTimeoutMs(10000)                            // session timeout
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))  // retry policy
                .namespace("myZookeeperTest")                       // namespace, i.e. the root node for all paths below
                .build()) {

            curatorClint.start();

            // "/path" is persistent and never deleted by this demo, so only create it on the first run.
            if (curatorClint.checkExists().forPath("/path") == null) {
                curatorClint.create().forPath("/path"); // persistent by default; paths start with a slash
            }
            System.out.println(curatorClint.getChildren().forPath("/"));

            // Remove leftovers of a previous run that aborted before the final delete.
            if (curatorClint.checkExists().forPath("/secondPath") != null) {
                curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");
            }

            // Must be PERSISTENT: ZooKeeper does not allow ephemeral nodes to have children,
            // and children are created under "/secondPath" further down.
            curatorClint.create().withMode(CreateMode.PERSISTENT)
                    .forPath("/secondPath", "hello,word".getBytes(UTF_8));
            System.out.println("節點secondPath的數據" + new String(curatorClint.getData().forPath("/secondPath"), UTF_8));
            curatorClint.setData().forPath("/secondPath", "hello,myWorld!".getBytes(UTF_8));
            System.out.println("節點secondPath的數據" + new String(curatorClint.getData().forPath("/secondPath"), UTF_8));

            // Recursive create: missing parents are created on the way.
            curatorClint.create()
                    .creatingParentContainersIfNeeded()
                    .forPath("/secondPath/second2/second3");

            // List all children of a node.
            List<String> list = curatorClint.getChildren().forPath("/secondPath");
            System.out.println(list);

            // Recursive delete.
            curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath/second2");

            // checkExists() yields null for a missing node.
            System.out.println(curatorClint.checkExists().forPath("/secondPath/second2"));
            System.out.println(curatorClint.checkExists().forPath("/secondPath/second2/second3"));
            System.out.println(curatorClint.getChildren().forPath("/secondPath"));
            curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");

            // TODO guaranteed(): if a delete fails it is recorded and retried for as long as the
            //      session is valid, until the delete succeeds.
            // TODO the Stat object carries the version id, transaction id and similar metadata.
        }
    }

}

創建的節點可以透過 ZooKeeper 安裝目錄下的 bin 目錄連接客戶端來查看:執行 sh zkCli.sh(或 ./zkCli.sh -timeout 5000 -server 127.0.0.1:2181),然後使用 ls / 這類以斜槓開頭的路徑命令進行查看。

監聽 Watcher API

package com.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.List;
import java.util.Objects;

/**
 * Demonstrates the three Curator cache recipes used to watch ZooKeeper nodes.
 *
 * <ul>
 *   <li>PathChildrenCache: watches creation, deletion and update of a node's children</li>
 *   <li>NodeCache: watches update and creation events of a single node</li>
 *   <li>TreeCache: combines the behavior of PathChildrenCache and NodeCache</li>
 * </ul>
 *
 * Created by Administrator on 2018/6/26.
 */
public class WatcherDemo {
    /** Comma-separated host:port list of the ZooKeeper ensemble. */
    final static String zookeeperAddress = "192.168.149.133:2181,192.168.149.135:2181,192.168.149.134:2181";

    /** Explicit charset so node payloads do not depend on the platform default encoding. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Registers the three listeners on {@code /myWatch}, triggers a few changes and then
     * blocks on stdin so the events can be observed.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources: the client is closed even when an operation throws.
        try (CuratorFramework curatorClint = CuratorFrameworkFactory.builder()
                .connectString(zookeeperAddress)                    // ZooKeeper connection string
                .connectionTimeoutMs(2000)                          // connection timeout
                .sessionTimeoutMs(10000)                            // session timeout
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))  // retry policy
                .namespace("myZookeeperTest")                       // namespace, i.e. the root node for all paths below
                .build()) {

            curatorClint.start();

            // Start every run from an empty "/myWatch": drop what a previous run left behind,
            // then (re)create the node so the creates further down cannot hit existing children.
            if (curatorClint.checkExists().forPath("/myWatch") != null) {
                curatorClint.delete().deletingChildrenIfNeeded().forPath("/myWatch");
            }
            curatorClint.create().forPath("/myWatch");

            // The caches are Closeable; they are closed (in reverse order) before the client.
            try (PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorClint, "/myWatch", false);
                 NodeCache nodeCache = new NodeCache(curatorClint, "/myWatch", false);
                 TreeCache treeCache = new TreeCache(curatorClint, "/myWatch")) {

                PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                        System.out.println("pathChildrenCacheListener::::->" + pathChildrenCacheEvent.getData());
                    }
                };
                pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); // register the listener
                pathChildrenCache.start();

                NodeCacheListener nodeCacheListener = new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        // Guard against a null current-data value instead of failing with an NPE.
                        ChildData current = nodeCache.getCurrentData();
                        System.out.println("nodeCacheListener::::->:" + (current == null ? null : current.getPath()));
                    }
                };
                nodeCache.getListenable().addListener(nodeCacheListener);
                nodeCache.start();

                TreeCacheListener treeCacheListener = new TreeCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                        System.out.println("treeCacheListener::::->" + treeCacheEvent.getData());
                    }
                };
                treeCache.getListenable().addListener(treeCacheListener);
                treeCache.start();

                curatorClint.create().forPath("/myWatch/child22", "生個好孩子".getBytes(UTF_8));
                curatorClint.create().creatingParentContainersIfNeeded()
                        .forPath("/myWatch/child22/child22", "生個好孩子".getBytes(UTF_8));
                // Update a node that actually exists; the original targeted "/myWatch/child222",
                // which is never created and therefore failed with NoNodeException.
                curatorClint.setData().forPath("/myWatch/child22", "生個好孩子aaaa".getBytes(UTF_8));

                // Block here; otherwise the client would shut down right after start-up
                // and no events would be printed.
                System.in.read();
            }
        }
    }

}
相關文章
相關標籤/搜索