Spring 5 響應式編程

要點

  • Reactor 是一個運行在 Java8 之上的響應式流框架,它提供了一組響應式風格的 API
  • 除了個別 API 上的區別,它的原理跟 RxJava 很類似
  • 它是第四代響應式框架,支持操做融合,相似 RxJava 2
  • Spring 5 的響應式編程模型主要依賴 Reactor

RxJava 回顧

Reactor 是第四代響應式框架,跟RxJava 2 有些類似。Reactor 項目由Pivotal 啓動,以響應式流規範、Java8 和ReactiveX 術語表爲基礎。它的設計是Reactor 2(上一個主要版本)和RxJava 核心貢獻者共同努力的結果。java

在以前的同系列文章 RxJava 實例解析測試RxJava裏,咱們已經瞭解了響應式編程的基礎:數據流的概念、Observable 類和它的各類操做以及經過工廠方法建立靜態和動態的Observable 對象。react

Observable 是事件的源頭,Observer 提供了一組簡單的接口,並經過訂閱事件源來消費 Observable 的事件。Observable 經過 onNext 向 Observer 通知事件的到達,後面可能會跟上 onError 或 onComplete 來表示事件的結束。web

RxJava 提供了 TestSubscriber 來測試 Observable,TestSubscriber 是一個特別的 Observer,能夠用它斷言流事件。spring

在這篇文章裏,咱們將會對 ReactorRxJava 進行比較,包括它們的相同點和不一樣點。數據庫

Reactor 的類型

Reactor 有兩種類型, Flux<T>Mono<T>編程

  • Flux 相似 RaxJava 的 Observable,它能夠觸發零到多個事件,並根據實際狀況結束處理或觸發錯誤。
  • Mono 最多隻觸發一個事件,它跟 RxJava 的 Single 和 Maybe 相似,因此能夠把 Mono 用於在異步任務完成時發出通知。

由於這兩種類型之間的簡單區別,咱們能夠很容易地區分響應式 API 的類型:從返回的類型咱們就能夠知道一個方法會發射並忘記請求並等待(Mono),仍是在處理一個包含多個數據項的流(Flux)。後端

FluxMono 的一些操做利用了這個特色在這兩種類型間互相轉換。例如,調用 Flux 的 single() 方法將返回一個 Mono ,而使用 concatWith() 方法把兩個 Mono 串在一塊兒就能夠獲得一個 Flux。相似地,有些操做對 Mono 來講毫無心義(例如 take(n) 會獲得 n>1 的結果),而有些操做只有做用在 Mono 上纔有意義(例如 or(otherMono))。 api

Reactor 設計的原則之一是要保持 API 的精簡,而對這兩種響應式類型的分離,是表現力與 API 易用性之間的折中。數組

使用響應式流,基於 Rx 構建

正如RxJava 實例解析裏所說的,從設計概念方面來看,RxJava 有點相似 Java 8 Streams API。而 Reactor 看起來有點像 RxJava,不過這決不僅是個巧合。這樣的設計是爲了可以給複雜的異步邏輯提供一套原生的具備 Rx 操做風格的響應式流 API。因此說 Reactor 紮根於響應式流,同時在 API 方面儘量地與 RxJava 靠攏。併發

響應式類庫和響應式流的使用

Reactive Streams (如下簡稱爲 RS)是一種規範,它爲基於非阻塞回壓的異步流處理提供了標準。它是一組包含了 TCK 工具套件和四個簡單接口(Publisher、Subscriber、Subscription 和 Processor)的規範,這些接口將被集成到 Java 9.

RS 主要跟響應式回壓(稍後會詳細介紹)以及多個響應式事件源之間的交互操做有關。它並不提供任何操做方法,它只關注流的生命週期。

Reactor 不一樣於其它框架的最關鍵一點就是 RS。Flux 和 Mono 這二者都是 RS 的 Publisher 實現,它們都具有了響應式回壓的特色。

RxJava 1 裏,只有少部分操做支持回壓,RxJava 1 的 Observable 並無實現 RS 裏的任何類型,不過它有一些 RS 類型的適配器。能夠說,RxJava 1 實際上比 RS 規範出現得更早,並且在 RS 規範設計期間,RxJava 1 充當了函數式工做者的角色。

因此,你在使用那些 Publisher 適配器時,它們並不會爲你提供任何操做。爲了能作一些有用的操做,你可能須要用回 Observable,而這個時候你須要另外一個適配器。這種視覺上的混亂會破壞代碼的可讀性,特別是像 Spring 5 這樣的框架,若是整個框架創建在這樣的 Publisher 之上,那麼就更是雜亂不堪。

RS 規範不支持 null 值,因此在從 RxJava 1 遷移到 Reactor 或 RxJava 2 時要注意這點。若是你在代碼裏把 null 用做特殊用途,那麼就更是要注意了。

RxJava 2 是在 RS 規範以後出現的,因此它直接在 Flowable 類型裏實現了 Publisher。不過除了 RS 類型,RxJava 2 還保留了 RxJava 1 的遺留類型(Observable、Completable 和 Single)而且引入了其它一些可選類型——Maybe。這些類型提供了不一樣的語義,不過它們並無實現 RS 接口,這是它們的不足之處。跟 RxJava 1 不同,RxJava 2 的 Observable 不支持 RxJava 2 的回壓協議(只有 Flowable 具有這個特性)。之因此這樣設計是爲了可以爲一些場景提供一組豐富且流暢的 API,好比用戶界面發出的事件,在這樣的場景裏是不須要用到回壓的,並且也不可能用到。Completable、Single 和 Maybe 不須要支持回壓,不過它們也提供了一組豐富的 API,並且在被訂閱以前不會作任何事情。

在響應式領域,Reactor 變得越發精益,它的 Mono 和 Flux 兩種類型都實現了 Publisher,而且都支持回壓。雖然把 Mono 做爲一個 Publisher 須要付出一些額外的開銷,不過 Mono 在其它方面的優點彌補了它的缺點。在後續部分咱們將看到對 Mono 來講回壓意味着什麼。

相比 RxJava,API 類似但不相同

ReactiveX 和 RxJava 的操做術語表有時候真的難以掌握,由於歷史緣由,有些操做的名字讓人感到困惑。Reactor 儘可能把 API 設計得緊湊,在給 API 取名時儘可能選擇好一點的名字,不過總的來講,這兩套 API 看起來仍是很相像。在最新的 RxJava 2 迭代版本中,RxJava 2 借鑑了 Reactor 的一些術語,這預示着這兩個項目之間可能會有愈來愈緊密的合做。一些操做和概念老是先出如今其中的一個項目裏,而後互相借鑑,最後會同時滲透到兩個項目裏。

例如,Flux 也有常見的 just 工廠方法(雖然只有兩種變形:接受一個參數或變長參數)。不過 from 方法有不少個變種,最值得一提的是 fromIterable。固然,Flux 也包含了那些常規的操做:map、merge、concat、flatMap、take,等等。

Reactor 把 RxJava 裏使人困惑的 amb 操做改爲了看起來更加中肯的 firstEmitting。另外,爲了保持 API 的一致,toList 被從新命名爲 collectList。實際上,全部以 collect 開頭的操做都會把值聚合到一個特定類型的集合裏,不過只會爲每一個集合生成一個 Mono。而全部以 to 開頭的操做被保留用於類型轉換,轉換以後的類型能夠用於非響應式編程,例如 toFuture()。

在類初始化和資源使用方面,Reactor 之因此也能表現得如此精益,要得益於它的融合特性:Reactor 能夠把多個串行的操做(例如調用 concatWith 兩次)合併成單個操做,這樣就能夠只對這個操做的內部類作一次初始化(也就是 macro-fusion)。這個特性包含了基於數據源的優化,抵消了 Mono 在實現 Publisher 時的一些額外開銷。它還能在多個相關的操做之間共享資源(也就是 micro-fusion),好比內部隊列。這些特性讓 Reactor 成爲徹徹底底的的第四代響應式框架,不過這個超出了這篇文章的討論範圍。

下面讓咱們來看看幾個 Reactor 的操做。

一些操做示例

(這一小節包含了一些代碼片斷,咱們建議你動手去運行它們,深刻體驗一下 Reactor。因此你須要打開 IDE,並建立一個測試項目,把 Reactor 加入到依賴項裏。)

對於 Maven,能夠把下面的依賴加到 pom.xml 裏:

<dependency>
    <groupId>io.projectreactor</groupId>    
    <artifactId>reactor-core</artifactId>
    <version>3.0.3.RELEASE</version>
</dependency>

對於 Gradle,要把 Reactor 做爲依賴項,相似這樣:

dependencies {
    compile "io.projectreactor:reactor-core:3.0.3.RELEASE"
}

咱們來重寫前面幾篇同系列文章裏的例子!

Observable 的建立跟在 RxJava 裏有點相似,在 Reactor 裏可使用 just(T...) 和 fromIterator(Iterable ) 工廠方法來建立。just 方法會把 List 做爲一個總體觸發,而 fromIterable 會逐個觸發 List 裏的每一個元素:

public class ReactorSnippets {
  private static List<String> words = Arrays.asList(
        "the",
        "quick",
        "brown",
        "fox",
        "jumped",
        "over",
        "the",
        "lazy",
        "dog"
        );

  @Test
  public void simpleCreation() {
     Flux<String> fewWords = Flux.just("Hello", "World");
     Flux<String> manyWords = Flux.fromIterable(words);

     fewWords.subscribe(System.out::println);
     System.out.println();
     manyWords.subscribe(System.out::println);
  }
}

跟在 RxJava 裏同樣,上面的代碼會打印出:

Hello
World
the
quick
brown
fox
jumped
over
the
lazy
dog

爲了打印句子裏的每個字母,咱們還須要 flatMap 方法(跟在 RxJava 裏同樣),不過在 Reactor 裏咱們使用 fromArray 來代替 from。而後咱們會用 distinct 過濾掉重複的字母,並用 sort 對它們進行排序。最後,咱們使用 zipWith 和 range 輸出每一個字母的次序:

@Test
public void findingMissingLetter() {
  Flux<String> manyLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  manyLetters.subscribe(System.out::println);
}

咱們能夠很容易地看到s被遺漏了:

1. a
2. b
...
18. r
19. t
20. u
...
25. z

咱們能夠經過糾正單詞數組來修復這個問題,不過也可使用 concat/concatWith 和一個 Mono 來手動往字母 Flux 裏添加「s」:

@Test
public void restoringMissingLetter() {
  Mono<String> missing = Mono.just("s");
  Flux<String> allLetters = Flux
        .fromIterable(words)
        .flatMap(word -> Flux.fromArray(word.split("")))
        .concatWith(missing)
        .distinct()
        .sort()
        .zipWith(Flux.range(1, Integer.MAX_VALUE),
              (string, count) -> String.format("%2d. %s", count, string));

  allLetters.subscribe(System.out::println);
}

這樣,在去重和排序後,遺漏的 s 字母就被添加進來了:

1. a
2. b
...
18. r
19. s
20. t
...
26. z

上一篇文章提到了 Rx 和 Streams API 之間的類似之處,而實際上,在數據就緒的時候,Reactor 也會像 Java Steams 那樣開始簡單地推送數據事件(能夠參看下面關於回壓的內容)。只是在主線程裏對事件源進行訂閱沒法完成更加複雜的異步操做,主要是由於在訂閱完成以後,控制權會立刻返回到主線程,並退出整個程序。例如:

@Test
public void shortCircuit() {
  Flux<String> helloPauseWorld = 
              Mono.just("Hello")
                  .concatWith(Mono.just("world")
                  .delaySubscriptionMillis(500));

  helloPauseWorld.subscribe(System.out::println);
}

這個單元測試會打印出Hello,但沒法打印出world,由於程序會過早地退出。在作簡單測試的時候,若是你只是像這樣寫一個簡單的主類,你一般會掉入陷阱。做爲補救,你能夠建立一個 CountDownLatch 對象,並在 Subscriber(包括 onError 和 onComplete)裏調用 countDown 方法。不過這樣就變得不那麼響應式了,不是嗎?(萬一你忘了調用 countDown 方法,而恰好發生錯誤了該怎麼辦?)

解決這個問題的第二種方法是使用一些操做轉換到非響應式模式。toItetable 和 toStream 會生成阻塞實例。咱們在例子裏使用 toStream:

@Test
public void blocks() {
  Flux<String> helloPauseWorld = 
    Mono.just("Hello")
        .concatWith(Mono.just("world")
                        .delaySubscriptionMillis(500));

  helloPauseWorld.toStream()
                 .forEach(System.out::println);
}

正如你所期待的那樣,在打印出Hello以後有一個短暫的停頓,而後打印出「world」並退出。咱們以前也提過,RxJava 的 amb 操做在 Reactor 裏被重命名爲 firstEmitting(正如它的名字所表達的:選擇第一個 Flux 來觸發)。在下面的例子裏,咱們會建立一個 Mono,這個 Mono 會有 450 毫秒的延遲,還會建立一個 Flux,這個 Flux 以 400 毫秒的間隔觸發事件。在使用 firstEmitting() 對它們進行合併時,由於 Flux 的第一個值先於 Mono 的值出現,因此最後 Flux 會被採用:

@Test
public void firstEmitting() {
  Mono<String> a = Mono.just("oops I'm late")
                       .delaySubscriptionMillis(450);
  Flux<String> b = Flux.just("let's get", "the party", "started")
                       .delayMillis(400);

  Flux.firstEmitting(a, b)
      .toIterable()
      .forEach(System.out::println);
}

這個單元測試會打印出句子的全部部分,它們之間有 400 毫秒的時間間隔。

這個時候你可能會想,若是我寫的測試使用的是 4000 毫秒的間隔而不是 400 毫秒,那會怎樣?你不會想在一個單元測試裏等待 4 秒鐘的!在後面的部分,咱們會看到 Reactor 提供了一些測試工具能夠很好地解決這個問題。

咱們已經經過例子比較了 Reactor 的一些經常使用操做,如今咱們要回頭看看這個框架其它方面的不一樣點。

基於 Java 8

Reactor 選擇 Java 8 做爲運行基礎而不是以前的任何版本,這再一次與它簡化 API 的目標不謀而合:RxJava 選擇了 Java 6,而 Java 6 裏沒有 java.util.function 包,RxJava 也就沒法利用這個包下面的 Functino 類和 Consumer 類,因此它必須建立不少相似 Func一、Func二、Action0、Action1 這樣的類。RxJava 2 使用相似 Reactor 2 的方式把這些類做爲 java.util.function 的鏡像,由於它還得支持 Java 7。

Reactor API 還使用了 Java 8 裏新引入的一些類型。由於大部分基於時間的操做都跟時間段有關係(例如超時、時間間隔、延遲,等等),因此直接就使用了 Java 8 裏的 Duration 類。

Java 8 Stream API 和 CompletableFuture 跟 Flux/Mono 之間能夠很容易地進行互相轉換。那麼通常狀況下咱們是否要把 Stream 轉成 Flux?不必定。雖說 Flux 或 Mono 對 IO 和內存相關操做的封裝所產生的開銷微不足道,不過 Stream 自己也並不會帶來很大延遲,因此直接使用 Stream API 是沒有問題的。對於上述狀況,在 RxJava 2 裏須要使用 Observable,由於 Observable 不支持回壓,因此一旦對其進行訂閱,它就成爲事件推送的來源。Reactor 是基於 Java 8 的,因此在大部分狀況下,Stream API 已經可以知足需求了。要注意的是,儘管 Flux 和 Mono 的工廠模式也支持簡單類型,但它們的主要用途仍是在於把對象合併到更高層次的流裏面。因此通常來講,在現有代碼上應用響應式模式時,你不會但願把「long getCount()」這樣的方法轉成「Mono getCount()」。

關於回壓

回壓是 RS 規範和 Reactor 主要關注點之一(若是還有其它關注點的話)。回壓的原理是說,在一個推送場景裏,生產者的生產速度比消費者的消費速度快,消費者會向生產者發出信號說「嘿,慢一點,我處理不過來了。」生產者能夠藉機控制數據生成的速度,而不是拋棄數據或者冒着產生級聯錯誤的風險繼續生成數據。

你也許會想,在 Mono 裏爲何也須要回壓:什麼樣的消費者會被一個單獨的觸發事件壓垮?答案是「應該不會有這樣的消費者」。不過,在 Mono 和 CompletableFuture 工做原理之間仍然有一個關鍵的不一樣點。後者只有推送:若是你持有一個 Future 的引用,那麼說明一個異步任務已經在執行了。另外一方面,回壓的 Flux 或 Mono 會啓動延遲的拉取 - 推送迭代:

  1. 延遲是由於在調用 subscribe() 方法以前不會發生任何事情
  2. 拉取是由於在訂閱和發出請求時,Subscriber 會向上遊發出信號,準備拉取下一個數據塊
  3. 接下來生產者向消費者推送數據,這些數據在消費者的請求範圍以內

對 Mono 來講,subscribe() 方法就至關於一個按鈕,按下它就等於說我準備好接收數據了。Flux 也有一個相似的按鈕,不過它是 request(n) 方法,這個方法是 subscribe() 的通常化用法。

Mono 做爲一個 Publisher,它每每表明着一個耗費資源的任務(在 IO、延遲等方面),意識到這點是理解回壓的關鍵:若是不對其進行訂閱,你就不須要爲之付出任何代價。由於 Mono 常常跟具備回壓的 Flux 一塊兒被編排到一個響應式鏈上,來自多個異步數據源的結果有可能被組合到一塊兒,這種按需觸發的能力是避免阻塞的關鍵。

咱們可使用回壓來區分 Mono 的不一樣使用場景,相比上述的例子,Mono 有另一個常見的使用場景:把 Flux 的數據異步地聚合到 Mono 裏。reduce 和 hasElement 能夠消費 Flux 裏的每個元素,再把這些數據以某種形式聚合起來(分別是 reduce 函數的調用結果和一個 boolean 值),做爲一個 Mono 對外暴露數據。在這種狀況下,使用 Long.MAX_VALUE 向上遊發出回壓信號,上游會以徹底推送的方式工做。

關於回壓另外一個有意思的話題是它如何對存儲在內存裏的流的對象數量進行限制。做爲一個 Publisher,數據源頗有可能出現生成數據緩慢的問題,而來自下游的請求超出了可用數據項。在這種狀況下,整個流很天然地進入到推送模式,消費者會在有新數據到達時收到通知。當生產高峯來臨,或者在生產速度加快的狀況下,整個流又回到了拉取模式。在以上兩種狀況下,最多有 N 項數據(request() 請求的數據量)會被保留在內存裏。

你能夠對內存的使用狀況進行更精確的推算,把 N 項數據跟每項數據須要消耗的內存 W 結合起來:這樣你就能夠推算出最多須要消耗 W*N 的內存。實際上,Reactor 在大多數狀況下會根據 N 來作出優化:根據狀況建立內部隊列,並應用預取策略,每次自動請求 75% 的數據量。

Reactor 的操做有時候會根據它們所表明的語義和調用者的指望來改變回壓信號。例如對於操做 buffer(10):下游請求 N 項數據,而這個操做會向上遊請求 10N 的數據量,這樣就能夠填滿緩衝區,爲訂閱者提供足夠的數據。這一般被稱爲「主動式回壓」,開發人員能夠充分利用這種特性,例如在微批次場景裏,能夠顯式地告訴 Reactor 該如何從一個輸入源切換到一個輸出地。

跟 Spring 的關係

Reactor 是 Spring 整個生態系統的基礎,特別是 Spring 5(經過 Spring Web Reactive)和 Spring Data 「kay」(跟 spring-data-commons 2.0 相對應的)。

這兩個項目的響應式版本是很是有用的,咱們所以能夠開發出徹底響應式的 Web 應用:異步地處理請求,一直到數據庫,最後異步地返回結果。Spring 應用所以能夠更有效地利用資源,避免爲每一個請求單獨分配一個線程,還要等待 I/O 阻塞。

Reactor 將被用於將來 Spring 應用的內部響應式核心組件,以及這些 Spring 組件暴露出來的 API。通常狀況下,它們能夠處理 RS Publisher,不過大多數時候它們要面對的是 Flux/Mono,須要用到 Reactor 的豐富特性。固然,你也能夠自行選擇其它響應式框架,Reactor 提供了能夠用來適配其它 Reactor 類型和 RxJava 類型甚至簡單的 RS 類型的鉤子接口。

目前,你能夠經過 Spring Boot 2.0.0.BUILD-SNAPSHOT 和 spring-boot-starter-web-reactive 依賴項(能夠在 start.spring.io 上生成一個這樣的項目)來體驗 Spring Web Reactive:

<dependency>
  <groupId>org.springframework.boot.experimental</groupId>
  <artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>

你能夠像往常那樣寫你的 @Controller,只不過把 Spring MVC 的底層變成響應式的,把大部分 Spring MVC 的契約換成了響應式非阻塞的契約。響應式層默認運行在 Tomcat 8.5 上,你也能夠選擇使用 Undertow 或 Netty。

{% asset_img 1.jpg %}

另外,雖然 Spring API 是以 Reactor 類型爲基礎的,不過在 Spring Web Reactive 模塊裏能夠爲請求和響應使用各類各樣的響應式類型:

  • Mono :做爲 @RequestBody,請求實體 T 會被異步反序列化,以後的處理能夠跟 Mono 關聯起來。做爲返回類型,每次 Mono 發出了一個值,T 就會被異步序列化併發回客戶端。你能夠把請求 Mono 做爲參數,並把參數化了的關聯處理做爲結果 Mono 返回。
  • Flux :在流場景裏使用(做爲 @RequestBody 使用的輸入流以及包含了 Flux 返回類型的 Server Sent Events)。
  • Single/Observable:分別對應 Mono 和 Flux,不過會切換回 RxJava。
  • Mono 做爲返回類型:在 Mono 結束時請求的處理也跟着完成。
  • 非響應式返回類型(void 和 T):這個時候你的 @Controller 方法是同步的,不過它應該是非阻塞的(短暫的處理)。請求處理在方法執行完畢時結束,返回的 T 被異步地序列化併發回客戶端。

下面是使用 Spring Web Reactive 的例子:

@Controller
public class ExampleController {

  private final MyReactiveLibrary reactiveLibrary;

  public ExampleController(@Autowired MyReactiveLibrary reactiveLibrary) {
     this.reactiveLibrary = reactiveLibrary;
  }

  @RequestMapping("hello/{who}")
  @ResponseBody
  public Mono<String> hello(@PathVariable String who) {
       return Mono.just(who)
             .map(w -> "Hello " + w + "!");
  }

  @RequestMapping(value = "heyMister", method = RequestMethod.POST)
  @ResponseBody
  public Flux<String> hey(@RequestBody Mono<Sir> body) {
      return Mono.just("Hey mister ")
            .concatWith(body
                  .flatMap(sir -> Flux.fromArray(sir.getLastName().split("")))
                  .map(String::toUpperCase)
                  .take(1)
            ).concatWith(Mono.just(". how are you?"));
  }
}

第一個端點含有一個路徑變量,它被轉成 Mono,並被映射到一個問候語裏返回給客戶端。

一個發到 /hello/SImon 的 GET 請求會獲得「Hello Simon!」的文本響應。

第二個端點相對複雜一些:它異步地接收序列化 Sir 對象(一個包含了 firstName 和 lastName 屬性的類)並使用 flatMap 方法把它映射到一個字母流裏,這個字母流包含了 lastName 的全部字母。而後它選取流裏的第一個字母,把它轉成大寫,並跟問候語串在一塊兒。

因此向 /heyMister POST 一個 JSON 對象

{
    "firstName": "Paul",
    "lastName": "tEsT"
}

會返回字符串「Hello mister T. How are you?」。

響應式 Spring Data 目前也在開發當中,它被做爲 Kay 發佈的一部分,代碼在 spring-data-commons 2.0.x 分支上。如今已經有一個里程碑版本可使用:

<dependencyManagement>
  <dependencies>
     <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-releasetrain</artifactId>
        <version>Kay-M1</version>
        <scope>import</scope>
        <type>pom</type>
     </dependency>
  </dependencies>
</dependencyManagement>

而後簡單地添加 Spring Data Commons 的依賴(它會自動從上面的 BOM 裏獲取版本號):

<dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-commons</artifactId>
</dependency>

Spring Data 對響應式的支持主要表如今新的 ReactiveCrudRepository 接口,它擴展了 Repository。這個接口暴露了 CRUD 方法,使用的是 Reactor 類型的輸入和返回值。還有一個 RxJava 1 的版本,叫做 RxJava1CrudRepository。要在 CrudRepository 裏經過 id 獲取一個實體,能夠調用「T findOne(ID id)」方法,而在 ReactiveCrudRepository 和 RxJava1CrudRepository 裏要分別調用「Mono findOne(ID id)」和「Observable findOne(ID id)」。還有其它的變種,它們接收 Mono/Single 做爲參數,異步地提供 key,並在此基礎上組合返回結果。

假設有一個響應式的後端存儲(或者 mock 的 ReactiveCrudRepository bean),下面的 controller 將從前到後都是響應式的:

@Controller
public class DataExampleController {

  private final ReactiveCrudRepository<Sir, String> reactiveRepository;

  public DataExampleController(
                 @Autowired ReactiveCrudRepository<Sir, String> repo) {
     this.reactiveRepository = repo;
  }

  @RequestMapping("data/{who}")
  @ResponseBody
  public Mono<ResponseEntity<Sir>> hello(@PathVariable String who) {
     return reactiveRepository.findOne(who)
                              .map(ResponseEntity::ok)
                              .defaultIfEmpty(ResponseEntity.status(404)
                                                            .body(null));
  }
}

請注意整個流程:咱們異步地獲取實體並用 map 把它包裝成 ResponseEntity,取得一個能夠立刻返回的 Mono。若是 Spring Data repository 找不到這個 key 的數據,會返回一個空的 Mono。咱們使用 defaultIfEmpty 顯式地返回 404。

測試 Reactor

測試 RxJava這篇文章裏提到了如何測試 Observable。正如咱們所看到的,RxJava 提供了 TestScheduler,咱們能夠把它跟 RxJava 的操做一塊兒使用,這些操做接受一個 Scheduler 參數,TestScheduler 會爲這些操做啓動虛擬的時鐘。RxJava 還提供了一個 TestSubscriber 類,能夠用它等待 Observable 執行完畢,也能夠用它對每一個事件進行斷言(onNext 的值和它的數量、觸發的 onError,等等)。在 RxJava 2 裏,TestSubscriber 就是 RS Subscriber,你能夠用它測試 Reactor 的 Flux 和 Mono!

在 Reactor 裏,上述兩個使用普遍的特性被組合到了 StepVerifier 類裏。從 reactor-addons 倉庫的 reactor-test 模塊裏能夠獲取到 StepVerifier。在建立 Publisher 實例時,調用 StepVerifier.create 方法能夠初始化一個 StepVerifier。若是要使用虛擬時鐘,能夠調用 StepVerifier.withVirtualTime 方法,這個方法接受一個 Supplier 做爲參數。之因此這樣設計,是由於它會首先保證建立一個 VirtualTimeScheduler 對象,並把它做爲默認的 Scheduler 傳給舊有的操做。StepVerifier 會對在 Supplier 裏建立的 Flux/Mono 進行配置,把基於時間的操做轉爲「虛擬時間操做」。接下來你就能夠編寫各類你所指望的用例:下一個元素應該是什麼,是否應該出現錯誤,是否應該及時向前移動,等等。藉助其它方法,好比事件與 Predicate 的匹配或者對 onNext 事件的消費,你能夠與那些值之間作一些更高級的交互(猶如在使用斷言框架)。任何地方拋出的 AssertionError 都會在最終的結果裏反應出來。最後,調用 verify() 對你的用例進行測試,這個方法會經過 StepVerifier.create 或 StepVerifier.withVirtualTime 方法對預約義的事件源進行訂閱。

讓咱們來舉一些簡單的例子來講明 StepVerifier 時如何工做的。首先要添加依賴到 POM 裏:

<dependency>
  <groupId>io.projectreactor.addons</groupId>
  <artifactId>reactor-test</artifactId>
  <version>3.0.3.RELEASE</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.assertj</groupId>
  <artifactId>assertj-core</artifactId>
  <version>3.5.2</version>
  <scope>test</scope>
</dependency>

假設你有一個叫做 MyReactiveLibrary 的類,你要對這個類生成的一些 Flux 進行測試:

@Component
public class MyReactiveLibrary {

  public Flux<String> alphabet5(char from) {
     return Flux.range((int) from, 5)
           .map(i -> "" + (char) i.intValue());
  }

  public Mono<String> withDelay(String value, int delaySeconds) {
     return Mono.just(value)
                .delaySubscription(Duration.ofSeconds(delaySeconds));
  }
}

第一個方法將返回給定字母以後的 5 個字母。第二個方法返回一個 Flux,它會以給定的時間間隔觸發給定值,其中的時間間隔以秒爲單位。第一個測試是要保證使用 x 調用 alphabet5 的輸出被限定在 x、y、z。使用 StepVerifier 看起來是這樣的:

@Test
public void testAlphabet5LimitsToZ() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
        .expectNext("x", "y", "z")
        .expectComplete()
        .verify();
}

第二個測試要保證 alphabet5 返回的每一個值都是字母。在這裏咱們使用斷言框架 AssertJ :

@Test
public void testAlphabet5LastItemIsAlphabeticalChar() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  StepVerifier.create(library.alphabet5('x'))
              .consumeNextWith(c -> assertThat(c)
                    .as("first is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("second is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("third is alphabetic").matches("[a-z]"))
              .consumeNextWith(c -> assertThat(c)
                    .as("fourth is alphabetic").matches("[a-z]"))
              .expectComplete()
              .verify();
}

結果這些測試都運行失敗。讓咱們檢查一下 StepVirifier 的輸出,看看能不能找出 bug:

java.lang.AssertionError: expected: onComplete(); actual: onNext({)

java.lang.AssertionError: [fourth is alphabetic] 
Expecting:
 "{"
to match pattern:
 "[a-z]"

看起來咱們的方法並無在 z 的時候停住,而是繼續發出 ASCII 字符。咱們能夠加入.take(Math.min(5,'z'-from+1)) 來修復這個 bug,或者把 Math.min 做爲 range 的第二個參數。

咱們要作的最後一個測試須要用到虛擬時鐘:咱們使用 withVirtualTime 構造器來測試方法的延遲,而不須要真的等待指定的時間:

@Test
public void testWithDelay() {
  MyReactiveLibrary library = new MyReactiveLibrary();
  Duration testDuration =
     StepVerifier.withVirtualTime(() -> library.withDelay("foo", 30))
                 .expectSubscription()
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNoEvent(Duration.ofSeconds(10))
                 .thenAwait(Duration.ofSeconds(10))
                 .expectNext("foo")
                 .expectComplete()
                 .verify();
  System.out.println(testDuration.toMillis() + "ms");
}

這個測試用例測試一個將被延遲 30 秒的 Flux:在訂閱以後的 30 秒內不會發生任何事情,而後發生一個 onNext("foo") 事件後結束。

System.out 會打印出驗證所須要的時間,在最近的一次測試中它用掉了 8 毫秒。

若是調用構造器的 create 方法,thenAwait 和 expectNoEvent 方法仍然可使用,不過它們會阻塞指定的時間。

自定義動態源

RxJava 實例解析一文中提到的動態和靜態 Observable 對 Reactor 來講也是適用的。

若是你要建立一個自定義的 Flux,須要使用 Reactor 的 FluxSink。這個類將會爲你考慮全部跟異步有關的狀況,你只須要把注意力集中在觸發事件上。

使用 Flux.create 並從回調中得到的 FluxSink 能夠用於後續的觸發事件。這個自定義的 Flux 是靜態的,爲了把它變成動態的,可使用 publish() 和 connect() 方法。基於上一篇文章中的例子,咱們幾乎能夠把它逐字逐句地翻譯成 Reactor 的版本:

SomeFeed<PriceTick> feed = new SomeFeed<>();
Flux<PriceTick> flux =
     Flux.create(emitter ->
     {
        SomeListener listener = new SomeListener() {
           @Override
           public void priceTick(PriceTick event) {
              emitter.next(event);
              if (event.isLast()) {
                 emitter.complete();
              }
           }

           @Override
           public void error(Throwable e) {
              emitter.error(e);
           }};
        feed.register(listener);
     }, FluxSink.OverflowStrategy.BUFFER);

ConnectableFlux<PriceTick> hot = flux.publish();

在鏈接到動態 Flux 以前,能夠作兩次訂閱:一個訂閱將打印每一個 tick 的細節,另外一個訂閱會打印出 instrument:

hot.subscribe(priceTick -> System.out.printf("%s %4s %6.2f%n", priceTick
     .getDate(), priceTick.getInstrument(), priceTick.getPrice()));

hot.subscribe(priceTick -> System.out.println(priceTick.getInstrument()));

接下來咱們鏈接到動態 Flux,並在程序結束前讓它運行 5 秒鐘:

hot.connect();
Thread.sleep(5000);

(要注意,若是 PriceTick 的 isLast() 方法改變了,那麼 feed 自己也會結束)。

FluxSink 經過 isCancelled() 來檢查下游的訂閱是否已取消。你還能夠經過 requestedFromDownstream() 來得到請求數,這個在遵循回壓策略時很管用。最後,你能夠經過 setCancellation 方法釋放全部使用過的資源。

要注意,FluxSink 使用了回壓,因此你必須提供一個 OverflowStrategy 來顯式地處理回壓。這個等價於使用 onBackpressureXXX 操做(例如,FluxSink.OverflowStrategy.BUFFER 等價於.onBackpressureBuffer()),它們會覆蓋來自下游的回壓信號。

結論

在這篇文章裏,咱們學習了 Reactor,一個運行在 Java 8 之上並以 Rx 規範和 Reactive Streams 規範爲基礎的第四代響應式框架。咱們展現了 RxJava 中的設計理念是如何被應用在 Reactor 上的,儘管它們之間有一些 API 設計上的差異。咱們還展現了 Reactor 如何成爲 Spring 5 的基礎,還提供了一些跟測試 Publisher、Flux 和 Mono 有關的資源。

歡迎訪問個人我的博客

關注公衆號:JAVA九點半課堂,這裏有一批優秀的程序猿,加入咱們,一塊兒探討技術,共同進步!回覆「資料」獲取 2T 行業最新資料!

相關文章
相關標籤/搜索