數據在響應式流中的處理,就像流過一條裝配流水線。Reactor 既是傳送帶,又是一個個的裝配工或機器人。原材料從源頭(最初的 Publisher )流出,通過一個個的裝配線中裝配工或機器人的工位加工(operator 操做),最終被加工成成品,等待被推送到消費者( subscribe 操做)。html
在 Reactor 中,每一個操做符對 Publisher 進行處理,而後將 Publisher 包裝爲另外一個新的 Publisher 。就像一個鏈條,數據源自第一個 Publisher ,而後順鏈條而下,在每一個環節進行相應的處理。最終,訂閱者(Subscriber )終結這個過程。因此, 響應式編程按照鏈式方式進行開發。java
注意,如同 Java Stream 的終端操做,訂閱者( Subscriber )在沒有訂閱( subscribe )到一個發佈者( Publisher )以前,什麼也不會發生。react
如同 Java Stream 的中間操做同樣,Reactor 的 Flux 和 Mono 也爲咱們提供了多種操做符(遠多於 Stream ),咱們將它們分類以下:git
序號 | 類型 | 操做符 |
---|---|---|
1 | 轉換 | as, cast, collect, collectList, collectMap, collectMultimap, collectSortedList, concatMap, concatMapDelayError, concatMapIterable, elapsed, expand, expandDeep, flatMap, flatMapDelayError, flatMapIterable, flatMapSequential, flatMapSequentialDelayError, groupJoin, handle, index, join, map, switchMap, switchOnFirst, then, thenEmpty, thenMany, timestamp, transform, transformDeferred |
2 | 篩選 | blockFirst, blockLast, distinct, distinctUntilChanged, elementAt, filter, filterWhen, ignoreElements, last, next, ofType, or, repeat, retry, single, singleOrEmpty, sort, take, takeLast, takeUntil, takeUntilOther, takeWhile |
3 | 組合 | concatWith, concatWithValues, mergeOrderWith, mergeWith, startWith, withLatestFrom, zipWith, zipWithIterable |
4 | 條件 | defaultIfEmpty, delayUntil, retryWhen, switchIfEmpty |
5 | 時間 | delayElements, delaySequence, delaySubscription, sample, sampleFirst, sampleTimeout, skip, skipLast, skipUntil, skipUntilOther, skipWhile, timeout |
6 | 統計 | count, reduce, reduceWith, scan, scanWith |
7 | 匹配 | all, any, hasElement, hasElements |
8 | 分組 | buffer, bufferTimeout, bufferUntil, bufferUntilChanged, bufferWhen, groupBy, window, windowTimeout, windowUntil, windowUntilChanged, windowWhen, windowWhile |
9 | 事件 | doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, onBackpressureBuffer, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorMap, onErrorResume, onErrorReturn, onErrorStop |
10 | 調試 | checkpoint, hide, log |
11 | 其它 | cache, dematerialize, limitRate, limitRequest, materialize, metrics, name, onTerminateDetach, parallel, publish, publishNext, publishOn, replay, share, subscribeOn, subscriberContext, subscribeWith, tag |
接下來咱們來挨個學習各種的操做符,如同前面學習響應式流建立同樣,講解操做符時,若是是 Flux 或 Mono 獨有的,會在方法名前增長類名前綴。github
轉換類的操做符數量最多,日常過程當中也是使用最頻繁的。web
將響應式流轉換爲目標類型,既能夠是非響應式對象,也能夠是 Flux 或 Mono。spring
Flux.range(3, 8) .as(Mono::from) .subscribe(System.out::println);
將響應式流內的元素強轉爲目標類型,若是類型不匹配(非父類類型或當前類型),將拋出 ClassCastException ,見圖知意:編程
Flux.range(1, 3) .cast(Number.class) .subscribe(System.out::println);
經過應用收集器,將 Flux 發出的全部元素收集到一個容器中。當此流完成時,發出收集的結果。 Flux 提供了 2 個重載方法,主要區別在於應用的收集器不一樣,一個是 Java Stream 的 Collector, 另外一個是自定義收集方法(同 Java Stream 中 collect 方法):ide
<R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector); <E> Mono<E> collect(Supplier<E> containerSupplier, BiConsumer<E,? super T> collector);
見圖知意:函數
Flux.range(1, 5) .collect(Collectors.toList()) .subscribe(System.out::println);
當此 Flux 完成時,將此流發出的全部元素收集到一個列表中,該列表由生成的 Mono 發出。見圖知意:
Flux.range(1, 5) .collectList() .subscribe(System.out::println);
將 Flux 發出的全部元素按照鍵生成器和值生成器收集到 Map 中,以後由 Mono 發出。Flux 提供了 3 個重載方法:
<K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor); <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor); <K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,V>> mapSupplier);
它們的主要區別在因而否提供值生成器和初始的Map,意同 Java Stream 中的 Collectors#toMap 。見圖知意:
Flux.just(1, 2, 3, 4, 5, 3, 1) .collectMap(n -> n, n -> n + 100) .subscribe(System.out::println);
collectMultimap 與 collectMap 的區別在於,map 中的 value 類型不一樣,一個是集合,一個是元素。 collectMultimap 對於流中出現重複的 key 的 value,加入到了集合中,而 collectMap 作了替換。在這點上,reactor 不如 Java Stream 中的 Collectors#toMap 方法,沒有提供 key 重複時的合併函數。也提供了 3 個重載方法。
<K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor); <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor); <K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor, Function<? super T,? extends V> valueExtractor, Supplier<Map<K,Collection<V>>> mapSupplier)
見圖知意:
Flux.just(1, 2, 3, 4, 5, 3, 1) .collectMultimap(n -> n, n -> n + 100) .subscribe(System.out::println);
將 Flux 發出的元素在完成時進行排序,以後由 Mono 發出。Flux 提供了 2 個重載方法:
Mono<List<T>> collectSortedList(); Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator);
見圖知意:
Flux.just(1, 3, 5, 3, 2, 5, 1, 4) .collectSortedList() .subscribe(System.out::println);
本篇咱們介紹了 Reactor 操做符的分類,以後介紹了部分轉換類操做符,講解示例時都是單個操做符,相信你們都能理解。
今天的內容就學到這裏,咱們下篇繼續學習 Reactor 的操做符。
源碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorTransformOperatorTest 測試類。