ZooKeeper簡介
ZooKeeper是分佈式服務框架,主要是用來解決分佈式應用中常常遇到的一些數據管理問題,如:
統一命名服務、狀態同步服務、集羣管理、分佈式應用配置項的管理等等。
ZooKeeper基本概念
角色
ZooKeeper中的角色主要有如下三類,以下表所示:
設計目的
1. 最終一致性:client不論鏈接到哪一個Server,展現給它都是同一個視圖,這是ZooKeeper最重要的性能。
2. 可靠性:具備簡單、健壯、良好的性能,若是消息m被到一臺服務器接受,那麼它將被全部的服務器接受。
3. 實時性:ZooKeeper保證客戶端將在一個時間間隔範圍內得到服務器的更新信息,或者服務器失效的信息。但因爲網絡延時等緣由,ZooKeeper不能保證兩個客戶端能同時獲得剛更新的數據,若是須要最新數據,應該在讀數據以前調用sync()接口。
4. 等待無關(wait-free):慢的或者失效的client不得干預快速的client的請求,使得每一個client都能有效的等待。
5. 原子性:更新只能成功或者失敗,沒有中間狀態。
6. 順序性:包括全局有序和偏序兩種:全局有序是指若是在一臺服務器上消息a在消息b前發佈,則在全部Server上消息a都將在消息b前被髮布;偏序是指若是一個消息b在消息a後被同一個發送者發佈,a必將排在b前面。
ZooKeeper工做原理
ZooKeeper的核心是廣播,這個機制保證了各個Server之間的同步。實現這個機制的協議叫作Zab協議。Zab協議有兩種模式,它們分別是恢復模式(選主)和廣播模式(同步)。當服務啓動或者在領導者崩潰後,Zab就進入了恢復模式,當領導者被選舉出來,且大多數Server完成了和leader的狀態同步之後,恢復模式就結束了。狀態同步保證了leader和Server具備相同的系統狀態。爲了保證事務的順序一致性,ZooKeeper採用了遞增的事務id號 (zxid)來標識事務。全部的提議(proposal)都在被提出的時候加上了zxid。實現中zxid是一個64位的數字,它高32位是epoch用 來標識leader關係是否改變,每次一個leader被選出來,它都會有一個新的epoch,標識當前屬於那個leader的統治時期。低32位用於遞增計數。
每一個Server在工做過程當中有三種狀態:
LOOKING:當前Server不知道leader是誰,正在搜尋。
LEADING:當前Server即爲選舉出來的leader。
FOLLOWING:leader已經選舉出來,當前Server與之同步。
選主流程
當 leader崩潰或者leader失去大多數的follower,這時候zk進入恢復模式,恢復模式須要從新選舉出一個新的leader,讓全部的 Server都恢復到一個正確的狀態。Zk的選舉算法有兩種:一種是基於basic paxos實現的,另一種是基於fast paxos算法實現的。系統默認的選舉算法爲fast paxos。
basic paxos流程:
1 .選舉線程由當前Server發起選舉的線程擔任,其主要功能是對投票結果進行統計,並選出推薦的Server;
2 .選舉線程首先向全部Server發起一次詢問(包括本身);
3 .選舉線程收到回覆後,驗證是不是本身發起的詢問(驗證zxid是否一致),而後獲取對方的id(myid),並存儲到當前詢問對象列表中,最後獲取對方提議的leader相關信息(id,zxid),並將這些信息存儲到當次選舉的投票記錄表中;
4. 收到全部Server回覆之後,就計算出zxid最大的那個Server,並將這個Server相關信息設置成下一次要投票的Server;
5. 線程將當前zxid最大的Server設置爲當前Server要推薦的Leader,若是此時獲勝的Server得到n/2 + 1的Server票數, 設置當前推薦的leader爲獲勝的Server,將根據獲勝的Server相關信息設置本身的狀態,不然,繼續這個過程,直到leader被選舉出來。
通 過流程分析咱們能夠得出:要使Leader得到多數Server的支持,則Server總數必須是奇數2n+1,且存活的Server的數目不得少於 n+1.每一個Server啓動後都會重複以上流程。在恢復模式下,若是是剛從崩潰狀態恢復的或者剛啓動的server還會從磁盤快照中恢復數據和會話信 息,zk會記錄事務日誌並按期進行快照,方便在恢復時進行狀態恢復。
選主的具體流程圖以下所示:
fast paxos流程是在選舉過程當中,某Server首先向全部Server提議本身要成爲leader,當其它Server收到提議之後,解決epoch和 zxid的衝突,並接受對方的提議,而後向對方發送接受提議完成的消息,重複這個流程,最後必定能選舉出Leader。其流程圖以下所示:
同步流程
選完leader之後,zk就進入狀態同步過程。
1. leader等待server鏈接;
2 .Follower鏈接leader,將最大的zxid發送給leader;
3 .Leader根據follower的zxid肯定同步點;
4 .完成同步後通知follower 已經成爲uptodate狀態;
5 .Follower收到uptodate消息後,又能夠從新接受client的請求進行服務了。
流程圖以下所示:
工做流程
Leader工做流程
Leader主要有三個功能:
1 .恢復數據;
2 .維持與Learner的心跳,接收Learner請求並判斷Learner的請求消息類型;
3 .Learner的消息類型主要有PING消息、REQUEST消息、ACK消息、REVALIDATE消息,根據不一樣的消息類型,進行不一樣的處理。
PING 消息是指Learner的心跳信息;REQUEST消息是Follower發送的提議信息,包括寫請求及同步請求;ACK消息是Follower的對提議 的回覆,超過半數的Follower經過,則commit該提議;REVALIDATE消息是用來延長SESSION有效時間。
Leader的工做流程簡圖以下所示:
Follower工做流程
Follower主要有四個功能:
1. 向Leader發送請求(PING消息、REQUEST消息、ACK消息、REVALIDATE消息);
2 .接收Leader消息並進行處理;
3 .接收Client的請求,若是爲寫請求,發送給Leader進行投票;
4 .返回Client結果。
Follower的消息循環處理以下幾種來自Leader的消息:
1 .PING消息: 心跳消息;
2 .PROPOSAL消息:Leader發起的提案,要求Follower投票;
3 .COMMIT消息:服務器端最新一次提案的信息;
4 .UPTODATE消息:代表同步完成;
5 .REVALIDATE消息:根據Leader的REVALIDATE結果,關閉待revalidate的session仍是容許其接受消息;
6 .SYNC消息:返回SYNC結果到客戶端,這個消息最初由客戶端發起,用來強制獲得最新的更新。
Follower的工做流程簡圖以下所示:
Zookeeper 會維護一個具備層次關係的數據結構,它很是相似於一個標準的文件系統。
Zookeeper 這種數據結構有以下這些特色:
1. 每一個子目錄項如 NameService 都被稱做爲 znode,這個 znode 是被它所在的路徑惟一標識,如 Server1 這個 znode 的標識爲 /NameService/Server1(圖上有一個錯誤,你們自行發現吧:-))。
2. znode 能夠有子節點目錄,而且每一個 znode 能夠存儲數據,注意 EPHEMERAL 類型的目錄節點不能有子節點目錄。
3. znode 是有版本的,每一個 znode 中存儲的數據能夠有多個版本,也就是一個訪問路徑中能夠存儲多份數據。
4. znode 能夠是臨時節點,一旦建立這個 znode 的客戶端與服務器失去聯繫,這個 znode 也將自動刪除,Zookeeper 的客戶端和服務器通訊採用長鏈接方式,每一個客戶端和服務器經過心跳來保持鏈接,這個鏈接狀態稱爲 session,若是 znode 是臨時節點,這個 session 失效,znode 也就刪除了。
5. znode 的目錄名能夠自動編號,如 App1 已經存在,再建立的話,將會自動命名爲 App2。
6. znode 能夠被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化能夠通知設置監控的客戶端,這個是 Zookeeper 的核心特性,Zookeeper 的不少功能都是基於這個特性實現的,後面在典型的應用場景中會有實例介紹。
ZooKeeper安裝配置
ZooKeeper的安裝模式分爲三種,分別爲:單機模式(stand-alone)、集羣模式和集羣僞分佈模式(略)。
單機模式
下載ZooKeeper的安裝包以後, 解壓到合適目錄. 進入ZooKeeper目錄下的conf子目錄, 建立zoo.cfg(或者直接更名zoo_sample.cfg):
tickTime=2000
dataDir=D:/devtools/zookeeper-3.2.2/build
clientPort=2181
|
- clientPort:這個端口就是客戶端鏈接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。
當這些配置項配置好後,你如今就能夠啓動 Zookeeper 了,啓動後要檢查 Zookeeper 是否已經在服務,能夠經過 netstat -ano 命令查看是否有你配置的 clientPort 端口號在監聽服務。
集羣模式
Zookeeper 不只能夠單機提供服務,同時也支持多機組成集羣來提供服務。實際上 Zookeeper 還支持另一種僞集羣的方式,也就是能夠在一臺物理機上運行多個 Zookeeper 實例,下面將介紹集羣模式的安裝和配置。
Zookeeper 的集羣模式的安裝和配置也不是很複雜,所要作的就是增長几個配置項。集羣模式除了上面的三個配置項還要增長下面幾個配置項:
initLimit=5
syncLimit=2
server.1=192.168.211.1:2888:3888
server.2=192.168.211.2:2888:3888
|
-
initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這裏所說的客戶端不是用戶鏈接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集羣中鏈接到 Leader 的 Follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。當已經超過 10 個心跳的時間(也就是 tickTime)長度後 Zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是 5*2000=10 秒
-
syncLimit:這個配置項標識 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是 2*2000=4 秒
-
server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;B 是這個服務器的 ip 地址;C 表示的是這個服務器與集羣中的 Leader 服務器交換信息的端口;D 表示的是萬一集羣中的 Leader 服務器掛了,須要一個端口來從新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通訊的端口。若是是僞集羣的配置方式,因爲 B 都是同樣,因此不一樣的 Zookeeper 實例通訊端口號不能同樣,因此要給它們分配不一樣的端口號。
除了修改 zoo.cfg 配置文件,集羣模式下還要配置一個文件 myid,這個文件在 dataDir 目錄下,這個文件裏面就有一個數據就是 A 的值,Zookeeper 啓動時會讀取這個文件,拿到裏面的數據與 zoo.cfg 裏面的配置信息比較從而判斷究竟是那個 server。
ZooKeeper經常使用命令
ZooKeeper服務命令:
1. 啓動ZK服務: ./zkServer.sh start
2. 查看ZK服務狀態: ./zkServer.sh status
3. 中止ZK服務: ./zkServer.sh stop
4. 重啓ZK服務: ./zkServer.sh restart
ZooKeeper客戶端命令:
ZooKeeper 命令行工具相似於Linux的shell環境,使用它能夠對ZooKeeper進行訪問,數據建立,數據修改等操做. 使用
zkCli.sh -server 127.0.0.1:2181 鏈接到 ZooKeeper 服務,鏈接成功後,系統會輸出 ZooKeeper 的相關環境以及配置信息。
命令行工具的一些簡單操做以下:
1. 顯示根目錄下、文件: ls / 使用 ls 命令來查看當前 ZooKeeper 中所包含的內容
2. 顯示根目錄下、文件: ls2 / 查看當前節點數據並能看到更新次數等數據
3. 建立文件,並設置初始內容: create /zk "test" 建立一個新的 znode節點「 zk 」以及與它關聯的字符串
4. 獲取文件內容: get /zk 確認 znode 是否包含咱們所建立的字符串
5. 修改文件內容: set /zk "zkbak" 對 zk 所關聯的字符串進行設置
6. 刪除文件: delete /zk 將剛纔建立的 znode 刪除
7. 退出客戶端: quit
8. 幫助命令: help
ZooKeeper 經常使用四字命令:
ZooKeeper 支持某些特定的四字命令字母與其的交互。它們大可能是查詢命令,用來獲取 ZooKeeper 服務的當前狀態及相關信息。用戶在客戶端能夠經過 telnet 或 nc 向 ZooKeeper 提交相應的命令
1. 能夠經過命令:echo stat|nc 127.0.0.1 2181 來查看哪一個節點被選擇做爲follower或者leader
2. 使用echo ruok|nc 127.0.0.1 2181 測試是否啓動了該Server,若回覆imok表示已經啓動。
3. echo dump| nc 127.0.0.1 2181 ,列出未經處理的會話和臨時節點。
4. echo kill | nc 127.0.0.1 2181 ,關掉server
5. echo conf | nc 127.0.0.1 2181 ,輸出相關服務配置的詳細信息。
6. echo cons | nc 127.0.0.1 2181 ,列出全部鏈接到服務器的客戶端的徹底的鏈接 / 會話的詳細信息。
7. echo envi |nc 127.0.0.1 2181 ,輸出關於服務環境的詳細信息(區別於 conf 命令)。
8. echo reqs | nc 127.0.0.1 2181 ,列出未經處理的請求。
9. echo wchs | nc 127.0.0.1 2181 ,列出服務器 watch 的詳細信息。
10. echo wchc | nc 127.0.0.1 2181 ,經過 session 列出服務器 watch 的詳細信息,它的輸出是一個與 watch 相關的會話的列表。
11. echo wchp | nc 127.0.0.1 2181 ,經過路徑列出服務器 watch 的詳細信息。它輸出一個與 session 相關的路徑。
ZooKeeper的Java客戶端API
客戶端要鏈接 Zookeeper 服務器能夠經過建立 org.apache.zookeeper. ZooKeeper 的一個實例對象,而後調用這個類提供的接口來和服務器交互。
前面說了 ZooKeeper 主要是用來維護和監控一個目錄節點樹中存儲的數據的狀態,因此咱們可以操做 ZooKeeper 的節點也和操做目錄節點樹大致同樣,如建立一個目錄節點,給某個目錄節點設置數據,獲取某個目錄節點的全部子目錄節點,給某個目錄節點設置權限和監控這個目錄節點的狀態變化。
類org.apache.zookeeper. ZooKeeper 的方法以下表所示:
|
String create(String path, byte[] data, List<ACL> acl,CreateMode createMode) |
建立一個給定的目錄節點 path, 並給它設置數據,CreateMode 標識有四種形式的目錄節點,分別是 PERSISTENT:持久化目錄節點,這個目錄節點存儲的數據不會丟失;PERSISTENT_SEQUENTIAL:順序自動編號的目錄節點,這種目錄節點會根據當前已近存在的節點數自動加 1,而後返回給客戶端已經成功建立的目錄節點名;EPHEMERAL:臨時目錄節點,一旦建立這個節點的客戶端與服務器端口也就是 session 超時,這種節點會被自動刪除;EPHEMERAL_SEQUENTIAL:臨時自動編號節點 |
Stat exists(String path, boolean watch) |
判斷某個 path 是否存在,並設置是否監控這個目錄節點,這裏的 watcher 是在建立 ZooKeeper 實例時指定的 watcher,exists方法還有一個重載方法,能夠指定特定的watcher |
Stat exists(String path,Watcher watcher) |
重載方法,這裏給某個目錄節點設置特定的 watcher,Watcher 在 ZooKeeper 是一個核心功能,Watcher 能夠監控目錄節點的數據變化以及子目錄的變化,一旦這些狀態發生變化,服務器就會通知全部設置在這個目錄節點上的 Watcher,從而每一個客戶端都很快知道它所關注的目錄節點的狀態發生變化,而作出相應的反應 |
void delete(String path, int version) |
刪除 path 對應的目錄節點,version 爲 -1 能夠匹配任何版本,也就刪除了這個目錄節點全部數據 |
List<String>getChildren(String path, boolean watch) |
獲取指定 path 下的全部子目錄節點,一樣 getChildren方法也有一個重載方法能夠設置特定的 watcher 監控子節點的狀態 |
Stat setData(String path, byte[] data, int version) |
給 path 設置數據,能夠指定這個數據的版本號,若是 version 爲 -1 怎能夠匹配任何版本 |
byte[] getData(String path, boolean watch, Stat stat) |
獲取這個 path 對應的目錄節點存儲的數據,數據的版本等信息能夠經過 stat 來指定,同時還能夠設置是否監控這個目錄節點數據的狀態 |
voidaddAuthInfo(String scheme, byte[] auth) |
客戶端將本身的受權信息提交給服務器,服務器將根據這個受權信息驗證客戶端的訪問權限。 |
Stat setACL(String path,List<ACL> acl, int version) |
給某個目錄節點從新設置訪問權限,須要注意的是 Zookeeper 中的目錄節點權限不具備傳遞性,父目錄節點的權限不能傳遞給子目錄節點。目錄節點 ACL 由兩部分組成:perms 和 id。 Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 幾種 而 id 標識了訪問目錄節點的身份列表,默認狀況下有如下兩種: ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分別表示任何人均可以訪問和建立者擁有訪問權限。 |
List<ACL>getACL(String path,Stat stat) |
獲取某個目錄節點的訪問權限列表 |
除了以上這些列出的方法以外還有一些重載方法,如都提供了一個回調類的重載方法以及能夠設置特定 Watcher 的重載方法,具體的方法能夠參考 org.apache.zookeeper. ZooKeeper 類的 API 說明。
下面給出基本的操做 ZooKeeper 的示例代碼,這樣你就能對 ZooKeeper 有直觀的認識了。下面的清單包括了建立與 ZooKeeper 服務器的鏈接以及最基本的數據操做:
// 建立一個與服務器的鏈接
ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT,
ClientBase.CONNECTION_TIMEOUT, new Watcher() {
// 監控全部被觸發的事件
public void process(WatchedEvent event) {
System.out.println("已經觸發了" + event.getType() + "事件!");
}
});
// 建立一個目錄節點
zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// 建立一個子目錄節點
zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath",false,null)));
// 取出子目錄節點列表
System.out.println(zk.getChildren("/testRootPath",true));
// 修改子目錄節點數據
zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1);
System.out.println("目錄節點狀態:["+zk.exists("/testRootPath",true)+"]");
// 建立另一個子目錄節點
zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null)));
// 刪除子目錄節點
zk.delete("/testRootPath/testChildPathTwo",-1);
zk.delete("/testRootPath/testChildPathOne",-1);
// 刪除父目錄節點
zk.delete("/testRootPath",-1);
// 關閉鏈接
zk.close();
|
輸出的結果以下:
已經觸發了 None 事件!
testRootData
[testChildPathOne]
目錄節點狀態:[5,5,1281804532336,1281804532336,0,1,0,0,12,1,6]
已經觸發了 NodeChildrenChanged 事件!
testChildDataTwo
已經觸發了 NodeDeleted 事件!
已經觸發了 NodeDeleted 事件!
|
當對目錄節點監控狀態打開時,一旦目錄節點的狀態發生變化,Watcher 對象的 process 方法就會被調用。
直接使用zk的api實現業務功能比較繁瑣。由於要處理session loss,session expire等異常,在發生這些異常後進行重連。又由於ZK的watcher是一次性的,若是要基於wather實現發佈/訂閱模式,還要本身包裝一下,將一次性訂閱包裝成持久訂閱。另外若是要使用抽象級別更高的功能,好比分佈式鎖,leader選舉等,還要本身額外作不少事情。這裏介紹下ZK的兩個第三方客戶端包裝小工具,能夠分別解決上述小問題。
zkClient主要作了兩件事情。一件是在session loss和session expire時自動建立新的ZooKeeper實例進行重連。另外一件是將一次性watcher包裝爲持久watcher。後者的具體作法是簡單的在watcher回調中,從新讀取數據的同時再註冊相同的watcher實例。
ZkConnection 類: 對zookeeper API的簡單分裝,提供了連接zookeeper server和數據CRUD的操做;此類實現了IZkConnection接口,一般狀況下,若是I0Itec-zkclient不能知足須要的時候,我 們能夠重寫ZkConnection便可.ZkClient類: 核心類,也是開發者須要直接使用的類,它內部維護了zookeeper的連接管理和Event處理邏輯等,同時也暴露了zookeeper znode的CRUD方法列表.IZkChildListener接口: znode 子節點事件偵聽器,當ZkClient接收到某個path節點變動或者子節點變動事件時,會觸發lisntener.IZkDataListener接 口:IZkStateListener接口: 當zookeeper客戶端狀態變動時,觸發.
public static void testzkClient(final String serverList) {
ZkClient zkClient4subChild = new ZkClient(serverList);
zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List currentChilds) throws Exception {
System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
}
});
}
上面是訂閱children變化,下面是訂閱數據變化
ZkClient zkClient4subData = new ZkClient(serverList);
zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(prefix() + "Data of " + dataPath + " has changed");
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(prefix() + dataPath + " has deleted");
}
});
訂閱鏈接狀態的變化:
ZkClient zkClient4subStat = new ZkClient(serverList);
zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
@Override public void handleNewSession() throws Exception {
System.out.println(prefix() + "handleNewSession()");
}
@Override
public void handleStateChanged(KeeperState stat) throws Exception {
System.out.println(prefix() + "handleStateChanged,stat:" + stat);
}
});
下面表格列出了寫操做與ZK內部產生的事件的對應關係:node
event For 「/path」
event For 「/path/child」
|
create(「/path」) |
EventType.NodeCreated |
NA |
delete(「/path」) |
EventType.NodeDeleted |
NA |
setData(「/path」) |
EventType.NodeDataChanged |
NA |
create(「/path/child」) |
EventType.NodeChildrenChanged |
EventType.NodeCreated |
delete(「/path/child」) |
EventType.NodeChildrenChanged |
EventType.NodeDeleted |
setData(「/path/child」) |
NA |
EventType.NodeDataChanged |
而ZK內部的寫事件與所觸發的watcher的對應關係以下:git
event For 「/path」
defaultWatcher
exists
(「/path」)
getData
(「/path」)
getChildren
(「/path」)
|
EventType.None |
√ |
√ |
√ |
√ |
EventType.NodeCreated |
|
√ |
√ |
|
EventType.NodeDeleted |
|
√(不正常) |
√ |
|
EventType.NodeDataChanged |
|
√ |
√ |
|
EventType.NodeChildrenChanged |
|
|
|
√ |
綜合上面兩個表,咱們能夠總結出各類寫操做能夠觸發哪些watcher,以下表所示:github
「/path」
「/path/child」
exists
getData
getChildren
exists
getData
getChildren
|
|
create(「/path」) |
√ |
√ |
|
|
|
|
delete(「/path」) |
√ |
√ |
√ |
|
|
|
setData(「/path」) |
√ |
√ |
|
|
|
|
create(「/path/child」) |
|
|
√ |
√ |
√ |
|
delete(「/path/child」) |
|
|
√ |
√ |
√ |
√ |
setData(「/path/child」) |
|
|
|
√ |
√ |
|
若是發生session close、authFail和invalid,那麼全部類型的wather都會被觸發。算法
zkClient除了作了一些便捷包裝以外,對watcher使用作了一點加強。好比subscribeChildChanges其實是經過exists和getChildren關注了兩個事件。這樣當create(「/path」)時,對應path上經過getChildren註冊的listener也會被調用。另外subscribeDataChanges實際上只是經過exists註冊了事件。由於從上表能夠看到,對於一個更新,經過exists和getData註冊的watcher要麼都會觸發,要麼都不會觸發。shell
zkClient地址:https://github.com/sgroschupf/zkclient
Maven工程中使用zkClient須要加的依賴:數據庫
<dependency>
<groupId>zkclient</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
menagerie基於Zookeeper實現了java.util.concurrent包的一個分佈式版本。這個封裝是更大粒度上對各類分佈式一致性使用場景的抽象。其中最基礎和經常使用的是一個分佈式鎖的實現:
org.menagerie.locks.ReentrantZkLock,經過ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL類型znode的支持,實現了分佈式鎖。具體作法是:不一樣的client上每一個試圖得到鎖的線程,都在相同的basepath下面建立一個EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要建立的是臨時znode,建立鏈接斷開時會自動刪除; SEQUENTIAL表示要自動在傳入的path後面綴上一個自增的全局惟一後綴,做爲最終的path。所以對不一樣的請求ZK會生成不一樣的後綴,並分別返回帶了各自後綴的path給各個請求。由於ZK全局有序的特性,無論client請求怎樣前後到達,在ZKServer端都會最終排好一個順序,所以自增後綴最小的那個子節點,就對應第一個到達ZK的有效請求。而後client讀取basepath下的全部子節點和ZK返回給本身的path進行比較,當發現本身建立的sequential node的後綴序號排在第一個時,就認爲本身得到了鎖;不然的話,就認爲本身沒有得到鎖。這時確定是有其餘併發的而且是沒有斷開的client/線程先建立了node。
基於分佈式鎖,還實現了其餘業務場景,好比leader選舉:
public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(「ZK-host-ip:2181″, 5000);
LeaderElector elector = new ZkLeaderElector(「/leaderElectionTest」, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(「Try to become the leader success!」);
}
}apache
java.util.concurrent包下面的其餘接口實現,也主要是基於ReentrantZkLock的,好比ZkHashMap實現了ConcurrentMap。具體請參見menagerie的API文檔設計模式
menagerie地址:https://github.com/openUtility/menagerie
Maven工程中使用menagerie須要加的依賴:
<dependency>
<groupId>org.menagerie</groupId>
<artifactId>menagerie</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
Zookeeper 從設計模式角度來看,是一個基於觀察者模式設計的分佈式服務管理框架,它負責存儲和管理你們都關心的數據,而後接受觀察者的註冊,一旦這些數據的狀態發生變化,Zookeeper 就將負責通知已經在 Zookeeper 上註冊的那些觀察者作出相應的反應,從而實現集羣中相似 Master/Slave 管理模式,關於 Zookeeper 的詳細架構等內部細節能夠閱讀 Zookeeper 的源碼。
下面詳細介紹這些典型的應用場景,也就是 Zookeeper 到底能幫咱們解決那些問題?下面將給出答案。
分佈式應用中,一般須要有一套完整的命名規則,既可以產生惟一的名稱又便於人識別和記住,一般狀況下用樹形的名稱結構是一個理想的選擇,樹形的名稱結構是一個有層次的目錄結構,既對人友好又不會重複。說到這裏你可能想到了 JNDI,沒錯 Zookeeper 的 Name Service 與 JNDI 可以完成的功能是差很少的,它們都是將有層次的目錄結構關聯到必定資源上,可是 Zookeeper 的 Name Service 更加是普遍意義上的關聯,也許你並不須要將名稱關聯到特定資源上,你可能只須要一個不會重複名稱,就像數據庫中產生一個惟一的數字主鍵同樣。
Name Service 已是 Zookeeper 內置的功能,你只要調用 Zookeeper 的 API 就能實現。如調用 create 接口就能夠很容易建立一個目錄節點。
配置管理(Configuration Management)
配置的管理在分佈式應用環境中很常見,例如同一個應用系統須要多臺 PC Server 運行,可是它們運行的應用系統的某些配置項是相同的,若是要修改這些相同的配置項,那麼就必須同時修改每臺運行這個應用系統的 PC Server,這樣很是麻煩並且容易出錯。
像這樣的配置信息徹底能夠交給 Zookeeper 來管理,將配置信息保存在 Zookeeper 的某個目錄節點中,而後將全部須要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每臺應用機器就會收到 Zookeeper 的通知,而後從 Zookeeper 獲取新的配置信息應用到系統中。
Zookeeper 可以很容易的實現集羣管理的功能,若有多臺 Server 組成一個服務集羣,那麼必需要一個「總管」知道當前集羣中每臺機器的服務狀態,一旦有機器不能提供服務,集羣中其它集羣必須知道,從而作出調整從新分配服務策略。一樣當增長集羣的服務能力時,就會增長一臺或多臺 Server,一樣也必須讓「總管」知道。
Zookeeper 不只可以幫你維護當前的集羣中機器的服務狀態,並且可以幫你選出一個「總管」,讓這個總管來管理集羣,這就是 Zookeeper 的另外一個功能 Leader Election。
它們的實現方式都是在 Zookeeper 上建立一個 EPHEMERAL 類型的目錄節點,而後每一個 Server 在它們建立目錄節點的父目錄節點上調用
getChildren(
String path, boolean watch) 方法並設置 watch 爲 true,因爲是 EPHEMERAL 目錄節點,當建立它的 Server 死去,這個目錄節點也隨之被刪除,因此 Children 將會變化,這時
getChildren上的 Watch 將會被調用,因此其它 Server 就知道已經有某臺 Server 死去了。新增 Server 也是一樣的原理。
Zookeeper 如何實現 Leader Election,也就是選出一個 Master Server。和前面的同樣每臺 Server 建立一個 EPHEMERAL 目錄節點,不一樣的是它仍是一個 SEQUENTIAL 目錄節點,因此它是個 EPHEMERAL_SEQUENTIAL 目錄節點。之因此它是 EPHEMERAL_SEQUENTIAL 目錄節點,是由於咱們能夠給每臺 Server 編號,咱們能夠選擇當前是最小編號的 Server 爲 Master,假如這個最小編號的 Server 死去,因爲是 EPHEMERAL 節點,死去的 Server 對應的節點也被刪除,因此當前的節點列表中又出現一個最小編號的節點,咱們就選擇這個節點爲當前 Master。這樣就實現了動態選擇 Master,避免了傳統意義上單 Master 容易出現單點故障的問題。
void findLeader() throws InterruptedException {
byte[] leader = null;
try {
leader = zk.getData(root + "/leader", true, null);
} catch (Exception e) {
logger.error(e);
}
if (leader != null) {
following();
} else {
String newLeader = null;
try {
byte[] localhost = InetAddress.getLocalHost().getAddress();
newLeader = zk.create(root + "/leader", localhost,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
logger.error(e);
}
if (newLeader != null) {
leading();
} else {
mutex.wait();
}
}
}
|
共享鎖在同一個進程中很容易實現,可是在跨進程或者在不一樣 Server 之間就很差實現了。Zookeeper 卻很容易實現這個功能,實現方式也是須要得到鎖的 Server 建立一個 EPHEMERAL_SEQUENTIAL 目錄節點,而後調用
getChildren方法獲取當前的目錄節點列表中最小的目錄節點是否是就是本身建立的目錄節點,若是正是本身建立的,那麼它就得到了這個鎖,若是不是那麼它就調用
exists(
String path, boolean watch) 方法並監控 Zookeeper 上目錄節點列表的變化,一直到本身建立的節點是列表中最小編號的目錄節點,從而得到鎖,釋放鎖很簡單,只要刪除前面它本身所建立的目錄節點就好了。
void getLock() throws KeeperException, InterruptedException{
List<String> list = zk.getChildren(root, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if(myZnode.equals(root+"/"+nodes[0])){
doAction();
}
else{
waitForLock(nodes[0]);
}
}
void waitForLock(String lower) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
if(stat != null){
mutex.wait();
}
else{
getLock();
}
}
|
Zookeeper 能夠處理兩種類型的隊列:
(1)當一個隊列的成員都聚齊時,這個隊列纔可用,不然一直等待全部成員到達,這種是同步隊列。
(2)隊列按照 FIFO 方式進行入隊和出隊操做,例如實現生產者和消費者模型。
同步隊列用 Zookeeper 實現的實現思路以下:
建立一個父目錄 /synchronizing,每一個成員都監控標誌(Set Watch)位目錄 /synchronizing/start 是否存在,而後每一個成員都加入這個隊列,加入隊列的方式就是建立 /synchronizing/member_i 的臨時目錄節點,而後每一個成員獲取 / synchronizing 目錄的全部目錄節點,也就是 member_i。判斷 i 的值是否已是成員的個數,若是小於成員個數等待 /synchronizing/start 的出現,若是已經相等就建立 /synchronizing/start。
用下面的流程圖更容易理解:
同步隊列的關鍵代碼以下:
void addQueue() throws KeeperException, InterruptedException{
zk.exists(root + "/start",true);
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
synchronized (mutex) {
List<String> list = zk.getChildren(root, false);
if (list.size() < size) {
mutex.wait();
} else {
zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
}
|
當隊列沒盡是進入 wait(),而後會一直等待 Watch 的通知,Watch 的代碼以下:
public void process(WatchedEvent event) {
if(event.getPath().equals(root + "/start") &&
event.getType() == Event.EventType.NodeCreated){
System.out.println("獲得通知");
super.process(event);
doAction();
}
}
|
FIFO 隊列用 Zookeeper 實現思路以下:
實現的思路也很是簡單,就是在特定的目錄下建立 SEQUENTIAL 類型的子目錄 /queue_i,這樣就能保證全部成員加入隊列時都是有編號的,出隊列時經過 getChildren( ) 方法能夠返回當前全部的隊列中的元素,而後消費其中最小的一個,這樣就能保證 FIFO。
下面是生產者和消費者這種隊列形式的示例代碼:
boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
|
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
if(tempValue < min) min = tempValue;
}
byte[] b = zk.getData(root + "/element" + min,false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
|
參考資料