Dubbo2.6.x—註冊中心源碼分析 dubbo-registry模塊 (api and zookeeper)

文章有點長,親,要慢慢看!html

1. 概述

1.1 註冊中心做用

  • 在Dubbo中,註冊中心爲核心模塊,Dubbo經過註冊中心實現各個服務之間的註冊與發現等功能,而本次源碼的分析爲registry模塊的api和zookeeper的實現。
  • 服務的提供者和消費者都須要把本身註冊到註冊中心,提供者讓消費者感知到服務存在,從而消費者發起遠程調用,也讓服務治理中心感知到有服務提供者上線;消費者則是讓服務治理中心能夠發現本身。

1.2 Zookeeper

  • Zookeeper是一個提供分佈式協調服務的開源軟件,經常使用於解決分佈式應用中常常遇到的一些數據管理問題。Zookeeper功能很是強大,能夠實現如分佈式應用配置管理、統一命名服務、狀態同步服務、集羣管理等功能。關於Zookeeper,你們若是想了解能夠關注一下自行去搜索一下。

1.3 registry模塊

  • 整個registry下的模塊 java

    dubbo-registry

  • api是註冊中心全部的API和抽象類實現正則表達式

  • default是註冊中心的內存實現redis

  • zookeeper、redis、nacos就是基於不一樣的組件的實現apache

  • multicast是經過廣播實現api

1.4 註冊中心工做流程

這張圖相信只要是用過的都不陌生,掛在dubbo.io的官網掛了好久好久了。那麼這個流程主要是說了什麼呢?緩存

  • 0.是生產者(服務提供方)初始化,就比如你寫了個服務實現而後啓動起來。
  • 1.是服務提供方向啓動器起來事後,就會向註冊中心提交本身的服務信息
  • 2.是消費者(服務消費方)向註冊中心提交訂閱請求。就是你寫了一個業務須要用到一個生產者服務,這個時候你須要提早打招呼,我須要它,有它的消息的時候讓註冊中心告訴你他的信息。
  • 3.這個時候當服務提供者離開或者是有新的服務提供者加入,註冊中心就會將變化的信息發送給消費者。
  • 4.消費者知道了生產者的信息,要用的時候就直接調用,注意這裏的調用是不通過註冊中心的,而是直接同步的網絡調用。

2. dubbo-registry-api

  • api層主要是註冊中心全部API的抽象實現類,並非實際提供服務的組件。bash

  • 模塊關係圖 服務器

  • 類關係圖 網絡

  • 目錄結構

2.1 Registry的相關實現

  • 由類的關係圖科看到Registry的實現關係,咱們接下來就分析下各個接口和這個類

2.1.1 RegistryService

  • 註冊中心模塊的服務接口:提供了註冊、取消註冊、訂閱、取消訂閱、查詢符合條件的已註冊數據。
  • 雖然官方有解釋這個的地方可是仍是複製一下方法解釋以下,官方地址是:dubbo.apache.org/zh-cn/docs/…
public interface RegistryService {
    /** * 註冊服務. * @param url 註冊信息,不容許爲空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin */
    void register(URL url);
 
    /** * 取消註冊服務. * @param url 註冊信息,不容許爲空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin */
    void unregister(URL url);
 
    /** * 訂閱服務. * @param listener 變動事件監聽器,不容許爲空 */
    void subscribe(URL url, NotifyListener listener);
 
    /** * 取消訂閱服務. * @param url 訂閱條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @param listener 變動事件監聽器,不容許爲空 */
    void unsubscribe(URL url, NotifyListener listener);
 
    /** * 查詢註冊列表,與訂閱的推模式相對應,這裏爲拉模式,只返回一次結果。 * * @see org.apache.dubbo.registry.NotifyListener#notify(List) * @param url 查詢條件,不容許爲空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @return 已註冊信息列表,可能爲空,含義同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的參數。 */
    List<URL> lookup(URL url);
}
複製代碼

2.1.2 Node (不在api中定義,在common模塊中)

  • 節點的接口 裏面聲明瞭一些關於節點的操做方法
public interface Node {

    /**
     * 獲取節點Url
     */
    URL getUrl();

    /**
     * 是否可用
     */
    boolean isAvailable();

    /**
     * 銷燬節點
     */
    void destroy();

}
複製代碼

2.1.2 Registry

  • 這個接口其實就是把節點以及註冊中心服務的方法放在了一塊兒
public interface Registry extends Node, RegistryService {
}
複製代碼

2.1.3 AbstractRegistry

  • AbstractRegistry實現了Registry接口,爲減輕註冊中心的壓力,在該類中實現了把本地URL緩存到property文件中的機制,而且實現了註冊中心的註冊、訂閱等方法。
  • 看下類圖

  • 首先是抽象類的屬性
// url地址分隔符,用於文件緩存,服務提供程序url分隔
    private static final char URL_SEPARATOR = ' ';
    // URL地址分隔的正則表達式,用於分析文件緩存中的服務提供程序URL列表
    private static final String URL_SPLIT = "\\s+";
    // 日誌輸出
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // 本地磁盤緩存,其中的特殊key爲registies是記錄註冊表中心列表,其餘是服務提供者的李彪
    private final Properties properties = new Properties();
    // 文件緩存寫入執行器 提供一個線程的線程池 
    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
    // 是否同步保存文件標誌
    private final boolean syncSaveFile;
    // 這個是緩存的版本號
    private final AtomicLong lastCacheChanged = new AtomicLong();
    // 這個是已經註冊的URL集合,不只僅是服務提供者的,也能夠是服務消費者的
    private final Set<URL> registered = new ConcurrentHashSet<URL>();

    // 已訂閱的url 值爲url的監聽器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
   
    // 消費者或服務治理服務獲取註冊信息後的緩存對象
    // 內存中服務器緩存的notified對象是ConcurrentHashMap裏面嵌套了一個Map,
    // 外層Map的Key是消費者的URL,
    // 內層的Map的key是分類,包括provider,consumer,routes,configurators四種,
    // value則對應服務列表,沒有服務提供者提供服務的URL,會以一個特別的empty://前綴開頭
    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    // 註冊中心的URL
    private URL registryUrl;
    // 本地磁盤緩存文件保存的是註冊中心的數據
    private File file;
複製代碼
2.1.3.1 構造方法
public AbstractRegistry(URL url) {
        // 設置註冊中心的地址URL
        setUrl(url);
        // 從URL參數中獲取是否同步保存的狀態,URL中若是不包含,那就設置默認值爲false
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        // 獲取文件路徑
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");

        // 開始讀入文件
        File file = null;
        // 不存在就拋出異常
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        // 把文件對象放到屬性上
        this.file = file;
        // 加載文件中的參數放入Properties,Properties繼承HashTable。
        loadProperties();
        // 通知監聽器 URL變化 見下面notify的源碼
        notify(url.getBackupUrls());
    }
複製代碼
private void loadProperties() {
        if (file != null && file.exists()) {
            InputStream in = null;
            try {
                // 把文件中的key-value讀進來
                in = new FileInputStream(file);
                // Properties是一個繼承HashTable的類.
                // 這個地方就是按行讀入,util裏面的類,裏面調用了一個load0 方法會把key和value作分割而後放入Properties中,。
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Load registry store file " + file + ", data: " + properties);
                }
            } catch (Throwable e) {
                logger.warn("Failed to load registry store file " + file, e);
            } finally {
                // 關閉流
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }
複製代碼
2.1.3.2 lookup
  • 得到消費者url訂閱的服務URL列表
@Override
    public List<URL> lookup(URL url) {
        // 查找的結果數據
        List<URL> result = new ArrayList<URL>();
        // 獲取註冊信息中的分類服務列表信息
        Map<String, List<URL>> notifiedUrls = getNotified().get(url);
        // 若是該消費者訂閱了服務
        if (notifiedUrls != null && notifiedUrls.size() > 0) {
            for (List<URL> urls : notifiedUrls.values()) {
                for (URL u : urls) {
                    // 把非空的加入結果集中
                    if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                        result.add(u);
                    }
                }
            }
        } else {
            // 若是沒有訂閱服務
            // 使用原子類以保證在獲取註冊在註冊中心的服務url時可以保證是最新的url集合
            final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
            // 通知監聽器。當收到服務變動通知時觸發
            NotifyListener listener = new NotifyListener() {
                @Override
                public void notify(List<URL> urls) {
                    reference.set(urls);
                }
            };
            // 添加這個服務的監聽器
            subscribe(url, listener); // Subscribe logic guarantees the first notify to return
            List<URL> urls = reference.get();
            // 而後把非空結果放入結果集中
            if (urls != null && !urls.isEmpty()) {
                for (URL u : urls) {
                    if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                        result.add(u);
                    }
                }
            }
        }
        return result;
    }
複製代碼
2.1.3.3 register and unregister
  • url註冊和取消註冊代碼很簡單,就是向registered中add或者remove url
@Override
    public void register(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("register url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register: " + url);
        }
        registered.add(url);
    }

    @Override
    public void unregister(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("unregister url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unregister: " + url);
        }
        registered.remove(url);
    }
複製代碼
2.1.3.4 notify
protected void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) return;
        // 遍歷已訂閱的URL
        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();

            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }
            // 通知URL對應的監聽器
            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        // 通知監聽器,看下方代碼註釋
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }
複製代碼
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        // 將url進行分類
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                // 根據不一樣的category分別放到不一樣List中處理 以category的值作分類
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        // 沒有分類結果就直接return
        if (result.size() == 0) {
            return;
        }
        // 得到消費者被通知的url的Map
        Map<String, List<URL>> categoryNotified = notified.get(url);
        // 若是沒有 就建立一個
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            // 建立事後再獲取
            categoryNotified = notified.get(url);
        }
        // 發送URL變化給監聽器
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            // 把分類標實和分類後的列表放入notified的value中 覆蓋到 `notified`
            // 當分類的數據爲空,依然有urls 。不過其中的urls[0].protocol是empty,以此來處理全部服務提供者爲空時的狀況。
            categoryNotified.put(category, categoryList);
            // 保存一份到文件緩存中 中間作的 就是解析出參數而後同步或者異步保存到文件中
            saveProperties(url);
            // 通知監聽器
            listener.notify(categoryList);
        }
    }
複製代碼
2.1.3.5 subscribe and unsubscribe
  • 註冊中心服務實現的訂閱和取消訂閱
@Override
    public void subscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("subscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("subscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Subscribe: " + url);
        }
        // 得到url已經訂閱的服務的監聽器集合
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners == null) {
            subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
            listeners = subscribed.get(url);
        }
        // 而後把listener添加到上
        listeners.add(listener);
    }

    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        // 得到url已經訂閱的服務的監聽器集合
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners != null) {
            // 而後移除
            listeners.remove(listener);
        }
    }
複製代碼
2.1.3.6 recover
  • 註冊中心的鏈接斷開後恢復時調用的方法,裏面其實就是註冊和訂閱
protected void recover() throws Exception {
        // register
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                //調用的上面的註冊方法
                register(url);
            }
        }
        // subscribe
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    // 調用上面的訂閱方法
                    subscribe(url, listener);
                }
            }
        }
    }
複製代碼
2.1.3.7 destory
  • 這個方法是在進程關閉時,去取消註冊和訂閱,實際上就是調用unregister和unsubscribe
@Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        // 獲取以註冊的URL
        Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<URL>(getRegistered())) {
                if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                    try {
                        // 取消註冊
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // 獲取已訂閱的URL以及監聽器
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        // 去取消訂閱
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }
複製代碼

2.1.4 FailbackRegistry

  • 這個類實際上是爲AbstractRegistry增長了失敗重試的機制做爲抽象能力,後面不一樣的註冊中心具體實現繼承了這個類就能夠直接使用這個能力。
  • 類圖
  • 常規套路 類的屬性
// Scheduled executor service
    // 通過固定時間後(默認是5s),調用FailbackRegistry#retry方法
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    // 失敗重試計時器,按期檢查是否有失敗請求,若是有,則無限制重試
    private final ScheduledFuture<?> retryFuture;
    // 註冊失敗的集合
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
    // 取消註冊失敗的集合
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
    // 發起訂閱失敗的監聽器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    // 取消訂閱失敗的監聽器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    // 通知失敗的URL集合
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
    /** * The time in milliseconds the retryExecutor will wait * RetryExecutor將等待的時間(毫秒) */
    private final int retryPeriod;
複製代碼
2.1.4.1 構造方法
public FailbackRegistry(URL url) {
        super(url);
        // 獲取重試的時間 若是沒有就設置成默認的 DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 設置重試任務 裏面就是調用retry方法 見下方retry方法的解析
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
複製代碼
2.1.4.2 register and unregister 、 subscribe and unsubscribe
  • 註冊和取消註冊
@Override
    public void register(URL url) {
        // 緩存等註冊操做 見AbstractRegistry
        super.register(url);
        // 在失敗集合中將這個移除
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            // 向服務器發送註冊請求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 開啓了啓動時就檢測,直接拋異常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            // 記錄失敗的url
            failedRegistered.add(url);
        }
    }
複製代碼
  • 後面的unregister方法,subscribe unsubscribe都相似 能夠看下源碼, 中間的doXXXX這幾個方法都是abstract方法等着後面不一樣的服務來實現。
2.1.4.3 notify
  • notify則與上面的 四個方法不一樣,它是默認調用的父類AbstractRegistry的notify方法
@Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // Record a failed registration request to a failed list, retry regularly
            // 將失敗的註冊請求記錄到失敗列表,按期重試
            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
            if (listeners == null) {
                failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                listeners = failedNotified.get(url);
            }
            listeners.put(listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        // 注意 這個是調用父類的
        super.notify(url, listener, urls);
    }

複製代碼
2.1.4.4 recover
  • recover方法也區別於AbstractRegistry,他是直接添加到失敗重試的集合中,讓定時任務本身去從新註冊和訂閱
@Override
    protected void recover() throws Exception {
        // register
        // 把已註冊的添加到失敗重試的列表中
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                // 添加到失敗重試註冊列表
                failedRegistered.add(url);
            }
        }
        // subscribe
        // 把已訂閱的添加到失敗重試的列表中
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    // 添加到失敗重試訂閱列表
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }
複製代碼
2.1.4.5 retry
  • 重試的方法,其實也比較簡單,就是把集合中的數據拿出來,該作註冊作註冊,該訂閱就訂閱,成功了就從失敗重試集合中移除,失敗了就等下次再來。簡單看下對註冊列表的代碼就明白了。其餘代碼都是相似的
// Retry the failed actions
    protected void retry() {
        if (!failedRegistered.isEmpty()) {
            // 不爲空就把他URL拿到
            Set<URL> failed = new HashSet<URL>(failedRegistered);
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry register " + failed);
                }
                try {
                    // 而後遍歷它 作對應的操做
                    for (URL url : failed) {
                        try {
                            // 作註冊操做
                            doRegister(url);
                            // 移除失敗集合中URL
                            failedRegistered.remove(url);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        ......
    }
複製代碼
2.1.4.6 destroy
@Override
    public void destroy() {
        // 調用父類的方法
        super.destroy();
        try {
            // 取消執行任務
            retryFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod);
    }
複製代碼
2.1.4.7 待實現的方法
  • 這些方法都是交給不一樣的服務提供組件去本身實現的,後面的Zookeeper就針對這些方法作了實現。
// ==== Template method ====

    protected abstract void doRegister(URL url);

    protected abstract void doUnregister(URL url);

    protected abstract void doSubscribe(URL url, NotifyListener listener);

    protected abstract void doUnsubscribe(URL url, NotifyListener listener);
複製代碼

2.2 Registry的相關Factory的實現

  • 註冊中心的工廠類,顧名思義就是生產上面的Registry的實現。

2.2.1 RegistryFactory

@SPI("dubbo")
public interface RegistryFactory {

    // 這個接口方法實際上就是獲取對註冊中心的鏈接,而後返回不一樣註冊中心的不一樣Regsitry的實現對象,
    // 註解就是根據設置不一樣的protocol(協議)來選擇不一樣的實現,
    // 好比Zookeeper,就會去使用Zookeeper的ZookeeperRegistryFactory,具體怎麼選擇,後續博客再寫
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);
}
複製代碼

2.2.2 AbstractRegistryFactory

  • 類圖

  • 這個抽象類仍是相對來講比較簡答的。我們看一下他的類屬性
// 註冊中心獲取過程的鎖
    private static final ReentrantLock LOCK = new ReentrantLock();

    // 註冊中心Map<註冊地址,registry> 一個類的緩存。
    private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

複製代碼
2.2.2.1 getRegistryies
  • 獲取全部的registry對象
/** * Get all registries * 獲取全部的registry對象 * @return all registries */
    public static Collection<Registry> getRegistries() {
        //獲得一個集合的鏡像,它的返回結果不可直接被改變,不然會報錯
        return Collections.unmodifiableCollection(REGISTRIES.values());
    }

複製代碼
2.2.2.2 destoryAll
  • 關閉全部已建立的registry對象
/** * Close all created registries * 關閉全部已建立的registry對象 */
    // TODO: 2017/8/30 to move somewhere else better
    public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        //對註冊中心關閉操做加鎖
        LOCK.lock();
        try {
            // 遍歷全部的註冊中心的操做類,而後調用destroy來銷燬。
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            // 而後清除集合
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
複製代碼
2.2.2.3 getRegistry
  • 獲取對應註冊中心的操做實現類
@Override
    public Registry getRegistry(URL url) {
        // 經過URL來獲取到註冊中心的類型
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceStringWithoutResolving();
        // 鎖定註冊中心訪問進程以確保註冊表的單個實例
        LOCK.lock();
        try {
            // 經過key來拿到對應的註冊中心的操做類
            Registry registry = REGISTRIES.get(key);
            // 有就直接返回
            if (registry != null) {
                return registry;
            }
            // 沒有就建立對應的註冊中心操做類
            registry = createRegistry(url);
            // 若是建立失敗,報錯
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            // 建立成功就放到結合中
            REGISTRIES.put(key, registry);
            // 而後再返回
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
複製代碼
2.2.2.4 createRegistry
  • 抽象方法,沒有實現,須要不一樣的服務提供工廠對象來本身實現對應的建立方法
protected abstract Registry createRegistry(URL url);
複製代碼

2.3 Consumer And Provider InvokerWrapper

  • 實現Invoker接口,主要包裝消費者和服務提供者的屬性
  • 主要爲QOS提供服務 官方地址:dubbo.apache.org/zh-cn/docs/…
  • 什麼是QOS? qos-server,是dubbo在線運維命令服務,默認端口號爲:2222,用於接口命令,運維dubbo。

2.3.1 ConsumerInvokerWrapper

// invoker對象
    private Invoker<T> invoker;
    // 原始的URL地址
    private URL originUrl;
    // 註冊中心的地址
    private URL registryUrl;
    // 消費者的地址
    private URL consumerUrl;
    // 註冊中心的Directory
    private RegistryDirectory registryDirectory;
複製代碼

2.3.2 ProviderInvokerWrapper

// invoker對象
    private Invoker<T> invoker;
    // 原始的URL地址
    private URL originUrl;
    // 註冊中心的地址
    private URL registryUrl;
    // 提供者的地址
    private URL providerUrl;
    // 是否註冊
    private volatile boolean isReg;
複製代碼

2.4 ProviderConsumerRegTable

  • 這個類是消費者和服務提供者的註冊表操做,也是用在QOS中。
  • 主要類屬性
// 服務提供者的Invokers集合
    public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
    // 服務消費者的Invokers集合
    public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
複製代碼
  • 類圖

  • 裏面就是一些對類屬性集合的操做,主要是QOS會用。

2.5 RegistryStatusChecker

  • 這個類就一個方法 check方法,主要是作狀態校驗。作註冊中心相關的狀態檢查校驗
  • 類上面的@Activate 註解 使這個類自動被激活加載。
@Override
    public Status check() {
        // 獲取全部的註冊中心的對象
        Collection<Registry> registries = AbstractRegistryFactory.getRegistries();
        if (registries.isEmpty()) {
            return new Status(Status.Level.UNKNOWN);
        }
        Status.Level level = Status.Level.OK;
        StringBuilder buf = new StringBuilder();
        // 遍歷
        for (Registry registry : registries) {
            if (buf.length() > 0) {
                buf.append(",");
            }
            // 把地址拼接到一塊兒
            buf.append(registry.getUrl().getAddress());
            // 若是註冊中心的某個節點不可用就把狀態設置成error
            if (!registry.isAvailable()) {
                level = Status.Level.ERROR;
                buf.append("(disconnected)");
            } else {
                buf.append("(connected)");
            }
        }
        // 而後返回價差的結果對象
        return new Status(level, buf.toString());
    }
複製代碼

2.5 RegistryDirectory and RegistryProtocol

  • 這兩個類後續再說。牽涉到其餘地方的一些東西。

3. dubbo-registry-zookeeper

  • 不知道你們看到這裏有沒有忘記這張圖

  • 模塊關係圖

  • 全部的註冊中心實現FailbackRegistry 和 AbstractRegistryFactory來實現對應的功能。

  • 那麼Zookeeper也是如此。Zookeeper主要就只有兩個類

    1. 是ZookeeperRegistry
    1. 是ZookeeperRegistryFactory來實現對應的功能

3.1 Dubbo在Zookeeper中的數據結構

  • dubbo在使用Zookeeper時只會建立永久節點和臨時節點。

  • 根節點是註冊中心分組,下面是不少的服務接口,分組來自用戶配置的dubbo:registry中的group屬性,默認是/dubbo。
  • 服務接口下是如圖所示的四種服務目錄,都是持久節點。
  • 服務提供者路徑/dubbo/service/providers (這裏方便標識所有都用service替代接口com.demo.DemoService),下面包含接口的多個服務提供者者的URL元數據信息。
  • 服務提供者路徑/dubbo/service/consumers,下面包含接口有多個消費者的URL元數據信息
  • 服務提供者路徑/dubbo/service/routers,下面包含多個用於消費者路由策略URL元數據信息。
  • 服務提供者路徑/dubbo/service/configurators,下面包含多個用於服務提供者動態配置的URL元數據信息。

在Dubbo框架啓動時會根據咱們所寫的服務相關的配置在註冊中心建立4個目錄,在providers和consumers目錄中分別存儲服務提供方、消費方元數據信息。包括:IP、端口、權重和應用名等數據。

  • 目錄包含信息
目錄名稱 存儲值樣例
/dubbo/service/providers dubbo://192.168.1.1.20880/com.demo.DemoService?key=value&...
/dubbo/service/consumers dubbo://192.168.1.1.5002/com.demo.DemoService?key=value&...
/dubbo/service/routers condition://0.0.0.0/com.demo.DemoService?category=routers&key=value&...
/dubbo/service/configurators override://0.0.0.0/com.demo.DemoService?category=configurators&key=value&...
  • 在Dubbo中啓用註冊中心:
<beans>
    <!-- 適用於Zookeeper一個集羣有多個節點,多個IP和端口用逗號分隔-->
    <dubbo:registry protocol="zookeeper" address="ip:port;ip:port">
    <!-- 適用於Zookeeper多個集羣有多個節點,多個IP和端口用豎線分隔-->
    <dubbo:registry protocol="zookeeper" address="ip:port|ip:port">
</beans>
複製代碼

3.2 ZookeeperRegistry

  • 慣例給你們一張類圖

  • 而後看下屬性
// Zookeeper的默認端口號
    private final static int DEFAULT_ZOOKEEPER_PORT = 2181;

    // Dubbo在Zookeeper中註冊的默認根節點
    private final static String DEFAULT_ROOT = "dubbo";

    // 組的名稱 或者說是 根節點的值
    private final String root;

    // 服務集合
    private final Set<String> anyServices = new ConcurrentHashSet<String>();

    // zk節點的監聽器
    // Dubbo底層封裝了2套Zookeeper API,因此經過ChildListener抽象了監聽器,
    // 可是在實際調用時會經過createTargetChildListener轉爲對應框架的監聽器實現
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();

    // zk的客戶端, 對節點進行一些刪改等操做
    private final ZookeeperClient zkClient;
複製代碼
  • 關於Dubbo中的Zookeeper客戶端,Dubbo實現了一個統一的Client API,可是用兩種不一樣的Zookeeper開源庫來實現,一個是Apache的Curator,另外一個是zkClient 若是用戶不設置,則默認使用Curator實現。

3.2.1 構造方法

  • 構造方法比較簡單,就是獲取組名,鏈接Zookeeper
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        // 調用FailbackRegistry的構造方法
        super(url);

        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 獲取組名稱 並複製給root
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        // 鏈接上Zookeeper
        zkClient = zookeeperTransporter.connect(url);
        // 添加鏈接狀態監聽器
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        // 重連恢復
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }
複製代碼

3.2.2 服務註冊發佈與服務下線取消註冊

  • 也比較簡單就是建立節點和刪除節點
// 發佈
    @Override
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    // 取消發佈
    @Override
    protected void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

複製代碼

3.2.3 服務訂閱和取消訂閱

  • 訂閱有pull和push兩種方式,一種是客戶端定時輪詢註冊中心拉去配置,另外一種是註冊中心主動推送數據給客戶端。Dubbo目前採用的是第一次啓動拉取而後接受事件再從新拉取。
  • 再暴露服務的時候,服務端會訂閱configurators監聽動態配置,消費端啓動的時候回訂閱providers、routers、configurators類接收這三者的變動通知。
  • Dubbo在實現Zookeeper註冊中心的時候是,客戶端第一次鏈接獲取全量數據,而後在訂閱節點上註冊一個watcher,客戶端與註冊中心之間保持TCP長鏈接,後續有節點發生變化則會觸發watcher事件來把對應節點下的全量數據拉取過來。
3.2.3.1 doSubscribe
@Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 訂閱全部數據
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    // 爲空則把listeners放入到緩存的Map中
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }

                ChildListener zkListener = listeners.get(listener);
                // 建立子節點監聽器,對root下的子節點作監聽,一旦有子節點發生改變,
                // 那麼就對這個節點進行訂閱.
                if (zkListener == null) {
                    // zkListener爲空說明是第一次拉取,則新建一個listener
                    listeners.putIfAbsent(listener, new ChildListener() {
                        // 節點變動時,觸發通知時執行
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                // 遍歷全部節點
                                child = URL.decode(child);
                                // 若是有子節點還未被訂閱賊說明是新節點,
                                if (!anyServices.contains(child)) {
                                    // 加入到集合中
                                    anyServices.add(child);
                                    //就訂閱之
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // 建立持久節點root,接下來訂閱持久節點的子節點
                zkClient.create(root, false);
                // 添加root節點的子節點監聽器,並返回當前的services
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    // 遍歷全部的子節點進行訂閱
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        // 增長當前節點的訂閱,而且會返回改節點下全部子節點的列表
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }

                // 訂閱類別服務
            } else {
                List<URL> urls = new ArrayList<URL>();
                // 將url轉變成
                // /dubbo/com.demo.DemoService/providers
                // /dubbo/com.demo.DemoService/configurators
                // /dubbo/com.demo.DemoService/routers
                // 根據url類別獲取一組要訂閱的路徑 
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    // 若是緩存沒有,則添加到緩存中
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    // 一樣若是監聽器緩存中沒有 則放入緩存
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 通知節點變化
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    // 訂閱並返回該節點下的子路徑並緩存
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        // 有子節點組裝,沒有那麼就將消費者的協議變成empty做爲url。
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 回調NotifyListener,更新本地緩存信息
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
複製代碼
3.2.3.2 doUnsubscribe
@Override
    protected void doUnsubscribe(URL url, NotifyListener listener) {
        // 經過url把監聽器所有拿到
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners != null) {
            ChildListener zkListener = listeners.get(listener);
            if (zkListener != null) {
                // 直接刪除group下全部的
                if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    // 移除監聽器
                    zkClient.removeChildListener(root, zkListener);
                } else {
                     // 移除類別服務下的監聽器
                    for (String path : toCategoriesPath(url)) {
                        zkClient.removeChildListener(path, zkListener);
                    }
                }
            }
        }
    }
複製代碼
3.2.3.3 其餘
  • 其餘代碼相對來講不是很複雜能夠自行看一下。

3.3 ZookeeperRegistryFactory

  • 工廠類的代碼極其短,隨意看下。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}
複製代碼

3.3.1 關於ZookeeperTransporter

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

複製代碼
  • 上面我提到過,Dubbo用Zookeeper的時候用了兩種方式實現,一個是Apache Curator,另外一個是zkClient,這個類就是作看了一個轉換。以下圖

  • 兩個類都實現了該接口來向外提供統一的ZookeeperClient。

  • 這個實如今remoting模塊。暫時就不講了。

4. 結語

  • 整個模塊,其餘的Redis、Nacos等實現都是根據不一樣組件的特色來實現。功能都同樣,只是實現不同,你們能夠本身去探索一下。
  • 整個模塊中咱們單獨看的話主要是就是一個實現,一個工廠,裏面牽涉到了本地緩存、重試這些機制。代碼量不是很大。認真看仍是不難的。其中特別須要注意的就是註冊中心的數據結構 和 發佈訂閱這些的實現了。
  • 結語有點亂。就這樣,不足之處但願留言指出,後續優化。!感謝!!!

關於我

  • 座標杭州,普通本科在讀,計算機科學與技術專業,20年畢業,目前處於實習階段。
  • 主要作Java開發,會寫點Golang、Shell。對微服務、大數據比較感興趣,預備作這個方向。
  • 目前處於菜鳥階段,各位大佬輕噴,小弟正在瘋狂學習。
  • 歡迎你們和我交流鴨!!!
相關文章
相關標籤/搜索