關於zookeeper的原理解析,能夠參見zookeeper核心原理詳解,本文所述大多數實踐基於對zookeeper原理的首先理解。html
Curator是Netflix公司開源的一個Zookeeper客戶端,目前是apache頂級項目。與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量,至關於netty之於socket編程。提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架。官網爲http://curator.apache.org/java
除此以外,Curator中還提供了Zookeeper各類應用場景(Recipe,如共享鎖服務、Master選舉機制和分佈式計算器等)的抽象封裝。因此說啊,不論是作底層庫仍是應用,用戶體驗真的很重要。node
Zookeeper的官方客戶端提供了基本的操做,好比,建立會話、建立節點、讀取節點、更新數據、刪除節點和檢查節點是否存在等。但對於開發人員來講,Zookeeper提供的基本操縱仍是有一些不足之處。典型的缺點爲:git
(1)Zookeeper的Watcher是一次性的,每次觸發以後都須要從新進行註冊;
(2)Session超時以後沒有實現重連機制;
(3)異常處理繁瑣,Zookeeper提供了不少異常,對於開發人員來講可能根本不知道該如何處理這些異常信息;
(4)只提供了簡單的byte[]數組的接口,沒有提供針對對象級別的序列化;
(5)建立節點時若是節點存在拋出異常,須要自行檢查節點是否存在;
(6)刪除節點沒法實現級聯刪除;github
所以,產生了兩款主流的三方zk客戶端,ZkClient和Curator。第一個主流的三方zk客戶端是ZkClient,由Datameer的工程師開發,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。像dubbo等框架對其也進行了集成使用。redis
雖然ZkClient對原生API進行了封裝,但也有它自身的不足之處:apache
注:除此以外,不少依賴zookeeper的中間件或大數據組件都配備了與之相適應的zookeeper客戶端,例如hbase、hadoop、fabric8等。編程
所以,除了早期集成外,目前新的框架和系統不多使用ZkClient,所以本文詳細解析curator。若是讀者對zkclient感興趣,能夠參考https://www.jianshu.com/p/d6de2d21d744去,其官網爲https://github.com/sgroschupf/zkclient,已經基本不活躍了、更新極少且star不過千。api
Curator的Maven依賴以下:通常直接使用curator-recipes就好了,若是須要本身封裝一些底層些的功能的話,例如增長鏈接管理重試機制等,則能夠引入curator-framework包。client是低級api。數組
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId>
<!--All of the recipes listed on the ZooKeeper recipes doc (except two phase commit).--> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId>
<!-- High-level API that greatly simplifies using ZooKeeper. --> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId>
<!-- Low-level API --> <version>${apache-curator.version}</version> </dependency>
最新版本能夠從https://mvnrepository.com/artifact/org.apache.curator/curator-client查閱,不過須要注意的是,curator和zookeeper自己的依賴(尤爲是zookeeper 3.4和3.5不兼容,致使的客戶端也是不同)對應關係。目前絕大多數使用2.x的版本。
利用Curator提供的客戶端API,能夠徹底實如今zkCli.sh原生客戶端的各類功能。值得注意的是,Curator採用流式風格API。準確的說是相似JPA化。因爲針對zk/redis等的操做都至關簡單,所以這種模式在這種場景下是比較合適的。以下:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; /** * Curator framework's client test. * Output: * $ create /zktest hello * $ ls / * [zktest, zookeeper] * $ get /zktest * hello * $ set /zktest world * $ get /zktest * world * $ delete /zktest * $ ls / * [zookeeper] */ public class CuratorClientTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "10.20.30.17:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Client API test // 2.1 Create node String data1 = "hello"; print("create", ZK_PATH, data1); client.create(). creatingParentsIfNeeded(). forPath(ZK_PATH, data1.getBytes()); // 2.2 Get node and data print("ls", "/"); print(client.getChildren().forPath("/")); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.3 Modify data String data2 = "world"; print("set", ZK_PATH, data2); client.setData().forPath(ZK_PATH, data2.getBytes()); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.4 Remove node print("delete", ZK_PATH); client.delete().forPath(ZK_PATH); print("ls", "/"); print(client.getChildren().forPath("/")); } private static void print(String... cmds) { StringBuilder text = new StringBuilder("$ "); for (String cmd : cmds) { text.append(cmd).append(" "); } System.out.println(text.toString()); } private static void print(Object result) { System.out.println( result instanceof byte[] ? new String((byte[]) result) : result); } }
詳細的CuratorFramework功能及使用說明可參見https://curator.apache.org/curator-framework/index.html。
Curator提供了三種Watcher(Cache)來監聽結點的變化:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.RetryNTimes; /** * Curator framework watch test. */ public class CuratorWatcherTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Register watcher PathChildrenCache watcher = new PathChildrenCache( client, ZK_PATH, true // if cache data ); watcher.getListenable().addListener((client1, event) -> { ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); } }); watcher.start(StartMode.BUILD_INITIAL_CACHE); System.out.println("Register zk watcher successfully!"); Thread.sleep(Integer.MAX_VALUE); } }
輸出以下:
Java: zk client start successfully! Java: Register zk watcher successfully! zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121] zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121] zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]
下列兩個系列稱爲Recipe(專題,關於這個recipe應該如何翻譯,LZ作了研究,直譯是菜譜,確定不對,也有叫作攻略的,貌似也不正確,因此叫專題可能確實更合適),完整的curator recipe實現可參見https://curator.apache.org/curator-recipes/index.html。
通常咱們稱分佈式鎖的時候,指的是短時的分佈式鎖,所以通常採用redis實現,而zk下的稱之爲分佈式協調更合理,由於它一般時間更長。好比分佈式編程時,好比最容易碰到的狀況就是應用程序在線上多機部署,因而當多個應用同時訪問某一資源時,就須要某種機制去協調它們。例如,如今一臺應用正在rebuild緩存內容,要臨時鎖住某個區域暫時不讓訪問;又好比調度程序每次只想一個任務被一臺應用執行等等。大多數的分佈式協調採用臨時節點+watch機制實現。除了直接採用原始的監聽器本身實現外,curator實現了分佈式的IPM(進程間鎖)。Curator的機制爲:使用咱們提供的lock路徑的結點做爲全局鎖,這個結點的數據相似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次得到鎖時會生成這種串,釋放鎖時清空數據。因爲內部採用zookeeper的臨時順序節點特性,一旦客戶端失去鏈接後,則就會自動清除該節點,redis則只能等待超時。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; /** * Curator framework's distributed lock test. */ public class CuratorDistrLockTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
固然實際中會更加複雜,好比只是某些接口須要全局單點,可是服務的粒度又沒有拆分到獨立的微服務。另外,客戶端宕機後鎖是否自動釋放也是要考慮的,不然其餘節點就沒法接管。InterProcessMutex的實現分析能夠參考:https://www.jianshu.com/p/5fa6a1464076
在分佈式系統中,很多系統也採用和zk自己同樣的leader/follower架構,所以存在leader選舉的問題,例如es/kafka(注:在通常分佈式系統中,並不會使用到該特性)。curator就包含了對應的解決方法。Curator提供了LeaderSelector監聽器實現Leader選舉功能。同一時刻,只有一個Listener會進入takeLeadership()方法,說明它是當前的Leader。注意:當Listener從takeLeadership()退出時就說明它放棄了「Leader身份」,這時Curator會利用Zookeeper再從剩餘的Listener中選出一個新的Leader。autoRequeue()方法使放棄Leadership的Listener有機會從新得到Leadership,若是不設置的話放棄了的Listener是不會再變成Leader的。
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; /** * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */ public class CuratorLeaderTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }