ZooKeeper學習第五期--ZooKeeper管理分佈式環境中的數據(轉)

轉載來源:https://www.cnblogs.com/sunddenly/p/4092654.html

引言

本節原本是要介紹ZooKeeper的實現原理,可是ZooKeeper的原理比較複雜,它涉及到了paxos算法、Zab協議、通訊協議等相關知識,理解起來比較抽象因此還須要藉助一些應用場景,來幫咱們理解。因爲內容比較多,一口氣吃不成胖子,得慢慢來一步一個腳印,所以我對後期ZooKeeper的學習規劃以下:html

第一階段: java

|---理解ZooKeeper的應用 node

    |---ZooKeeper是什麼web

    |---ZooKeeper能幹什麼算法

    |---ZooKeeper 怎麼使用數據庫

第二階段: apache

|---理解ZooKeeper原理準備 服務器

    |---瞭解paxos網絡

    |---理解 zab原理session

    |---理解選舉/同步流程

第三階段:

    |---深刻ZooKeeper原理

        |---分析源碼

        |---嘗試開發分佈式應用

因爲內容較多,並且理解較爲複雜,因此每一個階段分開來學習和介紹,那麼本文主要介紹的的是第一階段,該階段通常應該放在前面介紹,但感受像一些ZooKeeper應用案例,若是沒有必定的ZooKeeper基礎,理解起來也比較抽象, 因此放在這介紹。你們能夠對比一下前面的應用程序,來對比理解一下前面的那些應用到底用到ZooKeeper的那些功能,來進一步理解ZooKeeper的實現理念,因爲網上關於這方面的介紹比較多,若是一些可愛的博友對該內容已經比較瞭解,那麼您能夠不用往下看了,繼續下一步學習。

1、ZooKeeper產生背景

1.1 分佈式的發展

分佈式這個概念我想你們並不陌生,但真正實戰開始還要從google提及,很早之前在實驗室中分佈式被人提出,但是說是計算機內入行較爲複雜學習較爲困難的技術,而且市場也並不成熟,所以大規模的商業應用一直未成出現,但從Google 發佈了MapReduceDFS 以及Bigtable的論文以後,分佈式在計算機界的格局就發生了變化,從架構上實現了分佈式的難題,而且成熟的應用在了海量數據存儲計算上,其集羣的規模也是當前世界上最爲龐大的。

以DFS 爲基礎的分佈式計算框架keyvalue 數據高效的解決運算的瓶頸,並且開發人員不用再寫複雜的分佈式程序,只要底層框架完備開發人員只要用較少的代碼就能夠完成分佈式程序的開發,這使得開發人員只須要關注業務邏輯的便可。Google 在業界技術上的領軍地位,讓業界可望不可即的技術實力,IT 所以也是對Google 所退出的技術十分推崇。在最近幾年中分佈式則是成爲了海量數據存儲以及計算、高併發、高可靠性、高可用性的解決方案。

1.2 ZooKeeper的產生

衆所周知一般分佈式架構都是中心化的設計,就是一個主控機鏈接多個處理節點。問題能夠從這裏考慮,當主控機失效時,整個系統則就沒法訪問了,因此保證系統的高可用性是很是關鍵之處,也就是要保證主控機的高可用性。分佈式鎖就是一個解決該問題的較好方案,多主控機搶一把鎖。在這裏咱們就涉及到了咱們的重點Zookeeper。

ZooKeeper是什麼,chubby 我想你們都不會陌生的,chubby 是實現Google 的一個分佈式鎖的實現,運用到了paxos 算法解決的一個分佈式事務管理的系統。Zookeeper 就是雅虎模仿強大的Google chubby 實現的一套分佈式鎖管理系統。同時,Zookeeper 分佈式服務框架是Apache Hadoop的一個子項目,它是一個針對大型分佈式系統的可靠協調系統,它主要是用來解決分佈式應用中常常遇到的一些數據管理問題,能夠高可靠的維護元數據。提供的功能包括:配置維護、名字服務、分佈式同步、組服務等。ZooKeeper的設計目標就是封裝好複雜易出錯的關鍵服務,將簡單易用的接口和性能高效、功能穩定的系統提供給用戶。

1.3 ZooKeeper的使用

Zookeeper 做爲一個分佈式的服務框架,主要用來解決分佈式集羣中應用系統的一致性問題,它能提供基於相似於文件系統的目錄節點樹方式的數據存儲可是 Zookeeper 並非用來專門存儲數據的,它的做用主要是用來維護監控你存儲的數據的狀態變化。經過監控這些數據狀態的變化,從而能夠達到基於數據的集羣管理,後面將 會詳細介紹 Zookeeper 可以解決的一些典型問題。

注意一下這裏的"數據"是有限制的:

(1) 從數據大小來看:咱們知道ZooKeeper的數據存儲在一個叫ReplicatedDataBase 的數據庫中,該數據是一個內存數據庫,既然是在內存當中,我就應該知道該數據量就應該不會太大,這一點上就與hadoop的HDFS有了很大的區別,HDFS的數據主要存儲在磁盤上,所以數據存儲主要是HDFS的事,而ZooKeeper主要是協調功能,並非用來存儲數據的。

(2) 從數據類型來看:正如前面所說的,ZooKeeper的數據在內存中,因爲內存空間的限制,那麼咱們就不能在上面爲所欲爲的存儲數據,因此ZooKeeper存儲的數據都是咱們所關心的數據並且數據量還不能太大,並且還會根據咱們要以實現的功能來選擇相應的數據。簡單來講,幹什麼事存什麼數據,ZooKeeper所實現的一切功能,都是由ZK節點的性質該節點所關聯的數據實現的,至於關聯什麼數據那就要看你幹什麼事了。

例如:

  ① 集羣管理:利用臨時節點特性,節點關聯的是機器的主機名、IP地址等相關信息,集羣單點故障也屬於該範疇。

  ② 統一命名:主要利用節點的惟一性和目錄節點樹結構。

  ③ 配置管理:節點關聯的是配置信息。

  ④ 分佈式鎖:節點關聯的是要競爭的資源。

2、ZooKeeper應用場景

ZooKeeper是一個高可用的分佈式數據管理與系統協調框架。基於對Paxos算法的實現,使該框架保證了分佈式環境中數據的強一致性,也正是基於這樣的特性,使得zookeeper可以應用於不少場景。須要注意的是,ZK並非生來就爲這些場景設計,都是後來衆多開發者根據框架的特性,摸索出來的典型使用方法。所以,咱們也能夠根據本身的須要來設計相應的場景實現。正如前文所提到的,ZooKeeper 實現的任何功能都離不開ZooKeeper的數據結構,任何功能的實現都是利用"Znode結構特性+節點關聯的數據"來實現的,好吧那麼咱們就看一下ZooKeeper數據結構有哪些特性。ZooKeeper數據結構以下圖所示:

圖2.1 ZooKeeper數據結構

Zookeeper 這種數據結構有以下這些特色:

每一個子目錄項如 NameService 都被稱做爲 znode,這個 znode 是被它所在的路徑惟一標識,如 Server1 這個 znode 的標識爲 /NameService/Server1

znode 能夠有子節點目錄,而且每一個 znode 能夠存儲數據,注意 EPHEMERAL 類型的目錄節點不能有子節點目錄;

znode 是有版本的,每一個 znode 中存儲的數據能夠有多個版本,也就是一個訪問路徑中能夠存儲多份數據

znode 能夠是臨時節點,一旦建立這個 znode 的客戶端與服務器失去聯繫,這個 znode 也將自動刪除,Zookeeper 的客戶端和服務器通訊採用長鏈接方式,每一個客戶端和服務器經過心跳來保持鏈接,這個鏈接狀態稱爲 session,若是 znode 是臨時節點,這個 session 失效,znode 也就刪除了;

znode 的目錄名能夠自動編號,如 App1 已經存在,再建立的話,將會自動命名爲 App2;

znode 能夠被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化能夠通知設置監控的客戶端,這個是 Zookeeper 的核心特性,Zookeeper 的不少功能都是基於這個特性實現的。

2.1數據發佈與訂閱

(1) 典型場景描述

發佈與訂閱即所謂的配置管理,顧名思義就是將數據發佈到ZK節點上,供訂閱者動態獲取數據,實現配置信息的集中式管理和動態更新。例如全局的配置信息地址列表等就很是適合使用。集中式的配置管理在應用集羣中是很是常見的,通常商業公司內部都會實現一套集中的配置管理中心,應對不一樣的應用集羣對於共享各自配置的需求,而且在配置變動時可以通知到集羣中的每個機器。

(2) 應用

索引信息和集羣中機器節點狀態存放在ZK的一些指定節點,供各個客戶端訂閱使用。

系統日誌(通過處理後的)存儲,這些日誌一般2-3天后被清除。

應用中用到的一些配置信息集中管理,在應用啓動的時候主動來獲取一次,而且在節點上註冊一個Watcher,之後每次配置有更新,實時通知到應用,獲取最新配置信息。

業務邏輯中須要用到的一些全局變量,好比一些消息中間件的消息隊列一般有個offset,這個offset存放在zk上,這樣集羣中每一個發送者都能知道當前的發送進度

系統中有些信息須要動態獲取,而且還會存在人工手動去修改這個信息。之前一般是暴露出接口,例如JMX接口,有了ZK後,只要將這些信息存放到ZK節點上便可。

(3) 應用舉例

例如:同一個應用系統須要多臺 PC Server 運行,可是它們運行的應用系統的某些配置項是相同的,若是要修改這些相同的配置項,那麼就必須同時修改每臺運行這個應用系統的 PC Server,這樣很是麻煩並且容易出錯。將配置信息保存在 Zookeeper 的某個目錄節點中,而後將全部須要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每臺應用機器就會收到 Zookeeper 的通知,而後從 Zookeeper 獲取新的配置信息應用到系統中。ZooKeeper配置管理服務以下圖所示:

圖2.2 配置管理結構圖

Zookeeper很容易實現這種集中式的配置管理,好比將所須要的配置信息放到/Configuration 節點上,集羣中全部機器一啓動就會經過Client/Configuration這個節點進行監控【zk.exist("/Configuration″,true)】,而且實現Watcher回調方法process(),那麼在zookeeper上/Configuration節點下數據發生變化的時候,每一個機器都會收到通知,Watcher回調方法將會被執行,那麼應用再取下數據便可【zk.getData("/Configuration″,false,null)】。

2.2統一命名服務(Name Service)

(1) 場景描述

分佈式應用中,一般須要有一套完整的命名規則,既可以產生惟一的名稱又便於人識別和記住,一般狀況下用樹形的名稱結構是一個理想的選擇,樹形的名稱結構是一個有層次的目錄結構,既對人友好又不會重複。說到這裏你可能想到了 JNDI,沒錯 Zookeeper 的 Name Service 與 JNDI 可以完成的功能是差很少的,它們都是將有層次的目錄結構關聯到必定資源上,可是Zookeeper的Name Service 更加是普遍意義上的關聯,也許你並不須要將名稱關聯到特定資源上,你可能只須要一個不會重複名稱,就像數據庫中產生一個惟一的數字主鍵同樣。

(2) 應用

在分佈式系統中,經過使用命名服務,客戶端應用可以根據指定的名字來獲取資源服務的地址提供者等信息。被命名的實體一般能夠是集羣中的機器,提供的服務地址進程對象等等,這些咱們均可以統稱他們爲名字(Name)。其中較爲常見的就是一些分佈式服務框架中的服務地址列表。經過調用ZK提供的建立節點的API,可以很容易建立一個全局惟一的path,這個path就能夠做爲一個名稱。Name Service 已是Zookeeper 內置的功能,你只要調用 Zookeeper 的 API 就能實現。如調用 create 接口就能夠很容易建立一個目錄節點。

(3) 應用舉例

阿里開源的分佈式服務框架Dubbo中使用ZooKeeper來做爲其命名服務,維護全局的服務地址列表。在Dubbo實現中: 服務提供者在啓動的時候,向ZK上的指定節點/dubbo/${serviceName}/providers目錄下寫入本身的URL地址,這個操做就完成了服務的發佈。 服務消費者啓動的時候,訂閱/dubbo/${serviceName}/providers目錄下的提供者URL地址, 並向/dubbo/${serviceName} /consumers目錄下寫入本身的URL地址。 注意,全部向ZK上註冊的地址都是臨時節點,這樣就可以保證服務提供者和消費者可以自動感應資源的變化。 另外,Dubbo還有針對服務粒度的監控,方法是訂閱/dubbo/${serviceName}目錄下全部提供者和消費者的信息。

2.3分佈通知/協調(Distribution of notification/coordination)

(1) 典型場景描述

ZooKeeper中特有watcher註冊與異步通知機制,可以很好的實現分佈式環境下不一樣系統之間的通知與協調,實現對數據變動的實時處理。使用方法一般是不一樣系統都對ZK上同一個znode進行註冊,監聽znode的變化(包括znode自己內容及子節點的),其中一個系統update了znode,那麼另外一個系統可以收到通知,並做出相應處理。

(2) 應用

另外一種心跳檢測機制檢測系統被檢測系統之間並不直接關聯起來,而是經過ZK上某個節點關聯,大大減小系統耦合。

另外一種系統調度模式:某系統由控制檯推送系統兩部分組成,控制檯的職責是控制推送系統進行相應的推送工做。管理人員在控制檯做的一些操做,其實是修改了ZK上某些節點的狀態,而ZK就把這些變化通知給他們註冊Watcher的客戶端,即推送系統,因而,做出相應的推送任務。

另外一種工做彙報模式:一些相似於任務分發系統子任務啓動後,到ZK來註冊一個臨時節點,而且定時將本身的進度進行彙報(將進度寫回這個臨時節點),這樣任務管理者就可以實時知道任務進度。

總之,使用zookeeper來進行分佈式通知和協調可以大大下降系統之間的耦合。

2.4分佈式鎖(Distribute Lock)

(1) 場景描述

分佈式鎖,這個主要得益於ZooKeeper爲咱們保證了數據的強一致性,即用戶只要徹底相信每時每刻,zk集羣中任意節點(一個zk server)上的相同znode的數據是必定是相同的。鎖服務能夠分爲兩類,一個是保持獨佔,另外一個是控制時序。

保持獨佔,就是全部試圖來獲取這個鎖的客戶端,最終只有一個能夠成功得到這把鎖。一般的作法是把ZK上的一個znode看做是一把鎖,經過create znode的方式來實現。全部客戶端都去建立 /distribute_lock 節點,最終成功建立的那個客戶端也即擁有了這把鎖。

控制時序,就是全部試圖來獲取這個鎖的客戶端,最終都是會被安排執行,只是有個全局時序了。作法和上面基本相似,只是這裏 /distribute_lock 已經預先存在,客戶端在它下面建立臨時有序節點。Zk的父節點(/distribute_lock)維持一份sequence,保證子節點建立的時序性,從而也造成了每一個客戶端的全局時序。

(2) 應用

共享鎖在同一個進程中很容易實現,可是在跨進程或者在不一樣 Server 之間就很差實現了。Zookeeper 卻很容易實現這個功能,實現方式也是須要得到鎖的 Server 建立一個 EPHEMERAL_SEQUENTIAL 目錄節點,而後調用 getChildren方法獲取當前的目錄節點列表中最小的目錄節點是否是就是本身建立的目錄節點,若是正是本身建立的,那麼它就得到了這個鎖,若是不是那麼它就調用 exists(String path, boolean watch) 方法並監控 Zookeeper 上目錄節點列表的變化,一直到本身建立的節點是列表中最小編號的目錄節點,從而得到鎖,釋放鎖很簡單,只要刪除前面它本身所建立的目錄節點就好了。

圖 2.3 ZooKeeper實現Locks的流程圖

代碼清單1 TestMainClient 代碼

  1. package org.zk.leader.election;
  2.  
  3. import org.apache.log4j.xml.DOMConfigurator;
  4. import org.apache.zookeeper.WatchedEvent;
  5. import org.apache.zookeeper.Watcher;
  6. import org.apache.zookeeper.ZooKeeper;
  7.  
  8. import java.io.IOException;
  9.  
  10. /**
  11.  * TestMainClient
  12.  * <p/>
  13.  * Author By: sunddenly工做室
  14.  * Created Date: 2014-11-13
  15.  */
  16. public class TestMainClient implements Watcher {
  17.     protected static ZooKeeper zk = null;
  18.     protected static Integer mutex;
  19.     int sessionTimeout = 10000;
  20.     protected String root;
  21.     public TestMainClient(String connectString) {
  22.         if(zk == null){
  23.             try {
  24.  
  25.                 String configFile = this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml";
  26.                 DOMConfigurator.configure(configFile);
  27.                 System.out.println(" 建立一個新的鏈接: ");
  28.                 zk = new ZooKeeper(connectString, sessionTimeout, this);
  29.                 mutex = new Integer(-1);
  30.             } catch (IOException e) {
  31.                 zk = null;
  32.             }
  33.         }
  34.     }
  35.    synchronized public void process(WatchedEvent event) {
  36.         synchronized (mutex) {
  37.             mutex.notify();
  38.         }
  39.     }
  40. }

清單 2 Locks 代碼

  1. package org.zk.locks;
  2.  
  3. import org.apache.log4j.Logger;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.ZooDefs;
  8. import org.apache.zookeeper.data.Stat;
  9. import org.zk.leader.election.TestMainClient;
  10.  
  11. import java.util.Arrays;
  12. import java.util.List;
  13.  
  14. /**
  15.  * locks
  16.  * <p/>
  17.  * Author By: sunddenly工做室
  18.  * Created Date: 2014-11-13 16:49:40
  19.  */
  20. public class Locks extends TestMainClient {
  21.     public static final Logger logger = Logger.getLogger(Locks.class);
  22.     String myZnode;
  23.  
  24.     public Locks(String connectString, String root) {
  25.         super(connectString);
  26.         this.root = root;
  27.         if (zk != null) {
  28.             try {
  29.                 Stat s = zk.exists(root, false);
  30.                 if (s == null) {
  31.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  32.                 }
  33.             } catch (KeeperException e) {
  34.                 logger.error(e);
  35.             } catch (InterruptedException e) {
  36.                 logger.error(e);
  37.             }
  38.         }
  39.     }
  40.     void getLock() throws KeeperException, InterruptedException{
  41.         List<String> list = zk.getChildren(root, false);
  42.         String[] nodes = list.toArray(new String[list.size()]);
  43.         Arrays.sort(nodes);
  44.         if(myZnode.equals(root+"/"+nodes[0])){
  45.             doAction();
  46.         }
  47.         else{
  48.             waitForLock(nodes[0]);
  49.         }
  50.     }
  51.     void check() throws InterruptedException, KeeperException {
  52.         myZnode = zk.create(root + "/lock_" , new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  53.         getLock();
  54.     }
  55.     void waitForLock(String lower) throws InterruptedException, KeeperException {
  56.         Stat stat = zk.exists(root + "/" + lower,true);
  57.         if(stat != null){
  58.             mutex.wait();
  59.         }
  60.         else{
  61.             getLock();
  62.         }
  63.     }
  64.     @Override
  65.     public void process(WatchedEvent event) {
  66.         if(event.getType() == Event.EventType.NodeDeleted){
  67.             System.out.println(" 獲得通知 ");
  68.             super.process(event);
  69.             doAction();
  70.         }
  71.     }
  72.     /**
  73.      * 執行其餘任務
  74.      */
  75.     private void doAction(){
  76.         System.out.println(" 同步隊列已經獲得同步,能夠開始執行後面的任務了 ");
  77.     }
  78.  
  79.     public static void main(String[] args) {
  80.         String connectString = "localhost:2181";
  81.  
  82.         Locks lk = new Locks(connectString, "/locks");
  83.         try {
  84.             lk.check();
  85.         } catch (InterruptedException e) {
  86.             logger.error(e);
  87.         } catch (KeeperException e) {
  88.             logger.error(e);
  89.         }
  90.     }
  91. }

2.5 集羣管理(Cluster Management)

(1) 典型場景描述

集羣機器監控

這一般用於那種對集羣中機器狀態,機器在線率有較高要求的場景,可以快速對集羣中機器變化做出響應。這樣的場景中,每每有一個監控系統,實時檢測集羣機器是否存活。過去的作法一般是:監控系統經過某種手段(好比ping)定時檢測每一個機器,或者每一個機器本身定時向監控系統彙報"我還活着"。 這種作法可行,可是存在兩個比較明顯的問題:

集羣中機器有變更的時候,牽連修改的東西比較多。

有必定的延時。

利用ZooKeeper中兩個特性,就能夠實施另外一種集羣機器存活性監控系統:

客戶端在節點 x 上註冊一個Watcher,那麼若是 x 的子節點變化了,會通知該客戶端。

建立EPHEMERAL類型的節點,一旦客戶端和服務器的會話結束或過時,那麼該節點就會消失。

Master選舉:

Master選舉則是zookeeper中最爲經典的使用場景了,在分佈式環境中,相同的業務應用分佈在不一樣的機器上,有些業務邏輯,例如一些耗時的計算,網絡I/O處,每每只須要讓整個集羣中的某一臺機器進行執行,其他機器能夠共享這個結果,這樣能夠大大減小重複勞動,提升性能,因而這個master選舉即是這種場景下的碰到的主要問題。

利用ZooKeeper中兩個特性,就能夠實施另外一種集羣中Master選舉:

利用ZooKeeper的強一致性,可以保證在分佈式高併發狀況下節點建立的全局惟一性,即:同時有多個客戶端請求建立 /Master 節點,最終必定只有一個客戶端請求可以建立成功。利用這個特性,就能很輕易的在分佈式環境中進行集羣選舉了。

另外,這種場景演化一下,就是動態Master選舉。這就要用到 EPHEMERAL_SEQUENTIAL類型節點的特性了,這樣每一個節點會自動被編號。容許全部請求都可以建立成功,可是得有個建立順序,每次選取序列號最小的那個機器做爲Master 。

(2) 應用

在搜索系統中,若是集羣中每一個機器都生成一份全量索引,不只耗時,並且不能保證彼此間索引數據一致。所以讓集羣中的Master來迚行全量索引的生成,而後同步到集羣中其它機器。另外,Master選丼的容災措施是,能夠隨時迚行手動挃定master,就是說應用在zk在沒法獲取master信息時,能夠經過好比http方式,向一個地方獲取master。  在Hbase中,也是使用ZooKeeper來實現動態HMaster的選舉。在Hbase實現中,會在ZK上存儲一些ROOT表的地址和HMaster的地址,HRegionServer也會把本身以臨時節點(Ephemeral)的方式註冊到Zookeeper中,使得HMaster能夠隨時感知到各個HRegionServer的存活狀態,同時,一旦HMaster出現問題,會從新選丼出一個HMaster來運行,從而避免了HMaster的單點問題的存活狀態,同時,一旦HMaster出現問題,會從新選丼出一個HMaster來運行,從而避免了HMaster的單點問題。

(3) 應用舉例

集羣監控:

應用集羣中,咱們經常須要讓每個機器知道集羣中或依賴的其餘某一個集羣中哪些機器是活着的,而且在集羣機器由於宕機,網絡斷鏈等緣由可以不在人工介入的狀況下迅速通知到每個機器,Zookeeper 可以很容易的實現集羣管理的功能,若有多臺 Server 組成一個服務集羣,那麼必需要一個"總管"知道當前集羣中每臺機器的服務狀態,一旦有機器不能提供服務,集羣中其它集羣必須知道,從而作出調整從新分配服務策略。一樣當增長集羣的服務能力時,就會增長一臺或多臺 Server,一樣也必須讓"總管"知道,這就是ZooKeeper的集羣監控功能。

圖2.4 集羣管理結構圖

好比我在zookeeper服務器端有一個znode叫/Configuration,那麼集羣中每個機器啓動的時候都去這個節點下建立一個EPHEMERAL類型的節點,好比server1建立/Configuration /Server1,server2建立/Configuration /Server1,而後Server1和Server2都watch /Configuration 這個父節點,那麼也就是這個父節點下數據或者子節點變化都會通知對該節點進行watch的客戶端。由於EPHEMERAL類型節點有一個很重要的特性,就是客戶端和服務器端鏈接斷掉或者session過時就會使節點消失,那麼在某一個機器掛掉或者斷鏈的時候,其對應的節點就會消 失,而後集羣中全部對/Configuration進行watch的客戶端都會收到通知,而後取得最新列表便可。

Master選舉:

Zookeeper 不只可以維護當前的集羣中機器的服務狀態,並且可以選出一個"總管",讓這個總管來管理集羣,這就是 Zookeeper 的另外一個功能 Leader Election。Zookeeper 如何實現 Leader Election,也就是選出一個 Master Server。和前面的同樣每臺 Server 建立一個 EPHEMERAL 目錄節點,不一樣的是它仍是一個 SEQUENTIAL 目錄節點,因此它是個 EPHEMERAL_SEQUENTIAL 目錄節點。之因此它是 EPHEMERAL_SEQUENTIAL 目錄節點,是由於咱們能夠給每臺 Server 編號,咱們能夠選擇當前是最小編號的 Server 爲 Master,假如這個最小編號的 Server 死去,因爲是 EPHEMERAL 節點,死去的 Server 對應的節點也被刪除,因此當前的節點列表中又出現一個最小編號的節點,咱們就選擇這個節點爲當前 Master。這樣就實現了動態選擇 Master,避免了傳統意義上單 Master 容易出現單點故障的問題

清單 3 Leader Election代碼

  1. package org.zk.leader.election;
  2.  
  3. import org.apache.log4j.Logger;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.ZooDefs;
  8. import org.apache.zookeeper.data.Stat;
  9.  
  10. import java.net.InetAddress;
  11. import java.net.UnknownHostException;
  12.  
  13. /**
  14.  * LeaderElection
  15.  * <p/>
  16.  * Author By: sunddenly工做室
  17.  * Created Date: 2014-11-13
  18.  */
  19. public class LeaderElection extends TestMainClient {
  20.     public static final Logger logger = Logger.getLogger(LeaderElection.class);
  21.  
  22.     public LeaderElection(String connectString, String root) {
  23.         super(connectString);
  24.         this.root = root;
  25.         if (zk != null) {
  26.             try {
  27.                 Stat s = zk.exists(root, false);
  28.                 if (s == null) {
  29.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  30.                 }
  31.             } catch (KeeperException e) {
  32.                 logger.error(e);
  33.             } catch (InterruptedException e) {
  34.                 logger.error(e);
  35.             }
  36.         }
  37.     }
  38.  
  39.     void findLeader() throws InterruptedException, UnknownHostException, KeeperException {
  40.         byte[] leader = null;
  41.         try {
  42.             leader = zk.getData(root + "/leader", true, null);
  43.         } catch (KeeperException e) {
  44.             if (e instanceof KeeperException.NoNodeException) {
  45.                 logger.error(e);
  46.             } else {
  47.                 throw e;
  48.             }
  49.         }
  50.         if (leader != null) {
  51.             following();
  52.         } else {
  53.             String newLeader = null;
  54.             byte[] localhost = InetAddress.getLocalHost().getAddress();
  55.             try {
  56.                 newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  57.             } catch (KeeperException e) {
  58.                 if (e instanceof KeeperException.NodeExistsException) {
  59.                     logger.error(e);
  60.                 } else {
  61.                     throw e;
  62.                 }
  63.             }
  64.             if (newLeader != null) {
  65.                 leading();
  66.             } else {
  67.                 mutex.wait();
  68.             }
  69.         }
  70.     }
  71.  
  72.     @Override
  73.     public void process(WatchedEvent event) {
  74.         if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) {
  75.             System.out.println(" 獲得通知 ");
  76.             super.process(event);
  77.             following();
  78.         }
  79.     }
  80.  
  81.     void leading() {
  82.         System.out.println(" 成爲領導者 ");
  83.     }
  84.  
  85.     void following() {
  86.         System.out.println(" 成爲組成員 ");
  87.     }
  88.  
  89.     public static void main(String[] args) {
  90.         String connectString = "localhost:2181";
  91.  
  92.         LeaderElection le = new LeaderElection(connectString, "/GroupMembers");
  93.         try {
  94.             le.findLeader();
  95.         } catch (Exception e) {
  96.             logger.error(e);
  97.         }
  98.     }
  99. }

2.6 隊列管理

Zookeeper 能夠處理兩種類型的隊列:

當一個隊列的成員都聚齊時,這個隊列纔可用,不然一直等待全部成員到達,這種是同步隊列

隊列按照 FIFO 方式進行入隊和出隊操做,例如實現生產者消費者模型

(1) 同步隊列用 Zookeeper 實現的實現思路以下:

建立一個父目錄 /synchronizing,每一個成員都監控標誌(Set Watch)位目錄 /synchronizing/start 是否存在,而後每一個成員都加入這個隊列,加入隊列的方式就是建立 /synchronizing/member_i 的臨時目錄節點,而後每一個成員獲取 / synchronizing 目錄的全部目錄節點,也就是 member_i。判斷 i 的值是否已是成員的個數,若是小於成員個數等待 /synchronizing/start 的出現,若是已經相等就建立 /synchronizing/start。

用下面的流程圖更容易理解:

圖 2.5 同步隊列流程圖

 

清單 4 Synchronizing 代碼

  1. package org.zk.queue;
  2.  
  3. import java.net.InetAddress;
  4. import java.net.UnknownHostException;
  5. import java.util.List;
  6.  
  7. import org.apache.log4j.Logger;
  8. import org.apache.zookeeper.CreateMode;
  9. import org.apache.zookeeper.KeeperException;
  10. import org.apache.zookeeper.WatchedEvent;
  11. import org.apache.zookeeper.Watcher;
  12. import org.apache.zookeeper.ZooKeeper;
  13. import org.apache.zookeeper.ZooDefs.Ids;
  14. import org.apache.zookeeper.data.Stat;
  15. import org.zk.leader.election.TestMainClient;
  16.  
  17. /**
  18.  * Synchronizing
  19.  * <p/>
  20.  * Author By: sunddenly工做室
  21.  * Created Date: 2014-11-13
  22.  */
  23. public class Synchronizing extends TestMainClient {
  24.     int size;
  25.     String name;
  26.     public static final Logger logger = Logger.getLogger(Synchronizing.class);
  27.  
  28.     /**
  29.      * 構造函數
  30.      *
  31.      * @param connectString 服務器鏈接
  32.      * @param root 根目錄
  33.      * @param size 隊列大小
  34.      */
  35.     Synchronizing(String connectString, String root, int size) {
  36.         super(connectString);
  37.         this.root = root;
  38.         this.size = size;
  39.  
  40.         if (zk != null) {
  41.             try {
  42.                 Stat s = zk.exists(root, false);
  43.                 if (s == null) {
  44.                     zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  45.                 }
  46.             } catch (KeeperException e) {
  47.                 logger.error(e);
  48.             } catch (InterruptedException e) {
  49.                 logger.error(e);
  50.             }
  51.         }
  52.         try {
  53.             name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
  54.         } catch (UnknownHostException e) {
  55.             logger.error(e);
  56.         }
  57.  
  58.     }
  59.  
  60.     /**
  61.      * 加入隊列
  62.      *
  63.      * @return
  64.      * @throws KeeperException
  65.      * @throws InterruptedException
  66.      */
  67.  
  68.     void addQueue() throws KeeperException, InterruptedException{
  69.         zk.exists(root + "/start",true);
  70.         zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
  71.         synchronized (mutex) {
  72.             List<String> list = zk.getChildren(root, false);
  73.             if (list.size() < size) {
  74.                 mutex.wait();
  75.             } else {
  76.                 zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  77.             }
  78.         }
  79.     }
  80.  
  81.     @Override
  82.     public void process(WatchedEvent event) {
  83.         if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){
  84.             System.out.println(" 獲得通知 ");
  85.             super.process(event);
  86.             doAction();
  87.         }
  88.     }
  89.  
  90.     /**
  91.      * 執行其餘任務
  92.      */
  93.     private void doAction(){
  94.         System.out.println(" 同步隊列已經獲得同步,能夠開始執行後面的任務了 ");
  95.     }
  96.  
  97.     public static void main(String args[]) {
  98.         // 啓動 Server
  99.         String connectString = "localhost:2181";
  100.         int size = 1;
  101.         Synchronizing b = new Synchronizing(connectString, "/synchronizing", size);
  102.         try{
  103.             b.addQueue();
  104.         } catch (KeeperException e){
  105.             logger.error(e);
  106.         } catch (InterruptedException e){
  107.             logger.error(e);
  108.         }
  109.     }
  110. }

(2) FIFO 隊列用 Zookeeper 實現思路以下:

實現的思路也很是簡單,就是在特定的目錄下建立 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證全部成員加入隊列時都是有編號的,出隊列時經過 getChildren( ) 方法能夠返回當前全部的隊列中的元素,而後消費其中最小的一個,這樣就能保證 FIFO。

下面是生產者和消費者這種隊列形式的示例代碼

清單 5 FIFOQueue 代碼

  1. import org.apache.log4j.Logger;
  2. import org.apache.zookeeper.CreateMode;
  3. import org.apache.zookeeper.KeeperException;
  4. import org.apache.zookeeper.WatchedEvent;
  5. import org.apache.zookeeper.ZooDefs;
  6. import org.apache.zookeeper.data.Stat;
  7.  
  8. import java.nio.ByteBuffer;
  9. import java.util.List;
  10.  
  11. /**
  12.  * FIFOQueue
  13.  * <p/>
  14.  * Author By: sunddenly工做室
  15.  * Created Date: 2014-11-13
  16.  */
  17. public class FIFOQueue extends TestMainClient{
  18.     public static final Logger logger = Logger.getLogger(FIFOQueue.class);
  19.  
  20.     /**
  21.      * Constructor
  22.      *
  23.      * @param connectString
  24.      * @param root
  25.      */
  26.     FIFOQueue(String connectString, String root) {
  27.         super(connectString);
  28.         this.root = root;
  29.         if (zk != null) {
  30.             try {
  31.                 Stat s = zk.exists(root, false);
  32.                 if (s == null) {
  33.                     zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
  34.                 }
  35.             } catch (KeeperException e) {
  36.                 logger.error(e);
  37.             } catch (InterruptedException e) {
  38.                 logger.error(e);
  39.             }
  40.         }
  41.     }
  42.     /**
  43.      * 生產者
  44.      *
  45.      * @param i
  46.      * @return
  47.      */
  48.  
  49.     boolean produce(int i) throws KeeperException, InterruptedException{
  50.         ByteBuffer b = ByteBuffer.allocate(4);
  51.         byte[] value;
  52.         b.putInt(i);
  53.         value = b.array();
  54.         zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  55.                     CreateMode.PERSISTENT_SEQUENTIAL);
  56.         return true;
  57.     }
  58.  
  59.  
  60.     /**
  61.      * 消費者
  62.      *
  63.      * @return
  64.      * @throws KeeperException
  65.      * @throws InterruptedException
  66.      */
  67.     int consume() throws KeeperException, InterruptedException{
  68.         int retvalue = -1;
  69.         Stat stat = null;
  70.         while (true) {
  71.             synchronized (mutex) {
  72.                 List<String> list = zk.getChildren(root, true);
  73.                 if (list.size() == 0) {
  74.                     mutex.wait();
  75.                 } else {
  76.                     Integer min = new Integer(list.get(0).substring(7));
  77.                     for(String s : list){
  78.                         Integer tempValue = new Integer(s.substring(7));
  79.                         if(tempValue < min) min = tempValue;
  80.                     }
  81.                     byte[] b = zk.getData(root + "/element" + min,false, stat);
  82.                     zk.delete(root + "/element" + min, 0);
  83.                     ByteBuffer buffer = ByteBuffer.wrap(b);
  84.                     retvalue = buffer.getInt();
  85.                     return retvalue;
  86.                 }
  87.             }
  88.         }
  89.     }
  90.  
  91.     @Override
  92.     public void process(WatchedEvent event) {
  93.         super.process(event);
  94.     }
  95.  
  96.     public static void main(String args[]) {
  97.         // 啓動 Server
  98.         TestMainServer.start();
  99.         String connectString = "localhost:"+TestMainServer.CLIENT_PORT;
  100.  
  101.         FIFOQueue q = new FIFOQueue(connectString, "/app1");
  102.         int i;
  103.         Integer max = new Integer(5);
  104.  
  105.         System.out.println("Producer");
  106.         for (i = 0; i < max; i++)
  107.             try{
  108.                 q.produce(10 + i);
  109.             } catch (KeeperException e){
  110.                 logger.error(e);
  111.             } catch (InterruptedException e){
  112.                 logger.error(e);
  113.             }
  114.  
  115.         for (i = 0; i < max; i++) {
  116.             try{
  117.                 int r = q.consume();
  118.                 System.out.println("Item: " + r);
  119.             } catch (KeeperException e){
  120.                 i--;
  121.                 logger.error(e);
  122.             } catch (InterruptedException e){
  123.                 logger.error(e);
  124.             }
  125.         }
  126.  
  127.     }
  128. }

3、ZooKeeper實際應用

假設咱們的集羣有:

(1) 20個搜索引擎的服務器:每一個負責總索引中的一部分的搜索任務。

搜索引擎的服務器中的15個服務器如今提供搜索服務

5個服務器正在生成索引

這20個搜索引擎的服務器,常常要讓正在提供搜索服務的服務器中止提供服務開始生成索引,或生成索引的服務器已經把索引生成完成能夠搜索提供服務了。

(2) 一個總服務器:負責向這20個搜索引擎的服務器發出搜索請求併合並結果集。

(3) 一個備用的總服務器:負責當總服務器宕機時替換總服務器。

(4) 一個web的cgi:向總服務器發出搜索請求。

使用Zookeeper能夠保證:

(1) 總服務器:自動感知有多少提供搜索引擎的服務器,並向這些服務器發出搜索請求。

(2) 備用的總服務器:宕機時自動啓用備用的總服務器。

(3) web的cgi:可以自動地獲知總服務器的網絡地址變化

(4) 實現以下:

提供搜索引擎的服務器都在Zookeeper中建立znode,zk.create("/search/nodes/node1", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

② 總服務器能夠從Zookeeper中獲取一個znode的子節點的列表,zk.getChildren("/search/nodes", true);

總服務器遍歷這些子節點,並獲取子節點的數據生成提供搜索引擎的服務器列表

當總服務器接收到子節點改變的事件信息,從新返回第二步;

總服務器在Zookeeper中建立節點,zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

⑥ 備用的總服務器監控Zookeeper中的"/search/master"節點。當這個znode的節點數據改變時,把本身啓動變成總服務器,並把本身的網絡地址數據放進這個節點。

web的cgi從Zookeeper中"/search/master"節點獲取總服務器的網絡地址數據,並向其發送搜索請求。

web的cgi監控Zookeeper中的"/search/master"節點,當這個znode的節點數據改變時,從這個節點獲取總服務器的網絡地址數據,並改變當前的總服務器的網絡地址。

相關文章
相關標籤/搜索