該圖是通過簡化後的rpc-api模塊的類圖,去除了一些非關鍵的屬性和方法定義,也去除了一些非核心的類和接口,只是一個簡化了的的示意圖,這樣你們可以去除干擾看清楚該模塊的核心接口極其關係,請點擊看大圖更清晰一些。java
服務協議。這是rpc模塊中最核心的一個類,它定義了rpc的最主要的兩個行爲即:一、provider暴露遠程服務,即將調用信息發佈到服務器上的某個URL上去,能夠供消費者鏈接調用,通常是將某個service類的所有方法總體發佈到服務器上。二、consumer引用遠程服務,即根據service的服務類和provider發佈服務的URL轉化爲一個Invoker對象,消費者能夠經過該對象調用provider發佈的遠程服務。這其實歸納了rpc的最爲核心的職責,提供了多級抽象的實現、包裝器實現等。express
Protocol的頂層抽象實現類,它定義了這些屬性:一、exporterMap表示發佈過的serviceKey和Exporter(遠程服務發佈的引用)的映射表;二、invokers是一個Invoker對象的集合,表示層級暴露過遠程服務的服務執行體對象集合。還提供了一個通用的服務發佈銷燬方法destroy,該方法是一個通用方法,它清空了兩個集合屬性,調用了全部invoker的destroy方法,也調用全部exporter對象的unexport方法。apache
繼承自AbstractProtoco的一個抽象代理協議類。它聚合了代理工廠ProxyFactory對象來實現服務的暴露和引用。它的源碼以下。設計模式
/* * Copyright 1999-2012 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.protocol; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.ProxyFactory; import com.alibaba.dubbo.rpc.Result; import com.alibaba.dubbo.rpc.RpcException; /** * AbstractProxyProtocol * * @author william.liangf */ public abstract class AbstractProxyProtocol extends AbstractProtocol { private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();; private ProxyFactory proxyFactory; public AbstractProxyProtocol() { } public AbstractProxyProtocol(Class<?>... exceptions) { for (Class<?> exception : exceptions) { addRpcException(exception); } } public void addRpcException(Class<?> exception) { this.rpcExceptions.add(exception); } public void setProxyFactory(ProxyFactory proxyFactory) { this.proxyFactory = proxyFactory; } public ProxyFactory getProxyFactory() { return proxyFactory; } @SuppressWarnings("unchecked") public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException { final String uri = serviceKey(invoker.getUrl());//得到Url對應的serviceKey值。 Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);//根據url獲取對應的exporter。 if (exporter != null) {//若是已經存在,則直接返回,實現接口支持冪等調用。該處難道無須考慮線程安全問題嗎? return exporter; } //執行抽放方法暴露服務。runnable方法的行爲有什麼約束沒有?該處不明確。 final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl()); //調用proxyFactory.getProxy(invoker)來得到invoker的代理對象。 exporter = new AbstractExporter<T>(invoker) { public void unexport() { super.unexport(); exporterMap.remove(uri); if (runnable != null) { try { runnable.run(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } }; exporterMap.put(uri, exporter); return exporter; } public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException { //先調用doRefer得到服務服務對象,再調用proxyFactory.getInvoker得到invoker對象。 final Invoker<T> tagert = proxyFactory.getInvoker(doRefer(type, url), type, url); Invoker<T> invoker = new AbstractInvoker<T>(type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { try { Result result = tagert.invoke(invocation); Throwable e = result.getException(); if (e != null) { for (Class<?> rpcException : rpcExceptions) { if (rpcException.isAssignableFrom(e.getClass())) { throw getRpcException(type, url, invocation, e); } } } return result; } catch (RpcException e) { if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) { e.setCode(getErrorCode(e.getCause())); } throw e; } catch (Throwable e) { throw getRpcException(type, url, invocation, e); } } }; invokers.add(invoker); return invoker; } protected RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) { RpcException re = new RpcException("Failed to invoke remote service: " + type + ", method: " + invocation.getMethodName() + ", cause: " + e.getMessage(), e); re.setCode(getErrorCode(e)); return re; } protected int getErrorCode(Throwable e) { return RpcException.UNKNOWN_EXCEPTION; } /** **留給子類實現的真正將類發佈到URL上的抽象方法定義,由具體的協議來實現。 **/ protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException; /** **留給子類實現的引用遠程服務的抽象方法定義,該方法是將URL和type接口類應用到一個能夠遠程調用代理對象。 **/ protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException; }
是一個Protocol的支持過濾器的裝飾器。經過該裝飾器的對原始對象的包裝使得Protocol支持可擴展的過濾器鏈,已經支持的包括ExceptionFilter、ExecuteLimitFilter和TimeoutFilter等多種支持不一樣特性的過濾器。api
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; //經過該句得到擴展配置的過濾器列表,具體機制須要研究該類的實現。 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { //循環將過濾器列表組裝成爲過濾器鏈,目標invoker是最後一個執行的。 for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
一個支持監聽器特性的Protocal的包裝器。支持兩種監聽器的功能擴展,分別是:ExporterListener是遠程服務發佈監聽器,能夠監聽服務發佈和取消發佈兩個事件點;InvokerListener是服務消費者引用調用器的監聽器,能夠監聽引用和銷燬兩個事件方法。支持可擴展的事件監聽模型,目前只提供了一些適配器InvokerListenerAdapter、ExporterListenerAdapter以及簡單的過時服務調用監聽器DeprecatedInvokerListener。開發者可自行擴展本身的監聽器。該類源碼以下。安全
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.protocol; import java.util.Collections; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.ExporterListener; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.InvokerListener; import com.alibaba.dubbo.rpc.Protocol; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.listener.ListenerExporterWrapper; import com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper; /** * ListenerProtocol * * @author william.liangf */ public class ProtocolListenerWrapper implements Protocol { private final Protocol protocol; public ProtocolListenerWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } public int getDefaultPort() { return protocol.getDefaultPort(); } public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //特殊協議,跳過監聽器觸發。 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //調用原始協議的發佈方法,觸發監聽器鏈事件。 return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); } public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); } public void destroy() { protocol.destroy(); } }
dubbo的代理工廠。定義了兩個接口分別是:getProxy根據invoker目標接口的代理對象,通常是消費者得到代理對象觸發遠程調用;getInvoker方法將代理對象proxy、接口類type和遠程服務的URL獲取執行對象Invoker,每每是提供者得到目標執行對象執行目標實現調用。AbstractProxyFactory是其抽象實現,提供了getProxy的模版方法實現,使得能夠支持多接口的映射。dubbo最終內置了兩種動態代理的實現,分別是jdkproxy和javassist。默認的實現使用javassist。爲何選擇javassist,梁飛選型的時候作過性能測試對比分析,參考:http://javatar.iteye.com/blog/814426/服務器
該接口是服務的執行體。它有獲取服務發佈的URL,服務的接口類等關鍵屬性的行爲;還有核心的服務執行方法invoke,執行該方法後返回執行結果Result,而傳遞的參數是調用信息Invocation。該接口有大量的抽象和具體實現類。AbstractProxyInvoker是基於代理的執行器抽象實現,AbstractInvoker是通用的抽象實現。app
首先ServiceConfig類拿到對外提供服務的實際類ref(如:HelloWorldImpl),而後經過ProxyFactory類的getInvoker方法使用ref生成一個AbstractProxyInvoker實例,到這一步就完成具體服務到Invoker的轉化。接下來就是Invoker轉換到Exporter的過程。
Dubbo處理服務暴露的關鍵就在Invoker轉換到Exporter的過程(如上圖中的紅色部分),下面咱們以Dubbo和RMI這兩種典型協議的實現來進行說明:less
上圖是服務消費的主過程:
首先ReferenceConfig類的init方法調用Protocol的refer方法生成Invoker實例(如上圖中的紅色部分),這是服務消費的關鍵。接下來把Invoker轉換爲客戶端須要的接口(如:HelloWorld)。
關於每種協議如RMI/Dubbo/Web service等它們在調用refer方法生成Invoker實例的細節和上一章節所描述的相似。ide
該模塊下設計較爲複雜,在設計中能夠看出來應用了大量的設計模式,包括模版方法、職責鏈、裝飾器和動態代理等設計模式。掌握該模塊下的核心概念對於後續閱讀其它部分代碼相當重要,後面的其它模塊要麼是它的實現,要麼是由它衍生出來的,要麼與它的關係很是緊密。
接下來咱們看看rpc的默認實現模塊——dubbo-rpc-default。該模塊提供了默認的dubbo協議的實現,也是默認使用的協議。