zookeeper生產最普遍使用java客戶端curator介紹及其它客戶端比較

  關於zookeeper的原理解析,能夠參見zookeeper核心原理詳解,本文所述大多數實踐基於對zookeeper原理的首先理解。html

  Curator是Netflix公司開源的一個Zookeeper客戶端,目前是apache頂級項目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量,至關於netty之於socket編程。提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架。官網爲http://curator.apache.org/java

  除此以外,Curator中還提供了Zookeeper各類應用場景(Recipe,如共享鎖服務、Master選舉機制和分佈式計算器等)的抽象封裝。因此說啊,不論是作底層庫仍是應用,用戶體驗真的很重要。node

關於zookeeper的java客戶端

  Zookeeper的官方客戶端提供了基本的操做,好比,建立會話、建立節點、讀取節點、更新數據、刪除節點和檢查節點是否存在等。但對於開發人員來講,Zookeeper提供的基本操縱仍是有一些不足之處。典型的缺點爲:git

(1)Zookeeper的Watcher是一次性的,每次觸發以後都須要從新進行註冊;
(2)Session超時以後沒有實現重連機制;
(3)異常處理繁瑣,Zookeeper提供了不少異常,對於開發人員來講可能根本不知道該如何處理這些異常信息;
(4)只提供了簡單的byte[]數組的接口,沒有提供針對對象級別的序列化;
(5)建立節點時若是節點存在拋出異常,須要自行檢查節點是否存在;
(6)刪除節點沒法實現級聯刪除;github

  所以,產生了兩款主流的三方zk客戶端,ZkClient和Curator。第一個主流的三方zk客戶端是ZkClient,由Datameer的工程師開發,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。像dubbo等框架對其也進行了集成使用。redis

  雖然ZkClient對原生API進行了封裝,但也有它自身的不足之處:apache

  • 幾乎沒有參考文檔;
  • 異常處理簡化(拋出RuntimeException);
  • 重試機制比較難用;
  • 沒有提供各類使用場景的實現;

  注:除此以外,不少依賴zookeeper的中間件或大數據組件都配備了與之相適應的zookeeper客戶端,例如hbase、hadoop、fabric8等。編程

  所以,除了早期集成外,目前新的框架和系統不多使用ZkClient,所以本文詳細解析curator。若是讀者對zkclient感興趣,能夠參考https://www.jianshu.com/p/d6de2d21d744去,其官網爲https://github.com/sgroschupf/zkclient,已經基本不活躍了、更新極少且star不過千。api

curator依賴添加

  Curator的Maven依賴以下:通常直接使用curator-recipes就好了,若是須要本身封裝一些底層些的功能的話,例如增長鏈接管理重試機制等,則能夠引入curator-framework包。client是低級api。數組

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
<!--All of the recipes listed on the ZooKeeper recipes doc (except two phase commit).--> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId>
<!-- High-level API that greatly simplifies using ZooKeeper. --> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId>
<!-- Low-level API --> <version>${apache-curator.version}</version> </dependency>

  最新版本能夠從https://mvnrepository.com/artifact/org.apache.curator/curator-client查閱,不過須要注意的是,curator和zookeeper自己的依賴(尤爲是zookeeper 3.4和3.5不兼容,致使的客戶端也是不同)對應關係。目前絕大多數使用2.x的版本。

典型的zk場景

Client操做

  利用Curator提供的客戶端API,能夠徹底實如今zkCli.sh原生客戶端的各類功能。值得注意的是,Curator採用流式風格API。準確的說是相似JPA化。因爲針對zk/redis等的操做都至關簡單,所以這種模式在這種場景下是比較合適的。以下:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

/**
 * Curator framework's client test.
 * Output:
 *  $ create /zktest hello 
 *  $ ls / 
 *  [zktest, zookeeper]
 *  $ get /zktest 
 *  hello
 *  $ set /zktest world 
 *  $ get /zktest 
 *  world
 *  $ delete /zktest 
 *  $ ls / 
 *  [zookeeper]
 */
public class CuratorClientTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "10.20.30.17:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2.Client API test
        // 2.1 Create node
        String data1 = "hello";
        print("create", ZK_PATH, data1);
        client.create().
                creatingParentsIfNeeded().
                forPath(ZK_PATH, data1.getBytes());

        // 2.2 Get node and data
        print("ls", "/");
        print(client.getChildren().forPath("/"));
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.3 Modify data
        String data2 = "world";
        print("set", ZK_PATH, data2);
        client.setData().forPath(ZK_PATH, data2.getBytes());
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.4 Remove node
        print("delete", ZK_PATH);
        client.delete().forPath(ZK_PATH);
        print("ls", "/");
        print(client.getChildren().forPath("/"));
    }

    private static void print(String... cmds) {
        StringBuilder text = new StringBuilder("$ ");
        for (String cmd : cmds) {
            text.append(cmd).append(" ");
        }
        System.out.println(text.toString());
    }

    private static void print(Object result) {
        System.out.println(
                result instanceof byte[]
                    ? new String((byte[]) result)
                        : result);
    }

}

  詳細的CuratorFramework功能及使用說明可參見https://curator.apache.org/curator-framework/index.html。

監聽器

  Curator提供了三種Watcher(Cache)來監聽結點的變化:

  • Path Cache:監視一個路徑下1)孩子結點的建立、2)刪除,3)以及結點數據的更新。產生的事件會傳遞給註冊的PathChildrenCacheListener。
  • Node Cache:監視一個結點的建立、更新、刪除,並將結點的數據緩存在本地。
  • Tree Cache:Path Cache和Node Cache的「合體」,監視路徑下的建立、更新、刪除事件,並緩存路徑下全部孩子結點的數據。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;

/**
 * Curator framework watch test.
 */
public class CuratorWatcherTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2.Register watcher
        PathChildrenCache watcher = new PathChildrenCache(
                client,
                ZK_PATH,
                true    // if cache data
        );
        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Register zk watcher successfully!");

        Thread.sleep(Integer.MAX_VALUE);
    }

}

輸出以下:

Java: zk client start successfully!
Java: Register zk watcher successfully!

zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata
Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata
Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello
Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

 下列兩個系列稱爲Recipe(專題,關於這個recipe應該如何翻譯,LZ作了研究,直譯是菜譜,確定不對,也有叫作攻略的,貌似也不正確,因此叫專題可能確實更合適),完整的curator recipe實現可參見https://curator.apache.org/curator-recipes/index.html。

分佈式協調

  通常咱們稱分佈式鎖的時候,指的是短時的分佈式鎖,所以通常採用redis實現,而zk下的稱之爲分佈式協調更合理,由於它一般時間更長。好比分佈式編程時,好比最容易碰到的狀況就是應用程序在線上多機部署,因而當多個應用同時訪問某一資源時,就須要某種機制去協調它們。例如,如今一臺應用正在rebuild緩存內容,要臨時鎖住某個區域暫時不讓訪問;又好比調度程序每次只想一個任務被一臺應用執行等等。大多數的分佈式協調採用臨時節點+watch機制實現。除了直接採用原始的監聽器本身實現外,curator實現了分佈式的IPM(進程間鎖)。Curator的機制爲:使用咱們提供的lock路徑的結點做爲全局鎖,這個結點的數據相似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次得到鎖時會生成這種串,釋放鎖時清空數據。因爲內部採用zookeeper的臨時順序節點特性,一旦客戶端失去鏈接後,則就會自動清除該節點,redis則只能等待超時。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.TimeUnit;

/**
 * Curator framework's distributed lock test.
 */
public class CuratorDistrLockTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_LOCK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");

        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

   固然實際中會更加複雜,好比只是某些接口須要全局單點,可是服務的粒度又沒有拆分到獨立的微服務。另外,客戶端宕機後鎖是否自動釋放也是要考慮的,不然其餘節點就沒法接管。InterProcessMutex的實現分析能夠參考:https://www.jianshu.com/p/5fa6a1464076

Leader選舉

  在分佈式系統中,很多系統也採用和zk自己同樣的leader/follower架構,所以存在leader選舉的問題,例如es/kafka(注:在通常分佈式系統中,並不會使用到該特性)。curator就包含了對應的解決方法。Curator提供了LeaderSelector監聽器實現Leader選舉功能。同一時刻,只有一個Listener會進入takeLeadership()方法,說明它是當前的Leader。注意:當Listener從takeLeadership()退出時就說明它放棄了「Leader身份」,這時Curator會利用Zookeeper再從剩餘的Listener中選出一個新的Leader。autoRequeue()方法使放棄Leadership的Listener有機會從新得到Leadership,若是不設置的話放棄了的Listener是不會再變成Leader的。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;

/**
 * Curator framework's leader election test.
 * Output:
 *  LeaderSelector-2 take leadership!
 *  LeaderSelector-2 relinquish leadership!
 *  LeaderSelector-1 take leadership!
 *  LeaderSelector-1 relinquish leadership!
 *  LeaderSelector-0 take leadership!
 *  LeaderSelector-0 relinquish leadership! 
 *      ...
 */
public class CuratorLeaderTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println(Thread.currentThread().getName() + " take leadership!");

                // takeLeadership() method should only return when leadership is being relinquished.
                Thread.sleep(5000L);

                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
            }
        };

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void registerListener(LeaderSelectorListener listener) {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();

        // 2.Ensure path
        try {
            new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 3.Register listener
        LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
        selector.autoRequeue();
        selector.start();
    }

}
相關文章
相關標籤/搜索