RegistryDirectory,基於註冊中心的服務發現,本文將重點探討Dubbo是如何實現服務的自動註冊與發現。從上篇文章,得知在消息消費者在建立服務調用器(Invoker)【消費者在初始時】時須要根據不一樣的協議,例如dubbo、registry(從註冊中心獲取服務提供者)來構建,其調用的方法爲Protocol#refer,基於註冊中心發現服務提供者的實現協議爲RegistryProtocol。數組
RegistryProtocol#refer ----> doRefer方法。緩存
RegistryProtocol#doRefer架構
private <t> Invoker<t> doRefer(Cluster cluster, Registry registry, Class<t> type, URL url) { // @1 RegistryDirectory<t> directory = new RegistryDirectory<t>(type, url); // @2 directory.setRegistry(registry); directory.setProtocol(protocol); // @3 // all attributes of REFER_KEY Map<string, string> parameters = new HashMap<string, string>(directory.getUrl().getParameters()); // @4 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // @5 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // @6 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // @7 Invoker invoker = cluster.join(directory); // @8 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); // @9 return invoker; }
代碼@1:參數詳解併發
代碼@2:構建RegistryDirectory對象,基於註冊中心動態發現服務提供者(服務提供者新增或減小),本節重點會剖析該類的實現細節。 代碼@3:爲RegistryDirectory設置註冊中心、協議。 代碼@4:獲取服務消費者的配置屬性。 代碼@5:構建消費者URL,例如:app
consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185
代碼@6:向註冊中心消息消費者:框架
consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185
相比第5步的URL,增長了category=consumers、check=false,其中category表示在註冊中心的命名空間,這裏表明消費端。該步驟的做用就是向註冊中心爲服務增長一個消息消費者,其生成的效果以下:【以zookeeper爲例】。 分佈式
代碼@7:爲消息消費者添加category=providers,configurators,routers屬性後,而後向註冊中心訂閱該URL,關注該服務下的providers,configurators,routers發生變化時通知RegistryDirectory,以便及時發現服務提供者、配置、路由規則的變化。
consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185
其訂閱關係調用的入口爲:RegistryDirectory#subscribe方法,是接下來須要重點分析的重點。 代碼@8:根據Directory,利用集羣策略返回集羣Invoker。 代碼@9:緩存服務消費者、服務提供者對應關係。ide
從這裏發現,服務的註冊與發現與RegistryDirectory聯繫很是緊密,接下來讓咱們來詳細分析RegistryDirectory的實現細節。高併發
public RegistryDirectory(Class<t> serviceType, URL url) { // @1 super(url); if (serviceType == null) throw new IllegalArgumentException("service type is null."); if (url.getServiceKey() == null || url.getServiceKey().length() == 0) throw new IllegalArgumentException("registry serviceKey is null."); this.serviceType = serviceType; this.serviceKey = url.getServiceKey(); // @2 this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // @3 this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY); //@4 String group = directoryUrl.getParameter(Constants.GROUP_KEY, ""); this.multiGroup = group != null && ("*".equals(group) || group.contains(",")); String methods = queryMap.get(Constants.METHODS_KEY); this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods); // @5 }
代碼@1:參數描述,serviceType:消費者引用的服務< dubbo:reference interface="" .../>;URL url:註冊中心的URL,例如:源碼分析
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=5552&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5552%26qos.port%3D33333%26register.ip%3D192.168.56.1%26side%3Dconsumer%26timestamp%3D1528379076123&timestamp=1528379076179
代碼@2:獲取註冊中心URL的serviceKey:com.alibaba.dubbo.registry.RegistryService。 代碼@3:獲取註冊中心URL消費提供者的全部配置參數:從url屬性的refer。 代碼@4:初始化haulovverrideDirecotryUrl、directoryUrl:註冊中心的URL,移除監控中心以及其餘屬性值,只保留消息消費者的配置屬性。 代碼@5:獲取服務消費者單獨配置的方法名dubbo:method。
public void subscribe(URL url) { setConsumerUrl(url); // @1 registry.subscribe(url, this); // @2 }
代碼@1:設置RegistryDirectory的consumerUrl爲消費者URL。 代碼@2:調用註冊中心訂閱消息消息消費者URL,首先看一下接口Registry#subscribe的接口聲明: RegistryService:void subscribe(URL url, NotifyListener listener); 這裏傳入的NotifyListener爲RegistryDirectory,其註冊中心的subscribe方法暫時不深刻去跟蹤,不過根據上面URL上面的特色,應該能猜出以下實現關鍵點:
consumer://192.168.56.1/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9892&qos.port=33333&side=consumer&timestamp=1528380277185
首先該方法是在註冊中心providers、configurators、routers目錄下的節點發生變化後,通知RegistryDirectory,已便更新最新信息,實現」動態「發現機制。
RegistryDirectory#notify
List<url> invokerUrls = new ArrayList<url>(); List<url> routerUrls = new ArrayList<url>(); List<url> configuratorUrls = new ArrayList<url>(); for (URL url : urls) { String protocol = url.getProtocol(); // @1 String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); // @2 if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { // @3 routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { // @4 configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { // @5 invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } }
Step1:根據通知的URL的前綴,分別添加到:invokerUrls(提供者url)、routerUrls(路由信息)、configuratorUrls (配置url)。 代碼@1:從url中獲取協議字段,例如condition://、route://、script://、override://等。 代碼@2:獲取url的category,在註冊中心的命令空間,例如:providers、configurators、routers。 代碼@3:若是category等於routers或協議等於route,則添加到routerUrls中。 代碼@4:若是category等於configurators或協議等於override,則添加到configuratorUrls中。 代碼@5:若是category等於providers,則表示服務提供者url,加入到invokerUrls中。
RegistryDirectory#notify
// configurators if (configuratorUrls != null && !configuratorUrls.isEmpty()) { this.configurators = toConfigurators(configuratorUrls); }
Step2:將configuratorUrls轉換爲配置對象List< Configurator> configurators,該方法將在《源碼分析Dubbo配置規則實現細節》一文中詳細講解。
RegistryDirectory#notify
// routers if (routerUrls != null && !routerUrls.isEmpty()) { List<router> routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } }
Step3:將routerUrls路由URL轉換爲Router對象,該部份內容將在《源碼分析Dubbo路由機制實現細節》一文中詳細分析。
RegistryDirectory#notify
// providers refreshInvoker(invokerUrls);
Step4:根據回調通知刷新服務提供者集合。
RegistryDirectory#refreshInvoker
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.methodInvokerMap = null; // Set the method invoker map to null destroyAllInvokers(); // Close all invokers }
Step1:若是invokerUrls不爲空而且長度爲1,而且協議爲empty,表示該服務的全部服務提供者都下線了。須要銷燬當前全部的服務提供者Invoker。
RegistryDirectory#refreshInvoker
this.forbidden = false; // Allow to access Map<string, invoker<t>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<url>(); this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison } if (invokerUrls.isEmpty()) { return; }
Step2: 若是invokerUrls爲空,而且已緩存的invokerUrls不爲空,將緩存中的invoker url複製到invokerUrls中,這裏能夠說明若是providers目錄未發送變化,invokerUrls則爲空,表示使用上次緩存的服務提供者URL對應的invoker;若是invokerUrls不爲空,則用iinvokerUrls中的值替換原緩存的invokerUrls,這裏說明,若是providers發生變化,invokerUrls中會包含此時註冊中心全部的服務提供者。若是invokerUrls爲空,則無需處理,結束本次更新服務提供者Invoker操做。
RegistryDirectory#refreshInvoker
Map<string, invoker<t>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map Map<string, list<invoker<t>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
Step3:將invokerUrls轉換爲對應的Invoke,而後根據服務級的url:invoker映射關係建立method:List< Invoker>映射關係,將在下文相信分析。
RegistryDirectory#refreshInvoker
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); }
Step4:若是支持multiGroup機制,則合併methodInvoker,將在下文分析,而後根據toInvokers、toMethodInvokers刷新當前最新的服務提供者信息。
RegistryDirectory#toInvokers
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { // ... }
Step1:獲取消息消費者URL中的協議類型,< dubbo:reference protocol="" .../>屬性值,而後遍歷全部的Invoker Url(服務提供者URL)。
RegistryDirectory#toInvokers
if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } }
Step2: 從這一步開始,代碼都包裹在for(URL providerUrl : urls)中,一個一個處理提供者URL。若是dubbo:referecnce標籤的protocol不爲空,則須要對服務提供者URL進行過濾,匹配其協議與protocol屬性相同的服務,若是不匹配,則跳事後續處理邏輯,接着處理下一個服務提供者URL。
RegistryDirectory#toInvokers
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; }
Step3:若是協議爲empty,跳過,處理下一個服務提供者URL。
RegistryDirectory#toInvokers
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; }
Step4:驗證服務提供者協議,若是不支持,則跳過。
RegistryDirectory#toInvokers
URL url = mergeUrl(providerUrl);
Step5:合併URL中的屬性,其具體實現細節以下:
String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key);
Step6:獲取url全部屬性構成的key,該key也是RegistryDirectory中Map<string, invoker<t>> urlInvokerMap;中的key。
RegistryDirectory#toInvokers
Map<string, invoker<t>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<t> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(Constants.DISABLED_KEY)) { enabled = !url.getParameter(Constants.DISABLED_KEY, false); } else { enabled = url.getParameter(Constants.ENABLED_KEY, true); } if (enabled) { invoker = new InvokerDelegate<t>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache newUrlInvokerMap.put(key, invoker); } } else { newUrlInvokerMap.put(key, invoker); }
Step7:若是localUrlInvokerMap中未包含invoker而且該provider狀態爲啓用,則建立該URL對應的Invoker,並添加到newUrlInvokerMap中。toInvokers運行結束後,回到refreshInvoker方法中繼續往下執行,根據 最新的服務提供者映射關係Map< String,Invoker>,構建Map< String,List< Invoker>>,其中鍵爲methodName。而後更新RegistryDirectory的urlInvokerMap、methodInvokerMap屬性,並銷燬老的Invoker對象,完成一次路由發現過程。
上面整個過程完成了一次動態服務提供者發現流程,下面再分析一下RegistryDirectory的另一個重要方法,doList,再重複一遍RegistryDirectory的做用,服務提供者目錄服務,在集羣Invoker的實現中,內部持有一個Direcotry對象,在進行服務調用以前,首先先從衆多的Invoker中選擇一個來執行,那衆多的Invoker從哪來呢?其來源於集羣Invoker中會調用Direcotry的public List< Invoker< T>> list(Invocation invocation),首先將調用AbstractDirectory#list方法,而後再內部調用doList方法,doList方法有其子類實現。
RegistryDirectory#doList
if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)."); }
Step1:若是禁止訪問(若是沒有服務提供者,或服務提供者被禁用),則拋出沒有提供者異常。
RegistryDirectory#doList
Map<string, list<invoker<t>>> localMethodInvokerMap = this.methodInvokerMap; // local reference if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter } if (invokers == null) { invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } if (invokers == null) { Iterator<list<invoker<t>>> iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } return invokers == null ? new ArrayList<invoker<t>>(0) : invokers;
Step2:根據方法名稱,從Map< String,List< Invoker>>這個集合中找到合適的List< Invoker>,若是方法名未命中,則返回全部的Invoker,localMethodInvokerMap中方法名,主要是dubbo:service的子標籤dubbo:method,最終返回invokers。
本文詳細介紹了服務消費者基於註冊中心的服務發現機制,其中對routers(路由)與configurators(override協議)並未詳細展開,下節先重點分析configurators與routers(路由)實現細節。
總結一下服務註冊與發現機制: 基於註冊 中心的事件通知(訂閱與發佈),一切支持事件訂閱與發佈的框架均可以做爲Dubbo註冊中心的選型。
服務提供者在暴露服務時,會向註冊中心註冊本身,具體就是在${service interface}/providers目錄下添加 一個節點(臨時),服務提供者須要與註冊中心保持長鏈接,一旦鏈接斷掉(重試鏈接)會話信息失效後,註冊中心會認爲該服務提供者不可用(提供者節點會被刪除)。
消費者在啓動時,首先也會向註冊中心註冊本身,具體在${interface interface}/consumers目錄下建立一個節點。
消費者訂閱${service interface}/ [ providers、configurators、routers ]三個目錄,這些目錄下的節點刪除、新增事件都胡通知消費者,根據通知,重構服務調用器(Invoker)。
以上就是Dubbo服務註冊與動態發現機制的原理與實現細節。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。
</invoker<t></list<invoker<t></string,></t></t></string,></string,></string,></string,></url></string,></router></url></url></url></url></url></url></t></t></string,></string,></t></t></t></t></t>