ZooKeeper nodes (znodes) fall into four types along two dimensions, persistence and ordering:
PERSISTENT: persistent, unordered (stored on disk)
PERSISTENT_SEQUENTIAL: persistent, with a monotonically increasing sequence number
EPHEMERAL: non-persistent, unordered; kept in memory and removed when the client session ends.
EPHEMERAL_SEQUENTIAL: non-persistent, with an increasing sequence number; kept in memory and removed when the client session ends.
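Before moving on to watches, a minimal sketch may help make the four types concrete. It assumes a ZooKeeper server at node1:2181 (the address used throughout the examples below); the demo paths are hypothetical:

package com.wangx.kafka.zk;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateModeDemo {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("node1:2181", 5000, null);
        // Persistent node: survives client disconnects, stored on disk
        zk.create("/demo-persistent", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Persistent sequential: ZooKeeper appends an increasing counter to the name
        zk.create("/demo-seq-", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        // Ephemeral: removed automatically when this session ends
        zk.create("/demo-ephemeral", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // Ephemeral sequential: counter suffix plus session-bound lifetime
        zk.create("/demo-eph-seq-", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        zk.close();
    }
}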
Every node can register a watch to monitor changes to it. There are four event types:
Created event: Enabled with a call to exists
Deleted event: Enabled with a call to exists, getData, and getChildren
Changed event: Enabled with a call to exists and getData
Child event: Enabled with a call to getChildren
A watch's basic contract is that the client is notified first and must then fetch the data itself. A watch is removed as soon as it fires and reports no further changes; to keep watching, the client must re-register.
Creating a node and watching it with the raw ZooKeeper client looks like this:
1. Add the dependency to pom.xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.13</version>
</dependency>
2. Client connection class
package com.wangx.kafka.zk;

import org.apache.zookeeper.*;
import java.io.IOException;

public class ZkDemo {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // Create the connection and watch the connection state
        ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("client connected");
                System.out.println(watchedEvent.getState());
            }
        });
        // Create a node: /parent is the path, "data" the payload,
        // Ids.OPEN_ACL_UNSAFE the ACL, CreateMode.PERSISTENT the node type
        String parent = zooKeeper.create("/parent", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Watch the node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
            }
        });
        System.out.println(parent);
        Thread.sleep(10000000);
    }
}
Running this creates a persistent node.
In the ZooKeeper CLI you can see:
the parent node was created successfully.
Delete the parent node and observe the watch.
The console prints:
showing that the delete event was observed. Now re-create the node by hand in the CLI and watch what happens.
The console prints nothing about the creation, i.e. the event was not observed. This is the one-shot behaviour mentioned above: once a watch fires, it is closed. Let's rework the code:
package com.wangx.kafka.zk;

import org.apache.zookeeper.*;
import java.io.IOException;

public class ZkDemo {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // Create the connection and watch the connection state
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("client connected");
                System.out.println(watchedEvent.getState());
            }
        });
        // Create the node
        String parent = zooKeeper.create("/parent", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Watch the node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    // Re-register the watch so the next event is seen too
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread.sleep(10000000);
    }
}
Delete the node, then create it again by hand:
The console prints:
This time the create event is observed again because the watch was re-registered.
1. Racing to register the Leader node — unfair mode
Recipe:
1. Create a parent node for the election, e.g. /chroot, as a persistent node
2. Each client competes for leadership by creating a Leader node under /chroot, e.g. /chroot/leader. This node should be ephemeral
3. The client that creates the Leader node successfully wins the election
4. A client whose create fails has lost this round; it registers an exists watch on /chroot/leader and is notified once that node is deleted
5. The Leader can resign by deleting the Leader node
6. If the Leader crashes, the Leader node is deleted automatically because it is ephemeral. The other clients, having registered watches on it, are notified and join the next round, so some client is always serving as Leader.
The implementation:
package com.wangx.kafka.zk;

import org.apache.zookeeper.*;
import java.io.IOException;

public class ZkDemo {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // Create the connection and watch the connection state
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("client connected");
                System.out.println(watchedEvent.getState());
            }
        });
        // Create the parent node
        String parent = zooKeeper.create("/parent", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Watch the parent node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // Only the first create succeeds; the second one throws
        // KeeperException.NodeExistsException because the path is taken
        String newNode1 = zooKeeper.create("/parent/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        String newNode2 = zooKeeper.create("/parent/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        String newNode3 = zooKeeper.create("/parent/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Thread.sleep(10000000);
    }
}
Once the node exists, further create calls throw an exception and fail, so only the client whose create succeeds becomes the leader. Because the others hold watches, they can contend for leadership again after the node is deleted or the leader crashes.
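The demo above only shows the create collision; it does not run the full loop from steps 1-6. A sketch of that loop might look like the following, where the UnfairElection class and its retry structure are illustrative assumptions (the /chroot parent is assumed to already exist):

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;

public class UnfairElection implements Watcher {
    private final ZooKeeper zk;

    public UnfairElection(ZooKeeper zk) {
        this.zk = zk;
    }

    public void elect() throws KeeperException, InterruptedException {
        try {
            // Steps 2-3: whoever creates the ephemeral leader node wins
            zk.create("/chroot/leader", "me".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("I am the leader");
        } catch (KeeperException.NodeExistsException e) {
            // Step 4: lost the race, so watch the leader node for deletion
            if (zk.exists("/chroot/leader", this) == null) {
                elect(); // node vanished between create and exists, retry now
            }
        }
    }

    public void process(WatchedEvent event) {
        // Step 6: the leader node was deleted (resignation or session expiry)
        if (event.getType() == Event.EventType.NodeDeleted) {
            try {
                elect();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("node1:2181", 5000, null);
        new UnfairElection(zk).elect();
        Thread.sleep(10000000); // keep the session alive
    }
}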
2. First come, first served; each watches its predecessor — fair mode
1. Create a parent node for the election, e.g. /chroot, as a persistent node
2. Each client competes by creating a Leader node under /chroot, e.g. /chroot/leader. This node should be ephemeral_sequential
3. Each client lists the children of /chroot via getChildren; if its own node has the smallest sequence number among them, it wins the election
4. Otherwise it registers a watch on the node immediately ahead of its own; when that node is deleted, it is notified and returns to step 3 (it cannot simply assume it is the new Leader, since the node ahead of it may merely have crashed)
5. The Leader resigns by deleting the node it created
The implementation:
package com.wangx.kafka.zk;

import org.apache.zookeeper.*;
import java.io.IOException;

public class ZkDemo {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // Create the connection and watch the connection state
        final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("client connected");
                System.out.println(watchedEvent.getState());
            }
        });
        // Create the parent node
        String parent = zooKeeper.create("/parent", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // Watch the parent node for changes
        zooKeeper.exists("/parent", new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println("state" + watchedEvent.getState());
                try {
                    zooKeeper.exists("/parent", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // Sequential creates all succeed: ZooKeeper appends a unique counter to each name
        String newNode1 = zooKeeper.create("/parent/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String newNode2 = zooKeeper.create("/parent/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String newNode3 = zooKeeper.create("/parent/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        Thread.sleep(10000000);
    }
}
You can now see three new nodes under parent in ZooKeeper:
By default each name is the prefix node plus a ten-digit decimal sequence number that increases monotonically (node0000000000, node0000000001, ...).
The client whose sequence number is smallest among all children is elected leader.
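Steps 3-5 (sort the children, win if smallest, otherwise watch the predecessor) are not implemented in the demo above. A minimal sketch, assuming the /chroot parent already exists and using an illustrative FairElection class, could look like this:

package com.wangx.kafka.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;

public class FairElection {
    private final ZooKeeper zk;
    private String myNode; // e.g. "node0000000002"

    public FairElection(ZooKeeper zk) {
        this.zk = zk;
    }

    public void join() throws KeeperException, InterruptedException {
        // Step 2: register with an ephemeral sequential node
        String path = zk.create("/chroot/node", "data".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        myNode = path.substring(path.lastIndexOf('/') + 1);
        check();
    }

    private void check() throws KeeperException, InterruptedException {
        // Step 3: list all children and sort by sequence number
        List<String> children = zk.getChildren("/chroot", false);
        Collections.sort(children);
        int index = children.indexOf(myNode);
        if (index == 0) {
            System.out.println("I am the leader: " + myNode);
            return;
        }
        // Step 4: watch only the immediate predecessor; on its deletion, re-check
        String predecessor = "/chroot/" + children.get(index - 1);
        Stat stat = zk.exists(predecessor, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                try {
                    check();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        if (stat == null) {
            check(); // the predecessor disappeared already, re-evaluate now
        }
    }

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("node1:2181", 5000, null);
        new FairElection(zk).join();
        Thread.sleep(10000000); // keep the session alive
    }
}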
Next, add the Curator dependencies to pom.xml:
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>3.2.1</version>
</dependency>
1. Curator LeaderLatch characteristics and API:
1. After winning the election, a latch cannot voluntarily relinquish leadership
2. Leadership can only be given up by calling close
3. Adding a ConnectionStateListener is strongly recommended; treat a SUSPENDED or LOST connection as loss of leadership
4. await blocks until leadership is acquired, optionally with a timeout
5. hasLeadership reports whether this instance is the Leader
6. getLeader returns the current Leader
7. getParticipants returns the parties competing for leadership
A simple implementation:
package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLeaderLatch {
    public static void main(String[] args) throws Exception {
        // Retry policy: start with a one-second sleep, retry up to five times
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        // Obtain a CuratorFramework from the factory
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181", retryPolicy);
        // Create the leader latch under /parent with participant id "node"
        LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, "/parent", "node");
        // Listen for leadership changes
        leaderLatch.addListener(new LeaderLatchListener() {
            // Called when this instance becomes leader
            public void isLeader() {
                System.out.println("I am the leader");
            }
            // Called when this instance is no longer leader
            public void notLeader() {
                System.out.println("I am not the leader");
            }
        });
        // Start the client, then the latch
        curatorFramework.start();
        leaderLatch.start();
        Thread.sleep(100000000);
        leaderLatch.close();
        curatorFramework.close();
    }
}
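The feature list above mentions several methods the example never calls. Here is a hedged sketch exercising them; the CuratorLeaderLatchApiDemo class name is an assumption, and the setup mirrors the example above:

package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class CuratorLeaderLatchApiDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181", retryPolicy);
        LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, "/parent", "node");

        // Item 3: treat SUSPENDED or LOST as loss of leadership
        curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
                    System.out.println("Connection " + newState + ": assume leadership is lost");
                }
            }
        });

        curatorFramework.start();
        leaderLatch.start();

        // Item 4: wait for leadership, giving up after a timeout
        boolean acquired = leaderLatch.await(30, TimeUnit.SECONDS);
        System.out.println("acquired within timeout: " + acquired);

        // Items 5-7: query latch state
        System.out.println("hasLeadership: " + leaderLatch.hasLeadership());
        System.out.println("current leader: " + leaderLatch.getLeader().getId());
        System.out.println("participants: " + leaderLatch.getParticipants());

        // Item 2: close is the only way to give up leadership
        leaderLatch.close();
        curatorFramework.close();
    }
}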
2. Curator LeaderSelector characteristics and API:
1. On winning the election, the takeLeadership method is called back
2. Business logic belongs inside takeLeadership
3. As soon as takeLeadership returns, leadership is considered relinquished
4. autoRequeue re-enters the election so leadership can be acquired repeatedly
5. hasLeadership reports whether this instance is the Leader
6. getLeader returns the current Leader
7. getParticipants returns the parties competing for leadership
A simple implementation:
package com.wangx.kafka.zk;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLeaderSelector {
    public static void main(String[] args) throws Exception {
        // Retry policy: start with a one-second sleep, retry up to five times
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        // Obtain a CuratorFramework from the factory
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181", retryPolicy);
        // Create the selector; business logic goes in the takeLeadership callback
        LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, "/node",
                new LeaderSelectorListenerAdapter() {
                    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                        Thread.sleep(1000);
                        System.out.println("takeLeadership invoked");
                    }
                });
        // Re-enter the election whenever takeLeadership returns
        leaderSelector.autoRequeue();
        // Start the client first, then the selector
        curatorFramework.start();
        leaderSelector.start();
        Thread.sleep(100000000);
        leaderSelector.close();
        curatorFramework.close();
    }
}
The LeaderSelectorListenerAdapter used here implements the LeaderSelectorListener interface; its source is:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.curator.framework.recipes.leader;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener {
    public LeaderSelectorListenerAdapter() {
    }

    // When the connection fails this throws, interrupting takeLeadership and
    // preventing the business logic from acting on a stale leadership claim
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (client.getConnectionStateErrorPolicy().isErrorState(newState)) {
            throw new CancelLeadershipException();
        }
    }
}
1. Kafka's "every replica for itself" Leader Election
All replicas of each Partition compete for leadership simultaneously. The upside is that this is simple to implement; the downsides are the Herd Effect (many candidates racing at once), an overloaded ZooKeeper, and high latency (which can in turn cause many other problems).
2. Kafka's Controller-based Leader Election
The idea is to elect one Broker in the cluster as Controller; the Controller then assigns the Leader and Followers for every Partition of every Topic. Kafka performs this election by creating the ephemeral /controller znode and writing the current broker's information into it: {"version":1,"brokerid":1,"timestamp":"1512018424988"}
Thanks to ZooKeeper's strong consistency, a node can be created by only one client; the broker whose create succeeds becomes the leader, first come first served. That leader is the cluster's controller, responsible for cluster-wide chores large and small. When the leader loses its ZooKeeper connection, the ephemeral node is deleted; the other brokers watch this node, receive the deletion event, and start a new leader election.
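As a quick way to observe this, the /controller znode can be read back with the plain ZooKeeper client. This is an illustrative sketch (the ControllerReader class name is an assumption); the printed JSON should have the shape shown above:

package com.wangx.kafka.zk;

import org.apache.zookeeper.ZooKeeper;

public class ControllerReader {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("node1:2181", 5000, null);
        // The broker that won the race wrote its metadata here; the node is
        // ephemeral, so it disappears when the controller's session ends.
        byte[] data = zk.getData("/controller", false, null);
        System.out.println(new String(data));
        // Expected output shape: {"version":1,"brokerid":1,"timestamp":"1512018424988"}
        zk.close();
    }
}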
This greatly alleviates the Herd Effect and lightens ZooKeeper's load. The Controller communicates with Leaders and Followers over RPC, which is efficient and timely. The cost is added complexity: introducing a Controller means its failover must also be handled.