在Hadoop生態系統中,許多項目的Logo都採用了動物,好比 Hadoop 和 Hive 採用了大象的形象,HBase 採用了海豚的形象,而從字面上來看 ZooKeeper 表示動物園管理員,因此你們能夠理解爲 ZooKeeper就是對這些動物(項目組件)進行一些管理工做的。java
對於單機環境多線程的競態資源協調方法,咱們通常經過線程鎖來協調對共享數據的訪問以保證狀態的一致性。 可是分佈式環境如何進行協調呢?因而,Google創造了Chubby,而ZooKeeper則是對於Chubby的一個開源實現。
ZooKeeper是一種爲分佈式應用所設計的高可用、高性能且一致的開源協調服務,它提供了一項基本服務:分佈式鎖服務。因爲ZooKeeper的開源特性,後來咱們的開發者在分佈式鎖的基礎上,摸索了出了其餘的使用方法:配置維護、組服務、分佈式消息隊列、分佈式通知/協調等。它被設計爲易於編程,使用文件系統目錄樹做爲數據模型。node
Zookeeper服務自身組成一個集羣(2n+1個服務容許n>=1個失效)。Zookeeper集羣是一個基於主從複製的高可用集羣,每一個服務器承擔以下三種角色中的一種python
順序一致性:按照客戶端發送請求的順序更新數據。mysql
原子性:更新要麼成功,要麼失敗,不會出現部分更新。sql
單一性 :不管客戶端鏈接哪一個server,都會看到同一個視圖。apache
可靠性:一旦數據更新成功,將一直保持,直到新的更新。編程
及時性:客戶端會在一個肯定的時間內獲得最新的數據。安全
Zookeeper表現爲一個分層的文件系統目錄樹結構(不一樣於文件系統的是,節點能夠有本身的數據,而文件系統中的目錄節點只有子節點)。服務器
爲了保證寫操做的一致性與可用性,Zookeeper專門設計了一種名爲原子廣播(ZAB)的支持崩潰恢復的一致性協議。基於該協議,Zookeeper實現了一種主從模式的系統架構來保持集羣中各個副本之間的數據一致性。根據ZAB協議,全部的寫操做都必須經過Leader完成,leader收到寫請求,會將請求轉爲proposal並廣播給全部其它節點,其餘節點根據協議進行批准或經過。broadcast階段事實上就是一個兩階段提交的簡化版。其全部過程都跟兩階段提交一致,惟一不一致的是不能作事務的回滾。若是實現完整的兩階段提交,那就解決了一致性問題,不必發明新協議了,因此zab實際上拋棄了兩階段提交的事務回滾,因而一臺follower只能回覆ACK或者乾脆就不回覆了,leader只要收到過半的機器回覆即經過proposal。網絡
一旦Leader節點沒法工做,ZAB協議可以自動從Follower節點中從新選出一個合適的替代者,即新的Leader,該過程即爲領導選舉。該領導選舉過程,是ZAB協議中最爲重要和複雜的過程。
def test_create(): nodename = '/zk_test/service' index = 1 while index < 10: zk.create(nodename, b'192.168.187.215_{index}'. format(index=index), ephemeral=True, sequence=True) time.sleep(3) index += 1 @zk.ChildrenWatch('/zk_test') def my_func(children): print children children = zk.get_children('/zk_test',watch=my_func) while True: time.sleep(3)
my_id = uuid.uuid4() lock = zk.Lock("/lockpath", str(my_id)) print "I am {}".format(str(my_id)) def work(): time.sleep(3) print "{} is working! ".format(str(my_id)) while True: with lock: work() zk.stop()
@zk.DataWatch('/zk_test/service') def my_func(data, stat): print("Data is %s, Version is %s" %(data, stat.version)) while True: time.sleep(2) print '------'
my_id = uuid.uuid4() def leader_func(): print "I am the leader {}".format(str(my_id)) while True: print "{} is working! ".format(str(my_id)) time.sleep(3) election = zk.Election("/electionpath") # blocks until the election is won, then calls # leader_func() election.run(leader_func)
#coding=utf-8 from kazoo.client import KazooClient import time import logging logging.basicConfig() zk = KazooClient(hosts='bjdhj-187-215.58os.org:2181') zk.start() # Determine if a node exists while True: if zk.exists("/zk_test/service0000000054"): print "the worker is alive!" else: print "the worker is dead!" break time.sleep(3) zk.stop()
建立鎖節點
在ZooKeeper上會有一個/yarn-leader-election/appcluster-yarn
的鎖節點,全部的ResourceManager在啓動的時候,都會去競爭寫一個Lock子節點:/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb
,該節點是臨時節點。ZooKeepr可以爲咱們保證最終只有一個ResourceManager可以建立成功。建立成功的那個ResourceManager就切換爲Active狀態,沒有成功的那些ResourceManager則切換爲Standby狀態。
全部Standby狀態的ResourceManager都會向/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb
節點註冊一個節點變動的Watcher監聽,利用臨時節點的特性,可以快速感知到Active狀態的ResourceManager的運行狀況。
當Active狀態的ResourceManager出現諸如宕機或重啓的異常狀況時,其在ZooKeeper上鍊接的客戶端會話就會失效,所以/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb
節點就會被刪除。此時其他各個Standby狀態的ResourceManager就都會接收到來自ZooKeeper服務端的Watcher事件通知,而後會重複進行步驟1的操做。
以上就是利用ZooKeeper來實現ResourceManager的主備切換的過程,實現了ResourceManager的HA。
HDFS中NameNode的HA的實現原理跟YARN中ResourceManager的HA的實現原理相同。其鎖節點爲/hadoop-ha/mycluster/ActiveBreadCrumb
。
在 ResourceManager 中,RMStateStore 可以存儲一些 RM 的內部狀態信息,包括 Application 以及它們的 Attempts 信息、Delegation Token 及 Version Information 等。須要注意的是,RMStateStore 中的絕大多數狀態信息都是不須要持久化存儲的,由於很容易從上下文信息中將其重構出來,如資源的使用狀況。在存儲的設計方案中,提供了三種可能的實現,分別以下。
因爲這些狀態信息的數據量都不是很大,所以Hadoop官方建議基於ZooKeeper來實現狀態信息的存儲。在ZooKeepr上,ResourceManager 的狀態信息都被存儲在/rmstore
這個根節點下面。
RMAppRoot 節點下存儲的是與各個 Application 相關的信息,RMDTSecretManagerRoot 存儲的是與安全相關的 Token 等信息。每一個 Active 狀態的 ResourceManager 在初始化階段都會從 ZooKeeper 上讀取到這些狀態信息,並根據這些狀態信息繼續進行相應的處理。
小結:
ZooKeepr在Hadoop中的應用主要有:
HBase主要用ZooKeeper來實現HMaster選舉與主備切換、系統容錯、RootRegion管理、Region狀態管理和分佈式SplitWAL任務管理等。
HMaster選舉與主備切換的原理和HDFS中NameNode及YARN中ResourceManager的HA原理相同。
當HBase啓動時,每一個RegionServer都會到ZooKeeper的/hbase/rs
節點下建立一個信息節點(下文中,咱們稱該節點爲」rs狀態節點」),例如/hbase/rs/[Hostname]
,同時,HMaster會對這個節點註冊監聽。當某個 RegionServer 掛掉的時候,ZooKeeper會由於在一段時間內沒法接受其心跳(即 Session 失效),而刪除掉該 RegionServer 服務器對應的 rs 狀態節點。與此同時,HMaster 則會接收到 ZooKeeper 的 NodeDelete 通知,從而感知到某個節點斷開,並當即開始容錯工做。
HBase爲何不直接讓HMaster來負責RegionServer的監控呢?若是HMaster直接經過心跳機制等來管理RegionServer的狀態,隨着集羣愈來愈大,HMaster的管理負擔會愈來愈重,另外它自身也有掛掉的可能,所以數據還須要持久化。在這種狀況下,ZooKeeper就成了理想的選擇。
對應HBase集羣來講,數據存儲的位置信息是記錄在元數據region,也就是RootRegion上的。每次客戶端發起新的請求,須要知道數據的位置,就會去查詢RootRegion,而RootRegion自身位置則是記錄在ZooKeeper上的(默認狀況下,是記錄在ZooKeeper的/hbase/meta-region-server
節點中)。當RootRegion發生變化,好比Region的手工移動、從新負載均衡或RootRegion所在服務器發生了故障等是,就可以經過ZooKeeper來感知到這一變化並作出一系列相應的容災措施,從而保證客戶端老是可以拿到正確的RootRegion信息。
HBase裏的Region會常常發生變動,這些變動的緣由來自於系統故障、負載均衡、配置修改、Region分裂與合併等。一旦Region發生移動,它就會經歷下線(offline)和從新上線(online)的過程。
在下線期間數據是不能被訪問的,而且Region的這個狀態變化必須讓全局知曉,不然可能會出現事務性的異常。對於大的HBase集羣來講,Region的數量可能會多達十萬級別,甚至更多,這樣規模的Region狀態管理交給ZooKeeper來作也是一個很好的選擇。
當某臺RegionServer服務器掛掉時,因爲總有一部分新寫入的數據尚未持久化到HFile中,所以在遷移該RegionServer的服務時,一個重要的工做就是從WAL中恢復這部分還在內存中的數據,而這部分工做最關鍵的一步就是SplitWAL,即HMaster須要遍歷該RegionServer服務器的WAL,並按Region切分紅小塊移動到新的地址下,並進行日誌的回放(replay)。
因爲單個RegionServer的日誌量相對龐大(可能有上千個Region,上GB的日誌),而用戶又每每但願系統可以快速完成日誌的恢復工做。所以一個可行的方案是將這個處理WAL的任務分給多臺RegionServer服務器來共同處理,而這就又須要一個持久化組件來輔助HMaster完成任務的分配。當前的作法是,HMaster會在ZooKeeper上建立一個SplitWAL節點(默認狀況下,是/hbase/SplitWAL
節點),將「哪一個RegionServer處理哪一個Region」這樣的信息以列表的形式存放到該節點上,而後由各個RegionServer服務器自行到該節點上去領取任務並在任務執行成功或失敗後再更新該節點的信息,以通知HMaster繼續進行後面的步驟。ZooKeeper在這裏擔負起了分佈式集羣中相互通知和信息持久化的角色。
小結:
以上就是一些HBase中依賴ZooKeeper完成分佈式協調功能的典型場景。但事實上,HBase對ZooKeepr的依賴還不止這些,好比HMaster還依賴ZooKeeper來完成Table的enable/disable狀態記錄,以及HBase中幾乎全部的元數據存儲都是放在ZooKeeper上的。
因爲ZooKeeper出色的分佈式協調能力及良好的通知機制,HBase在各版本的演進過程當中愈來愈多地增長了ZooKeeper的應用場景,從趨勢上來看二者的交集愈來愈多。HBase中全部對ZooKeeper的操做都封裝在了org.apache.hadoop.hbase.zookeeper這個包中,感興趣的同窗能夠自行研究。
CAS
AtomicInteger atomicInteger = new AtomicInteger(); for(int b = 0; b < numThreads; b++) { new Thread(() -> { for(int a = 0; a < iteration; a++) { atomicInteger.incrementAndGet(); } }).start(); } public final int getAndIncrement() { for( ; ; ) { int current = get(); int next = current + 1; if ( compareAndSet(current, next) ) return current; } }
private final Map<String, Long> urlCounter = new ConcurrentHashMap<>(); //接口調用次數+1 public long increase(String url) { Long oldValue = urlCounter.get(url); Long newValue = (oldValue == null) ? 1L : oldValue + 1; urlCounter.put(url, newValue); return newValue; } //獲取調用次數 public Long getCount(String url){ return urlCounter.get(url); } //模擬併發狀況下的接口調用統計 for(int i=0;i<callTime;i++){ executor.execute(new Runnable() { @Override public void run() { counterDemo.increase(url); countDownLatch.countDown(); } }); } // CAS: while (true) { private final Map<String, Long> urlCounter = new ConcurrentHashMap<>(); oldValue = urlCounter.get(url); if (oldValue == null) { newValue = 1l; //初始化成功,退出循環 if (urlCounter.putIfAbsent(url, 1l) == null) break; //若是初始化失敗,說明其餘線程已經初始化過了 } else { newValue = oldValue + 1; //+1成功,退出循環,原子操做 if (urlCounter.replace(url, oldValue, newValue)) break; //若是+1失敗,說明其餘線程已經修改過了舊值 } } return newValue;
[1] ZooKeeper 學習筆記之掃盲
https://my.oschina.net/leejun2005/blog/67250
[2] ZooKeeper 原理及其在 Hadoop 和 HBase 中的應用
http://blog.jobbole.com/110388/
[3] Java 併發實踐 — ConcurrentHashMap 與 CAS