爲了應對 高併發環境下 的服務端編程,微軟提出了一個實現 異步編程 的方案 - Reactive Programming
,中文名稱 反應式編程。隨後,其它技術也迅速地跟上了腳步,像 ES6
經過 Promise
引入了相似的異步編程方式。Java
社區也沒有落後不少,Netflix
和 TypeSafe
公司提供了 RxJava
和 Akka Stream
技術,讓 Java
平臺也有了可以實現反應式編程的框架。java
函數式編程是種編程方式,它將計算機的運算視爲函數的計算。函數編程語言最重要的基礎是 λ演算 (lambda calculus)
,而λ演算的函數能夠接受函數看成 輸入(參數) 和 輸出(返回值)。lambda
表達式對與大多數程序員已經很熟悉了,jdk8
以及 es6
都是引入的 lambda
。react
反應式編程 (reactive programming)
是一種基於 數據流 (data stream)
和 變化傳遞 (propagation of change)
的 聲明式 (declarative)
的編程範式。程序員
在一個 事件驅動 的應用程序中,組件之間的交互是經過鬆耦合的 生產者 (production)
和 消費者 (consumption)
來實現的。這些事件是以 異步 和 非阻塞 的方式發送和接收的。es6
事件驅動 的系統依靠 推模式 而不是 拉模式 或 投票表決,即 生產者 是在有消息時才推送數據給 消費者,而不是經過一種浪費資源方式:讓 消費者 不斷地 輪詢 或 等待數據。web
程序發起執行之後,應該 快速 返回存儲 結果的上下文,把具體執行交給 後臺線程。待處理完成之後,異步地將 真實返回值 封裝在此 上下文 中,而不是 阻塞 程序的執行。實時響應是經過 異步 編程實現的,例如:發起調用後,快速返回相似 java8
中 CompletableFuture
對象。算法
事件驅動的 鬆散耦合 提供了組件在失敗下,能夠抓獲 徹底隔離 的上下文場景,做爲 消息封裝,發送到下游組件。在具體編程時能夠 檢查錯誤 ,好比:是否接收到,接收的命令是否可執行等,並決定如何應對。數據庫
Reactor
框架是 Pivotal
基於 Reactive Programming
思想實現的。它符合 Reactive Streams
規範 (Reactive Streams
是由 Netflix
、TypeSafe
、Pivotal
等公司發起的) 的一項技術。其名字有 反應堆 之意,反映了其背後的強大的 性能。編程
Reactive Programming
,中文稱 反應式編程。Reactive Programming
是一種 非阻塞、事件驅動數據流 的開發方案,使用 函數式編程 的概念來操做數據流,系統中某部分的數據變更後會自動更新其餘部分,並且成本極低。後端
其最先是由微軟提出並引入到 .NET 平臺中,隨後 ES6 也引入了相似的技術。在 Java 平臺上,較早採用反應式編程技術的是 Netflix 公司開源的 RxJava 框架。Hystrix 就是以 RxJava 爲基礎開發的。數組
反應式編程其實並不神祕,經過與咱們熟悉的 迭代器模式 對比,即可瞭解其基本思想:
事件 | Iterable (pull) | Observable (push) |
---|---|---|
獲取數據 | T next() | onNext(T) |
發現異常 | throws Exception | onError(Exception) |
處理完成 | hasNext() | onCompleted() |
上面表格的中的 Observable
那一列便表明 反應式編程 的 API
的使用方式。它實際上是 觀察者模式 的一種延伸。
若是將 迭代器模式 看做是 拉模式,那 觀察者模式 即是 推模式。
被訂閱者 (Publisher)
主動推送數據給 訂閱者 (Subscriber)
,觸發 onNext()
方法。異常和完成時觸發另外兩個方法。
被訂閱者 (Publisher)
發生異常,則觸發 訂閱者 (Subscriber)
的 onError()
方法進行異常捕獲處理。
被訂閱者 (Publisher)
每次推送都會觸發一次 onNext()
方法。全部的推送完成且無異常時,onCompleted()
方法將 在最後 觸發一次。
若是 Publisher
發佈消息太快了,超過了 Subscriber
的處理速度,那怎麼辦?這就是 Backpressure
的由來。Reactive Programming
框架須要提供 背壓機制,使得 Subscriber
可以控制 消費消息 的速度。
在 Java
平臺上,Netflix
(開發了 RxJava
)、TypeSafe
(開發了 Scala
、Akka
)、Pivatol
(開發了 Spring
、Reactor
)共同制定了一個被稱爲 Reactive Streams
項目(規範),用於制定反應式編程相關的規範以及接口。
Reactive Streams
由如下幾個組件組成:
其主要的接口有這三個:
其中,Subcriber
中便包含了上面表格提到的 onNext
、onError
、onCompleted
這三個方法。對於 Reactive Streams
,只須要理解其思想就能夠,包括基本思想以及 Backpressure
等思想便可。
Reactor
框架主要有兩個主要的模塊:
前者主要負責 Reactive Programming
相關的 核心 API
的實現,後者負責 高性能網絡通訊 的實現,目前是基於 Netty
實現的。
在 Reactor
中,常用的類並非不少,主要有如下兩個:
Mono
實現了 org.reactivestreams.Publisher
接口,表明 0
到 1
個元素的 發佈者。
Flux
一樣實現了 org.reactivestreams.Publisher
接口,表明 0
到 N
個元素的發表者。
表明背後驅動反應式流的調度器,一般由各類線程池實現。
Spring 5
引入的一個基於 Netty
而不是 Servlet
的高性能的 Web
框架 - Spring WebFlux
,可是使用方式並無同傳統的基於 Servlet
的 Spring MVC
有什麼大的不一樣。
WebFlux
中 MVC
接口的示例:
@RequestMapping("/webflux")
@RestController
public class WebFluxTestController {
@GetMapping("/mono")
public Mono<Foobar> foobar() {
return Mono.just(new Foobar());
}
}
複製代碼
最大的變化就是返回值從 Foobar
所表示的一個對象變爲 Mono<Foobar>
或 Flux<Foobar>
。
上面介紹了 反應式編程 的一些概念。可能讀者看到這裏有些亂,梳理一下三者的關係:
Reactive Streams
是一套反應式編程 標準 和 規範;Reactor
是基於 Reactive Streams
一套 反應式編程框架;WebFlux
以 Reactor
爲基礎,實現 Web
領域的 反應式編程框架。其實,對於業務開發人員來講,當編寫反應式代碼時,一般只會接觸到 Publisher
這個接口,對應到 Reactor
即是 Mono
和 Flux
。
對於 Subscriber
和 Subcription
這兩個接口,Reactor
也有相應的實現。這些都是 Spring WebFlux
和 Spring Data Reactive
這樣的框架用到的。若是 不開發中間件,開發人員是不會接觸到的。
接下來介紹一下 Reactor
中 Mono
和 Flux
這兩個類中的主要方法的使用。
如同 Java 8
所引入的 Stream
同樣,Reactor
的使用方式基本上也是分三步:
只不過建立和消費多是經過像 Spring 5
這樣框架完成的(好比經過 WebFlux
中的 WebClient
調用 HTTP
接口,返回值即是一個 Mono
)。但咱們仍是須要基本瞭解這些階段的開發方式。
使用 Reactor
編程的開始必然是先建立出 Mono
或 Flux
。有些時候不須要咱們本身建立,而是實現例如 WebFlux
中的 WebClient
或 Spring Data Reactive
獲得一個 Mono
或 Flux
。
WebClient webClient = WebClient.create("http://localhost:8080");
public Mono<User> findById(Long userId) {
return webClient
.get()
.uri("/users/" + userId)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMap(cr -> cr.bodyToMono(User.class));
}
複製代碼
public interface UserRepository extends ReactiveMongoRepository<User, Long> {
Mono<User> findByUsername(String username);
}
複製代碼
但有些時候,咱們也須要主動地建立一個 Mono
或 Flux
。
Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);
複製代碼
這樣的建立方式在何時用呢?通常是用在通過一系列 非IO型 操做以後,獲得了一個對象。接下來要基於這個對象運用 Reactor
進行 高性能 的 IO
操做時,能夠用這種方式將以前獲得的對象轉換爲 Mono
或 Flux
。
上面是經過一個 同步調用 獲得的結果建立出 Mono
或 Flux
,但有時須要從一個 非 Reactive
的 異步調用 的結果建立出 Mono
或 Flux
。
若是這個 異步方法 返回一個 CompletableFuture
,那能夠基於這個 CompletableFuture
建立一個 Mono
:
Mono.fromFuture(completableFuture);
複製代碼
若是這個 異步調用 不會返回 CompletableFuture
,是有本身的 回調方法,那怎麼建立 Mono
呢?可使用 static <T> Mono<T> create(Consumer<MonoSink<T>> callback)
方法:
Mono.create(sink -> {
ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
@Override
public void onSuccess(ResponseEntity<String> result) {
sink.success(result.getBody());
}
@Override
public void onFailure(Throwable ex) {
sink.error(ex);
}
});
});
複製代碼
在使用 WebFlux
以後,AsyncRestTemplate
已經不推薦使用,這裏只是作演示。
中間階段的 Mono
和 Flux
的方法主要有 filter
、map
、flatMap
、then
、zip
、reduce
等。這些方法使用方法和 Stream
中的方法相似。
下面舉幾個 Reactor
開發實際項目的問題,幫你們理解這些方法的使用場景:
本段內容將涉及到以下類和方法:
Mono.map()
Mono.flatMap()
Mono.then()
Function
在 Mono
和 Flux
中間環節的處理過程當中,有三個有些相似的方法:map()
、flatMap()
和 then()
。這三個方法的使用頻率很高。
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
複製代碼
Mono.just(params)
.flatMap(v -> doStep1(v))
.flatMap(v -> doStep2(v))
.flatMap(v -> doStep3(v));
複製代碼
從上面兩段代碼的對比就能夠看出來 flatMap()
方法在其中起到的做用,map()
和 then()
方法也有相似的做用。但這些方法之間的區別是什麼呢?咱們先來看看這三個方法的簽名(以 Mono
爲例):
then()
看上去是下一步的意思,但它只表示執行順序的下一步,不表示下一步依賴於上一步。then()
方法的參數只是一個 Mono
,無從接受上一步的執行結果。而 flatMap()
和 map()
的參數都是一個 Function
,入參是上一步的執行結果。
flatMap()
和 map()
的區別在於,flatMap()
中的入參 Function
的返回值要求是一個 Mono
對象,而 map
的入參 Function
只要求返回一個 普通對象。在業務處理中常須要調用 WebClient
或 ReactiveXxxRepository
中的方法,這些方法的 返回值 都是 Mono
(或 Flux
)。因此要將這些調用串聯爲一個總體 鏈式調用,就必須使用 flatMap()
,而不是 map()
。
本段內容將涉及到以下類和方法:
Mono.zip()
Tuple2
BiFunction
併發執行 是常見的一個需求。Reactive Programming
雖然是一種 異步編程 方式,可是 異步 不表明就是 併發並行 的。
在 傳統的命令式編程 中,併發執行 是經過 線程池 加 Future
的方式實現的。
Future<Result1> result1Future = threadPoolExecutor.submit(() -> doStep1(params));
Future<Result2> result2Future = threadPoolExecutor.submit(() -> doStep2(params));
// Retrive result
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;
複製代碼
上面的代碼雖然實現了 異步調用,但 Future.get()
方法是 阻塞 的。在使用 Reactor
開發有 併發 執行場景的 反應式代碼 時,不能用上面的方式。
這時應該使用 Mono
和 Flux
中的 zip()
方法,以 Mono
爲例,代碼以下:
Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
CustomType1 item1 = CustomType1.class.cast(items[0]);
CustomType2 item2 = CustomType2.class.cast(items[1]);
// Do merge
return mergeResult;
}, item1Mono, item2Mono);
複製代碼
上述代碼中,產生 item1Mono
和 item2Mono
的過程是 並行 的。好比,調用一個 HTTP
接口的同時,執行一個 數據庫查詢 操做。這樣就能夠加快程序的執行。
但上述代碼存在一個問題,就是 zip()
方法須要作 強制類型轉換。而強制類型轉換是 不安全的。好在 zip()
方法存在 多種重載 形式。除了最基本的形式之外,還有多種 類型安全 的形式:
static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator);
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);
複製代碼
對於不超過 7
個元素的合併操做,都有 類型安全 的 zip()
方法可選。以兩個元素的合併爲例,介紹一下使用方法:
Mono.zip(item1Mono, item2Mono).map(tuple -> {
CustomType1 item1 = tuple.getT1();
CustomType2 item2 = tuple.getT2();
// Do merge
return mergeResult;
});
複製代碼
上述代碼中,map()
方法的參數是一個 Tuple2
,表示一個 二元數組,相應的還有 Tuple3
、Tuple4
等。
對於兩個元素的併發執行,也能夠經過 zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator)
方法直接將結果合併。方法是傳遞 BiFunction
實現 合併算法。
本段內容將涉及到以下類和方法:
Flux.fromIterable()
Flux.reduce()
BiFunction
另一個稍微複雜的場景,對一個對象中的一個類型爲集合類的(List
、Set
)進行處理以後,再對本來的對象進行處理。使用 迭代器模式 的代碼很容易編寫:
List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
// Do something on data and item
}
// Do something on data
複製代碼
當咱們要用 Reactive
風格的代碼實現上述邏輯時,就不是那麼簡單了。這裏會用到 Flux
的 reduce()
方法。reduce()
方法的簽名以下:
<A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
能夠看出,reduce()
方法的功能是將一個 Flux
聚合 成一個 Mono
。
第一個參數: 返回值 Mono
中元素的 初始值。
第二個參數: 是一個 BiFunction
,用來實現 聚合操做 的邏輯。對於泛型參數 <A, ? super T, A>
中:
A
: 表示每次 聚合操做 以後的 結果的類型,它做爲 BiFunction.apply()
方法的 第一個入參;? super T
: 表示集合中的每一個元素的類型,它做爲 BiFunction.apply()
方法的 第二個入參;A
: 表示聚合操做的 結果,它做爲 BiFunction.apply()
方法的 返回值。接下來看一下示例:
Data initData = ...;
List<SubData> list = ...;
Flux.fromIterable(list)
.reduce(initData, (data, itemInList) -> {
// Do something on data and itemInList
return data;
});
複製代碼
上面的示例代碼中,initData
和 data
的類型相同。執行完上述代碼以後,reduce()
方法會返回 Mono<Data>
。
直接消費的 Mono
或 Flux
的方式就是調用 subscribe()
方法。若是在 WebFlux
接口中開發,直接返回 Mono
或 Flux 便可。WebFlux
框架會完成最後的 Response
輸出工做。
本文介紹了反應式編程的一些概念和 Spring Reactor
框架的基本用法,還介紹瞭如何用 Reactor
解決一些稍微複雜一點的問題。Reactor
在 Spring 5
中有大量的應用,後面會給你們分享一些 Spring Reactor
實戰系列的博客。
歡迎關注技術公衆號: 零壹技術棧
本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。