Dubbo源碼解析(四)註冊中心——dubbo

註冊中心——dubbo

目標:解釋覺得dubbo實現的註冊中心原理,解讀duubo-registry-default源碼

dubbo內置的註冊中心實現方式有四種,這是第一種,也是dubbo默認的註冊中心實現方式。咱們能夠從上篇文章中看到RegistryFactory接口的@SPI默認值是dubbo。java

咱們先來看看包下面有哪些類:git

dubbo-registry-fault目錄

能夠看到該包下就兩個類,下面就來解讀這兩個類。github

(一)DubboRegistry

該類繼承了FailbackRegistry類,該類裏面封裝了一個重連機制,而註冊中心核心的功能註冊、訂閱、取消註冊、取消訂閱,查詢註冊列表都是調用了我上一篇文章《dubbo源碼解析(三)註冊中心——開篇》中講到的實現方法,畢竟這種實現註冊中心的方式是dubbo默認的方式,不過dubbo推薦使用zookeeper,這個後續講解。segmentfault

1.屬性
// 日誌記錄
private final static Logger logger = LoggerFactory.getLogger(DubboRegistry.class);

// Reconnecting detection cycle: 3 seconds (unit:millisecond)
// 從新鏈接週期:3秒
private static final int RECONNECT_PERIOD_DEFAULT = 3 * 1000;

// Scheduled executor service
// 任務調度器
private final ScheduledExecutorService reconnectTimer = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryReconnectTimer", true));

// Reconnection timer, regular check connection is available. If unavailable, unlimited reconnection.
// 從新鏈接執行器,按期檢查鏈接可用,若是不可用,則無限制重連
private final ScheduledFuture<?> reconnectFuture;

// The lock for client acquisition process, lock the creation process of the client instance to prevent repeated clients
// 客戶端的鎖,保證客戶端的原子性,可見行,線程安全。
private final ReentrantLock clientLock = new ReentrantLock();

// 註冊中心Invoker
private final Invoker<RegistryService> registryInvoker;

// 註冊中心服務對象
private final RegistryService registryService;

// 任務調度器reconnectTimer將等待的時間
private final int reconnectPeriod;

看上面的源碼,能夠看到這裏的重連是創建了一個計時器,而且會按期檢查鏈接是否可用,若是不可用,就無限重連。只要懂一點線程相關的知識,這裏的屬性仍是比較好理解的。api

2.構造函數DubboRegistry

先來看看源碼:安全

public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService registryService) {
    // 調用父類FailbackRegistry的構造函數
    super(registryInvoker.getUrl());
    this.registryInvoker = registryInvoker;
    this.registryService = registryService;
    // Start reconnection timer
    // 優先取url中key爲reconnect.perio的配置,若是沒有,則使用默認的3s
    this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
    // 每reconnectPeriod秒去鏈接,首次鏈接也延遲reconnectPeriod秒
    reconnectFuture = reconnectTimer.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            // Check and connect to the registry
            try {
                connect();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
            }
        }
    }, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}

這個構造方法中有兩個關鍵點:微信

  1. 關於等待時間優先從url配置中取得,若是沒有這個值,再設置爲默認值3s。
  2. 建立了一個重連計時器,必定的間隔時間去檢查是否斷開,若是斷開就進行鏈接。
3.connect

該方法是鏈接註冊中心的實現,來看看源碼:app

protected final void connect() {
    try {
        // Check whether or not it is connected
        // 檢查註冊中心是否已鏈接
        if (isAvailable()) {
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Reconnect to registry " + getUrl());
        }
        // 得到客戶端鎖
        clientLock.lock();
        try {
            // Double check whether or not it is connected
            // 二次查詢註冊中心是否已經鏈接
            if (isAvailable()) {
                return;
            }
            // 恢復註冊和訂閱
            recover();
        } finally {
            // 釋放鎖
            clientLock.unlock();
        }
    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
        if (getUrl().getParameter(Constants.CHECK_KEY, true)) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            throw new RuntimeException(t.getMessage(), t);
        }
        logger.error("Failed to connect to registry " + getUrl().getAddress() + " from provider/consumer " + NetUtils.getLocalHost() + " use dubbo " + Version.getVersion() + ", cause: " + t.getMessage(), t);
    }
}

咱們能夠看到這裏的重連機制其實就是調用了父類FailbackRegistry的recover方法,關於recover方法我在《dubbo源碼解析(三)註冊中心——開篇》中已經講解過了。還有要關注的就是須要保證客戶端線程安全。須要得到鎖和釋放鎖。ide

4.isAvailable

該方法就是用來檢查註冊中心是否鏈接,源碼以下:函數

public boolean isAvailable() {
    if (registryInvoker == null)
        return false;
    return registryInvoker.isAvailable();
}
5.destroy

該方法是銷燬方法,主要是銷燬重連計時器、註冊中心的Invoker和任務調度器,源碼以下:

@Override
public void destroy() {
    super.destroy();
    try {
        // Cancel the reconnection timer
        // 取消從新鏈接計時器
        if (!reconnectFuture.isCancelled()) {
            reconnectFuture.cancel(true);
        }
    } catch (Throwable t) {
        logger.warn("Failed to cancel reconnect timer", t);
    }
    // 銷燬註冊中心的Invoker
    registryInvoker.destroy();
    // 關閉任務調度器
    ExecutorUtil.gracefulShutdown(reconnectTimer, reconnectPeriod);
}

這裏用到了ExecutorUtil中的gracefulShutdown,由於ExecutorUtil是common模塊中的類,我在第一篇中講到我會穿插在各個文章中介紹這個模塊,因此我咋這裏介紹一下這個gracefulShutdown方法,咱們能夠看一下這個源碼:

public static void gracefulShutdown(Executor executor, int timeout) {
    if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
        return;
    }
    final ExecutorService es = (ExecutorService) executor;
    try {
        // Disable new tasks from being submitted
        // 中止接收新的任務而且等待已經提交的任務(包含提交正在執行和提交未執行)執行完成
        // 當全部提交任務執行完畢,線程池即被關閉
        es.shutdown();
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    try {
        // Wait a while for existing tasks to terminate
        // 當等待超過設定時間時,會監測ExecutorService是否已經關閉,若是沒關閉,再關閉一次
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            // 試圖中止全部正在執行的線程,再也不處理還在池隊列中等待的任務
            // ShutdownNow()並不表明線程池就必定當即就能退出,它可能必需要等待全部正在執行的任務都執行完成了才能退出。
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    if (!isTerminated(es)) {
        newThreadToCloseExecutor(es);
    }
}

能夠看到這個銷燬任務調度器,也就是退出線程池,調用了shutdown、shutdownNow方法,這裏也替你們惡補了一下線程池關閉的方法區別。

6.doRegister && doUnregister && doSubscribe && doUnsubscribe && lookup

關於這個五個方法都是調用了RegistryService的方法,讀者可自主查看《dubbo源碼解析(三)註冊中心——開篇》來理解內部實現。

(二)DubboRegistryFactory

該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,是dubbo這種實現的註冊中心的工廠類,裏面作了一些初始化的處理,以及建立註冊中心DubboRegistry的對象實例。由於該類的屬性比較好理解,因此下面就不在展開講解了。

1.getRegistryURL

獲取註冊中心url,相似於初始化註冊中心url的方法。

private static URL getRegistryURL(URL url) {
    return url.setPath(RegistryService.class.getName())
            // 移除暴露服務和引用服務的參數
            .removeParameter(Constants.EXPORT_KEY).removeParameter(Constants.REFER_KEY)
            // 添加註冊中心服務接口class值
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            // 啓用sticky 粘性鏈接,讓客戶端老是鏈接同一提供者
            .addParameter(Constants.CLUSTER_STICKY_KEY, "true")
            // 決定在建立客戶端時創建鏈接
            .addParameter(Constants.LAZY_CONNECT_KEY, "true")
            // 不重連
            .addParameter(Constants.RECONNECT_KEY, "false")
            // 方法調用超時時間爲10s
            .addParameterIfAbsent(Constants.TIMEOUT_KEY, "10000")
            // 每一個客戶端上一個接口的回調服務實例的限制爲10000個
            .addParameterIfAbsent(Constants.CALLBACK_INSTANCES_LIMIT_KEY, "10000")
            // 註冊中心鏈接超時時間10s
            .addParameterIfAbsent(Constants.CONNECT_TIMEOUT_KEY, "10000")
            // 添加方法級配置
            .addParameter(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(Wrapper.getWrapper(RegistryService.class).getDeclaredMethodNames())), ","))
            //.addParameter(Constants.STUB_KEY, RegistryServiceStub.class.getName())
            //.addParameter(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()) //for event dispatch
            //.addParameter(Constants.ON_DISCONNECT_KEY, "disconnect")
            .addParameter("subscribe.1.callback", "true")
            .addParameter("unsubscribe.1.callback", "false");
}

看上面的源碼能夠很直白的看書就是對url中的配置作一些初始化設置。幾乎每一個key對應的意義我都在上面展現了,會比較好理解。

2.createRegistry

該方法就是實現了AbstractRegistryFactory抽象出來的createRegistry方法,該子類就只關注createRegistry方法,其餘公共的邏輯都在AbstractRegistryFactory已經實現。看一下源碼:

@Override
public Registry createRegistry(URL url) {
    // 相似於初始化註冊中心
    url = getRegistryURL(url);
    List<URL> urls = new ArrayList<URL>();
    // 移除備用的值
    urls.add(url.removeParameter(Constants.BACKUP_KEY));
    String backup = url.getParameter(Constants.BACKUP_KEY);
    if (backup != null && backup.length() > 0) {
        // 分割備用地址
        String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup);
        for (String address : addresses) {
            urls.add(url.setAddress(address));
        }
    }
    // 建立RegistryDirectory,裏面有多個Registry的Invoker
    RegistryDirectory<RegistryService> directory = new RegistryDirectory<RegistryService>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
    // 將directory中的多個Invoker假裝成一個Invoker
    Invoker<RegistryService> registryInvoker = cluster.join(directory);
    // 代理
    RegistryService registryService = proxyFactory.getProxy(registryInvoker);
    // 建立註冊中心對象
    DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // 通知監聽器
    directory.notify(urls);
    // 訂閱
    directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters()));
    return registry;
}

在這個方法中作了實例化了DubboRegistry,而且作了通知和訂閱的操做。相關集羣、代理以及包含了不少Invoker的Directory我會在後續文章中講到。

後記

該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...

該文章講解了dubbo的註冊中心默認實現,默認實現不少都是調用了以前api中講解到的方法,並無重寫過多的方法,其中已經涉及到集羣之類的內容,請關注我後續的文章,講解這些內容。若是我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,個人私人微信號碼:HUA799695226。

相關文章
相關標籤/搜索