【分佈式】Zookeeper使用--開源客戶端

1、前言java

  上一篇博客已經介紹瞭如何使用Zookeeper提供的原生態Java API進行操做,本篇博文主要講解如何經過開源客戶端來進行操做。node

2、ZkClientgit

  ZkClient是在Zookeeper原聲API接口之上進行了包裝,是一個更易用的Zookeeper客戶端,其內部還實現了諸如Session超時重連、Watcher反覆註冊等功能。github

  2.1 添加依賴apache

  在pom.xml文件中添加以下內容便可。  api

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>

  2.2 建立會話session

  使用ZkClient能夠輕鬆的建立會話,鏈接到服務端。  併發

package com.hust.grid.leesf.zkclient.examples;

import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;

public class Create_Session_Sample {
    public static void main(String[] args) throws IOException, InterruptedException {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        System.out.println("ZooKeeper session established.");
    }
}
View Code

  運行結果:  異步

ZooKeeper session established.

  結果代表已經成功建立會話。分佈式

  2.3 建立節點 

  ZkClient提供了遞歸建立節點的接口,即其幫助開發者完成父節點的建立,再建立子節點。

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;

public class Create_Node_Sample {
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        String path = "/zk-book/c1";
        zkClient.createPersistent(path, true);
        System.out.println("success create znode.");
    }
}
View Code

  運行結果: 

success create znode.

  結果代表已經成功建立了節點,值得注意的是,在原生態接口中是沒法建立成功的(父節點不存在),可是經過ZkClient能夠遞歸的先建立父節點,再建立子節點。

  

  能夠看到確實成功建立了/zk-book和/zk-book/c1兩個節點。

  2.4 刪除節點

  ZkClient提供了遞歸刪除節點的接口,即其幫助開發者先刪除全部子節點(存在),再刪除父節點。  

package com.hust.grid.leesf.zkclient.examples;

import org.I0Itec.zkclient.ZkClient;

public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.createPersistent(path, "");
        zkClient.createPersistent(path+"/c1", "");
        zkClient.deleteRecursive(path);
        System.out.println("success delete znode.");
    }
}
View Code

  運行結果:  

success delete znode.

  結果代表ZkClient可直接刪除帶子節點的父節點,由於其底層先刪除其全部子節點,而後再刪除父節點。

  2.5 獲取子節點  

package com.hust.grid.leesf.zkclient.examples;

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

public class Get_Children_Sample {

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
            }
        });

        zkClient.createPersistent(path);
        Thread.sleep(1000);
        zkClient.createPersistent(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
View Code

  運行結果:  

/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null

  結果代表:

  客戶端能夠對一個不存在的節點進行子節點變動的監聽。

  一旦客戶端對一個節點註冊了子節點列表變動監聽以後,那麼當該節點的子節點列表發生變動時,服務端都會通知客戶端,並將最新的子節點列表發送給客戶端

  該節點自己的建立或刪除也會通知到客戶端。

  2.6 獲取數據

package com.hust.grid.leesf.zkclient.examples;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class Get_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.createEphemeral(path, "123");

        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Node " + dataPath + " deleted.");
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("Node " + dataPath + " changed, new data: " + data);
            }
        });

        System.out.println(zkClient.readData(path));
        zkClient.writeData(path, "456");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
View Code

  運行結果: 

123
Node /zk-book changed, new data: 456
Node /zk-book deleted.

  結果代表能夠成功監聽節點數據變化或刪除事件。

  2.7 檢測節點是否存在  

package com.hust.grid.leesf.zkclient.examples;

import org.I0Itec.zkclient.ZkClient;

public class Exist_Node_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000);
        System.out.println("Node " + path + " exists " + zkClient.exists(path));
    }
}
View Code

  運行結果:

Node /zk-book exists false

  結果代表,能夠經過ZkClient輕易檢測節點是否存在,其相比於原生態的接口更易於理解。

3、Curator客戶端

  Curator解決了不少Zookeeper客戶端很是底層的細節開發工做,包括鏈接重連,反覆註冊Watcher和NodeExistsException異常等,現已成爲Apache的頂級項目。

  3.1 添加依賴

  在pom.xml文件中添加以下內容便可。  

        <!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.4.2</version>
        </dependency>

  3.2 建立會話

  Curator除了使用通常方法建立會話外,還可使用fluent風格進行建立。

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Create_Session_Sample {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
        client.start();
        System.out.println("Zookeeper session1 established. ");
        CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();
        client1.start();
        System.out.println("Zookeeper session2 established. ");        
    }
}
View Code

  運行結果: 

Zookeeper session1 established. 
Zookeeper session2 established. 

  值得注意的是session2會話含有隔離命名空間,即客戶端對Zookeeper上數據節點的任何操做都是相對/base目錄進行的,這有利於實現不一樣的Zookeeper的業務之間的隔離。

  3.3 建立節點

  經過使用Fluent風格的接口,開發人員能夠進行自由組合來完成各類類型節點的建立。  

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Create_Node_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        System.out.println("success create znode: " + path);
    }
}
View Code

  運行結果: 

success create znode: /zk-book/c1

  其中,也建立了/zk-book/c1的父節點/zk-book節點。

  3.4 刪除節點  

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
        client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
        System.out.println("success delete znode " + path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
View Code

  運行結果: 

init
success delete znode /zk-book/c1

  結果代表成功刪除/zk-book/c1節點。

  3.5 獲取數據 

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Get_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
    }
}
View Code

  運行結果:  

init

  結果代表成功獲取了節點的數據。

  3.6 更新數據  

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Set_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("Success set node for : " + path + ", new version: "
                + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
        try {
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            System.out.println("Fail set node due to " + e.getMessage());
        }
    }
}
View Code

  運行結果:  

Success set node for : /zk-book, new version: 1
Fail set node due to KeeperErrorCode = BadVersion for /zk-book

  結果代表當攜帶數據版本不一致時,沒法完成更新操做。

  3.7 異步接口

  如同Zookeeper原生API提供了異步接口,Curator也提供了異步接口。在Zookeeper中,全部的異步通知事件處理都是由EventThread這個線程來處理的,EventThread線程用於串行處理全部的事件通知,其能夠保證對事件處理的順序性,可是一旦碰上覆雜的處理單元,會消耗過長的處理時間,從而影響其餘事件的處理,Curator容許用戶傳入Executor實例,這樣能夠將比較複雜的事件處理放到一個專門的線程池中去。 

package com.hust.grid.leesf.curator.examples;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Create_Node_Background_Sample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    static CountDownLatch semaphore = new CountDownLatch(2);
    static ExecutorService tp = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        client.start();
        System.out.println("Main thread: " + Thread.currentThread().getName());

        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                System.out.println();
                semaphore.countDown();
            }
        }, tp).forPath(path, "init".getBytes());

        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                semaphore.countDown();
            }
        }).forPath(path, "init".getBytes());

        semaphore.await();
        tp.shutdown();
    }
}
View Code

  運行結果:

Main thread: main
event[code: -110, type: CREATE], Thread of processResult: main-EventThread
event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1

  其中,建立節點的事件由線程池本身處理,而非默認線程處理。

  Curator除了提供很便利的API,還提供了一些典型的應用場景,開發人員可使用參考更好的理解如何使用Zookeeper客戶端,全部的都在recipes包中,只須要在pom.xml中添加以下依賴便可

<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.4.2</version>
</dependency>

  3.8 節點監聽  

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class NodeCache_Sample {
    static String path = "/zk-book/nodecache";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        final NodeCache cache = new NodeCache(client, path, false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
            }
        });
        client.setData().forPath(path, "u".getBytes());
        Thread.sleep(1000);
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
View Code

  運行結果:  

Node data update, new data: u

  當節點數據變動後收到了通知。NodeCache不只能夠監聽數據節點的內容變動,也能監聽指定節點是否存在。

  3.9 子節點監聽 

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class PathChildrenCache_Sample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(5000).build();

    public static void main(String[] args) throws Exception {
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED," + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED," + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED," + event.getData().getPath());
                    break;
                default:
                    break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
View Code

  運行結果:

CHILD_ADDED,/zk-book/c1
CHILD_REMOVED,/zk-book/c1

  監聽節點的子節點,包括新增、數據變化、刪除三類事件。

  3.10 Master選舉

  藉助Zookeeper,開發者能夠很方便地實現Master選舉功能,其大致思路以下:選擇一個根節點,如/master_select,多臺機器同時向該節點建立一個子節點/master_select/lock,利用Zookeeper特性,最終只有一臺機器可以成功建立,成功的那臺機器就是Master。

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Recipes_MasterSelect {
    static String master_path = "/curator_recipes_master_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("成爲Master角色");
                Thread.sleep(3000);
                System.out.println("完成Master操做,釋放Master權利");
            }
        });
        selector.autoRequeue();
        selector.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}
View Code

  運行結果:

成爲Master角色
完成Master操做,釋放Master權利
成爲Master角色

  以上結果會反覆循環,而且當一個應用程序完成Master邏輯後,另一個應用程序的相應方法纔會被調用,即當一個應用實例成爲Master後,其餘應用實例會進入等待,直到當前Master掛了或者推出後纔會開始選舉Master。

  3.11 分佈式鎖

  爲了保證數據的一致性,常常在程序的某個運行點須要進行同步控制。以流水號生成場景爲例,普通的後臺應用一般採用時間戳方式來生成流水號,可是在用戶量很是大的狀況下,可能會出現併發問題。 

package com.hust.grid.leesf.curator.examples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class Recipes_NoLock {
    public static void main(String[] args) throws Exception {
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();
                    } catch (Exception e) {
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.err.println("生成的訂單號是 : " + orderNo);
                }
            }).start();
        }
        down.countDown();
    }
}
View Code

  運行結果: 

生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|592
生成的訂單號是 : 16:29:10|591

  結果表示訂單號出現了重複,即普通的方法沒法知足業務須要,由於其未進行正確的同步。可使用Curator來實現分佈式鎖功能。

package com.hust.grid.leesf.curator.examples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Recipes_Lock {
    static String lock_path = "/curator_recipes_lock_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();
                        lock.acquire();
                    } catch (Exception e) {
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的訂單號是 : " + orderNo);
                    try {
                        lock.release();
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
        down.countDown();
    }
}
View Code

  運行結果:

生成的訂單號是 : 16:31:50|293
生成的訂單號是 : 16:31:50|319
生成的訂單號是 : 16:31:51|278
生成的訂單號是 : 16:31:51|326
生成的訂單號是 : 16:31:51|402
生成的訂單號是 : 16:31:51|420
生成的訂單號是 : 16:31:51|546
生成的訂單號是 : 16:31:51|602
生成的訂單號是 : 16:31:51|626
生成的訂單號是 : 16:31:51|656
生成的訂單號是 : 16:31:51|675
生成的訂單號是 : 16:31:51|701
生成的訂單號是 : 16:31:51|708
生成的訂單號是 : 16:31:51|732
生成的訂單號是 : 16:31:51|763
生成的訂單號是 : 16:31:51|785
生成的訂單號是 : 16:31:51|805
生成的訂單號是 : 16:31:51|823
生成的訂單號是 : 16:31:51|839
生成的訂單號是 : 16:31:51|853
生成的訂單號是 : 16:31:51|868
生成的訂單號是 : 16:31:51|884
生成的訂單號是 : 16:31:51|897
生成的訂單號是 : 16:31:51|910
生成的訂單號是 : 16:31:51|926
生成的訂單號是 : 16:31:51|939
生成的訂單號是 : 16:31:51|951
生成的訂單號是 : 16:31:51|965
生成的訂單號是 : 16:31:51|972
生成的訂單號是 : 16:31:51|983

  結果代表此時已經不存在重複的流水號。

  3.12 分佈式計數器

  分佈式計數器的典型應用是統計系統的在線人數,藉助Zookeeper也能夠很方便實現分佈式計數器功能:指定一個Zookeeper數據節點做爲計數器,多個應用實例在分佈式鎖的控制下,經過更新節點的內容來實現計數功能。 

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

public class Recipes_DistAtomicInt {
    static String distatomicint_path = "/curator_recipes_distatomicint_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distatomicint_path,
                new RetryNTimes(3, 1000));
        AtomicValue<Integer> rc = atomicInteger.add(8);
        System.out.println("Result: " + rc.succeeded());
    }
}
View Code

  運行結果:

Result: true

  結果代表已經將數據成功寫入數據節點中。

  3.13 分佈式Barrier

  如同JDK的CyclicBarrier,Curator提供了DistributedBarrier來實現分佈式Barrier。  

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Recipes_Barrier {
    static String barrier_path = "/curator_recipes_barrier_path";
    static DistributedBarrier barrier;

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        CuratorFramework client = CuratorFrameworkFactory.builder()
                                .connectString("127.0.0.1:2181")
                                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
                        client.start();
                        barrier = new DistributedBarrier(client, barrier_path);
                        System.out.println(Thread.currentThread().getName() + "號barrier設置");
                        barrier.setBarrier();
                        barrier.waitOnBarrier();
                        System.err.println("啓動...");
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
        Thread.sleep(2000);
        barrier.removeBarrier();
    }
}
View Code

  運行結果:

Thread-1號barrier設置
Thread-2號barrier設置
Thread-4號barrier設置
Thread-3號barrier設置
Thread-0號barrier設置
啓動...
啓動...
啓動...
啓動...
啓動...

  結果代表經過DistributedBarrier能夠實現相似於CyclicBarrier的分佈式Barrier功能。

4、Curator工具類

  4.1 ZKPaths

  其提供了簡單的API來構建znode路徑、遞歸建立、刪除節點等。   

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZKPaths.PathAndNode;
import org.apache.zookeeper.ZooKeeper;

public class ZKPaths_Sample {
    static String path = "/curator_zkpath_sample";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();

        System.out.println(ZKPaths.fixForNamespace(path, "sub"));
        System.out.println(ZKPaths.makePath(path, "sub"));
        System.out.println(ZKPaths.getNodeFromPath("/curator_zkpath_sample/sub1"));

        PathAndNode pn = ZKPaths.getPathAndNode("/curator_zkpath_sample/sub1");
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());

        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zookeeper, dir1);
        ZKPaths.mkdirs(zookeeper, dir2);
        System.out.println(ZKPaths.getSortedChildren(zookeeper, path));

        ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
    }
}
View Code

  運行結果: 

/curator_zkpath_sample/sub
/curator_zkpath_sample/sub
sub1
/curator_zkpath_sample
sub1
[child1, child2]

  藉助ZKPaths可快速方便的完成節點的建立等操做。

  4.2 EnsurePath

  其提供了一種可以確保數據節點存在的機制,當上層業務但願對一個數據節點進行操做時,操做前須要確保該節點存在。 

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class EnsurePathDemo {
    static String path = "/zk-book/c1";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.usingNamespace("zk-book");

        EnsurePath ensurePath = new EnsurePath(path);
        ensurePath.ensure(client.getZookeeperClient());
        ensurePath.ensure(client.getZookeeperClient());

        EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/c1");
        ensurePath2.ensure(client.getZookeeperClient());
    }
}
View Code

  EnsurePath採起了以下節點建立方式,試圖建立指定節點,若是節點已經存在,那麼就不進行任何操做,也不對外拋出異常,不然正常建立數據節點。

5、總結

  本篇介紹了使用Zookeeper的開源客戶端如何操做Zookeeper的方法,相應的源碼也已經上傳至github,謝謝各位園友的觀看~

相關文章
相關標籤/搜索