ZooKeeper是中典型的pub/sub模式的分佈式數據管理與協調框架,開發人員可使用它進行分佈式數據的發佈與訂閱。另外,其豐富的數據節點類型能夠交叉使用,配合Watcher事件通知機制,能夠應用於分佈式都會涉及的一些核心功能:數據發佈/訂閱、Master選舉、命名服務、分佈式協調/通知、集羣管理、分佈式鎖、分佈式隊列等。本博文主要介紹:發佈/訂閱、分佈式鎖、Master選舉三種最經常使用的場景java
本文中的代碼示例均是由Curator客戶端編寫的,已經對ZooKeeper原生API作好不少封裝。參考資料《從Paxos到Zookeeper 分佈式一致性原理與實踐》(有須要電子PDF的朋友,能夠評論私信我)node
(1)數據發佈/訂閱系統即所謂的配置中心,也就是發佈者將數據發佈到ZooKeeper的一個節點或者一系列節點上,提供訂閱者進行數據訂閱,從而實現動態更新數據的目的,實現配置信息的集中式管理和數據的動態更新。ZooKeeper採用的是推拉相結合的方式:客戶端向服務器註冊本身須要關注的節點,一旦該節點的數據發生改變,那麼服務端就會向相應的客戶端發送Wacher事件通知,客戶端接收到消息通知後,須要主動到服務端獲取最新的數據。數據庫
(2)實際系統開發過程當中:咱們能夠將初始化配置信息放到節點上集中管理,應用在啓動時都會主動到ZooKeeper服務端進行一次配置讀取,同時在指定節點註冊Watcher監聽,主要配置信息一旦變動,訂閱者就能夠獲取讀取最新的配置信息。一般系統中須要使用一些通用的配置信息,好比機器列表信息、運行時的開關配置、數據庫配置信息等全局配置信息,這些都會有如下3點特性:apache
1) 數據量一般比較小(一般是一些配置文件)服務器
2) 數據內容在運行時會常常發生動態變化(好比數據庫的臨時切換等)session
3) 集羣中各機器共享,配置一致(好比數據庫配置共享)。併發
(3)利用的ZooKeeper特性是:ZooKeeper對任何節點(包括子節點)的變動,只要註冊Wacther事件(使用Curator等客戶端工具已經被封裝好)均可以被其它客戶端監聽框架
package com.lijian.zookeeper.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import java.util.concurrent.CountDownLatch; public class ZooKeeper_Subsciption { private static final String ADDRESS = "xxx.xxx.xxx.xxx:2181"; private static final int SESSION_TIMEOUT = 5000; private static final String PATH = "/configs"; private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); private static String config = "jdbc_configuration"; private static CountDownLatch countDownLatch = new CountDownLatch(4); public static void main(String[] args) throws Exception { // 訂閱該配置信息的集羣節點(客戶端):sub1-sub3 for (int i = 0; i < 3; i++) { CuratorFramework consumerClient = getClient(); subscribe(consumerClient, "sub" + String.valueOf(i)); } // 更改配置信息的集羣節點(客戶端):pub CuratorFramework publisherClient = getClient(); publish(publisherClient, "pub"); } private static void init() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); client.start(); // 檢查節點是否存在,不存在則初始化建立 if (client.checkExists().forPath(PATH) == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(PATH, config.getBytes()); } } /** * 建立客戶端而且初始化創建一個存儲配置數據的節點 * * @return * @throws Exception */ private static CuratorFramework getClient() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); client.start(); if (client.checkExists().forPath(PATH) == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(PATH, config.getBytes()); } return client; } /** * 集羣中的某個節點機器更改了配置信息:即發佈了更新了數據 * * @param client * @throws Exception */ private static void publish(CuratorFramework client, String znode) throws Exception { System.out.println("節點[" + znode + "]更改了配置數據..."); client.setData().forPath(PATH, "configuration".getBytes()); countDownLatch.await(); } /** * 集羣中訂閱的節點客戶端(機器)得到最新的配置數據 * * @param client * @param znode * @throws Exception */ private static void subscribe(CuratorFramework client, String znode) throws Exception { // NodeCache監聽ZooKeeper數據節點自己的變化 final NodeCache cache = new NodeCache(client, PATH); // 設置爲true:NodeCache在第一次啓動的時候就馬上從ZooKeeper上讀取節點數據並保存到Cache中 cache.start(true); System.out.println("節點["+ znode +"]已訂閱當前配置數據:" + new String(cache.getCurrentData().getData())); // 節點監聽 countDownLatch.countDown(); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() { System.out.println("配置數據已發生改變, 節點[" + znode + "]讀取當前新配置數據: " + new String(cache.getCurrentData().getData())); } }); } }
(1)在一些讀寫分離的應用場景中,客戶端寫請求每每是由Master處理的,而另外一些場景中,Master則經常負責處理一些複雜的邏輯,並將處理結果同步給集羣中其它系統單元。好比一個廣告投放系統後臺與ZooKeeper交互,廣告ID一般都是通過一系列海量數據處理中計算獲得(很是消耗I/O和CPU資源的過程),那就能夠只讓集羣中一臺機器處理數據獲得計算結果,以後就能夠共享給整個集羣中的其它全部客戶端機器。分佈式
(2)利用ZooKeeper的特性:利用ZooKeeper的強一致性,即可以很好地保證分佈式高併發狀況下節點的建立必定可以保證全局惟一性,ZooKeeper將會保證客戶端沒法重複建立一個已經存在的數據節點,也就是說若是多個客戶端請求建立同一個節點,那麼最終必定只有一個客戶端請求可以建立成功,這個客戶端就是Master,而其它客戶端注在該節點上註冊子節點Wacther,用於監控當前Master是否存活,若是當前Master掛了,那麼其他客戶端立馬從新進行Master選舉。ide
(3)競爭成爲Master角色以後,建立的子節點都是臨時順序節點,好比:_c_862cf0ce-6712-4aef-a91d-fc4c1044d104-lock-0000000001,而且序號是遞增的。須要注意的是這裏有"lock"單詞,這說明ZooKeeper這一特性,也能夠運用於分佈式鎖。
package com.lijian.zookeeper.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; public class ZooKeeper_Master { private static final String ADDRESS="xxx.xxx.xxx.xxx:2181"; private static final int SESSION_TIMEOUT=5000; private static final String MASTER_PATH = "/master_path"; private static final int CLIENT_COUNT = 5; private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT); for (int i = 0; i < CLIENT_COUNT; i++) { final String index = String.valueOf(i); service.submit(() -> { masterSelect(index); }); } } private static void masterSelect(final String znode){ // client成爲master的次數統計 AtomicInteger leaderCount = new AtomicInteger(1); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); client.start(); // 一旦執行完takeLeadership,就會從新進行選舉 LeaderSelector selector = new LeaderSelector(client, MASTER_PATH, new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { System.out.println("節點["+ znode +"]成爲master"); System.out.println("節點["+ znode +"]已經成爲master次數:"+ leaderCount.getAndIncrement()); // 睡眠5s模擬成爲master後完成任務 Thread.sleep(5000); System.out.println("節點["+ znode +"]釋放master"); } }); // autoRequeue自動從新排隊:使得上一次選舉爲master的節點還有可能再次成爲master selector.autoRequeue(); selector.start(); } }
(1)對於排他鎖:ZooKeeper經過數據節點表示一個鎖,例如/exclusive_lock/lock節點就能夠定義一個鎖,全部客戶端都會調用create()接口,試圖在/exclusive_lock下建立lock子節點,可是ZooKeeper的強一致性會保證全部客戶端最終只有一個客戶建立成功。也就能夠認爲得到了鎖,其它線程Watcher監聽子節點變化(等待釋放鎖,競爭獲取資源)。
對於共享鎖:ZooKeeper一樣能夠經過數據節點表示一個鎖,相似於/shared_lock/[Hostname]-請求類型(讀/寫)-序號的臨時節點,好比/shared_lock/192.168.0.1-R-0000000000
Curator提供的有四種鎖,分別以下:
(1)InterProcessMutex:分佈式可重入排它鎖
(2)InterProcessSemaphoreMutex:分佈式排它鎖
(3)InterProcessReadWriteLock:分佈式讀寫鎖
(4)InterProcessMultiLock:將多個鎖做爲單個實體管理的容器
主要是以InterProcessMutex爲例,編寫示例:
package com.lijian.zookeeper.demo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ZooKeeper_Lock { private static final String ADDRESS = "xxx.xxx.xxx.xxx:2181"; private static final int SESSION_TIMEOUT = 5000; private static final String LOCK_PATH = "/lock_path"; private static final int CLIENT_COUNT = 10; private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); private static int resource = 0; public static void main(String[] args){ ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT); for (int i = 0; i < CLIENT_COUNT; i++) { final String index = String.valueOf(i); service.submit(() -> { distributedLock(index); }); } } private static void distributedLock(final String znode) { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); client.start(); final InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH); try { // lock.acquire(); System.out.println("客戶端節點[" + znode + "]獲取lock"); System.out.println("客戶端節點[" + znode + "]讀取的資源爲:" + String.valueOf(resource)); resource ++; // lock.release(); System.out.println("客戶端節點[" + znode + "]釋放lock"); } catch (Exception e) { e.printStackTrace(); } } }
運行結果:加鎖後能夠從左圖看到讀取的都是最新的資源值。若是去掉鎖的話讀取的資源值不能保證是最新值看右圖