Reactor3 中文文檔(用戶手冊)


Reactor3 中文文檔 pdf文件 百度網盤下載


網盤地址 和 提取碼:請參見 瘋狂創客圈 的 百度網盤小視頻和小工具react

Reactor 3 參考文檔

Stephane Maldini @smaldini Simon Baslé @simonbasle3.2.0.BUILD-SNAPSHOTgit

(譯者加)本文檔的一些典型的名詞以下:Publisher(發佈者)、Subscriber(訂閱者)、Subscription(訂閱 n.)、subscribe(訂閱 v.)。event/signal(事件/信號,原文常甚至在一個句子將兩個詞來回用,但表示的意思是基本相同的, 所以若是你看到本文翻譯有時候用事件,有時候用信號,在本文檔內基本能夠認爲一個意思)。sequence/stream(序列/流,兩個詞意思類似,本文介紹的是響應式流的內容,可是出現比較多的是 sequence這個詞,主要翻譯爲「序列」,有些地方爲了更加契合且方便理解翻譯爲「流序列」)。element/item(主要指序列中的元素,文中兩個詞基本翻譯爲「元素」)。emit/produce/generate(發出/產生/生成,文中這三個英文詞也有類似之處,對於 emit 多翻譯爲 「發出」,對於後兩個多翻譯爲「生成」)、consume(消費)。Processor(未作翻譯,保留英文)。operator(譯做操做符,聲明式的可組裝的響應式方法,其組裝成的鏈譯做「操做鏈」)。

1. 關於本文檔

本節是對 Reactor參考文檔(譯者加:原文估計是多我的寫的,時而「document」時而「guide」,不影響理解的狀況下, 翻譯就一概用「文檔」了) 的簡要概述。你並不須要從頭至尾閱讀該文檔。每一節的內容都是獨立的,不過會有其餘章節的連接。spring

1.1. 最新版本 & 版權說明

本Reactor參考文檔也提供HTML形式。最新版本見 http://projectreactor.io/docs/core/release/reference/docs/index.html。數據庫

本文檔的副本你能夠自用,亦可分發給他人。不過不管是打印版仍是電子版,請免費提供。

1.2. 貢獻本文檔

本參考文檔用 Asciidoc 編寫, 其源碼見 https://github.com/reactor/reactor-core/tree/master/src/docs/asciidoc (譯者加:本翻譯源碼見 https://github.com/get-set/reactor-core/tree/master-zh/src/docs/asciidoc )。

若有任何補充,歡迎你提交 pull request。

咱們建議你將源碼 checkout 到本地,這樣能夠使用 gradle 的 asciidoctor 任務檢查文檔渲染效果。 有些章節會包含其餘文件,Github 並不必定可以渲染出來。

爲了方便讀者的反饋,多數章節在結尾都提供一個連接,這個連接能夠打開一個 Github 上的 編輯界面,從而能夠編輯相應章節的源碼。這些連接在 HTML5 的版本中可以看到,就像這樣: 翻譯建議 - 關於本文檔

1.3. 獲取幫助

Reactor項目有多種方式但願能幫助到你:

  • 與社區溝通: Gitter
  • 在 stackoverflow.com 的 project-reactor 進行提問。
  • 在 Github issues 提交 bug 。下邊這幾個庫咱們會一直關注: reactor-core (涉及 Reactor 的核心功能) 以及 reactor-addons (涉及 reactor-test 和 adapters issues)。
全部 Reactor 項目都是開源的, 包括本文檔。 若是你發現本文檔有問題,或但願補充一些內容,請參考 這裏 進行了解。

1.4. 如何開始閱讀本文檔

翻譯建議 - "關於本文檔"

2. 快速上手

這一節的內容可以幫助你上手使用 Reactor。包括以下內容:

2.1. 介紹 Reactor

Reactor 是一個用於JVM的徹底非阻塞的響應式編程框架,具有高效的需求管理(即對 「背壓(backpressure)」的控制)能力。它與 Java 8 函數式 API 直接集成,好比 CompletableFutureStream, 以及 Duration。它提供了異步序列 API Flux(用於[N]個元素)和 Mono(用於 [0|1]個元素),並徹底遵循和實現了「響應式擴展規範」(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 組件還支持非阻塞的進程間通訊(inter-process communication, IPC)。 Reactor IPC 爲 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背壓的網絡引擎,從而適合 應用於微服務架構。而且完整支持響應式編解碼(reactive encoding and decoding)。

2.2. 前提

Reactor Core 運行於 Java 8 及以上版本。

依賴 org.reactive-streams:reactive-streams:1.0.2

Andriod 支持方面:Reactor 3 並不正式支持 Andorid(若是須要能夠考慮使用 RxJava 2)。可是,在 Android SDK 26(Android 0)及以上版本應該沒問題。咱們但願可以最大程度兼顧對 Android 的支持,可是咱們並不能做出保證,具體狀況具體分析。

2.3. 瞭解 BOM

自從 reactor-core 3.0.4,隨着 Aluminium 版本發佈上車(release train)以來,Reactor 3 使用了 BOM(Bill of Materials,一種標準的 Maven artifact)。

使用 BOM 能夠管理一組良好集成的 maven artifacts,從而無需操心不一樣版本組件的互相依賴問題。

BOM 是一系列有版本信息的 artifacts,經過「列車發佈」(release train)的發佈方式管理, 每趟發佈列車由一個「代號+修飾詞」組成,好比:

Aluminium-RELEASE
Carbon-BUILD-SNAPSHOT
Aluminium-SR1
Bismuth-RELEASE
Carbon-SR32

代號替代了傳統的「主版本.次版本」的數字形式。這些代號主要來自 Periodic Table of Elements, 按首字母順序依次選取。

修飾詞有(按照時間順序):

  • BUILD-SNAPSHOT
  • M1..N: 里程碑號
  • RELEASE: 第一次 GA (General Availability) 發佈
  • SR1..N: 後續的 GA 發佈(相似於 PATCH 號或 SR(Service Release))。

2.4. 獲取 Reactor

前邊提到,使用 Reactor 的最簡單方式是在你的項目中配置 BOM 以及相關依賴。 注意,當你這樣添加依賴的時候,要省略版本( )配置,從而自動使用 BOM 中指定的版本。

固然,若是你但願使用某個版本的 artifact,仍然能夠指定。甚至徹底不使用 BOM,逐個配置 artifact 的版本也是能夠的。

2.4.1. Maven 配置

Maven 原生支持 BOM。首先,你須要在 pom.xml 內經過添加下邊的代碼引入 BOM。若是 (dependencyManagement) 已經存在,只須要添加其內容便可。

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Bismuth-RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
注意 dependencyManagement 標籤用來補充一般使用的 dependencies 配置。

而後,在 dependencies 中添加相關的 reactor 項目,省略 <version>,以下:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
        
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>
依賴 Core 庫
沒有 version 標籤
reactor-test 提供了對 reactive streams 的單測

2.4.2. Gradle 配置

Gradle 沒有對 Maven BOM 的支持,可是你能夠使用 Spring 的 gradle-dependency-management 插件。

首先,apply 插件。

plugins {
    id "io.spring.dependency-management" version "1.0.1.RELEASE" 
}
編寫本文檔時,插件最新版本爲 1.0.1.RELEASE,請自行使用合適的版本。

而後用它引入 BOM:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE"
     }
}

Finally add a dependency to your project, without a version number:

dependencies {
     compile 'io.projectreactor:reactor-core' 
}
無需第三個 : 添加版本號。

2.4.3. Milestones 和 Snapshots

里程碑版(Milestones)和開發預覽版(developer previews)經過 Spring Milestones repository 而不是 Maven Central 來發布。 須要添加到構建配置文件中,如:

Milestones in Maven

<repositories>
        <repository>
                <id>spring-milestones</id>
                <name>Spring Milestones Repository</name>
                <url>https://repo.spring.io/milestone</url>
        </repository>
</repositories>

gradle 使用下邊的配置:

Milestones in Gradle

repositories {
  maven { url 'http://repo.spring.io/milestone' }
  mavenCentral()
}

相似的,snapshot 版也須要配置專門的庫:

BUILD-SNAPSHOTs in Maven

<repositories>
        <repository>
                <id>spring-snapshots</id>
                <name>Spring Snapshot Repository</name>
                <url>https://repo.spring.io/snapshot</url>
        </repository>
</repositories>

BUILD-SNAPSHOTs in Gradle

repositories {
  maven { url 'http://repo.spring.io/snapshot' }
  mavenCentral()
}

翻譯建議 - "快速上手"

3. 響應式編程

Reactor 是響應式編程範式的實現,總結起來有以下幾點:

響應式編程是一種關注於數據流(data streams)和變化傳遞(propagation of change)的異步編程方式。 這意味着它能夠用既有的編程語言表達靜態(如數組)或動態(如事件源)的數據流。

在響應式編程方面,微軟跨出了第一步,它在 .NET 生態中建立了響應式擴展庫(Reactive Extensions library, Rx)。接着 RxJava 在JVM上實現了響應式編程。後來,在 JVM 平臺出現了一套標準的響應式 編程規範,它定義了一系列標準接口和交互規範。並整合到 Java 9 中(使用 Flow 類)。

響應式編程一般做爲面向對象編程中的「觀察者模式」(Observer design pattern)的一種擴展。 響應式流(reactive streams)與「迭代子模式」(Iterator design pattern)也有相通之處, 由於其中也有 Iterable-Iterator 這樣的對應關係。主要的區別在於,Iterator 是基於 「拉取」(pull)方式的,而響應式流是基於「推送」(push)方式的。

使用 iterator 是一種「命令式」(imperative)編程範式,即便訪問元素的方法是 Iterable 的惟一職責。關鍵在於,何時執行 next() 獲取元素取決於開發者。在響應式流中,相對應的 角色是 Publisher-Subscriber,可是 當有新的值到來的時候 ,卻反過來由發佈者(Publisher) 通知訂閱者(Subscriber),這種「推送」模式是響應式的關鍵。此外,對推送來的數據的操做 是經過一種聲明式(declaratively)而不是命令式(imperatively)的方式表達的:開發者經過 描述「控制流程」來定義對數據流的處理邏輯。

除了數據推送,對錯誤處理(error handling)和完成(completion)信號的定義也很完善。 一個 Publisher 能夠推送新的值到它的 Subscriber(調用 onNext 方法), 一樣也能夠推送錯誤(調用 onError 方法)和完成(調用 onComplete 方法)信號。 錯誤和完成信號均可以終止響應式流。能夠用下邊的表達式描述:

onNext x 0..N [onError | onComplete]

這種方式很是靈活,不管是有/沒有值,仍是 n 個值(包括有無限個值的流,好比時鐘的持續讀秒),均可處理。

那麼咱們爲何須要這樣的異步響應式開發庫呢?

3.1. 阻塞是對資源的浪費

現代應用須要應對大量的併發用戶,並且即便現代硬件的處理能力飛速發展,軟件性能仍然是關鍵因素。

廣義來講咱們有兩種思路來提高程序性能:

  1. 並行化(parallelize) :使用更多的線程和硬件資源。
  2. 基於現有的資源來 提升執行效率

一般,Java開發者使用阻塞式(blocking)編寫代碼。這沒有問題,在出現性能瓶頸後, 咱們能夠增長處理線程,線程中一樣是阻塞的代碼。可是這種使用資源的方式會迅速面臨 資源競爭和併發問題。

更糟糕的是,阻塞會浪費資源。具體來講,好比當一個程序面臨延遲(一般是I/O方面, 好比數據庫讀寫請求或網絡調用),所在線程須要進入 idle 狀態等待數據,從而浪費資源。

因此,並行化方式並不是銀彈。這是挖掘硬件潛力的方式,可是卻帶來了複雜性,並且容易形成浪費。

3.2. 異步能夠解決問題嗎?

第二種思路——提升執行效率——能夠解決資源浪費問題。經過編寫 異步非阻塞 的代碼, (任務發起異步調用後)執行過程會切換到另外一個 使用一樣底層資源 的活躍任務,而後等 異步調用返回結果再去處理。

可是在 JVM 上如何編寫異步代碼呢?Java 提供了兩種異步編程方式:

  • 回調(Callbacks) :異步方法沒有返回值,而是採用一個 callback 做爲參數(lambda 或匿名類),當結果出來後回調這個 callback。常見的例子好比 Swings 的 EventListener
  • Futures :異步方法 當即 返回一個 Future<T>,該異步方法要返回結果的是 T 類型,經過 Future封裝。這個結果並非 馬上 能夠拿到,而是等實際處理結束纔可用。好比, ExecutorService 執行 Callable<T> 任務時會返回 Future 對象。

這些技術夠用嗎?並不是對於每一個用例都是如此,兩種方式都有侷限性。

回調很難組合起來,由於很快就會致使代碼難以理解和維護(即所謂的「回調地獄(callback hell)」)。

考慮這樣一種情景:在用戶界面上顯示用戶的5個收藏,或者若是沒有任何收藏提供5個建議。這須要3個 服務(一個提供收藏的ID列表,第二個服務獲取收藏內容,第三個提供建議內容):

回調地獄(Callback Hell)的例子

userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
基於回調的服務使用一個匿名 Callback 做爲參數。後者的兩個方法分別在異步執行成功 或異常時被調用。
獲取到收藏ID的list後調用第一個服務的回調方法 onSuccess
若是 list 爲空, 調用 suggestionService
服務 suggestionService 傳遞 List<Favorite> 給第二個回調。
既然是處理 UI,咱們須要確保消費代碼運行在 UI 線程。
使用 Java 8 Stream 來限制建議數量爲5,而後在 UI 中顯示。
在每一層,咱們都以一樣的方式處理錯誤:在一個 popup 中顯示錯誤信息。
回到收藏 ID 這一層,若是返回 list,咱們須要使用 favoriteService 來獲取 Favorite 對象。因爲只想要5個,所以使用 stream 。
再一次回調。此次對每一個ID,獲取 Favorite 對象在 UI 線程中推送到前端顯示。

這裏有很多代碼,稍微有些難以閱讀,而且還有重複代碼,咱們再來看一下用 Reactor 實現一樣功能:

使用 Reactor 實現以上回調方式一樣功能的例子

userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);
咱們獲取到收藏ID的流
咱們 異步地轉換 它們(ID) 爲 Favorite 對象(使用 flatMap),如今咱們有了 Favorite流。
一旦 Favorite 爲空,切換到 suggestionService
咱們只關注流中的最多5個元素。
最後,咱們但願在 UI 線程中進行處理。
經過描述對數據的最終處理(在 UI 中顯示)和對錯誤的處理(顯示在 popup 中)來觸發(subscribe)。

若是你想確保「收藏的ID」的數據在800ms內得到(若是超時,從緩存中獲取)呢?在基於回調的代碼中, 會比較複雜。但 Reactor 中就很簡單,在處理鏈中增長一個 timeout 的操做符便可。

Reactor 中增長超時控制的例子

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) 
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
若是流在超時時限沒有發出(emit)任何值,則發出錯誤(error)。
一旦收到錯誤,交由 cacheService 處理。
處理鏈後邊的內容與上例相似。

Futures 比回調要好一點,但即便在 Java 8 引入了 CompletableFuture,它對於多個處理的組合仍不夠好用。 編排多個 Futures 是可行的,但卻不易。此外,Future 還有一個問題:當對 Future 對象最終調用 get() 方法時,仍然會致使阻塞,而且缺少對多個值以及更進一步對錯誤的處理。

考慮另一個例子,咱們首先獲得 ID 的列表,而後經過它進一步獲取到「對應的 name 和 statistics」 爲元素的列表,整個過程用異步方式來實現。

CompletableFuture 處理組合的例子

CompletableFuture<List<String>> ids = ifhIds(); 

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
        Stream<CompletableFuture<String>> zip =
                        l.stream().map(i -> { 
                                                 CompletableFuture<String> nameTask = ifhName(i); 
                                                 CompletableFuture<Integer> statTask = ifhStat(i); 

                                                 return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
                                         });
        List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
        CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

        CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
        return allDone.thenApply(v -> combinationList.stream()
                                                                                                 .map(CompletableFuture::join) 
                                                                                                 .collect(Collectors.toList()));
});

List<String> results = result.join(); 
assertThat(results).contains(
                                "Name NameJoe has stats 103",
                                "Name NameBart has stats 104",
                                "Name NameHenry has stats 105",
                                "Name NameNicole has stats 106",
                                "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
以一個 Future 開始,其中封裝了後續將獲取和處理的 ID 的 list。
獲取到 list 後邊進一步對其啓動異步處理任務。
對於 list 中的每個元素:
異步地獲得相應的 name。
異步地獲得相應的 statistics。
將兩個結果一一組合。
咱們如今有了一個 list,元素是 Future(表示組合的任務,類型是 CompletableFuture),爲了執行這些任務, 咱們須要將這個 list(元素構成的流) 轉換爲數組(List)。
將這個數組傳遞給 CompletableFuture.allOf,返回一個 Future ,當因此任務都完成了,那麼這個 Future 也就完成了。
有點麻煩的地方在於 allOf 返回的是 CompletableFuture<Void>,因此咱們遍歷這個 Future 的List, ,而後使用 join() 來手機它們的結果(不會致使阻塞,由於 AllOf 確保這些 Future 所有完成)
一旦整個異步流水線被觸發,咱們等它完成處理,而後返回結果列表。

因爲 Reactor 內置許多組合操做,所以以上例子能夠簡單地實現:

Reactor 實現與 Future 一樣功能的代碼

Flux<String> ids = ifhrIds(); 

Flux<String> combinations =
                ids.flatMap(id -> { 
                        Mono<String> nameTask = ifhrName(id); 
                        Mono<Integer> statTask = ifhrStat(id); 

                        return nameTask.zipWith(statTask, 
                                        (name, stat) -> "Name " + name + " has stats " + stat);
                });

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); 
assertThat(results).containsExactly( 
                "Name NameJoe has stats 103",
                "Name NameBart has stats 104",
                "Name NameHenry has stats 105",
                "Name NameNicole has stats 106",
                "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
這一次,咱們從一個異步方式提供的 ids 序列(Flux<String>)開始。
對於序列中的每個元素,咱們異步地處理它(flatMap 方法內)兩次。
獲取相應的 name。
獲取相應的 statistic.
異步地組合兩個值。
隨着序列中的元素值「到位」,它們收集一個 List 中。
在生成流的環節,咱們能夠繼續異步地操做 Flux 流,對其進行組合和訂閱(subscribe)。 最終咱們極可能獲得一個 Mono 。因爲是測試,咱們阻塞住(block()),等待流處理過程結束, 而後直接返回集合。
Assert 結果。

回調或 Future 遇到的窘境是相似的,這也是響應式編程要經過 Publisher-Suscriber 方式來解決的。

3.3. 從命令式編程到響應式編程

相似 Reactor 這樣的響應式庫的目標就是要彌補上述「經典」的 JVM 異步方式所帶來的不足, 此外還會關注一下幾個方面:

  • 可編排性(Composability) 以及 可讀性(Readability)
  • 使用豐富的 操做符 來處理形如 的數據
  • 訂閱(subscribe) 以前什麼都不會發生
  • 背壓(backpressure) 具體來講即 消費者可以反向告知生產者生產內容的速度的能力
  • 高層次 (同時也是有高價值的)的抽象,從而達到 併發無關 的效果

3.3.1. 可編排性與可讀性

可編排性,指的是編排多個異步任務的能力。好比咱們將前一個任務的結果傳遞給後一個任務做爲輸入, 或者將多個任務以分解再彙總(fork-join)的形式執行,或者將異步的任務做爲離散的組件在系統中 進行重用。

這種編排任務的能力與代碼的可讀性和可維護性是緊密相關的。隨着異步處理任務數量和複雜度 的提升,編寫和閱讀代碼都變得愈來愈困難。就像咱們剛纔看到的,回調模式是簡單的,可是缺點 是在複雜的處理邏輯中,回調中會層層嵌入回調,致使 回調地獄(Callback Hell) 。你能猜到 (或有過這種痛苦經歷),這樣的代碼是難以閱讀和分析的。

Reactor 提供了豐富的編排操做,從而代碼直觀反映了處理流程,而且全部的操做保持在同一層次 (儘可能避免了嵌套)。

3.3.2. 就像裝配流水線

你能夠想象數據在響應式應用中的處理,就像流過一條裝配流水線。Reactor 既是傳送帶, 又是一個個的裝配工或機器人。原材料從源頭(最初的 Publisher)流出,最終被加工爲成品, 等待被推送到消費者(或者說 Subscriber)。

原材料會通過不一樣的中間處理過程,或者做爲半成品與其餘半成品進行組裝。若是某處有齒輪卡住, 或者某件產品的包裝過程花費了過久時間,相應的工位就能夠向上遊發出信號來限制或中止發出原材料。

3.3.3. 操做符(Operators)

在 Reactor 中,操做符(operator)就像裝配線中的工位(操做員或裝配機器人)。每個操做符 對 Publisher 進行相應的處理,而後將 Publisher 包裝爲一個新的 Publisher。就像一個鏈條, 數據源自第一個 Publisher,而後順鏈條而下,在每一個環節進行相應的處理。最終,一個訂閱者 (Subscriber)終結這個過程。請記住,在訂閱者(Subscriber)訂閱(subscribe)到一個 發佈者(Publisher)以前,什麼都不會發生。

理解了操做符會建立新的 Publisher 實例這一點,可以幫助你避免一個常見的問題, 這種問題會讓你以爲處理鏈上的某個操做符沒有起做用。相關內容請參考 item

雖然響應式流規範(Reactive Streams specification)沒有規定任何操做符, 相似 Reactor 這樣的響應式庫所帶來的最大附加價值之一就是提供豐富的操做符。包括基礎的轉換操做, 到過濾操做,甚至複雜的編排和錯誤處理操做。

3.3.4. subscribe() 以前什麼都不會發生

在 Reactor 中,當你建立了一條 Publisher 處理鏈,數據還不會開始生成。事實上,你是建立了 一種抽象的對於異步處理流程的描述(從而方便重用和組裝)。

當真正「訂閱(subscrib)」的時候,你須要將 Publisher 關聯到一個 Subscriber 上,而後 纔會觸發整個鏈的流動。這時候,Subscriber 會向上遊發送一個 request 信號,一直到達源頭 的 Publisher

3.3.5. 背壓()

向上遊傳遞信號這一點也被用於實現 背壓 ,就像在裝配線上,某個工位的處理速度若是慢於流水線 速度,會對上游發送反饋信號同樣。

在響應式流規範中實際定義的機制同剛纔的類比很是接近:訂閱者能夠無限接受數據並讓它的源頭 「滿負荷」推送全部的數據,也能夠經過使用 request 機制來告知源頭它一次最多可以處理 n 個元素。

中間環節的操做也能夠影響 request。想象一個可以將每10個元素分批打包的緩存(buffer)操做。 若是訂閱者請求一個元素,那麼對於源頭來講能夠生成10個元素。此外預取策略也能夠使用了, 好比在訂閱前預先生成元素。

這樣可以將「推送」模式轉換爲「推送+拉取」混合的模式,若是下游準備好了,能夠從上游拉取 n 個元素;可是若是上游元素尚未準備好,下游仍是要等待上游的推送。

3.3.6. 熱(Hot) vs 冷(Cold)

在 Rx 家族的響應式庫中,響應式流分爲「熱」和「冷」兩種類型,區別主要在於響應式流如何 對訂閱者進行響應:

  • 一個「冷」的序列,指對於每個 Subscriber,都會收到從頭開始全部的數據。若是源頭 生成了一個 HTTP 請求,對於每個訂閱都會建立一個新的 HTTP 請求。
  • 一個「熱」的序列,指對於一個 Subscriber,只能獲取從它開始 訂閱 以後 發出的數據。不過注意,有些「熱」的響應式流能夠緩存部分或所有歷史數據。 一般意義上來講,一個「熱」的響應式流,甚至在即便沒有訂閱者接收數據的狀況下,也能夠 發出數據(這一點同 「Subscribe() 以前什麼都不會發生」的規則有衝突)。

更多關於 Reactor 中「熱」vs「冷」的內容,請參考 this reactor-specific section

翻譯建議 - "響應式編程"

4. Reactor 核心特性

Reactor 項目的主要 artifact 是 reactor-core,這是一個基於 Java 8 的實現了響應式流規範 (Reactive Streams specification)的響應式庫。

Reactor 引入了實現 Publisher 的響應式類 FluxMono,以及豐富的操做方式。 一個 Flux 對象表明一個包含 0..N 個元素的響應式序列,而一個 Mono 對象表明一個包含 零/一個(0..1)元素的結果。

這種區別爲這倆類型帶來了語義上的信息——代表了異步處理邏輯所面對的元素基數。好比, 一個 HTTP 請求產生一個響應,因此對其進行 count 操做是沒有多大意義的。表示這樣一個 結果的話,應該用 Mono<HttpResponse> 而不是 Flux<HttpResponse>,由於要置於其上的 操做一般只用於處理 0/1 個元素。

有些操做能夠改變基數,從而須要切換類型。好比,count 操做用於 Flux,可是操做 返回的結果是 Mono<Long>

4.1. Flux, 包含 0-N 個元素的異步序列

Flux

Flux<T> 是一個可以發出 0 到 N 個元素的標準的 Publisher<T>,它會被一個「錯誤(error)」 或「完成(completion)」信號終止。所以,一個 flux 的可能結果是一個 value、completion 或 error。 就像在響應式流規範中規定的那樣,這三種類型的信號被翻譯爲面向下游的 onNextonCompleteonError方法。

因爲多種不一樣的信號可能性,Flux 能夠做爲一種通用的響應式類型。注意,全部的信號事件, 包括表明終止的信號事件都是可選的:若是沒有 onNext 事件可是有一個 onComplete 事件, 那麼發出的就是 空的 有限序列,可是去掉 onComplete 那麼獲得的就是一個 無限的 空序列。 固然,無限序列也能夠不是空序列,好比,Flux.interval(Duration) 生成的是一個 Flux<Long>, 這就是一個無限地週期性發出規律 tick 的時鐘序列。

4.2. Mono, 異步的 0-1 結果

Mono

Mono<T> 是一種特殊的 Publisher<T>, 它最多發出一個元素,而後終止於一個 onComplete 信號或一個 onError 信號。

它只適用其中一部分可用於 Flux 的操做。好比,(兩個 Mono 的)結合類操做能夠忽略其中之一 而發出另外一個 Mono,也能夠將兩個都發出,對於後一種狀況會切換爲一個 Flux

例如,Mono#concatWith(Publisher) 返回一個 Flux,而 Mono#then(Mono) 返回另外一個 Mono

注意,Mono 能夠用於表示「空」的只有完成概念的異步處理(好比 Runnable)。這種用 Mono<Void> 來建立。

4.3. 簡單的建立和訂閱 Flux 或 Mono 的方法

最簡單的上手 FluxMono 的方式就是使用相應類提供的多種工廠方法之一。

好比,若是要建立一個 String 的序列,你能夠直接列舉它們,或者將它們放到一個集合裏而後用來建立 Flux,以下:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

工廠方法的其餘例子以下:

Mono<String> noData = Mono.empty(); 

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
注意,即便沒有值,工廠方法仍然採用通用的返回類型。
第一個參數是 range 的開始,第二個參數是要生成的元素個數。

在訂閱(subscribe)的時候,FluxMono 使用 Java 8 lambda 表達式。 .subscribe() 方法有多種不一樣的方法簽名,你能夠傳入各類不一樣的 lambda 形式的參數來定義回調。以下所示:

基於 lambda 的對 Flux 的訂閱(subscribe)

subscribe(); 

subscribe(Consumer<? super T> consumer); 

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);
訂閱並觸發序列。
對每個生成的元素進行消費。
對正常元素進行消費,也對錯誤進行響應。
對正常元素和錯誤均有響應,還定義了序列正常完成後的回調。
對正常元素、錯誤和完成信號均有響應, 同時也定義了對該 subscribe 方法返回的 Subscription 執行的回調。
以上方法會返回一個 Subscription 的引用,若是再也不須要更多元素你能夠經過它來取消訂閱。 取消訂閱時, 源頭會中止生成新的數據,並清理相關資源。取消和清理的操做在 Reactor 中是在 接口 Disposable 中定義的。

4.3.1. subscribe 方法示例

這一小節包含了對 subscribe 的5個不一樣簽名的方法的示例,以下是一個無參的基本方法的使用:

Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe();
配置一個在訂閱時會產生3個值的 Flux
最簡單的訂閱方式。

第二行代碼沒有任何輸出,可是它確實執行了。Flux 產生了3個值。若是咱們傳入一個 lambda, 咱們就能夠看到這幾個值,以下一個列子:

Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe(i -> System.out.println(i));
配置一個在訂閱時會產生3個值的 Flux
訂閱它並打印值。

第二行代碼會輸入以下內容:

1
2
3

爲了演示下一個方法簽名,咱們故意引入一個錯誤,以下所示:

Flux<Integer> ints = Flux.range(1, 4) 
      .map(i -> { 
        if (i <= 3) return i; 
        throw new RuntimeException("Got to 4"); 
      });
ints.subscribe(i -> System.out.println(i), 
      error -> System.err.println("Error: " + error));
配置一個在訂閱時會產生4個值的 Flux
爲了對元素進行處理,咱們須要一個 map 操做。
對於多數元素,返回值自己。
對其中一個元素拋出錯誤。
訂閱的時候定義如何進行錯誤處理。

如今咱們有兩個 lambda 表達式:一個是用來處理正常數據,一個用來處理錯誤。 剛纔的代碼輸出以下:

1
2
3
Error: java.lang.RuntimeException: Got to 4

下一個 subscribe 方法的簽名既有錯誤處理,還有一個完成後的處理,以下:

Flux<Integer> ints = Flux.range(1, 4); 
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");});
配置一個在訂閱時會產生4個值的 Flux
訂閱時定義錯誤和完成信號的處理。

錯誤和完成信號都是終止信號,而且兩者只會出現其中之一。爲了可以最終所有正常完成,你必須處理錯誤信號。

用於處理完成信號的 lambda 是一對空的括號,由於它實際上匹配的是 Runnalbe 接口中的 run 方法, 不接受參數。剛纔的代碼輸出以下:

1
2
3
4
Done

最後一個 subscribe 方法簽名包含一個自定義的 subscriber(下一節會介紹到):

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");},
    s -> ss.request(10));
ints.subscribe(ss);

上面這個例子中,咱們把一個自定義的 Subscriber 做爲 subscribe 方法的最後一個參數。 下邊的例子是這個自定義的 Subscriber,這是一個對 Subscriber 的最簡單實現:

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

        public void hookOnSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                request(1);
        }

        public void hookOnNext(T value) {
                System.out.println(value);
                request(1);
        }
}

SampleSubscriber 類繼承自 BaseSubscriber,在 Reactor 中, 推薦用戶擴展它來實現自定義的 Subscriber。這個類提供了一些 hook 方法,咱們能夠經過重寫它們來調整 subscriber 的行爲。 默認狀況下,它會觸發一個無限個數的請求,可是當你想自定義請求元素的個數的時候,擴展 BaseSubscriber 就很方便了。

擴展的時候一般至少要覆蓋 hookOnSubscribe(Subscription subscription)hookOnNext(T value) 這兩個方法。這個例子中, hookOnSubscribe 方法打印一段話到標準輸出,而後進行第一次請求。 而後 hookOnNext 一樣進行了打印,同時逐個處理剩餘請求。

SampleSubscriber 輸出以下:

Subscribed
1
2
3
4
建議你同時重寫 hookOnErrorhookOnCancel,以及 hookOnComplete 方法。 你最好也重寫 hookFinally 方法。SampleSubscribe 確實是一個最簡單的實現了 請求有限個數元素的 Subscriber

本文檔後邊還會再討論 BaseSubscriber

響應式流規範定義了另外一個 subscribe 方法的簽名,它只接收一個自定義的 Subscriber, 沒有其餘的參數,以下所示:

subscribe(Subscriber<? super T> subscriber);

若是你已經有一個 Subscriber,那麼這個方法簽名仍是挺有用的。何況,你可能還會用到它 來作一些訂閱相關(subscription-related)的回調。好比,你想要自定義「背壓(backpressure)」 而且本身來觸發請求。

在這種狀況下,使用 BaseSubscriber 抽象類就很方便,由於它提供了很好的配置「背壓」 的方法。

使用 BaseSubscriber 來配置「背壓」

Flux<String> source = someStringSource();

source.map(String::toUpperCase)
      .subscribe(new BaseSubscriber<String>() { 
          @Override
          protected void hookOnSubscribe(Subscription subscription) {
              
              request(1); 
          }

          @Override
          protected void hookOnNext(String value) {
              request(1); 
          }

          
      });
BaseSubscriber 是一個抽象類,因此咱們建立一個匿名內部類。
BaseSubscriber 定義了多種用於處理不一樣信號的 hook。它還定義了一些捕獲 Subscription 對象的現成方法,這些方法能夠用在 hook 中。
request(n) 就是這樣一個方法。它可以在任何 hook 中,經過 subscription 向上遊傳遞 背壓請求。這裏咱們在開始這個流的時候請求1個元素值。
隨着接收到新的值,咱們繼續以每次請求一個元素的節奏從源頭請求值。
其餘 hooks 有 hookOnComplete, hookOnError, hookOnCancel, and hookFinally (它會在流終止的時候被調用,傳入一個 SignalType 做爲參數)。
當你修改請求操做的時候,你必須注意讓 subscriber 向上提出足夠的需求, 不然上游的 Flux 可能會被「卡住」。因此 BaseSubscriber 在進行擴展的時候要覆蓋 hookOnSubscribeonNext,這樣你至少會調用 request 一次。

BaseSubscriber 還提供了 requestUnbounded() 方法來切換到「無限」模式(等同於 request(Long.MAX_VALUE))。

4.4. 可編程式地建立一個序列

在這一小節,咱們介紹如何經過定義相對應的事件(onNextonErroronComplete) 建立一個 FluxMono。全部這些方法都經過 API 來觸發咱們叫作 sink(池) 的事件。 sink 的類型很少,咱們快速過一下。

4.4.1. Generate

最簡單的建立 Flux 的方式就是使用 generate 方法。

這是一種 同步地逐個地 產生值的方法,意味着 sink 是一個 SynchronousSink 並且其 next() 方法在每次回調的時候最多隻能被調用一次。你也能夠調用 error(Throwable) 或者 complete(),不過是可選的。

最有用的一種方式就是同時可以記錄一個狀態值(state),從而在使用 sink 發出下一個元素的時候可以 基於這個狀態值去產生元素。此時生成器(generator)方法就是一個 BiFunction<S, SynchronousSink<T>, S>, 其中 <S> 是狀態對象的類型。你須要提供一個 Supplier<S> 來初始化狀態值,而生成器須要 在每一「回合」生成元素後返回新的狀態值(供下一回合使用)。

例如咱們使用一個 int 做爲狀態值。

基於狀態值的 generate 示例

Flux<String> flux = Flux.generate(
    () -> 0, 
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); 
      if (state == 10) sink.complete(); 
      return state + 1; 
    });
初始化狀態值(state)爲0。
咱們基於狀態值 state 來生成下一個值(state 乘以 3)。
咱們也能夠用狀態值來決定何時終止序列。
返回一個新的狀態值 state,用於下一次調用。

上面的代碼生成了「3 x」的乘法表:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

咱們也能夠使用可變(mutable)類型(譯者注:如上例,原生類型及其包裝類,以及String等屬於不可變類型) 的 <S>。上邊的例子也能夠用 AtomicLong 做爲狀態值,在每次生成後改變它的值。

可變類型的狀態變量

Flux<String> flux = Flux.generate(
    AtomicLong::new, 
    (state, sink) -> {
      long i = state.getAndIncrement(); 
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; 
    });
此次咱們初始化一個可變類型的狀態值。
改變狀態值。
返回 同一個 實例做爲新的狀態值。
若是狀態對象須要清理資源,能夠使用 generate(Supplier<S>, BiFunction, Consumer<S>) 這個簽名方法來清理狀態對象(譯者注:Comsumer 在序列終止才被調用)。

下面是一個在 generate 方法中增長 Consumer 的例子:

Flux<String> flux = Flux.generate(
    AtomicLong::new,
      (state, sink) -> { 
      long i = state.getAndIncrement(); 
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; 
    }, (state) -> System.out.println("state: " + state)); 
}
一樣,初始化一個可變對象做爲狀態變量。
改變狀態。
返回 同一個 實例做爲新的狀態。
咱們會看到最後一個狀態值(11)會被這個 Consumer lambda 輸出。

若是 state 使用了數據庫鏈接或者其餘須要最終進行清理的資源,這個 Consumer lambda 能夠用來在最後關閉鏈接或完成相關的其餘清理任務。

4.4.2. Create

做爲一個更高級的建立 Flux 的方式, create 方法的生成方式既能夠是同步, 也能夠是異步的,而且還能夠每次發出多個元素。

該方法用到了 FluxSink,後者一樣提供 nexterrorcomplete 等方法。 與 generate 不一樣的是,create 不須要狀態值,另外一方面,它能夠在回調中觸發 多個事件(即便是在將來的某個時間)。

create 有個好處就是能夠將現有的 API 轉爲響應式,好比監聽器的異步方法。

假設你有一個監聽器 API,它按 chunk 處理數據,有兩種事件:(1)一個 chunk 數據準備好的事件;(2)處理結束的事件。以下:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

你能夠使用 create 方法將其轉化爲響應式類型 Flux<T>

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});
橋接 MyEventListener
每個 chunk 的數據轉化爲 Flux 中的一個元素。
processComplete 事件轉換爲 onComplete
全部這些都是在 myEventProcessor 執行時異步執行的。

此外,既然 create 能夠是異步地,而且可以控制背壓,你能夠經過提供一個 OverflowStrategy 來定義背壓行爲。

  • IGNORE: 徹底忽略下游背壓請求,這可能會在下游隊列積滿的時候致使 IllegalStateException
  • ERROR: 當下遊跟不上節奏的時候發出一個 IllegalStateException 的錯誤信號。
  • DROP:當下遊沒有準備好接收新的元素的時候拋棄這個元素。
  • LATEST:讓下游只獲得上游最新的元素。
  • BUFFER:(默認的)緩存全部下游沒有來得及處理的元素(這個不限大小的緩存可能致使 OutOfMemoryError)。
Mono 也有一個用於 create 的生成器(generator)—— MonoSink,它不能生成多個元素, 所以會拋棄第一個元素以後的全部元素。
推送(push)模式

create 的一個變體是 push,適合生成事件流。與 create相似,push 也能夠是異步地, 而且可以使用以上各類溢出策略(overflow strategies)管理背壓。每次只有一個生成線程能夠調用 nextcompleteerror

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }

        public void processError(Throwable e) {
            sink.error(e); 
        }
    });
});
橋接 SingleThreadEventListener API。
在監聽器所在線程中,事件經過調用 next 被推送到 sink。
complete 事件也在同一個線程中。
error 事件也在同一個線程中。
推送/拉取(push/pull)混合模式

不像 pushcreate 能夠用於 pushpull 模式,所以適合橋接監聽器的 的 API,由於事件消息會隨時異步地到來。回調方法 onRequest 能夠被註冊到 FluxSink 以便跟蹤請求。這個回調能夠被用於從源頭請求更多數據,或者經過在下游請求到來 的時候傳遞數據給 sink 以實現背壓管理。這是一種推送/拉取混合的模式, 由於下游能夠從上游拉取已經就緒的數據,上游也能夠在數據就緒的時候將其推送到下游。

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); 
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.request(n); 
        for(String s : message) {
           sink.next(s); 
        }
    });
當有請求的時候取出一個 message。
若是有就緒的 message,就發送到 sink。
後續異步到達的 message 也會被髮送給 sink。
清理(Cleaning up)

onDisposeonCancel 這兩個回調用於在被取消和終止後進行清理工做。 onDispose 可用於在 Flux 完成,有錯誤出現或被取消的時候執行清理。 onCancel 只用於針對「取消」信號執行相關操做,會先於 onDispose 執行。

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) 
        .onDispose(() -> channel.close())  
    });
onCancel 在取消時被調用。
onDispose 在有完成、錯誤和取消時被調用。

4.4.3. Handle

handle 方法有些不一樣,它在 MonoFlux 中都有。然而,它是一個實例方法 (instance method),意思就是它要連接在一個現有的源後使用(與其餘操做符同樣)。

它與 generate 比較相似,由於它也使用 SynchronousSink,而且只容許元素逐個發出。 然而,handle 可被用於基於現有數據源中的元素生成任意值,有可能還會跳過一些元素。 這樣,能夠把它當作 mapfilter 的組合。handle 方法簽名以下:

handle(BiConsumer<T, SynchronousSink<R>>)

舉個例子,響應式流規範容許 null 這樣的值出如今序列中。假如你想執行一個相似 map 的操做,你想利用一個現有的具備映射功能的方法,可是它會返回 null,這時候怎麼辦呢?

例如,下邊的方法能夠用於 Integer 序列,映射爲字母或 null 。

public String alphabet(int letterNumber) {
        if (letterNumber < 1 || letterNumber > 26) {
                return null;
        }
        int letterIndexAscii = 'A' + letterNumber - 1;
        return "" + (char) letterIndexAscii;
}

咱們能夠使用 handle 來去掉其中的 null。

handle 用於一個 "映射 + 過濾 null" 的場景

Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); 
        if (letter != null) 
            sink.next(letter); 
    });

alphabet.subscribe(System.out::println);
映射到字母。
若是返回的是 null …
就不會調用 sink.next 從而過濾掉。

輸出以下:

M
I
T

4.5. 調度器(Schedulers)

Reactor, 就像 RxJava,也能夠被認爲是 併發無關(concurrency agnostic) 的。意思就是, 它並不強制要求任何併發模型。更進一步,它將選擇權交給開發者。不過,它仍是提供了一些方便 進行併發執行的庫。

在 Reactor 中,執行模式以及執行過程取決於所使用的 SchedulerScheduler 是一個擁有普遍實現類的抽象接口。 Schedulers 類提供的靜態方法用於達成以下的執行環境:

  • 當前線程(Schedulers.immediate()
  • 可重用的單線程(Schedulers.single())。注意,這個方法對全部調用者都提供同一個線程來使用, 直到該調度器(Scheduler)被廢棄。若是你想使用專注的線程,就對每個調用使用 Schedulers.newSingle()
  • 彈性線程池(Schedulers.elastic()。它根據須要建立一個線程池,重用空閒線程。線程池若是空閒時間過長 (默認爲 60s)就會被廢棄。對於 I/O 阻塞的場景比較適用。 Schedulers.elastic() 可以方便地給一個阻塞 的任務分配它本身的線程,從而不會妨礙其餘任務和資源,見 如何包裝一個同步阻塞的調用?
  • 固定大小線程池(Schedulers.parallel())。所建立線程池的大小與 CPU 個數等同。

此外,你還能夠使用 Schedulers.fromExecutorService(ExecutorService) 基於現有的 ExecutorService 建立 Scheduler。(雖然不太建議,不過你也能夠使用 Executor 來建立)。你也能夠使用 newXXX 方法來建立不一樣的調度器。好比 Schedulers.newElastic(yourScheduleName) 建立一個新的名爲 yourScheduleName 的彈性調度器。

操做符基於非阻塞算法實現,從而能夠利用到某些調度器的工做竊取(work stealing) 特性的好處。

一些操做符默認會使用一個指定的調度器(一般也容許開發者調整爲其餘調度器)例如, 經過工廠方法 Flux.interval(Duration.ofMillis(300)) 生成的每 300ms 打點一次的 Flux<Long>, 默認狀況下使用的是 Schedulers.parallel(),下邊的代碼演示瞭如何將其裝換爲 Schedulers.single()

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor 提供了兩種在響應式鏈中調整調度器 Scheduler 的方法:publishOnsubscribeOn。 它們都接受一個 Scheduler 做爲參數,從而能夠改變調度器。可是 publishOn 在鏈中出現的位置 是有講究的,而 subscribeOn 則無所謂。要理解它們的不一樣,你首先要理解 nothing happens until you subscribe()

在 Reactor 中,當你在操做鏈上添加操做符的時候,你能夠根據須要在 FluxMono 的實現中包裝其餘的 FluxMono。一旦你訂閱(subscribe)了它,一個 Subscriber 的鏈 就被建立了,一直向上到第一個 publisher 。這些對開發者是不可見的,開發者所能看到的是最外一層的 Flux (或 Mono)和 Subscription,可是具體的任務是在中間這些跟操做符相關的 subscriber 上處理的。

基於此,咱們仔細研究一下 publishOnsubscribeOn 這兩個操做符:

  • publishOn 的用法和處於訂閱鏈(subscriber chain)中的其餘操做符同樣。它將上游 信號傳給下游,同時執行指定的調度器 Scheduler 的某個工做線程上的回調。 它會 改變後續的操做符的執行所在線程 (直到下一個 publishOn 出如今這個鏈上)。
  • subscribeOn 用於訂閱(subscription)過程,做用於那個向上的訂閱鏈(發佈者在被訂閱 時才激活,訂閱的傳遞方向是向上遊的)。因此,不管你把 subscribeOn 至於操做鏈的什麼位置, 它都會影響到源頭的線程執行環境(context)。 可是,它不會影響到後續的 publishOn,後者仍可以切換其後操做符的線程執行環境。
只有操做鏈中最先的 subscribeOn 調用纔算數。

4.6. 線程模型

FluxMono 不會建立線程。一些操做符,好比 publishOn,會建立線程。同時,做爲一種任務共享形式, 這些操做符可能會從其餘任務池(work pool)——若是其餘任務池是空閒的話——那裏「偷」線程。所以, 不管是 FluxMono 仍是 Subscriber 都應該精於線程處理。它們依賴這些操做符來管理線程和任務池。

publishOn 強制下一個操做符(極可能包括下一個的下一個…)來運行在一個不一樣的線程上。 相似的,subscribeOn 強制上一個操做符(極可能包括上一個的上一個…)來運行在一個不一樣的線程上。 記住,在你訂閱(subscribe)前,你只是定義了處理流程,而沒有啓動發佈者。基於此,Reactor 能夠使用這些規則來決定如何執行操做鏈。而後,一旦你訂閱了,整個流程就開始工做了。

下邊的例子演示了支持任務共享的多線程模型:

Flux.range(1, 10000) 
    .publishOn(Schedulers.parallel()) 
    .subscribe(result)
建立一個有 10,000 個元素的 Flux
建立等同於 CPU 個數的線程(最小爲4)。
subscribe() 以前什麼都不會發生

Scheduler.parallel() 建立一個基於單線程 ExecutorService 的固定大小的任務線程池。 由於可能會有一個或兩個線程致使問題,它老是至少建立 4 個線程。而後 publishOn 方法便共享了這些任務線程, 當 publishOn 請求元素的時候,會從任一個正在發出元素的線程那裏獲取元素。這樣, 就是進行了任務共享(一種資源共享方式)。Reactor 還提供了好幾種共享資源的方式,請參考 Schedulers

Scheduler.elastic() 也能建立線程,它可以很方便地建立專門的線程(以便跑一些可能會阻塞資源的任務, 好比一個同步服務),請見 如何包裝一個同步阻塞的調用?

內部機制保證了這些操做符可以藉助自增計數器(incremental counters)和警惕條件(guard conditions) 以線程安全的方式工做。例如,若是咱們有四個線程處理一個流(就像上邊的例子),每個請求會讓計數器自增, 這樣後續的來自不一樣線程的請求就能拿到正確的元素。

4.7. 處理錯誤

若是想了解有哪些可用於錯誤處理的操做符,請參考 the relevant operator decision tree

在響應式流中,錯誤(error)是終止(terminal)事件。當有錯誤發生時,它會致使流序列中止, 而且錯誤信號會沿着操做鏈條向下傳遞,直至遇到你定義的 Subscriber 及其 onError 方法。

這樣的錯誤仍是應該在應用層面解決的。好比,你可能會將錯誤信息顯示在用戶界面,或者經過某個 REST 端點(endpoint)發出。所以,訂閱者(subscriber)的 onError 方法是應該定義的。

若是沒有定義,onError 會拋出 UnsupportedOperationException。你能夠接下來再 檢測錯誤,並經過 Exceptions.isErrorCallbackNotImplemented 方法捕獲和處理它。

Reactor 還提供了其餘的用於在鏈中處理錯誤的方法,即錯誤處理操做(error-handling operators)。

在你瞭解錯誤處理操做符以前,你必須牢記 響應式流中的任何錯誤都是一個終止事件。 即便用了錯誤處理操做符,也不會讓源頭流序列繼續。而是將 onError 信號轉化爲一個 新的 序列 的開始。換句話說,它代替了被終結的 上游 流序列。

如今咱們來逐個看看錯誤處理的方法。須要的時候咱們會同時用到命令式編程風格的 try 代碼塊來做比較。

4.7.1. 「錯誤處理」方法

你也許熟悉在 try-catch 代碼塊中處理異常的幾種方法。常見的包括以下幾種:

  1. 捕獲並返回一個靜態的缺省值。
  2. 捕獲並執行一個異常處理方法。
  3. 捕獲並動態計算一個候補值來頂替。
  4. 捕獲,並再包裝爲某一個 業務相關的異常,而後再拋出業務異常。
  5. 捕獲,記錄錯誤日誌,而後繼續拋出。
  6. 使用 finally 來清理資源,或使用 Java 7 引入的 "try-with-resource"。

以上全部這些在 Reactor 都有相應的基於 error-handling 操做符處理方式。

在開始研究這些操做符以前,咱們先準備好響應式鏈(reactive chain)方式和 try-catch 代碼塊方式(以便對比)。

當訂閱的時候,位於鏈結尾的 onError 回調方法和 catch 塊相似,一旦有異常,執行過程會跳入到 catch:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) 
    .map(v -> doSecondTransform(v)); 
s.subscribe(value -> System.out.println("RECEIVED " + value), 
            error -> System.err.println("CAUGHT " + error) 
);
執行 map 轉換,有可能拋出異常。
若是沒問題,執行第二個 map 轉換操做。
全部轉換成功的值都打印出來。
一旦有錯誤,序列(sequence)終止,並打印錯誤信息。

這與 try/catch 代碼塊是相似的:

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); 
        String v2 = doSecondTransform(v1); 
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); 
}
若是這裏拋出異常…
…後續的代碼跳過…
…執行過程直接到這。

既然咱們準備了兩種方式作對比,咱們就來看一下不一樣的錯誤處理場景,以及相應的操做符。

靜態缺省值

與第 (1) 條(捕獲並返回一個靜態的缺省值)對應的是 onErrorReturn

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");

你還能夠經過判斷錯誤信息的內容,來篩選哪些要給出缺省值,哪些仍然讓錯誤繼續傳遞下去:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
異常處理方法

若是你不僅是想要在發生錯誤的時候給出缺省值,而是但願提供一種更安全的處理數據的方式, 能夠使用 onErrorResume。這與第 (2) 條(捕獲並執行一個異常處理方法)相似。

假設,你會嘗試從一個外部的不穩定服務獲取數據,但仍然會在本地緩存一份 可能 有些過時的數據, 由於緩存的讀取更加可靠。能夠這樣來作:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k)) 
    .onErrorResume(e -> getFromCache(k));
對於每個 key, 異步地調用一個外部服務。
若是對外部服務的調用失敗,則再去緩存中查找該 key。注意,這裏不管 e 是什麼,都會執行異常處理方法。

就像 onErrorReturnonErrorResume 也有能夠用於預先過濾錯誤內容的方法變體,能夠基於異常類或 Predicate 進行過濾。它其實是用一個 Function 來做爲參數,還能夠返回一個新的流序列。

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(error -> { 
        if (error instanceof TimeoutException) 
            return getFromCache(k);
        else if (error instanceof UnknownKeyException)  
            return registerNewEntry(k, "DEFAULT");
        else
            return Flux.error(error); 
    });
這個函數式容許開發者自行決定如何處理。
若是源超時,使用本地緩存。
若是源找不到對應的 key,建立一個新的實體。
不然, 將問題「從新拋出」。
動態候補值

有時候並不想提供一個錯誤處理方法,而是想在接收到錯誤的時候計算一個候補的值。這相似於第 (3) 條(捕獲並動態計算一個候補值)。

例如,若是你的返回類型自己就有可能包裝有異常(好比 Future.complete(T success) vs Future.completeExceptionally(Throwable error)),你有可能使用流中的錯誤包裝起來實例化 返回值。

這也能夠使用上一種錯誤處理方法的方式(使用 onErrorResume)解決,代碼以下:

erroringFlux.onErrorResume(error -> Mono.just( 
        myWrapper.fromError(error) 
));
onErrorResume 中,使用 Mono.just 建立一個 Mono
將異常包裝到另外一個類中。
捕獲並從新拋出

在「錯誤處理方法」的例子中,基於 flatMap 方法的最後一行,咱們能夠猜到如何作到第 (4) 條(捕獲,包裝到一個業務相關的異常,而後拋出業務異常):

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
        new BusinessException("oops, SLA exceeded", original)
    );

然而還有一個更加直接的方法—— onErrorMap

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
記錄錯誤日誌

若是對於錯誤你只是想在不改變它的狀況下作出響應(如記錄日誌),並讓錯誤繼續傳遞下去, 那麼能夠用 doOnError 方法。這對應第 (5) 條(捕獲,記錄錯誤日誌,並繼續拋出)。 這個方法與其餘以 doOn 開頭的方法同樣,只起反作用("side-effect")。它們對序列都是隻讀, 而不會帶來任何改動。

以下邊的例子所示,咱們會記錄錯誤日誌,而且還經過變量自增統計錯誤發生個數。

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k)) 
    .doOnError(e -> {
        failureStat.increment();
        log("uh oh, falling back, service failed for key " + k); 
    })
    .onErrorResume(e -> getFromCache(k));
對外部服務的調用失敗…
…記錄錯誤日誌…
…而後回調錯誤處理方法。
使用資源和 try-catch 代碼塊

最後一個要與命令式編程對應的對比就是使用 Java 7 "try-with-resources" 或 finally 代碼塊清理資源。這是第 (6) 條(使用 finally 代碼塊清理資源或使用 Java 7 引入的 "try-with-resource")。在 Reactor 中都有對應的方法: usingdoFinally

AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); 
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

Flux<String> flux =
Flux.using(
        () -> disposableInstance, 
        disposable -> Flux.just(disposable.toString()), 
        Disposable::dispose 
);
第一個 lambda 生成資源,這裏咱們返回模擬的(mock) Disposable
第二個 lambda 處理資源,返回一個 Flux<T>
第三個 lambda 在 2) 中的資源 Flux 終止或取消的時候,用於清理資源。
在訂閱或執行流序列以後, isDisposed 會置爲 true

另外一方面, doFinally 在序列終止(不管是 onCompleteonError仍是取消)的時候被執行, 而且可以判斷是什麼類型的終止事件(完成、錯誤仍是取消?)。

LongAdder statsCancel = new LongAdder(); 

Flux<String> flux =
Flux.just("foo", "bar")
    .doFinally(type -> {
        if (type == SignalType.CANCEL) 
          statsCancel.increment(); 
    })
    .take(1);
咱們想進行統計,因此用到了 LongAdder
doFinallySignalType 檢查了終止信號的類型。
若是隻是取消,那麼統計數據自增。
take(1) 可以在發出 1 個元素後取消流。
演示終止方法 onError

爲了演示當錯誤出現的時候如何致使上游序列終止,咱們使用 Flux.interval 構造一個更加直觀的例子。 這個 interval 操做符會在每 x 單位的時間發出一個自增的 Long 值。

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100);
注意 interval 默認基於一個 timer Scheduler 來執行。 若是咱們想在 main 方法中運行, 咱們須要調用 sleep,這樣程序就能夠在尚未產生任何值的時候就退出了。

每 250ms 打印出一行信息,以下:

tick 0
tick 1
tick 2
Uh oh

即便多給了 1 秒鐘時間,也沒有更多的 tick 信號由 interval 產生了,因此序列確實被錯誤信號終止了。

重試

還有一個用於錯誤處理的操做符你可能會用到,就是 retry,見文知意,用它能夠對出現錯誤的序列進行重試。

問題是它對於上游 Flux 是基於重訂閱(re-subscribing)的方式。這實際上已經一個不一樣的序列了, 發出錯誤信號的序列仍然是終止了的。爲了驗證這一點,咱們能夠在繼續用上邊的例子,增長一個 retry(1) 代替 onErrorReturn 來重試一次。

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .elapsed() 
    .retry(1)
    .subscribe(System.out::println, System.err::println); 

Thread.sleep(2100);
elapsed 會關聯從當前值與上個值發出的時間間隔(譯者加:以下邊輸出的內容中的 259/249/251…)。
咱們仍是要看一下 onError 時的內容。
確保咱們有足夠的時間能夠進行 4x2 次 tick。

輸出以下:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
一個新的 interval 從 tick 0 開始。多出來的 250ms 間隔來自於第 4 次 tick, 就是致使出現異常並執行 retry 的那次(譯者加:我在機器上測試的時候 elapsed 「顯示」的時間間隔沒有加倍,可是確實有第 4 次的間隔)。

可見, retry(1) 不過是再一次重新訂閱了原始的 interval,從 tick 0 開始。第二次, 因爲異常再次出現,便將異常傳遞到下游了。

還有一個「高配版」的 retryretryWhen),它使用一個伴隨("companion") Flux 來判斷對某次錯誤是否要重試。這個伴隨 Flux 是由操做符建立的,可是由開發者包裝它, 從而實現對重試操做的配置。

這個伴隨 Flux 是一個 Flux<Throwable>,它做爲 retryWhen 的惟一參數被傳遞給一個 Function,你能夠定義這個 Function 並讓它返回一個新的 Publisher<?>。重試的循環 會這樣運行:

  1. 每次出現錯誤,錯誤信號會發送給伴隨 Flux,後者已經被你用 Function 包裝。
  2. 若是伴隨 Flux 發出元素,就會觸發重試。
  3. 若是伴隨 Flux 完成(complete),重試循環也會中止,而且原始序列也會 完成(complete)
  4. 若是伴隨 Flux 產生一個錯誤,重試循環中止,原始序列也中止 完成,而且這個錯誤會致使 原始序列失敗並終止。

瞭解前兩個場景的區別是很重要的。若是讓伴隨 Flux 完成(complete)等於吞掉了錯誤。以下代碼用 retryWhen 模仿了 retry(3) 的效果:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) 
    .doOnError(System.out::println) 
    .retryWhen(companion -> companion.take(3));
持續產生錯誤。
在 retry 以前doOnError 可讓咱們看到錯誤。
這裏,咱們認爲前 3 個錯誤是能夠重試的(take(3)),再有錯誤就放棄。

事實上,上邊例子最終獲得的是一個 空的 Flux,可是卻 成功 完成了。反觀對同一個 Flux 調用 retry(3) 的話,最終是以最後一個 error 終止 Flux,故而 retryWhen 與之不一樣。

實現一樣的效果須要一些額外的技巧:

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), 
          (error, index) -> { 
            if (index < 4) return index; 
            else throw Exceptions.propagate(error); 
          })
    );
技巧一:使用 zip 和一個「重試個數 + 1」的 range
zip 方法讓你能夠在對重試次數計數的同時,仍掌握着原始的錯誤(error)。
容許三次重試,小於 4 的時候發出一個值。
爲了使序列以錯誤結束。咱們將原始異常在三次重試以後拋出。
相似的代碼也能夠被用於實現 exponential backoff and retry 模式 (譯者加:重試指定的次數, 且每一次重試之間停頓的時間逐漸增長),參考 FAQ

4.7.2. 在操做符或函數式中處理異常

整體來講,全部的操做符自身均可能包含觸發異常的代碼,或自定義的可能致使失敗的代碼, 因此它們都自帶一些錯誤處理方式。

通常來講,一個 不受檢異常(Unchecked Exception) 老是由 onError 傳遞。例如, 在一個 map 方法中拋出 RuntimeException 會被翻譯爲一個 onError 事件,以下:

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));

上邊代碼輸出以下:

ERROR: java.lang.IllegalArgumentException: foo
Exception 能夠在其被傳遞給 onError 以前,使用 hook 進行調整。

Reactor,定義了一系列的可以致使「嚴重失敗」的錯誤(好比 OutOfMemoryError),也可參考 Exceptions.throwIfFatal 方法。這些錯誤意味着 Reactor 無力處理只能拋出,沒法傳遞下去。

還有些狀況下不受檢異常仍然沒法傳遞下去(多數處於subscribe 和 request 階段), 由於可能因爲多線程競爭致使兩次 onErroronComplete 的狀況。當這種競爭發生的時候, 沒法傳遞下去的錯誤信號就被「丟棄」了。這些狀況仍然能夠經過自定義的 hook 來搞定,見 丟棄事件的 Hooks

你可能會問:「那麼 受檢查異常(Checked Exceptions)?」

若是你須要調用一個聲明爲 throws 異常的方法,你仍然須要使用 try-catch 代碼塊處理異常。 有幾種方式:

  1. 捕獲異常,並修復它,流序列正常繼續。
  2. 捕獲異常,並把它包裝(wrap)到一個 不受檢異常 中,而後拋出(中斷序列)。工具類 Exceptions 可用於這種方式(咱們立刻會講到)。
  3. 若是你氣我返回一個 Flux (例如在 flatMap 中),將異常包裝在一個產生錯誤的 Flux中: return Flux.error(checkedException)(流序列也會終止)。

Reactor 有一個工具類 Exceptions,能夠確保在收到受檢異常的時候將其包裝(wrap)起來。

  • 若是須要,能夠使用 Exceptions.propagate 方法來包裝異常,它一樣會首先調用 throwIfFatal, 而且不會包裝 RuntimeException
  • 使用 Exceptions.unwrap 方法來獲得原始的未包裝的異常(追溯最初的異常)。

下面是一個 map 的例子,它使用的 convert 方法會拋出 IOException

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

如今想象你將這個方法用於一個 map 中,你必須明確捕獲這個異常,而且你的 map 方法不能再次拋出它。 因此你能夠將其以 RuntimeException 的形式傳遞給 onError

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

當後邊訂閱上邊的這個 Flux 並響應錯誤(好比在用戶界面)的時候,若是你想處理 IOException, 你還能夠再將其轉換爲原始的異常。以下:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

4.8. Processors

Processors 既是一種特別的發佈者(Publisher)又是一種訂閱者(Subscriber)。 那意味着你能夠 訂閱一個 Processor(一般它們會實現 Flux),也能夠調用相關方法來手動 插入數據到序列,或終止序列。

Processor 有多種類型,它們都有特別的語義規則,可是在你研究它們以前,最好問一下 本身以下幾個問題:

4.8.1. 我是否須要使用 Processor?

多數狀況下,你應該進行避免使用 Processor,它們較難正確使用,主要用於一些特殊場景下。

若是你以爲 Processor 適合你的使用場景,請首先看一下是否嘗試過如下兩種替代方式:

  1. 是否有一個或多個操做符的組合可以知足需求?(見 我須要哪一個操做符?
  2. "generator" 操做符是否能解決問題?(一般這些操做符 能夠用來橋接非響應式的 API,它們提供了一個「sink」,在生成數據流序列方面, 概念上相似於 Processor

若是看了以上替代方案,你仍然以爲須要一個 Processor,閱讀 現有的 Processors 總覽 這一節來了解一下不一樣的實現吧。

4.8.2. 使用 Sink 門面對象來線程安全地生成流

比起直接使用 Reactor 的 Processors,更好的方式是經過調用一次 sink() 來獲得 ProcessorSink

FluxProcessor 的 sink 是線程安全的「生產者(producer)」,所以可以在應用程序中 多線程併發地生成數據。例如,一個線程安全的序列化(serialized)的 sink 可以經過 UnicastProcessor 建立:

UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);

多個生產者線程能夠併發地生成數據到如下的序列化 sink。

sink.next(n);

根據 Processor 及其配置,next 產生的溢出有兩種可能的處理方式:

  • 一個無限的 processor 經過丟棄或緩存自行處理溢出。
  • 一個有限的 processor 阻塞在 IGNORE 策略,或將 overflowStrategy 應用於 sink

4.8.3. 現有的 Processors 總覽

Reactor Core 內置多種 Processor。這些 processor 具備不一樣的語法,大概分爲三類。 下邊簡要介紹一下這三種 processor:

  • 直接的(direct)DirectProcessorUnicastProcessor):這些 processors 只能經過直接 調用 Sink 的方法來推送數據。
  • 同步的(synchronous)EmitterProcessorReplayProcessor):這些 processors 既能夠 直接調用 Sink 方法來推送數據,也能夠經過訂閱到一個上游的發佈者來同步地產生數據。
  • 異步的(asynchronous)WorkQueueProcessorTopicProcessor):這些 processors 能夠將從多個上游發佈者獲得的數據推送下去。因爲使用了 RingBuffer 的數據結構來 緩存多個來自上游的數據,所以更加有健壯性。

異步的 processor 在實例化的時候最複雜,由於有許多不一樣的選項。所以它們暴露出一個 Builder 接口。 而簡單的 processors 有靜態的工廠方法。

DirectProcessor

DirectProcessor 能夠將信號分發給零到多個訂閱者(Subscriber)。它是最容易實例化的,使用靜態方法 create() 便可。另外一方面,它的不足是沒法處理背壓。因此,當 DirectProcessor 推送的是 N 個元素,而至少有一個訂閱者的請求個數少於 N 的時候,就會發出一個 IllegalStateException

一旦 Processor 終止(一般經過調用它的 Sinkerror(Throwable)complete() 方法), 雖然它容許更多的訂閱者訂閱它,可是會當即向它們從新發送終止信號。

UnicastProcessor

UnicastProcessor 能夠使用一個內置的緩存來處理背壓。代價就是它最多隻能有一個訂閱者。

UnicastProcessor 有多種選項,所以提供多種不一樣的 create 靜態方法。例如,它默認是 無限的(unbounded) :若是你在在訂閱者尚未請求數據的狀況下讓它推送數據,它會緩存全部數據。

能夠經過提供一個自定義的 Queue 的具體實現傳遞給 create 工廠方法來改變默認行爲。若是給出的隊列是 有限的(bounded), 而且緩存已滿,並且未收到下游的請求,processor 會拒絕推送數據。

在上邊 有限的 例子中,還能夠在構造 processor 的時候提供一個回調方法,這個回調方法能夠在每個 被拒絕推送的元素上調用,從而讓開發者有機會清理這些元素。

EmitterProcessor

EmitterProcessor 可以向多個訂閱者發送數據,而且能夠對每個訂閱者進行背壓處理。它自己也能夠訂閱一個 Publisher 並同步得到數據。

最初若是沒有訂閱者,它仍然容許推送一些數據到緩存,緩存大小由 bufferSize 定義。 以後若是仍然沒有訂閱者訂閱它並消費數據,對 onNext 的調用會阻塞,直到有訂閱者接入 (這時只能併發地訂閱了)。

所以第一個訂閱者會收到最多 bufferSize 個元素。然而以後, processor 不會從新發送(replay) 數據給後續的訂閱者。這些後續接入的訂閱者只能獲取到它們開始訂閱 以後 推送的數據。這個內部的 緩存會繼續用於背壓的目的。

默認狀況下,若是全部的訂閱者都取消了(基本意味着它們都再也不訂閱(un-subscribed)了), 它會清空內部緩存,而且再也不接受更多的訂閱者。這一點能夠經過 create 靜態工廠方法的 autoCancel 參數來配置。

ReplayProcessor

ReplayProcessor 會緩存直接經過自身的 Sink 推送的元素,以及來自上游發佈者的元素, 而且後來的訂閱者也會收到重發(replay)的這些元素。

能夠經過多種配置方式建立它:

  • 緩存一個元素(cacheLast)。
  • 緩存必定個數的歷史元素(create(int)),全部的歷史元素(create())。
  • 緩存基於時間窗期間內的元素(createTimeout(Duration))。
  • 緩存基於歷史個數和時間窗的元素(createSizeOrTimeout(int, Duration))。
TopicProcessor

TopicProcessor 是一個異步的 processor,它可以重發來自多個上游發佈者的元素, 這須要在建立它的時候配置 shared (見 build()share(boolean) 配置)。

注意,若是你企圖在併發環境下經過併發的上游 Publisher 調用 TopicProcessoronNextonComplete,或 onError 方法,就必須配置 shared。

不然,併發調用就是非法的,從而 processor 是徹底兼容響應式流規範的。

TopicProcessor 可以對多個訂閱者發送數據。它經過對每個訂閱者關聯一個線程來實現這一點, 這個線程會一直執行直到 processor 發出 onErroronComplete 信號,或關聯的訂閱者被取消。 最多能夠接受的訂閱者個數由構造者方法 executor 指定,經過提供一個有限線程數的 ExecutorService 來限制這一個數。

這個 processor 基於一個 RingBuffer 數據結構來存儲已發送的數據。每個訂閱者線程 自行管理其相關的數據在 RingBuffer 中的索引。

這個 processor 也有一個 autoCancel 構造器方法:若是設置爲 true (默認的),那麼當 全部的訂閱者取消以後,源 Publisher(s) 也就被取消了。

WorkQueueProcessor

WorkQueueProcessor 也是一個異步的 processor,也可以重發來自多個上游發佈者的元素, 一樣在建立時須要配置 shared (它多數構造器配置與 TopicProcessor 相同)。

它放鬆了對響應式流規範的兼容,可是好處就在於相對於 TopicProcessor 來講須要更少的資源。 它仍然基於 RingBuffer,可是再也不要求每個訂閱者都關聯一個線程,所以相對於 TopicProcessor 來講更具擴展性。

代價在於分發模式有些區別:來自訂閱者的請求會彙總在一塊兒,而且這個 processor 每次只對一個 訂閱者發送數據,所以須要循環(round-robin)對訂閱者發送數據,而不是一次所有發出的模式。

沒法保證徹底公平的循環分發。

WorkQueueProcessor 多數構造器方法與 TopicProcessor 相同,好比 autoCancelshare, 以及 waitStrategy。下游訂閱者的最大數目一樣由構造器 executor 配置的 ExecutorService 決定。

你最好注意不要有太多訂閱者訂閱 WorkQueueProcessor,由於這 會鎖住 processor。 若是你須要限制訂閱者數量,最好使用一個 ThreadPoolExecutorForkJoinPool。這個 processor 可以檢測到(線程池)容量並在訂閱者過多時拋出異常。

翻譯建議 - "Reactor 核心特性"

5. 對 Kotlin 的支持

5.1. 簡介

Kotlin 是一種運行於 JVM(及其餘平臺)上的靜態(statically-typed)語言。 使用它能夠在擁有與現有 Java 庫良好https://kotlinlang.org/docs/reference/java-interop.html[互操做性] 的同時編寫簡介優雅的代碼。

本小節介紹了 Reactor 3.1 如何可以完美支持 Kotlin。

5.2. 前提

Kotlin 支持 Kotlin 1.1+ 及依賴 kotlin-stdlib (或 kotlin-stdlib-jre7 / kotlin-stdlib-jre8 之一)

5.3. 擴展

多虧了其良好的 Java 互操做性 以及 Kotlin 擴展(extensions), Reactor Kotlin APIs 既可以使用 Java APIs,還可以收益於一些 Reactor 內置的專門支持 Kotlin 的 APIs。

注意 Kotlin 的擴展須要 import 纔可以使用。因此好比 Throwable.toFlux 的 Kotlin 擴展必須在 import reactor.core.publisher.toFlux 後纔可以使用。多數場景下 IDE 應該可以自動給出這種相似 static import 的建議。

例如,https://kotlinlang.org/docs/reference/inline-functions.html#reified-type-parameters[Kotlin 參數類型推導(reified type parameters)] 對於 JVM 的 通用類型擦除(generics type erasure)提供了一種變通解決方案, Reactor 就能夠經過擴展(extension)來應用到這種特性。

下面是對「Reactor with Java」和「Reactor with Kotlin + extensions」的比較:

Java Kotlin + extensions
Mono.just("foo") "foo".toMono()
Flux.fromIterable(list) list.toFlux()
Mono.error(new RuntimeException()) RuntimeException().toMono()
Flux.error(new RuntimeException()) RuntimeException().toFlux()
flux.ofType(Foo.class) flux.ofType<Foo>() or flux.ofType(Foo::class)
StepVerifier.create(flux).verifyComplete() flux.test().verifyComplete()

可參考 Reactor KDoc API 中詳細的關於 Kotlin 擴展的文檔。

5.4. Null 值安全

Kotlin的一個關鍵特性就是 null 值安全 ——從而能夠在編譯時處理 null 值,而不是在運行時拋出著名的 NullPointerException。 這樣,經過「可能爲空(nullability)」的聲明,以及可以代表「有值或空值」的語法(避免使用相似 Optional 來進行包裝),使得應用程序更加安全。(Kotlin容許在函數參數中使用可能爲空的值, 請參考 comprehensive guide to Kotlin null-safety

儘管 Java 的類型系統不容許這樣的 null 值安全的表達方式, Reactor now provides null-safety 對全部 Reactor API 經過工具友好的(tooling-friendly)註解(在 reactor.util.annotation 包中定義)來支持。 默認狀況下,Java APIs 用於 Kotlin 的話會被做爲 平臺類型(platform types) 而放鬆對 null 的檢查。 Kotlin 對 JSR 305 註解的支持 + Reactor 可爲空(nullability)的註解,爲全部 Reactor API 和 Kotlin 開發者確保「null 值安全」的特性 (在編譯期處理 null 值)。

JSR 305 的檢查能夠經過增長 -Xjsr305 編譯參數進行配置: -Xjsr305={strict|warn|ignore}

對於 kotlin 1.1.50+,默認的配置爲 -Xjsr305=warn。若是但願 Reactor API 可以全面支持 null 值安全 則須要配置爲 strict。不過你能夠認爲這是實驗性的(experimental),由於 Reactor API 「可能爲空」 的聲明可能甚至在小版本的發佈中都會不斷改進,並且未來也可能增長新的檢查。

目前尚不支持通用類型參數、可變類型以及數組元素的「可爲空」。不過應該包含在接下來的發佈中,最新信息請看 這個issues

翻譯建議 - "對 Kotlin 的支持"

6. 測試

不管你是編寫了一個簡單的 Reactor 操做鏈,仍是開發了自定義的操做符,對它進行 自動化的測試老是一個好主意。

Reactor 內置一些專門用於測試的元素,放在一個專門的 artifact 裏: reactor-test。 你能夠在 on Githubreactor-core 庫裏找到這個項目。

若是要用它來進行測試,添加 scope 爲 test 的依賴。

reactor-test 用 Maven 配置 <dependencies>

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    
</dependency>
若是你使用了 BOM,你不須要指定 <version>

reactor-test 用 Gradle 配置 dependencies

dependencies {
   testCompile 'io.projectreactor:reactor-test'
}

reactor-test 的兩個主要用途:

  • 使用 StepVerifier 一步一步地測試一個給定場景的序列。
  • 使用 TestPublisher 生成數據來測試下游的操做符(包括你本身的operator)。

6.1. 使用 StepVerifier 來測試

最多見的測試 Reactor 序列的場景就是定義一個 FluxMono,而後在訂閱它的時候測試它的行爲。

當你的測試關注於每一次的事件的時候,就很是容易轉化爲使用 StepVerifier 的測試場景: 下一個指望的事件是什麼?你是否指望使用 Flux 來發出一個特別的值?或者接下來 300ms 什麼都不作?全部這些均可以使用 StepVerifier API 來表示。

例如,你可能會使用以下的工具方法來包裝一個 Flux

public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}

要測試它的話,你須要校驗以下內容:

我但願這個 Flux 先發出 foo,而後發出 bar,而後 生成一個內容爲 boom 的錯誤。 最後訂閱並校驗它們。

使用 StepVerifier API 來表示以上的驗證過程:

@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("foo", "bar"); 

  StepVerifier.create( 
    appendBoomError(source)) 
    .expectNext("foo") 
    .expectNext("bar")
    .expectErrorMessage("boom") 
    .verify(); 
}
因爲被測試方法須要一個 Flux,定義一個簡單的 Flux 用於測試。
建立一個 StepVerifier 構造器來包裝和校驗一個 Flux
傳進來須要測試的 Flux(即待測方法的返回結果)。
第一個咱們指望的信號是 onNext,它的值爲 foo
最後咱們指望的是一個終止信號 onError,異常內容應該爲 boom
不要忘了使用 verify() 觸發測試。

API 是一個構造器,經過傳入一個要測試的序列來建立一個 StepVerifier。從而你能夠:

  • 表示你 指望 發生的下一個信號。若是收到其餘信號(或者信號與指望不匹配),整個測試就會 失敗(AssertionError)。例如你可能會用到 expectNext(T...)expectNextCount(long)
  • 消費 下一個信號。當你想要跳過部分序列或者當你想對信號內容進行自定義的 assertion 的時候會用到它(好比要校驗是否有一個 onNext 信號,並校驗對應發出的元素是不是一個 size 爲 5 的 List)。你可能會用到 consumeNextWith(Consumer<T>)
  • 更多樣的操做 好比暫停或運行一段代碼。好比,你想對測試狀態或內容進行調整或處理, 你可能會用到 thenAwait(Duration)then(Runnable)

對於終止事件,相應的指望方法(expectComplete()expectError(),及其全部的變體方法) 使用以後就不能再繼續增長別的指望方法了。最後你只能對 StepVerifier 進行一些額外的配置並 觸發校驗(一般調用 verify() 及其變體方法)。

從 StepVerifier 內部來看,它訂閱了待測試的 FluxMono,而後將序列中的每一個信號與測試 場景的指望進行比對。若是匹配的話,測試成功。若是有不匹配的狀況,則拋出 AssertionError 異常。

請記住是 verify() 觸發了校驗過程。這個 API 還有一些結合了 verify() 與指望的終止信號 的方法:verifyComplete()verifyError()verifyErrorMessage(String) 等。

注意,若是有一個傳入 lambda 的指望方法拋出了 AssertionError,會被報告爲測試失敗。 這可用於自定義 assertion。

默認狀況下,verify() 方法(及同源的 verifyThenAssertThatverifyComplete()等) 沒有超時的概念。它可能會永遠阻塞住。你能夠使用 StepVerifier.setDefaultTimeout(Duration) 來設置一個全局的超時時間,或使用 verify(Duration) 指定。

6.2. 操控時間

StepVerifier 能夠用來測試基於時間的操做符,從而避免測試的長時間運行。能夠使用構造器 StepVerifier.withVirtualTime 達到這一點。

示例以下:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... 繼續追加指望方法

虛擬時間(virtual time) 的功能會在 Reactor 的調度器(Scheduler)工廠方法中插入一個自定義的 調度器。這些基於時間的操做符一般默認使用 Schedulers.parallel() 調度器。(虛擬時間的) 技巧在於使用一個 VirtualTimeScheduler 來代替默認調度器。然而一個重要的前提就是,只有在初始化 虛擬時間調度器以後的操做符纔會起做用。

爲了提升 StepVerifier 正常起做用的機率,它通常不接收一個簡單的 Flux 做爲輸入,而是接收 一個 Supplier,從而能夠在配置好訂閱者 以後 「懶建立」待測試的 flux。

要注意的是,Supplier<Publisher<T>> 可用於「懶建立」,不然不能保證虛擬時間 能徹底起做用。尤爲要避免提早實例化 Flux,要在 Supplier 中用 lambda 建立並返回 Flux 變量。

有兩種處理時間的指望方法,不管是否配置虛擬時間都是可用的:

  • thenAwait(Duration) 暫停校驗步驟(容許信號延遲發出)。
  • expectNoEvent(Duration) 一樣讓序列持續必定的時間,期間若是有 任何 信號發出則測試失敗。

兩個方法都會基於給定的持續時間暫停線程的執行,若是是在虛擬時間模式下就相應地使用虛擬時間。

expectNoEvent 將訂閱(subscription)也認做一個事件。假設你用它做爲第一步,若是檢測 到有訂閱信號,也會失敗。這時候能夠使用 expectSubscription().expectNoEvent(duration) 來代替。

爲了快速校驗前邊提到的 Mono.delay,咱們能夠這樣完成代碼:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription() 
    .expectNoEvent(Duration.ofDays(1)) 
    .expectNext(0) 
    .verifyComplete();
如上 tip
期待一天內沒有信號發生。
而後期待一個 next 信號爲 0
而後期待完成(同時觸發校驗)。

咱們也能夠使用 thenAwait(Duration.ofDays(1)),可是 expectNoEvent 的好處是 可以驗證在此以前不會發生什麼。

注意 verify() 返回一個 Duration,這是整個測試的 真實時間

虛擬時間並不是銀彈。請記住 全部的 調度器都會被替換爲 VirtualTimeScheduler。 有些時候你能夠鎖定校驗過程,由於虛擬時鐘在遇到第一個指望校驗以前並不會開始,因此對於 「無數據「的指望校驗也必須可以運行在虛擬時間模式下。在無限序列中,虛擬時間模式的發揮 空間也頗有限,由於它可能致使線程(序列的發出和校驗的運行都在這個線程上)卡住。

6.3. 使用 StepVerifier 進行「後校驗」

當配置完你測試場景的最後的指望方法後,你能夠使用 verifyThenAssertThat() 來代替 verify() 觸發執行後的校驗。

verifyThenAssertThat() 返回一個 StepVerifier.Assertions 對象,你能夠用它來校驗 整個測試場景成功剛結束後的一些狀態(它也會調用 verify())。典型應用就是校驗有多少 元素被操做符丟棄(參考 Hooks)。

6.4. 測試 Context

更多關於 Context 的內容請參考 增長一個 Context 到響應式序列

StepVerifier 有一些指望方法能夠用來測試 Context

  • expectAccessibleContext: 返回一個 ContextExpectations 對象,你能夠用它來在 Context 上配置指望校驗。必定記住要調用 then() 來返回到對序列的指望校驗上來。
  • expectNoAccessibleContext: 是對「沒有Context」的校驗。一般用於 被測試的 Publisher 並非一個響應式的,或沒有任何操做符可以傳遞 Context (好比一個 generatePublisher).

此外,還能夠用 StepVerifierOptions 方法傳入一個測試用的初始 ContextStepVerifier, 從而能夠建立一個校驗(verifier)。

這些特性經過下邊的代碼演示:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
                                StepVerifierOptions.create().withInitialContext(Context.of("foo", "bar"))) 
                            .expectAccessibleContext() 
                            .contains("foo", "bar") 
                            .then() 
                            .expectNext(11)
                            .verifyComplete();
使用 StepVerifierOptions 建立 StepVerifier 並傳入初始 Context
開始對 Context 進行校驗,這裏只是確保 Context 正常傳播了。
Context 進行校驗的例子:好比驗證是否包含一個 "foo" - "bar" 鍵值對。
使用 then() 切換回對序列的校驗。
不要忘了用 verify() 觸發整個校驗過程。

6.5. 用 TestPublisher 手動發出元素

對於更多高級的測試,若是可以徹底掌控源發出的數據就會方便不少,由於這樣就能夠在測試的 時候更加有的放矢地發出想測的數據。

另外一種狀況就是你實現了本身的操做符,而後想校驗它的行爲——尤爲是在源不穩定的時候——是否符合響應式流規範。

reactor-test 提供了 TestPublisher 類來應對這兩種需求。這個類本質上是一個 Publisher<T>, 你能夠經過可編程的方式來用它發出各類信號:

  • next(T) 以及 next(T, T...) 發出 1-n 個 onNext 信號。
  • emit(T...) 起一樣做用,而且會執行 complete()
  • complete() 會發出終止信號 onComplete
  • error(Throwable) 會發出終止信號 onError

使用 create 工廠方法就能夠獲得一個正常的 TestPublisher。而使用 createNonCompliant 工廠方法能夠建立一個「不正常」的 TestPublisher。後者須要傳入由 TestPublisher.Violation 枚舉指定的一組選項,這些選項可用於告訴 publisher 忽略哪些問題。枚舉值有:

  • REQUEST_OVERFLOW: 容許 next 在請求不足的時候也能夠調用,而不會觸發 IllegalStateException
  • ALLOW_NULL: 容許 next 可以發出一個 null 值而不會觸發 NullPointerException
  • CLEANUP_ON_TERMINATE: 能夠重複屢次發出終止信號,包括 complete()error()emit()

最後,TestPublisher 還能夠用不一樣的 assert* 來跟蹤其內部的訂閱狀態。

使用轉換方法 flux()mono() 能夠將其做爲 FluxMono 來使用。

6.6. 用 PublisherProbe 檢查執行路徑

當構建複雜的操做鏈時,可能會有多個子序列,從而致使多個執行路徑。

多數時候,這些子序列會生成一個足夠明確的 onNext 信號,你能夠經過檢查最終結果 來判斷它是否執行。

考慮下邊這個方法,它構建了一條操做鏈,並使用 switchIfEmpty 方法在源爲空的狀況下, 替換成另外一個源。

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}

很容易就能夠測試出 switchIfEmpty 的哪個邏輯分支被使用了,以下:

@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}

@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}

可是若是例子中的方法返回的是一個 Mono<Void> 呢?它等待源發送結束,執行一個額外的任務, 而後就結束了。若是源是空的,則執行另外一個備用的相似於 Runnable 的任務,以下:

private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) 
            .switchIfEmpty(doWhenEmpty); 
}
then() 方法會忽略 command,它只關心是否結束。
兩個都是空序列,這個時候如何區分(哪邊執行了)呢?

爲了驗證執行路徑是通過了 doWhenEmpty 的,你須要編寫額外的代碼,好比你須要一個這樣的 Mono<Void>

  • 可以捕獲到它被訂閱的事實。
  • 以上事實須要在整個執行結束 以後 再進行驗證。

在 3.1 版本之前,你須要爲每一種狀態維護一個 AtomicBoolean 變量,而後在相應的 doOn* 回調中觀察它的值。這須要添加很多的額外代碼。好在,版本 3.1.0 以後能夠使用 PublisherProbe來作, 以下:

@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); 

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) 
                .verifyComplete();

    probe.assertWasSubscribed(); 
    probe.assertWasRequested(); 
    probe.assertWasNotCancelled(); 
}
建立一個探針(probe),它會轉化爲一個空序列。
在須要使用 Mono<Void> 的位置調用 probe.mono() 來替換爲探針。
序列結束以後,你能夠用這個探針來判斷序列是如何使用的,你能夠檢查是它從哪(條路徑)被訂閱的…
…對於請求也是同樣的…
…以及是否被取消了。

你也能夠在使用 Flux<T> 的位置經過調用 .flux() 方法來放置探針。若是你既須要用探針檢查執行路徑 還須要它可以發出數據,你能夠用 PublisherProbe.of(Publisher) 方法包裝一個 Publisher<T> 來搞定。

翻譯建議 - "測試"

7. 調試 Reactor

從命令式和同步式編程切換到響應式和異步式編程有時候是使人生畏的。 學習曲線中最陡峭的異步就是出錯時如何分析和調試。

在命令式世界,調試一般都是很是直觀的:直接看 stack trace 就能夠找到問題出現的位置, 以及:是否問題責任所有出在你本身的代碼?問題是否是發生在某些庫代碼?若是是, 那你的哪部分代碼調用了庫,是否是傳參不合適致使的問題?

7.1. 典型的 Reactor Stack Trace

當你切換到異步代碼,事情就變得複雜的多了。

看一下下邊的 stack trace:

一段典型的 Reactor stack trace

java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.emitScalar(FluxFlatMap.java:380)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:349)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:119)
	at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:144)
	at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:99)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:172)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:316)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:94)
	at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
	at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:67)
	at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:98)
	at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668)
	at reactor.core.publisher.Mono.subscribe(Mono.java:2629)
	at reactor.core.publisher.Mono.subscribe(Mono.java:2604)
	at reactor.core.publisher.Mono.subscribe(Mono.java:2582)
	at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:722)

這裏邊有好多信息,咱們獲得了一個 IndexOutOfBoundsException,內容是 "源發出了 不止一個元素"。

咱們也許能夠很快假定這個源是一個 Flux/Mono,並經過下一行提到的 MonoSingle 肯定是 Mono。 看上去是來自一個 single 操做符的抱怨。

查看 Javadoc 中關於操做符 Mono#single 的說明,咱們看到 single 有一個規定: 源必須只能發出一個元素。看來是有一個源發出了多於一個元素,從而違反了這一規定。

咱們能夠更進一步找出那個源嗎?下邊的這些內容幫不上什麼忙,只是打印了一些內部的彷佛是一個響應式鏈的信息, 主要是一些 subscriberequest 的調用。

粗略過一下這些行,咱們至少能夠勾畫出一個大體的出問題的鏈:大概涉及一個 MonoSingle、一個 FluxFlatMap,以及一個 FluxRange(每個都對應 trace 中的幾行,但整體涉及這三個類)。 因此難道是 range().flatMap().single() 這樣的鏈?

可是若是在咱們的應用中多處都用到這一模式,那怎麼辦?經過這些仍是不能肯定什麼, 搜索 single 也找不到問題所在。最後一行指向了咱們的代碼。咱們彷佛終於接近真相了。

不過,等等… 當咱們找到源碼文件,咱們只能找到一個已存在的 Flux 被訂閱了,以下:

toDebug.subscribe(System.out::println, Throwable::printStackTrace);

全部這些都發生在訂閱時,可是 Flux 自己沒有在這裏 聲明 。更糟的是, 當咱們找到變量聲明的地方,咱們看到:

public Mono<String> toDebug; //請忽略 public 的屬性

變量聲明的地方並無 實例化 。咱們必須作最壞的打算,那就是在這個應用中, 可能在幾個不一樣的代碼路徑上對這個變量賦了值,但咱們不肯定是哪個致使了問題。

這是一種 Reactor 運行時錯誤,而不是編譯錯誤。

咱們但願找到的是操做符在哪裏添加到操做鏈上的 —— 也就是 Flux 在哪裏 聲明的。咱們一般說這個 Flux 是被 組裝(assembly) 的。

7.2. 開啓調試模式

即使 stack trace 可以對有些許經驗的開發者傳遞一些信息,可是在一些複雜的狀況下, 這並非一種理想的方式。

幸運的是,Reactor 內置了一種面向調試的能力—— 操做期測量(assembly-time instrumentation)

這經過 在應用啓動的時候 (或至少在有問題的 FluxMono 實例化以前) 加入自定義的 Hook.onOperator 鉤子(hook),以下:

Hooks.onOperatorDebug();

這行代碼——經過包裝操做符的構造方法,並在此捕捉 stack trace——來監測對這個 Flux(或 Mono)的操做符的調用(也就是「組裝」鏈的地方)。因爲這些在 操做鏈被聲明的地方就搞定,這個 hook 應該在 早於 聲明的時候被激活, 最保險的方式就是在你程序的最開始就激活它。

以後,若是發生了異常,致使失敗的操做符可以找到捕捉點並補充 stack trace。

在下一小節,咱們看一下 stack trace 會有什麼不一樣,以及如何對其進行分析。

7.3. 閱讀調試模式的 Stack Trace

咱們在對上邊的例子激活 operatorStacktrace 調試功能後,stack trace 以下:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:314) 
...

...
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668)
	at reactor.core.publisher.Mono.subscribe(Mono.java:2629)
	at reactor.core.publisher.Mono.subscribe(Mono.java:2604)
	at reactor.core.publisher.Mono.subscribe(Mono.java:2582)
	at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:727)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] : 
	reactor.core.publisher.Flux.single(Flux.java:5335)
	reactor.guide.GuideTests.scatterAndGather(GuideTests.java:689)
	reactor.guide.GuideTests.populateDebug(GuideTests.java:702)
	org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	org.junit.rules.RunRules.evaluate(RunRules.java:20)
Error has been observed by the following operator(s): 
	|_	Flux.single(TestWatcher.java:55)
這一條是新的:能夠發現外層操做符捕捉到了 stack trace。
第一部分的 stack trace 多數與上邊同樣,顯示了操做符內部的信息(因此省略了這一塊)。
從這裏開始,是在調試模式下顯示的內容。
首先咱們得到了關於操做符組裝的信息。
以及錯誤沿着操做鏈傳播的軌跡(從錯誤點到訂閱點)。
每個看到這個錯誤的操做符都會列出,包括類和行信息。若是操做符是在 Reactor 源碼內部組裝的,行信息會被忽略。

可見,捕獲的 stack trace 做爲 OnAssemblyException 添加到原始錯誤信息的以後。有兩部分, 可是第一部分更加有意思。它顯示了操做符觸發異常的路徑。這裏顯示的是 scatterAndGather 方法中的 single 致使的問題,而 scatterAndGather 方法是在 JUnit 中被 populateDebug 方法調用的。

既然咱們已經有足夠的信息來查出罪魁禍首,咱們就來看一下 scatterAndGather 方法吧:

private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
           .single(); 
}
找到了,就是這個 single

如今咱們能夠發現錯誤的根源是將多個 HTTP 請求轉化爲 URLs 的 flatMap 方法後邊接的是 single, 這太嚴格了。使用 git blame 找到代碼做者,並同他討論事後,發現他是原本是想用不那麼嚴格的 take(1) 方法的。

咱們解決了問題。

錯誤被如下這些操做符觀察(observed)了:

調試信息的第二部分在這個例子中意義不大,由於錯誤實際發生在最後一個操做符上(離 subscribe 最近的一個)。 另外一個例子可能更加清楚:

FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();

如今想象一下在 findAllUserByName 內部有個 map 方法報錯了。咱們可能會看到以下的 trace:

Error has been observed by the following operator(s):
        |_        Flux.map(FakeRepository.java:27)
        |_        Flux.map(FakeRepository.java:28)
        |_        Flux.filter(FakeUtils1.java:29)
        |_        Flux.transform(GuideDebuggingExtraTests.java:41)
        |_        Flux.elapsed(FakeUtils2.java:30)
        |_        Flux.transform(GuideDebuggingExtraTests.java:42)

這與鏈上收到錯誤通知的操做符是一致:

  1. 異常源自第一個 map
  2. 被第二個 map 看到(都在 findAllUserByName 方法中)。
  3. 接着被一個 filter 和一個 transform 看到,說明鏈的這部分是由一個可重複使用的轉換方法組裝的 (這裏是 applyFilters 工具方法)。
  4. 最後被一個 elapsed 和一個 transform 看到,相似的, elapsed 由第二個轉換方法(enrichUser) 組裝。

用這種形式的檢測方式構造 stack trace 是成本較高的。也所以這種調試模式做爲最終大招, 只應該在可控的方式下激活。

7.3.1. 用 checkpoint() 方式替代

調試模式是全局性的,會影響到程序中每個組裝到一個 FluxMono 的操做符。好處在於能夠進行 過後調試(after-the-fact debugging):不管錯誤是什麼,咱們都會獲得足夠的調試信息。

就像前邊見到的那樣,這種全局性的調試會由於成本較高而影響性能(其影響在於生成的 stack traces 數量)。 若是咱們能大概定位到疑似出問題的操做符的話就能夠不用花那麼大的成本。然而,問題出現後, 咱們一般沒法定位到哪個操做符可能存在問題,由於缺乏一些 trace 信息,咱們得修改代碼, 打開調試模式,指望可以復現問題。

這種狀況下,咱們須要切換到調試模式,並進行一些必要的準備工做以便可以更好的發現復現的問題, 並捕捉到全部的信息。(譯者加:這兩段感受有點廢話。。。)

若是你能肯定是在你的代碼中組裝的響應式鏈存在問題,並且程序的可服務性又是很重要的, 那麼你能夠 使用 checkpoint() 操做符,它有兩種調試技術可用

你能夠把這個操做符加到鏈中。這時 checkpoint 操做符就像是一個 hook,但只對它所在的鏈起做用。

還有一個 checkpoint(String) 的方法變體,你能夠傳入一個獨特的字符串以方便在 assembly traceback 中識別信息。 這樣會省略 stack trace,你能夠依賴這個字符串(如下改稱「定位描述符」)來定位到組裝點。checkpoint(String)checkpoint 有更低的執行成本。

checkpoint(String) 在它的輸出中包含 "light" (能夠方便用於搜索),以下所示:

...
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.FluxElapsed] is identified by light checkpoint [light checkpoint identifier].

最後的但一樣重要的是,若是你既想經過 checkpoint 添加定位描述符,同時又依賴於 stack trace 來定位組裝點,你能夠使用 checkpoint("description", true) 來實現這一點。這時回溯信息又出來了, 同時附加了定位描述符,以下例所示:

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : 
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:174)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescription(FluxOnAssemblyTest.java:159)
Error has been observed by the following operator(s):
	|_	ParallelFlux.checkpointnull
descriptionCorrelation1234 是經過 checkpoint 給出的定位描述符。

定位描述符能夠是靜態的字符串、或人類可讀的描述、或一個 correlation ID(例如, 來自 HTTP 請求頭的信息)。

當全局調試模式和 checkpoint() 都開啓的時候,checkpoint 的 stacks 輸出會做爲 suppressed 錯誤輸出,按照聲明順序添加在操做符圖(graph)的後面。

7.4. 記錄流的日誌

除了基於 stack trace 的調試和分析,還有一個有效的工具能夠跟蹤異步序列並記錄日誌。

就是 log() 操做符。將其加到操做鏈上以後,它會讀(只讀,peek)每個 在其上游的 FluxMono 事件(包括 onNextonErroronComplete, 以及 訂閱取消、和 請求)。

邊注:關於 logging 的具體實現

log 操做符經過 SLF4J 使用相似 Log4J 和 Logback 這樣的公共的日誌工具, 若是 SLF4J 不存在的話,則直接將日誌輸出到控制檯。

控制檯使用 System.err 記錄 WARNERROR 級別的日誌,使用 System.out 記錄其餘級別的日誌。

若是你喜歡使用 JDK java.util.logging,在 3.0.x 你能夠設置 JDK 的系統屬性 reactor.logging.fallback

假設咱們配置並激活了 logback,以及一個形如 range(1,10).take(3) 的操做鏈。經過將 log() 放在 take 以前, 咱們就能夠看到它內部是如何運行的,以及什麼樣的事件會向上遊傳播給 range,以下所示:

Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();

輸出以下(經過 logger 的 console appender):

10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(unbounded) 
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) 
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel()

這裏,除了 logger 本身的格式(時間、線程、級別、消息),log() 操做符 還輸出了其餘一些格式化的東西:

reactor.Flux.Range.1 是自動生成的日誌 類別(category),以防你在操做鏈中屢次使用 同一個操做符。經過它你能夠分辨出來是哪一個操做符的事件(這裏是 range 的)。 你能夠調用 log(String) 方法用自定義的類別替換這個標識符。在幾個用於分隔的字符以後, 打印出了實際的事件。這裏是一個 onSubscribe 調用、一個 request 調用、三個 onNext 調用, 以及一個 cancel 調用。對於第一行的 onSubscribe,咱們知道了 Subscriber 的具體實現, 一般與操做符指定的實現是一致的,在方括號內有一些額外信息,包括這個操做符是否可以 經過同步或異步融合(fusion,具體見附錄 [microfusion])的方式進行自動優化。
第二行,咱們能夠看到是一個由下游傳播上來的個數無限的請求。
而後 range 一下發出三個值。
最後一行,咱們看到了 cancel()

最後一行,(4),最有意思。咱們看到 take 在這裏發揮做用了。在它拿到足夠的元素以後, 就將序列切斷了。簡單來講,take() 致使源在發出用戶請求的數量後 cancel() 了。

翻譯建議 - "調試 Reactor"

8. 高級特性與概念

這一章涉及以下的 Reactor 的高級特性與概念:

8.1. 打包重用操做符

從代碼整潔的角度來講,重用代碼是一個好辦法。Reactor 提供了幾種幫你打包重用代碼的方式, 主要經過使用操做符或者經常使用的「操做符組合」的方法來實現。若是你以爲一段操做鏈很經常使用, 你能夠將這段操做鏈打包封裝後備用。

8.1.1. 使用 transform 操做符

transform 操做符能夠將一段操做鏈封裝爲一個函數式(function)。這個函數式能在操做期(assembly time) 將被封裝的操做鏈中的操做符還原並接入到調用 transform 的位置。這樣作和直接將被封裝的操做符 加入到鏈上的效果是同樣的。示例以下:

Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
      .map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
        .doOnNext(System.out::println)
        .transform(filterAndMap)
        .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));

Transform Operator : encapsulate flows

上邊例子的輸出以下:

blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE

8.1.2. 使用 compose 操做符

compose 操做符與 transform 相似,也可以將幾個操做符封裝到一個函數式中。 主要的區別就是,這個函數式做用到原始序列上的話,是 基於每個訂閱者的(on a per-subscriber basis) 。這意味着它對每個 subscription 能夠生成不一樣的操做鏈(經過維護一些狀態值)。 以下例所示:

AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
        if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
        .map(String::toUpperCase);
        }
        return f.filter(color -> !color.equals("purple"))
                .map(String::toUpperCase);
};

Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
    .doOnNext(System.out::println)
    .compose(filterAndMap);

composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));

Compose Operator : Per Subscriber transformation

上邊的例子輸出以下:

blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple

8.2. Hot vs Cold

到目前爲止,咱們一直認爲 Flux(和 Mono)都是這樣的:它們都表明了一種異步的數據序列, 在訂閱(subscribe)以前什麼都不會發生。

可是實際上,廣義上有兩種發佈者:「熱」與「冷」(hot and cold)。

(本文檔)到目前介紹的其實都是 cold 家族的發佈者。它們爲每個訂閱(subscription) 都生成數據。若是沒有建立任何訂閱(subscription),那麼就不會生成數據。

試想一個 HTTP 請求:每個新的訂閱者都會觸發一個 HTTP 調用,可是若是沒有訂閱者關心結果的話, 那就不會有任何調用。

另外一方面, 發佈者,不依賴於訂閱者的數量。即便沒有訂閱者它們也會發出數據, 若是有一個訂閱者接入進來,那麼它就會收到訂閱以後發出的元素。對於熱發佈者, 在你訂閱它以前,確實已經發生了什麼。

just 是 Reactor 中少數幾個「熱」操做符的例子之一:它直接在組裝期(assembly time) 就拿到數據,若是以後有誰訂閱它,就從新發送數據給訂閱者。再拿 HTTP 調用舉例,若是給 just 傳入的數據是一個 HTTP 調用的結果,那麼以後在初始化 just 的時候纔會進行惟一的一次網絡調用。

若是想將 just 轉化爲一種 的發佈者,你能夠使用 defer。它可以將剛纔例子中對 HTTP 的請求延遲到訂閱時(這樣的話,對於每個新的訂閱來講,都會發生一次網絡調用)。

Reactor 中多數其餘的 發佈者是擴展自 Processor 的。

考慮其餘兩個例子,以下是第一個例子:

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .doOnNext(System.out::println)
                          .filter(s -> s.startsWith("o"))
                          .map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));

第一個例子輸出以下:

blue
green
orange
Subscriber 1: ORANGE
purple
blue
green
orange
Subscriber 2: ORANGE
purple

Replaying behavior

兩個訂閱者都觸發了全部的顏色,由於每個訂閱者都會讓構造 Flux 的操做符運行一次。

將下邊的例子與第一個例子對比:

UnicastProcessor<String> hotSource = UnicastProcessor.create();

Flux<String> hotFlux = hotSource.publish()
                                .autoConnect()
                                .map(String::toUpperCase);


hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");
hotSource.onNext("green");

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();

第二個例子輸出以下:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

Broadcasting a subscription

第一個訂閱者收到了全部的四個顏色,第二個訂閱者因爲是在前兩個顏色發出以後訂閱的, 故而收到了以後的兩個顏色,在輸出中有兩次 "ORANGE" 和 "PURPLE"。從這個例子可見, 不管是否有訂閱者接入進來,這個 Flux 都會運行。

8.3. 使用 ConnectableFlux 對多個訂閱者進行廣播

有時候,你不只想要延遲到某一個訂閱者訂閱以後纔開始發出數據,可能還但願在多個訂閱者 到齊 以後 纔開始。

ConnectableFlux 的用意便在於此。Flux API 中有兩種主要的返回 ConnectableFlux 的方式:publishreplay

  • publish 會嘗試知足各個不一樣訂閱者的需求(背壓),並綜合這些請求反饋給源。 尤爲是若是有某個訂閱者的需求爲 0,publish 會 暫停 它對源的請求。
  • replay 將對第一個訂閱後產生的數據進行緩存,最多緩存數量取決於配置(時間/緩存大小)。 它會對後續接入的訂閱者從新發送數據。

ConnectableFlux 提供了多種對下游訂閱的管理。包括:

  • connect 當有足夠的訂閱接入後,能夠對 flux 手動執行一次。它會觸發對上游源的訂閱。
  • autoConnect(n) 與 connect 相似,不過是在有 n 個訂閱的時候自動觸發。
  • refCount(n) 不只可以在訂閱者接入的時候自動觸發,還會檢測訂閱者的取消動做。若是訂閱者數量不夠, 會將源「斷開鏈接」,再有新的訂閱者接入的時候纔會繼續「連上」源。
  • refCount(int, Duration) 增長了一個 "優雅的倒計時":一旦訂閱者數量過低了,它會等待 Duration 的時間,若是沒有新的訂閱者接入纔會與源「斷開鏈接」。

示例以下:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

co.connect();

The preceding code produces the following output:

done subscribing
will now connect
subscribed to source
1
1
2
2
3
3

使用 autoConnect

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});

以上代碼輸出以下:

subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3

8.4. 三種分批處理方式

當你有許多的元素,而且想將他們分批處理,Reactor 整體上有三種方案:分組(grouping)、 窗口(windowing)(譯者注:感受這個不翻譯更明白。。。)、緩存(buffering)。 這三種在概念上相似,由於它們都是將 Flux<T> 進行彙集。分組和分段操做都會建立一個 Flux<Flux<T>>,而緩存操做獲得的是一個 Collection<T>(譯者注:應該是一個 Flux<Collection<T>>)。

8.4.1. 用 Flux<GroupedFlux<T>> 進行分組

分組可以根據 key 將源 Flux<T> 拆分爲多個批次。

對應的操做符是 groupBy

每一組用 GroupedFlux<T> 類型表示,使用它的 key() 方法能夠獲得該組的 key。

在組內,元素並不須要是連續的。當源發出一個新的元素,該元素會被分發到與之匹配的 key 所對應的組中(若是尚未該 key 對應的組,則建立一個)。

這意味着組:

  1. 是互相沒有交集的(一個元素只屬於一個組)。
  2. 會包含原始序列中任意位置的元素。
  3. 不會爲空。
StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
                .groupBy(i -> i % 2 == 0 ? "even" : "odd")
                .concatMap(g -> g.defaultIfEmpty(-1) //若是組爲空,顯示爲 -1
                                .map(String::valueOf) //轉換爲字符串
                                .startWith(g.key())) //以該組的 key 開頭
        )
        .expectNext("odd", "1", "3", "5", "11", "13")
        .expectNext("even", "2", "4", "6", "12")
        .verifyComplete();
分組操做適用於分組個數很少的場景。並且全部的組都必須被消費,這樣 groupBy 才能持續從上游獲取數據。有時候這兩種要求在一塊兒——好比元素數量超多, 可是並行的用來消費的 flatMap 又太少的時候——會致使程序卡死。

8.4.2. 使用 Flux<Flux<T>> 進行 window 操做

window 操做是 根據個數、時間等條件,或可以定義邊界的發佈者(boundary-defining Publisher), 把源 Flux<T> 拆分爲 windows

對應的操做符有 windowwindowTimeoutwindowUntilwindowWhile,以及 windowWhen

groupBy 的主要區別在於,窗口操做可以保持序列順序。而且同一時刻最多隻能有兩個 window 是開啓的。

它們 能夠 重疊。操做符參數有 maxSizeskipmaxSize 指定收集多少個元素就關閉 window,而 skip 指定收集多數個元素後就打開下一個 window。因此若是 maxSize > skip 的話, 一個新的 window 的開啓會先於當前 window 的關閉, 從而兩者會有重疊。

重疊的 window 示例以下:

StepVerifier.create(
        Flux.range(1, 10)
                .window(5, 3) //overlapping windows
                .concatMap(g -> g.defaultIfEmpty(-1)) //將 windows 顯示爲 -1
        )
                .expectNext(1, 2, 3, 4, 5)
                .expectNext(4, 5, 6, 7, 8)
                .expectNext(7, 8, 9, 10)
                .expectNext(10)
                .verifyComplete();
若是將兩個參數的配置反過來(maxSize < skip),序列中的一些元素就會被丟棄掉, 而不屬於任何 window。

對基於判斷條件的 windowUntilwindowWhile,若是序列中的元素不匹配判斷條件, 那麼可能致使 空 windows,以下例所示:

StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
                .windowWhile(i -> i % 2 == 0)
                .concatMap(g -> g.defaultIfEmpty(-1))
        )
                .expectNext(-1, -1, -1) //分別被奇數 1 3 5 觸發
                .expectNext(2, 4, 6) // 被 11 觸發
                .expectNext(12) // 被 13 觸發
                .expectNext(-1) // 空的 completion window,若是 onComplete 前的元素可以匹配上的話就沒有這個了
                .verifyComplete();

8.4.3. 使用 Flux<List<T>> 進行緩存

緩存與窗口相似,不一樣在於:緩存操做以後會發出 buffers (類型爲Collection<T>, 默認是 List<T>),而不是 windows (類型爲 Flux<T>)。

緩存的操做符與窗口的操做符是對應的:bufferbufferTimeoutbufferUntilbufferWhile, 以及bufferWhen

若是說對於窗口操做符來講,是開啓一個窗口,那麼對於緩存操做符來講,就是建立一個新的集合, 而後對其添加元素。而窗口操做符在關閉窗口的時候,緩存操做符則是發出一個集合。

緩存操做也會有丟棄元素或內容重疊的狀況,以下:

StepVerifier.create(
        Flux.range(1, 10)
                .buffer(5, 3) // 緩存重疊
        )
                .expectNext(Arrays.asList(1, 2, 3, 4, 5))
                .expectNext(Arrays.asList(4, 5, 6, 7, 8))
                .expectNext(Arrays.asList(7, 8, 9, 10))
                .expectNext(Collections.singletonList(10))
                .verifyComplete();

不像窗口方法,bufferUntilbufferWhile 不會發出空的 buffer,以下例所示:

StepVerifier.create(
        Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
                .bufferWhile(i -> i % 2 == 0)
        )
        .expectNext(Arrays.asList(2, 4, 6)) // 被 11 觸發
        .expectNext(Collections.singletonList(12)) // 被 13 觸發
        .verifyComplete();

8.5. 使用 ParallelFlux 進行並行處理

現在多核架構已然普及,可以方便的進行並行處理是很重要的。Reactor 提供了一種特殊的類型 ParallelFlux 來實現並行,它可以將操做符調整爲並行處理方式。

你能夠對任何 Flux 使用 parallel() 操做符來獲得一個 ParallelFlux. 不過這個操做符自己並不會進行並行處理,而是將負載劃分到多個「軌道(rails)」上 (默認狀況下,軌道個數與 CPU 核數相等)。

爲了配置 ParallelFlux 如何並行地執行每個軌道,你須要使用 runOn(Scheduler)。 注意,Schedulers.parallel() 是推薦的專門用於並行處理的調度器。

下邊有兩個用於比較的例子,第一個以下:

Flux.range(1, 10)
    .parallel(2) 
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
咱們給定一個軌道數字,而不是依賴於 CPU 核數。

下邊是第二個例子:

Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));

第一個例子輸出以下:

main -> 1
main -> 2
main -> 3
main -> 4
main -> 5
main -> 6
main -> 7
main -> 8
main -> 9
main -> 10

第二個例子在兩個線程中並行執行,輸出以下:

parallel-1 -> 1
parallel-2 -> 2
parallel-1 -> 3
parallel-2 -> 4
parallel-1 -> 5
parallel-2 -> 6
parallel-1 -> 7
parallel-1 -> 9
parallel-2 -> 8
parallel-2 -> 10

若是在並行地處理以後,須要退回到一個「正常」的 Flux 而使後續的操做鏈按非並行模式執行, 你能夠對 ParallelFlux 使用 sequential() 方法。

注意,當你在對 ParallelFlux 使用一個 Subscriber 而不是基於 lambda 進行訂閱(subscribe()) 的時候,sequential() 會自動地被偷偷應用。

注意 subscribe(Subscriber<T>) 會合並全部的執行軌道,而 subscribe(Consumer<T>) 會在全部軌道上運行。 若是 subscribe() 方法中是一個 lambda,那麼有幾個軌道,lambda 就會被執行幾回。

你還能夠使用 groups() 做爲 Flux<GroupedFlux<T>> 進入到各個軌道或組裏邊, 而後能夠經過 composeGroup() 添加額外的操做符。

8.6. 替換默認的 Schedulers

就像咱們在 調度器(Schedulers) 這一節看到的那樣, Reactor Core 內置許多 Scheduler 的具體實現。 你能夠用形如 new* 的工廠方法來建立調度器,每一種調度器都有一個單例對象,你能夠使用單例工廠方法 (好比 Schedulers.elastic() 而不是 Schedulers.newElastic())來獲取它。

當你不明確指定調度器的時候,那些須要調度器的操做符會使用這些默認的單例調度器對象。例如, Flux#delayElements(Duration) 使用的是 Schedulers.parallel() 調度器對象。

然而有些狀況下,你可能須要「一刀切」(就不用對每個操做符都傳入你本身的調度器做爲參數了) 地調整這些默認調度器。 一個典型的例子就是,假設你須要對每個被調度的任務統計執行時長, 就想把默認的調度器包裝一下,而後添加計時功能。

那麼能夠使用 Schedulers.Factory 類來改變默認的調度器。默認狀況下,一個 Factory 會使用一些「命名比較直白」 的方法來建立全部的標準 Scheduler。每個方法你均可以用本身的實現方式來重寫。

此外,Factory 還提供一個額外的自定義方法 decorateExecutorService。它會在建立每個 reactor-core 調度器——內部有一個 ScheduledExecutorService(即便是好比用 Schedulers.newParallel() 方法建立的這種非默認的調度器)——的時候被調用。

你能夠經過調整 ScheduledExecutorService 來改變調度器:(譯者加:decorateExecutorService 方法)經過一個 Supplier 參數暴露出來,你能夠直接繞過這個 supplier 返回你本身的調度器實例,或者用 (譯者加: Schedulers.ScheduledExecutorService 的)get() 獲得默認實例,而後包裝它, 這取決於配置的調度器類型。

當你搞定了一個定製好的 Factory 後,你必須使用 Schedulers.setFactory(Factory) 方法來安裝它。

最後,對於調度器來講,有一個可自定義的 hook:onHandleError。這個 hook 會在提交到這個調度器的 Runnable 任務拋出異常的時候被調用(注意,若是還設置了一個 UncaughtExceptionHandler, 那麼它和 hook 都會被調用)。

8.7. 使用全局的 Hooks

Reactor 還有另一類可配置的應用於多種場合的回調,它們都在 Hooks 類中定義,整體來講有三類:

8.7.1. 丟棄事件的 Hooks

當生成源的操做符不聽從響應式流規範的時候,Dropping hooks(用於處理丟棄事件的 hooks)會被調用。 這種類型的錯誤是處於正常的執行路徑以外的(也就是說它們不能經過 onError 傳播)。

典型的例子是,假設一個發佈者即便在被調用 onCompleted 以後仍然能夠經過操做符調用 onNext。 這種狀況下,onNext 的值會被 丟棄,若是有多餘的 onError 的信號亦是如此。

相應的 hook,onNextDropped 以及 onErrorDropped,能夠提供一個全局的 Consumer, 以便可以在丟棄的狀況發生時進行處理。例如,你能夠使用它來對丟棄事件記錄日誌,或進行資源清理 (使用資源的值可能壓根沒有到達響應式鏈的下游)。

連續設置兩次 hook 的話都會起做用:提供的每個 consumer 都會被調用。使用 Hooks.resetOn*Dropped() 方法能夠將 hooks 所有重置爲默認。

8.7.2. 內部錯誤 Hook

若是操做符在執行其 onNextonError 以及 onComplete 方法的時候拋出異常,那麼 onOperatorError 這一個 hook 會被調用。

與上一類 hook 不一樣,這個 hook 仍是處在正常的執行路徑中的。一個典型的例子就是包含一個 map 函數式的 map 操做符拋出的異常(好比零做爲除數),這時候仍是會執行到 onError 的。

首先,它會將異常傳遞給 onOperatorError。利用這個 hook 你能夠檢查這個錯誤(以及有問題的相關數據), 並能夠 改變 這個異常。固然你還能夠作些別的事情,好比記錄日誌或返回原始異常。

注意,onOperatorError hook 也能夠被屢次設置:你能夠提供一個 String 爲一個特別的 BiFunction 類型的函數式設置識別符,不一樣識別符的函數式都會被執行,固然,重複使用一個識別符的話, 則後來的設置會覆蓋前邊的設置。

所以,默認的 hook 能夠使用 Hooks.resetOnOperatorError() 方法重置,而提供識別符的 hook 能夠使用 Hooks.resetOnOperatorError(String) 方法來重置。

8.7.3. 組裝 Hooks

這些組裝(assembly) hooks 關聯了操做符的生命週期。它們會在一個操做鏈被組裝起來的時候(即實例化的時候) 被調用。每個新的操做符組裝到操做鏈上的時候,onEachOperator 都會返回一個不一樣的發佈者, 從而能夠利用它動態調整操做符。onLastOperator 與之相似,不過只會在被操做鏈上的最後一個 (subscribe 調用以前的)操做符調用。

相似於 onOperatorError,也能夠疊加,而且經過識別符來標識。也是用相似的方式重置所有或部分 hooks。

8.7.4. 預置 Hooks

Hooks 工具類還提供了一些預置的 hooks。利用他們能夠改變一些默認的處理方式,而不用本身 編寫 hook:

  • onNextDroppedFail()onNextDropped 一般會拋出 Exceptions.failWithCancel() 異常。 如今它默認還會以 DEBUG 級別對被丟棄的值記錄日誌。若是想回到原來的只是拋出異常的方式,使用 onNextDroppedFail()
  • onOperatorDebug(): 這個方法會激活 debug mode。它與 onOperatorError hook 關聯,因此調用 resetOnOperatorError() 同時也會重置它。不過它內部也用到了特別的識別符, 你能夠經過 resetOnOperatorDebug() 方法來重置它。

8.8. 增長一個 Context 到響應式序列

當從命令式編程風格切換到響應式編程風格的時候,一個技術上最大的挑戰就是線程處理。

與習慣作法不一樣的是,在響應式編程中,一個線程(Thread)能夠被用於處理多個同時運行的異步序列 (其實是非阻塞的)。執行過程也會常常從一個線程切換到另外一個線程。

這樣的狀況下,對於開發者來講,若是依賴線程模型中相對「穩定」的特性——好比 ThreadLocal ——就會變得很難。由於它會讓你將數據綁定到一個 線程 上,因此在響應式環境中使用就變得 比較困難。所以,將使用了 ThreadLocal 的庫應用於 Reactor 的時候就會帶來新的挑戰。一般會更糟, 它用起來效果會更差,甚至會失敗。 好比,使用 Logback 的 MDC 來存儲日誌關聯的 ID,就是一個很是符合 這種狀況的例子。

一般的對 ThreadLocal 的替代方案是將環境相關的數據 C,同業務數據 T 一塊兒置於序列中, 好比使用 Tuple2<T, C>。這種方案看起來並很差,何況會在方法和 Flux 泛型中暴露環境數據信息。

自從版本 3.1.0,Reactor 引入了一個相似於 ThreadLocal 的高級功能:Context。它做用於一個 Flux 或一個 Mono 上,而不是應用於一個線程(Thread)。

爲了說明,這裏有個讀寫 Context 的簡單例子:

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                                   .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();

接下來的幾個小節,咱們來了解 Context 是什麼以及如何用,從而最終能夠理解上邊的例子。

這是一個主要面向庫開發人員的高級功能。這須要開發者對 Subscription 的生命週期 充分理解,而且明白它主要用於 subscription 相關的庫。

8.8.1. Context API

Context 是一個相似於 Map(這種數據結構)的接口:它存儲鍵值(key-value)對,你須要經過 key 來獲取值:

  • key 和 value 都是 Object 類型,因此 Context 能夠包含任意數量的任意對象。
  • Context不可變的(immutable)
  • put(Object key, Object value) 方法來存儲一個鍵值對,返回一個新的 Context 對象。 你也能夠用 putAll(Context) 方法將兩個 context 合併爲一個新的 context。
  • hasKey(Object key) 方法檢查一個 key 是否已經存在。
  • getOrDefault(Object key, T defaultValue) 方法取回 key 對應的值(類型轉換爲 T), 或在找不到這個 key 的狀況下返回一個默認值。
  • getOrEmpty(Object key) 來獲得一個 Optional<T> (context 會嘗試將值轉換爲 T)。
  • delete(Object key) 來刪除 key 關聯的值,並返回一個新的 Context
建立一個 Context 時,你能夠用靜態方法 Context.of 預先存儲最多 5 個鍵值對。 它接受 2, 4, 6, 8 或 10 個 Object 對象,兩兩一對做爲鍵值對添加到 Context。 你也能夠用 Context.empty() 方法來建立一個空的 Context

8.8.2. 把 Context 綁定到 Flux and Writing

爲了使用 context,它必需要綁定到一個指定的序列,而且鏈上的每一個操做符均可以訪問它。 注意,這裏的操做符必須是 Reactor 內置的操做符,由於 Context 是 Reactor 特有的。

實際上,一個 Context 是綁定到每個鏈中的 Subscriber 上的。 它使用 Subscription 的傳播機制來讓本身對每個操做符均可見(從最後一個 subscribe 沿鏈向上)。

爲了填充 Context ——只能在訂閱時(subscription time)——你須要使用 subscriberContext 操做符。

subscriberContext(Context) 方法會將你提供的 Context 與來自下游(記住,Context 是從下游 向上遊傳播的)的 Context合併。 這經過調用 putAll 實現,最後會生成一個新的 Context 給上游。

你也能夠用更高級的 subscriberContext(Function<Context, Context>)。它接受來自下游的 Context,而後你能夠根據須要添加或刪除值,而後返回新的 Context。你甚至能夠返回一個徹底不一樣 的對象,雖然不太建議這樣(這樣可能影響到依賴這個 Context 的庫)。

8.8.3. 讀取 Context

填充 Context 是一方面,讀取數據一樣重要。多數時候,添加內容到 Context 是最終用戶的責任, 可是利用這些信息是庫的責任,由於庫一般是客戶代碼的上游。

讀取 context 數據使用靜態方法 Mono.subscriberContext()

8.8.4. 簡單的例子

本例的初衷是爲了讓你對如何使用 Context 有個更好的理解。

讓咱們先回頭看一下最初的例子:

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext() 
                                   .map( ctx -> s + " " + ctx.get(key))) 
                .subscriberContext(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello World") 
            .verifyComplete();
操做鏈以調用 subscriberContext(Function) 結尾,將 "World" 做爲 "message" 這個 key 的 值添加到 Context 中。
對源調用 flatMapMono.subscriberContext() 方法拿到 Context
而後使用 map 讀取關聯到 "message" 的值,而後與原來的值鏈接。
最後 Mono<String> 確實發出了 "Hello World"
上邊的數字順序並非按照代碼行順序排的,這並不是錯誤:它表明了執行順序。雖然 subscriberContext 是鏈上的最後一個環節,但確實最早執行的(緣由在於 subscription 信號 是從下游向上的)。

注意在你的操做鏈中,寫入讀取 Context相對位置 很重要:由於 Context 是不可變的,它的內容只能被上游的操做符看到,以下例所示:

String key = "message";
Mono<String> r = Mono.just("Hello")
                     .subscriberContext(ctx -> ctx.put(key, "World")) 
                     .flatMap( s -> Mono.subscriberContext()
                                        .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger")));  

StepVerifier.create(r)
            .expectNext("Hello Stranger") 
            .verifyComplete();
寫入 Context 的位置太靠上了…
因此在 flatMap 就沒有 key 關聯的值,使用了默認值。
結果 Mono<String> 發出了 "Hello Stranger"

下面的例子一樣說明了 Context 的不可變性(Mono.subscriberContext() 老是返回由 subscriberContext 配置的 Context):

String key = "message";

Mono<String> r = Mono.subscriberContext() 
        .map( ctx -> ctx.put(key, "Hello")) 
        .flatMap( ctx -> Mono.subscriberContext()) 
        .map( ctx -> ctx.getOrDefault(key,"Default")); 

StepVerifier.create(r)
        .expectNext("Default") 
        .verifyComplete();
拿到 Context
map 方法中咱們嘗試修改它。
flatMap 中再次獲取 Context
讀取 Context 中可能的值。
值歷來沒有被設置爲 "Hello"

相似的,若是屢次對 Context 中的同一個 key 賦值的話,要看 寫入的相對順序 : 讀取 Context 的操做符只能拿到下游最近的一次寫入的值,以下例所示:

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext()
                                   .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Reactor")) 
                .subscriberContext(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello Reactor") 
            .verifyComplete();
寫入 "message" 的值。
另外一次寫入 "message" 的值。
map 方法值能拿到下游最近的一次寫入的值: "Reactor"

這裏,首先 Context 中的 key 被賦值 "World"。而後訂閱信號(subscription signal)向上遊 移動,又發生了另外一次寫入。此次生成了第二個不變的 Context,裏邊的值是 "Reactor"。以後, 數據開始流動, flatMap 拿到最近的 Context ,也就是第二個值爲 ReactorContext

你可能會以爲 Context 是與數據信號一塊傳播的。若是是那樣的話,在兩次寫入操做中間加入的一個 flatMap 會使用最上游的這個 Context。但並非這樣的,以下:

String key = "message";
Mono<String> r = Mono.just("Hello")
                     .flatMap( s -> Mono.subscriberContext()
                                        .map( ctx -> s + " " + ctx.get(key))) 
                     .subscriberContext(ctx -> ctx.put(key, "Reactor")) 
                     .flatMap( s -> Mono.subscriberContext()
                                        .map( ctx -> s + " " + ctx.get(key))) 
                     .subscriberContext(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello Reactor World") 
            .verifyComplete();
這裏是第一次賦值。
這裏是第二次賦值。
第一個 flatMap 看到的是第二次的賦值。
第二個 flatMap 將上一個的結果與 第一次賦值 的 context 值鏈接。
Mono 發出的是 "Hello Reactor World"

緣由在於 Context 是與 Subscriber 關聯的,而每個操做符訪問的 Context 來自其下游的 Subscriber

最後一個有意思的傳播方式是,對 Context 的賦值也能夠在一個 flatMap 內部,以下:

String key = "message";
Mono<String> r =
        Mono.just("Hello")
            .flatMap( s -> Mono.subscriberContext()
                               .map( ctx -> s + " " + ctx.get(key))
            )
            .flatMap( s -> Mono.subscriberContext()
                               .map( ctx -> s + " " + ctx.get(key))
                               .subscriberContext(ctx -> ctx.put(key, "Reactor")) 
            )
            .subscriberContext(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
這個 subscriberContext 不會影響所在 flatMap 以外的任何東西。
這個 subscriberContext 會影響主序列的 Context

上邊的例子中,最後發出的值是 "Hello World Reactor" 而不是 "Hello Reactor World",由於賦值 "Reactor" 的 subscriberContext 是做用於第二個 flatMap 的內部序列的。因此不會在主序列可見/ 傳播,第一個 flatMap 也看不到它。傳播(Propagation) + 不可變性(immutability)將相似 flatMap 這樣的操做符中的建立的內部序列中的 Context 與外部隔離開來。

8.8.5. 完整的例子

讓咱們來看一個實際的從 Context 中讀取值的例子:一個響應式的 HTTP 客戶端將一個 Mono<String> (用於 PUT 請求)做爲數據源,同時經過一個特定的 key 使用 Context 將關聯的ID信息放入請求頭中。

從用戶角度,是這樣調用的:

doPut("www.example.com", Mono.just("Walter"))

爲了傳播一個關聯ID,應該這樣調用:

doPut("www.example.com", Mono.just("Walter"))
        .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))

由上可見,用戶代碼使用了 subscriberContext 來爲 ContextHTTP_CORRELATION_ID 賦值。上游的操做符是一個由 HTTP 客戶端庫返回的 Mono<Tuple2<Integer, String>> (一個簡化的 HTTP 響應)。因此可以正確將信息從用戶代碼傳遞給庫代碼。

下邊的例子演示了從庫的角度由 context 讀取值的模擬代碼,若是可以找到關聯ID,則「增長請求」:

static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
        Mono<Tuple2<String, Optional<Object>>> dataAndContext =
                        data.zipWith(Mono.subscriberContext() 
                                         .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))); 

        return dataAndContext
                        .<String>handle((dac, sink) -> {
                                if (dac.getT2().isPresent()) { 
                                        sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get());
                                }
                                else {
                                        sink.next("PUT <" + dac.getT1() + "> sent to " + url);
                                }
                                sink.complete();
                        })
                        .map(msg -> Tuples.of(200, msg));
}
Mono.subscriberContext() 拿到 Context
提取出關聯ID的值——是一個 Optional
若是值存在,那麼就將其加入請求頭。

在這段庫代碼片斷中,你能夠看到它是如何將 MonoMono.subscriberContext() zip 起來的。 返回的是一個 Tuple2<String, Context>,這個 Context 包含來自下游的 HTTP_CORRELATION_ID 的值。

庫代碼接着用 map 讀取出那個 key 的值 Optional<String>,若是值存在,將其做爲 X-Correlation-ID 請求頭。 最後一塊而用 handle 來處理。

用來驗證上邊的庫代碼的測試程序以下:

@Test
public void contextForLibraryReactivePut() {
        Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
                        .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
                        .filter(t -> t.getT1() < 300)
                        .map(Tuple2::getT2);

        StepVerifier.create(put)
                    .expectNext("PUT <Walter> sent to www.example.com with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
                    .verifyComplete();
}

8.9. 空值安全

雖然 Java 的類型系統沒有表達空值安全(null-safety)的機制,可是 Reactor 如今提供了基於註解的用於聲明 「可能爲空(nullability)」的 API,相似於 Spring Framework 5 中提供的 API。

Reactor 自身就用到了這些註解,你也能夠將其用於任何基於 Reactor 的本身的空值安全的 Java API 中。 不過,在 方法體內部 對「可能爲空」的類型的使用就不在這一特性的範圍內了。

這些註解是基於 JSR 305 的註解(是受相似 IntelliJ IDEA 這樣的工具支持的 JSR)做爲元註解(meta-annotated)的。當 Java 開發者在編寫空值安全的代碼時, 它們可以提供有用的警告信息,以便避免在運行時(runtime)出現 NullPointerException 異常。 JSR 305 元註解使得工具提供商能夠以一種通用的方式提供對空值安全的支持,從而 Reactor 的註解就不用重複造輪子了。

對於 Kotlin 1.1.5+,須要(同時也推薦)在項目 classpath 中添加對 JSR 305 的依賴。

它們也可在 Kotlin 中使用,Kotlin 原生支持 空值安全。具體請參考 this dedicated section

reactor.util.annotation 包提供如下註解:

  • @NonNull 代表一個具體的參數、返回值或域值不能爲 null。 (若是參數或返回值應用了 @NonNullApi 則無需再加它)。
  • @Nullable 代表一個參數、返回值或域值能夠爲 null
  • @NonNullApi 是一個包級別的註解,代表默認狀況下參數或返回值不能爲 null
(Reactor 的空值安全的註解)對於通用類型參數(generic type arguments)、可變參數(varargs),以及數組元素(array elements) 尚不支持。參考 issue #878 查看最新信息。

翻譯建議 - "高級特性與概念"

Appendix A: 我須要哪一個操做符?

TIP:在這一節,若是一個操做符是專屬於 FluxMono 的,那麼會給它註明前綴。 公共的操做符沒有前綴。若是一個具體的用例涉及多個操做符的組合,這裏以方法調用的方式展示, 會以一個點(.)開頭,並將參數置於圓括號內,好比: .methodCall(parameter)

我想搞定:

A.1. 建立一個新序列,它…

  • 發出一個 T,我已經有了:just
    • …基於一個 Optional<T>Mono#justOrEmpty(Optional<T>)
    • …基於一個可能爲 null 的 T:Mono#justOrEmpty(T)
  • 發出一個 T,且仍是由 just 方法返回
    • …可是「懶」建立的:使用 Mono#fromSupplier 或用 defer 包裝 just
  • 發出許多 T,這些元素我能夠明確列舉出來:Flux#just(T...)
  • 基於迭代數據結構:
    • 一個數組:Flux#fromArray
    • 一個集合或 iterable:Flux#fromIterable
    • 一個 Integer 的 range:Flux#range
    • 一個 Stream 提供給每個訂閱:Flux#fromStream(Supplier<Stream>)
  • 基於一個參數值給出的源:
    • 一個 Supplier<T>Mono#fromSupplier
    • 一個任務:Mono#fromCallableMono#fromRunnable
    • 一個 CompletableFuture<T>Mono#fromFuture
  • 直接完成:empty
  • 當即生成錯誤:error
    • …可是「懶」的方式生成 Throwableerror(Supplier<Throwable>)
  • 什麼都不作:never
  • 訂閱時才決定:defer
  • 依賴一個可回收的資源:using
  • 可編程地生成事件(能夠使用狀態):
    • 同步且逐個的:Flux#generate
    • 異步(也可同步)的,每次儘量多發出元素:Flux#createMono#create 也是異步的,只不過只能發一個)

A.2. 對序列進行轉化

  • 我想轉化一個序列:
    • 1對1地轉化(好比字符串轉化爲它的長度):map
      • …類型轉化:cast
      • …爲了得到每一個元素的序號:Flux#index
    • 1對n地轉化(如字符串轉化爲一串字符):flatMap + 使用一個工廠方法
    • 1對n地轉化可自定義轉化方法和/或狀態:handle
    • 對每個元素執行一個異步操做(如對 url 執行 http 請求):flatMap + 一個異步的返回類型爲 Publisher 的方法
      • …忽略一些數據:在 flatMap lambda 中根據條件返回一個 Mono.empty()
      • …保留原來的序列順序:Flux#flatMapSequential(對每一個元素的異步任務會當即執行,但會將結果按照原序列順序排序)
      • …當 Mono 元素的異步任務會返回多個元素的序列時:Mono#flatMapMany
  • 我想添加一些數據元素到一個現有的序列:
    • 在開頭添加:Flux#startWith(T...)
    • 在最後添加:Flux#concatWith(T...)
  • 我想將 Flux 轉化爲集合(一下都是針對 Flux 的)
    • 轉化爲 List:collectListcollectSortedList
    • 轉化爲 Map:collectMapcollectMultiMap
    • 轉化爲自定義集合:collect
    • 計數:count
    • reduce 算法(將上個元素的reduce結果與當前元素值做爲輸入執行reduce方法,如sum) reduce
      • …將每次 reduce 的結果當即發出:scan
    • 轉化爲一個 boolean 值:
      • 對全部元素判斷都爲true:all
      • 對至少一個元素判斷爲true:any
      • 判斷序列是否有元素(不爲空):hasElements
      • 判斷序列中是否有匹配的元素:hasElement
  • 我想合併 publishers…
    • 按序鏈接:Flux#concat.concatWith(other)
      • …即便有錯誤,也會等全部的 publishers 鏈接完成:Flux#concatDelayError
      • …按訂閱順序鏈接(這裏的合併仍然能夠理解成序列的鏈接):Flux#mergeSequential
    • 按元素髮出的順序合併(不管哪一個序列的,元素先到先合併):Flux#merge / .mergeWith(other)
      • …元素類型會發生變化:Flux#zip / Flux#zipWith
    • 將元素組合:
      • 2個 Monos 組成1個 Tuple2Mono#zipWith
      • n個 Monos 的元素都發出來後組成一個 Tuple:Mono#zip
    • 在終止信號出現時「採起行動」:
      • 在 Mono 終止時轉換爲一個 Mono<Void>Mono#and
      • 當 n 個 Mono 都終止時返回 Mono<Void>Mono#when
      • 返回一個存放組合數據的類型,對於被合併的多個序列:
        • 每一個序列都發出一個元素時:Flux#zip
        • 任何一個序列發出元素時:Flux#combineLatest
    • 只取各個序列的第一個元素:Flux#firstMono#firstmono.or (otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux)
    • 由一個序列觸發(相似於 flatMap,不過「喜新厭舊」):switchMap
    • 由每一個新序列開始時觸發(也是「喜新厭舊」風格):switchOnNext
  • 我想重複一個序列:repeat
    • …可是以必定的間隔重複:Flux.interval(duration).flatMap(tick -> myExistingPublisher)
  • 我有一個空序列,可是…
    • 我想要一個缺省值來代替:defaultIfEmpty
    • 我想要一個缺省的序列來代替:switchIfEmpty
  • 我有一個序列,可是我對序列的元素值不感興趣:ignoreElements
    • …而且我但願用 Mono 來表示序列已經結束:then
    • …而且我想在序列結束後等待另外一個任務完成:thenEmpty
    • …而且我想在序列結束以後返回一個 MonoMono#then(mono)
    • …而且我想在序列結束以後返回一個值:Mono#thenReturn(T)
    • …而且我想在序列結束以後返回一個 FluxthenMany
  • 我有一個 Mono 但我想延遲完成…
    • …當有1個或N個其餘 publishers 都發出(或結束)時才完成:Mono#delayUntilOther
      • …使用一個函數式來定義如何獲取「其餘 publisher」:Mono#delayUntil(Function)
  • 我想基於一個遞歸的生成序列的規則擴展每個元素,而後合併爲一個序列發出:
    • …廣度優先:expand(Function)
    • …深度優先:expandDeep(Function)

A.3. 「窺視」(只讀)序列

  • 再不對序列形成改變的狀況下,我想:
    • 獲得通知或執行一些操做:
      • 發出元素:doOnNext
      • 序列完成:Flux#doOnCompleteMono#doOnSuccess
      • 因錯誤終止:doOnError
      • 取消:doOnCancel
      • 訂閱時:doOnSubscribe
      • 請求時:doOnRequest
      • 完成或錯誤終止:doOnTerminate(Mono的方法可能包含有結果)
        • 可是在終止信號向下遊傳遞 以後doAfterTerminate
      • 全部類型的信號(Signal):Flux#doOnEach
      • 全部結束的狀況(完成complete、錯誤error、取消cancel):doFinally
    • 記錄日誌:log
  • 我想知道全部的事件:
    • 每個事件都體現爲一個 single 對象:
      • 執行 callback:doOnEach
      • 每一個元素轉化爲 single 對象:materialize
        • …在轉化回元素:dematerialize
    • 轉化爲一行日誌:log

A.4. 過濾序列

  • 我想過濾一個序列
    • 基於給定的判斷條件:filter
      • …異步地進行判斷:filterWhen
    • 僅限於指定類型的對象:ofType
    • 忽略全部元素:ignoreElements
    • 去重:
      • 對於整個序列:Flux#distinct
      • 去掉連續重複的元素:Flux#distinctUntilChanged
  • 我只想要一部分序列:
    • 只要 N 個元素:
      • 從序列的第一個元素開始算:Flux#take(long)
        • …取一段時間內發出的元素:Flux#take(Duration)
        • …只取第一個元素放到 Mono 中返回:Flux#next()
        • …使用 request(N) 而不是取消:Flux#limitRequest(long)
      • 從序列的最後一個元素倒數:Flux#takeLast
      • 直到知足某個條件(包含):Flux#takeUntil(基於判斷條件),Flux#takeUntilOther(基於對 publisher 的比較)
      • 直到知足某個條件(不包含):Flux#takeWhile
    • 最多隻取 1 個元素:
      • 給定序號:Flux#elementAt
      • 最後一個:.takeLast(1)
        • …若是爲序列空則發出錯誤信號:Flux#last()
        • …若是序列爲空則返回默認值:Flux#last(T)
    • 跳過一些元素:
      • 從序列的第一個元素開始跳過:Flux#skip(long)
        • …跳過一段時間內發出的元素:Flux#skip(Duration)
      • 跳過最後的 n 個元素:Flux#skipLast
      • 直到知足某個條件(包含):Flux#skipUntil(基於判斷條件),Flux#skipUntilOther (基於對 publisher 的比較)
      • 直到知足某個條件(不包含):Flux#skipWhile
    • 採樣:
      • 給定採樣週期:Flux#sample(Duration)
        • 取採樣週期裏的第一個元素而不是最後一個:sampleFirst
      • 基於另外一個 publisher:Flux#sample(Publisher)
      • 基於 publisher「超時」:Flux#sampleTimeout (每個元素會觸發一個 publisher,若是這個 publisher 不被下一個元素觸發的 publisher 覆蓋就發出這個元素)
  • 我只想要一個元素(若是多於一個就返回錯誤)…
    • 若是序列爲空,發出錯誤信號:Flux#single()
    • 若是序列爲空,發出一個缺省值:Flux#single(T)
    • 若是序列爲空就返回一個空序列:Flux#singleOrEmpty

A.5. 錯誤處理

  • 我想建立一個錯誤序列:error
    • …替換一個完成的 Flux.concat(Flux.error(e))
    • …替換一個完成的 Mono.then(Mono.error(e))
    • …若是元素超時未發出:timeout
    • …「懶」建立:error(Supplier<Throwable>)
  • 我想要相似 try/catch 的表達方式:
    • 拋出異常:error
    • 捕獲異常:
      • 而後返回缺省值:onErrorReturn
      • 而後返回一個 FluxMonoonErrorResume
      • 包裝異常後再拋出:.onErrorMap(t -> new RuntimeException(t))
    • finally 代碼塊:doFinally
    • Java 7 以後的 try-with-resources 寫法:using 工廠方法
  • 我想從錯誤中恢復…
    • 返回一個缺省的:
      • 的值:onErrorReturn
      • PublisherFlux#onErrorResumeMono#onErrorResume
    • 重試:retry
      • …由一個用於伴隨 Flux 觸發:retryWhen
  • 我想處理回壓錯誤(向上遊發出「MAX」的 request,若是下游的 request 比較少,則應用策略)…
    • 拋出 IllegalStateExceptionFlux#onBackpressureError
    • 丟棄策略:Flux#onBackpressureDrop
      • …可是不丟棄最後一個元素:Flux#onBackpressureLatest
    • 緩存策略(有限或無限):Flux#onBackpressureBuffer
      • …當有限的緩存空間用滿則應用給定策略:Flux#onBackpressureBuffer 帶有策略 BufferOverflowStrategy

A.6. 基於時間的操做

  • 我想將元素轉換爲帶有時間信息的 Tuple2<Long, T>
    • 從訂閱時開始:elapsed
    • 記錄時間戳:timestamp
  • 若是元素間延遲過長則停止序列:timeout
  • 以固定的週期發出元素:Flux#interval
  • 在一個給定的延遲後發出 0:static Mono.delay.
  • 我想引入延遲:
    • 對每個元素:Mono#delayElementFlux#delayElements
    • 延遲訂閱:delaySubscription

A.7. 拆分 Flux

  • 我想將一個 Flux<T> 拆分爲一個 Flux<Flux<T>>
    • 以個數爲界:window(int)
      • …會出現重疊或丟棄的狀況:window(int, int)
    • 以時間爲界:window(Duration)
      • …會出現重疊或丟棄的狀況:window(Duration, Duration)
    • 以個數或時間爲界:windowTimeout(int, Duration)
    • 基於對元素的判斷條件:windowUntil
      • …觸發判斷條件的元素會分到下一波(cutBefore 變量):.windowUntil(predicate, true)
      • …知足條件的元素在一波,直到不知足條件的元素髮出開始下一波:windowWhile (不知足條件的元素會被丟棄)
    • 經過另外一個 Publisher 的每個 onNext 信號來拆分序列:window(Publisher)windowWhen
  • 我想將一個 Flux<T> 的元素拆分到集合…
    • 拆分爲一個一個的 List:
      • 以個數爲界:buffer(int)
        • …會出現重疊或丟棄的狀況:buffer(int, int)
      • 以時間爲界:buffer(Duration)
        • …會出現重疊或丟棄的狀況:buffer(Duration, Duration)
      • 以個數或時間爲界:bufferTimeout(int, Duration)
      • 基於對元素的判斷條件:bufferUntil(Predicate)
        • …觸發判斷條件的元素會分到下一個buffer:.bufferUntil(predicate, true)
        • …知足條件的元素在一個buffer,直到不知足條件的元素髮出開始下一buffer:bufferWhile(Predicate)
      • 經過另外一個 Publisher 的每個 onNext 信號來拆分序列:buffer(Publisher)bufferWhen
    • 拆分到指定類型的 "collection":buffer(int, Supplier<C>)
  • 我想將 Flux<T> 中具備共同特徵的元素分組到子 Flux:groupBy(Function<T,K>) TIP:注意返回值是 Flux<GroupedFlux<K, T>>,每個 GroupedFlux 具備相同的 key 值 K,能夠經過 key() 方法獲取。

A.8. 回到同步的世界

  • 我有一個 Flux<T>,我想:
    • 在拿到第一個元素前阻塞:Flux#blockFirst
      • …並給出超時時限:Flux#blockFirst(Duration)
    • 在拿到最後一個元素前阻塞(若是序列爲空則返回 null):Flux#blockLast
      • …並給出超時時限:Flux#blockLast(Duration)
    • 同步地轉換爲 Iterable<T>Flux#toIterable
    • 同步地轉換爲 Java 8 Stream<T>Flux#toStream
  • 我有一個 Mono<T>,我想:
    • 在拿到元素前阻塞:Mono#block
      • …並給出超時時限:Mono#block(Duration)
    • 轉換爲 CompletableFuture<T>Mono#toFuture

翻譯建議 - "我須要哪一個操做符?"

Appendix B: FAQ,最佳實踐,以及「我如何…?」

B.1. 如何包裝一個同步阻塞的調用?

不少時候,信息源是同步和阻塞的。在 Reactor 中,咱們用如下方式處理這種信息源:

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());
使用 fromCallable 方法生成一個 Mono;
返回同步、阻塞的資源;
使用 Schedulers.elastic() 確保對每個訂閱來講運行在一個專門的線程上。

由於調用返回一個值,因此你應該使用 Mono。你應該使用 Schedulers.elastic 由於它會建立一個專門的線程來等待阻塞的調用返回。

注意 subscribeOn 方法並不會「訂閱」這個 Mono。它只是指定了訂閱操做使用哪一個 Scheduler

B.2. 用在 Flux 上的操做符好像沒起做用,爲啥?

請確認你確實對調用 .subscribe() 的發佈者應用了這個操做符。

Reactor 的操做符是裝飾器(decorators)。它們會返回一個不一樣的(發佈者)實例, 這個實例對上游序列進行了包裝並增長了一些的處理行爲。因此,最推薦的方式是將操做符「串」起來。

對比下邊的兩個例子:

沒有串起來(不正確的)

Flux<String> flux = Flux.just("foo", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); 
flux.subscribe(next -> System.out.println("Received: " + next));
問題在這, flux 變量並無改變。

串起來(正確的)

Flux<String> flux = Flux.just("foo", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));

下邊的例子更好(由於更簡潔):

串起來(最好的)

Flux<String> secrets = Flux
  .just("foo", "chain")
  .map(secret -> secret.replaceAll(".", "*"))
  .subscribe(next -> System.out.println("Received: " + next));

第一個例子的輸出:

Received: foo
Received: chain

後兩個例子的輸出:

Received: ***
Received: *****

B.3. Mono zipWith/zipWhen 沒有被調用

例子

myMethod.process("a") // 這個方法返回 Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //沒有被調用
        .subscribe();

若是源 Mono 爲空或是一個 Mono<Void>Mono<Void> 一般用於「空」的場景), 下邊的組合操做就不會被調用。

對於相似 zipWith 的用於轉換的操做符來講,這是比較典型的場景。 這些操做符依賴於數據元素來轉換爲輸出的元素。 若是任何一個序列是空的,則返回的就是一個空序列,因此請謹慎使用。 例如在 then() 以後使用 zipWith() 就會致使這一問題。

對於以 Function 做爲參數的 and 更是如此,由於返回的 Mono 是依賴於收到的數據懶加載的(而對於空序列或 Void 的序列來講是沒有數據發出來的)。

你能夠使用 .defaultIfEmpty(T) 將空序列替換爲包含 T 類型缺省值的序列(而不是 Void 序列), 從而能夠避免相似的狀況出現。舉例以下:

zipWhen 前使用 defaultIfEmpty

myMethod.emptySequenceForKey("a") // 這個方法返回一個空的 Mono<String>
        .defaultIfEmpty("") // 將空序列轉換爲包含字符串 "" 的序列
        .zipWhen(aString -> myMethod.process("b")) // 當 "" 發出時被調用
        .subscribe();

B.4. 如何用 retryWhen 來實現 retry(3) 的效果?

retryWhen 方法比較複雜,但願下邊的一段模擬 retry(3) 的代碼可以幫你更好地理解它的工做方式:

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), 
          (error, index) -> { 
            if (index < 4) return index; 
            else throw Exceptions.propagate(error); 
          })
    );
技巧一:使用 zip 和一個「重試個數 + 1」的 range
zip 方法讓你能夠在對重試次數計數的同時,仍掌握着原始的錯誤(error)。
容許三次重試,小於 4 的時候發出一個值。
爲了使序列以錯誤結束。咱們將原始異常在三次重試以後拋出。

B.5. 如何使用 retryWhen 進行 exponential backoff?

Exponential backoff 的意思是進行的屢次重試之間的間隔愈來愈長, 從而避免對源系統形成過載,甚至宕機。基本原理是,若是源產生了一個錯誤, 那麼已是處於不穩定狀態,可能不會馬上覆原。因此,若是馬上就重試可能會產生另外一個錯誤, 致使源更加不穩定。

下面是一段實現 exponential backoff 效果的例子,每次重試的間隔都會遞增 (僞代碼: delay = attempt number * 100 milliseconds):

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
        .doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) 
        .zipWith(Flux.range(1, 4), (error, index) -> { 
          if (index < 4) return index;
          else throw Exceptions.propagate(error);
        })
        .flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) 
        .doOnNext(s -> System.out.println("retried at " + LocalTime.now())) 
    );
記錄錯誤出現的時間;
使用 retryWhen + zipWith 的技巧實現重試3次的效果;
經過 flatMap 來實現延遲時間遞增的效果;
一樣記錄重試的時間。

訂閱它,輸出以下:

java.lang.IllegalArgumentException at 18:02:29.338
retried at 18:02:29.459 
java.lang.IllegalArgumentException at 18:02:29.460
retried at 18:02:29.663 
java.lang.IllegalArgumentException at 18:02:29.663
retried at 18:02:29.964 
java.lang.IllegalArgumentException at 18:02:29.964
第一次重試延遲大約 100ms
第二次重試延遲大約 200ms
第三次重試延遲大約 300ms

B.6. How do I ensure thread affinity using publishOn()?

Schedulers 所述,publishOn() 能夠用來切換執行線程。 publishOn 可以影響到其以後的操做符的執行線程,直到有新的 publishOn 出現。 因此 publishOn 的位置很重要。

好比下邊的例子, map() 中的 transform 方法是在 scheduler1 的一個工做線程上執行的, 而 doOnNext() 中的 processNext 方法是在 scheduler2 的一個工做線程上執行的。 單線程的調度器可能用於對不一樣階段的任務或不一樣的訂閱者確保線程關聯性。

EmitterProcessor<Integer> processor = EmitterProcessor.create();
processor.publishOn(scheduler1)
         .map(i -> transform(i))
         .publishOn(scheduler2)
         .doOnNext(i -> processNext(i))
         .subscribe();

翻譯建議 - "FAQ,最佳實踐,以及「我如何…?」"

Appendix C: Reactor-Extra

reactor-extra 爲知足 reactor-core 用戶的更高級需求,提供了一些額外的操做符和工具。

因爲這是一個單獨的包,使用時須要明確它的依賴:

dependencies {
     compile 'io.projectreactor:reactor-core'
     compile 'io.projectreactor.addons:reactor-extra' 
}
添加 reactor-extra 的依賴。參考 獲取 Reactor 瞭解爲何使用BOM的狀況下不須要指定 version。

C.1. TupleUtils 以及函數式接口

在 Java 8 提供的函數式接口基礎上,reactor.function 包又提供了一些支持 3 到 8 個值的 FunctionPredicateConsumer

TupleUtils 提供的靜態方法能夠方便地用於將相應的 Tuple 函數式接口的 lambda 轉換爲更簡單的接口。

這使得咱們在使用 Tuple 中各成員的時候更加容易,好比:

.map(tuple -> {
  String firstName = tuple.getT1();
  String lastName = tuple.getT2();
  String address = tuple.getT3();

  return new Customer(firstName, lastName, address);
});

能夠用下面的方式代替:

.map(TupleUtils.function(Customer::new));
(由於 Customer 的構造方法符合 Consumer3 的函數式接口標籤)

C.2. MathFlux 的數學操做符

Treactor.math 包的 MathFlux 提供了一些用於數學計算的操做符,如 maxminsumIntaverageDouble

C.3. 重複與重試工具

reactor.retry 包中有一些可以幫助實現 Flux#repeatWhenFlux#retryWhen 的工具。入口點(entry points)就是 RepeatRetry 接口的工廠方法。

兩個接口均可用做可變的構建器(mutative builder),而且相應的實現(implementing) 均可做爲 Function 用於對應的操做符。

C.4. 調度器

Reactor-extra 提供了若干專用的調度器: - ForkJoinPoolScheduler,位於 reactor.scheduler.forkjoin 包; - SwingScheduler,位於 reactor.swing 包; - SwtScheduler,位於 reactor.swing 包。

相關文章
相關標籤/搜索