dubbo之Zookeeper註冊中心

目前dubbo支持多種註冊中心:Zookeeper、Redis、Simple、Multicast、Etcd3。數組

本編文章是分析使用Zookeeper做爲註冊中心,dubbo如何整合Zookeeper進行服務註冊和訂閱服務。app

首先dubbo將服務註冊到Zookeeper後,目錄結構以下所示:(註冊接口名:com.bob.dubbo.service.CityDubboService)
圖片描述ide

在consumer和provider服務啓動的時候,去把自身URL格式化成字符串,而後註冊到zookeeper相應節點下,做爲臨時節點,斷開鏈接後,節點刪除;consumer啓動時,不只會訂閱服務,同時也會將本身的URL註冊到zookeeper中;函數

ZookeeperRegistry

ZookeeperRegistry:dubbo與zookeeper交互主要的類,已下結合源碼進行分析,先來看學習

  • doSubcribe()

這個方法主要是用於訂閱服務,添加監聽器,動態監聽提供者列表變化:this

@Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 處理全部service層發起的訂閱,例如監控中心的訂閱
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
                // 處理指定service層發起的訂閱,例如服務消費者的訂閱
            } else {
                List<URL> urls = new ArrayList<>();
                // 循環分類數組 , router, configurator, provider
                for (String path : toCategoriesPath(url)) {
                    // 得到 url 對應的監聽器集合
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {// 不存在,進行建立
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    // 得到 ChildListener 對象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {//  不存在子目錄的監聽器,進行建立 ChildListener 對象
                        // 訂閱父級目錄, 當有子節點發生變化時,觸發此回調函數,回調listener中的notify()方法
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    建立Type節點,此節點爲持久節點
                    zkClient.create(path, false);
                    // 向 Zookeeper ,PATH 節點,發起訂閱,返回此節點下的全部子元素 path : /根節點/接口全名/providers, 好比 : /dubbo/com.bob.service.CityService/providers
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 首次全量數據獲取完成時,調用 `#notify(...)` 方法,回調 NotifyListener, 在這一步從鏈接Provider,實例化Invoker
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
  • register()

ZookeeperRegistry父類FailbackRegistry中的方法,用於將服務註冊到zookeeper,具體代碼以下:url

@Override
    public void register(URL url) {
        // 調用父類AbstractRegistry中的register()方法,將url存儲到註冊集合中
        super.register(url);
        // 若是以前這個url註冊失敗,則會從註冊失敗集合中刪除
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // 像註冊中心發送註冊請求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // 將url存入註冊失敗集合中,進行重試try()
            addFailedRegistered(url);
        }
    }
  • doRegister()

ZookeeperRegistry類中的方法spa

@Override
    public void doRegister(URL url) {
        try {
            // 經過zookeeper客戶端向註冊中心發送服務註冊請求,在zookeeper下建立服務對應的節點
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在介紹註冊registry()方法的時候,解析到了FailbackRegistry類,接下來我們來分析一下這個類的做用:線程

FailbackRegistry

這個類是ZookeeperRegistry的父類,經過分析該類的結構,主要是用於服務的註冊、訂閱、重試,而服務具體的註冊、訂閱又在ZookeeperRegistry子類進行了實現,如今咱們來分析重試這個功能,服務暴露和訂閱的配置文件中通常會設置重試這個屬性,以下所示:code

<dubbo:service interface="com.fy.view.service.ProductManageService" ref="productManageServiceImpl" retries="2"/>

上面是一個服務暴露的示例,設置了retries屬性,表示重試的次數。接下來我們就以註冊重試進行分析(服務訂閱是一樣的原理):在註冊registry()方法中(代碼上面已提供),在異常catch{}代碼塊中有一個addFailedRegistered(url)方法,這個就是將註冊失敗的url添加到集合中,並建立一個重試的任務FailedRegisteredTask(url, this),代碼以下:

private void addFailedRegistered(URL url) {
        // 先從集合中獲取,若是存在,直接返回
        FailedRegisteredTask oldOne = failedRegistered.get(url);
        if (oldOne != null) {
            return;
        }
        // 本地集合不存在,則建立重試定時任務,默認每隔5s執行
        FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
        oldOne = failedRegistered.putIfAbsent(url, newTask);
        if (oldOne == null) {
            // 將定時任務放置在HashedWheelTimer這個處理定時任務的容器,(HashedWheelTimer執行原理,能夠自行查找資料,這裏就不介紹)
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }

我們下來看FailedRegisteredTask這個定時任務,有哪些東西,FailedRegisteredTask是AbstractRetryTask的子類,在執行new FailedRegisteredTask(url, this)代碼時,其實調用的是父類構造函數,其中retryTimes表示重試的次數,在沒有配置的狀況下,默認重試三次:

AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) {
        if (url == null || StringUtils.isBlank(taskName)) {
            throw new IllegalArgumentException();
        }
        this.url = url;
        this.registry = registry;
        this.taskName = taskName;
        cancel = false;
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 重試次數,默認狀況下重試三次
        this.retryTimes = url.getParameter(Constants.REGISTRY_RETRY_TIMES_KEY, Constants.DEFAULT_REGISTRY_RETRY_TIMES);
    }

在AbstractRetryTask類中有一個run()方法,在run()方法會根據XML配置文件中的retries屬性值進行比較來進行重試,若是沒有達到重試次數,則會調用doRetry(url, registry, timeout),而這個方法又在子類具體實現,這裏我以註冊FailedRegisteredTask舉例:

@Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
            // other thread cancel this timeout or stop the timer.
            return;
        }
        // 重試次數與設置的retries進行比較,超過則不在進行重試
        if (times > retryTimes) {
            // reach the most times of retry.
            logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info(taskName + " : " + url);
        }
        try {
            // 調用子類實現,進行重試
            doRetry(url, registry, timeout);
        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
            logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
            // reput this task when catch exception.
            reput(timeout, retryPeriod);
        }
    }

在子類FailedRegisteredTask中doRetry()方法具體實現:

public final class FailedRegisteredTask extends AbstractRetryTask {

    private static final String NAME = "retry register";

    public FailedRegisteredTask(URL url, FailbackRegistry registry) {
        super(url, registry, NAME);
    }

    @Override
    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
        // 調用ZookeeperRegistry類中的doRegister()方法進行註冊
        registry.doRegister(url);
        registry.removeFailedRegisteredTask(url);
    }
}

分析到這裏,有個疑問:重試任務已經封裝了,任務何時去執行,怎麼執行的?其實在上面我們就分析到過,就是使用了HashedWheelTimer,這個類是在ZookeeperRegistry類初始化的時候就會去初始化:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        // 這個地方進行初始化的:初始化父類FailbackRegistry
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }
public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

        // 建立HashedWheelTimer對象
        retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
    }

而後在addFailedRegistered()方法中有retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);這樣的一條代碼,這個就是執行任務的開始點:

@Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }
        // 開啓輪詢任務
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

調用start()方法時,開啓一個線程work去輪詢存儲到HashedWheelTimer容器的任務,而後調用任務中的run()方法,

public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    // 開啓work線程,執行work線程中的run()方法
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }
@Override
        public void run() {
            // Initialize the startTime.
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            startTimeInitialized.countDown();

            do {
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    transferTimeoutsToBuckets();
                    // 執行重試任務
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket : wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (; ; ) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }
void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                // 輪詢獲取重試任務
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        // 執行重試任務
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds--;
                }
                timeout = next;
            }
        }
public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                // 調用任務中的run()方法,(如:AbstractRetryTask任務中的run()方法,在去調用子類FailedRegisteredTask中的doRetry()方法進行重試註冊)
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

在上面對於HashedWheelTimer的具體實現原理,並無進行詳細的進行分析,若是想了解的和學習的話,能夠自行查找資料。

相關文章
相關標籤/搜索