聊聊Spring Reactor反應式編程

前言

爲了應對 高併發環境下 的服務端編程,微軟提出了一個實現 異步編程 的方案 - Reactive Programming,中文名稱 反應式編程。隨後,其它技術也迅速地跟上了腳步,像 ES6 經過 Promise 引入了相似的異步編程方式。Java 社區也沒有落後不少,NetflixTypeSafe 公司提供了 RxJavaAkka Stream 技術,讓 Java 平臺也有了可以實現反應式編程的框架。java

正文

函數式編程

函數式編程是種編程方式,它將計算機的運算視爲函數的計算。函數編程語言最重要的基礎是 λ演算 (lambda calculus),而λ演算的函數能夠接受函數看成 輸入(參數)輸出(返回值)lambda 表達式對與大多數程序員已經很熟悉了,jdk8 以及 es6都是引入的 lambdareact

函數式編程的特色

  • 惰性計算
  • 函數是「第一等公民」
  • 只使用表達式而不使用語句
  • 沒有反作用

反應式編程

反應式編程 (reactive programming) 是一種基於 數據流 (data stream)變化傳遞 (propagation of change)聲明式 (declarative) 的編程範式。程序員

反應式編程的特色

1. 事件驅動

在一個 事件驅動 的應用程序中,組件之間的交互是經過鬆耦合的 生產者 (production)消費者 (consumption) 來實現的。這些事件是以 異步非阻塞 的方式發送和接收的。es6

事件驅動 的系統依靠 推模式 而不是 拉模式投票表決,即 生產者 是在有消息時才推送數據給 消費者,而不是經過一種浪費資源方式:讓 消費者 不斷地 輪詢等待數據web

2. 實時響應

程序發起執行之後,應該 快速 返回存儲 結果的上下文,把具體執行交給 後臺線程。待處理完成之後,異步地將 真實返回值 封裝在此 上下文 中,而不是 阻塞 程序的執行。實時響應是經過 異步 編程實現的,例如:發起調用後,快速返回相似 java8CompletableFuture 對象。算法

3. 彈性機制

事件驅動的 鬆散耦合 提供了組件在失敗下,能夠抓獲 徹底隔離 的上下文場景,做爲 消息封裝,發送到下游組件。在具體編程時能夠 檢查錯誤 ,好比:是否接收到,接收的命令是否可執行等,並決定如何應對。數據庫

Reactor簡介

Reactor 框架是 Pivotal 基於 Reactive Programming 思想實現的。它符合 Reactive Streams 規範 (Reactive Streams 是由 NetflixTypeSafePivotal 等公司發起的) 的一項技術。其名字有 反應堆 之意,反映了其背後的強大的 性能編程

1. Reactive Programming

Reactive Programming,中文稱 反應式編程Reactive Programming 是一種 非阻塞事件驅動數據流 的開發方案,使用 函數式編程 的概念來操做數據流,系統中某部分的數據變更後會自動更新其餘部分,並且成本極低。後端

其最先是由微軟提出並引入到 .NET 平臺中,隨後 ES6 也引入了相似的技術。在 Java 平臺上,較早採用反應式編程技術的是 Netflix 公司開源的 RxJava 框架。Hystrix 就是以 RxJava 爲基礎開發的。數組

反應式編程其實並不神祕,經過與咱們熟悉的 迭代器模式 對比,即可瞭解其基本思想:

事件 Iterable (pull) Observable (push)
獲取數據 T next() onNext(T)
發現異常 throws Exception onError(Exception)
處理完成 hasNext() onCompleted()

上面表格的中的 Observable 那一列便表明 反應式編程API 的使用方式。它實際上是 觀察者模式 的一種延伸。

若是將 迭代器模式 看做是 拉模式,那 觀察者模式 即是 推模式

  1. 被訂閱者 (Publisher) 主動推送數據給 訂閱者 (Subscriber),觸發 onNext() 方法。異常和完成時觸發另外兩個方法。

  2. 被訂閱者 (Publisher) 發生異常,則觸發 訂閱者 (Subscriber)onError() 方法進行異常捕獲處理。

  3. 被訂閱者 (Publisher) 每次推送都會觸發一次 onNext() 方法。全部的推送完成且無異常時,onCompleted() 方法將 在最後 觸發一次。

若是 Publisher 發佈消息太快了,超過了 Subscriber 的處理速度,那怎麼辦?這就是 Backpressure 的由來。Reactive Programming 框架須要提供 背壓機制,使得 Subscriber 可以控制 消費消息 的速度。

2. Reactive Streams

Java 平臺上,Netflix(開發了 RxJava)、TypeSafe(開發了 ScalaAkka)、Pivatol(開發了 SpringReactor)共同制定了一個被稱爲 Reactive Streams 項目(規範),用於制定反應式編程相關的規範以及接口。

Reactive Streams 由如下幾個組件組成:

  • 發佈者:發佈元素到訂閱者
  • 訂閱者:消費元素
  • 訂閱:在發佈者中,訂閱被建立時,將與訂閱者共享
  • 處理器:發佈者與訂閱者之間處理數據

其主要的接口有這三個:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 中便包含了上面表格提到的 onNextonErroronCompleted 這三個方法。對於 Reactive Streams,只須要理解其思想就能夠,包括基本思想以及 Backpressure 等思想便可。

3. Reactor的主要模塊

Reactor 框架主要有兩個主要的模塊:

  • reactor-core
  • reactor-ipc

前者主要負責 Reactive Programming 相關的 核心 API 的實現,後者負責 高性能網絡通訊 的實現,目前是基於 Netty 實現的。

4. Reactor的核心類

Reactor 中,常用的類並非不少,主要有如下兩個:

  • Mono

Mono 實現了 org.reactivestreams.Publisher 接口,表明 01 個元素的 發佈者

  • Flux

Flux 一樣實現了 org.reactivestreams.Publisher 接口,表明 0N 個元素的發表者。

  • Scheduler

表明背後驅動反應式流的調度器,一般由各類線程池實現。

5. WebFlux

Spring 5 引入的一個基於 Netty 而不是 Servlet 的高性能的 Web 框架 - Spring WebFlux ,可是使用方式並無同傳統的基於 ServletSpring MVC 有什麼大的不一樣。

WebFluxMVC 接口的示例:

@RequestMapping("/webflux")
@RestController
public class WebFluxTestController {
    @GetMapping("/mono")
    public Mono<Foobar> foobar() {
        return Mono.just(new Foobar());
    }
}
複製代碼

最大的變化就是返回值從 Foobar 所表示的一個對象變爲 Mono<Foobar>Flux<Foobar>

6. Reactive Streams、Reactor和WebFlux

上面介紹了 反應式編程 的一些概念。可能讀者看到這裏有些亂,梳理一下三者的關係:

  1. Reactive Streams 是一套反應式編程 標準規範
  2. Reactor 是基於 Reactive Streams 一套 反應式編程框架
  3. WebFluxReactor 爲基礎,實現 Web 領域的 反應式編程框架

其實,對於業務開發人員來講,當編寫反應式代碼時,一般只會接觸到 Publisher 這個接口,對應到 Reactor 即是 MonoFlux

對於 SubscriberSubcription 這兩個接口,Reactor 也有相應的實現。這些都是 Spring WebFluxSpring Data Reactive 這樣的框架用到的。若是 不開發中間件,開發人員是不會接觸到的。

Reactor入門

接下來介紹一下 ReactorMonoFlux 這兩個類中的主要方法的使用。

如同 Java 8 所引入的 Stream 同樣,Reactor 的使用方式基本上也是分三步:

  • 開始階段的建立
  • 中間階段的處理
  • 最終階段的消費

只不過建立和消費多是經過像 Spring 5 這樣框架完成的(好比經過 WebFlux 中的 WebClient 調用 HTTP 接口,返回值即是一個 Mono)。但咱們仍是須要基本瞭解這些階段的開發方式。

1. 建立 Mono 和 Flux(開始階段)

使用 Reactor 編程的開始必然是先建立出 MonoFlux。有些時候不須要咱們本身建立,而是實現例如 WebFlux 中的 WebClientSpring Data Reactive 獲得一個 MonoFlux

  • 使用 WebFlux WebClient 調用 HTTP 接口
WebClient webClient = WebClient.create("http://localhost:8080");
public Mono<User> findById(Long userId) {
    return webClient
            .get()
            .uri("/users/" + userId)
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMap(cr -> cr.bodyToMono(User.class));
}
複製代碼
  • 使用 ReactiveMongoRepository 查詢 User
public interface UserRepository extends ReactiveMongoRepository<User, Long> {
    Mono<User> findByUsername(String username);
}
複製代碼

但有些時候,咱們也須要主動地建立一個 MonoFlux

普通的建立方式

Mono<String> helloWorld = Mono.just("Hello World");
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(words);
複製代碼

這樣的建立方式在何時用呢?通常是用在通過一系列 非IO型 操做以後,獲得了一個對象。接下來要基於這個對象運用 Reactor 進行 高性能IO 操做時,能夠用這種方式將以前獲得的對象轉換爲 MonoFlux

文藝的建立方式

上面是經過一個 同步調用 獲得的結果建立出 MonoFlux,但有時須要從一個 Reactive異步調用 的結果建立出 MonoFlux

若是這個 異步方法 返回一個 CompletableFuture,那能夠基於這個 CompletableFuture 建立一個 Mono

Mono.fromFuture(completableFuture);
複製代碼

若是這個 異步調用 不會返回 CompletableFuture,是有本身的 回調方法,那怎麼建立 Mono 呢?可使用 static <T> Mono<T> create(Consumer<MonoSink<T>> callback) 方法:

Mono.create(sink -> {
    ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
    entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
        @Override
        public void onSuccess(ResponseEntity<String> result) {
            sink.success(result.getBody());
        }

        @Override
        public void onFailure(Throwable ex) {
            sink.error(ex);
        }
    });
});
複製代碼

在使用 WebFlux 以後,AsyncRestTemplate 已經不推薦使用,這裏只是作演示。

2. 處理 Mono 和 Flux(中間階段)

中間階段的 MonoFlux 的方法主要有 filtermapflatMapthenzipreduce 等。這些方法使用方法和 Stream 中的方法相似。

下面舉幾個 Reactor 開發實際項目的問題,幫你們理解這些方法的使用場景:

問題一: map、flatMap 和 then 在何時使用

本段內容將涉及到以下類和方法:

  • 方法Mono.map()
  • 方法Mono.flatMap()
  • 方法Mono.then()
  • Function

MonoFlux 中間環節的處理過程當中,有三個有些相似的方法:map()flatMap()then()。這三個方法的使用頻率很高。

  • 傳統的命令式編程
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
複製代碼
  • 對應的反應式編程
Mono.just(params)
    .flatMap(v -> doStep1(v))
    .flatMap(v -> doStep2(v))
    .flatMap(v -> doStep3(v));
複製代碼

從上面兩段代碼的對比就能夠看出來 flatMap() 方法在其中起到的做用,map()then() 方法也有相似的做用。但這些方法之間的區別是什麼呢?咱們先來看看這三個方法的簽名(以 Mono 爲例):

  • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
  • map(Function<? super T, ? extends R> mapper)
  • then(Mono other)
then()

then() 看上去是下一步的意思,但它只表示執行順序的下一步,不表示下一步依賴於上一步。then() 方法的參數只是一個 Mono,無從接受上一步的執行結果。而 flatMap()map() 的參數都是一個 Function,入參是上一步的執行結果。

flatMap() 和 map()

flatMap()map() 的區別在於,flatMap() 中的入參 Function 的返回值要求是一個 Mono 對象,而 map 的入參 Function 只要求返回一個 普通對象。在業務處理中常須要調用 WebClientReactiveXxxRepository 中的方法,這些方法的 返回值 都是 Mono(或 Flux)。因此要將這些調用串聯爲一個總體 鏈式調用,就必須使用 flatMap(),而不是 map()

問題二:如何實現併發執行

本段內容將涉及到以下類和方法:

  • 方法Mono.zip()
  • Tuple2
  • BiFunction

併發執行 是常見的一個需求。Reactive Programming 雖然是一種 異步編程 方式,可是 異步 不表明就是 併發並行 的。

傳統的命令式編程 中,併發執行 是經過 線程池Future 的方式實現的。

Future<Result1> result1Future = threadPoolExecutor.submit(() -> doStep1(params));
Future<Result2> result2Future = threadPoolExecutor.submit(() -> doStep2(params));
// Retrive result
Result1 result1 = result1Future.get();
Result2 result2 = result2Future.get();
// Do merge;
return mergeResult;
複製代碼

上面的代碼雖然實現了 異步調用,但 Future.get() 方法是 阻塞 的。在使用 Reactor 開發有 併發 執行場景的 反應式代碼 時,不能用上面的方式。

這時應該使用 MonoFlux 中的 zip() 方法,以 Mono 爲例,代碼以下:

Mono<CustomType1> item1Mono = ...;
Mono<CustomType2> item2Mono = ...;
Mono.zip(items -> {
    CustomType1 item1 = CustomType1.class.cast(items[0]);
    CustomType2 item2 = CustomType2.class.cast(items[1]);
    // Do merge
    return mergeResult;
}, item1Mono, item2Mono);
複製代碼

上述代碼中,產生 item1Monoitem2Mono 的過程是 並行 的。好比,調用一個 HTTP 接口的同時,執行一個 數據庫查詢 操做。這樣就能夠加快程序的執行。

但上述代碼存在一個問題,就是 zip() 方法須要作 強制類型轉換。而強制類型轉換是 不安全的。好在 zip() 方法存在 多種重載 形式。除了最基本的形式之外,還有多種 類型安全 的形式:

static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator); 
static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);
複製代碼

對於不超過 7 個元素的合併操做,都有 類型安全zip() 方法可選。以兩個元素的合併爲例,介紹一下使用方法:

Mono.zip(item1Mono, item2Mono).map(tuple -> {
    CustomType1 item1 = tuple.getT1();
    CustomType2 item2 = tuple.getT2();
    // Do merge
    return mergeResult;
});
複製代碼

上述代碼中,map() 方法的參數是一個 Tuple2,表示一個 二元數組,相應的還有 Tuple3Tuple4 等。

對於兩個元素的併發執行,也能夠經過 zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator) 方法直接將結果合併。方法是傳遞 BiFunction 實現 合併算法

問題三:集合循環以後的匯聚

本段內容將涉及到以下類和方法:

  • 方法Flux.fromIterable()
  • 方法Flux.reduce()
  • BiFunction

另一個稍微複雜的場景,對一個對象中的一個類型爲集合類的(ListSet)進行處理以後,再對本來的對象進行處理。使用 迭代器模式 的代碼很容易編寫:

List<SubData> subDataList = data.getSubDataList();
for (SubData item : subDataList) {
    // Do something on data and item
}
// Do something on data
複製代碼

當咱們要用 Reactive 風格的代碼實現上述邏輯時,就不是那麼簡單了。這裏會用到 Fluxreduce() 方法。reduce() 方法的簽名以下:

  • <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

能夠看出,reduce() 方法的功能是將一個 Flux 聚合 成一個 Mono

  • 第一個參數: 返回值 Mono 中元素的 初始值

  • 第二個參數: 是一個 BiFunction,用來實現 聚合操做 的邏輯。對於泛型參數 <A, ? super T, A> 中:

    • 第一個 A: 表示每次 聚合操做 以後的 結果的類型,它做爲 BiFunction.apply() 方法的 第一個入參
    • 第二個 ? super T: 表示集合中的每一個元素的類型,它做爲 BiFunction.apply() 方法的 第二個入參
    • 第三個 A: 表示聚合操做的 結果,它做爲 BiFunction.apply() 方法的 返回值

接下來看一下示例:

Data initData = ...;
List<SubData> list = ...;
Flux.fromIterable(list)
    .reduce(initData, (data, itemInList) -> {
        // Do something on data and itemInList
        return data;
    });
複製代碼

上面的示例代碼中,initDatadata 的類型相同。執行完上述代碼以後,reduce() 方法會返回 Mono<Data>

3. 消費 Mono 和 Flux(結束階段)

直接消費的 MonoFlux 的方式就是調用 subscribe() 方法。若是在 WebFlux 接口中開發,直接返回 Mono 或 Flux 便可。WebFlux 框架會完成最後的 Response 輸出工做。

小結

本文介紹了反應式編程的一些概念和 Spring Reactor 框架的基本用法,還介紹瞭如何用 Reactor 解決一些稍微複雜一點的問題。ReactorSpring 5 中有大量的應用,後面會給你們分享一些 Spring Reactor 實戰系列的博客。


歡迎關注技術公衆號: 零壹技術棧

零壹技術棧

本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。

相關文章
相關標籤/搜索