zookeeper(六):Zookeeper客戶端Curator的API使用詳解

簡介

Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了不少Zookeeper客戶端很是底層的細節開發工做,包括鏈接重連、反覆註冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句「Guava is to Java that Curator to Zookeeper」給Curator予高度評價。node

引子和趣聞

Zookeeper名字的由來是比較有趣的,下面的片斷摘抄自《從PAXOS到ZOOKEEPER分佈式一致性原理與實踐》一書:算法

Zookeeper最先起源於雅虎的研究院的一個研究小組。在當時,研究人員發現,在雅虎內部不少大型的系統須要依賴一個相似的系統進行分佈式協調,可是這些系統每每存在分佈式單點問題。因此雅虎的開發人員就試圖開發一個通用的無單點問題的分佈式協調框架。在立項初期,考慮到不少項目都是用動物的名字來命名的(例如著名的Pig項目),雅虎的工程師但願給這個項目也取一個動物的名字。時任研究院的首席科學家Raghu Ramakrishnan開玩笑說:再這樣下去,咱們這兒就變成動物園了。此話一出,你們紛紛表示就叫動物園管理員吧——由於各個以動物命名的分佈式組件放在一塊兒,雅虎的整個分佈式系統看上去就像一個大型的動物園了,而Zookeeper正好用來進行分佈式環境的協調——因而,Zookeeper的名字由此誕生了。數據庫

Curator無疑是Zookeeper客戶端中的瑞士軍刀,它譯做"館長"或者''管理者'',不知道是否是開發小組有意而爲之,筆者猜想有可能這樣命名的緣由是說明Curator就是Zookeeper的館長(腦洞有點大:Curator就是動物園的園長)。apache

Curator包含的包

curator-framework:對zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操做,例如重試策略等
curator-recipes:封裝了一些高級特性,如:Cache事件監聽、選舉、分佈式鎖、分佈式計數器、分佈式Barrier等。api

Maven依賴

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

注意:使用curator的版本:2.12.0,對應Zookeeper的版本爲:3.4.x,若是跨版本會有兼容性問題,頗有可能致使節點操做失敗緩存

Curator的基本Api

建立會話

1.使用靜態工程方法建立客戶端

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

newClient靜態工廠方法包含四個主要參數:服務器

參數名 說明
connectionString 服務器列表,格式host1:port1,host2:port2,...
retryPolicy 重試策略,內建有四種重試策略,也能夠自行實現RetryPolicy接口
sessionTimeoutMs 會話超時時間,單位毫秒,默認60000ms
connectionTimeoutMs 鏈接建立超時時間,單位毫秒,默認60000ms
1-1.Curator 四種重連策略
1.RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) 

以sleepMsBetweenRetries的間隔重連,直到超過maxElapsedTimeMs的時間設置網絡

2.RetryNTimes(int n, int sleepMsBetweenRetries) 

指定重連次數session

3.RetryOneTime(int sleepMsBetweenRetry)

重連一次,簡單粗暴併發

4.ExponentialBackoffRetry

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 
時間間隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))  

2.使用Fluent風格的Api建立會話

核心參數變爲流式設置,

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

3.建立包含隔離命名空間的會話

爲了實現不一樣的Zookeeper業務之間的隔離,須要爲每一個業務分配一個獨立的命名空間(NameSpace),即指定一個Zookeeper的根路徑(官方術語:爲Zookeeper添加「Chroot」特性)。例如(下面的例子)當客戶端指定了獨立命名空間爲「/base」,那麼該客戶端對Zookeeper上的數據節點的操做都是基於該目錄進行的。經過設置Chroot能夠將客戶端應用與Zookeeper服務端的一課子樹相對應,在多個應用共用一個Zookeeper集羣的場景下,這對於實現不一樣應用之間的相互隔離十分有意義。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

啓動客戶端

當建立會話成功,獲得client的實例而後能夠直接調用其start( )方法:

client.start();

數據節點操做

建立數據節點

Zookeeper的節點建立模式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化而且帶序列號
  • EPHEMERAL:臨時
  • EPHEMERAL_SEQUENTIAL:臨時而且帶序列號

建立一個節點,初始內容爲空

client.create().forPath("path");

注意:若是沒有設置節點屬性,節點建立模式默認爲持久化節點,內容默認爲空

建立一個節點,附帶初始化內容

client.create().forPath("path","init".getBytes());

建立一個節點,指定建立模式(臨時節點),內容爲空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

建立一個節點,指定建立模式(臨時節點),附帶初始化內容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

建立一個節點,指定建立模式(臨時節點),附帶初始化內容,而且自動遞歸建立父節點

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

這個creatingParentContainersIfNeeded()接口很是有用,由於通常狀況開發人員在建立一個子節點必須判斷它的父節點是否存在,若是不存在直接建立會拋出NoNodeException,使用creatingParentContainersIfNeeded()以後Curator可以自動遞歸建立全部所需的父節點。

刪除數據節點

刪除一個節點

client.delete().forPath("path");

注意,此方法只能刪除葉子節點,不然會拋出異常。

刪除一個節點,而且遞歸刪除其全部的子節點

client.delete().deletingChildrenIfNeeded().forPath("path");

刪除一個節點,強制指定版本進行刪除

client.delete().withVersion(10086).forPath("path");

刪除一個節點,強制保證刪除

client.delete().guaranteed().forPath("path");

guaranteed()接口是一個保障措施,只要客戶端會話有效,那麼Curator會在後臺持續進行刪除操做,直到刪除節點成功。

注意:上面的多個流式接口是能夠自由組合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

讀取數據節點數據

讀取一個節點的數據內容

client.getData().forPath("path");

注意,此方法返的返回值是byte[ ];

讀取一個節點的數據內容,同時獲取到該節點的stat

先看下stat的查詢結果集
[zk: 10.15.107.155:3352(CONNECTED) 111] stat /c/hfx/sw1
cZxid = 0x2000007df
ctime = Thu Mar 14 11:08:28 CST 2013
mZxid = 0x200000a87
mtime = Thu Mar 14 15:00:34 CST 2013
pZxid = 0x2000007df
cversion = 0
dataVersion = 608
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

更新數據節點數據

更新一個節點的數據內容

client.setData().forPath("path","data".getBytes());

注意:該接口會返回一個Stat實例

更新一個節點的數據內容,強制指定版本進行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());

檢查節點是否存在

client.checkExists().forPath("path");

注意:該方法返回一個Stat實例,用於檢查ZNode是否存在的操做. 能夠調用額外的方法(監控或者後臺處理)並在最後調用forPath( )指定要操做的ZNode

獲取某個節點的全部子節點路徑

client.getChildren().forPath("path");

注意:該方法的返回值爲List<String>,得到ZNode的子節點Path列表。 能夠調用額外的方法(監控、後臺處理或者獲取狀態watch, background or get stat) 並在最後調用forPath()指定要操做的父ZNode。

事務

CuratorFramework的實例包含inTransaction( )接口方法,調用此方法開啓一個ZooKeeper事務. 能夠複合create, setData, check, and/or delete 等操做而後調用commit()做爲一個原子操做提交。一個例子以下:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

異步接口

上面提到的建立、刪除、更新、讀取等方法都是同步的,Curator提供異步接口,引入了BackgroundCallback接口用於處理異步接口調用以後服務端返回的結果信息。BackgroundCallback接口中一個重要的回調值爲CuratorEvent,裏面包含事件類型、響應嗎和節點的詳細信息。

CuratorEventType

事件類型 對應CuratorFramework實例的方法
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

響應碼(#getResultCode())

響應碼 意義
0 OK,即調用成功
-4 ConnectionLoss,即客戶端與服務端斷開鏈接
-110 NodeExists,即節點已經存在
-112 SessionExpired,即會話過時

在程序中,咱們能夠經過如下API來進行異步操做。

Backgroundable<T>

public T inBackground();
public T inBackground(Object context);
public T inBackground(BackgroundCallback callback);
public T inBackground(BackgroundCallback callback, Object context);
public T inBackground(BackgroundCallback callback, Executor executor);
public T inBackground(BackgroundCallback callback, Object context, Executor executor);

在這些構造方法中,咱們重點來關注下executor這個參數。在ZooKeeper中,全部異步通知事件處理都是由EventThread這個線程來處理的——EventThread線程用於串行處理全部的事件通知。EventThread的「串行處理機制」在絕大部分應用場景下可以保證對事件處理的順序性,帶這個特性也有其弊端,就是一旦碰上一個複雜的處理單元,就會消耗過長的處理時間,從而影響對其餘事件的處理。所以,在上面的inBackground接口中,容許用戶傳入一個Executor實例,這樣一來,就能夠把那些比較複雜的的事件處理放到一個專門的線程池中去,如Executors.newFixedThreadPool(2)。

一個異步建立節點的例子以下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      
      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode())); },executor) .forPath("path");

注意:若是#inBackground()方法不指定executor,那麼會默認使用Curator的EventThread去進行異步處理。

Curator食譜(高級特性)

提醒:首先你必須添加curator-recipes依賴,下文僅僅對recipes一些特性的使用進行解釋和舉例,不打算進行源碼級別的探討

 <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

重要提醒:強烈推薦使用ConnectionStateListener監控鏈接的狀態,當鏈接狀態爲LOST,curator-recipes下的全部Api將會失效或者過時,儘管後面全部的例子都沒有使用到ConnectionStateListener。

緩存

Zookeeper原生支持經過註冊Watcher來進行事件監聽,可是開發者須要反覆註冊(Watcher只能單次註冊單次使用)。Cache是Curator中對事件監聽的包裝,能夠看做是對事件監聽的本地緩存視圖,可以自動爲開發者處理反覆註冊監聽。Curator提供了三種Watcher(Cache)來監聽結點的變化。

Path Cache

Path Cache用來監控一個ZNode的子節點. 當一個子節點增長, 更新,刪除時, Path Cache會改變它的狀態, 會包含最新的子節點, 子節點的數據和狀態,而狀態的更變將經過PathChildrenCacheListener通知。

實際使用時會涉及到四個類:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

經過下面的構造函數建立Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必須調用它的start方法,使用完後調用close方法。 能夠設置StartMode來實現啓動的模式,

StartMode有下面幾種:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在調用start()以前會調用rebuild()
  3. POST_INITIALIZED_EVENT: 當Cache初始化數據後發送一個PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)能夠增長listener監聽緩存的變化。

getCurrentData()方法返回一個List<ChildData>對象,能夠遍歷全部的子節點。

設置/更新、移除實際上是使用client (CuratorFramework)來操做, 不經過PathChildrenCache操做:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件類型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("節點數據:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:若是new PathChildrenCache(client, PATH, true)中的參數cacheData值設置爲false,則示例中的event.getData().getData()、data.getData()將返回null,cache將不會緩存節點數據。

注意:示例中的Thread.sleep(10)能夠註釋掉,可是註釋後事件監聽的觸發次數會不全,這可能與PathCache的實現原理有關,不能太過頻繁的觸發事件!

Node Cache

Node Cache與Path Cache相似,Node Cache只是監聽某一個特定的節點。它涉及到下面的三個類:

  • NodeCache - Node Cache實現類
  • NodeCacheListener - 節點監聽器
  • ChildData - 節點數據

注意:使用cache,依然要調用它的start()方法,使用完後調用close()方法。

getCurrentData()將獲得節點當前的狀態,經過它的狀態能夠獲得當前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("節點數據:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("節點被刪除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示例中的Thread.sleep(10)能夠註釋,可是註釋後事件監聽的觸發次數會不全,這可能與NodeCache的實現原理有關,不能太過頻繁的觸發事件!

注意:NodeCache只能監聽一個節點的狀態變化。

Tree Cache

Tree Cache能夠監控整個樹上的全部節點,相似於PathCache和NodeCache的組合,主要涉及到下面四個類:

  • TreeCache - Tree Cache實現類
  • TreeCacheListener - 監聽器類
  • TreeCacheEvent - 觸發的事件類
  • ChildData - 節點數據
public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件類型:" + event.getType() +
                        " | 路徑:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中沒有使用Thread.sleep(10),可是事件觸發次數也是正常的。

注意:TreeCache在初始化(調用start()方法)的時候會回調TreeCacheListener實例一個事TreeCacheEvent,而回調的TreeCacheEvent對象的Type爲INITIALIZED,ChildData爲null,此時event.getData().getPath()頗有可能致使空指針異常,這裏應該主動處理並避免這種狀況。

Leader選舉

在分佈式計算中, leader elections是很重要的一個功能, 這個選舉過程是這樣子的: 指派一個進程做爲組織者,將任務分發給各節點。 在任務開始前, 哪一個節點都不知道誰是leader(領導者)或者coordinator(協調者). 當選舉算法開始執行後, 每一個節點最終會獲得一個惟一的節點做爲任務leader. 除此以外, 選舉還常常會發生在leader意外宕機的狀況下,新的leader要被選舉出來。

在zookeeper集羣中,leader負責寫操做,而後經過Zab協議實現follower的同步,leader或者follower均可以處理讀操做。

Curator 有兩種leader選舉的recipe,分別是LeaderSelectorLeaderLatch

前者是全部存活的客戶端不間斷的輪流作Leader,大同社會。後者是一旦選舉出Leader,除非有客戶端掛掉從新觸發選舉,不然不會交出領導權。某黨?

LeaderLatch

LeaderLatch有兩個構造函數:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的啓動:

leaderLatch.start( );

一旦啓動,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,而後其中一個最終會被選舉爲leader,能夠經過hasLeadership方法查看LeaderLatch實例是否leader:

leaderLatch.hasLeadership( ); //返回true說明當前實例是leader

相似JDK的CountDownLatch, LeaderLatch在請求成爲leadership會block(阻塞),一旦不使用LeaderLatch了,必須調用close方法。 若是它是leader,會釋放leadership, 其它的參與者將會選舉一個leader。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

異常處理: LeaderLatch實例能夠增長ConnectionStateListener來監聽網絡鏈接問題。 當 SUSPENDED 或 LOST 時, leader再也不認爲本身仍是leader。當LOST後鏈接重連後RECONNECTED,LeaderLatch會刪除先前的ZNode而後從新建立一個。LeaderLatch用戶必須考慮致使leadership丟失的鏈接問題。 強烈推薦你使用ConnectionStateListener。

一個LeaderLatch的使用例子:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

能夠添加test module的依賴方便進行測試,不須要啓動真實的zookeeper服務端:

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

首先咱們建立了10個LeaderLatch,啓動後它們中的一個會被選舉爲leader。 由於選舉會花費一些時間,start後並不能立刻就獲得leader。
經過hasLeadership查看本身是不是leader, 若是是的話返回true。
能夠經過.getLeader().getId()能夠獲得當前的leader的ID。
只能經過close釋放當前的領導權。
await是一個阻塞方法, 嘗試獲取leader地位,可是未必能上位。

LeaderSelector

LeaderSelector使用的時候主要涉及下面幾個類:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

核心類是LeaderSelector,它的構造函數以下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

相似LeaderLatch,LeaderSelector必須start: leaderSelector.start(); 一旦啓動,當實例取得領導權時你的listener的takeLeadership()方法被調用。而takeLeadership()方法只有領導權被釋放時才返回。 當你再也不使用LeaderSelector實例時,應該調用它的close方法。

異常處理 LeaderSelectorListener類繼承ConnectionStateListener。LeaderSelector必須當心鏈接狀態的改變。若是實例成爲leader, 它應該響應SUSPENDED 或 LOST。 當 SUSPENDED 狀態出現時, 實例必須假定在從新鏈接成功以前它可能再也不是leader了。 若是LOST狀態出現, 實例再也不是leader, takeLeadership方法返回。

重要: 推薦處理方式是當收到SUSPENDED 或 LOST時拋出CancelLeadershipException異常.。這會致使LeaderSelector實例中斷並取消執行takeLeadership方法的異常.。這很是重要, 你必須考慮擴展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推薦的處理邏輯。

下面的一個例子摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}
你能夠在takeLeadership進行任務的分配等等,而且不要返回,若是你想要要此實例一直是leader的話能夠加一個死循環。調用 leaderSelector.autoRequeue();保證在此實例釋放領導權以後還可能得到領導權。 在這裏咱們使用AtomicInteger來記錄此client得到領導權的次數, 它是」fair」, 每一個client有平等的機會得到領導權。
public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}
對比可知,LeaderLatch必須調用 close()方法纔會釋放領導權,而對於LeaderSelector,經過 LeaderSelectorListener能夠對領導權進行控制, 在適當的時候釋放領導權,這樣每一個節點都有可能得到領導權。從而,LeaderSelector具備更好的靈活性和可控性,建議有LeaderElection應用場景下優先使用LeaderSelector。

分佈式鎖

提醒:

1.推薦使用ConnectionStateListener監控鏈接的狀態,由於當鏈接LOST時你再也不擁有鎖

2.分佈式的鎖全局同步, 這意味着任何一個時間點不會有兩個客戶端都擁有相同的鎖。

可重入共享鎖—Shared Reentrant Lock

Shared意味着鎖是全局可見的, 客戶端均可以請求鎖。 Reentrant和JDK的ReentrantLock相似,便可重入, 意味着同一個客戶端在擁有鎖的同時,能夠屢次獲取,不會被阻塞。 它是由類InterProcessMutex來實現。 它的構造函數爲:

public InterProcessMutex(CuratorFramework client, String path)

經過acquire()得到鎖,並提供超時機制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

經過release()方法釋放鎖。 InterProcessMutex 實例能夠重用。

Revoking ZooKeeper recipes wiki定義了可協商的撤銷機制。 爲了撤銷mutex, 調用下面的方法:

public void makeRevocable(RevocationListener<T> listener)
將鎖設爲可撤銷的. 當別的進程或線程想讓你釋放鎖時Listener會被調用。
Parameters:
listener - the listener

若是你請求撤銷當前的鎖, 調用attemptRevoke()方法,注意鎖釋放時RevocationListener將會回調。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

二次提醒:錯誤處理 仍是強烈推薦你使用ConnectionStateListener處理鏈接狀態的改變。 當鏈接LOST時你再也不擁有鎖。

首先讓咱們建立一個模擬的共享資源, 這個資源指望只能單線程的訪問,不然會有併發問題。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真實環境中咱們會在這裏訪問/維護一個共享的資源
        //這個例子在使用鎖的狀況下不會非法併發異常IllegalStateException
        //可是在無鎖的狀況因爲sleep了一段時間,很容易拋出異常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

而後建立一個InterProcessMutexDemo類, 它負責請求鎖, 使用資源,釋放鎖這樣一個完整的訪問過程。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

代碼也很簡單,生成10個client, 每一個client重複執行10次 請求鎖–訪問資源–釋放鎖的過程。每一個client都在獨立的線程中。 結果能夠看到,鎖是隨機的被每一個實例排他性的使用。

既然是可重用的,你能夠在一個線程中屢次調用acquire(),在線程擁有鎖時它老是返回true。

你不該該在多個線程中用同一個InterProcessMutex, 你能夠在每一個線程中都生成一個新的InterProcessMutex實例,它們的path都同樣,這樣它們能夠共享同一個鎖。

不可重入共享鎖—Shared Lock

這個鎖和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味着它不能在同一個線程中重入。這個類是InterProcessSemaphoreMutex,使用方法和InterProcessMutex相似

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能獲得互斥鎖");
        }
        System.out.println(clientName + " 已獲取到互斥鎖");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能獲得互斥鎖");
        }
        System.out.println(clientName + " 再次獲取到互斥鎖");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // 獲取鎖幾回 釋放鎖也要幾回
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

運行後發現,有且只有一個client成功獲取第一個鎖(第一個acquire()方法返回true),而後它本身阻塞在第二個acquire()方法,獲取第二個鎖超時;其餘全部的客戶端都阻塞在第一個acquire()方法超時而且拋出異常。

這樣也就驗證了InterProcessSemaphoreMutex實現的鎖是不可重入的。

可重入讀寫鎖—Shared Reentrant Read Write Lock

相似JDK的ReentrantReadWriteLock。一個讀寫鎖管理一對相關的鎖。一個負責讀操做,另一個負責寫操做。讀操做在寫鎖沒被使用時可同時由多個進程使用,而寫鎖在使用時不容許讀(阻塞)。

此鎖是可重入的。一個擁有寫鎖的線程可重入讀鎖,可是讀鎖卻不能進入寫鎖。這也意味着寫鎖能夠降級成讀鎖, 好比請求寫鎖 --->請求讀鎖--->釋放讀鎖 ---->釋放寫鎖。從讀鎖升級成寫鎖是不行的。

可重入讀寫鎖主要由兩個類實現:InterProcessReadWriteLockInterProcessMutex。使用時首先建立一個InterProcessReadWriteLock實例,而後再根據你的需求獲得讀鎖或者寫鎖,讀寫鎖的類型是InterProcessMutex

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // 注意只能先獲得寫鎖再獲得讀鎖,不能反過來!!!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能獲得寫鎖");
        }
        System.out.println(clientName + " 已獲得寫鎖");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能獲得讀鎖");
        }
        System.out.println(clientName + " 已獲得讀鎖");
        try {
            resource.use(); // 使用資源
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " 釋放讀寫鎖");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

信號量—Shared Semaphore

一個計數的信號量相似JDK的Semaphore。 JDK中Semaphore維護的一組許可(permits),而Curator中稱之爲租約(Lease)。 有兩種方式能夠決定semaphore的最大租約數。第一種方式是用戶給定path而且指定最大LeaseSize。第二種方式用戶給定path而且使用SharedCountReader類。若是不使用SharedCountReader, 必須保證全部實例在多進程中使用相同的(最大)租約數量,不然有可能出現A進程中的實例持有最大租約數量爲10,可是在B進程中持有的最大租約數量爲20,此時租約的意義就失效了。

此次調用acquire()會返回一個租約對象。 客戶端必須在finally中close這些租約對象,不然這些租約會丟失掉。 可是, 可是,若是客戶端session因爲某種緣由好比crash丟掉, 那麼這些客戶端持有的租約會自動close, 這樣其它客戶端能夠繼續使用這些租約。 租約還能夠經過下面的方式返還:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

注意你能夠一次性請求多個租約,若是Semaphore當前的租約不夠,則請求線程會被阻塞。 同時還提供了超時的重載方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore使用的主要類包括下面幾個:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader
public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

首先咱們先得到了5個租約, 最後咱們把它還給了semaphore。 接着請求了一個租約,由於semaphore還有5個租約,因此請求能夠知足,返回一個租約,還剩4個租約。 而後再請求一個租約,由於租約不夠,阻塞到超時,仍是沒能知足,返回結果爲null(租約不足會阻塞到超時,而後返回null,不會主動拋出異常;若是不設置超時時間,會一致阻塞)。

上面說講的鎖都是公平鎖(fair)。 總ZooKeeper的角度看, 每一個客戶端都按照請求的順序得到鎖,不存在非公平的搶佔的狀況。

多共享鎖對象 —Multi Shared Lock

Multi Shared Lock是一個鎖的容器。 當調用acquire(), 全部的鎖都會被acquire(),若是請求失敗,全部的鎖都會被release。 一樣調用release時全部的鎖都被release(失敗被忽略)。 基本上,它就是組鎖的表明,在它上面的請求釋放操做都會傳遞給它包含的全部的鎖。

主要涉及兩個類:

  • InterProcessMultiLock
  • InterProcessLock

它的構造函數須要包含的鎖的集合,或者一組ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

用法和Shared Lock相同。

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

新建一個InterProcessMultiLock, 包含一個重入鎖和一個非重入鎖。 調用acquire()後能夠看到線程同時擁有了這兩個鎖。 調用release()看到這兩個鎖都被釋放了。

最後再重申一次, 強烈推薦使用ConnectionStateListener監控鏈接的狀態,當鏈接狀態爲LOST,鎖將會丟失。

分佈式計數器

顧名思義,計數器是用來計數的, 利用ZooKeeper能夠實現一個集羣共享的計數器。 只要使用相同的path就能夠獲得最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器, 一個是用int來計數(SharedCount),一個用long來計數(DistributedAtomicLong)。

分佈式int計數器—SharedCount

這個類使用int類型來計數。 主要涉及三個類。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount表明計數器, 能夠爲它增長一個SharedCountListener,當計數器改變時此Listener能夠監聽到改變的事件,而SharedCountReader能夠讀取到最新的值, 包括字面值和帶版本信息的值VersionedValue。

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

在這個例子中,咱們使用baseCount來監聽計數值(addListener方法來添加SharedCountListener )。 任意的SharedCount, 只要使用相同的path,均可以獲得這個計數值。 而後咱們使用5個線程爲計數值增長一個10之內的隨機數。相同的path的SharedCount對計數值進行更改,將會回調給baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

這裏咱們使用trySetCount去設置計數器。 第一個參數提供當前的VersionedValue,若是期間其它client更新了此計數值, 你的更新可能不成功, 可是這時你的client更新了最新的值,因此失敗了你能夠嘗試再更新一次。 而setCount是強制更新計數器的值

注意計數器必須start,使用完以後必須調用close關閉它。

強烈推薦使用ConnectionStateListener。 在本例中SharedCountListener擴展ConnectionStateListener

分佈式long計數器—DistributedAtomicLong

再看一個Long類型的計數器。 除了計數的範圍比SharedCount大了以外, 它首先嚐試使用樂觀鎖的方式設置計數器, 若是不成功(好比期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。

能夠從它的內部實現DistributedAtomicValue.trySet()中看出:

AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此計數器有一系列的操做:

  • get(): 獲取當前值
  • increment(): 加一
  • decrement(): 減一
  • add(): 增長特定的值
  • subtract(): 減去特定的值
  • trySet(): 嘗試設置計數值
  • forceSet(): 強制設置計數值

必須檢查返回結果的succeeded(), 它表明此操做是否成功。 若是操做成功, preValue()表明操做前的值, postValue()表明操做後的值。

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

分佈式隊列

使用Curator也能夠簡化Ephemeral Node (臨時節點)的操做。Curator也提供ZK Recipe的分佈式隊列實現。 利用ZK的 PERSISTENTS_EQUENTIAL節點, 能夠保證放入到隊列中的項目是按照順序排隊的。 若是單一的消費者從隊列中取數據, 那麼它是先入先出的,這也是隊列的特色。 若是你嚴格要求順序,你就的使用單一的消費者,可使用Leader選舉只讓Leader做爲惟一的消費者。

可是, 根據Netflix的Curator做者所說, ZooKeeper真心不適合作Queue,或者說ZK沒有實現一個好的Queue, 緣由有五:

  1. ZK有1MB 的傳輸限制。 實踐中ZNode必須相對較小,而隊列包含成千上萬的消息,很是的大。
  2. 若是有不少節點,ZK啓動時至關的慢。 而使用queue會致使好多ZNode. 你須要顯著增大 initLimit 和 syncLimit.
  3. ZNode很大的時候很難清理。Netflix不得不建立了一個專門的程序作這事。
  4. 當很大量的包含成千上萬的子節點的ZNode時, ZK的性能變得很差
  5. ZK的數據庫徹底放在內存中。 大量的Queue意味着會佔用不少的內存空間。

儘管如此, Curator仍是建立了各類Queue的實現。 若是Queue的數據量不太多,數據量不太大的狀況下,酌情考慮,仍是可使用的。

分佈式隊列—DistributedQueue

DistributedQueue是最普通的一種隊列。 它設計如下四個類:

  • QueueBuilder - 建立隊列使用QueueBuilder,它也是其它隊列的建立類
  • QueueConsumer - 隊列中的消息消費者接口
  • QueueSerializer - 隊列消息序列化和反序列化接口,提供了對隊列中的對象的序列化和反序列化
  • DistributedQueue - 隊列實現類

QueueConsumer是消費者,它能夠接收隊列的數據。處理隊列中的數據的代碼邏輯能夠放在QueueConsumer.consumeMessage()中。

正常狀況下先將消息從隊列中移除,再交給消費者消費。但這是兩個步驟,不是原子的。能夠調用Builder的lockPath()消費者加鎖,當消費者消費數據時持有鎖,這樣其它消費者不能消費此消息。若是消費失敗或者進程死掉,消息能夠交給其它進程。這會帶來一點性能的損失。最好仍是單消費者模式使用隊列。

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// 等待消息消費完成
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * 隊列消息序列化實現類
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * 定義隊列消費者
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("鏈接狀態改變: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消費消息(" + name + "): " + message);
            }
        };
    }
}

例子中定義了兩個分佈式隊列和兩個消費者,由於PATH是相同的,會存在消費者搶佔消費消息的狀況。

帶Id的分佈式隊列—DistributedIdQueue

DistributedIdQueue和上面的隊列相似,可是能夠爲隊列中的每個元素設置一個ID。 能夠經過ID把隊列中任意的元素移除。 它涉及幾個類:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

經過下面方法建立:

builder.buildIdQueue()

放入元素時:

queue.put(aMessage, messageId);

移除元素時:

int numberRemoved = queue.remove(messageId);

在這個例子中, 有些元素尚未被消費者消費前就移除了,這樣消費者不會收到刪除的消息。

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

優先級分佈式隊列—DistributedPriorityQueue

優先級隊列對隊列中的元素按照優先級進行排序。 Priority越小, 元素越靠前, 越先被消費掉。 它涉及下面幾個類:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

經過builder.buildPriorityQueue(minItemsBeforeRefresh)方法建立。 當優先級隊列獲得元素增刪消息時,它會暫停處理當前的元素隊列,而後刷新隊列。minItemsBeforeRefresh指定刷新前當前活動的隊列的最小數量。 主要設置你的程序能夠容忍的不排序的最小值。

放入隊列時須要指定優先級:

queue.put(aMessage, priority);

例子:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

有時候你可能會有錯覺,優先級設置並無起效。那是由於優先級是對於隊列積壓的元素而言,若是消費速度過快有可能出如今後一個元素入隊操做以前前一個元素已經被消費,這種狀況下DistributedPriorityQueue會退化爲DistributedQueue。

分佈式延遲隊列—DistributedDelayQueue

JDK中也有DelayQueue,不知道你是否熟悉。 DistributedDelayQueue也提供了相似的功能, 元素有個delay值, 消費者隔一段時間才能收到元素。 涉及到下面四個類。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

經過下面的語句建立:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素時能夠指定delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是離如今的一個時間間隔, 好比20毫秒,而是將來的一個時間戳,如 System.currentTimeMillis() + 10秒。 若是delayUntilEpoch的時間已通過去,消息會馬上被消費者接收。

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

前面雖然實現了各類隊列,可是你注意到沒有,這些隊列並無實現相似JDK同樣的接口。 SimpleDistributedQueue提供了和JDK基本一致的接口(可是沒有實現Queue接口)。 建立很簡單:

public SimpleDistributedQueue(CuratorFramework client,String path)

增長元素:

public boolean offer(byte[] data) throws Exception

刪除元素:

public byte[] take() throws Exception

另外還提供了其它方法:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

沒有add方法, 多了take方法。

take方法在成功返回以前會被阻塞。 而poll方法在隊列爲空時直接返回null。

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("發送一條消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("發送一條消息失敗:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消費一條消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

可是實際上發送了100條消息,消費完第一條以後,後面的消息沒法消費,目前沒找到緣由。查看一下官方文檔推薦的demo使用下面幾個Api:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

可是實際使用發現仍是存在消費阻塞問題。

分佈式屏障—Barrier

分佈式Barrier是這樣一個類: 它會阻塞全部節點上的等待進程,直到某一個被知足, 而後全部的節點繼續進行。

好比賽馬比賽中, 等賽馬陸續來到起跑線前。 一聲令下,全部的賽馬都飛奔而出。

DistributedBarrier

DistributedBarrier類實現了柵欄的功能。 它的構造函數以下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

首先你須要設置柵欄,它將阻塞在它上面等待的線程:

setBarrier();

而後須要阻塞的線程調用方法等待放行條件:

public void waitOnBarrier()

當條件知足時,移除柵欄,全部等待的線程將繼續執行:

removeBarrier();

異常處理 DistributedBarrier 會監控鏈接狀態,當鏈接斷掉時waitOnBarrier()方法會拋出異常。

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

這個例子建立了controlBarrier來設置柵欄和移除柵欄。 咱們建立了5個線程,在此Barrier上等待。 最後移除柵欄後全部的線程才繼續執行。

若是你開始不設置柵欄,全部的線程就不會阻塞住。

雙柵欄—DistributedDoubleBarrier

雙柵欄容許客戶端在計算的開始和結束時同步。當足夠的進程加入到雙柵欄時,進程開始計算, 當計算完成時,離開柵欄。 雙柵欄類是DistributedDoubleBarrier。 構造函數爲:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成員數量,當enter()方法被調用時,成員被阻塞,直到全部的成員都調用了enter()。 當leave()方法被調用時,它也阻塞調用線程,直到全部的成員都調用了leave()。 就像百米賽跑比賽, 發令槍響, 全部的運動員開始跑,等全部的運動員跑過終點線,比賽才結束。

DistributedDoubleBarrier會監控鏈接狀態,當鏈接斷掉時enter()leave()方法會拋出異常。

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}
相關文章
相關標籤/搜索