學習響應式編程 Reactor (2) - 初識 reactor

Reactor

Reactor 是用於 Java 的異步非阻塞響應式編程框架,同時具有背壓控制的能力。它與 Java 8 函數式 Api 直接集成,好比 分爲CompletableFuture、Stream、以及 Duration
。它提供了異步 Api 響應流 Flux (用於 [0 - N] 個元素)和 Mono (用於 [0或1] 個元素),並徹底遵照和實現了響應式規範。html

引入 reactor

reactor 自 3.0.4 版本以後,採用了 BOM (Bill Of Materials)的方式,使用 BOM 能夠管理一組良好集成的 maven artifacts,而無需擔憂不一樣版本組件之間的相互依賴問題,在 maven 項目中在 dependencyManagement 中 加入 reactor 的 bom 定義便可。java

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Dysprosium-SR8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

在須要使用 reactor 的項目中,依賴對應 reactor 模塊便可。react

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
</dependencies>

在我學習的過程當中,reactor 的最新版本是 Dysprosium-SR8 ,它的命名來自元素週期表,按順序遞增。經過 https://github.com/reactor/reactor/releases 獲取最新版本。git

reactor bom 中 包含了以下組件:github

序號 模塊 artifactId 說明
1 Reactor Core reactor-core 基於 Java 8 的響應式流規範實現
2 Reactor Test reactor-test reactor 的測試工具集
3 Reactor Extra reactor-extra 爲 Flux 額外提供的操做符
4 Reactor Netty reactor-netty 基於 Netty 實現的 TCP、UDP、HTTP 的客戶端和服務端
5 Reactor Adapter reactor-adapter 和其餘響應式庫(如RxJava二、SWT Scheduler、 Akka Scheduler)的適配器
6 Reactor Kafka reactor-kafka Apache Kafka 的響應式橋接實現
7 Reactor Kotlin Extensions reactor-kotlin-extensions 在 Dysprosium 版本後額外提供的 Kotlin 擴展
8 Reactor RabbitMQ reactor-rabbitmq RabbitMQ 的響應式橋接實現
9 Reactor Pool reactor-pool 響應式應用程序的通用對象池
10 Reactor Tools reactor-tools 一組用於改善 Project Reactor 調試和開發經驗的工具。

序號 [1 - 3] 爲咱們學習 Reactor 過程當中主要涉及的模塊,序號 [4 - 9] 在咱們學習 Spring WebFlux 的過程當中會有所涉及,序號 [10] 是用於 Reactor 調試的,下面會講到。web

使用 gradle 的同窗請自行百度。
若是須要嚐鮮 Reactor 里程碑版或開發者預覽版的同窗,添加 Spring Milestones repository 的倉庫便可。spring

Reactor 之 初體驗

上面說了那麼多,咱們先來體驗下 Reactor。編程

在學習 Java Stream 的環節中,不知是否有同窗有提出這樣的疑問:在進行中間操做或終端消費操縱時,如何獲取流中元素的序號值呢?
假若有以下單詞 [ the, quick, brown, fox, jumped, over, the, lazy, dog ] ,使用 Stream 能否實現輸出時並打印每一個單詞的序號呢?
仔細想一想,彷佛沒有直接的辦法能夠獲取,咱們只能經過外部建立變量獲取並遞增來實現。框架

來看下 Stream 的實現:異步

AtomicInteger index = new AtomicInteger(1);
Arrays.stream(WORDS)
        .map(word -> StrUtil.format("{}. {}", index.getAndIncrement(), word))
        .forEach(System.out::println);

來看下 Reactor 的實現:

Flux.fromArray(WORDS)                                                   // ①
        .zipWith(Flux.range(1, Integer.MAX_VALUE),                      // ②
                (word, index) -> StrUtil.format("{}. {}", index, word)) // ③
        .subscribe(System.out::println);                                // ④

先不看 Reactor 代碼的含義,感受怎麼樣,Reactor 的代碼看起來是否是更清新一點,沒有定義任何三方變量解決了這個問題。

有了 Stream 的基礎,Reactor 的代碼很容易理解了,咱們稍微來解釋下 Reactor 上段的代碼:

  1. 序號① 的代碼 Flux 是咱們以前提到的 一個可以發出 0 到 N 個元素的響應流發佈者,fromArray 是它的靜態方法,用來建立 Flux 響應流
  2. 序號② 的代碼 Flux 的 range 操做符和 Stream 的 range 相同,用來生成 整數 Flux 響應流;zipWith 操做符用來合併兩個 Flux,並將響應流中的元素一一對應,當其中一個響應流完成時,合併結束,以前未結束的響應流剩下的元素將被忽略
  3. 序號③ 的代碼 zipWith 操做符 支持傳遞一個 BiFunction 的函數式接口實現,定義如何來合併兩個數據流中的元素,本例中咱們將索引和單詞鏈接起來
  4. 序號④ 的代碼 subscribe 即爲訂閱方法,此處咱們作了數據流中元素輸出至控制檯

Reactor 之 測試 & 調試

測試

Reactor 的測試須要依賴測試模塊:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

編寫測試代碼以下:

// 建立 Flux 響應流
Flux<String> source = Flux.just("foo", "bar");
// 使用 concatWith 操做符鏈接 2 個響應流
Flux<String> boom = source.concatWith(Mono.error(new IllegalArgumentException("boom")));
// 建立一個 StepVerifier 構造器來包裝和校驗一個 Flux。
StepVerifier.create(boom)
        .expectNext("foo")          // 第一個咱們指望的信號是 onNext,它的值爲 foo
        .expectNext("bar")          // 第二個咱們指望的信號是 onNext,它的值爲 bar
        .expectErrorMessage("boom") // 最後咱們指望的是一個終止信號 onError,異常內容應該爲 boom
        .verify();                  // 使用 verify() 觸發測試。

除了正常測試外,Reactor 還提供了諸如:

  1. 測試基於時間操做符相關的方法,使用 StepVerifier.withVirtualTime 來進行
  2. 使用 StepVerifier 的 expectAccessibleContext 和 expectNoAccessibleContext 來測試 Context
  3. 用 TestPublisher 手動發出元素
  4. 用 PublisherProbe 檢查執行路徑

測試方面暫時不是咱們學習的重點,這塊內容,咱們快速跳過,等到學習到相關場景,須要的時候,咱們回過頭來再彌補。

調試

響應式編程的調試使人生畏,由於它不像命令式編程,咱們能夠從異常的堆棧信息中看到發生錯誤代碼的位置及具體錯誤信息,這也是響應式編程學習曲線比較陡峭的緣由。

有以下代碼:

Flux.range(1, 3)
    .flatMap(n -> Mono.just(n + 100))
    .single()
    .map(n -> n * 2)
    .subscribe(System.out::println);

執行測試時,打印錯誤日誌以下:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4219)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4190)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4126)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4073)
	at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:79)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	...
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)

咱們從上述的錯誤中獲取到發生了 IndexOutOfBoundsException 數據越界異常,從上往下看,應該是 MonoSingle 響應式流發出了不止一個元素,查看 Mono#singe 操做符描述,咱們看到 single 有一個規定: 源必須只能發出一個元素。看來是有一個源發出了多於一個元素,從而違反了這一規定。

粗略過一下這些行,咱們能夠大概勾畫出一個大體的出問題的鏈:涉及 MonoSingle、FluxFlatMap、FluxRange(每個都對應 trace 中的幾行,但整體涉及這三個類)。 因此難道是 range().flatMap().single() 這樣的鏈?

可是若是在咱們的應用中多處都用到這一模式,那怎麼辦?經過這些仍是不能肯定爲何會拋除這個異常, 搜索 single 也找不到問題所在。直到最後幾行指向了咱們的代碼,查看代碼和咱們以前的預測的調用鏈同樣。

可是最終咱們怎麼快速肯定代碼的問題在哪裏呢?

方案1: 開啓調試模式

使用 Hooks.onOperatorDebug(); 在程序初始的地方開啓調試模式

錯誤日誌以下:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
	reactor.core.publisher.Flux.single(Flux.java:7851)
	top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:83)
Error has been observed at the following site(s):
	|_ Flux.single ⇢ at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:83)
	|_    Mono.map ⇢ at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:84)
Stack trace:
		at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
		at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
		at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
		at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
		at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4219)
		at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4190)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4126)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4073)
		at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:85)

咱們從 Error has been observed at the following site(s) 這行錯誤起,能夠看到錯誤沿着操做鏈傳播的軌跡(從錯誤點到訂閱點),咱們從 Assembly trace from
producer 這行開始的錯誤中也看到了源代碼 83 行開始報錯,也肯定了上一行的 flatMap 操做符發出了不止一個元素致使。

方案2: 使用 checkpoint 操做符埋點調試

使用方案1開啓全局調試有較高的成本即影響性能,咱們能夠在可能發生錯誤的代碼中加入操做符 checkpoint 來檢測本段響應式流的問題,而不影響其餘數據流的執行。
checkpoint 一般用在明確的操做符的錯誤檢查,相似於埋點檢查的概念。同時該操做符支持 3個重載方法:checkpoint(); checkpoint(String description); checkpoint
(String description, boolean forceStackTrace);
description 爲埋點描述,forceStackTrace 爲是否打印堆棧

方案3: 啓用調試代理

1. 在項目中引入 reactor-tools 依賴

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
</dependency>

2. 使用 ReactorDebugAgent.init(); 初始化代理

因爲該代理是在加載類時對其進行檢測,所以放置它的最佳位置是在main(String [])方法中的全部其餘項以前

3. 若是是測試類,使用以下代碼處理現有的類

注意,在測試類中須要提早運行,好比在 @Before 中

ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();

總結

本篇咱們瞭解瞭如何引入 Reactor ;初步體驗了 Reactor 的 Hello World 代碼;最後咱們瞭解瞭如何測試及調試 Reactor,這些內容爲咱們後面學習 Reactor 的基礎,但願你們都能掌握。

今天的內容就學到這裏,咱們下篇開始 Reactor 的基礎和特性學習。

源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note{:target="_blank"} 下 02-reactor-core-learning 模塊。

參考

  1. Reactor 3 Reference Guide
  2. Reactor 3 中文指南
相關文章
相關標籤/搜索