Dubbo系列之 (六)服務訂閱(3)

輔助連接

Dubbo系列之 (一)SPI擴展

Dubbo系列之 (二)Registry註冊中心-註冊(1)

Dubbo系列之 (三)Registry註冊中心-註冊(2)

Dubbo系列之 (四)服務訂閱(1)

Dubbo系列之 (五)服務訂閱(2)

Dubbo系列之 (六)服務訂閱(3)

RegistryDirectory

當RegistryDirectory#substribe()方法被RegistryProtocol#refer()方法調用時,本地服務消費端會與註冊中心交互,拉取最新的服務提供者,並與這些服務提供者創建TCP鏈接。html

public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        registry.subscribe(url, this);
    }

從上面的代碼塊能夠知道RegistryDirectory直接調用的註冊中心的substribe()方法。咱們以ZookeeperRegistry爲例,查看其方法doSubscribe()。緩存

public void doSubscribe(final URL url, final NotifyListener listener) {       
    ......
            } else {
                // 正常服務訂閱
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    //監聽,當目錄變動時,調用notify方法
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //訂閱節點後,要拉取節點的最新數據
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

從上面代碼,能夠知道就是監聽zookeeper上的providers,routers,configurations節點,並註冊其監聽器。當訂閱完這些節點後,須要從新拉取最新的提供者數據,即調用其notify()方法。服務器

notify方法的做用

notify()方法最終會調用RegistryDirectory的notify()方法。該方法的主要完成以下內容:
一、獲得zookeeper的configurations節點下的URLS,並轉化爲configurators
二、獲得zookeeper的routers節點下的URLS,並轉化爲Routers
三、激活3.x的AddressListener特性
四、獲得zookeeper的providers節點下的URLS,與其服務提供者建立TCP連接,把URL轉化爲 invokerapp

咱們主要來看下第四點,其方法爲refreshOverrideAndInvoker()。框架

private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }

該方法主要是2個操做,1個是若是必要的話,從新覆蓋訂閱的URL,由於dubbo的服務調用URL的一些配置,好比路由,mock能夠在monitor中心進行動態修改。因此須要從新覆蓋本地的URL一些參數。二、是經過refreshInvoker()與服務端創建TCP連接。less

/**
     *
     * 把提供者者的URL List 轉化爲 Invoker Map結合,轉化規則以下:
     * Convert the invokerURL list to the Invoker Map. The rules of the conversion are as follows:
     * <ol>
     *
     * <li> If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache,
     * and notice that any parameter changes in the URL will be re-referenced.</li>
     * 若是URL已經在緩存中,則不用從新引用該服務提供者(即從新創建TCP鏈接),若是URL的參數變動須要從新引用。
     *
     *
     * <li>If the incoming invoker list is not empty, it means that it is the latest invoker list.</li>
     *  若是傳入的調用程序列表不是空的,這意味着它是最新的調用程序列表
     *

     * <li>If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route
     * rule, which needs to be re-contrasted to decide whether to re-reference.</li>
     * </ol>
     * 若是傳入的invokerUrl列表爲空,則意味着該規則只是一個覆蓋規則或路由規則,須要對其進行從新對比以決定是否從新引用
     *
     * @param invokerUrls this parameter can't be null
     */
    // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated.
    private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null");


        //若是隻有一個提供者,且爲空協議,則禁止連接和銷燬invoker
        if (invokerUrls.size() == 1
                && invokerUrls.get(0) != null
                && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {

            this.forbidden = true; // Forbid to access
            this.invokers = Collections.emptyList();
            routerChain.setInvokers(this.invokers);
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls == Collections.<URL>emptyList()) {
                invokerUrls = new ArrayList<>();
            }
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

            /**
             * If the calculation is wrong, it is not processed.
             *
             * 1. The protocol configured by the client is inconsistent with the protocol of the server.
             *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
             * 2. The registration center is not robust and pushes illegal specification data.
             *
             */
            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                        .toString()));
                return;
            }

            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
            // pre-route and build cache, notice that route cache should build on original Invoker list.
            // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
            routerChain.setInvokers(newInvokers);
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;

            try {
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<>();
        //目前的協議
        String queryProtocols = this.queryMap.get(PROTOCOL_KEY);

        //服務提供方URLproviderUrl ,看這些提供方是否支持目前協議
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }

            //空協議過濾
            if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }

            //沒有在Spi框架找不大的擴展點,過濾
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }

            // 合併url,通常參數配置可能配置在消費端,提供端,須要進行合併。合併規則爲:override(配置中心) > -D(啓動運行指定) >Consumer(消費端) > Provider(提供方)
            URL url = mergeUrl(providerUrl);


            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            /**
             *key:是沒有合併消費端端配置參數的Url(provider端),
             * 緩存鍵是不與用戶端參數合併的url,不管用戶如何合併參數,若是服務器url更改,則再次引用
             *
             *
             */
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) {// 看本地緩存是否存在,若是存在// Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(DISABLED_KEY)) {//是否disable
                        enabled = !url.getParameter(DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(ENABLED_KEY, true);
                    }
                    if (enabled) {
                        /**
                         * 把rpc invoker 、mergeUrl(override > -D >Consumer > Provider 參數內容),原providerUrl
                         * url: getProtocol()=dubbo
                         */
                        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

上面的註釋已經很是清楚,不在詳細講解。最後因此提供者的URL,會被轉化爲InvokerDelegate。該類表明一個Invoker對象的委派類,裏面包括真實的Invoker和相應的提供者的URL。並把這些InvokerDelegate放入到newUrlInvokerMap成員變量上。異步

Protocol.refer()的博大精深

來看下以下這條語句:
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
又是經過protocol.refer(serviceType, url)獲取一個Invoker,此時URL的getProtocol()==dubbo,因此會調用DubboProtocol#refer()方法。而DubboProtocol的refer()仍是AbstractProtocol#refer()。ide

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        /**
         * 異步轉同步Invoker
         */
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

即返回一個異步轉同步的AsyncToSyncInvoker。DubboProtocol實現模板方法protocolBindingRefer()。優化

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        // 優化序列化內容,目前沒什麼內容
        optimizeSerialization(url);

        // create rpc invoker.
        /**
         *
         * 建立RPC DubboInvoker
         */
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

該方法主要建立了DubboInvoker對象,並放入invokers中,經過getClients()方法獲得具體的TCP鏈接客戶端ExchangeClient。ui

總結

從上面的分析能夠知道,服務訂閱的過程,服務拉取的方式是經過通知這種方式來獲取,而且知道了Invoker的具體一個實現DubboInvoker和TCP鏈接客戶端ExchangeClient進行關聯的,在下一章咱們將剖析ExchangeClient是如何實現的。

相關文章
相關標籤/搜索