目標:介紹以後解讀遠程調用模塊的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的源碼解析。
最近我面臨着一個選擇,由於dubbo 2.7.0-release出如今了倉庫裏,最近一直在進行2.7.0版本的code review,那我以前說這一系列的文章都是講述2.6.x版本的源代碼,我如今要不要選擇直接開始講解2.7.0的版本的源碼呢?我最後仍是決定繼續講解2.6.x,由於我以爲仍是有不少公司在用着2.6.x的版本,而且對於升級2.7.0的計劃應該還沒那麼快,而且在瞭解2.6.x版本的原理後,再去了解2.7.0新增的特性會更加容易,也可以品位到設計者的意圖。固然在結束2.6.x的重要模塊講解後,我也會對2.7.0的新特性以及實現原理作一個全面的分析,2.7.0做爲dubbo社區的畢業版,更增強大,敬請期待。java
前面講了不少的內容,如今開始將遠程調用RPC,好像又回到我第一篇文章 《dubbo源碼解析(一)Hello,Dubbo》,在這篇文章開頭我講到了什麼叫作RPC,再通俗一點講,就是我把一個項目的兩部分代碼分開來,分別放到兩臺機器上,當我部署在A服務器上的應用想要調用部署在B服務器上的應用等方法,因爲不存在同一個內存空間,不能直接調用。而其實整個dubbo都在作遠程調用的事情,它涉及到不少內容,好比配置、代理、集羣、監控等等,那麼此次講的內容是隻關心一對一的調用,dubbo-rpc遠程調用模塊抽象各類協議,以及動態代理,Proxy層和Protocol層rpc的核心,我將會在本系列中講到。下面咱們來看兩張官方文檔的圖:git
你會發現其中有咱們之前講到的Transporter、Server、Registry,而此次的系列將會講到的就是紅色框框內的部分。github
在引用服務時序圖中,對應的也是紅色框框的部分。segmentfault
當閱讀完該系列後,但願能對這個調用鏈有所感悟。接下來看看dubbo-rpc的包結構:api
能夠看到有不少包,很規整,其中dubbo-rpc-api是對協議、暴露、引用、代理等的抽象和實現,是rpc整個設計的核心內容。其餘的包則是dubbo支持的9種協議,在官方文檔也能查看介紹,而且包括一種本地調用injvm。那麼咱們再來看看dubbo-rpc-api中包結構:服務器
下面的篇幅設計,本文會講解最外層的源碼和service下的源碼,support包下的源碼我會穿插在其餘用到的地方一併講解,filter、listener、protocol、proxy以及各種協議的實現各自用一篇來說。網絡
public interface Invoker<T> extends Node { /** * get service interface. * 得到服務接口 * @return service interface. */ Class<T> getInterface(); /** * invoke. * 調用下一個會話域 * @param invocation * @return result * @throws RpcException */ Result invoke(Invocation invocation) throws RpcException; }
該接口是實體域,它是dubbo的核心模型,其餘模型都向它靠攏,或者轉化成它,它表明了一個可執行體,能夠向它發起invoke調用,這個有多是一個本地的實現,也多是一個遠程的實現,也多是一個集羣的實現。它表明了一次調用併發
public interface Invocation { /** * get method name. * 得到方法名稱 * @return method name. * @serial */ String getMethodName(); /** * get parameter types. * 得到參數類型 * @return parameter types. * @serial */ Class<?>[] getParameterTypes(); /** * get arguments. * 得到參數 * @return arguments. * @serial */ Object[] getArguments(); /** * get attachments. * 得到附加值集合 * @return attachments. * @serial */ Map<String, String> getAttachments(); /** * get attachment by key. * 得到附加值 * @return attachment value. * @serial */ String getAttachment(String key); /** * get attachment by key with default value. * 得到附加值 * @return attachment value. * @serial */ String getAttachment(String key, String defaultValue); /** * get the invoker in current context. * 得到當前上下文的invoker * @return invoker. * @transient */ Invoker<?> getInvoker(); }
Invocation 是會話域,它持有調用過程當中的變量,好比方法名,參數等。jvm
public interface Exporter<T> { /** * get invoker. * 得到對應的實體域invoker * @return invoker */ Invoker<T> getInvoker(); /** * unexport. * 取消暴露 * <p> * <code> * getInvoker().destroy(); * </code> */ void unexport(); }
該接口是暴露服務的接口,定義了兩個方法分別是得到invoker和取消暴露服務。ide
@SPI public interface ExporterListener { /** * The exporter exported. * 暴露服務 * @param exporter * @throws RpcException * @see com.alibaba.dubbo.rpc.Protocol#export(Invoker) */ void exported(Exporter<?> exporter) throws RpcException; /** * The exporter unexported. * 取消暴露 * @param exporter * @throws RpcException * @see com.alibaba.dubbo.rpc.Exporter#unexport() */ void unexported(Exporter<?> exporter); }
該接口是服務暴露的監聽器接口,定義了兩個方法是暴露和取消暴露,參數都是Exporter類型的。
@SPI("dubbo") public interface Protocol { /** * Get default port when user doesn't config the port. * 得到默認的端口 * @return default port */ int getDefaultPort(); /** * Export service for remote invocation: <br> * 1. Protocol should record request source address after receive a request: * RpcContext.getContext().setRemoteAddress();<br> * 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when * export the same URL<br> * 3. Invoker instance is passed in by the framework, protocol needs not to care <br> * 暴露服務方法, * @param <T> Service type 服務類型 * @param invoker Service invoker 服務的實體域 * @return exporter reference for exported service, useful for unexport the service later * @throws RpcException thrown when error occurs during export the service, for example: port is occupied */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; /** * Refer a remote service: <br> * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol * needs to correspondingly execute `invoke()` method of `Invoker` object <br> * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking, * protocol sends remote request in the `Invoker` implementation. <br> * 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when * connection fails. * 引用服務方法 * @param <T> Service type 服務類型 * @param type Service class 服務類名 * @param url URL address for the remote service * @return invoker service's local proxy * @throws RpcException when there's any error while connecting to the service provider */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; /** * Destroy protocol: <br> * 1. Cancel all services this protocol exports and refers <br> * 2. Release all occupied resources, for example: connection, port, etc. <br> * 3. Protocol can continue to export and refer new service even after it's destroyed. */ void destroy(); }
該接口是服務域接口,也是協議接口,它是一個可擴展的接口,默認實現的是dubbo協議。定義了四個方法,關鍵的是服務暴露和引用兩個方法。
@SPI public interface Filter { /** * do invoke filter. * <p> * <code> * // before filter * Result result = invoker.invoke(invocation); * // after filter * return result; * </code> * * @param invoker service * @param invocation invocation. * @return invoke result. * @throws RpcException * @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation) */ Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; }
該接口是invoker調用時過濾器接口,其中就只有一個invoke方法。在該方法中對調用進行過濾
@SPI public interface InvokerListener { /** * The invoker referred * 在服務引用的時候進行監聽 * @param invoker * @throws RpcException * @see com.alibaba.dubbo.rpc.Protocol#refer(Class, com.alibaba.dubbo.common.URL) */ void referred(Invoker<?> invoker) throws RpcException; /** * The invoker destroyed. * 銷燬實體域 * @param invoker * @see com.alibaba.dubbo.rpc.Invoker#destroy() */ void destroyed(Invoker<?> invoker); }
該接口是實體域的監聽器,定義了兩個方法,分別是服務引用和銷燬的時候執行的方法。
該接口是實體域執行invoke的結果接口,裏面定義了得到結果異常以及附加值等方法。比較好理解我就不貼代碼了。
@SPI("javassist") public interface ProxyFactory { /** * create proxy. * 建立一個代理 * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY}) <T> T getProxy(Invoker<T> invoker) throws RpcException; /** * create proxy. * 建立一個代理 * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY}) <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException; /** * create invoker. * 建立一個實體域 * @param <T> * @param proxy * @param type * @param url * @return invoker */ @Adaptive({Constants.PROXY_KEY}) <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException; }
該接口是代理工廠接口,它也是個可擴展接口,默認實現javassist,dubbo提供兩種動態代理方法分別是javassist/jdk,該接口定義了三個方法,前兩個方法是經過invoker建立代理,最後一個是經過代理來得到invoker。
該類就是遠程調用的上下文,貫穿着整個調用,例如A調用B,而後B調用C。在服務B上,RpcContext在B以前將調用信息從A保存到B。開始調用C,並在B調用C後將調用信息從B保存到C。RpcContext保存了調用信息。
public class RpcContext { /** * use internal thread local to improve performance * 本地上下文 */ private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } }; /** * 服務上下文 */ private static final InternalThreadLocal<RpcContext> SERVER_LOCAL = new InternalThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } }; /** * 附加值集合 */ private final Map<String, String> attachments = new HashMap<String, String>(); /** * 上下文值 */ private final Map<String, Object> values = new HashMap<String, Object>(); /** * 線程結果 */ private Future<?> future; /** * url集合 */ private List<URL> urls; /** * 當前的url */ private URL url; /** * 方法名稱 */ private String methodName; /** * 參數類型集合 */ private Class<?>[] parameterTypes; /** * 參數集合 */ private Object[] arguments; /** * 本地地址 */ private InetSocketAddress localAddress; /** * 遠程地址 */ private InetSocketAddress remoteAddress; /** * 實體域集合 */ @Deprecated private List<Invoker<?>> invokers; /** * 實體域 */ @Deprecated private Invoker<?> invoker; /** * 會話域 */ @Deprecated private Invocation invocation; // now we don't use the 'values' map to hold these objects // we want these objects to be as generic as possible /** * 請求 */ private Object request; /** * 響應 */ private Object response;
該類中最重要的是它的一些屬性,由於該上下文就是用來保存信息的。方法我就不介紹了,由於比較簡單。
/** * 不知道異常 */ public static final int UNKNOWN_EXCEPTION = 0; /** * 網絡異常 */ public static final int NETWORK_EXCEPTION = 1; /** * 超時異常 */ public static final int TIMEOUT_EXCEPTION = 2; /** * 基礎異常 */ public static final int BIZ_EXCEPTION = 3; /** * 禁止訪問異常 */ public static final int FORBIDDEN_EXCEPTION = 4; /** * 序列化異常 */ public static final int SERIALIZATION_EXCEPTION = 5;
該類是rpc調用拋出的異常類,其中封裝了五種通用的錯誤碼。
/** * 方法名稱 */ private String methodName; /** * 參數類型集合 */ private Class<?>[] parameterTypes; /** * 參數集合 */ private Object[] arguments; /** * 附加值 */ private Map<String, String> attachments; /** * 實體域 */ private transient Invoker<?> invoker;
該類實現了Invocation接口,是rpc的會話域,其中的方法比較簡單,主要是封裝了上述的屬性。
/** * 結果 */ private Object result; /** * 異常 */ private Throwable exception; /** * 附加值 */ private Map<String, String> attachments = new HashMap<String, String>();
該類實現了Result接口,是rpc的結果實現類,其中關鍵是封裝了以上三個屬性。
該類是rpc的一些狀態監控,其中封裝了許多的計數器,用來記錄rpc調用的狀態。
/** * uri對應的狀態集合,key爲uri,value爲RpcStatus對象 */ private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>(); /** * method對應的狀態集合,key是uri,第二個key是方法名methodName */ private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>(); /** * 已經沒用了 */ private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>(); /** * 活躍狀態 */ private final AtomicInteger active = new AtomicInteger(); /** * 總的數量 */ private final AtomicLong total = new AtomicLong(); /** * 失敗的個數 */ private final AtomicInteger failed = new AtomicInteger(); /** * 總調用時長 */ private final AtomicLong totalElapsed = new AtomicLong(); /** * 總調用失敗時長 */ private final AtomicLong failedElapsed = new AtomicLong(); /** * 最大調用時長 */ private final AtomicLong maxElapsed = new AtomicLong(); /** * 最大調用失敗時長 */ private final AtomicLong failedMaxElapsed = new AtomicLong(); /** * 最大調用成功時長 */ private final AtomicLong succeededMaxElapsed = new AtomicLong(); /** * Semaphore used to control concurrency limit set by `executes` * 信號量用來控制`execution`設置的併發限制 */ private volatile Semaphore executesLimit; /** * 用來控制`execution`設置的許可證 */ private volatile int executesPermits;
以上是該類的屬性,能夠看到保存了不少的計數器,分別用來記錄了失敗調用成功調用等累計數。
/** * 開始計數 * @param url */ public static void beginCount(URL url, String methodName) { // 對該url對應對活躍計數器加一 beginCount(getStatus(url)); // 對該方法對活躍計數器加一 beginCount(getStatus(url, methodName)); } /** * 以原子方式加1 * @param status */ private static void beginCount(RpcStatus status) { status.active.incrementAndGet(); }
該方法是增長計數。
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) { // url對應的狀態中計數器減一 endCount(getStatus(url), elapsed, succeeded); // 方法對應的狀態中計數器減一 endCount(getStatus(url, methodName), elapsed, succeeded); } private static void endCount(RpcStatus status, long elapsed, boolean succeeded) { // 活躍計數器減一 status.active.decrementAndGet(); // 總計數器加1 status.total.incrementAndGet(); // 總調用時長加上調用時長 status.totalElapsed.addAndGet(elapsed); // 若是最大調用時長小於elapsed,則設置最大調用時長 if (status.maxElapsed.get() < elapsed) { status.maxElapsed.set(elapsed); } // 若是rpc調用成功 if (succeeded) { // 若是成最大調用成功時長小於elapsed,則設置最大調用成功時長 if (status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); } } else { // 失敗計數器加一 status.failed.incrementAndGet(); // 失敗的過時數加上elapsed status.failedElapsed.addAndGet(elapsed); // 總調用失敗時長小於elapsed,則設置總調用失敗時長 if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); } } }
該方法是計數器減小。
該類是系統上下文,僅供內部使用。
/** * 系統名稱 */ private static final String SYSTEMNAME = "system"; /** * 系統上下文集合,僅供內部使用 */ private static final ConcurrentMap<String, StaticContext> context_map = new ConcurrentHashMap<String, StaticContext>(); /** * 系統上下文名稱 */ private String name;
上面是該類的屬性,它還記錄了全部的系統上下文集合。
public interface EchoService { /** * echo test. * 回聲測試 * @param message message. * @return message. */ Object $echo(Object message); }
該接口是回聲服務接口,定義了一個一個回聲測試的方法,回聲測試用於檢測服務是否可用,回聲測試按照正常請求流程執行,可以測試整個調用是否通暢,可用於監控,全部服務自動實現該接口,只需將任意服務強制轉化爲EchoService,就能夠用了。
該方法是通用的異常類。
/** * 異常類名 */ private String exceptionClass; /** * 異常信息 */ private String exceptionMessage;
比較簡單,就封裝了兩個屬性。
public interface GenericService { /** * Generic invocation * 通用的會話域 * @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is * required, e.g. findPerson(java.lang.String) * @param parameterTypes Parameter types * @param args Arguments * @return invocation return value * @throws Throwable potential exception thrown from the invocation */ Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException; }
該接口是通用的服務接口,一樣定義了一個相似invoke的方法
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了遠程調用的開篇,介紹以後解讀遠程調用模塊的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的源碼解析,其中的邏輯不負責,要關注的是其中的一些概念和dubbo如何去作暴露服務和引用服務,其中不少的接口定義須要弄清楚。接下來我將開始對rpc模塊的過濾器進行講解。