1.zookeeper介紹java
ZooKeeper 是一個開源的分佈式協調服務,由雅虎建立,是 Google Chubby 的開源實現。分佈式應用程序能夠基於 ZooKeeper 實現諸如數據發佈/訂閱、負載均衡、命名服務、分佈式協調/通知、集羣管理、Master 選舉、配置維護,名字服務、分佈式同步、分佈式鎖和分佈式隊列等功能。node
數據模型:ZooKeeper 容許分佈式進程經過共享的層次結構命名空間進行相互協調,這與標準文件系統相似。名稱空間由 ZooKeeper 中的數據寄存器組成,稱爲 Znode,這些相似於文件和目錄。與典型文件系統不一樣,ZooKeeper 數據保存在內存中,這意味着 ZooKeeper 能夠實現高吞吐量和低延遲。算法
順序訪問:對於來自客戶端的每一個更新請求,ZooKeeper 都會分配一個全局惟一的遞增編號。這個編號反應了全部事務操做的前後順序,應用程序可使用 ZooKeeper 這個特性來實現更高層次的同步原語。這個編號也叫作時間戳—zxid(ZooKeeper Transaction Id)。apache
可構建集羣:爲了保證高可用,最好是以集羣形態來部署 ZooKeeper,這樣只要集羣中大部分機器是可用的(可以容忍必定的機器故障),那麼 ZooKeeper 自己仍然是可用的。客戶端在使用 ZooKeeper 時,須要知道集羣機器列表,經過與集羣中的某一臺機器創建 TCP 鏈接來使用服務。客戶端使用這個 TCP 連接來發送請求、獲取結果、獲取監聽事件以及發送心跳包。若是這個鏈接異常斷開了,客戶端能夠鏈接到另外的機器上。bash
上圖中每個 Server 表明一個安裝 ZooKeeper 服務的服務器。組成 ZooKeeper 服務的服務器都會在內存中維護當前的服務器狀態,而且每臺服務器之間都互相保持着通訊。集羣間經過 Zab 協議(Zookeeper Atomic Broadcast)來保持數據的一致性。Zookeeper服務器有三種角色:Leader、Follower、Observer,集羣中的全部機器經過一個 Leader 選舉過程來選定一臺稱爲 「Leader」 的機器。Leader 既能夠爲客戶端提供寫服務又能提供讀服務。除了 Leader 外,Follower 和 Observer 都只能提供讀服務。Follower 和 Observer 惟一的區別在於 Observer 機器不參與 Leader 的選舉過程,也不參與寫操做的「過半寫成功」策略,所以 Observer 機器能夠在不影響寫性能的狀況下提高集羣的讀性能。在 ZooKeeper 中,主要依賴 ZAB 協議來實現分佈式數據一致性,基於該協議,ZooKeeper 實現了一種主備模式的系統架構來保持集羣中各個副本之間的數據一致性。服務器
工做原理:Zookeeper的核心是原子廣播,這個機制保證了各個server之間的同步。實現這個機制的協議叫作Zab協議。Zab協議有兩種模式,它們分別是恢復模式和廣播模式。當服務啓動或者在領導者崩潰後,Zab就進入了恢復模式,當領導者被選舉出來,且大多數server的完成了和leader的狀態同步之後,恢復模式就結束了。狀態同步保證了leader和server具備相同的系統狀態。一旦leader已經和多數的follower進行了狀態同步後,他就能夠開始廣播消息了,即進入廣播狀態。這時候當一個server加入zookeeper服務中,它會在恢復模式下啓動,發現leader,並和leader進行狀態同步。待到同步結束,它也參與消息廣播。Zookeeper服務一直維持在Broadcast狀態,直到leader崩潰了或者leader失去了大部分的followers支持。網絡
Leader選舉:廣播模式須要保證proposal(提議)被按順序處理(leader來執行寫操做),所以zk採用了遞增的事務id號(zxid)來保證。全部的提議都在被提出的時候加上了zxid。實現中zxid是一個64爲的數字,它高32位是epoch用來標識leader關係是否改變,每次一個leader被選出來,它都會有一個新的epoch。低32位是個遞增計數。當leader崩潰或者leader失去大多數的follower,這時候zk進入恢復模式,恢復模式須要從新選舉出一個新的leader,讓全部的server都恢復到一個正確的狀態。每一個Server啓動之後都詢問其它的Server它要投票給誰。對於其餘server的詢問,server每次根據本身的狀態都回複本身推薦的leader的id和上一次處理事務的zxid(系統啓動時每一個server都會推薦本身),收到全部Server回覆之後,就計算出zxid最大的哪一個Server,並將這個Server相關信息設置成下一次要投票的Server。計算這過程當中得到票數最多的的sever爲獲勝者,若是獲勝者的票數超過半數,則改server被選爲leader。不然,繼續這個過程,直到leader被選舉出來。session
數據一致性算法:paxos,請參考Zookeeper全解析——Paxos做爲靈魂 數據結構
2.使用Zookeeper架構
1 //客戶端鏈接zookeeper服務器 2 ZooKeeper zkClient = new ZooKeeper(CONNECT_STR, 50000, new Watcher() { 3 @Override 4 public void process(WatchedEvent watchedEvent) { 5 //監控服務節點變化 6 System.out.println("sssss"); 7 } 8 }); 9 10 //獲取根節點下的全部節點 11 List<String> nodeList= zkClient.getChildren("/",null); 12 13 System.out.println(nodeList.toString()); 14 15 //Stat isExists= zkClient.exists(LOCK_ROOT_PATH,null); 16 //在test父節點下建立子節點 17 String lockPath = zkClient.create("/test/why","why".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
代碼中須要注意的是若是父節點不存在,會報異常,同時父節點不能是臨時節點。
Znode:在 ZooKeeper 中,「節點"分爲兩類,第一類一樣是指構成集羣的機器,咱們稱之爲機器節點,第二類則是指數據模型中的數據單元,咱們稱之爲數據節點一ZNode。ZooKeeper 將全部數據存儲在內存中,數據模型是一棵樹(Znode Tree),由斜槓(/)的進行分割的路徑,就是一個 Znode,例如/foo/path1。每一個上都會保存本身的數據內容,同時還會保存一系列屬性信息。zookeeper有四類節點:PERSISTENT(持久的)、EPHEMERAL(暫時的)、PERSISTENT_SEQUENTIAL(持久化順序編號目錄節點)、EPHEMERAL_SEQUENTIAL(暫時化順序編號目錄節點)
Session:Session 指的是 ZooKeeper 服務器與客戶端會話。在 ZooKeeper 中,一個客戶端鏈接是指客戶端和服務器之間的一個 TCP 長鏈接。客戶端啓動的時候,首先會與服務器創建一個 TCP 鏈接,從第一次鏈接創建開始,客戶端會話的生命週期也開始了。經過這個鏈接,客戶端可以經過心跳檢測與服務器保持有效的會話,也可以向 Zookeeper 服務器發送請求並接受響應,同時還可以經過該鏈接接收來自服務器的 Watch 事件通知。Session 的 sessionTimeout 值用來設置一個客戶端會話的超時時間。當因爲服務器壓力太大、網絡故障或是客戶端主動斷開鏈接等各類緣由致使客戶端鏈接斷開時,只要在 sessionTimeout 規定的時間內可以從新鏈接上集羣中任意一臺服務器,那麼以前建立的會話仍然有效。在爲客戶端建立會話以前,服務端首先會爲每一個客戶端都分配一個 sessionID。因爲 sessionID 是 Zookeeper 會話的一個重要標識,許多與會話相關的運行機制都是基於這個 sessionID 的。所以,不管是哪臺服務器爲客戶端分配的 sessionID,都務必保證全局惟一。
Watcher:是 ZooKeeper 中的一個很重要的特性。ZooKeeper 容許用戶在指定節點上註冊一些 Watcher,而且在一些特定事件觸發的時候,ZooKeeper 服務端會將事件通知到感興趣的客戶端上去,該機制是 ZooKeeper 實現分佈式協調服務的重要特性。
Version: Zookeeper 的每一個 ZNode 上都會存儲數據,對應於每一個 ZNode,Zookeeper 都會爲其維護一個叫做 Stat 的數據結構。Stat 中記錄了這個 ZNode 的三個數據版本,分別是:version(當前節點版本)、cversion(當前節點的子節點版本)、aversion(當前節點的ACL版本)
ACL:ZooKeeper 採用 ACL(AccessControlLists)策略來進行權限控制,相似於 UNIX 文件系統的權限控制。ZooKeeper 定義了 5 種權限:CREATE/READ/WRITE/DELETE/ADMIN
3.經過zookeeper實現分佈式鎖
1 package com.why; 2 3 import org.apache.zookeeper.*; 4 import org.apache.zookeeper.data.Stat; 5 6 import java.io.IOException; 7 import java.util.Collections; 8 import java.util.List; 9 10 /* 11 * 分佈式鎖 12 * */ 13 public class DistributeLock { 14 15 private static final String LOCK_ROOT_PATH = "/test"; 16 //private static final String LOCK_NODE_NAME = "Lock"; 17 18 private static ZooKeeper _zkClient; 19 20 static { 21 try { 22 _zkClient = new ZooKeeper("192.168.6.132:2181", 500000, null); 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 28 29 public static String getLock() { 30 try { 31 32 //System.out.println(_zkClient.getChildren("/",false)); 33 34 String lockPath = _zkClient.create( "/test/why", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 35 //System.out.println(lockPath); 36 //System.out.println(_zkClient.getChildren(LOCK_ROOT_PATH,false)); 37 if (tryLock(lockPath)) 38 return lockPath; 39 else 40 return null; 41 } catch (Exception ex) { 42 ex.printStackTrace(); 43 return null; 44 } 45 } 46 47 private static boolean tryLock(String lockPath) throws KeeperException, InterruptedException { 48 List<String> lockPaths = _zkClient.getChildren(LOCK_ROOT_PATH, false); 49 Collections.sort(lockPaths); 50 int index=lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length()+1)); 51 if(index==0){ 52 //得到鎖 53 return true; 54 } 55 else{ 56 String preLockPath="/"+lockPaths.get(index-1); 57 58 Watcher watcher=new Watcher() { 59 @Override 60 public void process(WatchedEvent watchedEvent) { 61 synchronized (this){ 62 //喚醒線程 63 notifyAll(); 64 } 65 } 66 }; 67 68 Stat stat=_zkClient.exists(preLockPath,watcher); 69 70 if(stat==null){ 71 return tryLock(lockPath); 72 }else{ 73 synchronized (watcher){ 74 watcher.wait(); 75 } 76 return tryLock(lockPath); 77 } 78 79 } 80 81 } 82 83 public static void closeZkClient() throws InterruptedException { 84 _zkClient.close(); 85 } 86 87 public static void releaseLock(String lockPath) throws KeeperException, InterruptedException { 88 _zkClient.delete(lockPath,-1); 89 } 90 }
測試:
package com.why; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MultiThreadDemo { private static int counter = 0; public static void plus() throws InterruptedException { Thread.sleep(500); counter++; //System.out.println(counter); } public static int Count(){ return counter; } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ExecutorService executor= Executors.newCachedThreadPool(); final int num=10; for(int i=0;i<num;i++){ executor.submit(new Runnable() { @Override public void run() { try { String path = DistributeLock.getLock(); System.out.println(path); plus(); DistributeLock.releaseLock(path); System.out.println(Count()); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } }); } executor.shutdown(); } }
附ubantu下zookeeper安裝腳本
1.下載
wget http://apache.osuosl.org/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
2.解壓到指定目錄
sudo tar -xzvf zookeeper-3.4.13.tar.gz -C /usr/local
3.複製配置文件:
cd conf
sudo zoo_sample.cfg zoo.cfg
4.單機版不須要配置zoo.cfg,集羣須要配置
5.啓動
cd bin
sh zkServer.sh start
6.若是啓用遇到錯誤提示Syntax error: "(" unexpected (expecting "fi")按以下步驟解決:root@127.0.0.1:~# cd /bin/root@127.0.0.1:/bin# ls -l /bin/shlrwxrwxrwx 1 root root 4 Dec 23 22:30 /bin/sh -> dash(默認)root@127.0.0.1:/bin# ln -sf bash /bin/shroot@127.0.0.1:/bin# ls -l /bin/shlrwxrwxrwx 1 root root 4 Dec 23 22:37 /bin/sh -> bash