JDK 版本 : OpenJDK 11.0.1java
IDE : idea 2018.3node
Zookeeper Server 版本 : 3.5.4-betaapache
Zookeeper Client 版本 : 3.5.4-betasession
Curator 版本 : 4.2.0異步
Zookeeper Client 是 Zookeeper 的經典原生客戶端。使用以前須要在 Maven 中導入依賴:分佈式
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.4-beta</version> </dependency>
代碼:ide
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.TimeUnit; public class ClientTest { public static void main(String[] args) { /** * 建立一個 Zookeeper 的實例 * 此處爲一個集羣,Zookeeper 的 ip 之間用逗號隔開 * * 參數解釋: * param 1 - Zookeeper 的實例 ip ,此處是一個集羣,因此配置了多個 ip,用逗號隔開 * param 2 - session 過時時間,單位秒 (1000) * param 3 - 監視者,用於獲取監控事件 (MyWatch) */ ZooKeeper zooKeeper = null; try { Watcher createZkWatch = new MyWatch(); zooKeeper = new ZooKeeper("localhost:2101,localhost:2102,localhost:2103", 1000,createZkWatch); } catch (IOException e) { e.printStackTrace(); } /** * 值得注意的是,Zookeeper 對象去鏈接中間件實例是異步的 * 因此此處須要作一個死循環等待它鏈接完畢 * 更加優雅的作法是使用 CownDownLatch 去作,可是 while 比較簡單 */ while(zooKeeper.getState() == ZooKeeper.States.CONNECTING){ //返回 zookeeper 的狀態 System.out.println(zooKeeper.getState()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } //若是鏈接不出錯的話此處狀態應該爲 CONNECTED if(zooKeeper.getState() != ZooKeeper.States.CONNECTED) return; /** * 建立 ZooKeeper 節點 * 參數解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 節點數據 (my first data) * param 3 - 設置權限 (OPEN_ACL_UNSAFE) * param 4 - znode 類型 (PERSISTENT) * * * znode 類型有四種: * PERSISTENT - 持久化目錄節點,客戶端與zookeeper斷開鏈接後,該節點依舊存在 * PERSISTENT_SEQUENTIAL - 持久化,並帶有序列號 * EPHEMERAL - 臨時目錄節點,客戶端與zookeeper斷開鏈接後,該節點被刪除 * EPHEMERAL_SEQUENTIAL - 臨時,並帶有序列號 */ try { String s = zooKeeper.create("/zoo", "my first data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("建立節點:" + s); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 建立一個二級節點,參數同上 * 須要注意的是,必需要有一級節點纔能有二級節點,否則會報錯 */ try { String s = zooKeeper.create("/zoo/zoo_1", "my first data_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("建立二級節點:" + s); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 查詢 ZooKeeper 節點的數據 * 參數解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 監視者,用於獲取監控事件 (MyWatch) * param 3 - Zookeeper 實例信息和數據信息 (stat) * * 注意若是後續須要修改該節點的值,能夠在此處記錄節點版本 version (非必要操做) */ Integer zooVersion = null; try { MyWatch getDataWatch = new MyWatch(); Stat stat = new Stat(); byte[] data = zooKeeper.getData("/zoo",getDataWatch,stat); System.out.println("查詢節點數據:" + new String(data)); //從 stat 中能夠獲取不少 Zookeeper 實例的信息 System.out.println("查詢節點數據 czxid:" + stat.getCzxid()); //zxid zooVersion = stat.getVersion(); //此處獲取 /zoo 節點的版本號 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 修改 ZooKeeper 節點的數據 * 參數解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 節點新數據 (my first data change) * param 3 - 該節點的版本 * * 在成功修改了節點的數據以後,版本號會自動加一 * 若是此時不知道節點的版本,也能夠輸入 -1,會默認取最新的節點版本去修改 */ try { Stat stat = zooKeeper.setData("/zoo", "my first data change".getBytes(), zooVersion); // zooVersion = -1 System.out.println("修改節點數據 czxid:" + stat.getCzxid()); System.out.println("修改節點數據 version:" + stat.getVersion()); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 查看 ZooKeeper 節點是否存在 * 參數解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 監視者,用於獲取監控事件 (MyWatch) * * 若是不存在,返回的 stat 爲 null */ try { Stat stat = zooKeeper.exists("/zoo_not_exist", new MyWatch()); System.out.println("查看節點是否存在 stat:" + stat); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } /** * 刪除 ZooKeeper 節點 * 參數解釋: * param 1 - znode 名稱 (/zoo) * param 2 - 該節點的版本 * * 版本號若是不清楚的話能夠填入 -1,和上述同理 * 值得注意的是,若是一個節點下屬存在子節點,那麼它不能被刪除 */ try { zooKeeper.delete("/zoo", -1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } private static class MyWatch implements Watcher{ public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent); } } }
Curator 是 Netfix 開發的 Zookeeper Client,使用起來更方便,功能更增強大,目前應用更加普遍。使用以前須要在 Maven 中導入依賴:測試
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency>
代碼:ui
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import java.util.List; public class CuratorTest { public static void main(String[] args) { /** * 建立客戶端 * * RetryPolicy 接口是重試策略 */ /** * 指定客戶端的重連策略 * * RetryOneTime(int ms) * 休眠必定毫秒數以後從新鏈接一次 * * RetryForever(int ms) * 和第一種策略的差異是會不斷嘗試重連 * * RetryNTimes(int times,int ms) * 和第一種策略的差異是,第一個參數指定重連次數,第二個參數指定休眠間隔 * * RetryUntilElapsed(int max_sum_ms,int ms) * 第一個參數指定最大休眠時間,第二個參數指定休眠間隔,若是休眠時間超出了就不會繼續重連 * * ExponentialBackoffRetry(int ms,int,int max_ms) * 第一個參數表明最初的重連休眠時間,第二個參數表明最大重連次數,第三個參數表明最大重連休眠時間 * 該策略下重連的休眠時間會隨着重連次數的增長而增長,從最初休眠時間一直增長到最大休眠時間 * 最大重連次數必須小於等於 29,超過的狀況下會被自動修改爲 29 * * [其它策略不一一列舉] */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(100,3,1000); /** * 採用 buider 模式建立客戶端 */ CuratorFramework client = CuratorFrameworkFactory.builder() //Zookeeper 的地址 .connectString("localhost:2101,localhost:2102,localhost:2103") //session 的過時時間(毫秒) .sessionTimeoutMs(5000) //鏈接的超時時間(毫秒) .connectionTimeoutMs(5000) //拒絕策略 .retryPolicy(retryPolicy) //設置該客戶端可以操做的目錄權限,不設置的話默承認以操做所有 //好比此處設置爲 zoo,即爲該客戶端對象操做的節點前面默認會添加 /zoo .namespace("zoo") //完成建立 .build(); //啓動客戶端 client.start(); /** * 建立節點 */ try { String createReturn = client.create() //節點類型 //PERSISTENT - 持久化目錄節點,客戶端與zookeeper斷開鏈接後,該節點依舊存在 //PERSISTENT_SEQUENTIAL - 持久化,並帶有序列號 //EPHEMERAL - 臨時目錄節點,客戶端與zookeeper斷開鏈接後,該節點被刪除 //EPHEMERAL_SEQUENTIAL - 臨時,並帶有序列號 .withMode(CreateMode.PERSISTENT) //因爲 namespace 設置爲 zoo,因此此處至關於建立 /zoo/zoo_1 節點 .forPath("/zoo_1", "my first data zoo_1".getBytes()); System.out.println("建立節點:" + createReturn); } catch (Exception e) { e.printStackTrace(); } /** * 查詢節點 */ try { Stat stat = client.checkExists() //查詢 /zoo/zoo_1 節點 .forPath("/zoo_1"); //若是不存在,stat 爲 null System.out.println("查詢節點:" + stat); } catch (Exception e) { e.printStackTrace(); } /** * 刪除節點 */ try { client.delete() //若是該節點下有子節點,會拋出異常且刪除失敗 .forPath("/zoo_1"); } catch (Exception e) { e.printStackTrace(); } /** * 查詢節點的值 */ try { Stat stat = new Stat(); byte[] value = client.getData() //獲取節點的 stat .storingStatIn(stat) //查詢 /zoo/zoo_1 節點 .forPath("/zoo_1"); System.out.println("查詢節點的值:" + new String(value)); } catch (Exception e) { e.printStackTrace(); } /** * 更新節點的值 */ try { Stat stat = client.setData() //設置版本值,此選項非必填 .withVersion(10086) .forPath("/zoo_1", "zoo_1 new data".getBytes()); } catch (Exception e) { e.printStackTrace(); } /** * 獲取節點的子節點 */ try { //獲取全部子節點的節點名稱 List<String> nodes = client.getChildren() .forPath("/zoo_1"); } catch (Exception e) { e.printStackTrace(); } } }
Zookeeper 中的分佈式鎖實現原理很簡單,就是多個線程一塊兒去建立同一個節點,誰建立成功鎖就歸誰;使用完以後刪除該節點,其它節點再進行一次爭搶。Curator 中有一個寫好的重入鎖 InterProcessMutex,簡單封裝便可使用:this
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * Zookeeper 分佈式鎖實現 */ public class ZkLock implements Lock{ private InterProcessMutex lock; /** * 讓使用者方便運用的構造方法 */ public ZkLock(String zkAddrs){ this(zkAddrs, "/lock_node", "lock_base", 2000, new ExponentialBackoffRetry(1000, 10)); } /** * 核心構造方法,根據傳入的參數去構造 lock 對象 * @param zkAddrs Zookeeper 的服務地址 * @param lockNode 各個線程要去爭搶建立的 Znode,也就是客戶端有使用權限的 namespace * @param baseNode lockNode 的上級 Znode * @param sessionOutTimeMs 過時時間 * @param policy 重連策略 */ public ZkLock(String zkAddrs,String lockNode,String baseNode,int sessionOutTimeMs,RetryPolicy policy){ //有效性驗證 if(Objects.isNull(zkAddrs) || zkAddrs.trim().equals("") || Objects.isNull(lockNode) || lockNode.trim().equals("") || Objects.isNull(policy)) throw new RuntimeException(); //經過工廠建立鏈接 CuratorFrameworkFactory.Builder cfBuilder = CuratorFrameworkFactory.builder() .connectString(zkAddrs) .sessionTimeoutMs(sessionOutTimeMs) .retryPolicy(policy); if(baseNode != null && !baseNode.trim().equals("")) cfBuilder.namespace(baseNode); CuratorFramework cf = cfBuilder.build(); //開啓鏈接 cf.start(); //InterProcessMutex 是 Crator 裏自帶的一個已經實現好的重入鎖 //只要對其進行簡單封裝便可使用 lock = new InterProcessMutex(cf,lockNode); } /** * 上鎖方法,死循環調用 tryLock() 去上鎖 */ @Override public void lock() { while (!tryLock()) Thread.yield(); } /** * 嘗試獲取鎖,若是沒能獲取到會超時後報錯 */ @Override public boolean tryLock() { try { lock.acquire(); } catch (Exception e) { return Boolean.FALSE; } return Boolean.TRUE; } /** * 嘗試獲取鎖,若是指定時間內獲取不到就返回 false */ @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { try { return lock.acquire(time,unit); } catch (Exception e) { return Boolean.FALSE; } } /** * 釋放鎖,若是報錯就會遞歸去釋放 */ @Override public void unlock() { try { lock.release(); } catch (Exception e) { unlock(); } } //忽略 @Override public Condition newCondition() { throw new RuntimeException(); } //忽略 @Override public void lockInterruptibly() throws InterruptedException { lock(); } //測試 public static void main(String[] args) throws Exception { //建立一個要被操做的對象 AtomicInteger count = new AtomicInteger(30); //建立一個線程池 Executor executor = Executors.newFixedThreadPool(10); //建立所對象 Lock lock = new ZkLock("localhost:2101,localhost:2102,localhost:2103"); //for 循環,把任務丟進線程池裏 for(int i = 0; i < 30; i++){ executor.execute(()->{ try { //加鎖 lock.lock(); //此處開啓業務邏輯 //demo 中簡單模擬,將 count 對象減一 int a = count.decrementAndGet(); System.out.println(a); } catch (Exception e) { e.printStackTrace(); } finally { try { //釋放鎖 lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } }); } } }