上圖是服務消費的主過程:html
首先經過ReferenceConfig
類的private void init()
方法會先檢查初始化全部的配置信息後,調用private T createProxy(Map<String, String> map)
建立代理,消費者最終獲得的是服務的代理, 在createProxy
接着調用Protocol
接口實現的<T> Invoker<T> refer(Class<T> type, URL url)
方法生成Invoker
實例(如上圖中的紅色部分),這是服務消費的關鍵。接下來把Invoker
經過ProxyFactory
代理工廠轉換爲客戶端須要的接口(如:HelloWorld
),建立服務代理並返回。java
一、把服務引用的信息封裝成URL並註冊到zk註冊中心;
二、監聽註冊中心的服務的上下線;
三、鏈接服務提供端,建立NettyClient對象;
四、將這些信息包裝成DubboInvoker消費端的調用鏈,建立消費端Invoker實例的服務代理並返回;apache
一、通過負載均衡策略,調用提供者;
二、選擇其中一個服務的URL與提供者netty創建鏈接,使用ProxyFactory 建立遠程通訊,或者本地通訊的,Invoker發到netty服務端;
三、服務器端接收到該Invoker信息後,找到對應的本地Invoker,處理Invocation請求;
四、獲取異步,或同步處理結果;服務器
先看一個簡單的客戶端引用服務的例子,HelloService
,dubbo
配置以下:app
<dubbo:application name="consumer-of-helloService" /> <dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" /> <dubbo:protocol name="dubbo" port="20880" /> <dubbo:reference id="helloService" interface="com.demo.dubbo.service.HelloService" />
Zookeeper
做爲註冊中心HelloService
接口服務根據以前的介紹,在Spring啓動的時候,根據<dubbo:reference>
配置會建立一個ReferenceBean,該bean又實現了Spring的FactoryBean接口,因此咱們以下方式使用時:負載均衡
@Autowired private HelloService helloService;
使用的不是ReferenceBean對象,而是ReferenceBean的getObject()方法返回的對象,該對象經過代理實現了HelloService接口,因此要看服務引用的整個過程就須要從ReferenceBean.getObject()方法開始入手。異步
將ReferenceConfig.init()中的內容拆成具體的步驟,以下:async
methods=hello, timestamp=1443695417847, dubbo=2.5.3 application=consumer-of-helloService side=consumer pid=7748 interface=com.demo.dubbo.service.HelloService
若是是單個註冊中心,代碼以下:ide
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); invoker = refprotocol.refer(interfaceClass, url);
上述url的內容以下:ui
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? application=consumer-of-helloService& dubbo=2.5.6& pid=8292& registry=zookeeper& timestamp=1443707173909& refer= application=consumer-of-helloService& dubbo=2.5.6& interface=com.demo.dubbo.service.HelloService& methods=hello& pid=8292& side=consumer& timestamp=1443707173884&
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); proxyFactory.getProxy(invoker);
下面就詳細說明下上述提到的幾個概念:Protocol、Invoker、ProxyFactory
Invoker是一個可執行對象,有三種類型的Invoker:
Invoker的實現狀況以下:
先來看服務引用的第2個步驟,返回Invoker對象
對於客戶端來講,Invoker應該是二、3這兩種類型。先來講第2種類型,即遠程通訊的Invoker,看DubboInvoker的源碼,調用過程AbstractInvoker.invoke()->doInvoke():
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
大體內容就是:
將經過遠程通訊將Invocation信息傳遞給服務器端,服務器端接收到該Invocation信息後,找到對應的本地Invoker,而後經過反射執行相應的方法,將方法的返回值再經過遠程通訊將結果傳遞給客戶端。
這裏分3種狀況:
ExchangeClient.send()
方法。ExchangeClient.request()
方法返回一個ResponseFuture
對象,經過RpcContext
中的ThreadLocal
使ResponseFuture
和當前線程綁定,未等服務端響應結果就直接返回,而後服務端經過ProtocolFilterWrapper.buildInvokerChain()
方法會調用Filter.invoke()
方法,即FutureFilter.invoker()->asyncCallback()
,會獲取RpcContext
的ResponseFuture
對象,異步返回結果。ExchangeClient.request()
方法,返回一個ResponseFuture
,一直阻塞到服務端返回響應結果服務引用的第二步就是:
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); invoker = refprotocol.refer(interfaceClass, url);
使用協議Protocol根據上述的url和服務接口來引用服務,建立出一個Invoker對象
默認實現的DubboProtocol也會通過ProtocolFilterWrapper、ProtocolListenerWrapper、RegistryProtocol的包裝
首先看下RegistryProtocol.refer()方法,它幹了哪些事呢?
Directory和Cluster都是服務治理的重點,接下去會單獨拿一章出來說
服務引用的第三步就是:
proxyFactory.getProxy(invoker);
對於Server端,ProxyFactory主要負責將服務如HelloServiceImpl統一進行包裝成一個Invoker,這些Invoker經過反射來執行具體的HelloServiceImpl對象的方法
對於client端,則是將上述建立的集羣版Invoker(Cluster)建立出代理對象
代碼以下:
public class JavassistProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
能夠看到是利用jdk自帶的Proxy來動態代理目標對象Invoker,因此咱們調用建立出來的代理對象如HelloService的方法時,會執行InvokerInvocationHandler中的邏輯:
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler){ this.invoker = handler; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } //AbstractClusterInvoker.invoke() return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
參考:http://dubbo.apache.org/books/dubbo-dev-book/implementation.html
參考:https://my.oschina.net/xiaominmin/blog/1599378