一、鏈接的建立是異步的,須要開發人員自行編碼實現等待java
二、鏈接沒有自動的超時重連機制apache
三、ZK自己不提供序列化機制,須要開發人員自行指定,從而實現數據的序列化和反序列化服務器
四、Watcher註冊一次只會生效一次,須要不斷的重複註冊session
五、Watcher的使用方式不符合java自己的術語,若是採用監聽方式,更容易理解dom
六、不支持遞歸建立樹形節點異步
Apache基金會得頂級項目之一分佈式
1 、解決session會話超時重連源碼分析
二、watcher反覆註冊ui
三、簡化開發APIthis
四、遵循Fluent風格Api規範
五、NodeExistsException異常處理
六、共享鎖服務,master選舉 , 分佈式計數器
一、使用CuratorFrameworkFactory工廠的兩個靜態方法建立客戶端
public class CuratorClientDemo { private CuratorFramework client = null; /** * * public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) { * return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy); * } * * public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { * return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build(); * } * * connectStrng 逗號分開的ip:port * retryPolicy 重試策略,默認四種: Exponential BackoffRetry, RetryNtime, RetryOneTime, RetryUntilElapsed * sessionTimeoutMs 會哈超時時間,單位爲毫秒,默認60000ms * connectionTimeoutMs 鏈接建立超時時間,單位爲毫秒,默認是15000ms * */ public CuratorClientDemo() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder().connectString("localhost:2181") .sessionTimeoutMs(1).retryPolicy(retryPolicy).build(); client.start(); } }
一、實現接口RetryPolicy能夠自定義重試策略
public static void main(String[] args) { /** * retryCount : 已經重試次數,若是第一次重試,此值爲0 * elapsedTimeMs : 重試花費的時間,單位爲毫秒 * sleeper : 相似於Thread.sleep,用於sleep指定時間 * 返回值 : 若是還會繼續重試,則返回true */ // public interface RetryPolicy { // boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper); // } }
// baseSleepTimeMs : 初始sleep時間 // maxRetries : 最大重試次數 // maxSleepMs : 最大重試時間 // 返回值 : 若是還會繼續重試,則返回true ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 當前應該sleep的時間 : baseSleepTimeMs * Math.max(1,random.nextInt(1 << (retryConut +1 )))
// RetryNTimes // RetryNTimes(int n, int sleepMsBetweenRetries // 當前應該sleep // 參數名 n : 最大重試數 // 參數名 sleepMsBetweenRetries
// RetryOneTime // 只重試一次 // RetryOneTime(int sleepMsBetweenRetry) // 參數名 sleepMsBetweenRetries
// RetryUntilElapsed // RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries) // 重試的時間超過最大時間後,就不在重試 // 參數名 maxElapsedTimeMs : 最大重試時間 // 參數名 sleepMsBetweenRetries
定義 : 一種面向對象的開發方式,目的是提升代碼的可讀性
實現方式 : 經過方法的級聯或者方法鏈的方式實現
舉例:
public CuratorClientTest() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString("localhost:2181,localhost:2182") .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("base").build(); client.start(); }
public void createNode(String path, byte[] data) throws Exception { client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(path, data); }
一、構建操做包裝類(Builder):CreateBuilder create()--- CuratorFramework
二、CreateBuilder
一、creatingParentsIfNeeded // 遞歸建立父目錄
二、withMode(CreateMode mode) // 設置節點屬性 好比:CreateMode.PERSISTENT ,若是是遞歸建立,建立模式爲臨時節點,則只有葉子節點是臨時界定啊,非葉子節點都爲持久化節點
三、withACL(List aclList) // 設置ACL
四、forPath(String path) // 指定路徑
構建操做包裝類(Builder):DeleteBuilder delete() -----CuratorFramework
DeleteBuilder
一、withVersion (int version) // 特定版本號
二、guaranteed() // 確保節點被刪除
三、forPath(String path) // 指定路徑
四、deletingChildrenIfNeeded() // 遞歸刪除全部子節點
關於 guaranteed:
Solves edge cases where an operation may succeed on the server but connection failure occurs before a response can be successfully returned to the client
意思是: 解決當某個刪除操做在服務器端可能成功,可是此時客戶端與服務器端的鏈接中斷,而刪除的響 應沒有成功返回到客戶端 底層的本質:重試
public Void forPath(String path) throws Exception { final String unfixedPath = path; path = this.client.fixForNamespace(path); if (this.backgrounding.inBackground()) { ErrorCallback<String> errorCallback = null; // if (this.guaranteed) { errorCallback = new ErrorCallback<String>() { public void retriesExhausted(OperationAndData<String> operationAndData) { // 刪除失敗的集合 DeleteBuilderImpl.this.client.getFailedDeleteManager().addFailedDelete(unfixedPath); } }; } this.client.processBackgroundOperation(new OperationAndData(this, path, this.backgrounding.getCallback(), errorCallback, this.backgrounding.getContext()), (CuratorEvent)null); } else { this.pathInForeground(path, unfixedPath); } return null; }
二次執行刪除
void addFailedDelete(String path) { if (this.debugListener != null) { this.debugListener.pathAddedForDelete(path); } // 客戶端狀態屬於啓動狀態 if (this.client.getState() == CuratorFrameworkState.STARTED) { this.log.debug("Path being added to guaranteed delete set: " + path); try { // 再次執行刪除 ((ErrorListenerPathable)this.client.delete().guaranteed().inBackground()).forPath(path); } catch (Exception var3) { ThreadUtils.checkInterrupted(var3); this.addFailedDelete(path); } } }
從參數看跟zk的原生異步API相同,多了一個線程池,用於執行回調
public interface Backgroundable<T> { T inBackground(); T inBackground(Object var1); T inBackground(BackgroundCallback var1); T inBackground(BackgroundCallback var1, Object var2); T inBackground(BackgroundCallback var1, Executor var2); T inBackground(BackgroundCallback var1, Object var2, Executor var3); }