dubbo源碼解析(四十八)異步化改造

2.7大揭祕——異步化改造

目標:從源碼的角度分析2.7的新特性中對於異步化的改造原理。

前言

dubbo中提供了不少類型的協議,關於協議的系列能夠查看下面的文章:html

官方推薦的是使用dubbo協議,而異步調用的支持也是在dubbo協議中實現的。java

看了我以前寫的2.7新特性的文章,應該對於異步化改造有個大體的印象。要弄懂異步在何時起做用,先要弄懂dubbo 的服務暴露和引用過程以及消費端發送請求過程和服務端處理請求過程。我在前四篇文章已經講述了相關內容,異步請求只是dubbo的一種請求方式,基於 dubbo 底層的異步 NIO 實現異步調用,對於 Provider 響應時間較長的場景是必須的,它能有效利用 Consumer 端的資源,相對於 Consumer 端使用多線程來講開銷較小。可讓消費者無需阻塞等待返回結果。web

通過改良後,Provider端也支持異步處理請求,引用官網的話就是如今Provider端異步執行和Consumer端異步調用是相互獨立的,你能夠任意正交組合兩端配置。redis

如何開啓和使用異步能夠查看如下連接:apache

Provider異步執行: http://dubbo.apache.org/zh-cn/docs/user/demos/async-execute-on-provider.html

Consumer異步調用:http://dubbo.apache.org/zh-cn/docs/user/demos/async-call.htmlsegmentfault

異步的改造

Listener作爲Filter的內部接口

從設計上數據結構

  1. 廢棄了Filter原先的onResponse()方法
  2. 在Filter接口新增了內部接口Listener,相關接口設計以下。多線程

    • 優勢:職責劃分更加明確,進行邏輯分組,加強可讀性,Filter自己應僅是傳遞調用的響應,而全部回調都放入Listener。這樣作之後能夠把以前回調的邏輯從invoke裏面剝離出來,放到Listener的onResponse或者onError中。
interface Listener {

    /**
     * 回調正常的調用結果
     * @param appResponse
     * @param invoker
     * @param invocation
     */
    void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

    /**
     * 回調異常結果
     * @param t
     * @param invoker
     * @param invocation
     */
    void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
  1. 新增抽象類ListenableFilter,實現了Filter接口,其中只記錄了一個該過濾器的內部Listener實例。
public abstract class ListenableFilter implements Filter {

    protected Listener listener = null;

    public Listener listener() {
        // 提供該過濾器的內部類listener
        return listener;
    }
}

異步轉同步,新增InvokeMode

不變的是配置來決定調用方式,變的是在什麼時候去作同步異步的不一樣邏輯處理。看《dubbo源碼解析(四十六)消費端發送請求過程》講到的(十四)DubboInvoker的doInvoke,在之前的邏輯會直接在doInvoke方法中根據配置區分同步、異步、單向調用。如今只單獨作了單向調用和須要返回結果的區分,統一先使用AsyncRpcResult來表示結果,也就是說一開始統一都是異步調用,而後在調用回到AsyncToSyncInvoker的invoke中時,纔對同步異步作區分,這裏新增了InvokeMode,InvokeMode如今有三種模式:SYNC, ASYNC, FUTURE。前兩種很顯而易見,後面一種是調用的返回類型是Future類型,表明調用的方法的返回類型是CompletableFuture類型,這種模式專門用來支持服務端異步的。看下面的源碼。app

public static InvokeMode getInvokeMode(URL url, Invocation inv) {
    // 若是返回類型是future
    if (isReturnTypeFuture(inv)) {
        return InvokeMode.FUTURE;
    } else if (isAsync(url, inv)) {
        // 若是是異步調用
        return InvokeMode.ASYNC;
    } else {
        // 若是是同步
        return InvokeMode.SYNC;
    }
}

參考《dubbo源碼解析(四十六)消費端發送請求過程》的(十二)AsyncToSyncInvoker的invoke邏輯,若是是同步模式,就會阻塞調用get方法。直到調用成功有結果返回。若是不是同步模式,就直接返回。異步

ResponseFuture改成CompletableFuture

關於ResponseFuture能夠參考《dubbo源碼解析(十)遠程通訊——Exchange層》的(六)ResponseFuture。具體的能夠看它的兩個實現(七)DefaultFuture和(八)SimpleFuture。

在此次改造中,最小JDK版本從之前的1.6變成了1.8。固然也要用到1.8中新特性,其中就包括CompletableFuture。dubbo的通訊主要有兩處,一處是Consumer發送請求消息給Provider,另外一處就是Provider把結果發送給Consumer。在Consumer發送請求消息給Provider的時候,Consumer不會一直處於等待,而是生成ResponseFuture會拋給下游去作其餘操做,等到Provider把結果返回放入ResponseFuture,Consumer能夠經過get方法得到結果,或者它也支持回調。可是這就暴露了一些問題,也就是爲在新特性裏提到的缺陷:

  • Future只支持阻塞式的get()接口獲取結果。由於future.get()會致使線程阻塞。
  • Future接口沒法實現自動回調,而自定義ResponseFuture雖支持callback回調但支持的異步場景有限,如不支持Future間的相互協調或組合等;

針對以上兩個不足,CompletableFuture能夠很好的解決它們。

  • 針對第一點不足,由於CompletableFuture實現了CompletionStage和Future接口,因此它仍是能夠像之前同樣經過阻塞或者輪詢的方式得到結果。這一點就能保證阻塞式得到結果,也就是同步調用不會被拋棄。固然自己也不是很建議用get()這樣阻塞的方式來獲取結果。
  • 針對第二點不足,首先是自動回調,CompletableFuture提供了良好的回調方法。好比下面四個方法有關計算結果完成時的處理:
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

當計算完成後,就會執行該方法中的action方法。相比於ResponseFuture,再也不須要本身去作回調註冊的編碼,更加易於理解。

  • 仍是針對第二點,自定義的ResponseFuture不支持Future間的相互協調或組合,CompletableFuture很好的解決了這個問題,在CompletableFuture中如下三個方法實現了future之間轉化的功能:
public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

因爲回調風格的實現,咱們沒必要由於等待一個計算完成而阻塞着調用線程,而是告訴CompletableFuture當計算完成的時候請執行某個function。並且咱們還能夠將這些操做串聯起來,或者將CompletableFuture組合起來。這一組函數的功能是當原來的CompletableFuture計算完後,將結果傳遞給函數fn,將fn的結果做爲新的CompletableFuture計算結果。所以它的功能至關於將CompletableFuture<T>轉換成CompletableFuture<U>。

除了轉化以外,還有future之間組合的支持,例如如下三個方法:

public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

這一組方法接受一個Function做爲參數,這個Function的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture,這個新的CompletableFuture會組合原來的CompletableFuture和函數返回的CompletableFuture。

如今就能看出CompletableFuture的強大了,它解決了自定義ResponseFuture的許多問題,該類有幾十個方法,感興趣的能夠去一一嘗試。

隨處可見的CompletableFuture

能夠看到之前的版本只能在RpcContext中進行獲取。而通過改良後,首先RpcContext同樣能過獲取,其次在過濾器鏈返回的Result中也能獲取,能夠從最新的代碼中看到,原先的RpcResult類已經被去除,而在AsyncRpcResult也繼承了CompletableFuture<Result>類,也就是說有AsyncRpcResult的地方,就有CompletableFuture。而且在後續的dubbo3.0中,AsyncRpcResult將會內置CompletableFuture類型的變量,CompletableFuture的獲取方式也會大大增長。

AsyncRpcResult全面替代RpcResult

接下來我就來說解一下AsyncRpcResult類。

/**
 * 當相同的線程用於執行另外一個RPC調用時,而且回調發生時,原來的RpcContext可能已經被更改。
 * 因此咱們應該保留當前RpcContext實例的引用,並在執行回調以前恢復它。
 * 存儲當前的RpcContext
 */
private RpcContext storedContext;
/**
 * 存儲當前的ServerContext
 */
private RpcContext storedServerContext;

/**
 * 會話域
 */
private Invocation invocation;

public AsyncRpcResult(Invocation invocation) {
    // 設置會話域
    this.invocation = invocation;
    // 得到當前線程內表明消費者端的Context
    this.storedContext = RpcContext.getContext();
    // 得到當前線程內表明服務端的Context
    this.storedServerContext = RpcContext.getServerContext();
}

/**
 * 轉換成新的AsyncRpcResult
 * @param asyncRpcResult
 */
public AsyncRpcResult(AsyncRpcResult asyncRpcResult) {
    this.invocation = asyncRpcResult.getInvocation();
    this.storedContext = asyncRpcResult.getStoredContext();
    this.storedServerContext = asyncRpcResult.getStoredServerContext();
}

上面的是AsyncRpcResult核心的變量以及構造函數,storedContext和storedServerContext存儲了相關的RpcContext實例的引用,爲的就是防止在回調的時候因爲相同的線程用於執行另外一個RPC調用致使原來的RpcContext可能已經被更改。因此存儲下來後,咱們須要在執行回調以前恢復它。具體的能夠看下面的thenApplyWithContext方法。

@Override
public Object getValue() {
    // 得到計算的結果
    return getAppResponse().getValue();
}

@Override
public void setValue(Object value) {
    // 建立一個AppResponse實例
    AppResponse appResponse = new AppResponse();
    // 把結果放入AppResponse
    appResponse.setValue(value);
    // 標誌該future完成,而且把攜帶結果的appResponse設置爲該future的結果
    this.complete(appResponse);
}

@Override
public Throwable getException() {
    // 得到拋出的異常信息
    return getAppResponse().getException();
}

@Override
public void setException(Throwable t) {
    // 建立一個AppResponse實例
    AppResponse appResponse = new AppResponse();
    // 把異常放入appResponse
    appResponse.setException(t);
    // 標誌該future完成,而且把攜帶異常的appResponse設置爲該future的結果
    this.complete(appResponse);
}

@Override
public boolean hasException() {
    // 設置是否有拋出異常
    return getAppResponse().hasException();
}

public Result getAppResponse() {
    // 若是該結果計算完成,則直接調用get方法得到結果
    try {
        if (this.isDone()) {
            return this.get();
        }
    } catch (Exception e) {
        // This should never happen;
        logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
    }
    // 建立AppResponse
    return new AppResponse();
}

這些實現了Result接口的方法,能夠發現其中都是調用了AppResponse的方法,AppResponse跟AsyncRpcResult同樣也繼承了AbstractResult,不過它是做爲回調的數據結構。AppResponse我會在異步化過濾器鏈回調中講述。

@Override
public Object recreate() throws Throwable {
    // 強制類型轉化
    RpcInvocation rpcInvocation = (RpcInvocation) invocation;
    // 若是返回的是future類型
    if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
        // 建立AppResponse實例
        AppResponse appResponse = new AppResponse();
        // 建立future
        CompletableFuture<Object> future = new CompletableFuture<>();
        // appResponse設置future值,由於返回的就是CompletableFuture類型
        appResponse.setValue(future);
        // 當該AsyncRpcResult完成的時候,把結果放入future中,這樣返回的就是CompletableFuture包裹的結果
        this.whenComplete((result, t) -> {
            if (t != null) {
                if (t instanceof CompletionException) {
                    t = t.getCause();
                }
                future.completeExceptionally(t);
            } else {
                if (result.hasException()) {
                    future.completeExceptionally(result.getException());
                } else {
                    future.complete(result.getValue());
                }
            }
        });
        // 重置
        return appResponse.recreate();
    } else if (this.isDone()) {
        // 若是完成,則直接重置
        return this.get().recreate();
    }
    // 若是返回類型不是CompletableFuture,則調用AppResponse的重置
    return (new AppResponse()).recreate();
}

該方法是重置,原本也是直接調用了AppResponse的方法,不過由於支持了以CompletableFuture爲返回類型的服務方法調用,因此這裏作了一些額外的邏輯,也就是把結果用CompletableFuture包裹,做爲返回的結果放入AppResponse實例中。能夠對標使用了CompletableFuture簽名的服務。

@Override
public Result thenApplyWithContext(Function<Result, Result> fn) {
    // 當該AsyncRpcResult完成後,結果做爲參數先執行beforeContext,再執行fn,最後執行andThen
    this.thenApply(fn.compose(beforeContext).andThen(afterContext));
    // You may need to return a new Result instance representing the next async stage,
    // like thenApply will return a new CompletableFuture.
    return this;
}


/**
 * tmp context to use when the thread switch to Dubbo thread.
 * 臨時的RpcContext,當用戶線程切換爲Dubbo線程時候使用
 */
/**
 * 臨時的RpcContext
 */
private RpcContext tmpContext;
private RpcContext tmpServerContext;

private Function<Result, Result> beforeContext = (appResponse) -> {
    // 得到當前線程消費者端的RpcContext
    tmpContext = RpcContext.getContext();
    // 得到當前線程服務端的RpcContext
    tmpServerContext = RpcContext.getServerContext();
    // 從新設置消費者端的RpcContext
    RpcContext.restoreContext(storedContext);
    // 從新設置服務端的RpcContext
    RpcContext.restoreServerContext(storedServerContext);
    return appResponse;
};

private Function<Result, Result> afterContext = (appResponse) -> {
    // 從新把臨時的RpcContext設置回去
    RpcContext.restoreContext(tmpContext);
    RpcContext.restoreServerContext(tmpServerContext);
    return appResponse;
};

把這幾部分代碼放在一塊兒時由於當用戶線程切換爲Dubbo線程時候須要用到臨時的RpcContext來記錄,如何使用該thenApplyWithContext方法,我也會在異步化過濾器鏈回調中講到。

其餘的方法比較好理解,我就不一一講解。

異步化過濾器鏈回調

若是看過前兩篇關於發送請求和處理請求的過程,應該就知道在整個調用鏈中有許多的過濾器,而Consumer和Provider分別都有各自的過濾器來作一些功能加強。過濾器有執行鏈,也有回調鏈,若是整一個鏈路都是同步的,那麼過濾器一旦增多,鏈路增加,就會帶來請求響應時間的增長,這固然是最不想看到的事情。那若是把過濾器的調用鏈異步化,那麼咱們就能夠用一個future來代替結果拋給下游,讓下游再也不阻塞。這樣就大大下降了響應時間,節省資源,提高RPC響應性能。而這裏的future就是下面要介紹的AppResponse。那我先來介紹一下如何實現異步化過濾器鏈回調。我就拿消費端發送請求過程來舉例子說明。

參考《dubbo源碼解析(四十六)消費端發送請求過程》的(六)ProtocolFilterWrapper的內部類CallbackRegistrationInvoker的invoke,能夠看到當全部的過濾器執行完後,會遍歷每個過濾器鏈,得到上面所說的內部接口Listener實現類,進行異步回調,由於請求已經在(十四)DubboInvoker的doInvoke中進行了發送,返回給下游一個AsyncRpcResult,而AsyncRpcResult內包裹的是AppResponse,能夠看《dubbo源碼解析(四十七)服務端處理請求過程》的(二十三)AbstractProxyInvoker的invoke,當代理類執行相關方法後,會建立一個AppResponse,把結果放入AppResponse中。因此AsyncRpcResult中包裹的是AppResponse,而後調用回調方法onResponse。而且會執行thenApplyWithContext把回調結果放入上下文中。而這個上下文如何避免相同的線程用於執行另外一個RPC調用致使原來的RpcContext可能已經被更改的狀況,我也在上面已經說明。

新增AppResponse

AppResponse繼承了AbstractResult,一樣也是CompletableFuture<Result>類型,可是AppResponse跟AsyncRpcResult職能不同,AsyncRpcResult做爲一個future,而AppResponse能夠說是做爲rpc調用結果的一個數據結構,它的實現很簡單,就是封裝瞭如下三個屬性和對應的一些方法。

/**
 * 調用結果
 */
private Object result;

/**
 * rpc調用時的異常
 */
private Throwable exception;

/**
 * 附加值
 */
private Map<String, String> attachments = new HashMap<String, String>();

前面我也講了,Provider處理請求完成後,會把結果放在AppResponse內,在整個鏈路調用過程當中AsyncRpcResult內部必然會有一個AppResponse存在,而爲上文提到的過濾器內置接口Listener的onResponse方法中的appResponse就是AppResponse類型的,它做爲一個回調的數據類型。

後記

該文章講解了dubbo 2.7.x版本對於異步化改造的介紹,上面只是羅列了全部改動的點,沒有具體講述在哪些新增功能上的應用,若是感興趣,能夠參考前幾篇的調用過程文章,來看看新增的功能點如何運用上述的設計的,好比Provider異步,有一種實現方式就用到了上述的InvokeMode。接下來一篇我會講述元數據的改造。

相關文章
相關標籤/搜索