Dubbo 系列(07-1)集羣容錯 - 服務字典

Dubbo 系列(07-1)集羣容錯 - 服務字典

[toc]html

Spring Cloud Alibaba 系列目錄 - Dubbo 篇

1. 背景介紹

本篇文章,將開始分析 Dubbo 集羣容錯方面的源碼。集羣容錯源碼包含四個部分,分別是服務目錄 Directory、服務路由 Router、集羣 Cluster 和負載均衡 LoadBalance。 這四個接口都是 dubbo-cluster 工程中定義的。java

相關文檔推薦:spring

  1. Dubbo 源碼解讀 - 服務字典

1.1 Directory 接口

public interface Directory<T> extends Node {
    // 1. 獲取 serviceInterface
    Class<T> getInterface();
    
	// 2. 獲取指定 serviceInterface 對應的服務接口實例
    List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

總結: Directory 只負責管理單個 serviceInterface 對應的實例。這裏出現了 Dubbo 領域模型中的兩個核心概念,InvokerInvocation,經過會話的參數 invocation 能夠獲取其可執行體 invokers 列表,進而發起遠程調用。apache

  • Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它表明一個可執行體,可向它發起 invoke 調用,它有多是一個本地的實現,也多是一個遠程的實現,也可能一個集羣實現。
  • Invocation 是會話域,它持有調用過程當中的變量,好比方法名,參數等。
  • Protocol 是服務域,它是 Invoker 暴露和引用的主功能入口,它負責 Invoker 的生命週期管理。

1.2 繼承體系

服務目錄目前內置的實現有兩個,分別爲 StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類。AbstractDirectory 實現了 Directory 接口。下面咱們來看一下他們的繼承體系圖。緩存

圖1 Dubbo服務目錄繼承體系圖

總結: 服務目錄 Directory 負載管理單個服務接口對應的全部實例。它有兩個實現:app

  • StaticDirectory 顧名思義,serviceInterface 對應的服務提供者是一成不變的,即從配置文件中讀取服務列表信息。
  • RegistryDirectory 從註冊中心動態獲取指定 serviceInterface 對應的服務提供者。

Directory 接口設計原則分析:負載均衡

  1. Directory 核心方法爲 List<Invoker<T>> list(Invocation invocation),該方法只關注核心的功能,經過調用參數 invocation 獲取執行實例 invokers。Directory 接口自己只有讀,沒有寫功能。
  2. AbstractDirectory 定義了一些通用實現,增長了路由 routerChain 和訂閱者 consumerUrl 的信息。
  3. StaticDirectory/RegistryDirectory 具備寫的功能。StaticDirectory 是經過構造器傳入的,不能動態更新。而 RegistryDirectory 進一步實現了 NotifyListener 接口,當服務接口對應的註冊信息發生變化時會回調 notify(URL url, NotifyListener listener, List<URL> urls) 方法,通知 RegistryDirectory 更新 Invoker 列表,具備動態寫的功能。

另外,Directory 繼承自 Node 接口,Node 這個接口繼承者比較多,像 Registry、Monitor、Invoker 等均繼承了這個接口。這個接口包含了一個獲取配置信息的方法 getUrl,實現該接口的類能夠向外提供配置信息。ide

StaticDirectory#getUrl 通常爲 null,RegistryDirectory 對應的 URL 示例以下。老實說在我看來 Dubbo URL 本質上是一個配置類,各類配置信息都會轉換成 URL,致使 URL 的理解有些困難,有時候真不知道這個 URL 到底表明什麼意思。oop

## RegistryDirectory#getUrl() -> Nacos 註冊中心地址
registry://192.168.139.101:8848/org.apache.dubbo.registry.RegistryService?application=dubbo-consumer&dubbo=2.0.2&pid=24924&qos.port=33333&refer=application%3Ddubbo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D24924%26qos.port%3D33333%26register.ip%3D192.168.139.1%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1570940706766&registry=nacos&timestamp=1570940706897

StaticDirectory 很簡單,就不介紹了,下面主要介紹 RegistryDirectory 。源碼分析

2. 源碼分析

上面也提到了 RegistryDirectory 主要有兩方面的功能,一是讀功能,根據會話參數 invocation 獲取 Invoker 可執行體列表;二是寫功能,當註冊中心服務發生變化時更新 Invoker 列表。

2.1 服務獲取

圖2 Dubbo服務獲取流程
sequenceDiagram participant RegistryDirectory participant AbstractDirectory participant RouterChain participant Router RegistryDirectory ->> AbstractDirectory : list AbstractDirectory ->> RegistryDirectory : doList RegistryDirectory ->> RouterChain : route loop routers RouterChain ->> Router : route end

總結: RegistryDirectory#list 委託給 doList 方法獲取服務列表,doList 通過路由規則過濾後將可用的執行體列表返回。其中 routerChain 持有所有的 invokers,當調用 notify -> refreshOverrideAndInvoker -> refreshInvoker -> routerChain.setInvokers(newInvokers) 時都會更新 routerChain 持有的 invokers。

@Override
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. No service provider 2. Service providers are disabled
		...
    }
  
    List<Invoker<T>> invokers = null;
    try {
        // getConsumerUrl 返回服務訂閱者的URL
        invokers = routerChain.route(getConsumerUrl(), invocation);
    } catch (Throwable t) {
    }
    return invokers == null ? Collections.emptyList() : invokers;
}

2.2 服務更新

RegistryDirectory 除了實現 Directory 接口來獲取服務列表信息外,還實現了 NotifyListener 接口,動態更新服務列表。相對於服務獲取,服務更新要複雜的多。

RegistryDirectory 持有 Registry 註冊中心實例,須要首先訂閱指定的服務 consumer url,這樣當這個服務發生變化時就會調用 notify 通知 RegistryDirectory 更新服務列表。

圖3 Dubbo服務更新流程
sequenceDiagram participant RegistryProtocol participant RegistryDirectory participant Registry RegistryProtocol ->> RegistryDirectory : subscribe RegistryDirectory ->> Registry : subscribe alt 更新服務列表 Registry ->> RegistryDirectory : notify RegistryDirectory ->> RegistryDirectory : refreshOverrideAndInvoker end

總結: RegistryDirectory 首先會訂閱 consumerUrl,這樣當服務發生變化時會 notify 通知更新服務列表。

2.2.1 初始化RegistryDirectory

RegistryDirectory 的初始化在 DubboRegistryFactory、RegistryProtocol#doRefer 都會有建立,前者是 Dubbo 自帶的註冊中心,是基於內存的註冊中心,在 dubbo-registry-default 工程中,後者則是整合其它已有註冊中心的實現。一般,基於註冊中心的服務引入都是通過 RegistryProtocol#doRefer 建立的。下面的源碼分析也是對 RegistryProtocol 進行分析。

/**
 * RegistryProtocol:建立 type 的遠程代理 @Reference
 * @param registry 	註冊中心實例
 * @param type		服務接口類型
 * @param url		註冊中心地址
 */
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 建立 RegistryDirectory 實例。type是訂閱的服務接口類型,url是註冊中心地址。
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設置註冊中心和協議
    directory.setRegistry(registry);
    directory.setProtocol(protocol);

    // 生成服務消費者連接
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY),
                               0, type.getName(), parameters);
    // 設置服務策略
    directory.buildRouterChain(subscribeUrl);

    // 訂閱 providers、configurators、routers 等節點數據
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,PROVIDERS_CATEGORY + "," +
		CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    ...
}

總結: RegistryDirectory 構造過程最主要的任務:

  1. 設置 Registry 實例:用於獲取註冊中心的服務列表。registry.subscribe(url, this)
  2. 設置 Protocol 實例:用於根據服務地址 URL 生成 type 接口的遠程代理。protocol.refer(serviceType, url)
  3. 設置 RouterChain 實例:用於服務路由。routerChain.route(getConsumerUrl(), invocation)
  4. 最後訂閱服務:用於獲取服務列表directory.subscribe(url)
// url指的是註冊中心地址,serviceType是服務接口的類型
public RegistryDirectory(Class<T> serviceType, URL url) {
    super(url);
    this.serviceType = serviceType;			//訂閱的服務接口類型
    this.serviceKey = url.getServiceKey();	//{group/}serivceInterface{:version}
    this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
    String group = directoryUrl.getParameter(GROUP_KEY, "");
    this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
}

思考:RegistryDirectory 的主要屬性都是經過 set 方法進行設置,構造器的參數有重合。

2.2.2 服務訂閱

構造 RegistryDirectory 後會調用 subscribe 訂閱服務列表,返回 url.serviceInterface 對應的服務列表。

public void subscribe(URL url) {
    setConsumerUrl(url);
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    registry.subscribe(url, this);
}

2.2.3 服務更新

訂閱服務後,當服務更新時通知 RegistryDirectory 更新本地的服務列表,也是 RegistryDirectory 最複雜的一部分。根據從註冊中心獲取的 invokerUrls 生成更新 invokers 列表。若是不存在則建立新 Invoker,若是已經存在則忽略。

@Override
public synchronized void notify(List<URL> urls) {
    // 按 category 分類存儲,服務提供者url,路由url,外部化配置url
    Map<String, List<URL>> categoryUrls = urls.stream()
        .filter(Objects::nonNull)
        .filter(this::isValidCategory)
        .filter(this::isNotCompatibleFor26x)
        .collect(Collectors.groupingBy(url -> {
            if (UrlUtils.isConfigurator(url)) {
                return CONFIGURATORS_CATEGORY;
            } else if (UrlUtils.isRoute(url)) {
                return ROUTERS_CATEGORY;
            } else if (UrlUtils.isProvider(url)) {
                return PROVIDERS_CATEGORY;
            }
            return "";
        }));

    // configuratorURLs
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    // routerURLs
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providerURLs
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    refreshOverrideAndInvoker(providerURLs);
}

總結: 雖然服務列表的更新比較複雜,但這段代理的邏輯仍是很清楚的。

  1. 將訂閱 serviceInterface 對應的服務列表按 category 進行分類。providers、routers、configurators。

  2. 將 configuratorURLs 轉化爲 Configurator。外部化配置的 Configurator 具備更高的優先級。保存在變量 configurators 中。

  3. 將 routerURLs 轉化爲 Router。經過 routerChain.addRouters(routers) 設置到變量 routerChain 中。

  4. 將 providerURLs 轉化爲 Invoker。保存在變量 invokers 中。

前三步都很簡單,refreshOverrideAndInvoker 的主要邏輯都委託給了 refreshInvoker 方法。

2.3 刷新 Invoker 列表

2.3.1 refreshInvoker

refreshInvoker 根據從註冊中心獲取的 invokerUrls 生成更新 invokers 列表。若是不存在則建立新 Invoker,若是已經存在則忽略。

private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");
    if (invokerUrls.size() == 1 && invokerUrls.get(0) != null
        && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // 1. invokerUrls 僅有一個元素,且 url 協議頭爲 empty,此時表示禁用全部服務
        // 設置 forbidden 爲 true
        this.forbidden = true; // Forbid to access
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        // 銷燬全部 Invoker
        destroyAllInvokers(); // Close all invokers
    } else {
        // 2. 有可用的url
        this.forbidden = false; // Allow to access
        // 2.1 urlInvokerMap保存上一次的invokers
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        // 2.2 cachedInvokerUrls保存上一次的invokerUrls
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            // 添加緩存 url 到 invokerUrls 中
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            // 緩存 invokerUrls
            this.cachedInvokerUrls = new HashSet<>();
            this.cachedInvokerUrls.addAll(invokerUrls);
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 2.3 核心方法:將 url 轉成 Invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);

        // 2.4 轉換出錯,直接打印異常,並返回
        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
            return;
        }

        // 2.5 更新routerChain中的invokers列表
        List<Invoker<T>> newInvokers = Collections.unmodifiableList(
            new ArrayList<>(newUrlInvokerMap.values()));
        routerChain.setInvokers(newInvokers);
        // 合併多個組的 Invoker, <methodName, Invoker> 列表映射關係
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            // 2.6 銷燬下線服務的 Invoker
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

總結: refreshInvoker 涉及到幾個集合,簡單的說明一下:urlInvokerMap 緩存上一次的服務列表;cachedInvokerUrls 緩存上一次的 URL。

  1. invokerUrls 只有一個 empty 協議的服務時,說明此時須要禁用服務,銷燬全部的服務後返回。此時 forbidden=false。
  2. 緩存 URL 到 cachedInvokerUrls 集合中,當註冊中心返回的服務地址列表爲空時,直接使用上一次緩存中的服務地址。
  3. 最核心的方法 toInvokers,將 invokerUrls 轉換爲 Invoker。
  4. 最後則是更新 routerChain,銷燬下線的 Invoker 等清理工做。

2.3.2 toInvokers

將 providerUrls 轉換爲 Invoker 對象,返回的對象是一個 <URL#toFullString(),Invoker> 的 Map。其中最核心的代碼則是 protocol.refer(serviceType, url) 根據 url 生成 Invoker 對象。另外,URL url=mergeUrl(providerUrl) 也要關心一下,主要合併外部化配置。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<>();
    // 獲取消費端配置的協議
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        // 1.1 協議匹配,queryProtocols是消費者可接收的協議類型,可有多個,
        //    providerUrl.getProtocol()是服務者提供的協議類型
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            // providerUrl協議沒法匹配,直接過濾掉
            if (!accept) {
                continue;
            }
        }
        // 1.2 empty 協議,也直接過濾
        if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }

        // 1.3 providerUrl.getProtocol() 不存在,也直接過濾
        if (!ExtensionLoader.getExtensionLoader(Protocol.class)
            	.hasExtension(providerUrl.getProtocol())) {
            continue;
        }
        
        // 2. 合併 url,參數配置
        URL url = mergeUrl(providerUrl);
        
        // 1.4 忽略重複 url,已經處理過了
        String key = url.toFullString(); // The parameter urls are sorted
        if (keys.contains(key)) { // Repeated url
            continue;
        }
        keys.add(key);

        // 3.1 匹配緩存中Invoker,若是命中直接添加到新集合newUrlInvokerMap中,
        //     未命中則生成新的Invoker後添加到新集合newUrlInvokerMap中
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        // 3.2 緩存未命中,真正將 providerUrl -> Invoker
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                // 匹配參數:disable或enable,是否容許生成代理
                if (url.hasParameter(DISABLED_KEY)) {
                    enabled = !url.getParameter(DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(ENABLED_KEY, true);
                }
                // * 核心方法:調用 refer 獲取 Invoker
                if (enabled) {
                    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
            }
            if (invoker != null) { // Put new invoker in cache
                // 將 invoker 存儲到 newUrlInvokerMap 中
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
        	// 3.2 緩存未命中,真正將 providerUrl -> Invoker
            // 將 invoker 存儲到 newUrlInvokerMap 中
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

總結: toInvokers 代碼很長,核心邏輯是 invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl),至於其它的邏輯主要都是判斷是否須要執行這句代碼,生成新的 Invoker。

  1. protocol 協議匹配。將 consumer 可接收的協議和 providerUrl.getProtocol() 比較。一般狀況下,消費者不會設置這個參數,也就是默認都會匹配上。
  2. protocol 協議是否有效。empty 協議或該協議不存在時,也直接忽略。
  3. 配置 providerUrl 參數。mergeUrl(providerUrl) ,默認:外部化配置 configuratiors > consumerUrl > providerUrl。
  4. 判斷是否已經處理過。newUrlInvokerMap 的 key 爲 URL#toFullString(),若是已經存在,直接忽略。
  5. 判斷緩存中是否已經存在。若是在 urlInvokerMap 緩存中命中,直接忽略。
  6. 判斷 providerUrl 參數是否禁用服務。若是禁用,直接忽略。
  7. 若是所有經過,則調用 protocol.refer(serviceType, url) 生成新的 invoker。

上面的邏輯大部分都很簡單,主要關注一下 mergeUrl(providerUrl) 方法,參數的覆蓋規則。

2.3.3 mergeUrl

配置文件覆蓋規則:外部化配置優先,消費者優先。

private URL mergeUrl(URL providerUrl) {
    // 1. consumerUrl > providerUrl
    providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap);
    // 2. configuratorUrl > consumerUrl
    providerUrl = overrideWithConfigurator(providerUrl);
    providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); 
   	...
    return providerUrl;
}

private URL overrideWithConfigurator(URL providerUrl) {
    // 1. configuratorUrl  "override://"
    providerUrl = overrideWithConfigurators(this.configurators, providerUrl);

    // 2. configuratorUrl from "app-name.configurators"。針對整個應用
    providerUrl = overrideWithConfigurators(CONSUMER_CONFIGURATION_LISTENER.getConfigurators(), providerUrl);

    // 3. configuratorUrl from "service-name.configurators"。針對應用中的某個服務接口
    if (serviceConfigurationListener != null) {
        providerUrl = overrideWithConfigurators(serviceConfigurationListener.getConfigurators(), providerUrl);
    }
    return providerUrl;
}

總結: mergeUrl 覆蓋原則:外部化配置優先,消費者優先。至於 CONSUMER_CONFIGURATION_LISTENER 和 serviceConfigurationListener 主要是 dubbo-configcenter 的內容。

外部化配置示例:

override://0.0.0.0/org.apache.dubbo.DemoService?category=configurators&dynamic=false&enable=true&application=dubbo-test&timeout=1000
  1. override:override 協議。
  2. 0.0.0.0:表示對全部的服務生效,具體的 IP 則表示只對指定的 IP 生效。必填。
  3. org.apache.dubbo.DemoService:表示只對具體的服務接口生效。必填。
  4. category=configurators:表示這個參數是動態配置類型。必填。
  5. dynamic=false:false 表示持久化數據,當註冊方退出時數據仍保存在註冊中心。必填。
  6. enable=true:表示規則是否生效,默認爲 true。選填。
  7. application=dubbo-test:表示只對指定的應用生效。選填。
  8. timeout=1000&...:若是前面的規則生效,則覆蓋相應的配置信息。

2.4 其它方法說明

2.4.1 toMergeInvokerList

this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;

toMergeInvokerList 方法當訂閱者 group 配置有多個時 multiGroup =true,按組合並 invokers。一般狀況下,咱們使用 dubbo 時不會設置組,也就是不會走這個方法,直接返回 invokers。

private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
    List<Invoker<T>> mergedInvokers = new ArrayList<>();
    Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
    // group -> Invoker 列表
    for (Invoker<T> invoker : invokers) {
        String group = invoker.getUrl().getParameter(GROUP_KEY, "");
        groupMap.computeIfAbsent(group, k -> new ArrayList<>());
        groupMap.get(group).add(invoker);
    }

    if (groupMap.size() == 1) {
        // 1. 只有一個組,直接添加
        mergedInvokers.addAll(groupMap.values().iterator().next());
    } else if (groupMap.size() > 1) {
        // 2. 多個組,則須要使用 CLUSTER.join 將同組的 invoker 合併
        // {
        //     "dubbo": [invoker1, invoker2, invoker3, ...],
        //     "hello": [invoker4, invoker5, invoker6, ...]
        // }
        // 經過集羣類合併每一個分組對應的 Invoker 列表
        for (List<Invoker<T>> groupList : groupMap.values()) {
            StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
            staticDirectory.buildRouterChain();
            mergedInvokers.add(CLUSTER.join(staticDirectory));
        }
    } else {
        // 3. invokers.isEmpty()
        mergedInvokers = invokers;
    }
    return mergedInvokers;
}

總結: 主要的邏輯是 CLUSTER.join(staticDirectory),後期再研究一下這個方法。


天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索