【Dubbo 源碼解析】05_Dubbo 服務發現&引用

Dubbo 服務發現&引用

Dubbo 引用的服務消費者最終會構形成一個 Spring 的 Bean,具體是經過 ReferenceBean 來實現的。它是一個 FactoryBean,全部的服務消費者 Bean 都經過它來生產。html

ReferenceBean#getObject() --> ReferenceConfig#get()

ReferenceConfig 最終會建立一個動態代理類返回:apache

private T createProxy(Map<String, String> map) {
    ......
    // assemble URL from register center's configuration
    // 從註冊中心的配置組裝 URL(服務發現)
    List<URL> us = loadRegistries(false);
    if (us != null && !us.isEmpty()) {
        for (URL u : us) {
            URL monitorUrl = loadMonitor(u);
            if (monitorUrl != null) {
                map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
            }
            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
        }
    }
    ......
    if (urls.size() == 1) {
        // 建立 Invoker
        invoker = refprotocol.refer(interfaceClass, urls.get(0));
    } else {
        List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
        URL registryURL = null;
        for (URL url : urls) {
            invokers.add(refprotocol.refer(interfaceClass, url));
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                registryURL = url; // use last registry url
            }
        }
        if (registryURL != null) { // registry url is available
            // use AvailableCluster only when register's cluster is available
            URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
            // 當服務提供者有多個時,就建立一個 ClusterInvoker
            invoker = cluster.join(new StaticDirectory(u, invokers));
        } else { // not a registry url
            invoker = cluster.join(new StaticDirectory(invokers));
        }
    }
    ......
    // create service proxy
    return (T) proxyFactory.getProxy(invoker);
}

 

 

服務發現

dubbo 的服務發現,是經過從註冊中心訂閱服務提供者組裝成 URL,而後經過 URL 建立出 Invoker 來實現的。app

服務引用

Dubbo 的服務引用,其實是爲引用的接口建立一個 Proxy,這個 Proxy 的功能就是去執行 refprotocol.refer(interfaceClass, url) 建立出來的 Invoker。 當服務提供者有多個時,就建立一個 ClusterInvoker。Cluster 是一個 SPI 擴展點,默認使用的是 failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster。負載均衡

因此,Consumer 端服務調用的邏輯被封裝在 refprotocol.refer(interfaceClass, url) 建立出來的 Invoker 上。經過以前 Dubbo Protocol & Filter 的學習,咱們能夠知道 refprotocol 是一個 Wrapped Protocol,refer() 方法建立出來的 Invoker 是被 Filter 包裹的一個 DubboInvoker。tcp

綜上,Consumer 端服務調用的邏輯是:ide

  1. 執行 FailoverClusterInvoker#invoke(Invocation invocation) (負載均衡。當有多個 provider 時走這一步,沒有的話,就跳過)學習

  2. 執行 Filter#invoke(Invoker<?> invoker, Invocation invocation) (全部 group="provider" 的 Filter)url

  3. 執行 DubboInvoker#invoke(Invocation inv)spa

 

服務消費端建立 tcp 鏈接

refprotocol.refer(interfaceClass, url) 被調用時會去建立與 provider 的 tcp 鏈接。代理

DubboProtocol#refer(Class<T> serviceType, URL url) 源碼以下:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // create rpc invoker. 同時,建立一條與 provider 的 tcp 鏈接
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

 

Dubbo 服務引用時,首先建立一條與 provider 的 tcp 鏈接 ExchangeClient,而後再建立一個 DubboInvoker。

getClients(url) 最終會調用 initClient(URL url) 來建立一條新鏈接:

private ExchangeClient initClient(URL url) {
    ......
    // 設置 codec = "dubbo"
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    ......
    ExchangeClient client;
    try {
        // connection should be lazy
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 建立一條新鏈接
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}

 

新鏈接建立過程:

Exchangers.connect(url, requestHandler) --> ExchangeClient#connect(URL url, ExchangeHandler handler) --> Exchanger$Adaptive#connect(URL url, ExchangeHandler handler) --> HeaderExchanger#connect(URL url, ExchangeHandler handler) --> 返回 new HeaderExchangeClient(client, true)

// HeaderExchanger.class
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 建立新鏈接;啓動 dubbo 心跳
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

 

Transporters#connect(URL url, ChannelHandler... handlers) --> Transporter$Adaptive#connect(URL url, ChannelHandler handler) --> NettyTransporter#connect(URL url, ChannelHandler listener) --> 返回 new NettyClient(url, listener)

編解碼最終使用的是:com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec

 

附:

Dubbo 心跳(HeartBeatTask)的做用是:當檢測到心跳超時的時候,自動重連

 

 

官方如是說:

引用服務

1. 直連引用服務:

在沒有註冊中心,直連提供者的狀況下 [3]ReferenceConfig 解析出的 URL 的格式爲:dubbo://service-host/com.foo.FooService?version=1.0.0

基於擴展點自適應機制,經過 URL 的 dubbo:// 協議頭識別,直接調用 DubboProtocolrefer() 方法,返回提供者引用。

2. 從註冊中心發現引用服務:

在有註冊中心,經過註冊中心發現提供者地址的狀況下 [4]ReferenceConfig 解析出的 URL 的格式爲:registry://registry-host/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0")

基於擴展點自適應機制,經過 URL 的 registry:// 協議頭識別,就會調用 RegistryProtocolrefer()方法,基於 refer 參數中的條件,查詢提供者 URL,如: dubbo://service-host/com.foo.FooService?version=1.0.0

基於擴展點自適應機制,經過提供者 URL 的 dubbo:// 協議頭識別,就會調用 DubboProtocolrefer()方法,獲得提供者引用。

而後 RegistryProtocol 將多個提供者引用,經過 Cluster 擴展點,假裝成單個提供者引用返回。

服務消費者消費一個服務的詳細過程


上圖是服務消費的主過程:

首先 ReferenceConfig 類的 init 方法調用 Protocolrefer 方法生成 Invoker 實例(如上圖中的紅色部分),這是服務消費的關鍵。接下來把 Invoker 轉換爲客戶端須要的接口(如:HelloWorld)。

關於每種協議如 RMI/Dubbo/Web service 等它們在調用 refer 方法生成 Invoker 實例的細節和上一章節所描述的相似。

相關文章
相關標籤/搜索