在不少時候,咱們均可以在各類框架應用中看到ZooKeeper的身影,好比Kafka中間件,Dubbo框架,Hadoop等等。爲何處處都看到ZooKeeper?html
ZooKeeper是一個分佈式服務協調框架,提供了分佈式數據一致性的解決方案,基於ZooKeeper的數據結構,Watcher,選舉機制等特色,能夠實現數據的發佈/訂閱,軟負載均衡,命名服務,統一配置管理,分佈式鎖,集羣管理等等。java
ZooKeeper能保證:node
ZooKeeper的數據結構和Unix文件系統很相似,整體上能夠看作是一棵樹,每個節點稱之爲一個ZNode,每個ZNode默認能存儲1M的數據。每個ZNode可經過惟一的路徑標識。以下圖所示:spring
建立ZNode時,能夠指定如下四種類型,包括:apache
Watcher是基於觀察者模式實現的一種機制。若是咱們須要實現當某個ZNode節點發生變化時收到通知,就可使用Watcher監聽器。服務器
客戶端經過設置監視點(watcher)向 ZooKeeper 註冊須要接收通知的 znode,在 znode 發生變化時 ZooKeeper 就會向客戶端發送消息。session
這種通知機制是一次性的。一旦watcher被觸發,ZooKeeper就會從相應的存儲中刪除。若是須要不斷監聽ZNode的變化,能夠在收到通知後再設置新的watcher註冊到ZooKeeper。數據結構
監視點的類型有不少,如監控ZNode數據變化、監控ZNode子節點變化、監控ZNode 建立或刪除。負載均衡
ZooKeeper是一個高可用的應用框架,由於ZooKeeper是支持集羣的。ZooKeeper在集羣狀態下,配置文件是不會指定Master和Slave,而是在ZooKeeper服務器初始化時就在內部進行選舉,產生一臺作爲Leader,多臺作爲Follower,而且遵照半數可用原則。框架
因爲遵照半數可用原則,因此5臺服務器和6臺服務器,實際上最大容許宕機數量都是3臺,因此爲了節約成本,集羣的服務器數量通常設置爲奇數。
若是在運行時,若是長時間沒法和Leader保持鏈接的話,則會再次進行選舉,產生新的Leader,以保證服務的可用。
首先在官網下載ZooKeeper,我這裏用的是3.3.6版本。
而後解壓,複製一下/conf目錄下的zoo_sample.cfg文件,重命名爲zoo.cfg。
修改zoo.cfg中dataDir的值,並建立對應的目錄:
最後到/bin目錄下啓動,我用的是window系統,因此啓動zkServer.cmd,雙擊便可:
啓動成功的話就能夠看到這個對話框:
可視化界面的話,我推薦使用ZooInspector
,操做比較簡便:
首先引入Maven依賴:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
複製代碼
接着咱們寫一個Main方法,進行操做:
//鏈接地址及端口號
private static final String SERVER_HOST = "127.0.0.1:2181";
//會話超時時間
private static final int SESSION_TIME_OUT = 2000;
public static void main(String[] args) throws Exception {
//參數一:服務端地址及端口號
//參數二:超時時間
//參數三:監聽器
ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//獲取事件的狀態
Event.KeeperState state = watchedEvent.getState();
//判斷是不是鏈接事件
if (Event.KeeperState.SyncConnected == state) {
Event.EventType type = watchedEvent.getType();
if (Event.EventType.None == type) {
System.out.println("zk客戶端已鏈接...");
}
}
}
});
zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("新增ZNode成功");
zooKeeper.close();
}
複製代碼
建立一個持久性ZNode,路徑是/java,值爲"Hello World":
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) 複製代碼
參數解釋:
ACL權限控制,有三個是ZooKeeper定義的經常使用權限,在ZooDefs.Ids類中:
/** * This is a completely open ACL. * 徹底開放的ACL,任何鏈接的客戶端均可以操做該屬性znode */
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));
/** * This ACL gives the creators authentication id's all permissions. * 只有建立者纔有ACL權限 */
public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));
/** * This ACL gives the world the ability to read. * 只能讀取ACL */
public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
複製代碼
createMode就是前面講過的四種ZNode類型:
public enum CreateMode {
/** * 持久性ZNode */
PERSISTENT (0, false, false),
/** * 持久性自動增長順序號ZNode */
PERSISTENT_SEQUENTIAL (2, false, true),
/** * 臨時性ZNode */
EPHEMERAL (1, true, false),
/** * 臨時性自動增長順序號ZNode */
EPHEMERAL_SEQUENTIAL (3, true, true);
}
複製代碼
//同步獲取節點數據
public byte[] getData(String path, boolean watch, Stat stat){
...
}
//異步獲取節點數據
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx){
...
}
複製代碼
同步getData()方法中的stat參數是用於接收返回的節點描述信息:
public byte[] getData(final String path, Watcher watcher, Stat stat){
//省略...
GetDataResponse response = new GetDataResponse();
//發送請求到ZooKeeper服務器,獲取到response
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (stat != null) {
//把response的Stat賦值到傳入的stat中
DataTree.copyStat(response.getStat(), stat);
}
}
複製代碼
使用同步getData()獲取數據:
//數據的描述信息,包括版本號,ACL權限,子節點信息等等
Stat stat = new Stat();
//返回結果是byte[]數據,getData()方法底層會把描述信息複製到stat對象中
byte[] bytes = zooKeeper.getData("/java", false, stat);
//打印結果
System.out.println("ZNode的數據data:" + new String(bytes));//Hello World
System.out.println("獲取到dataVersion版本號:" + stat.getVersion());//默認數據版本號是0
複製代碼
public Stat setData(final String path, byte data[], int version){
...
}
複製代碼
值得注意的是第三個參數version,使用CAS機制,這是爲了防止多個客戶端同時更新節點數據,因此須要在更新時傳入版本號,每次更新都會使版本號+1,若是服務端接收到版本號,對比發現不一致的話,則會拋出異常。
因此,在更新前須要先查詢獲取到版本號,不然你不知道當前版本號是多少,就無法更新:
//獲取節點描述信息
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
System.out.println("更新ZNode數據...");
//更新操做,傳入路徑,更新值,版本號三個參數,返回結果是新的描述信息
Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
System.out.println("更新後的版本號爲:" + setData.getVersion());//更新後的版本號爲:1
複製代碼
更新後,版本號增長了:
若是傳入的版本參數是"-1",就是告訴zookeeper服務器,客戶端須要基於數據的最新版本進行更新操做。可是-1並非一個合法的版本號,而是一個標識符。
public void delete(final String path, int version){
...
}
複製代碼
這裏也須要傳入版本號,調用getData()方法便可獲取到版本號,很簡單:
Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
//刪除ZNode
zooKeeper.delete("/java", stat.getVersion());
複製代碼
在上面第三點提到,ZooKeeper是可使用通知監聽機制,當ZNode發生變化會收到通知消息,進行處理。基於watcher機制,ZooKeeper能玩出不少花樣。怎麼使用?
ZooKeeper的通知監聽機制,總的來講能夠分爲三個過程:
①客戶端註冊 Watcher ②服務器處理 Watcher ③客戶端回調 Watcher客戶端。
註冊 watcher 有 4 種方法,new ZooKeeper()、getData()、exists()、getChildren()。下面演示一下使用exists()方法註冊watcher:
首先須要實現Watcher接口,新建一個監聽器:
public class MyWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
//獲取事件類型
Event.EventType eventType = event.getType();
//通知狀態
Event.KeeperState eventState = event.getState();
//節點路徑
String eventPath = event.getPath();
System.out.println("監聽到的事件類型:" + eventType.name());
System.out.println("監聽到的通知狀態:" + eventState.name());
System.out.println("監聽到的ZNode路徑:" + eventPath);
}
}
複製代碼
而後調用exists()方法,註冊監聽器:
zooKeeper.exists("/java", new MyWatcher());
//對ZNode進行更新數據的操做,觸發監聽器
zooKeeper.setData("/java", "fly".getBytes(), -1);
複製代碼
而後在控制檯就能夠看到打印的信息:
這裏咱們能夠看到Event.EventType對象就是事件類型,咱們能夠對事件類型進行判斷,再配合Event.KeeperState通知狀態,作相關的業務處理,事件類型有哪些?
打開EventType、KeeperState的源碼查看:
//事件類型是一個枚舉
public enum EventType {
None (-1),//無
NodeCreated (1),//Watcher監聽的數據節點被建立
NodeDeleted (2),//Watcher監聽的數據節點被刪除
NodeDataChanged (3),//Watcher監聽的數據節點內容發生變動
NodeChildrenChanged (4);//Watcher監聽的數據節點的子節點列表發生變動
}
//通知狀態也是一個枚舉
public enum KeeperState {
Unknown (-1),//屬性過時
Disconnected (0),//客戶端與服務端斷開鏈接
NoSyncConnected (1),//屬性過時
SyncConnected (3),//客戶端與服務端正常鏈接
AuthFailed (4),//身份認證失敗
ConnectedReadOnly (5),//返回這個狀態給客戶端,客戶端只能處理讀請求
SaslAuthenticated(6),//服務器採用SASL作校驗時
Expired (-112);//會話session失效
}
複製代碼
zooKeeper.exists("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是exists()方法的監聽器");
}
});
//對ZNode進行更新數據的操做,觸發監聽器
zooKeeper.setData("/java", "fly".getBytes(), -1);
//企圖第二次觸發監聽器
zooKeeper.setData("/java", "spring".getBytes(), -1);
複製代碼
zooKeeper.exists("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是exists()方法的監聽器");
}
});
Stat stat = new Stat();
zooKeeper.getData("/java", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("我是getData()方法的監聽器");
}
}, stat);
//對ZNode進行更新數據的操做,觸發監聽器
zooKeeper.setData("/java", "fly".getBytes(), stat.getVersion());
複製代碼
打印結果,說明先調用exists()方法的監聽器,再調用getData()方法的監聽器。由於exists()方法先註冊了。
我記得B站的UP主李永樂說過,只有你讓更多的人生活變得更美好時,本身的生活才能變得更美好。
這句話也是我今年開始寫技術分享的一個動力源泉,但願這篇文章對你有用~
著名的飛行家馬老師說過:回城是收費的,而點贊是免費的~