Curator的核心目標就是管理ZooKeeper的相關操做,將鏈接管理的複雜操做部分隱藏起來。java
例如,Curator實現瞭如下原語的菜譜:node
- 鎖(lock)
- 屏障(barrier)
- 緩存(cache)
- 還實現了流暢(fluent)式的開發風格的接口:可以將ZooKeeper 中的create、delete、getData 等操做以流水線式的編程方式鏈式執行。
- 提供了命名空間(namespace)
- 自動重連和其餘組件
- 首先、建立一個客戶端實例,實例爲 CuratorFramework 類的實力,經過調用 Curator 提供的工廠方法來得到該實例:
CuratorFramework zkc = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
- connectString:鏈接的ZooKeeper服務器的列表
- retryPolicy:指定對於失去鏈接事件重試操做的處理策略
- 注意:在工廠類中還提供了其餘方法來建立實例,其中有一個CuratorZooKeeperClient類,該類在ZooKeeper客戶端實例上提供了某些附加功能,如保證請求操做在不可預見的鏈接斷開狀況下也可以安全執行,與CuratorFramework類不一樣,CuratorZooKeeperClient類中的操做執行與ZooKeeper客戶端句柄直接相對應。
【標準 ZooKeeper API】編程
zk.create("/mypath", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
【Curator 流程 API】緩存
zkc.create().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
create 調用返回一個 CreateBuilder 類的實力,隨後調用的返回均爲 CreateBuilder 類所繼承的對象,如:安全
- CreateBuilder 繼承了 CreateModable<ACLBackgroundPathAndBytesable<String>> 類
- withMode 方法聲明瞭泛型接口 CreateModable<T>
【異步執行方法,只需增長 inBackground】服務器
zkc.create().inBackground().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
inBackground調用能夠傳入一個上下文對象,經過該參數能夠傳入一個具體的回調方法的實現,或是一個執行回調的執行器(java.util.concurrent.Executor)。在Java中,執行器(executor)對象能夠執行可運行的對象,咱們能夠經過執行器將回調方法的執行與ZooKeeper客戶端線程的運行解耦,採用執行器經常比爲每一個任務新建一個線程更好。
【設置監視點,只需調用鏈中增長 watched 方法】框架
zkc.getData().inBackground().watched().forPath("/mypath")
- 上面設置的監視點將會經過監聽器觸發通知,這些通知將會以WATCHED事件傳遞給指定的監聽器
- 還可使用usingWathcer方法替換watched方法,usingWathcer方法接受一個普通的ZooKeeper的Wathcer對象,並在接收到通知後調用該監視點方法
- 第三種選擇就是傳入一個CuratorWatcher對象,CuratorWatcher的process方法與ZooKeeper的Watcher不一樣的是,它可能會拋出異常。
一、【實現一個 CuratorListener 接口】異步
CuratorListener masterListener = new CuratorListener() { public void eventRecived(CuratorFramework client, CuratorEvent event) { try { switch(event.getType()) { case CHILDREN: ... break; case CREATE: ... break; case DELETE: ... break; case WATCHED: ... break; } } catch(Exception e) { LOG.error("Exception while processing event.", e); try { close(); } catch(IOException ioe) { LOG.error(:IOException while closing.", ioe); } } } };
二、【註冊這個監聽器】ide
client = CuratorFrameworkFactory.newClient(hostPost, retryPolicy); client.getCuratorListenable().addListener(masterListener);
三、【特殊的監聽器,負責處理後臺工做線程捕獲的異常時的錯誤報告,提供了底層細節的處理】ui
UnhandledErrorListener errorsListener = new UnhandledErrorListener() { public void UnhandleError(String message, Throwable e) { LOG.error("Unrecoverable error:", e); try { close(); } catch (IOException ioe) { LOG.warn( "Exception when closing.", ioe ); } } }
四、【將該監聽器註冊到客戶端實例中】
client.getUnhandledErrorListeable().addListener(errorsListener);
在Curator中暴露了與ZooKeeper不一樣的一組狀態,好比SUSPENDED狀態,還有Curator使用LOST來表示會話過時的狀態。
圖8-1中展現了鏈接狀態的狀態機模型,當處理狀態的轉換時,建議將全部主節點操做請求暫停,由於並不知道ZooKeeper客戶端可否在會話過時前從新鏈接,即便ZooKeeper客戶端從新鏈接成功,也可能再也不是主要主節點的角色,所以謹慎處理鏈接丟失的狀況,對應用程序更加安全。
【圖8-1:Curator鏈接狀態機模型】
【READ_ONLY狀態】:當ZooKeeper集羣啓用了只讀模式,客戶端所鏈接的服務器就會進入只讀模式中,此時的鏈接狀態也將進入只讀模式。服務器轉換到只讀模式後,該服務器就會因隔離問題而沒法與其餘服務器共同造成仲裁的最低法定數量,當鏈接狀態爲制度模式,客戶端也將漏掉此時發生的任何更新操做,由於若是集羣中存在一個子集的服務器數量,能夠知足仲裁最低法定數量,並能夠接收到客戶端的對ZooKeeper的更新操做,仍是會發生ZooKeeper的更新,也許這個子集的服務器會持續運行好久(ZooKeeper沒法控制這種狀況),那麼漏掉的更新操做可能會無限多。漏掉更新操做的結果可能會致使應用程序的不正確的操做行爲,因此,強烈建議啓用該模式前仔細考慮其後果。
有兩種有趣的錯誤場景,在Curator中均可以處理得很好,第一種是在有序節點的建立過程當中發生的錯誤狀況的處理,第二種爲刪除一個節點時的錯誤處理。
- 【有序節點的狀況】
若是客戶端所鏈接的服務器崩潰了,但還沒來得及返回客戶端所建立的有序節點的節點名稱(即節點序列號),或者客戶端只是鏈接丟失,客戶端沒接收到所請求操做的響應信息,結果,客戶端並不知道所建立的znode節點路徑名稱。回憶對於有序節點的應用場景,例如,創建一個有序的全部客戶端列表。爲了解決這個問題,CreateBuilder提供了一個 withProtection 方法來通知Curator客戶端,在建立的有序節點前添加一個惟一標識符,若是create操做失敗了,客戶端就會開始重試操做,而重試操做的一個步驟就是驗證是否存在一個節點包含這個惟一標識符。- 【刪除節點的保障】
在進行delete操做時也可能發生相似狀況,若是客戶端在執行delete操做時,與服務器之間的鏈接丟失,客戶端並不知道delete操做是否成功執行。若是一個znode節點刪除與否表示某些特殊狀況,例如,表示一個資源處於鎖定狀態,所以確保該節點刪除才能確保資源的鎖定被釋放,以即可以再次使用。Curator客戶端中提供了一個方法,對應用程序的delete操做的執行提供了保障,Curator客戶端會從新執行操做,直到成功爲止,或Curator客戶端實例不可用時。使用該功能,只須要使用DeleteBuilder接口中定義的 guaranteed 方法。
【建立一個LeaderLatch的實例】
leaderLatch = new LeaderLatch(client, "/master", myId);
- 傳入一個Curator框架客戶端的實例
- 一個用於表示集羣管理節點的羣組的ZooKeeper路徑
- 以及一個表示當前主節點的標識符
【註冊一個LeaderLatchListener接口的實現,該接口中有兩個方法:isLeader和notLeader。如下爲isLeader實現的代碼:】
@Override public void isLeader() { ... /* * Start workersCache ① */ workersCache.getListeable().addListener(workersCacheListener); workersCache.start(); (new RecoveredAssignments( client.getZooKeeperClient().getZooKeeper())).recover( new RecoveryCallback() { public void recoveryComplete(int rc, List<String> tasks) { try { if(rc == RecoveryCallback.FAILED) { LOG.warn("Recovery of assigned tasks failed."); } else { LOG.info( "Assigning recovered tasks" ); recoveryLatch = new CountDownLatch(tasks.size()); assignTasks(tasks); ② } new Thread( new Runnable() {③ public void run() { try { /* * Wait until recovery is complete */ recoveryLatch.await(); /* * Start tasks cache */ tasksCache.getListenable(). addListener(tasksCacheListener); ④ tasksCache.start(); } catch (Exception e) { LOG.warn("Exception while assigning and getting tasks.", e ); } } }).start(); } catch(Exception e) { LOG.error("Exception while executing the recovery callback", e); } } }); } /* ① 首先初始化一個從節點緩存列表的實例,以確保有能夠分配任務的從節點。 * ② 一旦發現存在以前的主節點沒有分配完的任務須要分配,將繼續進行任務分配。 * ③ 實現了一個任務分配的屏障,這樣就能夠在開始分配新任務前,等待已恢復的任務的分配完成,若是不這樣作,新的主節點會再次分配全部已恢復的任務。啓動了一個單獨的線程進行處理,以便不會鎖住ZooKeeper客戶端回調線程的運行。 * ④ 當主節點完成恢復任務的分配操做,開始進行新任務的分配操做。 */
【須要在具體流程開始前註冊監聽器。在runForMaster方法中進行這兩步操做,同時,還將註冊另外兩個監聽器,來處理事件的監聽和錯誤:】
pubic void runForMaster() { client.getCuratorListenable().adddListener(masterLisener); client.getUnhandledErrorListenable().addListener(errorsListener); leaderLatch.start(); }
**【對於notLeader方法,會在主節點失去管理權時進行調用,在本例中,只是簡單地關閉了全部對象實例,對這個例子來講,這些操
做已經足夠了。在實際的應用程序中,也許還須要進行某些狀態的清理操做並等待再次成爲主節點。若是LeaderLatch對象沒有關閉,Curator客戶端有可能再次得到管理權】**
選舉主節點時還可使用的另外一個菜譜爲LeaderSelector。
LeaderSelector和LeaderLatch之間主要區別在於使用的監聽器接口不一樣:
- LeaderSelector使用了LeaderSelectorListener接口,該接口中定義了takeLeadership方法,並繼承了stateChanged方法,能夠在應用程序中使用羣首閂原語來進行一個主節點的選舉操做
【首先須要建立一個LeaderSelector實例】
leaderSelector = new LeaderSelector(client, "/master", this);
【takeLeadership方法用於獲取管理權,該代碼實現與isLeader相似】
CountDownLatch leaderLatch = new CountDownLatch(1); CountDownLatch closeLatch = new CountDownLatch(1); @Override public void takeLeadership(CuratorFramework client) throws Exception { ... /* * Start workersCache */ workersCache.getListenable().addListener(workersCacheListener); workersCache.start(); (new RecoveredAssignments( client.getZooKeeperClient().getZooKeeper())).recover( new RecoveryCallback() { public void recoveryComplete (int rc, List<String> tasks) { try { if(rc == RecoveryCallback.FAILED) { LOG.warn("Recovery of assigned tasks failed."); } else { LOG.info( "Assigning recovered tasks" ); recoveryLatch = new CountDownLatch(tasks.size()); assignTasks(tasks); } new Thread( new Runnable() { public void run() { try { /* * Wait until recovery is complete */ recoveryLatch.await(); /* * Start tasks cache */ tasksCache.getListenable(). addListener(tasksCacheListener); tasksCache.start(); } catch (Exception e) { LOG.warn("Exception while assigning and getting tasks.", e); } } }).start(); /* * Decrement latch */ leaderLatch.countDown(); ① } catch (Exception e) { LOG.error("Exception while executing the recovery callback", e); } } }); /* * This latch is to prevent this call from exiting. If we exit, then * we release mastership. */ closeLatch.await(); ② } /* ① 經過一個單獨的CountDownLatch原語來等待該Curator客戶端獲取管理權。 * ② 若是主節點退出了takeLeadership方法,也就放棄了管理權,經過CountDownLatch來阻止退出該方法,直到主節點關閉爲止。 */
實現的這個方法爲CuratorMaster類的一部分,而CuratorMaster類實現了LeaderSelectorListener接口。對於主節點來講,若是想要釋放管理權只能退出takeLeadership方法,因此須要經過某些鎖等機制來阻止該方法的退出,在實現中,在退出主節點時經過遞減閂(latch)值來實現。
【依然在runForMaster方法中啓動咱們的主節點選擇器】
public void runForMaster() { client.getCuratorListenable().addListener(masterListener); client.getUnhandledErrorListenable().addListener(errorsListener); leaderSelector.setId(myId); leaderSelector.start(); }
另外還須要給這個主節點一個任意的標識符,雖然在本例中並未實現,但能夠設置羣首選擇器在失去管理權後自動從新排隊(LeaderSelector.autoRequeue)。從新排隊意味着該客戶端會一直嘗試獲取管理權,並在得到管理權後執行takeLeadership方法。
【做爲LeaderSelectorListener接口實現的一部分,還實現了一個處理鏈接狀態變化的方法:】
@Override public void stateChanged(CuratorFramework client, ConnectionState newState) { switch(newState) { case CONNECTED: // Nothing to do in this case. break; case RECONNECTED: // Reconnected, so I should ① // still be the leader. break; case SUSPENDED: LOG.warn("Session suspended"); break; case LOST: try { close(); ② } catch (IOException e) { LOG.warn( "Exception while closing", e ); } break; case READ_ONLY: // We ignore this case. break; } } /* ① 全部操做均須要經過ZooKeeper集羣實現,所以,若是鏈接丟失,主節點也就沒法先進行任何操做請求,所以在這裏最好什麼都不作。 * ② 若是會話丟失,只是關閉這個主節點程序。 */
最後一個菜譜是子節點緩存器(PathChildrenCached類)
爲了處理每個緩存器實例的變化狀況,須要一個 PathChildrenCacheListener 接口的實現類,該接口中只有一個方法
childEvent 。對於從節點信息的列表,只關心從節點離開的狀況,由於須要從新分配已經分給這些節點的任務,而列表中添加信息對於分配新任務更加劇要:
PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) { /* * Obtain just the worker's name */ try { getAbsentWorkerTasks(event.getData().getPath().replaceFirst("/workers/", "")); } catch (Exception e) { LOG.error("Exception while trying to re-assign tasks.", e); } } } };
對於任務列表,經過列表增長的狀況來觸發任務分配的過程:
PathChildrenCacheListener tasksCacheListener = new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) { try { assignTask(event.getData().getPath().replaceFirst("/tasks/","")); } catch (Exception e) { LOG.error("Exception when assigning task.", e); } } } };