當RegistryDirectory#substribe()方法被RegistryProtocol#refer()方法調用時,本地服務消費端會與註冊中心交互,拉取最新的服務提供者,並與這些服務提供者創建TCP鏈接。html
public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); registry.subscribe(url, this); }
從上面的代碼塊能夠知道RegistryDirectory直接調用的註冊中心的substribe()方法。咱們以ZookeeperRegistry爲例,查看其方法doSubscribe()。緩存
public void doSubscribe(final URL url, final NotifyListener listener) { ...... } else { // 正常服務訂閱 List<URL> urls = new ArrayList<>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); //監聽,當目錄變動時,調用notify方法 ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } //訂閱節點後,要拉取節點的最新數據 notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
從上面代碼,能夠知道就是監聽zookeeper上的providers,routers,configurations節點,並註冊其監聽器。當訂閱完這些節點後,須要從新拉取最新的提供者數據,即調用其notify()方法。服務器
notify()方法最終會調用RegistryDirectory的notify()方法。該方法的主要完成以下內容:
一、獲得zookeeper的configurations節點下的URLS,並轉化爲configurators
二、獲得zookeeper的routers節點下的URLS,並轉化爲Routers
三、激活3.x的AddressListener特性
四、獲得zookeeper的providers節點下的URLS,與其服務提供者建立TCP連接,把URL轉化爲 invokerapp
咱們主要來看下第四點,其方法爲refreshOverrideAndInvoker()。框架
private void refreshOverrideAndInvoker(List<URL> urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); refreshInvoker(urls); }
該方法主要是2個操做,1個是若是必要的話,從新覆蓋訂閱的URL,由於dubbo的服務調用URL的一些配置,好比路由,mock能夠在monitor中心進行動態修改。因此須要從新覆蓋本地的URL一些參數。二、是經過refreshInvoker()與服務端創建TCP連接。less
/** * * 把提供者者的URL List 轉化爲 Invoker Map結合,轉化規則以下: * Convert the invokerURL list to the Invoker Map. The rules of the conversion are as follows: * <ol> * * <li> If URL has been converted to invoker, it is no longer re-referenced and obtained directly from the cache, * and notice that any parameter changes in the URL will be re-referenced.</li> * 若是URL已經在緩存中,則不用從新引用該服務提供者(即從新創建TCP鏈接),若是URL的參數變動須要從新引用。 * * * <li>If the incoming invoker list is not empty, it means that it is the latest invoker list.</li> * 若是傳入的調用程序列表不是空的,這意味着它是最新的調用程序列表 * * <li>If the list of incoming invokerUrl is empty, It means that the rule is only a override rule or a route * rule, which needs to be re-contrasted to decide whether to re-reference.</li> * </ol> * 若是傳入的invokerUrl列表爲空,則意味着該規則只是一個覆蓋規則或路由規則,須要對其進行從新對比以決定是否從新引用 * * @param invokerUrls this parameter can't be null */ // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated. private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); //若是隻有一個提供者,且爲空協議,則禁止連接和銷燬invoker if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; } Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map /** * If the calculation is wrong, it is not processed. * * 1. The protocol configured by the client is inconsistent with the protocol of the server. * eg: consumer protocol = dubbo, provider only has other protocol services(rest). * 2. The registration center is not robust and pushes illegal specification data. * */ if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls .toString())); return; } List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<>(); //目前的協議 String queryProtocols = this.queryMap.get(PROTOCOL_KEY); //服務提供方URLproviderUrl ,看這些提供方是否支持目前協議 for (URL providerUrl : urls) { // If protocol is configured at the reference side, only the matching protocol is selected if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } //空協議過濾 if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } //沒有在Spi框架找不大的擴展點,過濾 if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } // 合併url,通常參數配置可能配置在消費端,提供端,須要進行合併。合併規則爲:override(配置中心) > -D(啓動運行指定) >Consumer(消費端) > Provider(提供方) URL url = mergeUrl(providerUrl); String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key); // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again /** *key:是沒有合併消費端端配置參數的Url(provider端), * 緩存鍵是不與用戶端參數合併的url,不管用戶如何合併參數,若是服務器url更改,則再次引用 * * */ Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) {// 看本地緩存是否存在,若是存在// Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(DISABLED_KEY)) {//是否disable enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } if (enabled) { /** * 把rpc invoker 、mergeUrl(override > -D >Consumer > Provider 參數內容),原providerUrl * url: getProtocol()=dubbo */ invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
上面的註釋已經很是清楚,不在詳細講解。最後因此提供者的URL,會被轉化爲InvokerDelegate。該類表明一個Invoker對象的委派類,裏面包括真實的Invoker和相應的提供者的URL。並把這些InvokerDelegate放入到newUrlInvokerMap成員變量上。異步
來看下以下這條語句:
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
又是經過protocol.refer(serviceType, url)獲取一個Invoker,此時URL的getProtocol()==dubbo,因此會調用DubboProtocol#refer()方法。而DubboProtocol的refer()仍是AbstractProtocol#refer()。ide
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { /** * 異步轉同步Invoker */ return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); }
即返回一個異步轉同步的AsyncToSyncInvoker。DubboProtocol實現模板方法protocolBindingRefer()。優化
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException { // 優化序列化內容,目前沒什麼內容 optimizeSerialization(url); // create rpc invoker. /** * * 建立RPC DubboInvoker */ DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
該方法主要建立了DubboInvoker對象,並放入invokers中,經過getClients()方法獲得具體的TCP鏈接客戶端ExchangeClient。ui
從上面的分析能夠知道,服務訂閱的過程,服務拉取的方式是經過通知這種方式來獲取,而且知道了Invoker的具體一個實現DubboInvoker和TCP鏈接客戶端ExchangeClient進行關聯的,在下一章咱們將剖析ExchangeClient是如何實現的。