dubbo 如何註冊到 zookeeper 轉

    Dubbo的Provider,Consumer在啓動時都會建立一個註冊中心,註冊中心能夠選擇Zookeeper,Redis。經常使用的是Zookeeperjava

    Dubbo裏默認使用zkclient來操做zookeeper服務器,其對zookeeper原始客戶單作了必定的封裝,操做zookeeper時能便捷一些,好比不須要手動處理session超時,不須要重複註冊watcher等等。node

    Dubbo在Zookeeper上註冊的節點目錄:數組

    假設接口名稱是:com.bob.dubbo.service.CityDubboService緩存

    Dubbo啓動時,Consumer和Provider都會把自身的URL格式化爲字符串,而後註冊到zookeeper相應節點下,做爲一個臨時節點,當連斷開時,節點被刪除。服務器

    Consumer在啓動時,不單單會註冊自身到 …/consumers/目錄下,同時還會訂閱…/providers目錄,實時獲取其上Provider的URL字符串信息。session

 

下面咱們就看相關的代碼實現:ide

public class ZookeeperRegistry extends FailbackRegistry {

    ......

    /**
     * 默認端口
     */
    private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
    /**
     * 默認 Zookeeper 根節點
     */
    private final static String DEFAULT_ROOT = "dubbo";

    /**
     * Zookeeper 根節點
     */
    private final String root;
    /**
     * Service 接口全名集合
     */
    private final Set<String> anyServices = new ConcurrentHashSet<String>();
    /**
     * 監聽器集合
     */
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
        = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
    /**
     * Zookeeper 客戶端
     */
    private final ZookeeperClient zkClient;

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);   // 調用父類FailbackRegistry的構造函數
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 得到 Zookeeper 根節點, 未指定 "group" 參數時爲 dubbo
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); // `url.parameters.group` 參數值
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;   // root = "/dubbo"
        // 建立 Zookeeper Client
        zkClient = zookeeperTransporter.connect(url);
        // 添加 StateListener 對象。該監聽器,在重連時,調用恢復方法。
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
}

 

public abstract class FailbackRegistry extends AbstractRegistry {

    /**
     * 發起註冊失敗的 URL 集合
     */
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
    /**
     * 取消註冊失敗的 URL 集合
     */
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
    /**
     * 發起訂閱失敗的監聽器集合
     */
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    /**
     * 取消訂閱失敗的監聽器集合
     */
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    /**
     * 通知通知的 URL 集合
     */
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

    public FailbackRegistry(URL url) {
        super(url);
        // 重試頻率,單位:毫秒 ,默認 5*1000
        int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 建立失敗重試定時器
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * 重試
     */
    // Retry the failed actions
    protected void retry() {
        // 重試執行註冊
        if (!failedRegistered.isEmpty()) {
            ......
            for (URL url : failed) {
                try {
                    // 執行註冊
                    doRegister(url);
                    // 移除出 `failedRegistered`
                    failedRegistered.remove(url);
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        // 重試執行取消註冊
        if (!failedUnregistered.isEmpty()) {
            ......
            for (URL url : failed) {
                try {
                    // 執行取消註冊
                    doUnregister(url);
                    // 移除出 `failedUnregistered`
                    failedUnregistered.remove(url);
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        // 重試執行訂閱
        if (!failedSubscribed.isEmpty()) {
            ......
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    try {
                        // 執行訂閱
                        doSubscribe(url, listener);
                        // 移除監聽器
                        listeners.remove(listener);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // 重試執行取消訂閱
        if (!failedUnsubscribed.isEmpty()) {
            ......
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    try {
                        // 執行取消訂閱
                        doUnsubscribe(url, listener);
                        // 移除監聽器
                        listeners.remove(listener);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

}

 

    ZookeeperRegistry 在實例化時,調用父類構造函數。在父類構造函數中,會建立一個定時任務,每隔5S執行retry( ) 方法。函數

    在retry( ) 方法中,重試那些失敗的動做。重試的動做包括:this

    Provider向zookeeper註冊自身的url,生成一個臨時的znode
    Provider從Dubbo容器中退出,中止提供RPC調用。也就是移除zookeeper內自身url對應的znode
    Consumer訂閱 " /dubbo/…Service/providers" 目錄的子節點,生成ChildListener
    Consumer從Dubbo容器中退出,移除以前建立的ChildListenerurl

    爲何如此設置? 主要是和zookeeper的通訊機制有關的。當zookeeper的Client和Server鏈接斷開,或者心跳超時,那麼Server會將相應Client註冊的臨時節點刪除,固然註冊的Listener也相應刪除。

    而Provider和Consumer註冊的URL就屬於臨時節點,當鏈接斷開時,Dubbo註冊了zookeeper的StateListener,也就是狀態監聽器,當Dubbo裏的zookeeper Client和Server從新鏈接上時,將以前註冊的的URL添加入這幾個失敗集合中,而後從新註冊和訂閱。

 

看ZookeeperRegistry 的構造函數,其添加了一個StateListener:

public class ZookeeperRegistry extends FailbackRegistry {

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        ......
        // 添加 StateListener 對象。該監聽器,在重連時,調用恢復方法。
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
}
public abstract class FailbackRegistry extends AbstractRegistry {

    protected void recover() throws Exception {
        // register 恢復註冊,添加到 `failedRegistered` ,定時重試
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                failedRegistered.add(url);
            }
        }
        // subscribe 恢復訂閱,添加到 `failedSubscribed` ,定時重試
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }
}

 

    ZookeeperRegistry 構造函數中爲zookeeper的操做客戶端添加了一個狀態監聽器 StateListener,當從新鏈接時( 從新鏈接意味着以前鏈接斷開了 ),將已經註冊和訂閱的URL添加到失敗集合中,定時重試,也就是從新註冊和訂閱。

    zookeeper Client與Server斷開鏈接後,會定時的不斷嘗試從新鏈接,當鏈接成功後就會觸發一個Event,Dubbo註冊了CONNECTED狀態的監聽器,當鏈接成功後從新註冊和訂閱。

    zookeeper Server宕機了,Dubbo裏的Client並無對此事件作什麼響應,固然其內部的zkClient會不停地嘗試鏈接Server。當Zookeeper Server宕機了不影響Dubbo裏已註冊的組件的RPC調用,由於已經經過URL生成了Invoker對象,這些對象還在Dubbo容器內。固然由於註冊中心宕機了,確定不能感知到新的Provider。同時由於在以前訂閱得到的Provider信息已經持久化到本地文件,當Dubbo應用重啓時,若是zookeeper註冊中心不可用,會加載緩存在文件內的Provider信息,仍是能保證服務的高可用。

    Consumer會一直維持着對Provider的ChildListener,監聽Provider的實時數據信息。當Providers節點的子節點發生變化時,實時通知Dubbo,更新URL,同時更新Dubbo容器內的Consumer Invoker對象,只要是訂閱成功均會實時同步Provider,更新Invoker對象,不管是第一次訂閱仍是斷線重連後的訂閱:

public class ZookeeperRegistry extends FailbackRegistry {

    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 處理全部 Service 層的發起訂閱,例如監控中心的訂閱
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                ......
                // 處理指定 Service 層的發起訂閱,例如服務消費者的訂閱
            } else {
                // 子節點數據數組
                List<URL> urls = new ArrayList<URL>();
                // 循環分類數組 , router, configurator, provider
                for (String path : toCategoriesPath(url)) {
                    // 得到 url 對應的監聽器集合
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) { // 不存在,進行建立
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    // 得到 ChildListener 對象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) { //  不存在子目錄的監聽器,進行建立 ChildListener 對象
                        // 訂閱父級目錄, 當有子節點發生變化時,觸發此回調函數
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 變動時,調用 `#notify(...)` 方法,回調 NotifyListener
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    // 建立 Type 節點。該節點爲持久節點。
                    zkClient.create(path, false);
                    // 向 Zookeeper ,PATH 節點,發起訂閱,返回此節點下的全部子元素 path : /根節點/接口全名/providers, 好比 : /dubbo/com.bob.service.CityService/providers
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    // 添加到 `urls` 中
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 首次全量數據獲取完成時,調用 `#notify(...)` 方法,回調 NotifyListener, 在這一步從鏈接Provider,實例化Invoker
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

}

 

    訂閱獲取Providers的最新URL字符串,調用notify(…)方法,通知監聽器,最終會執行以下代碼:

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private volatile List<Configurator> configurators;
    
    private volatile Map<String, Invoker<T>> urlInvokerMap;
 
    private volatile Map<String, List<Invoker<T>>> methodInvokerMap;  

    private volatile Set<URL> cachedInvokerUrls; 


    private void refreshInvoker(List<URL> invokerUrls) {
        // 從zookeeper獲取到的url已經沒有合適的了,在訂閱返回爲空時,會手動生成一個 EMPTY_PROTOCOL 的 url
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.methodInvokerMap = null; // Set the method invoker map to null
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<URL>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
            // state change
            // If the calculation is wrong, it is not processed.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
                logger.error(new IllegalStateException(
                    "urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
                return;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            this.urlInvokerMap = newUrlInvokerMap;
            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
}

    更新Dubbo內的Invoker相關數據,保證Consumer能實時感知到Provider的信息,保證PRC調用不會出錯。

    以上就是Dubbo內Zookeeper註冊中心的實現過程。

 

總結:

    Provider和Consumer向Zookeeper註冊臨時節點,當鏈接斷開時刪除相應的註冊節點。     Consumer訂閱Providers節點的子節點,實時感知Provider的變化狀況,實時同步自身的Invoker對象,保證RPC的可用性。

相關文章
相關標籤/搜索