瘋狂創客圈 經典圖書 : 《Netty Zookeeper Redis 高併發實戰》 面試必備 + 面試必備 + 面試必備 【博客園總入口 】html
瘋狂創客圈 經典圖書 : 《SpringCloud、Nginx高併發核心編程》 大廠必備 + 大廠必備 + 大廠必備 【博客園總入口 】前端
入大廠+漲工資必備: 高併發【 億級流量IM實戰】 實戰系列 【 SpringCloud Nginx秒殺】 實戰系列 【博客園總入口 】java
網盤地址 和 提取碼:請參見 瘋狂創客圈 的 百度網盤小視頻和小工具react
Stephane Maldini @smaldini Simon Baslé @simonbasle3.2.0.BUILD-SNAPSHOTgit
Appendix A: 我須要哪一個操做符?github
Appendix B: FAQ,最佳實踐,以及「我如何…?」面試
Appendix C: Reactor-Extra算法
(譯者加)本文檔的一些典型的名詞以下:Publisher (發佈者)、Subscriber (訂閱者)、Subscription (訂閱 n.)、subscribe (訂閱 v.)。event /signal (事件/信號,原文常甚至在一個句子將兩個詞來回用,但表示的意思是基本相同的, 所以若是你看到本文翻譯有時候用事件,有時候用信號,在本文檔內基本能夠認爲一個意思)。sequence /stream (序列/流,兩個詞意思類似,本文介紹的是響應式流的內容,可是出現比較多的是 sequence這個詞,主要翻譯爲「序列」,有些地方爲了更加契合且方便理解翻譯爲「流序列」)。element /item (主要指序列中的元素,文中兩個詞基本翻譯爲「元素」)。emit /produce /generate (發出/產生/生成,文中這三個英文詞也有類似之處,對於 emit 多翻譯爲 「發出」,對於後兩個多翻譯爲「生成」)、consume (消費)。Processor (未作翻譯,保留英文)。operator (譯做操做符,聲明式的可組裝的響應式方法,其組裝成的鏈譯做「操做鏈」)。 |
|
---|---|
本節是對 Reactor參考文檔(譯者加:原文估計是多我的寫的,時而「document」時而「guide」,不影響理解的狀況下, 翻譯就一概用「文檔」了) 的簡要概述。你並不須要從頭至尾閱讀該文檔。每一節的內容都是獨立的,不過會有其餘章節的連接。spring
本Reactor參考文檔也提供HTML形式。最新版本見 http://projectreactor.io/docs/core/release/reference/docs/index.html。數據庫
本文檔的副本你能夠自用,亦可分發給他人。不過不管是打印版仍是電子版,請免費提供。
本參考文檔用 Asciidoc 編寫, 其源碼見 https://github.com/reactor/reactor-core/tree/master/src/docs/asciidoc (譯者加:本翻譯源碼見 https://github.com/get-set/reactor-core/tree/master-zh/src/docs/asciidoc )。
若有任何補充,歡迎你提交 pull request。
咱們建議你將源碼 checkout 到本地,這樣能夠使用 gradle 的 asciidoctor
任務檢查文檔渲染效果。 有些章節會包含其餘文件,Github 並不必定可以渲染出來。
爲了方便讀者的反饋,多數章節在結尾都提供一個連接,這個連接能夠打開一個 Github 上的 編輯界面,從而能夠編輯相應章節的源碼。這些連接在 HTML5 的版本中可以看到,就像這樣: 翻譯建議 - 關於本文檔。 | |
---|---|
Reactor項目有多種方式但願能幫助到你:
project-reactor
進行提問。全部 Reactor 項目都是開源的, 包括本文檔。 若是你發現本文檔有問題,或但願補充一些內容,請參考 這裏 進行了解。 | |
---|---|
Flux
, 包含 0-N 個元素的異步序列" 和 "Mono
, 異步的 0-1 結果";reactor-test
項目,參考 測試。這一節的內容可以幫助你上手使用 Reactor。包括以下內容:
Reactor 是一個用於JVM的徹底非阻塞的響應式編程框架,具有高效的需求管理(即對 「背壓(backpressure)」的控制)能力。它與 Java 8 函數式 API 直接集成,好比 CompletableFuture
, Stream
, 以及 Duration
。它提供了異步序列 API Flux
(用於[N]個元素)和 Mono
(用於 [0|1]個元素),並徹底遵循和實現了「響應式擴展規範」(Reactive Extensions Specification)。
Reactor 的 reactor-ipc
組件還支持非阻塞的進程間通訊(inter-process communication, IPC)。 Reactor IPC 爲 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背壓的網絡引擎,從而適合 應用於微服務架構。而且完整支持響應式編解碼(reactive encoding and decoding)。
Reactor Core 運行於 Java 8
及以上版本。
依賴 org.reactive-streams:reactive-streams:1.0.2
。
Andriod 支持方面:Reactor 3 並不正式支持 Andorid(若是須要能夠考慮使用 RxJava 2)。可是,在 Android SDK 26(Android 0)及以上版本應該沒問題。咱們但願可以最大程度兼顧對 Android 的支持,可是咱們並不能做出保證,具體狀況具體分析。 | |
---|---|
自從 reactor-core 3.0.4
,隨着 Aluminium
版本發佈上車(release train)以來,Reactor 3 使用了 BOM(Bill of Materials,一種標準的 Maven artifact)。
使用 BOM 能夠管理一組良好集成的 maven artifacts,從而無需操心不一樣版本組件的互相依賴問題。
BOM 是一系列有版本信息的 artifacts,經過「列車發佈」(release train)的發佈方式管理, 每趟發佈列車由一個「代號+修飾詞」組成,好比:
Aluminium-RELEASE Carbon-BUILD-SNAPSHOT Aluminium-SR1 Bismuth-RELEASE Carbon-SR32
代號替代了傳統的「主版本.次版本」的數字形式。這些代號主要來自 Periodic Table of Elements, 按首字母順序依次選取。
修飾詞有(按照時間順序):
BUILD-SNAPSHOT
M1
..N
: 里程碑號RELEASE
: 第一次 GA (General Availability) 發佈SR1
..N
: 後續的 GA 發佈(相似於 PATCH 號或 SR(Service Release))。前邊提到,使用 Reactor 的最簡單方式是在你的項目中配置 BOM 以及相關依賴。 注意,當你這樣添加依賴的時候,要省略版本(
固然,若是你但願使用某個版本的 artifact,仍然能夠指定。甚至徹底不使用 BOM,逐個配置 artifact 的版本也是能夠的。
Maven 原生支持 BOM。首先,你須要在 pom.xml
內經過添加下邊的代碼引入 BOM。若是 (dependencyManagement
) 已經存在,只須要添加其內容便可。
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意 dependencyManagement 標籤用來補充一般使用的 dependencies 配置。 |
|
---|---|
而後,在 dependencies
中添加相關的 reactor 項目,省略 <version>
,以下:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> </dependencies>
依賴 Core 庫 | |
---|---|
沒有 version 標籤 | |
reactor-test 提供了對 reactive streams 的單測 |
Gradle 沒有對 Maven BOM 的支持,可是你能夠使用 Spring 的 gradle-dependency-management 插件。
首先,apply 插件。
plugins { id "io.spring.dependency-management" version "1.0.1.RELEASE" }
編寫本文檔時,插件最新版本爲 1.0.1.RELEASE,請自行使用合適的版本。 | |
---|---|
而後用它引入 BOM:
dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE" } }
Finally add a dependency to your project, without a version number:
dependencies { compile 'io.projectreactor:reactor-core' }
無需第三個 : 添加版本號。 |
|
---|---|
里程碑版(Milestones)和開發預覽版(developer previews)經過 Spring Milestones repository 而不是 Maven Central 來發布。 須要添加到構建配置文件中,如:
Milestones in Maven
<repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones Repository</name> <url>https://repo.spring.io/milestone</url> </repository> </repositories>
gradle 使用下邊的配置:
Milestones in Gradle
repositories { maven { url 'http://repo.spring.io/milestone' } mavenCentral() }
相似的,snapshot 版也須要配置專門的庫:
BUILD-SNAPSHOTs in Maven
<repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshot Repository</name> <url>https://repo.spring.io/snapshot</url> </repository> </repositories>
BUILD-SNAPSHOTs in Gradle
repositories { maven { url 'http://repo.spring.io/snapshot' } mavenCentral() }
Reactor 是響應式編程範式的實現,總結起來有以下幾點:
響應式編程是一種關注於數據流(data streams)和變化傳遞(propagation of change)的異步編程方式。 這意味着它能夠用既有的編程語言表達靜態(如數組)或動態(如事件源)的數據流。
在響應式編程方面,微軟跨出了第一步,它在 .NET 生態中建立了響應式擴展庫(Reactive Extensions library, Rx)。接着 RxJava 在JVM上實現了響應式編程。後來,在 JVM 平臺出現了一套標準的響應式 編程規範,它定義了一系列標準接口和交互規範。並整合到 Java 9 中(使用 Flow
類)。
響應式編程一般做爲面向對象編程中的「觀察者模式」(Observer design pattern)的一種擴展。 響應式流(reactive streams)與「迭代子模式」(Iterator design pattern)也有相通之處, 由於其中也有 Iterable
-Iterator
這樣的對應關係。主要的區別在於,Iterator 是基於 「拉取」(pull)方式的,而響應式流是基於「推送」(push)方式的。
使用 iterator 是一種「命令式」(imperative)編程範式,即便訪問元素的方法是 Iterable
的惟一職責。關鍵在於,何時執行 next()
獲取元素取決於開發者。在響應式流中,相對應的 角色是 Publisher-Subscriber
,可是 當有新的值到來的時候 ,卻反過來由發佈者(Publisher) 通知訂閱者(Subscriber),這種「推送」模式是響應式的關鍵。此外,對推送來的數據的操做 是經過一種聲明式(declaratively)而不是命令式(imperatively)的方式表達的:開發者經過 描述「控制流程」來定義對數據流的處理邏輯。
除了數據推送,對錯誤處理(error handling)和完成(completion)信號的定義也很完善。 一個 Publisher
能夠推送新的值到它的 Subscriber
(調用 onNext
方法), 一樣也能夠推送錯誤(調用 onError
方法)和完成(調用 onComplete
方法)信號。 錯誤和完成信號均可以終止響應式流。能夠用下邊的表達式描述:
onNext x 0..N [onError | onComplete]
這種方式很是靈活,不管是有/沒有值,仍是 n 個值(包括有無限個值的流,好比時鐘的持續讀秒),均可處理。
那麼咱們爲何須要這樣的異步響應式開發庫呢?
現代應用須要應對大量的併發用戶,並且即便現代硬件的處理能力飛速發展,軟件性能仍然是關鍵因素。
廣義來講咱們有兩種思路來提高程序性能:
一般,Java開發者使用阻塞式(blocking)編寫代碼。這沒有問題,在出現性能瓶頸後, 咱們能夠增長處理線程,線程中一樣是阻塞的代碼。可是這種使用資源的方式會迅速面臨 資源競爭和併發問題。
更糟糕的是,阻塞會浪費資源。具體來講,好比當一個程序面臨延遲(一般是I/O方面, 好比數據庫讀寫請求或網絡調用),所在線程須要進入 idle 狀態等待數據,從而浪費資源。
因此,並行化方式並不是銀彈。這是挖掘硬件潛力的方式,可是卻帶來了複雜性,並且容易形成浪費。
第二種思路——提升執行效率——能夠解決資源浪費問題。經過編寫 異步非阻塞 的代碼, (任務發起異步調用後)執行過程會切換到另外一個 使用一樣底層資源 的活躍任務,而後等 異步調用返回結果再去處理。
可是在 JVM 上如何編寫異步代碼呢?Java 提供了兩種異步編程方式:
callback
做爲參數(lambda 或匿名類),當結果出來後回調這個 callback
。常見的例子好比 Swings 的 EventListener
。Future<T>
,該異步方法要返回結果的是 T
類型,經過 Future
封裝。這個結果並非 馬上 能夠拿到,而是等實際處理結束纔可用。好比, ExecutorService
執行 Callable<T>
任務時會返回 Future
對象。這些技術夠用嗎?並不是對於每一個用例都是如此,兩種方式都有侷限性。
回調很難組合起來,由於很快就會致使代碼難以理解和維護(即所謂的「回調地獄(callback hell)」)。
考慮這樣一種情景:在用戶界面上顯示用戶的5個收藏,或者若是沒有任何收藏提供5個建議。這須要3個 服務(一個提供收藏的ID列表,第二個服務獲取收藏內容,第三個提供建議內容):
回調地獄(Callback Hell)的例子
userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() { public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); } });
基於回調的服務使用一個匿名 Callback 做爲參數。後者的兩個方法分別在異步執行成功 或異常時被調用。 |
|
---|---|
獲取到收藏ID的list後調用第一個服務的回調方法 onSuccess 。 |
|
若是 list 爲空, 調用 suggestionService 。 |
|
服務 suggestionService 傳遞 List<Favorite> 給第二個回調。 |
|
既然是處理 UI,咱們須要確保消費代碼運行在 UI 線程。 | |
使用 Java 8 Stream 來限制建議數量爲5,而後在 UI 中顯示。 |
|
在每一層,咱們都以一樣的方式處理錯誤:在一個 popup 中顯示錯誤信息。 | |
回到收藏 ID 這一層,若是返回 list,咱們須要使用 favoriteService 來獲取 Favorite 對象。因爲只想要5個,所以使用 stream 。 |
|
再一次回調。此次對每一個ID,獲取 Favorite 對象在 UI 線程中推送到前端顯示。 |
這裏有很多代碼,稍微有些難以閱讀,而且還有重複代碼,咱們再來看一下用 Reactor 實現一樣功能:
使用 Reactor 實現以上回調方式一樣功能的例子
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
咱們獲取到收藏ID的流 | |
---|---|
咱們 異步地轉換 它們(ID) 爲 Favorite 對象(使用 flatMap ),如今咱們有了 Favorite 流。 |
|
一旦 Favorite 爲空,切換到 suggestionService 。 |
|
咱們只關注流中的最多5個元素。 | |
最後,咱們但願在 UI 線程中進行處理。 | |
經過描述對數據的最終處理(在 UI 中顯示)和對錯誤的處理(顯示在 popup 中)來觸發(subscribe )。 |
若是你想確保「收藏的ID」的數據在800ms內得到(若是超時,從緩存中獲取)呢?在基於回調的代碼中, 會比較複雜。但 Reactor 中就很簡單,在處理鏈中增長一個 timeout
的操做符便可。
Reactor 中增長超時控制的例子
userService.getFavorites(userId) .timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
若是流在超時時限沒有發出(emit)任何值,則發出錯誤(error)。 | |
---|---|
一旦收到錯誤,交由 cacheService 處理。 |
|
處理鏈後邊的內容與上例相似。 |
Futures 比回調要好一點,但即便在 Java 8 引入了 CompletableFuture
,它對於多個處理的組合仍不夠好用。 編排多個 Futures 是可行的,但卻不易。此外,Future
還有一個問題:當對 Future
對象最終調用 get()
方法時,仍然會致使阻塞,而且缺少對多個值以及更進一步對錯誤的處理。
考慮另一個例子,咱們首先獲得 ID 的列表,而後經過它進一步獲取到「對應的 name 和 statistics」 爲元素的列表,整個過程用異步方式來實現。
CompletableFuture
處理組合的例子
CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip = l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); List<String> results = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
以一個 Future 開始,其中封裝了後續將獲取和處理的 ID 的 list。 | |
---|---|
獲取到 list 後邊進一步對其啓動異步處理任務。 | |
對於 list 中的每個元素: | |
異步地獲得相應的 name。 | |
異步地獲得相應的 statistics。 | |
將兩個結果一一組合。 | |
咱們如今有了一個 list,元素是 Future(表示組合的任務,類型是 CompletableFuture ),爲了執行這些任務, 咱們須要將這個 list(元素構成的流) 轉換爲數組(List )。 |
|
將這個數組傳遞給 CompletableFuture.allOf ,返回一個 Future ,當因此任務都完成了,那麼這個 Future 也就完成了。 |
|
有點麻煩的地方在於 allOf 返回的是 CompletableFuture<Void> ,因此咱們遍歷這個 Future 的List , ,而後使用 join() 來手機它們的結果(不會致使阻塞,由於 AllOf 確保這些 Future 所有完成) |
|
一旦整個異步流水線被觸發,咱們等它完成處理,而後返回結果列表。 |
因爲 Reactor 內置許多組合操做,所以以上例子能夠簡單地實現:
Reactor 實現與 Future 一樣功能的代碼
Flux<String> ids = ifhrIds(); Flux<String> combinations = ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); Mono<List<String>> result = combinations.collectList(); List<String> results = result.block(); assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
這一次,咱們從一個異步方式提供的 ids 序列(Flux<String> )開始。 |
|
---|---|
對於序列中的每個元素,咱們異步地處理它(flatMap 方法內)兩次。 |
|
獲取相應的 name。 | |
獲取相應的 statistic. | |
異步地組合兩個值。 | |
隨着序列中的元素值「到位」,它們收集一個 List 中。 |
|
在生成流的環節,咱們能夠繼續異步地操做 Flux 流,對其進行組合和訂閱(subscribe)。 最終咱們極可能獲得一個 Mono 。因爲是測試,咱們阻塞住(block() ),等待流處理過程結束, 而後直接返回集合。 |
|
Assert 結果。 |
回調或 Future 遇到的窘境是相似的,這也是響應式編程要經過 Publisher-Suscriber
方式來解決的。
相似 Reactor 這樣的響應式庫的目標就是要彌補上述「經典」的 JVM 異步方式所帶來的不足, 此外還會關注一下幾個方面:
可編排性,指的是編排多個異步任務的能力。好比咱們將前一個任務的結果傳遞給後一個任務做爲輸入, 或者將多個任務以分解再彙總(fork-join)的形式執行,或者將異步的任務做爲離散的組件在系統中 進行重用。
這種編排任務的能力與代碼的可讀性和可維護性是緊密相關的。隨着異步處理任務數量和複雜度 的提升,編寫和閱讀代碼都變得愈來愈困難。就像咱們剛纔看到的,回調模式是簡單的,可是缺點 是在複雜的處理邏輯中,回調中會層層嵌入回調,致使 回調地獄(Callback Hell) 。你能猜到 (或有過這種痛苦經歷),這樣的代碼是難以閱讀和分析的。
Reactor 提供了豐富的編排操做,從而代碼直觀反映了處理流程,而且全部的操做保持在同一層次 (儘可能避免了嵌套)。
你能夠想象數據在響應式應用中的處理,就像流過一條裝配流水線。Reactor 既是傳送帶, 又是一個個的裝配工或機器人。原材料從源頭(最初的 Publisher
)流出,最終被加工爲成品, 等待被推送到消費者(或者說 Subscriber
)。
原材料會通過不一樣的中間處理過程,或者做爲半成品與其餘半成品進行組裝。若是某處有齒輪卡住, 或者某件產品的包裝過程花費了過久時間,相應的工位就能夠向上遊發出信號來限制或中止發出原材料。
在 Reactor 中,操做符(operator)就像裝配線中的工位(操做員或裝配機器人)。每個操做符 對 Publisher
進行相應的處理,而後將 Publisher
包裝爲一個新的 Publisher
。就像一個鏈條, 數據源自第一個 Publisher
,而後順鏈條而下,在每一個環節進行相應的處理。最終,一個訂閱者 (Subscriber
)終結這個過程。請記住,在訂閱者(Subscriber
)訂閱(subscribe)到一個 發佈者(Publisher
)以前,什麼都不會發生。
理解了操做符會建立新的 Publisher 實例這一點,可以幫助你避免一個常見的問題, 這種問題會讓你以爲處理鏈上的某個操做符沒有起做用。相關內容請參考 item 。 |
|
---|---|
雖然響應式流規範(Reactive Streams specification)沒有規定任何操做符, 相似 Reactor 這樣的響應式庫所帶來的最大附加價值之一就是提供豐富的操做符。包括基礎的轉換操做, 到過濾操做,甚至複雜的編排和錯誤處理操做。
subscribe()
以前什麼都不會發生在 Reactor 中,當你建立了一條 Publisher
處理鏈,數據還不會開始生成。事實上,你是建立了 一種抽象的對於異步處理流程的描述(從而方便重用和組裝)。
當真正「訂閱(subscrib)」的時候,你須要將 Publisher
關聯到一個 Subscriber
上,而後 纔會觸發整個鏈的流動。這時候,Subscriber
會向上遊發送一個 request
信號,一直到達源頭 的 Publisher
。
向上遊傳遞信號這一點也被用於實現 背壓 ,就像在裝配線上,某個工位的處理速度若是慢於流水線 速度,會對上游發送反饋信號同樣。
在響應式流規範中實際定義的機制同剛纔的類比很是接近:訂閱者能夠無限接受數據並讓它的源頭 「滿負荷」推送全部的數據,也能夠經過使用 request
機制來告知源頭它一次最多可以處理 n
個元素。
中間環節的操做也能夠影響 request
。想象一個可以將每10個元素分批打包的緩存(buffer
)操做。 若是訂閱者請求一個元素,那麼對於源頭來講能夠生成10個元素。此外預取策略也能夠使用了, 好比在訂閱前預先生成元素。
這樣可以將「推送」模式轉換爲「推送+拉取」混合的模式,若是下游準備好了,能夠從上游拉取 n 個元素;可是若是上游元素尚未準備好,下游仍是要等待上游的推送。
在 Rx 家族的響應式庫中,響應式流分爲「熱」和「冷」兩種類型,區別主要在於響應式流如何 對訂閱者進行響應:
Subscriber
,都會收到從頭開始全部的數據。若是源頭 生成了一個 HTTP 請求,對於每個訂閱都會建立一個新的 HTTP 請求。Subscriber
,只能獲取從它開始 訂閱 以後 發出的數據。不過注意,有些「熱」的響應式流能夠緩存部分或所有歷史數據。 一般意義上來講,一個「熱」的響應式流,甚至在即便沒有訂閱者接收數據的狀況下,也能夠 發出數據(這一點同 「Subscribe()
以前什麼都不會發生」的規則有衝突)。更多關於 Reactor 中「熱」vs「冷」的內容,請參考 this reactor-specific section。
Reactor 項目的主要 artifact 是 reactor-core
,這是一個基於 Java 8 的實現了響應式流規範 (Reactive Streams specification)的響應式庫。
Reactor 引入了實現 Publisher
的響應式類 Flux
和 Mono
,以及豐富的操做方式。 一個 Flux
對象表明一個包含 0..N 個元素的響應式序列,而一個 Mono
對象表明一個包含 零/一個(0..1)元素的結果。
這種區別爲這倆類型帶來了語義上的信息——代表了異步處理邏輯所面對的元素基數。好比, 一個 HTTP 請求產生一個響應,因此對其進行 count
操做是沒有多大意義的。表示這樣一個 結果的話,應該用 Mono<HttpResponse>
而不是 Flux<HttpResponse>
,由於要置於其上的 操做一般只用於處理 0/1 個元素。
有些操做能夠改變基數,從而須要切換類型。好比,count
操做用於 Flux
,可是操做 返回的結果是 Mono<Long>
。
Flux
, 包含 0-N 個元素的異步序列Flux<T>
是一個可以發出 0 到 N 個元素的標準的 Publisher<T>
,它會被一個「錯誤(error)」 或「完成(completion)」信號終止。所以,一個 flux 的可能結果是一個 value、completion 或 error。 就像在響應式流規範中規定的那樣,這三種類型的信號被翻譯爲面向下游的 onNext
,onComplete
和onError
方法。
因爲多種不一樣的信號可能性,Flux
能夠做爲一種通用的響應式類型。注意,全部的信號事件, 包括表明終止的信號事件都是可選的:若是沒有 onNext
事件可是有一個 onComplete
事件, 那麼發出的就是 空的 有限序列,可是去掉 onComplete
那麼獲得的就是一個 無限的 空序列。 固然,無限序列也能夠不是空序列,好比,Flux.interval(Duration)
生成的是一個 Flux<Long>
, 這就是一個無限地週期性發出規律 tick 的時鐘序列。
Mono
, 異步的 0-1 結果Mono<T>
是一種特殊的 Publisher<T>
, 它最多發出一個元素,而後終止於一個 onComplete
信號或一個 onError
信號。
它只適用其中一部分可用於 Flux
的操做。好比,(兩個 Mono
的)結合類操做能夠忽略其中之一 而發出另外一個 Mono
,也能夠將兩個都發出,對於後一種狀況會切換爲一個 Flux
。
例如,Mono#concatWith(Publisher)
返回一個 Flux
,而 Mono#then(Mono)
返回另外一個 Mono
。
注意,Mono
能夠用於表示「空」的只有完成概念的異步處理(好比 Runnable
)。這種用 Mono<Void>
來建立。
最簡單的上手 Flux
和 Mono
的方式就是使用相應類提供的多種工廠方法之一。
好比,若是要建立一個 String
的序列,你能夠直接列舉它們,或者將它們放到一個集合裏而後用來建立 Flux,以下:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar"); List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> seq2 = Flux.fromIterable(iterable);
工廠方法的其餘例子以下:
Mono<String> noData = Mono.empty(); Mono<String> data = Mono.just("foo"); Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
注意,即便沒有值,工廠方法仍然採用通用的返回類型。 | |
---|---|
第一個參數是 range 的開始,第二個參數是要生成的元素個數。 |
在訂閱(subscribe)的時候,Flux
和 Mono
使用 Java 8 lambda 表達式。 .subscribe()
方法有多種不一樣的方法簽名,你能夠傳入各類不一樣的 lambda 形式的參數來定義回調。以下所示:
基於 lambda 的對 Flux
的訂閱(subscribe)
subscribe(); subscribe(Consumer<? super T> consumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
訂閱並觸發序列。 | |
---|---|
對每個生成的元素進行消費。 | |
對正常元素進行消費,也對錯誤進行響應。 | |
對正常元素和錯誤均有響應,還定義了序列正常完成後的回調。 | |
對正常元素、錯誤和完成信號均有響應, 同時也定義了對該 subscribe 方法返回的 Subscription 執行的回調。 |
以上方法會返回一個 Subscription 的引用,若是再也不須要更多元素你能夠經過它來取消訂閱。 取消訂閱時, 源頭會中止生成新的數據,並清理相關資源。取消和清理的操做在 Reactor 中是在 接口 Disposable 中定義的。 |
|
---|---|
subscribe
方法示例這一小節包含了對 subscribe
的5個不一樣簽名的方法的示例,以下是一個無參的基本方法的使用:
Flux<Integer> ints = Flux.range(1, 3); ints.subscribe();
配置一個在訂閱時會產生3個值的 Flux 。 |
|
---|---|
最簡單的訂閱方式。 |
第二行代碼沒有任何輸出,可是它確實執行了。Flux
產生了3個值。若是咱們傳入一個 lambda, 咱們就能夠看到這幾個值,以下一個列子:
Flux<Integer> ints = Flux.range(1, 3); ints.subscribe(i -> System.out.println(i));
配置一個在訂閱時會產生3個值的 Flux 。 |
|
---|---|
訂閱它並打印值。 |
第二行代碼會輸入以下內容:
1 2 3
爲了演示下一個方法簽名,咱們故意引入一個錯誤,以下所示:
Flux<Integer> ints = Flux.range(1, 4) .map(i -> { if (i <= 3) return i; throw new RuntimeException("Got to 4"); }); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error: " + error));
配置一個在訂閱時會產生4個值的 Flux 。 |
|
---|---|
爲了對元素進行處理,咱們須要一個 map 操做。 | |
對於多數元素,返回值自己。 | |
對其中一個元素拋出錯誤。 | |
訂閱的時候定義如何進行錯誤處理。 |
如今咱們有兩個 lambda 表達式:一個是用來處理正常數據,一個用來處理錯誤。 剛纔的代碼輸出以下:
1 2 3 Error: java.lang.RuntimeException: Got to 4
下一個 subscribe
方法的簽名既有錯誤處理,還有一個完成後的處理,以下:
Flux<Integer> ints = Flux.range(1, 4); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> {System.out.println("Done");});
配置一個在訂閱時會產生4個值的 Flux 。 |
|
---|---|
訂閱時定義錯誤和完成信號的處理。 |
錯誤和完成信號都是終止信號,而且兩者只會出現其中之一。爲了可以最終所有正常完成,你必須處理錯誤信號。
用於處理完成信號的 lambda 是一對空的括號,由於它實際上匹配的是 Runnalbe
接口中的 run
方法, 不接受參數。剛纔的代碼輸出以下:
1 2 3 4 Done
最後一個 subscribe
方法簽名包含一個自定義的 subscriber
(下一節會介紹到):
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>(); Flux<Integer> ints = Flux.range(1, 4); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> {System.out.println("Done");}, s -> ss.request(10)); ints.subscribe(ss);
上面這個例子中,咱們把一個自定義的 Subscriber
做爲 subscribe
方法的最後一個參數。 下邊的例子是這個自定義的 Subscriber
,這是一個對 Subscriber
的最簡單實現:
package io.projectreactor.samples; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; public class SampleSubscriber<T> extends BaseSubscriber<T> { public void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); } public void hookOnNext(T value) { System.out.println(value); request(1); } }
SampleSubscriber
類繼承自 BaseSubscriber
,在 Reactor 中, 推薦用戶擴展它來實現自定義的 Subscriber
。這個類提供了一些 hook 方法,咱們能夠經過重寫它們來調整 subscriber 的行爲。 默認狀況下,它會觸發一個無限個數的請求,可是當你想自定義請求元素的個數的時候,擴展 BaseSubscriber
就很方便了。
擴展的時候一般至少要覆蓋 hookOnSubscribe(Subscription subscription)
和 hookOnNext(T value)
這兩個方法。這個例子中, hookOnSubscribe
方法打印一段話到標準輸出,而後進行第一次請求。 而後 hookOnNext
一樣進行了打印,同時逐個處理剩餘請求。
SampleSubscriber
輸出以下:
Subscribed 1 2 3 4
建議你同時重寫 hookOnError 、hookOnCancel ,以及 hookOnComplete 方法。 你最好也重寫 hookFinally 方法。SampleSubscribe 確實是一個最簡單的實現了 請求有限個數元素的 Subscriber 。 |
|
---|---|
本文檔後邊還會再討論 BaseSubscriber
。
響應式流規範定義了另外一個 subscribe
方法的簽名,它只接收一個自定義的 Subscriber
, 沒有其餘的參數,以下所示:
subscribe(Subscriber<? super T> subscriber);
若是你已經有一個 Subscriber
,那麼這個方法簽名仍是挺有用的。何況,你可能還會用到它 來作一些訂閱相關(subscription-related)的回調。好比,你想要自定義「背壓(backpressure)」 而且本身來觸發請求。
在這種狀況下,使用 BaseSubscriber
抽象類就很方便,由於它提供了很好的配置「背壓」 的方法。
使用 BaseSubscriber
來配置「背壓」
Flux<String> source = someStringSource(); source.map(String::toUpperCase) .subscribe(new BaseSubscriber<String>() { @Override protected void hookOnSubscribe(Subscription subscription) { request(1); } @Override protected void hookOnNext(String value) { request(1); } });
BaseSubscriber 是一個抽象類,因此咱們建立一個匿名內部類。 |
|
---|---|
BaseSubscriber 定義了多種用於處理不一樣信號的 hook。它還定義了一些捕獲 Subscription 對象的現成方法,這些方法能夠用在 hook 中。 |
|
request(n) 就是這樣一個方法。它可以在任何 hook 中,經過 subscription 向上遊傳遞 背壓請求。這裏咱們在開始這個流的時候請求1個元素值。 |
|
隨着接收到新的值,咱們繼續以每次請求一個元素的節奏從源頭請求值。 | |
其餘 hooks 有 hookOnComplete , hookOnError , hookOnCancel , and hookFinally (它會在流終止的時候被調用,傳入一個 SignalType 做爲參數)。 |
當你修改請求操做的時候,你必須注意讓 subscriber 向上提出足夠的需求, 不然上游的 Flux 可能會被「卡住」。因此 BaseSubscriber 在進行擴展的時候要覆蓋 hookOnSubscribe 和 onNext ,這樣你至少會調用 request 一次。 |
|
---|---|
BaseSubscriber
還提供了 requestUnbounded()
方法來切換到「無限」模式(等同於 request(Long.MAX_VALUE)
)。
在這一小節,咱們介紹如何經過定義相對應的事件(onNext
、onError
和onComplete
) 建立一個 Flux
或 Mono
。全部這些方法都經過 API 來觸發咱們叫作 sink(池) 的事件。 sink 的類型很少,咱們快速過一下。
最簡單的建立 Flux
的方式就是使用 generate
方法。
這是一種 同步地, 逐個地 產生值的方法,意味着 sink 是一個 SynchronousSink
並且其 next()
方法在每次回調的時候最多隻能被調用一次。你也能夠調用 error(Throwable)
或者 complete()
,不過是可選的。
最有用的一種方式就是同時可以記錄一個狀態值(state),從而在使用 sink 發出下一個元素的時候可以 基於這個狀態值去產生元素。此時生成器(generator)方法就是一個 BiFunction<S, SynchronousSink<T>, S>
, 其中 <S>
是狀態對象的類型。你須要提供一個 Supplier<S>
來初始化狀態值,而生成器須要 在每一「回合」生成元素後返回新的狀態值(供下一回合使用)。
例如咱們使用一個 int
做爲狀態值。
基於狀態值的 generate
示例
Flux<String> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; });
初始化狀態值(state)爲0。 | |
---|---|
咱們基於狀態值 state 來生成下一個值(state 乘以 3)。 | |
咱們也能夠用狀態值來決定何時終止序列。 | |
返回一個新的狀態值 state,用於下一次調用。 |
上面的代碼生成了「3 x」的乘法表:
3 x 0 = 0 3 x 1 = 3 3 x 2 = 6 3 x 3 = 9 3 x 4 = 12 3 x 5 = 15 3 x 6 = 18 3 x 7 = 21 3 x 8 = 24 3 x 9 = 27 3 x 10 = 30
咱們也能夠使用可變(mutable)類型(譯者注:如上例,原生類型及其包裝類,以及String等屬於不可變類型) 的 <S>
。上邊的例子也能夠用 AtomicLong
做爲狀態值,在每次生成後改變它的值。
可變類型的狀態變量
Flux<String> flux = Flux.generate( AtomicLong::new, (state, sink) -> { long i = state.getAndIncrement(); sink.next("3 x " + i + " = " + 3*i); if (i == 10) sink.complete(); return state; });
此次咱們初始化一個可變類型的狀態值。 | |
---|---|
改變狀態值。 | |
返回 同一個 實例做爲新的狀態值。 |
若是狀態對象須要清理資源,能夠使用 generate(Supplier<S>, BiFunction, Consumer<S>) 這個簽名方法來清理狀態對象(譯者注:Comsumer 在序列終止才被調用)。 |
|
---|---|
下面是一個在 generate 方法中增長 Consumer
的例子:
Flux<String> flux = Flux.generate( AtomicLong::new, (state, sink) -> { long i = state.getAndIncrement(); sink.next("3 x " + i + " = " + 3*i); if (i == 10) sink.complete(); return state; }, (state) -> System.out.println("state: " + state)); }
一樣,初始化一個可變對象做爲狀態變量。 | |
---|---|
改變狀態。 | |
返回 同一個 實例做爲新的狀態。 | |
咱們會看到最後一個狀態值(11)會被這個 Consumer lambda 輸出。 |
若是 state 使用了數據庫鏈接或者其餘須要最終進行清理的資源,這個 Consumer
lambda 能夠用來在最後關閉鏈接或完成相關的其餘清理任務。
做爲一個更高級的建立 Flux
的方式, create
方法的生成方式既能夠是同步, 也能夠是異步的,而且還能夠每次發出多個元素。
該方法用到了 FluxSink
,後者一樣提供 next
,error
和 complete
等方法。 與 generate
不一樣的是,create
不須要狀態值,另外一方面,它能夠在回調中觸發 多個事件(即便是在將來的某個時間)。
create 有個好處就是能夠將現有的 API 轉爲響應式,好比監聽器的異步方法。 |
|
---|---|
假設你有一個監聽器 API,它按 chunk 處理數據,有兩種事件:(1)一個 chunk 數據準備好的事件;(2)處理結束的事件。以下:
interface MyEventListener<T> { void onDataChunk(List<T> chunk); void processComplete(); }
你能夠使用 create
方法將其轉化爲響應式類型 Flux<T>
:
Flux<String> bridge = Flux.create(sink -> { myEventProcessor.register( new MyEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } }); });
橋接 MyEventListener 。 |
|
---|---|
每個 chunk 的數據轉化爲 Flux 中的一個元素。 |
|
processComplete 事件轉換爲 onComplete 。 |
|
全部這些都是在 myEventProcessor 執行時異步執行的。 |
此外,既然 create
能夠是異步地,而且可以控制背壓,你能夠經過提供一個 OverflowStrategy
來定義背壓行爲。
IGNORE
: 徹底忽略下游背壓請求,這可能會在下游隊列積滿的時候致使 IllegalStateException
。ERROR
: 當下遊跟不上節奏的時候發出一個 IllegalStateException
的錯誤信號。DROP
:當下遊沒有準備好接收新的元素的時候拋棄這個元素。LATEST
:讓下游只獲得上游最新的元素。BUFFER
:(默認的)緩存全部下游沒有來得及處理的元素(這個不限大小的緩存可能致使 OutOfMemoryError
)。Mono 也有一個用於 create 的生成器(generator)—— MonoSink ,它不能生成多個元素, 所以會拋棄第一個元素以後的全部元素。 |
|
---|---|
create
的一個變體是 push
,適合生成事件流。與 create
相似,push
也能夠是異步地, 而且可以使用以上各類溢出策略(overflow strategies)管理背壓。每次只有一個生成線程能夠調用 next
,complete
或 error
。
Flux<String> bridge = Flux.push(sink -> { myEventProcessor.register( new SingleThreadEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } public void processError(Throwable e) { sink.error(e); } }); });
橋接 SingleThreadEventListener API。 |
|
---|---|
在監聽器所在線程中,事件經過調用 next 被推送到 sink。 |
|
complete 事件也在同一個線程中。 |
|
error 事件也在同一個線程中。 |
不像 push
,create
能夠用於 push
或 pull
模式,所以適合橋接監聽器的 的 API,由於事件消息會隨時異步地到來。回調方法 onRequest
能夠被註冊到 FluxSink
以便跟蹤請求。這個回調能夠被用於從源頭請求更多數據,或者經過在下游請求到來 的時候傳遞數據給 sink 以實現背壓管理。這是一種推送/拉取混合的模式, 由於下游能夠從上游拉取已經就緒的數據,上游也能夠在數據就緒的時候將其推送到下游。
Flux<String> bridge = Flux.create(sink -> { myMessageProcessor.register( new MyMessageListener<String>() { public void onMessage(List<String> messages) { for(String s : messages) { sink.next(s); } } }); sink.onRequest(n -> { List<String> messages = myMessageProcessor.request(n); for(String s : message) { sink.next(s); } });
當有請求的時候取出一個 message。 | |
---|---|
若是有就緒的 message,就發送到 sink。 | |
後續異步到達的 message 也會被髮送給 sink。 |
onDispose
和 onCancel
這兩個回調用於在被取消和終止後進行清理工做。 onDispose
可用於在 Flux
完成,有錯誤出現或被取消的時候執行清理。 onCancel
只用於針對「取消」信號執行相關操做,會先於 onDispose
執行。
Flux<String> bridge = Flux.create(sink -> { sink.onRequest(n -> channel.poll(n)) .onCancel(() -> channel.cancel()) .onDispose(() -> channel.close()) });
onCancel 在取消時被調用。 |
|
---|---|
onDispose 在有完成、錯誤和取消時被調用。 |
handle
方法有些不一樣,它在 Mono
和 Flux
中都有。然而,它是一個實例方法 (instance method),意思就是它要連接在一個現有的源後使用(與其餘操做符同樣)。
它與 generate
比較相似,由於它也使用 SynchronousSink
,而且只容許元素逐個發出。 然而,handle
可被用於基於現有數據源中的元素生成任意值,有可能還會跳過一些元素。 這樣,能夠把它當作 map
與 filter
的組合。handle
方法簽名以下:
handle(BiConsumer<T, SynchronousSink<R>>)
舉個例子,響應式流規範容許 null
這樣的值出如今序列中。假如你想執行一個相似 map
的操做,你想利用一個現有的具備映射功能的方法,可是它會返回 null,這時候怎麼辦呢?
例如,下邊的方法能夠用於 Integer 序列,映射爲字母或 null 。
public String alphabet(int letterNumber) { if (letterNumber < 1 || letterNumber > 26) { return null; } int letterIndexAscii = 'A' + letterNumber - 1; return "" + (char) letterIndexAscii; }
咱們能夠使用 handle
來去掉其中的 null。
將 handle
用於一個 "映射 + 過濾 null" 的場景
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20) .handle((i, sink) -> { String letter = alphabet(i); if (letter != null) sink.next(letter); }); alphabet.subscribe(System.out::println);
映射到字母。 | |
---|---|
若是返回的是 null … | |
就不會調用 sink.next 從而過濾掉。 |
輸出以下:
M I T
Reactor, 就像 RxJava,也能夠被認爲是 併發無關(concurrency agnostic) 的。意思就是, 它並不強制要求任何併發模型。更進一步,它將選擇權交給開發者。不過,它仍是提供了一些方便 進行併發執行的庫。
在 Reactor 中,執行模式以及執行過程取決於所使用的 Scheduler
。 Scheduler
是一個擁有普遍實現類的抽象接口。 Schedulers
類提供的靜態方法用於達成以下的執行環境:
Schedulers.immediate()
)Schedulers.single()
)。注意,這個方法對全部調用者都提供同一個線程來使用, 直到該調度器(Scheduler)被廢棄。若是你想使用專注的線程,就對每個調用使用 Schedulers.newSingle()
。Schedulers.elastic()
。它根據須要建立一個線程池,重用空閒線程。線程池若是空閒時間過長 (默認爲 60s)就會被廢棄。對於 I/O 阻塞的場景比較適用。 Schedulers.elastic()
可以方便地給一個阻塞 的任務分配它本身的線程,從而不會妨礙其餘任務和資源,見 如何包裝一個同步阻塞的調用?。Schedulers.parallel()
)。所建立線程池的大小與 CPU 個數等同。此外,你還能夠使用 Schedulers.fromExecutorService(ExecutorService)
基於現有的 ExecutorService
建立 Scheduler
。(雖然不太建議,不過你也能夠使用 Executor
來建立)。你也能夠使用 newXXX
方法來建立不一樣的調度器。好比 Schedulers.newElastic(yourScheduleName)
建立一個新的名爲 yourScheduleName
的彈性調度器。
操做符基於非阻塞算法實現,從而能夠利用到某些調度器的工做竊取(work stealing) 特性的好處。 | |
---|---|
一些操做符默認會使用一個指定的調度器(一般也容許開發者調整爲其餘調度器)例如, 經過工廠方法 Flux.interval(Duration.ofMillis(300))
生成的每 300ms 打點一次的 Flux<Long>
, 默認狀況下使用的是 Schedulers.parallel()
,下邊的代碼演示瞭如何將其裝換爲 Schedulers.single()
:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
Reactor 提供了兩種在響應式鏈中調整調度器 Scheduler
的方法:publishOn
和 subscribeOn
。 它們都接受一個 Scheduler
做爲參數,從而能夠改變調度器。可是 publishOn
在鏈中出現的位置 是有講究的,而 subscribeOn
則無所謂。要理解它們的不一樣,你首先要理解 nothing happens until you subscribe()。
在 Reactor 中,當你在操做鏈上添加操做符的時候,你能夠根據須要在 Flux
和 Mono
的實現中包裝其餘的 Flux
和 Mono
。一旦你訂閱(subscribe)了它,一個 Subscriber
的鏈 就被建立了,一直向上到第一個 publisher 。這些對開發者是不可見的,開發者所能看到的是最外一層的 Flux
(或 Mono
)和 Subscription
,可是具體的任務是在中間這些跟操做符相關的 subscriber 上處理的。
基於此,咱們仔細研究一下 publishOn
和 subscribeOn
這兩個操做符:
publishOn
的用法和處於訂閱鏈(subscriber chain)中的其餘操做符同樣。它將上游 信號傳給下游,同時執行指定的調度器 Scheduler
的某個工做線程上的回調。 它會 改變後續的操做符的執行所在線程 (直到下一個 publishOn
出如今這個鏈上)。subscribeOn
用於訂閱(subscription)過程,做用於那個向上的訂閱鏈(發佈者在被訂閱 時才激活,訂閱的傳遞方向是向上遊的)。因此,不管你把 subscribeOn
至於操做鏈的什麼位置, 它都會影響到源頭的線程執行環境(context)。 可是,它不會影響到後續的 publishOn
,後者仍可以切換其後操做符的線程執行環境。只有操做鏈中最先的 subscribeOn 調用纔算數。 |
|
---|---|
Flux
和 Mono
不會建立線程。一些操做符,好比 publishOn
,會建立線程。同時,做爲一種任務共享形式, 這些操做符可能會從其餘任務池(work pool)——若是其餘任務池是空閒的話——那裏「偷」線程。所以, 不管是 Flux
、Mono
仍是 Subscriber
都應該精於線程處理。它們依賴這些操做符來管理線程和任務池。
publishOn
強制下一個操做符(極可能包括下一個的下一個…)來運行在一個不一樣的線程上。 相似的,subscribeOn
強制上一個操做符(極可能包括上一個的上一個…)來運行在一個不一樣的線程上。 記住,在你訂閱(subscribe)前,你只是定義了處理流程,而沒有啓動發佈者。基於此,Reactor 能夠使用這些規則來決定如何執行操做鏈。而後,一旦你訂閱了,整個流程就開始工做了。
下邊的例子演示了支持任務共享的多線程模型:
Flux.range(1, 10000) .publishOn(Schedulers.parallel()) .subscribe(result)
建立一個有 10,000 個元素的 Flux 。 |
|
---|---|
建立等同於 CPU 個數的線程(最小爲4)。 | |
subscribe() 以前什麼都不會發生。 |
Scheduler.parallel()
建立一個基於單線程 ExecutorService
的固定大小的任務線程池。 由於可能會有一個或兩個線程致使問題,它老是至少建立 4 個線程。而後 publishOn 方法便共享了這些任務線程, 當 publishOn
請求元素的時候,會從任一個正在發出元素的線程那裏獲取元素。這樣, 就是進行了任務共享(一種資源共享方式)。Reactor 還提供了好幾種共享資源的方式,請參考 Schedulers。
Scheduler.elastic()
也能建立線程,它可以很方便地建立專門的線程(以便跑一些可能會阻塞資源的任務, 好比一個同步服務),請見 如何包裝一個同步阻塞的調用?。
內部機制保證了這些操做符可以藉助自增計數器(incremental counters)和警惕條件(guard conditions) 以線程安全的方式工做。例如,若是咱們有四個線程處理一個流(就像上邊的例子),每個請求會讓計數器自增, 這樣後續的來自不一樣線程的請求就能拿到正確的元素。
若是想了解有哪些可用於錯誤處理的操做符,請參考 the relevant operator decision tree。 | |
---|---|
在響應式流中,錯誤(error)是終止(terminal)事件。當有錯誤發生時,它會致使流序列中止, 而且錯誤信號會沿着操做鏈條向下傳遞,直至遇到你定義的 Subscriber
及其 onError
方法。
這樣的錯誤仍是應該在應用層面解決的。好比,你可能會將錯誤信息顯示在用戶界面,或者經過某個 REST 端點(endpoint)發出。所以,訂閱者(subscriber)的 onError
方法是應該定義的。
若是沒有定義,onError 會拋出 UnsupportedOperationException 。你能夠接下來再 檢測錯誤,並經過 Exceptions.isErrorCallbackNotImplemented 方法捕獲和處理它。 |
|
---|---|
Reactor 還提供了其餘的用於在鏈中處理錯誤的方法,即錯誤處理操做(error-handling operators)。
在你瞭解錯誤處理操做符以前,你必須牢記 響應式流中的任何錯誤都是一個終止事件。 即便用了錯誤處理操做符,也不會讓源頭流序列繼續。而是將 onError 信號轉化爲一個 新的 序列 的開始。換句話說,它代替了被終結的 上游 流序列。 |
|
---|---|
如今咱們來逐個看看錯誤處理的方法。須要的時候咱們會同時用到命令式編程風格的 try
代碼塊來做比較。
你也許熟悉在 try-catch 代碼塊中處理異常的幾種方法。常見的包括以下幾種:
業務相關的異常
,而後再拋出業務異常。finally
來清理資源,或使用 Java 7 引入的 "try-with-resource"。以上全部這些在 Reactor 都有相應的基於 error-handling 操做符處理方式。
在開始研究這些操做符以前,咱們先準備好響應式鏈(reactive chain)方式和 try-catch 代碼塊方式(以便對比)。
當訂閱的時候,位於鏈結尾的 onError
回調方法和 catch
塊相似,一旦有異常,執行過程會跳入到 catch:
Flux<String> s = Flux.range(1, 10) .map(v -> doSomethingDangerous(v)) .map(v -> doSecondTransform(v)); s.subscribe(value -> System.out.println("RECEIVED " + value), error -> System.err.println("CAUGHT " + error) );
執行 map 轉換,有可能拋出異常。 | |
---|---|
若是沒問題,執行第二個 map 轉換操做。 | |
全部轉換成功的值都打印出來。 | |
一旦有錯誤,序列(sequence)終止,並打印錯誤信息。 |
這與 try/catch 代碼塊是相似的:
try { for (int i = 1; i < 11; i++) { String v1 = doSomethingDangerous(i); String v2 = doSecondTransform(v1); System.out.println("RECEIVED " + v2); } } catch (Throwable t) { System.err.println("CAUGHT " + t); }
若是這裏拋出異常… | |
---|---|
…後續的代碼跳過… | |
…執行過程直接到這。 |
既然咱們準備了兩種方式作對比,咱們就來看一下不一樣的錯誤處理場景,以及相應的操做符。
與第 (1) 條(捕獲並返回一個靜態的缺省值)對應的是 onErrorReturn
:
Flux.just(10) .map(this::doSomethingDangerous) .onErrorReturn("RECOVERED");
你還能夠經過判斷錯誤信息的內容,來篩選哪些要給出缺省值,哪些仍然讓錯誤繼續傳遞下去:
Flux.just(10) .map(this::doSomethingDangerous) .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
若是你不僅是想要在發生錯誤的時候給出缺省值,而是但願提供一種更安全的處理數據的方式, 能夠使用 onErrorResume
。這與第 (2) 條(捕獲並執行一個異常處理方法)相似。
假設,你會嘗試從一個外部的不穩定服務獲取數據,但仍然會在本地緩存一份 可能 有些過時的數據, 由於緩存的讀取更加可靠。能夠這樣來作:
Flux.just("key1", "key2") .flatMap(k -> callExternalService(k)) .onErrorResume(e -> getFromCache(k));
對於每個 key, 異步地調用一個外部服務。 | |
---|---|
若是對外部服務的調用失敗,則再去緩存中查找該 key。注意,這裏不管 e 是什麼,都會執行異常處理方法。 |
就像 onErrorReturn
,onErrorResume
也有能夠用於預先過濾錯誤內容的方法變體,能夠基於異常類或 Predicate
進行過濾。它其實是用一個 Function
來做爲參數,還能夠返回一個新的流序列。
Flux.just("timeout1", "unknown", "key2") .flatMap(k -> callExternalService(k)) .onErrorResume(error -> { if (error instanceof TimeoutException) return getFromCache(k); else if (error instanceof UnknownKeyException) return registerNewEntry(k, "DEFAULT"); else return Flux.error(error); });
這個函數式容許開發者自行決定如何處理。 | |
---|---|
若是源超時,使用本地緩存。 | |
若是源找不到對應的 key,建立一個新的實體。 | |
不然, 將問題「從新拋出」。 |
有時候並不想提供一個錯誤處理方法,而是想在接收到錯誤的時候計算一個候補的值。這相似於第 (3) 條(捕獲並動態計算一個候補值)。
例如,若是你的返回類型自己就有可能包裝有異常(好比 Future.complete(T success)
vs Future.completeExceptionally(Throwable error)
),你有可能使用流中的錯誤包裝起來實例化 返回值。
這也能夠使用上一種錯誤處理方法的方式(使用 onErrorResume
)解決,代碼以下:
erroringFlux.onErrorResume(error -> Mono.just( myWrapper.fromError(error) ));
在 onErrorResume 中,使用 Mono.just 建立一個 Mono 。 |
|
---|---|
將異常包裝到另外一個類中。 |
在「錯誤處理方法」的例子中,基於 flatMap
方法的最後一行,咱們能夠猜到如何作到第 (4) 條(捕獲,包裝到一個業務相關的異常,而後拋出業務異常):
Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorResume(original -> Flux.error( new BusinessException("oops, SLA exceeded", original) );
然而還有一個更加直接的方法—— onErrorMap
:
Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
若是對於錯誤你只是想在不改變它的狀況下作出響應(如記錄日誌),並讓錯誤繼續傳遞下去, 那麼能夠用 doOnError
方法。這對應第 (5) 條(捕獲,記錄錯誤日誌,並繼續拋出)。 這個方法與其餘以 doOn
開頭的方法同樣,只起反作用("side-effect")。它們對序列都是隻讀, 而不會帶來任何改動。
以下邊的例子所示,咱們會記錄錯誤日誌,而且還經過變量自增統計錯誤發生個數。
LongAdder failureStat = new LongAdder(); Flux<String> flux = Flux.just("unknown") .flatMap(k -> callExternalService(k)) .doOnError(e -> { failureStat.increment(); log("uh oh, falling back, service failed for key " + k); }) .onErrorResume(e -> getFromCache(k));
對外部服務的調用失敗… | |
---|---|
…記錄錯誤日誌… | |
…而後回調錯誤處理方法。 |
最後一個要與命令式編程對應的對比就是使用 Java 7 "try-with-resources" 或 finally
代碼塊清理資源。這是第 (6) 條(使用 finally
代碼塊清理資源或使用 Java 7 引入的 "try-with-resource")。在 Reactor 中都有對應的方法: using
和 doFinally
:
AtomicBoolean isDisposed = new AtomicBoolean(); Disposable disposableInstance = new Disposable() { @Override public void dispose() { isDisposed.set(true); } @Override public String toString() { return "DISPOSABLE"; } }; Flux<String> flux = Flux.using( () -> disposableInstance, disposable -> Flux.just(disposable.toString()), Disposable::dispose );
第一個 lambda 生成資源,這裏咱們返回模擬的(mock) Disposable 。 |
|
---|---|
第二個 lambda 處理資源,返回一個 Flux<T> 。 |
|
第三個 lambda 在 2) 中的資源 Flux 終止或取消的時候,用於清理資源。 |
|
在訂閱或執行流序列以後, isDisposed 會置爲 true 。 |
另外一方面, doFinally
在序列終止(不管是 onComplete
、onError
仍是取消)的時候被執行, 而且可以判斷是什麼類型的終止事件(完成、錯誤仍是取消?)。
LongAdder statsCancel = new LongAdder(); Flux<String> flux = Flux.just("foo", "bar") .doFinally(type -> { if (type == SignalType.CANCEL) statsCancel.increment(); }) .take(1);
咱們想進行統計,因此用到了 LongAdder 。 |
|
---|---|
doFinally 用 SignalType 檢查了終止信號的類型。 |
|
若是隻是取消,那麼統計數據自增。 | |
take(1) 可以在發出 1 個元素後取消流。 |
onError
爲了演示當錯誤出現的時候如何致使上游序列終止,咱們使用 Flux.interval
構造一個更加直觀的例子。 這個 interval 操做符會在每 x 單位的時間發出一個自增的 Long
值。
Flux<String> flux = Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .onErrorReturn("Uh oh"); flux.subscribe(System.out::println); Thread.sleep(2100);
注意 interval 默認基於一個 timer Scheduler 來執行。 若是咱們想在 main 方法中運行, 咱們須要調用 sleep ,這樣程序就能夠在尚未產生任何值的時候就退出了。 |
|
---|---|
每 250ms 打印出一行信息,以下:
tick 0 tick 1 tick 2 Uh oh
即便多給了 1 秒鐘時間,也沒有更多的 tick 信號由 interval
產生了,因此序列確實被錯誤信號終止了。
還有一個用於錯誤處理的操做符你可能會用到,就是 retry
,見文知意,用它能夠對出現錯誤的序列進行重試。
問題是它對於上游 Flux
是基於重訂閱(re-subscribing)的方式。這實際上已經一個不一樣的序列了, 發出錯誤信號的序列仍然是終止了的。爲了驗證這一點,咱們能夠在繼續用上邊的例子,增長一個 retry(1)
代替 onErrorReturn
來重試一次。
Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3) return "tick " + input; throw new RuntimeException("boom"); }) .elapsed() .retry(1) .subscribe(System.out::println, System.err::println); Thread.sleep(2100);
elapsed 會關聯從當前值與上個值發出的時間間隔(譯者加:以下邊輸出的內容中的 259/249/251…)。 |
|
---|---|
咱們仍是要看一下 onError 時的內容。 |
|
確保咱們有足夠的時間能夠進行 4x2 次 tick。 |
輸出以下:
259,tick 0 249,tick 1 251,tick 2 506,tick 0 248,tick 1 253,tick 2 java.lang.RuntimeException: boom
一個新的 interval 從 tick 0 開始。多出來的 250ms 間隔來自於第 4 次 tick, 就是致使出現異常並執行 retry 的那次(譯者加:我在機器上測試的時候 elapsed 「顯示」的時間間隔沒有加倍,可是確實有第 4 次的間隔)。 |
|
---|---|
可見, retry(1)
不過是再一次重新訂閱了原始的 interval
,從 tick 0 開始。第二次, 因爲異常再次出現,便將異常傳遞到下游了。
還有一個「高配版」的 retry
(retryWhen
),它使用一個伴隨("companion") Flux
來判斷對某次錯誤是否要重試。這個伴隨 Flux
是由操做符建立的,可是由開發者包裝它, 從而實現對重試操做的配置。
這個伴隨 Flux
是一個 Flux<Throwable>
,它做爲 retryWhen
的惟一參數被傳遞給一個 Function
,你能夠定義這個 Function
並讓它返回一個新的 Publisher<?>
。重試的循環 會這樣運行:
Flux
,後者已經被你用 Function
包裝。Flux
發出元素,就會觸發重試。Flux
完成(complete),重試循環也會中止,而且原始序列也會 完成(complete)。Flux
產生一個錯誤,重試循環中止,原始序列也中止 或 完成,而且這個錯誤會致使 原始序列失敗並終止。瞭解前兩個場景的區別是很重要的。若是讓伴隨 Flux
完成(complete)等於吞掉了錯誤。以下代碼用 retryWhen
模仿了 retry(3)
的效果:
Flux<String> flux = Flux .<String>error(new IllegalArgumentException()) .doOnError(System.out::println) .retryWhen(companion -> companion.take(3));
持續產生錯誤。 | |
---|---|
在 retry 以前 的 doOnError 可讓咱們看到錯誤。 |
|
這裏,咱們認爲前 3 個錯誤是能夠重試的(take(3) ),再有錯誤就放棄。 |
事實上,上邊例子最終獲得的是一個 空的 Flux
,可是卻 成功 完成了。反觀對同一個 Flux
調用 retry(3)
的話,最終是以最後一個 error 終止 Flux
,故而 retryWhen
與之不一樣。
實現一樣的效果須要一些額外的技巧:
Flux<String> flux = Flux.<String>error(new IllegalArgumentException()) .retryWhen(companion -> companion .zipWith(Flux.range(1, 4), (error, index) -> { if (index < 4) return index; else throw Exceptions.propagate(error); }) );
技巧一:使用 zip 和一個「重試個數 + 1」的 range 。 |
|
---|---|
zip 方法讓你能夠在對重試次數計數的同時,仍掌握着原始的錯誤(error)。 |
|
容許三次重試,小於 4 的時候發出一個值。 | |
爲了使序列以錯誤結束。咱們將原始異常在三次重試以後拋出。 |
相似的代碼也能夠被用於實現 exponential backoff and retry 模式 (譯者加:重試指定的次數, 且每一次重試之間停頓的時間逐漸增長),參考 FAQ。 | |
---|---|
整體來講,全部的操做符自身均可能包含觸發異常的代碼,或自定義的可能致使失敗的代碼, 因此它們都自帶一些錯誤處理方式。
通常來講,一個 不受檢異常(Unchecked Exception) 老是由 onError
傳遞。例如, 在一個 map
方法中拋出 RuntimeException
會被翻譯爲一個 onError
事件,以下:
Flux.just("foo") .map(s -> { throw new IllegalArgumentException(s); }) .subscribe(v -> System.out.println("GOT VALUE"), e -> System.out.println("ERROR: " + e));
上邊代碼輸出以下:
ERROR: java.lang.IllegalArgumentException: foo
Exception 能夠在其被傳遞給 onError 以前,使用 hook 進行調整。 |
|
---|---|
Reactor,定義了一系列的可以致使「嚴重失敗」的錯誤(好比 OutOfMemoryError
),也可參考 Exceptions.throwIfFatal
方法。這些錯誤意味着 Reactor 無力處理只能拋出,沒法傳遞下去。
還有些狀況下不受檢異常仍然沒法傳遞下去(多數處於subscribe 和 request 階段), 由於可能因爲多線程競爭致使兩次 onError 或 onComplete 的狀況。當這種競爭發生的時候, 沒法傳遞下去的錯誤信號就被「丟棄」了。這些狀況仍然能夠經過自定義的 hook 來搞定,見 丟棄事件的 Hooks。 |
|
---|---|
你可能會問:「那麼 受檢查異常(Checked Exceptions)?」
若是你須要調用一個聲明爲 throws
異常的方法,你仍然須要使用 try-catch
代碼塊處理異常。 有幾種方式:
Exceptions
可用於這種方式(咱們立刻會講到)。Flux
(例如在 flatMap
中),將異常包裝在一個產生錯誤的 Flux
中: return Flux.error(checkedException)
(流序列也會終止)。Reactor 有一個工具類 Exceptions
,能夠確保在收到受檢異常的時候將其包裝(wrap)起來。
Exceptions.propagate
方法來包裝異常,它一樣會首先調用 throwIfFatal
, 而且不會包裝 RuntimeException
。Exceptions.unwrap
方法來獲得原始的未包裝的異常(追溯最初的異常)。下面是一個 map
的例子,它使用的 convert 方法會拋出 IOException
:
public String convert(int i) throws IOException { if (i > 3) { throw new IOException("boom " + i); } return "OK " + i; }
如今想象你將這個方法用於一個 map
中,你必須明確捕獲這個異常,而且你的 map
方法不能再次拋出它。 因此你能夠將其以 RuntimeException
的形式傳遞給 onError
:
Flux<String> converted = Flux .range(1, 10) .map(i -> { try { return convert(i); } catch (IOException e) { throw Exceptions.propagate(e); } });
當後邊訂閱上邊的這個 Flux
並響應錯誤(好比在用戶界面)的時候,若是你想處理 IOException, 你還能夠再將其轉換爲原始的異常。以下:
converted.subscribe( v -> System.out.println("RECEIVED: " + v), e -> { if (Exceptions.unwrap(e) instanceof IOException) { System.out.println("Something bad happened with I/O"); } else { System.out.println("Something bad happened"); } } );
Processors 既是一種特別的發佈者(Publisher
)又是一種訂閱者(Subscriber
)。 那意味着你能夠 訂閱一個 Processor
(一般它們會實現 Flux
),也能夠調用相關方法來手動 插入數據到序列,或終止序列。
Processor 有多種類型,它們都有特別的語義規則,可是在你研究它們以前,最好問一下 本身以下幾個問題:
多數狀況下,你應該進行避免使用 Processor
,它們較難正確使用,主要用於一些特殊場景下。
若是你以爲 Processor
適合你的使用場景,請首先看一下是否嘗試過如下兩種替代方式:
Processor
)若是看了以上替代方案,你仍然以爲須要一個 Processor
,閱讀 現有的 Processors 總覽 這一節來了解一下不一樣的實現吧。
Sink
門面對象來線程安全地生成流比起直接使用 Reactor 的 Processors
,更好的方式是經過調用一次 sink()
來獲得 Processor
的 Sink
。
FluxProcessor
的 sink 是線程安全的「生產者(producer)」,所以可以在應用程序中 多線程併發地生成數據。例如,一個線程安全的序列化(serialized)的 sink 可以經過 UnicastProcessor
建立:
UnicastProcessor<Integer> processor = UnicastProcessor.create(); FluxSink<Integer> sink = processor.sink(overflowStrategy);
多個生產者線程能夠併發地生成數據到如下的序列化 sink。
sink.next(n);
根據 Processor
及其配置,next
產生的溢出有兩種可能的處理方式:
IGNORE
策略,或將 overflowStrategy
應用於 sink
。Reactor Core 內置多種 Processor
。這些 processor 具備不一樣的語法,大概分爲三類。 下邊簡要介紹一下這三種 processor:
DirectProcessor
和 UnicastProcessor
):這些 processors 只能經過直接 調用 Sink
的方法來推送數據。EmitterProcessor
和 ReplayProcessor
):這些 processors 既能夠 直接調用 Sink
方法來推送數據,也能夠經過訂閱到一個上游的發佈者來同步地產生數據。WorkQueueProcessor
和 TopicProcessor
):這些 processors 能夠將從多個上游發佈者獲得的數據推送下去。因爲使用了 RingBuffer
的數據結構來 緩存多個來自上游的數據,所以更加有健壯性。異步的 processor 在實例化的時候最複雜,由於有許多不一樣的選項。所以它們暴露出一個 Builder
接口。 而簡單的 processors 有靜態的工廠方法。
DirectProcessor
能夠將信號分發給零到多個訂閱者(Subscriber
)。它是最容易實例化的,使用靜態方法 create()
便可。另外一方面,它的不足是沒法處理背壓。因此,當 DirectProcessor
推送的是 N 個元素,而至少有一個訂閱者的請求個數少於 N 的時候,就會發出一個 IllegalStateException
。
一旦 Processor
終止(一般經過調用它的 Sink
的 error(Throwable)
或 complete()
方法), 雖然它容許更多的訂閱者訂閱它,可是會當即向它們從新發送終止信號。
UnicastProcessor
能夠使用一個內置的緩存來處理背壓。代價就是它最多隻能有一個訂閱者。
UnicastProcessor
有多種選項,所以提供多種不一樣的 create
靜態方法。例如,它默認是 無限的(unbounded) :若是你在在訂閱者尚未請求數據的狀況下讓它推送數據,它會緩存全部數據。
能夠經過提供一個自定義的 Queue
的具體實現傳遞給 create
工廠方法來改變默認行爲。若是給出的隊列是 有限的(bounded), 而且緩存已滿,並且未收到下游的請求,processor 會拒絕推送數據。
在上邊 有限的 例子中,還能夠在構造 processor 的時候提供一個回調方法,這個回調方法能夠在每個 被拒絕推送的元素上調用,從而讓開發者有機會清理這些元素。
EmitterProcessor
可以向多個訂閱者發送數據,而且能夠對每個訂閱者進行背壓處理。它自己也能夠訂閱一個 Publisher
並同步得到數據。
最初若是沒有訂閱者,它仍然容許推送一些數據到緩存,緩存大小由 bufferSize
定義。 以後若是仍然沒有訂閱者訂閱它並消費數據,對 onNext
的調用會阻塞,直到有訂閱者接入 (這時只能併發地訂閱了)。
所以第一個訂閱者會收到最多 bufferSize
個元素。然而以後, processor 不會從新發送(replay) 數據給後續的訂閱者。這些後續接入的訂閱者只能獲取到它們開始訂閱 以後 推送的數據。這個內部的 緩存會繼續用於背壓的目的。
默認狀況下,若是全部的訂閱者都取消了(基本意味着它們都再也不訂閱(un-subscribed)了), 它會清空內部緩存,而且再也不接受更多的訂閱者。這一點能夠經過 create
靜態工廠方法的 autoCancel
參數來配置。
ReplayProcessor
會緩存直接經過自身的 Sink
推送的元素,以及來自上游發佈者的元素, 而且後來的訂閱者也會收到重發(replay)的這些元素。
能夠經過多種配置方式建立它:
cacheLast
)。create(int)
),全部的歷史元素(create()
)。createTimeout(Duration)
)。createSizeOrTimeout(int, Duration)
)。TopicProcessor
是一個異步的 processor,它可以重發來自多個上游發佈者的元素, 這須要在建立它的時候配置 shared
(見 build()
的 share(boolean)
配置)。
注意,若是你企圖在併發環境下經過併發的上游 Publisher 調用 TopicProcessor
的 onNext
、 onComplete
,或 onError
方法,就必須配置 shared。
不然,併發調用就是非法的,從而 processor 是徹底兼容響應式流規範的。
TopicProcessor
可以對多個訂閱者發送數據。它經過對每個訂閱者關聯一個線程來實現這一點, 這個線程會一直執行直到 processor 發出 onError
或 onComplete
信號,或關聯的訂閱者被取消。 最多能夠接受的訂閱者個數由構造者方法 executor
指定,經過提供一個有限線程數的 ExecutorService
來限制這一個數。
這個 processor 基於一個 RingBuffer
數據結構來存儲已發送的數據。每個訂閱者線程 自行管理其相關的數據在 RingBuffer
中的索引。
這個 processor 也有一個 autoCancel
構造器方法:若是設置爲 true
(默認的),那麼當 全部的訂閱者取消以後,源 Publisher
(s) 也就被取消了。
WorkQueueProcessor
也是一個異步的 processor,也可以重發來自多個上游發佈者的元素, 一樣在建立時須要配置 shared
(它多數構造器配置與 TopicProcessor
相同)。
它放鬆了對響應式流規範的兼容,可是好處就在於相對於 TopicProcessor
來講須要更少的資源。 它仍然基於 RingBuffer
,可是再也不要求每個訂閱者都關聯一個線程,所以相對於 TopicProcessor
來講更具擴展性。
代價在於分發模式有些區別:來自訂閱者的請求會彙總在一塊兒,而且這個 processor 每次只對一個 訂閱者發送數據,所以須要循環(round-robin)對訂閱者發送數據,而不是一次所有發出的模式。
沒法保證徹底公平的循環分發。 | |
---|---|
WorkQueueProcessor
多數構造器方法與 TopicProcessor
相同,好比 autoCancel
、share
, 以及 waitStrategy
。下游訂閱者的最大數目一樣由構造器 executor
配置的 ExecutorService
決定。
你最好注意不要有太多訂閱者訂閱 WorkQueueProcessor ,由於這 會鎖住 processor。 若是你須要限制訂閱者數量,最好使用一個 ThreadPoolExecutor 或 ForkJoinPool 。這個 processor 可以檢測到(線程池)容量並在訂閱者過多時拋出異常。 |
|
---|---|
翻譯建議 - "Reactor 核心特性"
Kotlin 是一種運行於 JVM(及其餘平臺)上的靜態(statically-typed)語言。 使用它能夠在擁有與現有 Java 庫良好https://kotlinlang.org/docs/reference/java-interop.html[互操做性] 的同時編寫簡介優雅的代碼。
本小節介紹了 Reactor 3.1 如何可以完美支持 Kotlin。
Kotlin 支持 Kotlin 1.1+ 及依賴 kotlin-stdlib
(或 kotlin-stdlib-jre7
/ kotlin-stdlib-jre8
之一)
多虧了其良好的 Java 互操做性 以及 Kotlin 擴展(extensions), Reactor Kotlin APIs 既可以使用 Java APIs,還可以收益於一些 Reactor 內置的專門支持 Kotlin 的 APIs。
注意 Kotlin 的擴展須要 import 纔可以使用。因此好比 Throwable.toFlux 的 Kotlin 擴展必須在 import reactor.core.publisher.toFlux 後纔可以使用。多數場景下 IDE 應該可以自動給出這種相似 static import 的建議。 |
|
---|---|
例如,https://kotlinlang.org/docs/reference/inline-functions.html#reified-type-parameters[Kotlin 參數類型推導(reified type parameters)] 對於 JVM 的 通用類型擦除(generics type erasure)提供了一種變通解決方案, Reactor 就能夠經過擴展(extension)來應用到這種特性。
下面是對「Reactor with Java」和「Reactor with Kotlin + extensions」的比較:
Java | Kotlin + extensions |
---|---|
Mono.just("foo") |
"foo".toMono() |
Flux.fromIterable(list) |
list.toFlux() |
Mono.error(new RuntimeException()) |
RuntimeException().toMono() |
Flux.error(new RuntimeException()) |
RuntimeException().toFlux() |
flux.ofType(Foo.class) |
flux.ofType<Foo>() or flux.ofType(Foo::class) |
StepVerifier.create(flux).verifyComplete() |
flux.test().verifyComplete() |
可參考 Reactor KDoc API 中詳細的關於 Kotlin 擴展的文檔。
Kotlin的一個關鍵特性就是 null 值安全 ——從而能夠在編譯時處理 null
值,而不是在運行時拋出著名的 NullPointerException
。 這樣,經過「可能爲空(nullability)」的聲明,以及可以代表「有值或空值」的語法(避免使用相似 Optional
來進行包裝),使得應用程序更加安全。(Kotlin容許在函數參數中使用可能爲空的值, 請參考 comprehensive guide to Kotlin null-safety)
儘管 Java 的類型系統不容許這樣的 null 值安全的表達方式, Reactor now provides null-safety 對全部 Reactor API 經過工具友好的(tooling-friendly)註解(在 reactor.util.annotation
包中定義)來支持。 默認狀況下,Java APIs 用於 Kotlin 的話會被做爲 平臺類型(platform types) 而放鬆對 null 的檢查。 Kotlin 對 JSR 305 註解的支持 + Reactor 可爲空(nullability)的註解,爲全部 Reactor API 和 Kotlin 開發者確保「null 值安全」的特性 (在編譯期處理 null 值)。
JSR 305 的檢查能夠經過增長 -Xjsr305
編譯參數進行配置: -Xjsr305={strict|warn|ignore}
。
對於 kotlin 1.1.50+,默認的配置爲 -Xjsr305=warn
。若是但願 Reactor API 可以全面支持 null 值安全 則須要配置爲 strict
。不過你能夠認爲這是實驗性的(experimental),由於 Reactor API 「可能爲空」 的聲明可能甚至在小版本的發佈中都會不斷改進,並且未來也可能增長新的檢查。
目前尚不支持通用類型參數、可變類型以及數組元素的「可爲空」。不過應該包含在接下來的發佈中,最新信息請看 這個issues。 | |
---|---|
翻譯建議 - "對 Kotlin 的支持"
不管你是編寫了一個簡單的 Reactor 操做鏈,仍是開發了自定義的操做符,對它進行 自動化的測試老是一個好主意。
Reactor 內置一些專門用於測試的元素,放在一個專門的 artifact 裏: reactor-test
。 你能夠在 on Github 的 reactor-core 庫裏找到這個項目。
若是要用它來進行測試,添加 scope 爲 test 的依賴。
reactor-test 用 Maven 配置 <dependencies>
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>
若是你使用了 BOM,你不須要指定 <version> 。 |
|
---|---|
reactor-test 用 Gradle 配置 dependencies
dependencies { testCompile 'io.projectreactor:reactor-test' }
reactor-test
的兩個主要用途:
StepVerifier
一步一步地測試一個給定場景的序列。TestPublisher
生成數據來測試下游的操做符(包括你本身的operator)。StepVerifier
來測試最多見的測試 Reactor 序列的場景就是定義一個 Flux
或 Mono
,而後在訂閱它的時候測試它的行爲。
當你的測試關注於每一次的事件的時候,就很是容易轉化爲使用 StepVerifier
的測試場景: 下一個指望的事件是什麼?你是否指望使用 Flux
來發出一個特別的值?或者接下來 300ms 什麼都不作?全部這些均可以使用 StepVerifier
API 來表示。
例如,你可能會使用以下的工具方法來包裝一個 Flux
:
public <T> Flux<T> appendBoomError(Flux<T> source) { return source.concatWith(Mono.error(new IllegalArgumentException("boom"))); }
要測試它的話,你須要校驗以下內容:
我但願這個
Flux
先發出foo
,而後發出bar
,而後 生成一個內容爲 boom 的錯誤。 最後訂閱並校驗它們。
使用 StepVerifier
API 來表示以上的驗證過程:
@Test public void testAppendBoomError() { Flux<String> source = Flux.just("foo", "bar"); StepVerifier.create( appendBoomError(source)) .expectNext("foo") .expectNext("bar") .expectErrorMessage("boom") .verify(); }
因爲被測試方法須要一個 Flux ,定義一個簡單的 Flux 用於測試。 |
|
---|---|
建立一個 StepVerifier 構造器來包裝和校驗一個 Flux 。 |
|
傳進來須要測試的 Flux (即待測方法的返回結果)。 |
|
第一個咱們指望的信號是 onNext ,它的值爲 foo 。 |
|
最後咱們指望的是一個終止信號 onError ,異常內容應該爲 boom 。 |
|
不要忘了使用 verify() 觸發測試。 |
API 是一個構造器,經過傳入一個要測試的序列來建立一個 StepVerifier
。從而你能夠:
AssertionError
)。例如你可能會用到 expectNext(T...)
或 expectNextCount(long)
。assertion
的時候會用到它(好比要校驗是否有一個 onNext
信號,並校驗對應發出的元素是不是一個 size 爲 5 的 List)。你可能會用到 consumeNextWith(Consumer<T>)
。thenAwait(Duration)
和 then(Runnable)
。對於終止事件,相應的指望方法(expectComplete()
、expectError()
,及其全部的變體方法) 使用以後就不能再繼續增長別的指望方法了。最後你只能對 StepVerifier
進行一些額外的配置並 觸發校驗(一般調用 verify()
及其變體方法)。
從 StepVerifier 內部來看,它訂閱了待測試的 Flux
或 Mono
,而後將序列中的每一個信號與測試 場景的指望進行比對。若是匹配的話,測試成功。若是有不匹配的狀況,則拋出 AssertionError
異常。
請記住是 verify() 觸發了校驗過程。這個 API 還有一些結合了 verify() 與指望的終止信號 的方法:verifyComplete() 、verifyError() 、verifyErrorMessage(String) 等。 |
|
---|---|
注意,若是有一個傳入 lambda 的指望方法拋出了 AssertionError
,會被報告爲測試失敗。 這可用於自定義 assertion。
默認狀況下,verify() 方法(及同源的 verifyThenAssertThat 、verifyComplete() 等) 沒有超時的概念。它可能會永遠阻塞住。你能夠使用 StepVerifier.setDefaultTimeout(Duration) 來設置一個全局的超時時間,或使用 verify(Duration) 指定。 |
|
---|---|
StepVerifier
能夠用來測試基於時間的操做符,從而避免測試的長時間運行。能夠使用構造器 StepVerifier.withVirtualTime
達到這一點。
示例以下:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1))) //... 繼續追加指望方法
虛擬時間(virtual time) 的功能會在 Reactor 的調度器(Scheduler
)工廠方法中插入一個自定義的 調度器。這些基於時間的操做符一般默認使用 Schedulers.parallel()
調度器。(虛擬時間的) 技巧在於使用一個 VirtualTimeScheduler
來代替默認調度器。然而一個重要的前提就是,只有在初始化 虛擬時間調度器以後的操做符纔會起做用。
爲了提升 StepVerifier
正常起做用的機率,它通常不接收一個簡單的 Flux
做爲輸入,而是接收 一個 Supplier
,從而能夠在配置好訂閱者 以後 「懶建立」待測試的 flux。
要注意的是,Supplier<Publisher<T>> 可用於「懶建立」,不然不能保證虛擬時間 能徹底起做用。尤爲要避免提早實例化 Flux ,要在 Supplier 中用 lambda 建立並返回 Flux 變量。 |
|
---|---|
有兩種處理時間的指望方法,不管是否配置虛擬時間都是可用的:
thenAwait(Duration)
暫停校驗步驟(容許信號延遲發出)。expectNoEvent(Duration)
一樣讓序列持續必定的時間,期間若是有 任何 信號發出則測試失敗。兩個方法都會基於給定的持續時間暫停線程的執行,若是是在虛擬時間模式下就相應地使用虛擬時間。
expectNoEvent 將訂閱(subscription )也認做一個事件。假設你用它做爲第一步,若是檢測 到有訂閱信號,也會失敗。這時候能夠使用 expectSubscription().expectNoEvent(duration) 來代替。 |
|
---|---|
爲了快速校驗前邊提到的 Mono.delay
,咱們能夠這樣完成代碼:
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1))) .expectSubscription() .expectNoEvent(Duration.ofDays(1)) .expectNext(0) .verifyComplete();
如上 tip。 | |
---|---|
期待一天內沒有信號發生。 | |
而後期待一個 next 信號爲 0 。 |
|
而後期待完成(同時觸發校驗)。 |
咱們也能夠使用 thenAwait(Duration.ofDays(1))
,可是 expectNoEvent
的好處是 可以驗證在此以前不會發生什麼。
注意 verify()
返回一個 Duration
,這是整個測試的 真實時間。
虛擬時間並不是銀彈。請記住 全部的 調度器都會被替換爲 VirtualTimeScheduler 。 有些時候你能夠鎖定校驗過程,由於虛擬時鐘在遇到第一個指望校驗以前並不會開始,因此對於 「無數據「的指望校驗也必須可以運行在虛擬時間模式下。在無限序列中,虛擬時間模式的發揮 空間也頗有限,由於它可能致使線程(序列的發出和校驗的運行都在這個線程上)卡住。 |
|
---|---|
StepVerifier
進行「後校驗」當配置完你測試場景的最後的指望方法後,你能夠使用 verifyThenAssertThat()
來代替 verify()
觸發執行後的校驗。
verifyThenAssertThat()
返回一個 StepVerifier.Assertions
對象,你能夠用它來校驗 整個測試場景成功剛結束後的一些狀態(它也會調用 verify())。典型應用就是校驗有多少 元素被操做符丟棄(參考 Hooks)。
Context
更多關於 Context
的內容請參考 增長一個 Context 到響應式序列。
StepVerifier
有一些指望方法能夠用來測試 Context
:
expectAccessibleContext
: 返回一個 ContextExpectations
對象,你能夠用它來在 Context
上配置指望校驗。必定記住要調用 then()
來返回到對序列的指望校驗上來。expectNoAccessibleContext
: 是對「沒有Context
」的校驗。一般用於 被測試的 Publisher
並非一個響應式的,或沒有任何操做符可以傳遞 Context
(好比一個 generate
的 Publisher
).此外,還能夠用 StepVerifierOptions
方法傳入一個測試用的初始 Context
給 StepVerifier
, 從而能夠建立一個校驗(verifier)。
這些特性經過下邊的代碼演示:
StepVerifier.create(Mono.just(1).map(i -> i + 10), StepVerifierOptions.create().withInitialContext(Context.of("foo", "bar"))) .expectAccessibleContext() .contains("foo", "bar") .then() .expectNext(11) .verifyComplete();
使用 StepVerifierOptions 建立 StepVerifier 並傳入初始 Context 。 |
|
---|---|
開始對 Context 進行校驗,這裏只是確保 Context 正常傳播了。 |
|
對 Context 進行校驗的例子:好比驗證是否包含一個 "foo" - "bar" 鍵值對。 |
|
使用 then() 切換回對序列的校驗。 |
|
不要忘了用 verify() 觸發整個校驗過程。 |
TestPublisher
手動發出元素對於更多高級的測試,若是可以徹底掌控源發出的數據就會方便不少,由於這樣就能夠在測試的 時候更加有的放矢地發出想測的數據。
另外一種狀況就是你實現了本身的操做符,而後想校驗它的行爲——尤爲是在源不穩定的時候——是否符合響應式流規範。
reactor-test
提供了 TestPublisher
類來應對這兩種需求。這個類本質上是一個 Publisher<T>
, 你能夠經過可編程的方式來用它發出各類信號:
next(T)
以及 next(T, T...)
發出 1-n 個 onNext
信號。emit(T...)
起一樣做用,而且會執行 complete()
。complete()
會發出終止信號 onComplete
。error(Throwable)
會發出終止信號 onError
。使用 create
工廠方法就能夠獲得一個正常的 TestPublisher
。而使用 createNonCompliant
工廠方法能夠建立一個「不正常」的 TestPublisher
。後者須要傳入由 TestPublisher.Violation
枚舉指定的一組選項,這些選項可用於告訴 publisher 忽略哪些問題。枚舉值有:
REQUEST_OVERFLOW
: 容許 next
在請求不足的時候也能夠調用,而不會觸發 IllegalStateException
。ALLOW_NULL
: 容許 next
可以發出一個 null
值而不會觸發 NullPointerException
。CLEANUP_ON_TERMINATE
: 能夠重複屢次發出終止信號,包括 complete()
、error()
和 emit()
。最後,TestPublisher
還能夠用不一樣的 assert*
來跟蹤其內部的訂閱狀態。
使用轉換方法 flux()
和 mono()
能夠將其做爲 Flux
和 Mono
來使用。
PublisherProbe
檢查執行路徑當構建複雜的操做鏈時,可能會有多個子序列,從而致使多個執行路徑。
多數時候,這些子序列會生成一個足夠明確的 onNext
信號,你能夠經過檢查最終結果 來判斷它是否執行。
考慮下邊這個方法,它構建了一條操做鏈,並使用 switchIfEmpty
方法在源爲空的狀況下, 替換成另外一個源。
public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) { return source .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+"))) .switchIfEmpty(fallback); }
很容易就能夠測試出 switchIfEmpty 的哪個邏輯分支被使用了,以下:
@Test public void testSplitPathIsUsed() { StepVerifier.create(processOrFallback(Mono.just("just a phrase with tabs!"), Mono.just("EMPTY_PHRASE"))) .expectNext("just", "a", "phrase", "with", "tabs!") .verifyComplete(); } @Test public void testEmptyPathIsUsed() { StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE"))) .expectNext("EMPTY_PHRASE") .verifyComplete(); }
可是若是例子中的方法返回的是一個 Mono<Void>
呢?它等待源發送結束,執行一個額外的任務, 而後就結束了。若是源是空的,則執行另外一個備用的相似於 Runnable 的任務,以下:
private Mono<String> executeCommand(String command) { return Mono.just(command + " DONE"); } public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) { return commandSource .flatMap(command -> executeCommand(command).then()) .switchIfEmpty(doWhenEmpty); }
then() 方法會忽略 command,它只關心是否結束。 |
|
---|---|
兩個都是空序列,這個時候如何區分(哪邊執行了)呢? |
爲了驗證執行路徑是通過了 doWhenEmpty
的,你須要編寫額外的代碼,好比你須要一個這樣的 Mono<Void>
:
在 3.1 版本之前,你須要爲每一種狀態維護一個 AtomicBoolean
變量,而後在相應的 doOn*
回調中觀察它的值。這須要添加很多的額外代碼。好在,版本 3.1.0 以後能夠使用 PublisherProbe
來作, 以下:
@Test public void testCommandEmptyPathIsUsed() { PublisherProbe<Void> probe = PublisherProbe.empty(); StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) .verifyComplete(); probe.assertWasSubscribed(); probe.assertWasRequested(); probe.assertWasNotCancelled(); }
建立一個探針(probe),它會轉化爲一個空序列。 | |
---|---|
在須要使用 Mono<Void> 的位置調用 probe.mono() 來替換爲探針。 |
|
序列結束以後,你能夠用這個探針來判斷序列是如何使用的,你能夠檢查是它從哪(條路徑)被訂閱的… | |
…對於請求也是同樣的… | |
…以及是否被取消了。 |
你也能夠在使用 Flux<T>
的位置經過調用 .flux()
方法來放置探針。若是你既須要用探針檢查執行路徑 還須要它可以發出數據,你能夠用 PublisherProbe.of(Publisher)
方法包裝一個 Publisher<T>
來搞定。
從命令式和同步式編程切換到響應式和異步式編程有時候是使人生畏的。 學習曲線中最陡峭的異步就是出錯時如何分析和調試。
在命令式世界,調試一般都是很是直觀的:直接看 stack trace 就能夠找到問題出現的位置, 以及:是否問題責任所有出在你本身的代碼?問題是否是發生在某些庫代碼?若是是, 那你的哪部分代碼調用了庫,是否是傳參不合適致使的問題?
當你切換到異步代碼,事情就變得複雜的多了。
看一下下邊的 stack trace:
一段典型的 Reactor stack trace
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120) at reactor.core.publisher.FluxFlatMap$FlatMapMain.emitScalar(FluxFlatMap.java:380) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:349) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:119) at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:144) at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:99) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:172) at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:316) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:94) at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68) at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:67) at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:98) at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58) at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668) at reactor.core.publisher.Mono.subscribe(Mono.java:2629) at reactor.core.publisher.Mono.subscribe(Mono.java:2604) at reactor.core.publisher.Mono.subscribe(Mono.java:2582) at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:722)
這裏邊有好多信息,咱們獲得了一個 IndexOutOfBoundsException
,內容是 "源發出了 不止一個元素"。
咱們也許能夠很快假定這個源是一個 Flux/Mono,並經過下一行提到的 MonoSingle
肯定是 Mono。 看上去是來自一個 single
操做符的抱怨。
查看 Javadoc 中關於操做符 Mono#single
的說明,咱們看到 single
有一個規定: 源必須只能發出一個元素。看來是有一個源發出了多於一個元素,從而違反了這一規定。
咱們能夠更進一步找出那個源嗎?下邊的這些內容幫不上什麼忙,只是打印了一些內部的彷佛是一個響應式鏈的信息, 主要是一些 subscribe
和 request
的調用。
粗略過一下這些行,咱們至少能夠勾畫出一個大體的出問題的鏈:大概涉及一個 MonoSingle
、一個 FluxFlatMap
,以及一個 FluxRange
(每個都對應 trace 中的幾行,但整體涉及這三個類)。 因此難道是 range().flatMap().single()
這樣的鏈?
可是若是在咱們的應用中多處都用到這一模式,那怎麼辦?經過這些仍是不能肯定什麼, 搜索 single
也找不到問題所在。最後一行指向了咱們的代碼。咱們彷佛終於接近真相了。
不過,等等… 當咱們找到源碼文件,咱們只能找到一個已存在的 Flux
被訂閱了,以下:
toDebug.subscribe(System.out::println, Throwable::printStackTrace);
全部這些都發生在訂閱時,可是 Flux
自己沒有在這裏 聲明 。更糟的是, 當咱們找到變量聲明的地方,咱們看到:
public Mono<String> toDebug; //請忽略 public 的屬性
變量聲明的地方並無 實例化 。咱們必須作最壞的打算,那就是在這個應用中, 可能在幾個不一樣的代碼路徑上對這個變量賦了值,但咱們不肯定是哪個致使了問題。
這是一種 Reactor 運行時錯誤,而不是編譯錯誤。 | |
---|---|
咱們但願找到的是操做符在哪裏添加到操做鏈上的 —— 也就是 Flux
在哪裏 聲明的。咱們一般說這個 Flux
是被 組裝(assembly) 的。
即使 stack trace 可以對有些許經驗的開發者傳遞一些信息,可是在一些複雜的狀況下, 這並非一種理想的方式。
幸運的是,Reactor 內置了一種面向調試的能力—— 操做期測量(assembly-time instrumentation)。
這經過 在應用啓動的時候 (或至少在有問題的 Flux
或 Mono
實例化以前) 加入自定義的 Hook.onOperator
鉤子(hook),以下:
Hooks.onOperatorDebug();
這行代碼——經過包裝操做符的構造方法,並在此捕捉 stack trace——來監測對這個 Flux
(或 Mono
)的操做符的調用(也就是「組裝」鏈的地方)。因爲這些在 操做鏈被聲明的地方就搞定,這個 hook 應該在 早於 聲明的時候被激活, 最保險的方式就是在你程序的最開始就激活它。
以後,若是發生了異常,致使失敗的操做符可以找到捕捉點並補充 stack trace。
在下一小節,咱們看一下 stack trace 會有什麼不一樣,以及如何對其進行分析。
咱們在對上邊的例子激活 operatorStacktrace
調試功能後,stack trace 以下:
java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:120) at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:314) ... ... at reactor.core.publisher.Mono.subscribeWith(Mono.java:2668) at reactor.core.publisher.Mono.subscribe(Mono.java:2629) at reactor.core.publisher.Mono.subscribe(Mono.java:2604) at reactor.core.publisher.Mono.subscribe(Mono.java:2582) at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:727) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.MonoSingle] : reactor.core.publisher.Flux.single(Flux.java:5335) reactor.guide.GuideTests.scatterAndGather(GuideTests.java:689) reactor.guide.GuideTests.populateDebug(GuideTests.java:702) org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) org.junit.rules.RunRules.evaluate(RunRules.java:20) Error has been observed by the following operator(s): |_ Flux.single(TestWatcher.java:55)
這一條是新的:能夠發現外層操做符捕捉到了 stack trace。 | |
---|---|
第一部分的 stack trace 多數與上邊同樣,顯示了操做符內部的信息(因此省略了這一塊)。 | |
從這裏開始,是在調試模式下顯示的內容。 | |
首先咱們得到了關於操做符組裝的信息。 | |
以及錯誤沿着操做鏈傳播的軌跡(從錯誤點到訂閱點)。 | |
每個看到這個錯誤的操做符都會列出,包括類和行信息。若是操做符是在 Reactor 源碼內部組裝的,行信息會被忽略。 |
可見,捕獲的 stack trace 做爲 OnAssemblyException
添加到原始錯誤信息的以後。有兩部分, 可是第一部分更加有意思。它顯示了操做符觸發異常的路徑。這裏顯示的是 scatterAndGather
方法中的 single
致使的問題,而 scatterAndGather
方法是在 JUnit 中被 populateDebug
方法調用的。
既然咱們已經有足夠的信息來查出罪魁禍首,咱們就來看一下 scatterAndGather
方法吧:
private Mono<String> scatterAndGather(Flux<String> urls) { return urls.flatMap(url -> doRequest(url)) .single(); }
找到了,就是這個 single 。 |
|
---|---|
如今咱們能夠發現錯誤的根源是將多個 HTTP 請求轉化爲 URLs 的 flatMap
方法後邊接的是 single
, 這太嚴格了。使用 git blame
找到代碼做者,並同他討論事後,發現他是原本是想用不那麼嚴格的 take(1)
方法的。
咱們解決了問題。
錯誤被如下這些操做符觀察(observed)了:
調試信息的第二部分在這個例子中意義不大,由於錯誤實際發生在最後一個操做符上(離 subscribe
最近的一個)。 另外一個例子可能更加清楚:
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane")) .transform(FakeUtils1.applyFilters) .transform(FakeUtils2.enrichUser) .blockLast();
如今想象一下在 findAllUserByName
內部有個 map
方法報錯了。咱們可能會看到以下的 trace:
Error has been observed by the following operator(s): |_ Flux.map(FakeRepository.java:27) |_ Flux.map(FakeRepository.java:28) |_ Flux.filter(FakeUtils1.java:29) |_ Flux.transform(GuideDebuggingExtraTests.java:41) |_ Flux.elapsed(FakeUtils2.java:30) |_ Flux.transform(GuideDebuggingExtraTests.java:42)
這與鏈上收到錯誤通知的操做符是一致:
map
。map
看到(都在 findAllUserByName
方法中)。filter
和一個 transform
看到,說明鏈的這部分是由一個可重複使用的轉換方法組裝的 (這裏是 applyFilters
工具方法)。elapsed
和一個 transform
看到,相似的, elapsed
由第二個轉換方法(enrichUser
) 組裝。用這種形式的檢測方式構造 stack trace 是成本較高的。也所以這種調試模式做爲最終大招, 只應該在可控的方式下激活。
checkpoint()
方式替代調試模式是全局性的,會影響到程序中每個組裝到一個 Flux
或 Mono
的操做符。好處在於能夠進行 過後調試(after-the-fact debugging):不管錯誤是什麼,咱們都會獲得足夠的調試信息。
就像前邊見到的那樣,這種全局性的調試會由於成本較高而影響性能(其影響在於生成的 stack traces 數量)。 若是咱們能大概定位到疑似出問題的操做符的話就能夠不用花那麼大的成本。然而,問題出現後, 咱們一般沒法定位到哪個操做符可能存在問題,由於缺乏一些 trace 信息,咱們得修改代碼, 打開調試模式,指望可以復現問題。
這種狀況下,咱們須要切換到調試模式,並進行一些必要的準備工做以便可以更好的發現復現的問題, 並捕捉到全部的信息。(譯者加:這兩段感受有點廢話。。。)
若是你能肯定是在你的代碼中組裝的響應式鏈存在問題,並且程序的可服務性又是很重要的, 那麼你能夠 使用 checkpoint() 操做符,它有兩種調試技術可用。
你能夠把這個操做符加到鏈中。這時 checkpoint
操做符就像是一個 hook,但只對它所在的鏈起做用。
還有一個 checkpoint(String)
的方法變體,你能夠傳入一個獨特的字符串以方便在 assembly traceback 中識別信息。 這樣會省略 stack trace,你能夠依賴這個字符串(如下改稱「定位描述符」)來定位到組裝點。checkpoint(String)
比 checkpoint
有更低的執行成本。
checkpoint(String)
在它的輸出中包含 "light" (能夠方便用於搜索),以下所示:
... Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly site of producer [reactor.core.publisher.FluxElapsed] is identified by light checkpoint [light checkpoint identifier].
最後的但一樣重要的是,若是你既想經過 checkpoint 添加定位描述符,同時又依賴於 stack trace 來定位組裝點,你能夠使用 checkpoint("description", true)
來實現這一點。這時回溯信息又出來了, 同時附加了定位描述符,以下例所示:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:174) reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescription(FluxOnAssemblyTest.java:159) Error has been observed by the following operator(s): |_ ParallelFlux.checkpointnull
descriptionCorrelation1234 是經過 checkpoint 給出的定位描述符。 |
|
---|---|
定位描述符能夠是靜態的字符串、或人類可讀的描述、或一個 correlation ID(例如, 來自 HTTP 請求頭的信息)。
當全局調試模式和 checkpoint() 都開啓的時候,checkpoint 的 stacks 輸出會做爲 suppressed 錯誤輸出,按照聲明順序添加在操做符圖(graph)的後面。 |
|
---|---|
除了基於 stack trace 的調試和分析,還有一個有效的工具能夠跟蹤異步序列並記錄日誌。
就是 log()
操做符。將其加到操做鏈上以後,它會讀(只讀,peek)每個 在其上游的 Flux
或 Mono
事件(包括 onNext
、onError
、 onComplete
, 以及 訂閱、 取消、和 請求)。
邊注:關於 logging 的具體實現
log
操做符經過 SLF4J 使用相似 Log4J 和 Logback 這樣的公共的日誌工具, 若是 SLF4J 不存在的話,則直接將日誌輸出到控制檯。
控制檯使用 System.err
記錄 WARN
和 ERROR
級別的日誌,使用 System.out
記錄其餘級別的日誌。
若是你喜歡使用 JDK java.util.logging
,在 3.0.x 你能夠設置 JDK 的系統屬性 reactor.logging.fallback
。
假設咱們配置並激活了 logback,以及一個形如 range(1,10).take(3)
的操做鏈。經過將 log()
放在 take 以前, 咱們就能夠看到它內部是如何運行的,以及什麼樣的事件會向上遊傳播給 range,以下所示:
Flux<Integer> flux = Flux.range(1, 10) .log() .take(3); flux.subscribe();
輸出以下(經過 logger 的 console appender):
10:45:20.200 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | request(unbounded) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(1) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(2) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | onNext(3) 10:45:20.205 [main] INFO reactor.Flux.Range.1 - | cancel()
這裏,除了 logger 本身的格式(時間、線程、級別、消息),log()
操做符 還輸出了其餘一些格式化的東西:
reactor.Flux.Range.1 是自動生成的日誌 類別(category),以防你在操做鏈中屢次使用 同一個操做符。經過它你能夠分辨出來是哪一個操做符的事件(這裏是 range 的)。 你能夠調用 log(String) 方法用自定義的類別替換這個標識符。在幾個用於分隔的字符以後, 打印出了實際的事件。這裏是一個 onSubscribe 調用、一個 request 調用、三個 onNext 調用, 以及一個 cancel 調用。對於第一行的 onSubscribe ,咱們知道了 Subscriber 的具體實現, 一般與操做符指定的實現是一致的,在方括號內有一些額外信息,包括這個操做符是否可以 經過同步或異步融合(fusion,具體見附錄 [microfusion])的方式進行自動優化。 |
|
---|---|
第二行,咱們能夠看到是一個由下游傳播上來的個數無限的請求。 | |
而後 range 一下發出三個值。 | |
最後一行,咱們看到了 cancel() 。 |
最後一行,(4),最有意思。咱們看到 take
在這裏發揮做用了。在它拿到足夠的元素以後, 就將序列切斷了。簡單來講,take()
致使源在發出用戶請求的數量後 cancel()
了。
翻譯建議 - "調試 Reactor"
這一章涉及以下的 Reactor 的高級特性與概念:
ConnectableFlux
對多個訂閱者進行廣播ParallelFlux
進行並行處理Schedulers
從代碼整潔的角度來講,重用代碼是一個好辦法。Reactor 提供了幾種幫你打包重用代碼的方式, 主要經過使用操做符或者經常使用的「操做符組合」的方法來實現。若是你以爲一段操做鏈很經常使用, 你能夠將這段操做鏈打包封裝後備用。
transform
操做符transform
操做符能夠將一段操做鏈封裝爲一個函數式(function)。這個函數式能在操做期(assembly time) 將被封裝的操做鏈中的操做符還原並接入到調用 transform
的位置。這樣作和直接將被封裝的操做符 加入到鏈上的效果是同樣的。示例以下:
Function<Flux<String>, Flux<String>> filterAndMap = f -> f.filter(color -> !color.equals("orange")) .map(String::toUpperCase); Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .doOnNext(System.out::println) .transform(filterAndMap) .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));
上邊例子的輸出以下:
blue Subscriber to Transformed MapAndFilter: BLUE green Subscriber to Transformed MapAndFilter: GREEN orange purple Subscriber to Transformed MapAndFilter: PURPLE
compose
操做符compose
操做符與 transform
相似,也可以將幾個操做符封裝到一個函數式中。 主要的區別就是,這個函數式做用到原始序列上的話,是 基於每個訂閱者的(on a per-subscriber basis) 。這意味着它對每個 subscription 能夠生成不一樣的操做鏈(經過維護一些狀態值)。 以下例所示:
AtomicInteger ai = new AtomicInteger(); Function<Flux<String>, Flux<String>> filterAndMap = f -> { if (ai.incrementAndGet() == 1) { return f.filter(color -> !color.equals("orange")) .map(String::toUpperCase); } return f.filter(color -> !color.equals("purple")) .map(String::toUpperCase); }; Flux<String> composedFlux = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .doOnNext(System.out::println) .compose(filterAndMap); composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d)); composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));
上邊的例子輸出以下:
blue Subscriber 1 to Composed MapAndFilter :BLUE green Subscriber 1 to Composed MapAndFilter :GREEN orange purple Subscriber 1 to Composed MapAndFilter :PURPLE blue Subscriber 2 to Composed MapAndFilter: BLUE green Subscriber 2 to Composed MapAndFilter: GREEN orange Subscriber 2 to Composed MapAndFilter: ORANGE purple
到目前爲止,咱們一直認爲 Flux
(和 Mono
)都是這樣的:它們都表明了一種異步的數據序列, 在訂閱(subscribe)以前什麼都不會發生。
可是實際上,廣義上有兩種發佈者:「熱」與「冷」(hot and cold)。
(本文檔)到目前介紹的其實都是 cold 家族的發佈者。它們爲每個訂閱(subscription) 都生成數據。若是沒有建立任何訂閱(subscription),那麼就不會生成數據。
試想一個 HTTP 請求:每個新的訂閱者都會觸發一個 HTTP 調用,可是若是沒有訂閱者關心結果的話, 那就不會有任何調用。
另外一方面,熱 發佈者,不依賴於訂閱者的數量。即便沒有訂閱者它們也會發出數據, 若是有一個訂閱者接入進來,那麼它就會收到訂閱以後發出的元素。對於熱發佈者, 在你訂閱它以前,確實已經發生了什麼。
just
是 Reactor 中少數幾個「熱」操做符的例子之一:它直接在組裝期(assembly time) 就拿到數據,若是以後有誰訂閱它,就從新發送數據給訂閱者。再拿 HTTP 調用舉例,若是給 just
傳入的數據是一個 HTTP 調用的結果,那麼以後在初始化 just 的時候纔會進行惟一的一次網絡調用。
若是想將 just
轉化爲一種 冷 的發佈者,你能夠使用 defer
。它可以將剛纔例子中對 HTTP 的請求延遲到訂閱時(這樣的話,對於每個新的訂閱來講,都會發生一次網絡調用)。
Reactor 中多數其餘的 熱 發佈者是擴展自 Processor 的。 |
|
---|---|
考慮其餘兩個例子,以下是第一個例子:
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")) .doOnNext(System.out::println) .filter(s -> s.startsWith("o")) .map(String::toUpperCase); source.subscribe(d -> System.out.println("Subscriber 1: "+d)); source.subscribe(d -> System.out.println("Subscriber 2: "+d));
第一個例子輸出以下:
blue green orange Subscriber 1: ORANGE purple blue green orange Subscriber 2: ORANGE purple
兩個訂閱者都觸發了全部的顏色,由於每個訂閱者都會讓構造 Flux
的操做符運行一次。
將下邊的例子與第一個例子對比:
UnicastProcessor<String> hotSource = UnicastProcessor.create(); Flux<String> hotFlux = hotSource.publish() .autoConnect() .map(String::toUpperCase); hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d)); hotSource.onNext("blue"); hotSource.onNext("green"); hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d)); hotSource.onNext("orange"); hotSource.onNext("purple"); hotSource.onComplete();
第二個例子輸出以下:
Subscriber 1 to Hot Source: BLUE Subscriber 1 to Hot Source: GREEN Subscriber 1 to Hot Source: ORANGE Subscriber 2 to Hot Source: ORANGE Subscriber 1 to Hot Source: PURPLE Subscriber 2 to Hot Source: PURPLE
第一個訂閱者收到了全部的四個顏色,第二個訂閱者因爲是在前兩個顏色發出以後訂閱的, 故而收到了以後的兩個顏色,在輸出中有兩次 "ORANGE" 和 "PURPLE"。從這個例子可見, 不管是否有訂閱者接入進來,這個 Flux 都會運行。
ConnectableFlux
對多個訂閱者進行廣播有時候,你不只想要延遲到某一個訂閱者訂閱以後纔開始發出數據,可能還但願在多個訂閱者 到齊 以後 纔開始。
ConnectableFlux
的用意便在於此。Flux
API 中有兩種主要的返回 ConnectableFlux
的方式:publish
和 replay
。
publish
會嘗試知足各個不一樣訂閱者的需求(背壓),並綜合這些請求反饋給源。 尤爲是若是有某個訂閱者的需求爲 0
,publish 會 暫停 它對源的請求。replay
將對第一個訂閱後產生的數據進行緩存,最多緩存數量取決於配置(時間/緩存大小)。 它會對後續接入的訂閱者從新發送數據。ConnectableFlux
提供了多種對下游訂閱的管理。包括:
connect
當有足夠的訂閱接入後,能夠對 flux 手動執行一次。它會觸發對上游源的訂閱。autoConnect(n)
與 connect 相似,不過是在有 n
個訂閱的時候自動觸發。refCount(n)
不只可以在訂閱者接入的時候自動觸發,還會檢測訂閱者的取消動做。若是訂閱者數量不夠, 會將源「斷開鏈接」,再有新的訂閱者接入的時候纔會繼續「連上」源。refCount(int, Duration)
增長了一個 "優雅的倒計時":一旦訂閱者數量過低了,它會等待 Duration
的時間,若是沒有新的訂閱者接入纔會與源「斷開鏈接」。示例以下:
Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("subscribed to source")); ConnectableFlux<Integer> co = source.publish(); co.subscribe(System.out::println, e -> {}, () -> {}); co.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("done subscribing"); Thread.sleep(500); System.out.println("will now connect"); co.connect();
The preceding code produces the following output:
done subscribing will now connect subscribed to source 1 1 2 2 3 3
使用 autoConnect
:
Flux<Integer> source = Flux.range(1, 3) .doOnSubscribe(s -> System.out.println("subscribed to source")); Flux<Integer> autoCo = source.publish().autoConnect(2); autoCo.subscribe(System.out::println, e -> {}, () -> {}); System.out.println("subscribed first"); Thread.sleep(500); System.out.println("subscribing second"); autoCo.subscribe(System.out::println, e -> {}, () -> {});
以上代碼輸出以下:
subscribed first subscribing second subscribed to source 1 1 2 2 3 3
當你有許多的元素,而且想將他們分批處理,Reactor 整體上有三種方案:分組(grouping)、 窗口(windowing)(譯者注:感受這個不翻譯更明白。。。)、緩存(buffering)。 這三種在概念上相似,由於它們都是將 Flux<T>
進行彙集。分組和分段操做都會建立一個 Flux<Flux<T>>
,而緩存操做獲得的是一個 Collection<T>
(譯者注:應該是一個 Flux<Collection<T>>
)。
Flux<GroupedFlux<T>>
進行分組分組可以根據 key 將源 Flux<T>
拆分爲多個批次。
對應的操做符是 groupBy
。
每一組用 GroupedFlux<T>
類型表示,使用它的 key()
方法能夠獲得該組的 key。
在組內,元素並不須要是連續的。當源發出一個新的元素,該元素會被分發到與之匹配的 key 所對應的組中(若是尚未該 key 對應的組,則建立一個)。
這意味着組:
StepVerifier.create( Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) .groupBy(i -> i % 2 == 0 ? "even" : "odd") .concatMap(g -> g.defaultIfEmpty(-1) //若是組爲空,顯示爲 -1 .map(String::valueOf) //轉換爲字符串 .startWith(g.key())) //以該組的 key 開頭 ) .expectNext("odd", "1", "3", "5", "11", "13") .expectNext("even", "2", "4", "6", "12") .verifyComplete();
分組操做適用於分組個數很少的場景。並且全部的組都必須被消費,這樣 groupBy 才能持續從上游獲取數據。有時候這兩種要求在一塊兒——好比元素數量超多, 可是並行的用來消費的 flatMap 又太少的時候——會致使程序卡死。 |
|
---|---|
Flux<Flux<T>>
進行 window 操做window 操做是 根據個數、時間等條件,或可以定義邊界的發佈者(boundary-defining Publisher
), 把源 Flux<T>
拆分爲 windows。
對應的操做符有 window
、windowTimeout
、windowUntil
、windowWhile
,以及 windowWhen
。
與 groupBy
的主要區別在於,窗口操做可以保持序列順序。而且同一時刻最多隻能有兩個 window 是開啓的。
它們 能夠 重疊。操做符參數有 maxSize
和 skip
,maxSize
指定收集多少個元素就關閉 window,而 skip
指定收集多數個元素後就打開下一個 window。因此若是 maxSize > skip
的話, 一個新的 window 的開啓會先於當前 window 的關閉, 從而兩者會有重疊。
重疊的 window 示例以下:
StepVerifier.create( Flux.range(1, 10) .window(5, 3) //overlapping windows .concatMap(g -> g.defaultIfEmpty(-1)) //將 windows 顯示爲 -1 ) .expectNext(1, 2, 3, 4, 5) .expectNext(4, 5, 6, 7, 8) .expectNext(7, 8, 9, 10) .expectNext(10) .verifyComplete();
若是將兩個參數的配置反過來(maxSize < skip ),序列中的一些元素就會被丟棄掉, 而不屬於任何 window。 |
|
---|---|
對基於判斷條件的 windowUntil
和 windowWhile
,若是序列中的元素不匹配判斷條件, 那麼可能致使 空 windows,以下例所示:
StepVerifier.create( Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) .windowWhile(i -> i % 2 == 0) .concatMap(g -> g.defaultIfEmpty(-1)) ) .expectNext(-1, -1, -1) //分別被奇數 1 3 5 觸發 .expectNext(2, 4, 6) // 被 11 觸發 .expectNext(12) // 被 13 觸發 .expectNext(-1) // 空的 completion window,若是 onComplete 前的元素可以匹配上的話就沒有這個了 .verifyComplete();
Flux<List<T>>
進行緩存緩存與窗口相似,不一樣在於:緩存操做以後會發出 buffers (類型爲Collection<T>
, 默認是 List<T>
),而不是 windows (類型爲 Flux<T>
)。
緩存的操做符與窗口的操做符是對應的:buffer
、bufferTimeout
、bufferUntil
、bufferWhile
, 以及bufferWhen
。
若是說對於窗口操做符來講,是開啓一個窗口,那麼對於緩存操做符來講,就是建立一個新的集合, 而後對其添加元素。而窗口操做符在關閉窗口的時候,緩存操做符則是發出一個集合。
緩存操做也會有丟棄元素或內容重疊的狀況,以下:
StepVerifier.create( Flux.range(1, 10) .buffer(5, 3) // 緩存重疊 ) .expectNext(Arrays.asList(1, 2, 3, 4, 5)) .expectNext(Arrays.asList(4, 5, 6, 7, 8)) .expectNext(Arrays.asList(7, 8, 9, 10)) .expectNext(Collections.singletonList(10)) .verifyComplete();
不像窗口方法,bufferUntil
和 bufferWhile
不會發出空的 buffer,以下例所示:
StepVerifier.create( Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13) .bufferWhile(i -> i % 2 == 0) ) .expectNext(Arrays.asList(2, 4, 6)) // 被 11 觸發 .expectNext(Collections.singletonList(12)) // 被 13 觸發 .verifyComplete();
ParallelFlux
進行並行處理現在多核架構已然普及,可以方便的進行並行處理是很重要的。Reactor 提供了一種特殊的類型 ParallelFlux
來實現並行,它可以將操做符調整爲並行處理方式。
你能夠對任何 Flux
使用 parallel()
操做符來獲得一個 ParallelFlux
. 不過這個操做符自己並不會進行並行處理,而是將負載劃分到多個「軌道(rails)」上 (默認狀況下,軌道個數與 CPU 核數相等)。
爲了配置 ParallelFlux 如何並行地執行每個軌道,你須要使用 runOn(Scheduler)
。 注意,Schedulers.parallel()
是推薦的專門用於並行處理的調度器。
下邊有兩個用於比較的例子,第一個以下:
Flux.range(1, 10) .parallel(2) .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
咱們給定一個軌道數字,而不是依賴於 CPU 核數。 | |
---|---|
下邊是第二個例子:
Flux.range(1, 10) .parallel(2) .runOn(Schedulers.parallel()) .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
第一個例子輸出以下:
main -> 1 main -> 2 main -> 3 main -> 4 main -> 5 main -> 6 main -> 7 main -> 8 main -> 9 main -> 10
第二個例子在兩個線程中並行執行,輸出以下:
parallel-1 -> 1 parallel-2 -> 2 parallel-1 -> 3 parallel-2 -> 4 parallel-1 -> 5 parallel-2 -> 6 parallel-1 -> 7 parallel-1 -> 9 parallel-2 -> 8 parallel-2 -> 10
若是在並行地處理以後,須要退回到一個「正常」的 Flux
而使後續的操做鏈按非並行模式執行, 你能夠對 ParallelFlux
使用 sequential()
方法。
注意,當你在對 ParallelFlux 使用一個 Subscriber
而不是基於 lambda 進行訂閱(subscribe()
) 的時候,sequential()
會自動地被偷偷應用。
注意 subscribe(Subscriber<T>)
會合並全部的執行軌道,而 subscribe(Consumer<T>)
會在全部軌道上運行。 若是 subscribe()
方法中是一個 lambda,那麼有幾個軌道,lambda 就會被執行幾回。
你還能夠使用 groups()
做爲 Flux<GroupedFlux<T>>
進入到各個軌道或組裏邊, 而後能夠經過 composeGroup()
添加額外的操做符。
Schedulers
就像咱們在 調度器(Schedulers) 這一節看到的那樣, Reactor Core 內置許多 Scheduler
的具體實現。 你能夠用形如 new*
的工廠方法來建立調度器,每一種調度器都有一個單例對象,你能夠使用單例工廠方法 (好比 Schedulers.elastic()
而不是 Schedulers.newElastic()
)來獲取它。
當你不明確指定調度器的時候,那些須要調度器的操做符會使用這些默認的單例調度器對象。例如, Flux#delayElements(Duration)
使用的是 Schedulers.parallel()
調度器對象。
然而有些狀況下,你可能須要「一刀切」(就不用對每個操做符都傳入你本身的調度器做爲參數了) 地調整這些默認調度器。 一個典型的例子就是,假設你須要對每個被調度的任務統計執行時長, 就想把默認的調度器包裝一下,而後添加計時功能。
那麼能夠使用 Schedulers.Factory
類來改變默認的調度器。默認狀況下,一個 Factory
會使用一些「命名比較直白」 的方法來建立全部的標準 Scheduler
。每個方法你均可以用本身的實現方式來重寫。
此外,Factory
還提供一個額外的自定義方法 decorateExecutorService
。它會在建立每個 reactor-core 調度器——內部有一個 ScheduledExecutorService
(即便是好比用 Schedulers.newParallel()
方法建立的這種非默認的調度器)——的時候被調用。
你能夠經過調整 ScheduledExecutorService
來改變調度器:(譯者加:decorateExecutorService
方法)經過一個 Supplier
參數暴露出來,你能夠直接繞過這個 supplier 返回你本身的調度器實例,或者用 (譯者加: Schedulers.ScheduledExecutorService
的)get()
獲得默認實例,而後包裝它, 這取決於配置的調度器類型。
當你搞定了一個定製好的 Factory 後,你必須使用 Schedulers.setFactory(Factory) 方法來安裝它。 |
|
---|---|
最後,對於調度器來講,有一個可自定義的 hook:onHandleError
。這個 hook 會在提交到這個調度器的 Runnable
任務拋出異常的時候被調用(注意,若是還設置了一個 UncaughtExceptionHandler
, 那麼它和 hook 都會被調用)。
Reactor 還有另一類可配置的應用於多種場合的回調,它們都在 Hooks
類中定義,整體來講有三類:
當生成源的操做符不聽從響應式流規範的時候,Dropping hooks(用於處理丟棄事件的 hooks)會被調用。 這種類型的錯誤是處於正常的執行路徑以外的(也就是說它們不能經過 onError
傳播)。
典型的例子是,假設一個發佈者即便在被調用 onCompleted
以後仍然能夠經過操做符調用 onNext
。 這種狀況下,onNext
的值會被 丟棄,若是有多餘的 onError
的信號亦是如此。
相應的 hook,onNextDropped
以及 onErrorDropped
,能夠提供一個全局的 Consumer
, 以便可以在丟棄的狀況發生時進行處理。例如,你能夠使用它來對丟棄事件記錄日誌,或進行資源清理 (使用資源的值可能壓根沒有到達響應式鏈的下游)。
連續設置兩次 hook 的話都會起做用:提供的每個 consumer 都會被調用。使用 Hooks.resetOn*Dropped()
方法能夠將 hooks 所有重置爲默認。
若是操做符在執行其 onNext
、onError
以及 onComplete
方法的時候拋出異常,那麼 onOperatorError
這一個 hook 會被調用。
與上一類 hook 不一樣,這個 hook 仍是處在正常的執行路徑中的。一個典型的例子就是包含一個 map 函數式的 map
操做符拋出的異常(好比零做爲除數),這時候仍是會執行到 onError
的。
首先,它會將異常傳遞給 onOperatorError
。利用這個 hook 你能夠檢查這個錯誤(以及有問題的相關數據), 並能夠 改變 這個異常。固然你還能夠作些別的事情,好比記錄日誌或返回原始異常。
注意,onOperatorError
hook 也能夠被屢次設置:你能夠提供一個 String
爲一個特別的 BiFunction
類型的函數式設置識別符,不一樣識別符的函數式都會被執行,固然,重複使用一個識別符的話, 則後來的設置會覆蓋前邊的設置。
所以,默認的 hook 能夠使用 Hooks.resetOnOperatorError()
方法重置,而提供識別符的 hook 能夠使用 Hooks.resetOnOperatorError(String)
方法來重置。
這些組裝(assembly) hooks 關聯了操做符的生命週期。它們會在一個操做鏈被組裝起來的時候(即實例化的時候) 被調用。每個新的操做符組裝到操做鏈上的時候,onEachOperator
都會返回一個不一樣的發佈者, 從而能夠利用它動態調整操做符。onLastOperator
與之相似,不過只會在被操做鏈上的最後一個 (subscribe
調用以前的)操做符調用。
相似於 onOperatorError
,也能夠疊加,而且經過識別符來標識。也是用相似的方式重置所有或部分 hooks。
Hooks
工具類還提供了一些預置的 hooks。利用他們能夠改變一些默認的處理方式,而不用本身 編寫 hook:
onNextDroppedFail()
:onNextDropped
一般會拋出 Exceptions.failWithCancel()
異常。 如今它默認還會以 DEBUG 級別對被丟棄的值記錄日誌。若是想回到原來的只是拋出異常的方式,使用 onNextDroppedFail()
。onOperatorDebug()
: 這個方法會激活 debug mode。它與 onOperatorError
hook 關聯,因此調用 resetOnOperatorError()
同時也會重置它。不過它內部也用到了特別的識別符, 你能夠經過 resetOnOperatorDebug()
方法來重置它。當從命令式編程風格切換到響應式編程風格的時候,一個技術上最大的挑戰就是線程處理。
與習慣作法不一樣的是,在響應式編程中,一個線程(Thread
)能夠被用於處理多個同時運行的異步序列 (其實是非阻塞的)。執行過程也會常常從一個線程切換到另外一個線程。
這樣的狀況下,對於開發者來講,若是依賴線程模型中相對「穩定」的特性——好比 ThreadLocal
——就會變得很難。由於它會讓你將數據綁定到一個 線程 上,因此在響應式環境中使用就變得 比較困難。所以,將使用了 ThreadLocal
的庫應用於 Reactor 的時候就會帶來新的挑戰。一般會更糟, 它用起來效果會更差,甚至會失敗。 好比,使用 Logback 的 MDC 來存儲日誌關聯的 ID,就是一個很是符合 這種狀況的例子。
一般的對 ThreadLocal
的替代方案是將環境相關的數據 C
,同業務數據 T
一塊兒置於序列中, 好比使用 Tuple2<T, C>
。這種方案看起來並很差,何況會在方法和 Flux
泛型中暴露環境數據信息。
自從版本 3.1.0
,Reactor 引入了一個相似於 ThreadLocal
的高級功能:Context
。它做用於一個 Flux
或一個 Mono
上,而不是應用於一個線程(Thread
)。
爲了說明,這裏有個讀寫 Context
的簡單例子:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World") .verifyComplete();
接下來的幾個小節,咱們來了解 Context
是什麼以及如何用,從而最終能夠理解上邊的例子。
這是一個主要面向庫開發人員的高級功能。這須要開發者對 Subscription 的生命週期 充分理解,而且明白它主要用於 subscription 相關的庫。 |
|
---|---|
Context
APIContext
是一個相似於 Map
(這種數據結構)的接口:它存儲鍵值(key-value)對,你須要經過 key 來獲取值:
Object
類型,因此 Context
能夠包含任意數量的任意對象。Context
是 不可變的(immutable)。put(Object key, Object value)
方法來存儲一個鍵值對,返回一個新的 Context
對象。 你也能夠用 putAll(Context)
方法將兩個 context 合併爲一個新的 context。hasKey(Object key)
方法檢查一個 key 是否已經存在。getOrDefault(Object key, T defaultValue)
方法取回 key 對應的值(類型轉換爲 T
), 或在找不到這個 key 的狀況下返回一個默認值。getOrEmpty(Object key)
來獲得一個 Optional<T>
(context 會嘗試將值轉換爲 T
)。delete(Object key)
來刪除 key 關聯的值,並返回一個新的 Context
。建立一個 Context 時,你能夠用靜態方法 Context.of 預先存儲最多 5 個鍵值對。 它接受 2, 4, 6, 8 或 10 個 Object 對象,兩兩一對做爲鍵值對添加到 Context 。 你也能夠用 Context.empty() 方法來建立一個空的 Context 。 |
|
---|---|
Context
綁定到 Flux
and Writing爲了使用 context,它必需要綁定到一個指定的序列,而且鏈上的每一個操做符均可以訪問它。 注意,這裏的操做符必須是 Reactor 內置的操做符,由於 Context
是 Reactor 特有的。
實際上,一個 Context
是綁定到每個鏈中的 Subscriber
上的。 它使用 Subscription
的傳播機制來讓本身對每個操做符均可見(從最後一個 subscribe
沿鏈向上)。
爲了填充 Context
——只能在訂閱時(subscription time)——你須要使用 subscriberContext
操做符。
subscriberContext(Context)
方法會將你提供的 Context
與來自下游(記住,Context
是從下游 向上遊傳播的)的 Context
合併。 這經過調用 putAll
實現,最後會生成一個新的 Context
給上游。
你也能夠用更高級的 subscriberContext(Function<Context, Context>) 。它接受來自下游的 Context ,而後你能夠根據須要添加或刪除值,而後返回新的 Context 。你甚至能夠返回一個徹底不一樣 的對象,雖然不太建議這樣(這樣可能影響到依賴這個 Context 的庫)。 |
|
---|---|
填充 Context
是一方面,讀取數據一樣重要。多數時候,添加內容到 Context
是最終用戶的責任, 可是利用這些信息是庫的責任,由於庫一般是客戶代碼的上游。
讀取 context 數據使用靜態方法 Mono.subscriberContext()
。
本例的初衷是爲了讓你對如何使用 Context
有個更好的理解。
讓咱們先回頭看一下最初的例子:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World") .verifyComplete();
操做鏈以調用 subscriberContext(Function) 結尾,將 "World" 做爲 "message" 這個 key 的 值添加到 Context 中。 |
|
---|---|
對源調用 flatMap 用 Mono.subscriberContext() 方法拿到 Context 。 |
|
而後使用 map 讀取關聯到 "message" 的值,而後與原來的值鏈接。 |
|
最後 Mono<String> 確實發出了 "Hello World" 。 |
上邊的數字順序並非按照代碼行順序排的,這並不是錯誤:它表明了執行順序。雖然 subscriberContext 是鏈上的最後一個環節,但確實最早執行的(緣由在於 subscription 信號 是從下游向上的)。 |
|
---|---|
注意在你的操做鏈中,寫入 與 讀取 Context
的 相對位置 很重要:由於 Context
是不可變的,它的內容只能被上游的操做符看到,以下例所示:
String key = "message"; Mono<String> r = Mono.just("Hello") .subscriberContext(ctx -> ctx.put(key, "World")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.getOrDefault(key, "Stranger"))); StepVerifier.create(r) .expectNext("Hello Stranger") .verifyComplete();
寫入 Context 的位置太靠上了… |
|
---|---|
因此在 flatMap 就沒有 key 關聯的值,使用了默認值。 |
|
結果 Mono<String> 發出了 "Hello Stranger" 。 |
下面的例子一樣說明了 Context
的不可變性(Mono.subscriberContext()
老是返回由 subscriberContext
配置的 Context
):
String key = "message"; Mono<String> r = Mono.subscriberContext() .map( ctx -> ctx.put(key, "Hello")) .flatMap( ctx -> Mono.subscriberContext()) .map( ctx -> ctx.getOrDefault(key,"Default")); StepVerifier.create(r) .expectNext("Default") .verifyComplete();
拿到 Context 。 |
|
---|---|
在 map 方法中咱們嘗試修改它。 |
|
在 flatMap 中再次獲取 Context 。 |
|
讀取 Context 中可能的值。 |
|
值歷來沒有被設置爲 "Hello" 。 |
相似的,若是屢次對 Context
中的同一個 key 賦值的話,要看 寫入的相對順序 : 讀取 Context
的操做符只能拿到下游最近的一次寫入的值,以下例所示:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor") .verifyComplete();
寫入 "message" 的值。 |
|
---|---|
另外一次寫入 "message" 的值。 |
|
map 方法值能拿到下游最近的一次寫入的值: "Reactor" 。 |
這裏,首先 Context
中的 key 被賦值 "World"
。而後訂閱信號(subscription signal)向上遊 移動,又發生了另外一次寫入。此次生成了第二個不變的 Context
,裏邊的值是 "Reactor"
。以後, 數據開始流動, flatMap
拿到最近的 Context
,也就是第二個值爲 Reactor
的 Context
。
你可能會以爲 Context
是與數據信號一塊傳播的。若是是那樣的話,在兩次寫入操做中間加入的一個 flatMap
會使用最上游的這個 Context
。但並非這樣的,以下:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "Reactor")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello Reactor World") .verifyComplete();
這裏是第一次賦值。 | |
---|---|
這裏是第二次賦值。 | |
第一個 flatMap 看到的是第二次的賦值。 |
|
第二個 flatMap 將上一個的結果與 第一次賦值 的 context 值鏈接。 |
|
Mono 發出的是 "Hello Reactor World" 。 |
緣由在於 Context
是與 Subscriber
關聯的,而每個操做符訪問的 Context
來自其下游的 Subscriber
。
最後一個有意思的傳播方式是,對 Context
的賦值也能夠在一個 flatMap
內部,以下:
String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) ) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) .subscriberContext(ctx -> ctx.put(key, "Reactor")) ) .subscriberContext(ctx -> ctx.put(key, "World")); StepVerifier.create(r) .expectNext("Hello World Reactor") .verifyComplete();
這個 subscriberContext 不會影響所在 flatMap 以外的任何東西。 |
|
---|---|
這個 subscriberContext 會影響主序列的 Context 。 |
上邊的例子中,最後發出的值是 "Hello World Reactor"
而不是 "Hello Reactor World",由於賦值 "Reactor" 的 subscriberContext
是做用於第二個 flatMap
的內部序列的。因此不會在主序列可見/ 傳播,第一個 flatMap
也看不到它。傳播(Propagation) + 不可變性(immutability)將相似 flatMap
這樣的操做符中的建立的內部序列中的 Context
與外部隔離開來。
讓咱們來看一個實際的從 Context
中讀取值的例子:一個響應式的 HTTP 客戶端將一個 Mono<String>
(用於 PUT
請求)做爲數據源,同時經過一個特定的 key 使用 Context 將關聯的ID信息放入請求頭中。
從用戶角度,是這樣調用的:
doPut("www.example.com", Mono.just("Walter"))
爲了傳播一個關聯ID,應該這樣調用:
doPut("www.example.com", Mono.just("Walter")) .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
由上可見,用戶代碼使用了 subscriberContext
來爲 Context
的 HTTP_CORRELATION_ID
賦值。上游的操做符是一個由 HTTP 客戶端庫返回的 Mono<Tuple2<Integer, String>>
(一個簡化的 HTTP 響應)。因此可以正確將信息從用戶代碼傳遞給庫代碼。
下邊的例子演示了從庫的角度由 context 讀取值的模擬代碼,若是可以找到關聯ID,則「增長請求」:
static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId"; Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) { Mono<Tuple2<String, Optional<Object>>> dataAndContext = data.zipWith(Mono.subscriberContext() .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))); return dataAndContext .<String>handle((dac, sink) -> { if (dac.getT2().isPresent()) { sink.next("PUT <" + dac.getT1() + "> sent to " + url + " with header X-Correlation-ID = " + dac.getT2().get()); } else { sink.next("PUT <" + dac.getT1() + "> sent to " + url); } sink.complete(); }) .map(msg -> Tuples.of(200, msg)); }
用 Mono.subscriberContext() 拿到 Context 。 |
|
---|---|
提取出關聯ID的值——是一個 Optional 。 |
|
若是值存在,那麼就將其加入請求頭。 |
在這段庫代碼片斷中,你能夠看到它是如何將 Mono
和 Mono.subscriberContext()
zip 起來的。 返回的是一個 Tuple2<String, Context>
,這個 Context
包含來自下游的 HTTP_CORRELATION_ID
的值。
庫代碼接着用 map
讀取出那個 key 的值 Optional<String>
,若是值存在,將其做爲 X-Correlation-ID
請求頭。 最後一塊而用 handle
來處理。
用來驗證上邊的庫代碼的測試程序以下:
@Test public void contextForLibraryReactivePut() { Mono<String> put = doPut("www.example.com", Mono.just("Walter")) .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf")) .filter(t -> t.getT1() < 300) .map(Tuple2::getT2); StepVerifier.create(put) .expectNext("PUT <Walter> sent to www.example.com with header X-Correlation-ID = 2-j3r9afaf92j-afkaf") .verifyComplete(); }
雖然 Java 的類型系統沒有表達空值安全(null-safety)的機制,可是 Reactor 如今提供了基於註解的用於聲明 「可能爲空(nullability)」的 API,相似於 Spring Framework 5 中提供的 API。
Reactor 自身就用到了這些註解,你也能夠將其用於任何基於 Reactor 的本身的空值安全的 Java API 中。 不過,在 方法體內部 對「可能爲空」的類型的使用就不在這一特性的範圍內了。
這些註解是基於 JSR 305 的註解(是受相似 IntelliJ IDEA 這樣的工具支持的 JSR)做爲元註解(meta-annotated)的。當 Java 開發者在編寫空值安全的代碼時, 它們可以提供有用的警告信息,以便避免在運行時(runtime)出現 NullPointerException
異常。 JSR 305 元註解使得工具提供商能夠以一種通用的方式提供對空值安全的支持,從而 Reactor 的註解就不用重複造輪子了。
對於 Kotlin 1.1.5+,須要(同時也推薦)在項目 classpath 中添加對 JSR 305 的依賴。 | |
---|---|
它們也可在 Kotlin 中使用,Kotlin 原生支持 空值安全。具體請參考 this dedicated section 。
reactor.util.annotation
包提供如下註解:
@NonNull
代表一個具體的參數、返回值或域值不能爲 null
。 (若是參數或返回值應用了 @NonNullApi
則無需再加它)。@Nullable
代表一個參數、返回值或域值能夠爲 null
。@NonNullApi
是一個包級別的註解,代表默認狀況下參數或返回值不能爲 null
。(Reactor 的空值安全的註解)對於通用類型參數(generic type arguments)、可變參數(varargs),以及數組元素(array elements) 尚不支持。參考 issue #878 查看最新信息。 | |
---|---|
TIP:在這一節,若是一個操做符是專屬於 Flux
或 Mono
的,那麼會給它註明前綴。 公共的操做符沒有前綴。若是一個具體的用例涉及多個操做符的組合,這裏以方法調用的方式展示, 會以一個點(.)開頭,並將參數置於圓括號內,好比: .methodCall(parameter)
。
我想搞定:
T
,我已經有了:just
Optional<T>
:Mono#justOrEmpty(Optional<T>)
null
的 T:Mono#justOrEmpty(T)
T
,且仍是由 just
方法返回
Mono#fromSupplier
或用 defer
包裝 just
T
,這些元素我能夠明確列舉出來:Flux#just(T...)
Flux#fromArray
Flux#fromIterable
Flux#range
Stream
提供給每個訂閱:Flux#fromStream(Supplier<Stream>)
Supplier<T>
:Mono#fromSupplier
Mono#fromCallable
,Mono#fromRunnable
CompletableFuture<T>
:Mono#fromFuture
empty
error
Throwable
:error(Supplier<Throwable>)
never
defer
using
Flux#generate
Flux#create
(Mono#create
也是異步的,只不過只能發一個)map
cast
Flux#index
flatMap
+ 使用一個工廠方法handle
flatMap
+ 一個異步的返回類型爲 Publisher
的方法
Mono.empty()
Flux#flatMapSequential
(對每一個元素的異步任務會當即執行,但會將結果按照原序列順序排序)Mono#flatMapMany
Flux#startWith(T...)
Flux#concatWith(T...)
Flux
轉化爲集合(一下都是針對 Flux
的)
collectList
,collectSortedList
collectMap
,collectMultiMap
collect
count
reduce
scan
all
any
hasElements
hasElement
Flux#concat
或 .concatWith(other)
Flux#concatDelayError
Flux#mergeSequential
Flux#merge
/ .mergeWith(other)
Flux#zip
/ Flux#zipWith
Tuple2
:Mono#zipWith
Mono#zip
Mono<Void>
:Mono#and
Mono<Void>
:Mono#when
Flux#zip
Flux#combineLatest
Flux#first
,Mono#first
,mono.or (otherMono).or(thirdMono)
,`flux.or(otherFlux).or(thirdFlux)flatMap
,不過「喜新厭舊」):switchMap
switchOnNext
repeat
Flux.interval(duration).flatMap(tick -> myExistingPublisher)
defaultIfEmpty
switchIfEmpty
ignoreElements
Mono
來表示序列已經結束:then
thenEmpty
Mono
:Mono#then(mono)
Mono#thenReturn(T)
Flux
:thenMany
Mono#delayUntilOther
Mono#delayUntil(Function)
expand(Function)
expandDeep(Function)
doOnNext
Flux#doOnComplete
,Mono#doOnSuccess
doOnError
doOnCancel
doOnSubscribe
doOnRequest
doOnTerminate
(Mono的方法可能包含有結果)
doAfterTerminate
Signal
):Flux#doOnEach
doFinally
log
single
對象:
doOnEach
single
對象:materialize
dematerialize
log
filter
filterWhen
ofType
ignoreElements
Flux#distinct
Flux#distinctUntilChanged
Flux#take(long)
Flux#take(Duration)
Mono
中返回:Flux#next()
request(N)
而不是取消:Flux#limitRequest(long)
Flux#takeLast
Flux#takeUntil
(基於判斷條件),Flux#takeUntilOther
(基於對 publisher 的比較)Flux#takeWhile
Flux#elementAt
.takeLast(1)
Flux#last()
Flux#last(T)
Flux#skip(long)
Flux#skip(Duration)
Flux#skipLast
Flux#skipUntil
(基於判斷條件),Flux#skipUntilOther
(基於對 publisher 的比較)Flux#skipWhile
Flux#sample(Duration)
sampleFirst
Flux#sample(Publisher)
Flux#sampleTimeout
(每個元素會觸發一個 publisher,若是這個 publisher 不被下一個元素觸發的 publisher 覆蓋就發出這個元素)Flux#single()
Flux#single(T)
Flux#singleOrEmpty
error
…
Flux
:.concat(Flux.error(e))
Mono
:.then(Mono.error(e))
timeout
error(Supplier<Throwable>)
error
onErrorReturn
Flux
或 Mono
:onErrorResume
.onErrorMap(t -> new RuntimeException(t))
doFinally
using
工廠方法onErrorReturn
Publisher
:Flux#onErrorResume
和 Mono#onErrorResume
retry
retryWhen
IllegalStateException
:Flux#onBackpressureError
Flux#onBackpressureDrop
Flux#onBackpressureLatest
Flux#onBackpressureBuffer
Flux#onBackpressureBuffer
帶有策略 BufferOverflowStrategy
Tuple2<Long, T>
…
elapsed
timestamp
timeout
Flux#interval
0
:static Mono.delay
.Mono#delayElement
,Flux#delayElements
delaySubscription
Flux
Flux<T>
拆分爲一個 Flux<Flux<T>>
:
window(int)
window(int, int)
window(Duration)
window(Duration, Duration)
windowTimeout(int, Duration)
windowUntil
cutBefore
變量):.windowUntil(predicate, true)
windowWhile
(不知足條件的元素會被丟棄)window(Publisher)
,windowWhen
Flux<T>
的元素拆分到集合…
List
:
buffer(int)
buffer(int, int)
buffer(Duration)
buffer(Duration, Duration)
bufferTimeout(int, Duration)
bufferUntil(Predicate)
.bufferUntil(predicate, true)
bufferWhile(Predicate)
buffer(Publisher)
,bufferWhen
buffer(int, Supplier<C>)
Flux<T>
中具備共同特徵的元素分組到子 Flux:groupBy(Function<T,K>)
TIP:注意返回值是 Flux<GroupedFlux<K, T>>
,每個 GroupedFlux
具備相同的 key 值 K
,能夠經過 key()
方法獲取。Flux<T>
,我想:
Flux#blockFirst
Flux#blockFirst(Duration)
Flux#blockLast
Flux#blockLast(Duration)
Iterable<T>
:Flux#toIterable
Stream<T>
:Flux#toStream
Mono<T>
,我想:
Mono#block
Mono#block(Duration)
CompletableFuture<T>
:Mono#toFuture
翻譯建議 - "我須要哪一個操做符?"
不少時候,信息源是同步和阻塞的。在 Reactor 中,咱們用如下方式處理這種信息源:
Mono blockingWrapper = Mono.fromCallable(() -> { return /* make a remote synchronous call */ }); blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic());
使用 fromCallable 方法生成一個 Mono; |
|
---|---|
返回同步、阻塞的資源; | |
使用 Schedulers.elastic() 確保對每個訂閱來講運行在一個專門的線程上。 |
由於調用返回一個值,因此你應該使用 Mono。你應該使用 Schedulers.elastic
由於它會建立一個專門的線程來等待阻塞的調用返回。
注意 subscribeOn
方法並不會「訂閱」這個 Mono
。它只是指定了訂閱操做使用哪一個 Scheduler
。
Flux
上的操做符好像沒起做用,爲啥?請確認你確實對調用 .subscribe()
的發佈者應用了這個操做符。
Reactor 的操做符是裝飾器(decorators)。它們會返回一個不一樣的(發佈者)實例, 這個實例對上游序列進行了包裝並增長了一些的處理行爲。因此,最推薦的方式是將操做符「串」起來。
對比下邊的兩個例子:
沒有串起來(不正確的)
Flux<String> flux = Flux.just("foo", "chain"); flux.map(secret -> secret.replaceAll(".", "*")); flux.subscribe(next -> System.out.println("Received: " + next));
問題在這, flux 變量並無改變。 |
|
---|---|
串起來(正確的)
Flux<String> flux = Flux.just("foo", "chain"); flux = flux.map(secret -> secret.replaceAll(".", "*")); flux.subscribe(next -> System.out.println("Received: " + next));
下邊的例子更好(由於更簡潔):
串起來(最好的)
Flux<String> secrets = Flux .just("foo", "chain") .map(secret -> secret.replaceAll(".", "*")) .subscribe(next -> System.out.println("Received: " + next));
第一個例子的輸出:
Received: foo Received: chain
後兩個例子的輸出:
Received: *** Received: *****
Mono
zipWith
/zipWhen
沒有被調用例子
myMethod.process("a") // 這個方法返回 Mono<Void> .zipWith(myMethod.process("b"), combinator) //沒有被調用 .subscribe();
若是源 Mono
爲空或是一個 Mono<Void>
(Mono<Void>
一般用於「空」的場景), 下邊的組合操做就不會被調用。
對於相似 zipWith
的用於轉換的操做符來講,這是比較典型的場景。 這些操做符依賴於數據元素來轉換爲輸出的元素。 若是任何一個序列是空的,則返回的就是一個空序列,因此請謹慎使用。 例如在 then()
以後使用 zipWith()
就會致使這一問題。
對於以 Function
做爲參數的 and
更是如此,由於返回的 Mono 是依賴於收到的數據懶加載的(而對於空序列或 Void
的序列來講是沒有數據發出來的)。
你能夠使用 .defaultIfEmpty(T)
將空序列替換爲包含 T
類型缺省值的序列(而不是 Void
序列), 從而能夠避免相似的狀況出現。舉例以下:
在 zipWhen
前使用 defaultIfEmpty
myMethod.emptySequenceForKey("a") // 這個方法返回一個空的 Mono<String> .defaultIfEmpty("") // 將空序列轉換爲包含字符串 "" 的序列 .zipWhen(aString -> myMethod.process("b")) // 當 "" 發出時被調用 .subscribe();
retryWhen
來實現 retry(3)
的效果?retryWhen
方法比較複雜,但願下邊的一段模擬 retry(3)
的代碼可以幫你更好地理解它的工做方式:
Flux<String> flux = Flux.<String>error(new IllegalArgumentException()) .retryWhen(companion -> companion .zipWith(Flux.range(1, 4), (error, index) -> { if (index < 4) return index; else throw Exceptions.propagate(error); }) );
技巧一:使用 zip 和一個「重試個數 + 1」的 range 。 |
|
---|---|
zip 方法讓你能夠在對重試次數計數的同時,仍掌握着原始的錯誤(error)。 |
|
容許三次重試,小於 4 的時候發出一個值。 | |
爲了使序列以錯誤結束。咱們將原始異常在三次重試以後拋出。 |
retryWhen
進行 exponential backoff?Exponential backoff 的意思是進行的屢次重試之間的間隔愈來愈長, 從而避免對源系統形成過載,甚至宕機。基本原理是,若是源產生了一個錯誤, 那麼已是處於不穩定狀態,可能不會馬上覆原。因此,若是馬上就重試可能會產生另外一個錯誤, 致使源更加不穩定。
下面是一段實現 exponential backoff 效果的例子,每次重試的間隔都會遞增 (僞代碼: delay = attempt number * 100 milliseconds):
Flux<String> flux = Flux.<String>error(new IllegalArgumentException()) .retryWhen(companion -> companion .doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) .zipWith(Flux.range(1, 4), (error, index) -> { if (index < 4) return index; else throw Exceptions.propagate(error); }) .flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) .doOnNext(s -> System.out.println("retried at " + LocalTime.now())) );
記錄錯誤出現的時間; | |
---|---|
使用 retryWhen + zipWith 的技巧實現重試3次的效果; |
|
經過 flatMap 來實現延遲時間遞增的效果; |
|
一樣記錄重試的時間。 |
訂閱它,輸出以下:
java.lang.IllegalArgumentException at 18:02:29.338 retried at 18:02:29.459 java.lang.IllegalArgumentException at 18:02:29.460 retried at 18:02:29.663 java.lang.IllegalArgumentException at 18:02:29.663 retried at 18:02:29.964 java.lang.IllegalArgumentException at 18:02:29.964
第一次重試延遲大約 100ms | |
---|---|
第二次重試延遲大約 200ms | |
第三次重試延遲大約 300ms |
publishOn()
?如 Schedulers 所述,publishOn()
能夠用來切換執行線程。 publishOn
可以影響到其以後的操做符的執行線程,直到有新的 publishOn
出現。 因此 publishOn
的位置很重要。
好比下邊的例子, map()
中的 transform
方法是在 scheduler1
的一個工做線程上執行的, 而 doOnNext()
中的 processNext
方法是在 scheduler2
的一個工做線程上執行的。 單線程的調度器可能用於對不一樣階段的任務或不一樣的訂閱者確保線程關聯性。
EmitterProcessor<Integer> processor = EmitterProcessor.create(); processor.publishOn(scheduler1) .map(i -> transform(i)) .publishOn(scheduler2) .doOnNext(i -> processNext(i)) .subscribe();
reactor-extra
爲知足 reactor-core
用戶的更高級需求,提供了一些額外的操做符和工具。
因爲這是一個單獨的包,使用時須要明確它的依賴:
dependencies { compile 'io.projectreactor:reactor-core' compile 'io.projectreactor.addons:reactor-extra' }
添加 reactor-extra 的依賴。參考 獲取 Reactor 瞭解爲何使用BOM的狀況下不須要指定 version。 | |
---|---|
TupleUtils
以及函數式接口在 Java 8 提供的函數式接口基礎上,reactor.function
包又提供了一些支持 3 到 8 個值的 Function
、Predicate
和 Consumer
。
TupleUtils
提供的靜態方法能夠方便地用於將相應的 Tuple
函數式接口的 lambda 轉換爲更簡單的接口。
這使得咱們在使用 Tuple
中各成員的時候更加容易,好比:
.map(tuple -> { String firstName = tuple.getT1(); String lastName = tuple.getT2(); String address = tuple.getT3(); return new Customer(firstName, lastName, address); });
能夠用下面的方式代替:
.map(TupleUtils.function(Customer::new));
(由於 Customer 的構造方法符合 Consumer3 的函數式接口標籤) |
|
---|---|
MathFlux
的數學操做符Treactor.math
包的 MathFlux
提供了一些用於數學計算的操做符,如 max
、min
、sumInt
、averageDouble
…
reactor.retry
包中有一些可以幫助實現 Flux#repeatWhen
和 Flux#retryWhen
的工具。入口點(entry points)就是 Repeat
和 Retry
接口的工廠方法。
兩個接口均可用做可變的構建器(mutative builder),而且相應的實現(implementing) 均可做爲 Function
用於對應的操做符。
Reactor-extra 提供了若干專用的調度器: - ForkJoinPoolScheduler
,位於 reactor.scheduler.forkjoin
包; - SwingScheduler
,位於 reactor.swing
包; - SwtScheduler
,位於 reactor.swing
包。