本篇文章,將開始分析 Dubbo 集羣容錯方面的源碼。集羣容錯源碼包含四個部分,分別是服務目錄 Directory、服務路由 Router、集羣 Cluster 和負載均衡 LoadBalance。 這四個接口都是 dubbo-cluster
public interface Directory<T> extends Node { // 1. 獲取 serviceInterface Class<T> getInterface(); // 2. 獲取指定 serviceInterface 對應的服務接口實例 List<Invoker<T>> list(Invocation invocation) throws RpcException; }
總結: Directory 只負責管理單個 serviceInterface 對應的實例。這裏出現了 Dubbo 領域模型中的兩個核心概念,Invoker
和 Invocation
,經過會話的參數 invocation 能夠獲取其可執行體 invokers 列表,進而發起遠程調用。apache
是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它表明一個可執行體,可向它發起 invoke 調用,它有多是一個本地的實現,也多是一個遠程的實現,也可能一個集羣實現。Invocation
是服務域,它是 Invoker 暴露和引用的主功能入口,它負責 Invoker 的生命週期管理。服務目錄目前內置的實現有兩個,分別爲 StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類。AbstractDirectory 實現了 Directory 接口。下面咱們來看一下他們的繼承體系圖。緩存
總結: 服務目錄 Directory
顧名思義,serviceInterface 對應的服務提供者是一成不變的,即從配置文件中讀取服務列表信息。RegistryDirectory
從註冊中心動態獲取指定 serviceInterface 對應的服務提供者。Directory 接口設計原則分析:負載均衡
核心方法爲 List<Invoker<T>> list(Invocation invocation)
,該方法只關注核心的功能,經過調用參數 invocation 獲取執行實例 invokers。Directory 接口自己只有讀,沒有寫功能。AbstractDirectory
定義了一些通用實現,增長了路由 routerChain 和訂閱者 consumerUrl 的信息。StaticDirectory/RegistryDirectory
具備寫的功能。StaticDirectory 是經過構造器傳入的,不能動態更新。而 RegistryDirectory 進一步實現了 NotifyListener 接口,當服務接口對應的註冊信息發生變化時會回調 notify(URL url, NotifyListener listener, List<URL> urls)
方法,通知 RegistryDirectory 更新 Invoker 列表,具備動態寫的功能。另外,Directory 繼承自 Node 接口,Node 這個接口繼承者比較多,像 Registry、Monitor、Invoker 等均繼承了這個接口。這個接口包含了一個獲取配置信息的方法 getUrl,實現該接口的類能夠向外提供配置信息。ide
StaticDirectory#getUrl 通常爲 null,RegistryDirectory 對應的 URL 示例以下。老實說在我看來 Dubbo URL 本質上是一個配置類,各類配置信息都會轉換成 URL,致使 URL 的理解有些困難,有時候真不知道這個 URL 到底表明什麼意思。oop
## RegistryDirectory#getUrl() -> Nacos 註冊中心地址 registry://®istry=nacos×tamp=1570940706897
StaticDirectory 很簡單,就不介紹了,下面主要介紹 RegistryDirectory 。源碼分析
上面也提到了 RegistryDirectory 主要有兩方面的功能,一是讀功能,根據會話參數 invocation 獲取 Invoker 可執行體列表;二是寫功能,當註冊中心服務發生變化時更新 Invoker 列表。
總結: RegistryDirectory#list 委託給 doList 方法獲取服務列表,doList 通過路由規則過濾後將可用的執行體列表返回。其中 routerChain 持有所有的 invokers,當調用 notify -> refreshOverrideAndInvoker -> refreshInvoker -> routerChain.setInvokers(newInvokers)
時都會更新 routerChain 持有的 invokers。
@Override public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled ... } List<Invoker<T>> invokers = null; try { // getConsumerUrl 返回服務訂閱者的URL invokers = routerChain.route(getConsumerUrl(), invocation); } catch (Throwable t) { } return invokers == null ? Collections.emptyList() : invokers; }
RegistryDirectory 除了實現 Directory 接口來獲取服務列表信息外,還實現了 NotifyListener 接口,動態更新服務列表。相對於服務獲取,服務更新要複雜的多。
RegistryDirectory 持有 Registry 註冊中心實例,須要首先訂閱指定的服務 consumer url,這樣當這個服務發生變化時就會調用 notify 通知 RegistryDirectory 更新服務列表。
總結: RegistryDirectory 首先會訂閱 consumerUrl,這樣當服務發生變化時會 notify 通知更新服務列表。
RegistryDirectory 的初始化在 DubboRegistryFactory、RegistryProtocol#doRefer 都會有建立,前者是 Dubbo 自帶的註冊中心,是基於內存的註冊中心,在 dubbo-registry-default 工程中,後者則是整合其它已有註冊中心的實現。一般,基於註冊中心的服務引入都是通過 RegistryProtocol#doRefer 建立的。下面的源碼分析也是對 RegistryProtocol 進行分析。
/** * RegistryProtocol:建立 type 的遠程代理 @Reference * @param registry 註冊中心實例 * @param type 服務接口類型 * @param url 註冊中心地址 */ private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 建立 RegistryDirectory 實例。type是訂閱的服務接口類型,url是註冊中心地址。 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 設置註冊中心和協議 directory.setRegistry(registry); directory.setProtocol(protocol); // 生成服務消費者連接 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); // 設置服務策略 directory.buildRouterChain(subscribeUrl); // 訂閱 providers、configurators、routers 等節點數據 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); ... }
總結: RegistryDirectory 構造過程最主要的任務:
registry.subscribe(url, this)
protocol.refer(serviceType, url)
routerChain.route(getConsumerUrl(), invocation)
// url指的是註冊中心地址,serviceType是服務接口的類型 public RegistryDirectory(Class<T> serviceType, URL url) { super(url); this.serviceType = serviceType; //訂閱的服務接口類型 this.serviceKey = url.getServiceKey(); //{group/}serivceInterface{:version} this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url); String group = directoryUrl.getParameter(GROUP_KEY, ""); this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(",")); }
思考:RegistryDirectory 的主要屬性都是經過 set 方法進行設置,構造器的參數有重合。
構造 RegistryDirectory 後會調用 subscribe 訂閱服務列表,返回 url.serviceInterface 對應的服務列表。
public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); serviceConfigurationListener = new ReferenceConfigurationListener(this, url); registry.subscribe(url, this); }
訂閱服務後,當服務更新時通知 RegistryDirectory 更新本地的服務列表,也是 RegistryDirectory 最複雜的一部分。根據從註冊中心獲取的 invokerUrls 生成更新 invokers 列表。若是不存在則建立新 Invoker,若是已經存在則忽略。
@Override public synchronized void notify(List<URL> urls) { // 按 category 分類存儲,服務提供者url,路由url,外部化配置url Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(url -> { if (UrlUtils.isConfigurator(url)) { return CONFIGURATORS_CATEGORY; } else if (UrlUtils.isRoute(url)) { return ROUTERS_CATEGORY; } else if (UrlUtils.isProvider(url)) { return PROVIDERS_CATEGORY; } return ""; })); // configuratorURLs List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); // routerURLs List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); toRouters(routerURLs).ifPresent(this::addRouters); // providerURLs List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); refreshOverrideAndInvoker(providerURLs); }
總結: 雖然服務列表的更新比較複雜,但這段代理的邏輯仍是很清楚的。
將訂閱 serviceInterface 對應的服務列表按 category 進行分類。providers、routers、configurators。
將 configuratorURLs 轉化爲 Configurator。外部化配置的 Configurator 具備更高的優先級。保存在變量 configurators 中。
將 routerURLs 轉化爲 Router。經過 routerChain.addRouters(routers)
設置到變量 routerChain 中。
將 providerURLs 轉化爲 Invoker。保存在變量 invokers 中。
前三步都很簡單,refreshOverrideAndInvoker 的主要邏輯都委託給了 refreshInvoker 方法。
refreshInvoker 根據從註冊中心獲取的 invokerUrls 生成更新 invokers 列表。若是不存在則建立新 Invoker,若是已經存在則忽略。
private void refreshInvoker(List<URL> invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { // 1. invokerUrls 僅有一個元素,且 url 協議頭爲 empty,此時表示禁用全部服務 // 設置 forbidden 爲 true this.forbidden = true; // Forbid to access this.invokers = Collections.emptyList(); routerChain.setInvokers(this.invokers); // 銷燬全部 Invoker destroyAllInvokers(); // Close all invokers } else { // 2. 有可用的url this.forbidden = false; // Allow to access // 2.1 urlInvokerMap保存上一次的invokers Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls == Collections.<URL>emptyList()) { invokerUrls = new ArrayList<>(); } // 2.2 cachedInvokerUrls保存上一次的invokerUrls if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { // 添加緩存 url 到 invokerUrls 中 invokerUrls.addAll(this.cachedInvokerUrls); } else { // 緩存 invokerUrls this.cachedInvokerUrls = new HashSet<>(); this.cachedInvokerUrls.addAll(invokerUrls); } if (invokerUrls.isEmpty()) { return; } // 2.3 核心方法:將 url 轉成 Invoker Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); // 2.4 轉換出錯,直接打印異常,並返回 if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { return; } // 2.5 更新routerChain中的invokers列表 List<Invoker<T>> newInvokers = Collections.unmodifiableList( new ArrayList<>(newUrlInvokerMap.values())); routerChain.setInvokers(newInvokers); // 合併多個組的 Invoker, <methodName, Invoker> 列表映射關係 this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; try { // 2.6 銷燬下線服務的 Invoker destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
總結: refreshInvoker 涉及到幾個集合,簡單的說明一下:urlInvokerMap
緩存上一次的 URL。
只有一個 empty 協議的服務時,說明此時須要禁用服務,銷燬全部的服務後返回。此時 forbidden=false。將 providerUrls 轉換爲 Invoker 對象,返回的對象是一個 <URL#toFullString(),Invoker>
的 Map。其中最核心的代碼則是 protocol.refer(serviceType, url)
根據 url 生成 Invoker 對象。另外,URL url=mergeUrl(providerUrl)
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } Set<String> keys = new HashSet<>(); // 獲取消費端配置的協議 String queryProtocols = this.queryMap.get(PROTOCOL_KEY); for (URL providerUrl : urls) { // 1.1 協議匹配,queryProtocols是消費者可接收的協議類型,可有多個, // providerUrl.getProtocol()是服務者提供的協議類型 if (queryProtocols != null && queryProtocols.length() > 0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } // providerUrl協議沒法匹配,直接過濾掉 if (!accept) { continue; } } // 1.2 empty 協議,也直接過濾 if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } // 1.3 providerUrl.getProtocol() 不存在,也直接過濾 if (!ExtensionLoader.getExtensionLoader(Protocol.class) .hasExtension(providerUrl.getProtocol())) { continue; } // 2. 合併 url,參數配置 URL url = mergeUrl(providerUrl); // 1.4 忽略重複 url,已經處理過了 String key = url.toFullString(); // The parameter urls are sorted if (keys.contains(key)) { // Repeated url continue; } keys.add(key); // 3.1 匹配緩存中Invoker,若是命中直接添加到新集合newUrlInvokerMap中, // 未命中則生成新的Invoker後添加到新集合newUrlInvokerMap中 Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); // 3.2 緩存未命中,真正將 providerUrl -> Invoker if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; // 匹配參數:disable或enable,是否容許生成代理 if (url.hasParameter(DISABLED_KEY)) { enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } // * 核心方法:調用 refer 獲取 Invoker if (enabled) { invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { } if (invoker != null) { // Put new invoker in cache // 將 invoker 存儲到 newUrlInvokerMap 中 newUrlInvokerMap.put(key, invoker); } } else { // 3.2 緩存未命中,真正將 providerUrl -> Invoker // 將 invoker 存儲到 newUrlInvokerMap 中 newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
總結: toInvokers 代碼很長,核心邏輯是 invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl)
,至於其它的邏輯主要都是判斷是否須要執行這句代碼,生成新的 Invoker。
上面的邏輯大部分都很簡單,主要關注一下 mergeUrl(providerUrl)
private URL mergeUrl(URL providerUrl) { // 1. consumerUrl > providerUrl providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 2. configuratorUrl > consumerUrl providerUrl = overrideWithConfigurator(providerUrl); providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); ... return providerUrl; } private URL overrideWithConfigurator(URL providerUrl) { // 1. configuratorUrl "override://" providerUrl = overrideWithConfigurators(this.configurators, providerUrl); // 2. configuratorUrl from "app-name.configurators"。針對整個應用 providerUrl = overrideWithConfigurators(CONSUMER_CONFIGURATION_LISTENER.getConfigurators(), providerUrl); // 3. configuratorUrl from "service-name.configurators"。針對應用中的某個服務接口 if (serviceConfigurationListener != null) { providerUrl = overrideWithConfigurators(serviceConfigurationListener.getConfigurators(), providerUrl); } return providerUrl; }
總結: mergeUrl 覆蓋原則:外部化配置優先,消費者優先。至於 CONSUMER_CONFIGURATION_LISTENER 和 serviceConfigurationListener 主要是 dubbo-configcenter 的內容。
:override 協議。
:表示對全部的服務生效,具體的 IP 則表示只對指定的 IP 生效。必填。org.apache.dubbo.DemoService
:false 表示持久化數據,當註冊方退出時數據仍保存在註冊中心。必填。enable=true
:表示規則是否生效,默認爲 true。選填。application=dubbo-test
:若是前面的規則生效,則覆蓋相應的配置信息。this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
toMergeInvokerList 方法當訂閱者 group 配置有多個時 multiGroup =true,按組合並 invokers。一般狀況下,咱們使用 dubbo 時不會設置組,也就是不會走這個方法,直接返回 invokers。
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) { List<Invoker<T>> mergedInvokers = new ArrayList<>(); Map<String, List<Invoker<T>>> groupMap = new HashMap<>(); // group -> Invoker 列表 for (Invoker<T> invoker : invokers) { String group = invoker.getUrl().getParameter(GROUP_KEY, ""); groupMap.computeIfAbsent(group, k -> new ArrayList<>()); groupMap.get(group).add(invoker); } if (groupMap.size() == 1) { // 1. 只有一個組,直接添加 mergedInvokers.addAll(groupMap.values().iterator().next()); } else if (groupMap.size() > 1) { // 2. 多個組,則須要使用 CLUSTER.join 將同組的 invoker 合併 // { // "dubbo": [invoker1, invoker2, invoker3, ...], // "hello": [invoker4, invoker5, invoker6, ...] // } // 經過集羣類合併每一個分組對應的 Invoker 列表 for (List<Invoker<T>> groupList : groupMap.values()) { StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList); staticDirectory.buildRouterChain(); mergedInvokers.add(CLUSTER.join(staticDirectory)); } } else { // 3. invokers.isEmpty() mergedInvokers = invokers; } return mergedInvokers; }
總結: 主要的邏輯是 CLUSTER.join(staticDirectory)