Dubbo 源碼分析 - 服務引用

1. 簡介

上一篇文章中,我詳細的分析了服務導出的原理。本篇文章咱們趁熱打鐵,繼續分析服務引用的原理。在 Dubbo 中,咱們能夠經過兩種方式引用遠程服務。第一種是使用服務直聯的方式引用服務,第二種方式是基於註冊中心進行引用。服務直聯的方式僅適合在調試或測試服務的場景下使用,不適合在線上環境使用。所以,本文我將重點分析經過註冊中心引用服務的過程。從註冊中心中獲取服務配置只是服務引用過程當中的一環,除此以外,服務消費者還須要經歷 Invoker 建立、代理類建立等步驟。這些步驟,我將在後續章節中一一進行分析。java

2.服務引用原理

Dubbo 服務引用的時機有兩個,第一個是在 Spring 容器調用 ReferenceBean 的 afterPropertiesSet 方法時引用服務,第二個是在 ReferenceBean 對應的服務被注入到其餘類中時引用。這兩個引用服務的時機區別在於,第一個是餓漢式的,第二個是懶漢式的。默認狀況下,Dubbo 使用懶漢式引用服務。若是須要使用餓漢式,可經過配置 <dubbo:reference> 的 init 屬性開啓。下面咱們按照 Dubbo 默認配置進行分析,整個分析過程從 ReferenceBean 的 getObject 方法開始。當咱們的服務被注入到其餘類中時,Spring 會第一時間調用 getObject 方法,並由該方法執行服務引用邏輯。按照慣例,在進行具體工做以前,需先進行配置檢查與收集工做。接着根據收集到的信息決定服務用的方式,有三種,第一種是引用本地 (JVM) 服務,第二是經過直聯方式引用遠程服務,第三是經過註冊中心引用遠程服務。不論是哪一種引用方式,最後都會獲得一個 Invoker 實例。若是有多個註冊中心,多個服務提供者,這個時候會獲得一組 Invoker 實例,此時須要經過集羣管理類 Cluster 將多個 Invoker 合併成一個實例。合併後的 Invoker 實例已經具有調用本地或遠程服務的能力了,但並不能將此實例暴露給用戶使用,這會對用戶業務代碼形成侵入。此時框架還須要經過代理工廠類 (ProxyFactory) 爲服務接口生成代理類,並讓代理類去調用 Invoker 邏輯。避免了 Dubbo 框架代碼對業務代碼的侵入,同時也讓框架更容易使用。git

以上就是 Dubbo 引用服務的大體原理,下面咱們深刻到代碼中,詳細分析服務引用細節。github

3.源碼分析

服務引用的入口方法爲 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實現了這個方法。實現代碼以下:apache

public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    // 檢測 ref 是否爲空,爲空則經過 init 方法建立
    if (ref == null) {
        // init 方法主要用於處理配置,以及調用 createProxy 生成代理類
        init();
    }
    return ref;
}

這裏兩個方法代碼都比較簡短,並不難理解。不過這裏須要特別說明一下,若是你們從 getObject 方法進行代碼調試時,會碰到比較詫異的問題。這裏假設你使用 IDEA,且保持了 IDEA 的默認配置。當你面調試到 get 方法的if (ref == null)時,你會驚奇的發現 ref 不爲空,致使你沒法進入到 init 方法中繼續調試。致使這個現象的緣由是 Dubbo 框架自己有點小問題,這個小問題會引起一些讓人詫異的現象。關於這個問題,我進行了將近兩個小時的排查。查明問題後,我給 Dubbo 提交了一個 pull request (#2754) 修復了此問題。另外,beiwei30 前輩開了一個 issue (#2757 ) 介紹這個問題,有興趣的朋友能夠去看看。你們若是想規避這個問題,能夠修改一下 IDEA 的配置。在配置面板中搜索 toString,而後取消Enable 'toString' object view前的對號。具體以下:數組

講完須要注意的點,咱們繼續向下分析,接下來將分析配置的處理過程。緩存

3.1 處理配置

Dubbo 提供了豐富的配置,用於調整和優化框架行爲,性能等。Dubbo 在引用或導出服務時,首先會對這些配置進行檢查和處理,以保證配置到正確性。若是你們不是很熟悉 Dubbo 配置,建議先閱讀如下官方文檔。配置解析的方法爲 ReferenceConfig 的 init 方法,下面來看一下方法邏輯。服務器

private void init() {
    if (initialized) {
        return;
    }
    initialized = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("interface not allow null!");
    }

    // 檢測 consumer 變量是否爲空,爲空則建立
    checkDefault();
    appendProperties(this);
    if (getGeneric() == null && getConsumer() != null) {
        // 設置 generic
        setGeneric(getConsumer().getGeneric());
    }

    // 檢測是否爲泛化接口
    if (ProtocolUtils.isGeneric(getGeneric())) {
        interfaceClass = GenericService.class;
    } else {
        try {
            // 加載類
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        checkInterfaceAndMethods(interfaceClass, methods);
    }
    
    // -------------------------------✨ 分割線1 ✨------------------------------

    // 從系統變量中獲取與接口名對應的屬性值
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
        // 從系統屬性中獲取解析文件路徑
        resolveFile = System.getProperty("dubbo.resolve.file");
        if (resolveFile == null || resolveFile.length() == 0) {
            // 從指定位置加載配置文件
            File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
            if (userResolveFile.exists()) {
                // 獲取文件絕對路徑
                resolveFile = userResolveFile.getAbsolutePath();
            }
        }
        if (resolveFile != null && resolveFile.length() > 0) {
            Properties properties = new Properties();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(resolveFile));
                // 從文件中加載配置
                properties.load(fis);
            } catch (IOException e) {
                throw new IllegalStateException("Unload ..., cause:...");
            } finally {
                try {
                    if (null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
            // 獲取與接口名對應的配置
            resolve = properties.getProperty(interfaceName);
        }
    }
    if (resolve != null && resolve.length() > 0) {
        // 將 resolve 賦值給 url
        url = resolve;
    }
    
    // -------------------------------✨ 分割線2 ✨------------------------------
    if (consumer != null) {
        if (application == null) {
            // 從 consumer 中獲取 Application 實例,下同
            application = consumer.getApplication();
        }
        if (module == null) {
            module = consumer.getModule();
        }
        if (registries == null) {
            registries = consumer.getRegistries();
        }
        if (monitor == null) {
            monitor = consumer.getMonitor();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    
    // 檢測本地 Application 和本地存根配置合法性
    checkApplication();
    checkStubAndMock(interfaceClass);
    
    // -------------------------------✨ 分割線3 ✨------------------------------
    
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();

    // 添加 side、協議版本信息、時間戳和進程號等信息到 map 中
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }

    if (!isGeneric()) {    // 非泛化服務
        // 獲取版本
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }

        // 獲取接口方法列表,並添加到 map 中
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // 將 ApplicationConfig、ConsumerConfig、ReferenceConfig 等對象的字段信息添加到 map 中
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    
    // -------------------------------✨ 分割線4 ✨------------------------------
    
    String prefix = StringUtils.getServiceKey(map);
    if (methods != null && !methods.isEmpty()) {
        // 遍歷 MethodConfig 列表
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            // 檢測 map 是否包含 methodName.retry
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    // 添加劇試次數配置 methodName.retries
                    map.put(method.getName() + ".retries", "0");
                }
            }
 
            // 添加 MethodConfig 中的「屬性」字段到 attributes
            // 好比 onreturn、onthrow、oninvoke 等
            appendAttributes(attributes, method, prefix + "." + method.getName());
            checkAndConvertImplicitConfig(method, map, attributes);
        }
    }
    
    // -------------------------------✨ 分割線5 ✨------------------------------

    // 獲取服務消費者 ip 地址
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property..." );
    }
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

    // 存儲 attributes 到系統上下文中
    StaticContext.getSystemContext().putAll(attributes);

    // 建立代理類
    ref = createProxy(map);

    // 根據服務名,ReferenceConfig,代理類構建 ConsumerModel,
    // 並將 ConsumerModel 存入到 ApplicationModel 中
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

上面的代碼很長,作的事情比較多。這裏我根據代碼邏輯,對代碼進行了分塊,下面咱們一塊兒來看一下。多線程

首先是方法開始到分割線1之間的代碼。這段代碼主要用於檢測 ConsumerConfig 實例是否存在,如不存在則建立一個新的實例,而後經過系統變量或 dubbo.properties 配置文件填充 ConsumerConfig 的字段。接着是檢測泛化配置,並根據配置設置 interfaceClass 的值。本段代碼邏輯大體就是這些,接着來看分割線1到分割線2之間的邏輯。這段邏輯用於從系統屬性或配置文件中加載與接口名相對應的配置,並將解析結果賦值給 url 字段。url 字段的做用通常是用於點對點調用。繼續向下看,分割線2和分割線3之間的代碼用於檢測幾個核心配置類是否爲空,爲空則嘗試從其餘配置類中獲取。分割線3與分割線4之間的代碼主要是用於收集各類配置,並將配置存儲到 map 中。分割線4和分割線5之間的代碼用於處理 MethodConfig 實例。該實例包含了事件通知配置,好比 onreturn、onthrow、oninvoke 等。分割線5到方法結尾的代碼主要用於解析服務消費者 ip,以及調用 createProxy 建立代理對象。關於該方法的詳細分析,將會在接下來的章節中展開。app

到這裏,關於配置的檢查與處理過長就分析完了。這部分邏輯不是很難理解,但比較繁雜,你們須要耐心看一下。好了,本節先到這,接下來分析服務引用的過程。框架

3.2 引用服務

本節咱們要從 createProxy 開始看起。createProxy 這個方法表面上看起來只是用於建立代理對象,但實際上並不是如此。該方法還會調用其餘方法構建以及合併 Invoker 實例。具體細節以下。

private T createProxy(Map<String, String> map) {
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    final boolean isJvmRefer;
    if (isInjvm() == null) {
        // url 配置被指定,則不作本地引用
        if (url != null && url.length() > 0) {
            isJvmRefer = false;
        // 根據 url 的協議、scope 以及 injvm 等參數檢測是否須要本地引用
        // 好比若是用戶顯式配置了 scope=local,此時 isInjvmRefer 返回 true
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            isJvmRefer = true;
        } else {
            isJvmRefer = false;
        }
    } else {
        // 獲取 injvm 配置值
        isJvmRefer = isInjvm().booleanValue();
    }

    // 本地引用
    if (isJvmRefer) {
        // 生成本地引用 URL,協議爲 injvm
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
        // 調用 refer 方法構建 InjvmInvoker 實例
        invoker = refprotocol.refer(interfaceClass, url);
        
    // 遠程引用
    } else {
        // url 不爲空,代表用戶可能想進行點對點調用
        if (url != null && url.length() > 0) {
            // 當須要配置多個 url 時,可用分號進行分割,這裏會進行切分
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        // 設置接口全限定名爲 url 路徑
                        url = url.setPath(interfaceName);
                    }
                    
                    // 檢測 url 協議是否爲 registry,如果,代表用戶想使用指定的註冊中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // 將 map 轉換爲查詢字符串,並做爲 refer 參數的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合併 url,移除服務提供者的一些配置(這些配置來源於用戶配置的 url 屬性),
                        // 好比線程池相關配置。並保留服務提供者的部分配置,好比版本,group,時間戳等
                        // 最後將合併後的配置設置爲 url 查詢字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // 加載註冊中心 url
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 添加 refer 參數到 url 中,並將 url 添加到 urls 中
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }

            // 未配置註冊中心,拋出異常
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference...");
            }
        }

        // 單個註冊中心或服務提供者(服務直聯,下同)
        if (urls.size() == 1) {
            // 調用 RegistryProtocol 的 refer 構建 Invoker 實例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
            
        // 多個註冊中心或多個服務提供者,或者二者混合
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;

            // 獲取全部的 Invoker
            for (URL url : urls) {
                // 經過 refprotocol 調用 refer 構建 Invoker,refprotocol 會在運行時
                // 根據 url 協議頭加載指定的 Protocol 實例,並調用實例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 若是註冊中心連接不爲空,則將使用 AvailableCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 建立 StaticDirectory 實例,並由 Cluster 對多個 Invoker 進行合併
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else {
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true;
    }
    
    // invoker 可用性檢查
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("No provider available for the service...");
    }

    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}

上面代碼不少,不過邏輯比較清晰。首先根據配置檢查是否爲本地調用,如果,則調用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例。若不是,則讀取直聯配置項,或註冊中心 url,並將讀取到的 url 存儲到 urls 中。而後,根據 urls 元素數量進行後續操做。若 urls 元素數量爲1,則直接經過 Protocol 自適應拓展構建 Invoker 實例接口。若 urls 元素數量大於1,即存在多個註冊中心或服務直聯 url,此時先根據 url 構建 Invoker。而後再經過 Cluster 合併多個 Invoker,最後調用 ProxyFactory 生成代理類。這裏,Invoker 的構建過程以及代理類的過程比較重要,所以我將分兩小節對這兩個過程進行分析。

3.2.1 建立 Invoker

Invoker 是 Dubbo 的核心模型,表明一個可執行體。在服務提供方,Invoker 用於調用服務提供類。在服務消費方,Invoker 用於執行遠程調用。Invoker 在 Dubbo 中的位置十分重要,所以咱們有必要去搞懂它。從前面的代碼中可知,Invoker 是由 Protocol 實現類構建的。Protocol 實現類有不少,這裏我會分析最經常使用的兩個,分別是 RegistryProtocol 和 DubboProtocol,其餘的你們自行分析。下面先來分析 DubboProtocol 的 refer 方法源碼。以下:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // 建立 DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

上面方法看起來比較簡單,不過這裏有一個調用須要咱們注意一下,即 getClients。這個方法用於獲取客戶端實例,實例類型爲 ExchangeClient。ExchangeClient 實際上並不具有通訊能力,所以它須要更底層的客戶端實例進行通訊。好比 NettyClient、MinaClient 等,默認狀況下,Dubbo 使用 NettyClient 進行通訊。接下來,咱們簡單看一下 getClients 方法的邏輯。

private ExchangeClient[] getClients(URL url) {
    // 是否共享鏈接
    boolean service_share_connect = false;
    // 獲取鏈接數,默認爲0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 若是未配置 connections,則共享鏈接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 獲取共享客戶端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的客戶端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

這裏根據 connections 數量決定是獲取共享客戶端仍是建立新的客戶端實例,默認狀況下,使用共享客戶端實例。不過 getSharedClient 方法中也會調用 initClient 方法,所以下面咱們一塊兒看一下這兩個方法。

private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();
    // 獲取帶有「引用計數」功能的 ExchangeClient
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            // 增長引用計數
            client.incrementAndGetCount();
            return client;
        } else {
            referenceClientMap.remove(key);
        }
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        if (referenceClientMap.containsKey(key)) {
            return referenceClientMap.get(key);
        }

        // 建立 ExchangeClient 客戶端
        ExchangeClient exchangeClient = initClient(url);
        // 將 ExchangeClient 實例傳給 ReferenceCountExchangeClient,這裏明顯用了裝飾模式
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        locks.remove(key);
        return client;
    }
}

上面方法先訪問緩存,若緩存未命中,則經過 initClient 方法建立新的 ExchangeClient 實例,並將該實例傳給 ReferenceCountExchangeClient 構造方法建立一個帶有引用技術功能的 ExchangeClient 實例。ReferenceCountExchangeClient 內部實現比較簡單,就不分析了。下面咱們再來看一下 initClient 方法的代碼。

private ExchangeClient initClient(URL url) {

    // 獲取客戶端類型,默認爲 netty
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    // 添加編解碼和心跳包參數到 url 中
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // 檢測客戶端類型是否存在,不存在則拋出異常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: ...");
    }

    ExchangeClient client;
    try {
        // 獲取 lazy 配置,並根據配置值決定建立的客戶端類型
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // 建立懶加載 ExchangeClient 實例
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 建立普通 ExchangeClient 實例
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service...");
    }
    return client;
}

initClient 方法首先獲取用戶配置的客戶端類型,默認爲 netty。而後檢測用戶配置的客戶端類型是否存在,不存在則拋出異常。最後根據 lazy 配置決定建立什麼類型的客戶端。這裏的 LazyConnectExchangeClient 代碼並非很複雜,該類會在 request 方法被調用時經過 Exchangers 的 connect 方法建立 ExchangeClient 客戶端,這裏就不分析 LazyConnectExchangeClient 的代碼了。下面咱們分析一下 Exchangers 的 connect 方法。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 實例,默認爲 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}

如上,getExchanger 會經過 SPI 加載 HeaderExchangeClient 實例,這個方法比較簡單,你們本身看一下吧。接下來分析 HeaderExchangeClient 的實現。

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 這裏包含了多個調用,分別以下:
    // 1. 建立 HeaderExchangeHandler 對象
    // 2. 建立 DecodeHandler 對象
    // 3. 經過 Transporters 構建 Client 實例
    // 4. 建立 HeaderExchangeClient 對象
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

這裏的調用比較多,咱們這裏重點看一下 Transporters 的 connect 方法。以下:

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 若是 handler 數量大於1,則建立一個 ChannelHandler 分發器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    
    // 獲取 Transporter 自適應拓展類,並調用 connect 方法生成 Client 實例
    return getTransporter().connect(url, handler);
}

這裏,getTransporter 方法返回的是自適應拓展類,該類會在運行時根據客戶端類型加載指定的 Transporter 實現類。若用戶未顯示配置客戶端類型,則默認加載 NettyTransporter,並調用該類的 connect 方法。以下:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 建立 NettyClient 對象
    return new NettyClient(url, listener);
}

到這裏就不繼續跟下去了,在往下就是經過 Netty 提供的接口構建 Netty 客戶端了,你們有興趣本身看看。到這裏,關於 DubboProtocol 的 refer 方法就分析完了。接下來,繼續分析 RegistryProtocol 的 refer 方法邏輯。

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取 registry 參數值,並將其設置爲協議頭
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // 獲取註冊中心實例
    Registry registry = registryFactory.getRegistry(url);
    // 這個判斷暫時不知道有什麼意圖,爲何要給 RegistryService 類型生成 Invoker ?
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // 將 url 查詢字符串轉爲 Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    // 獲取 group 配置
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            // 經過 SPI 加載 MergeableCluster 實例,並調用 doRefer 繼續執行引用服務邏輯
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    
    // 調用 doRefer 繼續執行引用服務邏輯
    return doRefer(cluster, registry, type, url);
}

上面代碼首先爲 url 設置協議頭,而後根據 url 參數加載註冊中心實例。接下來對 RegistryService 繼續針對性處理,這個處理邏輯我不是很明白,不知道爲何要爲 RegistryService 類型生成 Invoker,有知道同窗麻煩告知一下。而後就是獲取 group 配置,根據 group 配置決定 doRefer 第一個參數的類型。這裏的重點是 doRefer 方法,以下:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 建立 RegistryDirectory 實例
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設置註冊中心和協議
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // 生成服務消費者連接
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

    // 註冊服務消費者,在 consumers 目錄下新節點
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }

    // 訂閱 providers、configurators、routers 等節點數據
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // 一個註冊中心可能有多個服務提供者,所以這裏須要將多個服務提供者合併爲一個
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

如上,doRefer 方法建立一個 RegistryDirectory 實例,而後生成服務者消費者連接,並向註冊中心進行註冊。註冊完畢後,緊接着訂閱 providers、configurators、routers 等節點下的數據。完成訂閱後,RegistryDirectory 會收到這幾個節點下的子節點信息,好比能夠獲取到服務提供者的配置信息。因爲一個服務可能部署在多臺服務器上,這樣就會在 providers 產生多個節點,這個時候就須要 Cluster 將多個服務節點合併爲一個,並生成一個 Invoker。關於 RegistryDirectory 和 Cluster,本文不打算進行分析,相關分析將會在隨後的文章中展開。

好了,關於 Invoker 的建立的邏輯就先分析到這。邏輯比較多,你們耐心看一下。

3.2.2 建立代理

Invoker 建立完畢後,接下來要作的事情是爲服務接口生成代理對象。有了代理對象,咱們就能夠經過代理對象進行遠程調用。代理對象生成的入口方法爲在 ProxyFactory 的 getProxy,接下來進行分析。

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    // 調用重載方法
    return getProxy(invoker, false);
}

public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
    Class<?>[] interfaces = null;
    // 獲取接口列表
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        // 切分接口列表
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
            interfaces = new Class<?>[types.length + 2];
            // 設置服務接口類和 EchoService.class 到 interfaces 中
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i++) {
                // 加載接口類
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
            }
        }
    }
    if (interfaces == null) {
        interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    }

    // 爲 http 和 hessian 協議提供泛化調用支持,參考 pull request #1827
    if (!invoker.getInterface().equals(GenericService.class) && generic) {
        int len = interfaces.length;
        Class<?>[] temp = interfaces;
        // 建立新的 interfaces 數組
        interfaces = new Class<?>[len + 1];
        System.arraycopy(temp, 0, interfaces, 0, len);
        // 設置 GenericService.class 到數組中
        interfaces[len] = GenericService.class;
    }

    // 調用重載方法
    return getProxy(invoker, interfaces);
}

public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

如上,上面大段代碼都是用來獲取 interfaces 數組的,所以咱們須要繼續往下看。getProxy(Invoker, Class<?>[]) 這個方法是一個抽象方法,下面咱們到 JavassistProxyFactory 類中看一下該方法的實現代碼。

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    // 生成 Proxy 子類(Proxy 是抽象類)。並調用Proxy 子類的 newInstance 方法生成 Proxy 實例
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

上面代碼並很少,首先是經過 Proxy 的 getProxy 方法獲取 Proxy 子類,而後建立 InvokerInvocationHandler 對象,並將該對象傳給 newInstance 生成 Proxy 實例。InvokerInvocationHandler 實現自 JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調用。該類邏輯比較簡單,這裏就不分析了。下面咱們重點關注一下 Proxy 的 getProxy 方法,以下。

public static Proxy getProxy(Class<?>... ics) {
    // 調用重載方法
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > 65535)
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    // 遍歷接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 檢測類型是否爲接口
        if (!ics[i].isInterface())
            throw new RuntimeException(itf + " is not a interface.");

        Class<?> tmp = null;
        try {
            // 從新加載接口類
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }

        // 檢測接口是否相同,這裏 tmp 有可能爲空
        if (tmp != ics[i])
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

        // 拼接接口全限定名,分隔符爲 ;
        sb.append(itf).append(';');
    }

    // 使用拼接後接口名做爲 key
    String key = sb.toString();

    Map<String, Object> cache;
    synchronized (ProxyCacheMap) {
        cache = ProxyCacheMap.get(cl);
        if (cache == null) {
            cache = new HashMap<String, Object>();
            ProxyCacheMap.put(cl, cache);
        }
    }

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 從緩存中獲取 Reference<Proxy> 實例
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null) {
                    return proxy;
                }
            }

            // 多線程控制,保證只有一個線程能夠進行後續操做
            if (value == PendingGenerationMarker) {
                try {
                    // 其餘線程在此處進行等待
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                // 放置標誌位到緩存中,並跳出 while 循環進行後續操做
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 建立 ClassGenerator 對象
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<String>();
        List<Method> methods = new ArrayList<Method>();

        for (int i = 0; i < ics.length; i++) {
            // 檢測接口訪問級別是否爲 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 獲取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    if (!pkg.equals(npkg))
                        // 非 public 級別的接口必須在同一個包下,否者拋出異常
                        throw new IllegalArgumentException("non-public interfaces from different packages");
                }
            }
            
            // 添加接口到 ClassGenerator 中
            ccp.addInterface(ics[i]);

            // 遍歷接口方法
            for (Method method : ics[i].getMethods()) {
                // 獲取方法描述,可理解爲方法簽名
                String desc = ReflectUtils.getDesc(method);
                // 若是已包含在 worked 中,則忽略。考慮這種狀況,
                // A 接口和 B 接口中包含一個徹底相同的方法
                if (worked.contains(desc))
                    continue;
                worked.add(desc);

                int ix = methods.size();
                // 獲取方法返回值類型
                Class<?> rt = method.getReturnType();
                // 獲取參數列表
                Class<?>[] pts = method.getParameterTypes();

                // 生成 Object[] args = new Object[1...N]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                for (int j = 0; j < pts.length; j++)
                    // 生成 args[1...N] = ($w)$1...N;
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                // 生成 InvokerHandler 接口的 invoker 方法調用語句,以下:
                // Object ret = handler.invoke(this, methods[1...N], args);
                code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");

                // 返回值不爲 void
                if (!Void.TYPE.equals(rt))
                    // 生成返回語句,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");

                methods.add(method);
                // 添加方法名、訪問控制符、參數列表、方法代碼等信息到 ClassGenerator 中 
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
            }
        }

        if (pkg == null)
            pkg = PACKAGE_NAME;

        // 構建接口代理類名稱:pkg + ".proxy" + id,好比 com.tianxiaobo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");

        // 爲接口代理類添加帶有 InvocationHandler 參數的構造方法,好比:
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        //     handler=$1;
        // }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
        // 爲接口代理類添加默認構造方法
        ccp.addDefaultConstructor();
        
        // 生成接口代理類
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // 構建 Proxy 子類名稱,好比 Proxy1,Proxy2 等
        String fcn = Proxy.class.getName() + id;
        ccm = ClassGenerator.newInstance(cl);
        ccm.setClassName(fcn);
        ccm.addDefaultConstructor();
        ccm.setSuperClass(Proxy.class);
        // 爲 Proxy 的抽象方法 newInstance 生成實現代碼,形如:
        // public Object newInstance(java.lang.reflect.InvocationHandler h) { 
        //     return new com.tianxiaobo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
        // 生成 Proxy 實現類
        Class<?> pc = ccm.toClass();
        // 經過反射建立 Proxy 實例
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        if (ccp != null)
            // 釋放資源
            ccp.release();
        if (ccm != null)
            ccm.release();
        synchronized (cache) {
            if (proxy == null)
                cache.remove(key);
            else
                // 寫緩存
                cache.put(key, new WeakReference<Proxy>(proxy));
            // 喚醒其餘等待線程
            cache.notifyAll();
        }
    }
    return proxy;
}

上面代碼比較複雜,我也寫了不少註釋。你們在閱讀這段代碼時,要搞清楚 ccp 和 ccm 的用途,否則會被搞暈。ccp 用於爲服務接口生成代理類,好比咱們有一個 DemoService 接口,這個接口代理類就是由 ccp 生成的。ccm 則是用於爲 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現 Proxy 的抽象方法。下面以 org.apache.dubbo.demo.DemoService 這個接口爲例,來看一下該接口代理類代碼大體是怎樣的(忽略 EchoService 接口)。

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = $1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = ($w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

好了,到這裏代理類生成邏輯就分析完了。整個過程比較複雜,你們須要耐心看一下,本節點到這裏。

4.總結

本篇文章對服務引用的過程進行了較爲詳盡的分析,之因此說是較爲詳盡,是由於還有一些地方沒有分析到。好比 Directory、Cluster 等實現類的代碼並未進行詳細分析,因爲這些類功能比較獨立,所以我打算後續單獨成文進行分析。暫時咱們能夠先把這些類當作黑盒,只要知道這些類的用途便可。引用服務過程涉及到的調用也很是多,你們在閱讀相關代碼的中耐心些,並多進行調試。

好了,本篇文章就先到這裏了。謝謝閱讀。

本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處註明出處
做者:田小波
本文同步發佈在個人我的博客:http://www.tianxiaobo.com

cc
本做品採用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。

相關文章
相關標籤/搜索