上篇文章咱們將 Flux 和 Mono 的操做符分了 11 類,咱們來繼續學習轉換類操做符的第 2 篇。java
轉換類的操做符數量最多,日常過程當中也是使用最頻繁的。react
將響應式流中元素順序轉換爲目標類型的響應式流,以後再將這些流鏈接起來。該方法提供了 2 個重載方法,傳遞的第 2 個參數爲內部生成響應式流的預取數量。見圖知意:git
Flux.range(3, 8) .concatMap(n -> Flux.just(n - 10, n, n + 10), 3) .subscribe(System.out::println);
concatMapDelayError 和 concatMap 區別在於,當內部生成響應式流發出 error 時,是否延遲響應 error 。該方法提供了 3 個重載方法,支持傳遞參數:是否延遲發出錯誤和預取數量。github
Flux.range(3, 8) .concatMapDelayError(n -> { if (n == 4) { return Flux.error(new NullPointerException()); } return Flux.just(n - 10, n, n + 10); }) .subscribe(System.out::println, System.err::println);
concatIterable 和 concatMap 的區別在於 內部返回的類型不一樣,一個爲 Iterable, 一個爲 響應式流。見圖知意:web
Flux.range(3, 8) .publishOn(Schedulers.single()) .concatMapIterable(n -> { if (n == 4) { throw new NullPointerException(); } return Arrays.asList(n - 10, n, n + 10); }) .onErrorContinue((e, n) -> System.err.println("數據:" + n + ",發生錯誤:" + e)) .subscribe(System.out::println);
收集響應式流中元素的間隔發出時間,轉換爲 時間間隔 和 舊元素 組成的 Tuple2 的響應式流。見圖知意:spring
Flux.interval(Duration.ofMillis(300)) .take(20) .elapsed(Schedulers.parallel()) .subscribe(System.out::println); Thread.sleep(7000);
從上層節點逐層展開方式遞歸展開樹形節點。學習
Flux.just(16, 18, 20) .expand(n -> { if (n % 2 == 0) { return Flux.just(n / 2); } else { return Flux.empty(); } }) .subscribe(System.out::println);
從上層節點逐個展開方式遞歸展開樹形節點。expand 和 expandDeep 的區別在於展開方式不一樣,另外它倆都提供了 capacityHint 指定遞歸時初始化容器的容量。測試
Flux.just(16, 18, 20) .expandDeep(n -> { if (n % 2 == 0) { return Flux.just(n / 2); } else { return Flux.empty(); } }) .subscribe(System.out::println);
本篇咱們介紹了 Reactor 部分的轉換類操做符,講解示例時都是單個操做符,相信你們都能理解。code
因爲最近學習時間不肯定,內容比較少。不管工做仍是生活的困難,咱們只要堅持,終將會被克服解決。今天的內容就學到這裏,咱們下篇繼續學習 Reactor 的操做符。orm
源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning 模塊下 ReactorTransformOperator02Test 測試類。