Dubbo 引用的服務消費者最終會構形成一個 Spring 的 Bean,具體是經過 ReferenceBean
來實現的。它是一個 FactoryBean,全部的服務消費者 Bean 都經過它來生產。html
ReferenceBean#getObject() --> ReferenceConfig#get()
apache
private T createProxy(Map<String, String> map) { ...... // assemble URL from register center's configuration // 從註冊中心的配置組裝 URL(服務發現) List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } ...... if (urls.size() == 1) { // 建立 Invoker invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // use AvailableCluster only when register's cluster is available URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 當服務提供者有多個時,就建立一個 ClusterInvoker invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url invoker = cluster.join(new StaticDirectory(invokers)); } } ...... // create service proxy return (T) proxyFactory.getProxy(invoker); }
dubbo 的服務發現,是經過從註冊中心訂閱服務提供者組裝成 URL,而後經過 URL 建立出 Invoker 來實現的。app
Dubbo 的服務引用,其實是爲引用的接口建立一個 Proxy,這個 Proxy 的功能就是去執行 refprotocol.refer(interfaceClass, url)
建立出來的 Invoker。 當服務提供者有多個時,就建立一個 ClusterInvoker。Cluster 是一個 SPI 擴展點,默認使用的是 failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster。負載均衡
因此,Consumer 端服務調用的邏輯被封裝在 refprotocol.refer(interfaceClass, url) 建立出來的 Invoker 上。經過以前 Dubbo Protocol & Filter 的學習,咱們能夠知道 refprotocol 是一個 Wrapped Protocol,refer() 方法建立出來的 Invoker 是被 Filter 包裹的一個 DubboInvoker。tcp
綜上,Consumer 端服務調用的邏輯是:ide
執行 FailoverClusterInvoker#invoke(Invocation invocation) (負載均衡。當有多個 provider 時走這一步,沒有的話,就跳過)學習
執行 Filter#invoke(Invoker<?> invoker, Invocation invocation) (全部 group="provider" 的 Filter)url
執行 DubboInvoker#invoke(Invocation inv)spa
refprotocol.refer(interfaceClass, url) 被調用時會去建立與 provider 的 tcp 鏈接。代理
DubboProtocol#refer(Class<T> serviceType, URL url) 源碼以下:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. 同時,建立一條與 provider 的 tcp 鏈接 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
Dubbo 服務引用時,首先建立一條與 provider 的 tcp 鏈接 ExchangeClient,而後再建立一個 DubboInvoker。
getClients(url) 最終會調用 initClient(URL url) 來建立一條新鏈接:
private ExchangeClient initClient(URL url) { ...... // 設置 codec = "dubbo" url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); ...... ExchangeClient client; try { // connection should be lazy if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { // 建立一條新鏈接 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
新鏈接建立過程:
Exchangers.connect(url, requestHandler) --> ExchangeClient#connect(URL url, ExchangeHandler handler) --> Exchanger$Adaptive#connect(URL url, ExchangeHandler handler) --> HeaderExchanger#connect(URL url, ExchangeHandler handler) --> 返回 new HeaderExchangeClient(client, true)
// HeaderExchanger.class public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 建立新鏈接;啓動 dubbo 心跳 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
Transporters#connect(URL url, ChannelHandler... handlers) --> Transporter$Adaptive#connect(URL url, ChannelHandler handler) --> NettyTransporter#connect(URL url, ChannelHandler listener) --> 返回 new NettyClient(url, listener)
編解碼最終使用的是:com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec
附:
Dubbo 心跳(HeartBeatTask)的做用是:當檢測到心跳超時的時候,自動重連
在沒有註冊中心,直連提供者的狀況下 [3],ReferenceConfig
解析出的 URL 的格式爲:dubbo://service-host/com.foo.FooService?version=1.0.0
。
基於擴展點自適應機制,經過 URL 的 dubbo://
協議頭識別,直接調用 DubboProtocol
的 refer()
方法,返回提供者引用。
在有註冊中心,經過註冊中心發現提供者地址的狀況下 [4],ReferenceConfig
解析出的 URL 的格式爲:registry://registry-host/org.apache.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0")
。
基於擴展點自適應機制,經過 URL 的 registry://
協議頭識別,就會調用 RegistryProtocol
的 refer()
方法,基於 refer
參數中的條件,查詢提供者 URL,如: dubbo://service-host/com.foo.FooService?version=1.0.0
。
基於擴展點自適應機制,經過提供者 URL 的 dubbo://
協議頭識別,就會調用 DubboProtocol
的 refer()
方法,獲得提供者引用。
而後 RegistryProtocol
將多個提供者引用,經過 Cluster
擴展點,假裝成單個提供者引用返回。
上圖是服務消費的主過程:
首先 ReferenceConfig
類的 init
方法調用 Protocol
的 refer
方法生成 Invoker
實例(如上圖中的紅色部分),這是服務消費的關鍵。接下來把 Invoker
轉換爲客戶端須要的接口(如:HelloWorld)。
關於每種協議如 RMI/Dubbo/Web service 等它們在調用 refer
方法生成 Invoker