Dubbo 自 2011 年 10 月 27 日開源後,已被許多非阿里系的公司使用,其中既有當當網、網易考拉等互聯網公司,也不乏中國人壽、青島海爾等大型傳統企業。更多用戶信息,能夠訪問Dubbo @GitHub,issue#1012: Wanted: who's using dubbo。java
自去年 12 月開始,Dubbo 3.0 便已正式進入開發階段,並備受社區和廣大 Dubbo 用戶的關注,本文將爲您詳細解讀 3.0 預覽版的新特性和新功能。react
下面先解答一下兩個有意思的與 Dubbo 相關的疑問。git
筆者曾作過 Dubbo 協議的適配兼容,Dubbo 確實存在過 1.x 版本,並且從協議設計和模型設計上都與 2.0 的開源版本協議是徹底不同的。下圖是關於 Dubbo 的發展路徑:github
是的,很是肯定,當前開源版本的 Dubbo 在阿里巴巴被普遍使用,而阿里的電商核心部門是用的 HSF2.2 版本,這個版本是兼容了 Dubbo 使用方式和 Remoting 協議。固然,咱們如今正在作 HSF2.2 的升級,直接依賴開源版本的 Dubbo 來作內核的統一。因此,Dubbo 是獲得大規模線上系統驗證的分佈式服務框架,這一點毋容置疑。spring
Dubbo 3.0 在設計和功能上的新增支持和改進,主要是如下四方面:apache
這裏要指出的是,3.0 中規劃的異步去阻塞和 2.7 中提供的異步是兩個層面的特性。2.7 中的異步是創建在傳統 RPC 中 request – response 會話模型上的,而 3.0 中的異步將會從通信協議層面由下向上構建,關注的是跨進程、全鏈路的異步問題。經過底層協議開始支持 streaming 方式,不僅僅能夠支持多種會話模型,還能夠在協議層面開始支持反壓、限流等特性,使得整個分佈式體系更具備彈性。綜上所述,2.7 關注的異步更侷限在點對點的異步(一個 consumer 調用一個 provider),3.0 關注的異步化,寬度上則關注整個調用鏈上的異步,高度上則向上又能夠包裝成 Rx 的編程模型。有趣的是,Spring 5.0 發佈了對 Flux 的支持,隨後開始解決跨進程的異步問題。編程
最近幾年, reactive programming
這個詞語的熱度迅速提高,Wikipedia 上的 reactive programming 解釋是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0會實現Reactive Stream 的 rx 接口,從而能讓用戶享受到RP帶來的響應性提高,甚至面向 RP 的架構升級。固然,咱們但願 reactive 不僅僅可以帶來事件(event)驅動的應用集成方式的升級,也但願在 Load Balance(選擇最優的服務節點),fault tolerance(限流降級時最好作到自適應)等方面發揮其積極價值。設計模式
咱們定下的策略是進入 Envoy 社區來實現 Dubbo 融入 mesh 的理念思想,目前 Dubbo 協議已經被 Envoy 支持。固然,Dubbo Mesh 離真正可用還有很長一段距離,其在選址、負載均衡和服務治理方面的工做須要繼續在數據面建設,另外,控制面板的建設在社區也沒有提上日程。api
Dubbo 3.0 定下了內外融合的策略,也就是說 3.0 的核心最終會在阿里巴巴的生產系統中部署,相信經過大流量、大規模的考驗,Dubbo 用戶能夠得到一個性能、穩定、服務治理實踐各方面俱佳的核心,用戶在生產系統中採用 3.0 也會更加放心。這一點也是 Dubbo 3.0 最重要的使命。網絡
Dubbo 最強大的一處設計是其在 Filter 鏈上的抽象設計,經過其擴展機制的開放性支持,用戶能夠對 Dubbo 作功能加強,並容許各個擴展點被定製來是否保留。
Dubbo 的 Filter 定義以下:
@SPI public interface Filter { /** * do invoke filter. * <p> * <code> * // before filter * Result result = invoker.invoke(invocation); * // after filter * return result; * </code> * * @param invoker service * @param invocation invocation. * @return invoke result. * @throws RpcException * @see org.apache.dubbo.rpc.Invoker#invoke(Invocation) */ Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; }
按照「調用一個遠程服務的方法就像調用本地的方法同樣」這種說法,這個直接返回 Result 響應的方式是很是好的,用起來是簡單直接,問題是時代變換到了須要關注體驗,須要走 Reactive 響應式的時代,也回到基本點:invoke一個 invocation 須要通過網絡在不一樣的進程處理,自然就是異步的過程,也就是發送請求(invocation)與接收響應(Result)自己是兩個不一樣的事件,是須要兩個過程方法來在 Filter 鏈處理。那麼如何改造這個關鍵的 SPI 呢?有兩種方案:
第一種,把 invoke 的返回值改爲 CompletableFuture, 好處是一目瞭然,Result 不在建議同步獲取了;但基礎接口的簽名一改會致使代碼改造量巨大,同時也會讓原有的 SPI 擴展不在支持。
第二種,Result 接口直接繼承 CompletationStage,是表明了響應的異步計算。這樣能進避免第一種的劣勢。因此,3.0.0 Preview 版本對內部調用鏈路實現作了一次重構:基於 CompletableFuture 實現了框架內部的全異步調用,而在外圍編程上,同時支持同步、異步調用模式。
值得注意的是,這次重構僅限於框架內部實現,對使用方沒有任何影響即接口上保持徹底兼容。要了解 Dubbo 異步 API 如何使用,請參考《如何基於 Dubbo 實現全異步的調用鏈》,這篇文章將着重對實現思路和原理作一些簡單介紹。這次重構的要點有:
首先咱們來看一個通用的跨網絡異步調用的線程模型:
通訊框架異步發送請求消息,請求消息發送成功後,返回表明業務結果的 CompletableFuture 給業務線程。以後對於 Future 的處理,根據調用類型會有所區別:
接下來具體看一下一次異步 Dubbo RPC 請求的調用流程:
6. 調用方在拿到表明異步業務結果的 Future 後,可選擇註冊回調監聽器,以監聽真正的業務結果返回。
同步調用和異步調用基本上是一致的,而且也是走的回調模式,只是在鏈路返回以前作了一次阻塞 get 調用,以確保在收到實際結果時再返回。Filter 在註冊 Listener 時因爲 Future 已處於 complete 狀態,所以會同時觸發回調 onResponse()/onError()。
關於流程圖中提到的 Result,Result 在 Dubbo 的一次 RPC 調用中表明返回結果,在 3.0 中 Result 自身增長了表明狀態的接口,相似 Future 如今 Result 能夠表明一次未完成的調用。
要讓 Result 具有表明異步返回結果的能力,有兩中方式來實現:
1. Result is a Future,在 Java 8 中更合理的方式是繼承 CompletionStage 接口。
public interface Result extends CompletionStage { }
2. 讓 Result 實例持有 Future 實例,與 1 的區別便是設計中選用「繼承」仍是「組合」。
public class AsyncRpcResult implements Result { private CompletableFuture<RpcResult> resultFuture; }
同時,爲了讓 Result 更直觀的體現其異步結果的特性,也爲了方便麪向 Result 接口編程,咱們能夠考慮爲Result增長一些異步接口:
public interface Result extends Serializable { Result thenApplyWithContext(Function<Result, Result> fn); <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn); Result get() throws InterruptedException, ExecutionException; }
Filter 是 Dubbo 預置的攔截器擴展 SPI,用來作請求的預處理、結果的後處理,框架自己內置了一些攔截器實現,而從用戶層面,我相信這個 SPI 也應該是被擴展最多的一個。在 3.0 版本中,Filter 迴歸單一職責的設計模式,將回調接口單獨提取到 Listener 中。
@SPI public interface Filter { Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; interface Listener { void onResponse(Result result, Invoker<?> invoker, Invocation invocation); void onError(Throwable t, Invoker<?> invoker, Invocation invocation); } }
以上是 Filter 的 SPI 定義,Filter 的核心定義中只有一個 invoke() 方法用來傳遞調用請求。
同時,增長了一個新的回調接口 Listener,每一個 Filter 實現能夠定義本身的 Listenr 回調器,從而實現對返回結果的異步監聽,參考如下是爲 MonitorFilter 增長的 Listener 回調實現:
class MonitorListener implements Listener { @Override public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false); getConcurrent(invoker, invocation).decrementAndGet(); // count down } } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true); getConcurrent(invoker, invocation).decrementAndGet(); // count down } } }
爲了更直觀的作異步調用,泛化接口新增了 CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)
接口:
public interface GenericService { /** * Generic invocation * * @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is * required, e.g. findPerson(java.lang.String) * @param parameterTypes Parameter types * @param args Arguments * @return invocation return value * @throws GenericException potential exception thrown from the invocation */ Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException; default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException { Object object = $invoke(method, parameterTypes, args); if (object instanceof CompletableFuture) { return (CompletableFuture<Object>) object; } return CompletableFuture.completedFuture(object); } }
這樣,當咱們想作異步調用時,就能夠直接這樣使用:
CompletableFuture<Object> genericService.$invokeAsync(method, parameterTypes, args);
更具體用例請參見《泛化調用示例》
組要注意的是,框架內部的異步實現自己並不能提升單次調用的性能,相反,因爲線程切換和回調邏輯的存在,異步反而可能會致使單次調用性能的降低,可是異步帶來的優點是能減小對資源的佔用,提高整個系統的併發程度和吞吐量,這點對於 RPC 這種須要處理網絡延遲的場景很是適用。更多關於異步化設計的好處,請參考其餘異步化原理介紹相關文章。
響應式編程讓開發者更方便地編寫高性能的異步代碼,很惋惜,在以前很長一段時間裏,dubbo 並不支持響應式編程,簡單來講,dubbo 不支持在 rpc 調用時使用 Mono/Flux 這種流對象(reative-stream 裏流的概念),給用戶使用帶來了不便。(關於響應式編程更詳細的信息請參見這裏:http://reactivex.io/)。
RSocket 是一個開源的支持 reactive-stream 語義的網絡通訊協議,他將 reative 語義的複雜邏輯封裝起來了,使得上層能夠方便實現網絡程序。(RSocket詳細資料請參見這裏:http://rsocket.io/)。
dubbo 在 3.0.0-SNAPSHOT 版本里基於 RSocket 對響應式編程進行了簡單的支持,用戶能夠在請求參數和返回值裏使用 Mono 和 Flux 類型的對象。下面咱們給出使用範例,(範例源碼能夠在這裏獲取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)。
首先定義接口以下:
public interface DemoService { Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2); Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2); }
而後實現該 demo 接口:
public class DemoServiceImpl implements DemoService { @Override public Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) { return m1.zipWith(m2, new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) { return s+" "+s2; } }); } @Override public Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) { return f1.zipWith(f2, new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) { return s+" "+s2; } }); } }
而後配置並啓動服務端,注意協議名字填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- provider's application name, used for tracing dependency relationship --> <dubbo:application name="demo-provider"/> <!-- use registry center to export service --> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!-- use dubbo protocol to export service on port 20880 --> <dubbo:protocol name="rsocket" port="20890"/> <!-- service implementation, as same as regular local bean --> <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/> </beans> public class RsocketProvider { public static void main(String[] args) throws Exception { new EmbeddedZooKeeper(2181, false).start(); ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"}); context.start(); System.in.read(); // press any key to exit } }
而後配置並啓動消費者消費者以下, 注意協議名填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion), don't set it same as provider --> <dubbo:application name="demo-consumer"/> <!-- use registry center to discover service --> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!-- generate proxy for the remote service, then demoService can be used in the same way as the local regular interface --> <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/> </beans> public class RsocketConsumer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"}); context.start(); DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy while (true) { try { Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B")); monoResult.doOnNext(new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }).block(); Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3")); fluxResult.doOnNext(new Consumer<String>() { @Override public void accept(String s) { System.out.println(s); } }).blockLast(); } catch (Throwable throwable) { throwable.printStackTrace(); } } } }
能夠看到配置上除了協議名使用 rsocket 之外其餘並無特殊之處。
實現原理
之前用戶並不能在參數或者返回值裏使用 Mono/Flux 這種流對象(reative-stream 裏的流的概念)。由於流對象自帶異步屬性,當業務把流對象做爲參數或者返回值傳遞給框架以後,框架並不能將流對象正確的進行序列化。
dubbo 基於 RSocket 實現了 reative 支持。RSocket 將 reative 語義的複雜邏輯封裝起來了,給上層提供了簡潔的抽象以下:
/** * Fire and Forget interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. */ Mono<Void> fireAndForget(Payload payload); /** * Request-Response interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing at most a single {@code Payload} representing the * response. */ Mono<Payload> requestResponse(Payload payload); /** * Request-Stream interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. */ Flux<Payload> requestStream(Payload payload); /** * Request-Channel interaction model of {@code RSocket}. * * @param payloads Stream of request payloads. * @return Stream of response payloads. */ Flux<Payload> requestChannel(Publisher<Payload> payloads);
咱們只須要在此基礎上添加咱們的 rpc 邏輯便可。
通過上面的分析,咱們知道了 Dubbo 如何基於 RSocket 實現了響應式編程的支持。有了響應式編程支持,業務能夠更加方便的實現異步邏輯。
當前 Dubbo 3.0 將提供具有當代特性(如響應性編程)的相關支持,同時汲取阿里內部 HSF 的設計長處來實現二者的融合,當前預覽版的不少地方還在探討中,但願你們可以積極反饋,咱們都會虛心學習並參考。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。