Zookeeper 原理與實踐

一、Zookeeper 的由來

在Hadoop生態系統中,許多項目的Logo都採用了動物,好比 Hadoop 和 Hive 採用了大象的形象,HBase 採用了海豚的形象,而從字面上來看 ZooKeeper 表示動物園管理員,因此你們能夠理解爲 ZooKeeper就是對這些動物(項目組件)進行一些管理工做的。java

對於單機環境多線程的競態資源協調方法,咱們通常經過線程鎖來協調對共享數據的訪問以保證狀態的一致性。 可是分佈式環境如何進行協調呢?因而,Google創造了Chubby,而ZooKeeper則是對於Chubby的一個開源實現。 
ZooKeeper是一種爲分佈式應用所設計的高可用、高性能且一致的開源協調服務,它提供了一項基本服務:分佈式鎖服務。因爲ZooKeeper的開源特性,後來咱們的開發者在分佈式鎖的基礎上,摸索了出了其餘的使用方法:配置維護、組服務、分佈式消息隊列、分佈式通知/協調等。它被設計爲易於編程,使用文件系統目錄樹做爲數據模型。node

二、ZooKeeper集羣模式典型架構

2.1 角色

Zookeeper服務自身組成一個集羣(2n+1個服務容許n>=1個失效)。Zookeeper集羣是一個基於主從複製的高可用集羣,每一個服務器承擔以下三種角色中的一種python

  • Leader 一個Zookeeper集羣同一時間只會有一個實際工做的Leader,它會發起並維護與各Follwer及Observer間的心跳。全部的寫操做必需要經過Leader完成再由Leader將寫操做廣播給其它服務器。
  • Follower 一個Zookeeper集羣可能同時存在多個Follower,它會響應Leader的心跳。Follower可直接處理並返回客戶端的讀請求,同時會將寫請求轉發給Leader處理,而且負責在Leader處理寫請求時對請求進行投票。
  • Observer 角色與Follower相似,可是無投票權。

Zookeeper Architecture

2.2 數據特性

  • 順序一致性:按照客戶端發送請求的順序更新數據。mysql

  • 原子性:更新要麼成功,要麼失敗,不會出現部分更新。sql

  • 單一性 :不管客戶端鏈接哪一個server,都會看到同一個視圖。apache

  • 可靠性:一旦數據更新成功,將一直保持,直到新的更新。編程

  • 及時性:客戶端會在一個肯定的時間內獲得最新的數據。安全

2.3 數據模型 Znode 

Zookeeper表現爲一個分層的文件系統目錄樹結構(不一樣於文件系統的是,節點能夠有本身的數據,而文件系統中的目錄節點只有子節點)。服務器

  • znode是被它所在的路徑惟一標識
  • znode能夠有子節點目錄,而且每一個znode能夠存儲數據
  • 每一個znode中存儲的數據能夠有多個版本
  • znode能夠被監控
  • 臨時+編號
  • 每次對Zookeeper的狀態的改變都會產生一個zxid,全局有序

三、核心原理

3.1 原子廣播(ZAB協議)

爲了保證寫操做的一致性與可用性,Zookeeper專門設計了一種名爲原子廣播(ZAB)的支持崩潰恢復的一致性協議。基於該協議,Zookeeper實現了一種主從模式的系統架構來保持集羣中各個副本之間的數據一致性。根據ZAB協議,全部的寫操做都必須經過Leader完成,leader收到寫請求,會將請求轉爲proposal並廣播給全部其它節點,其餘節點根據協議進行批准或經過。broadcast階段事實上就是一個兩階段提交的簡化版。其全部過程都跟兩階段提交一致,惟一不一致的是不能作事務的回滾。若是實現完整的兩階段提交,那就解決了一致性問題,不必發明新協議了,因此zab實際上拋棄了兩階段提交的事務回滾,因而一臺follower只能回覆ACK或者乾脆就不回覆了,leader只要收到過半的機器回覆即經過proposal。網絡

一旦Leader節點沒法工做,ZAB協議可以自動從Follower節點中從新選出一個合適的替代者,即新的Leader,該過程即爲領導選舉。該領導選舉過程,是ZAB協議中最爲重要和複雜的過程。

3.2 Watch機制

  • (一次性觸發)One-time trigger 
  • (發送至客戶端)Sent to the client 
  • (被設置 watch 的數據)The data for which the watch was set

3.3 出現腦裂怎麼辦(split-brain)

  • GC、網絡假死
  • Fencing

3.4 併發性能問題

  • 讀寫分離+ZAB

四、應用場景

4.1 統一命名服務

  • 全局惟一的ID

  • 集羣管理:動態的服務註冊和發現

def test_create():
    nodename = '/zk_test/service'
    index = 1
    while index < 10:
        zk.create(nodename,
                  b'192.168.187.215_{index}'.
                  format(index=index),
                        ephemeral=True,
                        sequence=True)
        time.sleep(3)
        index += 1



@zk.ChildrenWatch('/zk_test')
def my_func(children):
    print children
children = zk.get_children('/zk_test',watch=my_func)

while True:
    time.sleep(3)

4.2 分佈式鎖

my_id = uuid.uuid4()
lock = zk.Lock("/lockpath", str(my_id))
print "I am {}".format(str(my_id))
def work():
    time.sleep(3)
    print "{} is working! ".format(str(my_id))
while True:
    with lock:
        work()
zk.stop()

4.3 配置管理(數據發佈與訂閱)

  • 推拉結合
  • 分佈式協調/通知
    • 只通知一次

@zk.DataWatch('/zk_test/service')
def my_func(data, stat):
    print("Data is %s, Version is %s" %(data, stat.version))

while True:
    time.sleep(2)
    print '------'

4.4 集羣管理之 Master/Leader Election

  • 傳統的方案
    • 缺點
  • 分佈式方案
    • 爭搶

my_id = uuid.uuid4()
def leader_func():
    print "I am the leader {}".format(str(my_id))
    while True:
        print "{} is working! ".format(str(my_id))
        time.sleep(3)

election = zk.Election("/electionpath")

# blocks until the election is won, then calls
# leader_func()
election.run(leader_func)

4.5 隊列管理

  • 同步分佈式隊列
    • 當一個隊列的成員都聚齊時,這個隊列纔可用,不然一直等待全部成員到達。
  • 分佈式FIFO隊列
    • 入隊列有編號,出隊列時經過 getChildren( ) 返回全部,消費最小

4.6 心跳檢測/故障檢測(Failure Detection)

#coding=utf-8
from kazoo.client import KazooClient
import time
import logging

logging.basicConfig()
zk = KazooClient(hosts='bjdhj-187-215.58os.org:2181')
zk.start()
 
# Determine if a node exists
while True:
    if zk.exists("/zk_test/service0000000054"):
        print "the worker is alive!"
    else:
        print "the worker is dead!"
        break
    time.sleep(3)
 
zk.stop()

4.7 其它場景

  • 數據存儲(進度彙報)
    • offset → Zookeeper 
    • Zookeeper Write 影響 kafka 集羣吞吐量
    • __consumer_offsets Topic( 0.8.2.2  → 0.10.1.1)
      • Group,Topic,Partition 組合 Key
      • acking 級別設置爲了 -1
      • 內存三元組
  • 負載均衡
    • 服務端
    • 客戶端

五、ZooKeeper在大型分佈式系統中的應用

5.1 ZooKeeper在Hadoop中的應用

  • 主備切換

建立鎖節點
在ZooKeeper上會有一個/yarn-leader-election/appcluster-yarn的鎖節點,全部的ResourceManager在啓動的時候,都會去競爭寫一個Lock子節點:/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb,該節點是臨時節點。ZooKeepr可以爲咱們保證最終只有一個ResourceManager可以建立成功建立成功的那個ResourceManager就切換爲Active狀態沒有成功的那些ResourceManager則切換爲Standby狀態

  • 註冊Watcher監聽

全部Standby狀態的ResourceManager都會向/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb節點註冊一個節點變動的Watcher監聽,利用臨時節點的特性,可以快速感知到Active狀態的ResourceManager的運行狀況。

  • 主備切換

當Active狀態的ResourceManager出現諸如宕機或重啓的異常狀況時,其在ZooKeeper上鍊接的客戶端會話就會失效,所以/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb節點就會被刪除。此時其他各個Standby狀態的ResourceManager就都會接收到來自ZooKeeper服務端的Watcher事件通知,而後會重複進行步驟1的操做

以上就是利用ZooKeeper來實現ResourceManager的主備切換的過程,實現了ResourceManager的HA。

HDFS中NameNode的HA的實現原理跟YARN中ResourceManager的HA的實現原理相同。其鎖節點爲/hadoop-ha/mycluster/ActiveBreadCrumb

  • RM狀態存儲

在 ResourceManager 中,RMStateStore 可以存儲一些 RM 的內部狀態信息,包括 Application 以及它們的 Attempts 信息、Delegation Token 及 Version Information 等。須要注意的是,RMStateStore 中的絕大多數狀態信息都是不須要持久化存儲的,由於很容易從上下文信息中將其重構出來,如資源的使用狀況。在存儲的設計方案中,提供了三種可能的實現,分別以下。

  • 基於內存實現,通常是用於平常開發測試。
  • 基於文件系統的實現,如HDFS。
  • 基於ZooKeeper實現。

因爲這些狀態信息的數據量都不是很大,所以Hadoop官方建議基於ZooKeeper來實現狀態信息的存儲。在ZooKeepr上,ResourceManager 的狀態信息都被存儲在/rmstore這個根節點下面。

RMAppRoot 節點下存儲的是與各個 Application 相關的信息,RMDTSecretManagerRoot 存儲的是與安全相關的 Token 等信息。每一個 Active 狀態的 ResourceManager 在初始化階段都會從 ZooKeeper 上讀取到這些狀態信息,並根據這些狀態信息繼續進行相應的處理。

小結:

ZooKeepr在Hadoop中的應用主要有:

  1. HDFS中NameNode的HA和YARN中ResourceManager的HA。
  2. 存儲RMStateStore狀態信息

5.2 ZooKeeper在HBase中的應用

HBase主要用ZooKeeper來實現HMaster選舉與主備切換、系統容錯、RootRegion管理、Region狀態管理和分佈式SplitWAL任務管理等。

  • HMaster選舉與主備切換

HMaster選舉與主備切換的原理和HDFS中NameNode及YARN中ResourceManager的HA原理相同。

  • 系統容錯

當HBase啓動時,每一個RegionServer都會到ZooKeeper的/hbase/rs節點下建立一個信息節點(下文中,咱們稱該節點爲」rs狀態節點」),例如/hbase/rs/[Hostname],同時,HMaster會對這個節點註冊監聽。當某個 RegionServer 掛掉的時候,ZooKeeper會由於在一段時間內沒法接受其心跳(即 Session 失效),而刪除掉該 RegionServer 服務器對應的 rs 狀態節點。與此同時,HMaster 則會接收到 ZooKeeper 的 NodeDelete 通知,從而感知到某個節點斷開,並當即開始容錯工做。

HBase爲何不直接讓HMaster來負責RegionServer的監控呢?若是HMaster直接經過心跳機制等來管理RegionServer的狀態,隨着集羣愈來愈大,HMaster的管理負擔會愈來愈重,另外它自身也有掛掉的可能,所以數據還須要持久化。在這種狀況下,ZooKeeper就成了理想的選擇。

  • RootRegion管理

對應HBase集羣來講,數據存儲的位置信息是記錄在元數據region,也就是RootRegion上的。每次客戶端發起新的請求,須要知道數據的位置,就會去查詢RootRegion,而RootRegion自身位置則是記錄在ZooKeeper上的(默認狀況下,是記錄在ZooKeeper的/hbase/meta-region-server節點中)。當RootRegion發生變化,好比Region的手工移動、從新負載均衡或RootRegion所在服務器發生了故障等是,就可以經過ZooKeeper來感知到這一變化並作出一系列相應的容災措施,從而保證客戶端老是可以拿到正確的RootRegion信息。

  • Region管理

HBase裏的Region會常常發生變動,這些變動的緣由來自於系統故障、負載均衡、配置修改、Region分裂與合併等。一旦Region發生移動,它就會經歷下線(offline)和從新上線(online)的過程。

下線期間數據是不能被訪問的,而且Region的這個狀態變化必須讓全局知曉,不然可能會出現事務性的異常。對於大的HBase集羣來講,Region的數量可能會多達十萬級別,甚至更多,這樣規模的Region狀態管理交給ZooKeeper來作也是一個很好的選擇。

  • 分佈式SplitWAL任務管理

當某臺RegionServer服務器掛掉時,因爲總有一部分新寫入的數據尚未持久化到HFile中,所以在遷移該RegionServer的服務時,一個重要的工做就是從WAL中恢復這部分還在內存中的數據,而這部分工做最關鍵的一步就是SplitWAL,即HMaster須要遍歷該RegionServer服務器的WAL,並按Region切分紅小塊移動到新的地址下,並進行日誌的回放(replay)

因爲單個RegionServer的日誌量相對龐大(可能有上千個Region,上GB的日誌),而用戶又每每但願系統可以快速完成日誌的恢復工做。所以一個可行的方案是將這個處理WAL的任務分給多臺RegionServer服務器來共同處理,而這就又須要一個持久化組件來輔助HMaster完成任務的分配。當前的作法是,HMaster會在ZooKeeper上建立一個SplitWAL節點(默認狀況下,是/hbase/SplitWAL節點),將「哪一個RegionServer處理哪一個Region」這樣的信息以列表的形式存放到該節點上,而後由各個RegionServer服務器自行到該節點上去領取任務並在任務執行成功或失敗後再更新該節點的信息,以通知HMaster繼續進行後面的步驟。ZooKeeper在這裏擔負起了分佈式集羣中相互通知和信息持久化的角色。

小結:

以上就是一些HBase中依賴ZooKeeper完成分佈式協調功能的典型場景。但事實上,HBase對ZooKeepr的依賴還不止這些,好比HMaster還依賴ZooKeeper來完成Table的enable/disable狀態記錄,以及HBase中幾乎全部的元數據存儲都是放在ZooKeeper上的。

因爲ZooKeeper出色的分佈式協調能力及良好的通知機制,HBase在各版本的演進過程當中愈來愈多地增長了ZooKeeper的應用場景,從趨勢上來看二者的交集愈來愈多。HBase中全部對ZooKeeper的操做都封裝在了org.apache.hadoop.hbase.zookeeper這個包中,感興趣的同窗能夠自行研究。

5.3 ZooKeeper在 canal 中的應用

  • Canal Client
    • 保證get/ack/rollback有序
  • canal server
    • 減小mysql dump的請求
  • 原理
    • zk 分佈式鎖

六、無鎖實現

CAS

  • 原理
  • 缺陷
    • ABA
    • 衝突

AtomicInteger atomicInteger = new AtomicInteger();
for(int b = 0; b < numThreads; b++) {
  new Thread(() -> {
    for(int a = 0; a < iteration; a++) {
      atomicInteger.incrementAndGet();
    }
  }).start();
}

 public final int getAndIncrement() {
       for( ; ; ) {
           int current = get();
           int next = current + 1;
           if ( compareAndSet(current, next) )
                return current;
       }
}
private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
//接口調用次數+1
public long increase(String url) {
    Long oldValue = urlCounter.get(url);
    Long newValue = (oldValue == null) ? 1L : oldValue + 1;
    urlCounter.put(url, newValue);
    return newValue;
}
//獲取調用次數
public Long getCount(String url){
    return urlCounter.get(url);
}
//模擬併發狀況下的接口調用統計
for(int i=0;i<callTime;i++){
    executor.execute(new Runnable() {
        @Override
        public void run() {
            counterDemo.increase(url);
            countDownLatch.countDown();
        }
    });
}



// CAS:
while (true) {
    private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
    oldValue = urlCounter.get(url);
    if (oldValue == null) {
        newValue = 1l;
        //初始化成功,退出循環
        if (urlCounter.putIfAbsent(url, 1l) == null) break;
        //若是初始化失敗,說明其餘線程已經初始化過了
    } else {
        newValue = oldValue + 1;
        //+1成功,退出循環,原子操做
        if (urlCounter.replace(url, oldValue, newValue)) break;
        //若是+1失敗,說明其餘線程已經修改過了舊值
    }
}
return newValue;

Refer:

[1] ZooKeeper 學習筆記之掃盲

https://my.oschina.net/leejun2005/blog/67250

[2] ZooKeeper 原理及其在 Hadoop 和 HBase 中的應用

http://blog.jobbole.com/110388/

[3] Java 併發實踐 — ConcurrentHashMap 與 CAS

https://my.oschina.net/leejun2005/blog/81835

相關文章
相關標籤/搜索