Zookeeper簡單入門

zookeeper 簡介

ZooKeeper(動物園管理員),顧名思義,是用來管理Hadoop(大象)、Hive(蜜蜂)、Pig(小豬)的管理員,同時Apache HBase、Apache Solr、LinkedIn Sensei等衆多項目中都採用了ZooKeeper。 ZooKeeper曾是Hadoop的正式子項目,後發展成爲Apache頂級項目,與Hadoop密切相關但卻沒有任何依賴。它是一個針對大型應用提供高可用的數據管理、應用程序協調服務的分佈式服務框架,基於對Paxos算法的實現,使該框架保證了分佈式環境中數據的強一致性,提供的功能包括:配置維護、統一命名服務、狀態同步服務、集羣管理等。 在分佈式應用中,因爲工程師不能很好地使用鎖機制,以及基於消息的協調機制不適合在某些應用中使用,所以須要有一種可靠的、可擴展的、分佈式的、可配置的協調機制來統一系統的狀態。Zookeeper的目的就在於此java

ZooKeeper能夠理解爲相似redis的緩存數據庫,只是相對於redis存儲數據量小,額外增長了存儲節點的機制, 經常使用於分佈式協調服務node

zookeeper客戶端 - Curator
  • Curator簡介
    Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery
  • Curator經常使用apiredis

    • 建立客戶端
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
      CuratorFramework client = CuratorFrameworkFactory.builder()
                              .connectString("127.0.0.1:2181")
                              .sessionTimeoutMs(5000)
                              .connectionTimeoutMs(5000)
                              .retryPolicy(retryPolicy)
                              .build();
      client.start();
      複製代碼
    • 建立節點數據
      //建立節點
         //PERSISTENT:持久化 默認模式
         //PERSISTENT_SEQUENTIAL:持久化而且帶序列號
         //EPHEMERAL:臨時
         //EPHEMERAL_SEQUENTIAL:臨時而且帶序列號
         //建立節點並遞歸建立父節點,並指定建立模式
         client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/data/path", "hello".getBytes());
      複製代碼
    • 刪除節點數據
      client.delete().deletingChildrenIfNeeded().forPath("/data/path");
      複製代碼
    • 更新節點數據
      client.setData().forPath("/data/path", "world".getBytes());
      複製代碼
    • 查詢節點數據
      byte[] data = client.getData().forPath("/data/path");//獲取指定節點數據
      List<String> childs = client.getChildren().forPath("/")//獲取子節點
      複製代碼
  • Curator事件(cache)算法

    ZooKeeper原生支持經過註冊Watcher來進行事件監聽,可是其使用並非特別方便,須要開發人員本身反覆註冊Watcher,比較繁瑣。Curator引入了Cache來實現對ZooKeeper服務端事件的監聽。Cache是Curator中對事件監聽的包裝,其對事件的監聽其實能夠近似看做是一個本地緩存視圖和遠程ZooKeeper視圖的對比過程。同時Curator可以自動爲開發人員處理反覆註冊監聽,從而大大簡化了原生API開發的繁瑣過程數據庫

    • 事件監聽示例代碼
      private static final String PATH_CACHE = "/example/pathCache";
      
      private static final String NODE_CACHE = "/example/nodeCache";
      
      private static final String TREE_CACHE = "/example/treeCache";
      
      public static void main(String[] args) throws Exception {
          CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
          client.start();
      
          //PathChildrenCache
          System.out.println("=========================PathChildrenCache==========================");
          PathChildrenCache pCache = new PathChildrenCache(client, PATH_CACHE, true);
          pCache.start();
          pCache.getListenable().addListener((c, e) -> {
              System.out.println("事件類型:" + e.getType());
              if (null != e.getData()) {
                  System.out.println("節點數據:" + e.getData().getPath() + " = " + new String(e.getData().getData()));
              }
          });
          client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
          Thread.sleep(100);
          client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
          Thread.sleep(100);
          client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
          Thread.sleep(100);
          for (ChildData data : pCache.getCurrentData()) {
              System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
          }
          client.delete().forPath("/example/pathCache/test01");
          Thread.sleep(100);
          client.delete().forPath("/example/pathCache/test02");
          Thread.sleep(2000);
          pCache.close();
      
          //Node Cache
          System.out.println("=========================NodeCache==========================");
          client.create().creatingParentsIfNeeded().forPath(NODE_CACHE);
          NodeCache nCache = new NodeCache(client, NODE_CACHE);
          nCache.getListenable().addListener(new NodeCacheListener() {
              @Override
              public void nodeChanged() throws Exception {
                  ChildData data = nCache.getCurrentData();
                  if (null != data) {
                      System.out.println("節點數據:" + new String(nCache.getCurrentData().getData()));
                  } else {
                      System.out.println("節點被刪除!");
                  }
              }
          });
          nCache.start();
          client.setData().forPath(NODE_CACHE, "01".getBytes());
          Thread.sleep(100);
          client.setData().forPath(NODE_CACHE, "02".getBytes());
          Thread.sleep(100);
          client.delete().deletingChildrenIfNeeded().forPath(NODE_CACHE);
          Thread.sleep(2000);
          nCache.close();
      
          //Tree cache
          System.out.println("=========================TreeCache==========================");
          client.create().creatingParentsIfNeeded().forPath(TREE_CACHE);
          TreeCache cache = new TreeCache(client, TREE_CACHE);
          cache.getListenable().addListener((c, e) ->
                  System.out.println("事件類型:" + e.getType() + " | 路徑:" + (null != e.getData() ? e.getData().getPath() : null)));
          cache.start();
          client.setData().forPath(TREE_CACHE, "01".getBytes());
          Thread.sleep(100);
          client.setData().forPath(TREE_CACHE, "02".getBytes());
          Thread.sleep(100);
          client.delete().deletingChildrenIfNeeded().forPath(TREE_CACHE);
          Thread.sleep(1000 * 2);
          cache.close();
      
          client.close();
      
      }
      複製代碼
    • 控制檯輸出展現
    =========================PathChildrenCache==========================
    事件類型:CONNECTION_RECONNECTED
    事件類型:CHILD_ADDED
    節點數據:/example/pathCache/test01 = 01
    事件類型:CHILD_ADDED
    節點數據:/example/pathCache/test02 = 02
    事件類型:CHILD_UPDATED
    節點數據:/example/pathCache/test01 = 01_V2
    getCurrentData:/example/pathCache/test01 = 01_V2
    getCurrentData:/example/pathCache/test02 = 02
    事件類型:CHILD_REMOVED
    節點數據:/example/pathCache/test01 = 01_V2
    事件類型:CHILD_REMOVED
    節點數據:/example/pathCache/test02 = 02
    =========================NodeCache==========================
    節點數據:01
    節點數據:02
    節點被刪除!
    =========================TreeCache==========================
    事件類型:NODE_ADDED | 路徑:/example/treeCache
    事件類型:INITIALIZED | 路徑:null
    事件類型:NODE_UPDATED | 路徑:/example/treeCache
    事件類型:NODE_UPDATED | 路徑:/example/treeCache
    事件類型:NODE_REMOVED | 路徑:/example/treeCache
    複製代碼
  • zookeeper選舉機制api

    • 流程示意圖緩存

    • 流程解析bash

      • zookeeper提供三種選舉機制:LeaderElection,AuthFastLeaderElection,FastLeaderElection。默認採用的機制是FastLeaderElection,本文主要分析該機制。舉例描述以前先明白幾個概念:
      1. 服務器ID:好比有三臺服務器,編號分別是1,2,3, 值編號越大在選擇算法中的權重越大
      2. 數據ID:服務器中存放的最大數據ID,值越大說明數據越新,在選舉算法中數據越新權重越大
      3. 邏輯時鐘:或者叫投票的次數,同一輪投票過程當中的邏輯時鐘值是相同的。每投完一次票這個數據就會增長,而後與接收到的其它服務器返回的投票信息中的數值相比,根據不一樣的值作出不一樣的判斷
      4. 選舉狀態:LOOKING,競選狀態;FOLLOWING,隨從狀態,同步leader狀態,參與投票;OBSERVING,觀察狀態,同步leader狀態,不參與投票;LEADING,領導者狀態

      選舉完成後會將以上信息發給集羣中的每一個節點,默認是採用投票數大於半數則勝出的邏輯,因此zookeeper集羣的節點數通常都是單數服務器

      • 假設zookeeper集羣有五個實例,FastLeaderElection選舉的流程以下:
      1. 服務器1啓動,給本身投票,而後發投票信息,因爲其它機器尚未啓動因此它收不到反饋信息,服務器1的狀態一直屬於Looking
      2. 服務器2啓動,給本身投票,同時與以前啓動的服務器1交換結果,因爲服務器2的編號大因此服務器2勝出,但此時投票數沒有大於半數,因此兩個服務器的狀態依然是LOOKING
      3. 服務器3啓動,給本身投票,同時與以前啓動的服務器1,2交換信息,因爲服務器3的編號最大因此服務器3勝出,此時投票數正好大於半數,因此服務器3成爲領導者,服務器1,2成爲小弟
      4. 服務器4啓動,給本身投票,同時與以前啓動的服務器1,2,3交換信息,儘管服務器4的編號大,但以前服務器3已經勝出,因此服務器4只能成爲小弟
      5. 服務器5啓動,後面的邏輯同服務器4成爲小弟
  • zookeeper 分佈式鎖應用session

    • zookeeper特性

      1. 有序節點:假如當前有一個父節點爲/lock,咱們能夠在這個父節點下面建立子節點;zookeeper提供了一個可選的有序特性,例如咱們能夠建立子節點「/lock/node-」而且指明有序,那麼zookeeper在生成子節點時會根據當前的子節點數量自動添加整數序號,也就是說若是是第一個建立的子節點,那麼生成的子節點爲/lock/node-0000000000,下一個節點則爲/lock/node-0000000001,依次類推。
      2. 臨時節點:客戶端能夠創建一個臨時節點,在會話結束或者會話超時後,zookeeper會自動刪除該節點。
      3. 事件監聽:在讀取數據時,咱們能夠同時對節點設置事件監聽,當節點數據或結構變化時,zookeeper會通知客戶端。當前zookeeper有以下四種事件:1)節點建立;2)節點刪除;3)節點數據修改;4)子節點變動
    • 分佈式鎖原理

      1. 客戶端鏈接zookeeper,並在/lock下建立臨時的且有序的子節點,第一個客戶端對應的子節點爲/lock/lock-0000000000,第二個爲/lock/lock-0000000001,以此類推。
      2. 客戶端獲取/lock下的子節點列表,判斷本身建立的子節點是否爲當前子節點列表中序號最小的子節點,若是是則認爲得到鎖,不然監聽恰好在本身以前一位的子節點刪除消息,得到子節點變動通知後重復此步驟直至得到鎖;
      3. 執行業務代碼;
      4. 完成業務流程後,刪除對應的子節點釋放鎖
    • 基於Curator分佈式鎖代碼展現

    //建立客戶端
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client =
            CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
    client.start();
    //建立分佈式鎖, 鎖空間的根節點路徑爲/curator/lock
    InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
    mutex.acquire();
    //得到了鎖, 進行業務流程
    //todo
    //完成業務流程, 釋放鎖
    mutex.release();
    
    //關閉客戶端
    client.close();
    
    複製代碼
相關文章
相關標籤/搜索