zookeeper -- 第八章 zk開源客戶端 Curator介紹 (上)

一、原生API的不足

一、鏈接的建立是異步的,須要開發人員自行編碼實現等待java

二、鏈接沒有自動的超時重連機制apache

三、ZK自己不提供序列化機制,須要開發人員自行指定,從而實現數據的序列化和反序列化服務器

四、Watcher註冊一次只會生效一次,須要不斷的重複註冊session

五、Watcher的使用方式不符合java自己的術語,若是採用監聽方式,更容易理解dom

六、不支持遞歸建立樹形節點異步

二、開源客戶端---Curator介紹

Apache基金會得頂級項目之一分佈式

1 、解決session會話超時重連源碼分析

二、watcher反覆註冊ui

三、簡化開發APIthis

四、遵循Fluent風格Api規範

五、NodeExistsException異常處理

六、共享鎖服務,master選舉 , 分佈式計數器

七、http://curator.apache.org/

三、建立會話

一、使用CuratorFrameworkFactory工廠的兩個靜態方法建立客戶端

public class CuratorClientDemo {

    private CuratorFramework client = null;

    /**
     *
     * public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
     *      return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
     * }
     *
     * public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
     *      return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
     * }
     *
     * connectStrng  逗號分開的ip:port
     * retryPolicy   重試策略,默認四種: Exponential BackoffRetry, RetryNtime, RetryOneTime, RetryUntilElapsed
     * sessionTimeoutMs 會哈超時時間,單位爲毫秒,默認60000ms
     * connectionTimeoutMs 鏈接建立超時時間,單位爲毫秒,默認是15000ms
     *
     */
    public CuratorClientDemo() {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
                .sessionTimeoutMs(1).retryPolicy(retryPolicy).build();

        client.start();
    }
}

四、重試策略

一、實現接口RetryPolicy能夠自定義重試策略

public static void main(String[] args) {
        /**
         *  retryCount : 已經重試次數,若是第一次重試,此值爲0
         * elapsedTimeMs : 重試花費的時間,單位爲毫秒
         * sleeper : 相似於Thread.sleep,用於sleep指定時間
         * 返回值 : 若是還會繼續重試,則返回true
         */
//        public interface RetryPolicy {
//            boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
//        }
    }

一、ExponentialBackoffRetry

  • ExponentialBackoffRetry
// baseSleepTimeMs : 初始sleep時間
// maxRetries : 最大重試次數
// maxSleepMs : 最大重試時間
// 返回值 : 若是還會繼續重試,則返回true

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries) 

ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

當前應該sleep的時間 : baseSleepTimeMs * Math.max(1,random.nextInt(1 << (retryConut +1 )))

二、RetryNTimes

// RetryNTimes
        // RetryNTimes(int n, int sleepMsBetweenRetries
        // 當前應該sleep
        // 參數名 n                        :    最大重試數
        // 參數名 sleepMsBetweenRetries

三、RetryOneTime

// RetryOneTime
        // 只重試一次
        // RetryOneTime(int sleepMsBetweenRetry)
        // 參數名 sleepMsBetweenRetries

四、RetryUntilElapsed

// RetryUntilElapsed
        // RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
        // 重試的時間超過最大時間後,就不在重試
        // 參數名 maxElapsedTimeMs         :    最大重試時間
        // 參數名 sleepMsBetweenRetries

五、Fluent風格的API

  • 定義 : 一種面向對象的開發方式,目的是提升代碼的可讀性

  • 實現方式 : 經過方法的級聯或者方法鏈的方式實現

  • 舉例:

public CuratorClientTest() {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		client = CuratorFrameworkFactory.builder()
				.connectString("localhost:2181,localhost:2182")
				.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
				.namespace("base").build();
		client.start();
	}

六、建立節點

一、代碼舉例

public void createNode(String path, byte[] data) throws Exception {
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                .forPath(path, data);
    }

二、參數說明

  • 一、構建操做包裝類(Builder):CreateBuilder create()--- CuratorFramework

  • 二、CreateBuilder

一、creatingParentsIfNeeded // 遞歸建立父目錄

二、withMode(CreateMode mode) // 設置節點屬性 好比:CreateMode.PERSISTENT ,若是是遞歸建立,建立模式爲臨時節點,則只有葉子節點是臨時界定啊,非葉子節點都爲持久化節點

三、withACL(List aclList) // 設置ACL

四、forPath(String path) // 指定路徑

七、刪除節點

一、名詞解釋

  • 構建操做包裝類(Builder):DeleteBuilder delete() -----CuratorFramework

  • DeleteBuilder

一、withVersion (int version) // 特定版本號

二、guaranteed() // 確保節點被刪除

三、forPath(String path) // 指定路徑

四、deletingChildrenIfNeeded() // 遞歸刪除全部子節點

關於 guaranteed:

Solves edge cases where an operation may succeed on the server but connection failure occurs before a response can be successfully returned to the client

意思是: 解決當某個刪除操做在服務器端可能成功,可是此時客戶端與服務器端的鏈接中斷,而刪除的響 應沒有成功返回到客戶端 底層的本質:重試

二、源碼分析

public Void forPath(String path) throws Exception {
        final String unfixedPath = path;
        path = this.client.fixForNamespace(path);
        if (this.backgrounding.inBackground()) {
            ErrorCallback<String> errorCallback = null;
            // 
            if (this.guaranteed) {
                errorCallback = new ErrorCallback<String>() {
                    public void retriesExhausted(OperationAndData<String> operationAndData) {
                        // 刪除失敗的集合
                        DeleteBuilderImpl.this.client.getFailedDeleteManager().addFailedDelete(unfixedPath);
                    }
                };
            }

            this.client.processBackgroundOperation(new OperationAndData(this, path, this.backgrounding.getCallback(), errorCallback, this.backgrounding.getContext()), (CuratorEvent)null);
        } else {
            this.pathInForeground(path, unfixedPath);
        }

        return null;
    }

二次執行刪除

void addFailedDelete(String path) {
        if (this.debugListener != null) {
            this.debugListener.pathAddedForDelete(path);
        }
        
        // 客戶端狀態屬於啓動狀態
        if (this.client.getState() == CuratorFrameworkState.STARTED) {
            this.log.debug("Path being added to guaranteed delete set: " + path);

            try {
                // 再次執行刪除
                ((ErrorListenerPathable)this.client.delete().guaranteed().inBackground()).forPath(path);
            } catch (Exception var3) {
                ThreadUtils.checkInterrupted(var3);
                this.addFailedDelete(path);
            }
        }

    }

三、關於異步操做 inBackground

從參數看跟zk的原生異步API相同,多了一個線程池,用於執行回調

public interface Backgroundable<T> {
    T inBackground();

    T inBackground(Object var1);

    T inBackground(BackgroundCallback var1);

    T inBackground(BackgroundCallback var1, Object var2);

    T inBackground(BackgroundCallback var1, Executor var2);

    T inBackground(BackgroundCallback var1, Object var2, Executor var3);
}
相關文章
相關標籤/搜索