本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor Operatorshtml
本節的內容來自我翻譯的Reactor 3 參考文檔——如何選擇操做符。因爲部分朋友打開github.io網速比較慢或上不去,貼出來方便你們查閱。react
若是一個操做符是專屬於
Flux
或Mono
的,那麼會給它註明前綴。
公共的操做符沒有前綴。若是一個具體的用例涉及多個操做符的組合,這裏以方法調用的方式展示,
會以一個點(.)開頭,並將參數置於圓括號內,好比:.methodCall(parameter)
。git
1)建立一個新序列,它...github
T
,我已經有了:just
Optional<T>
:Mono#justOrEmpty(Optional<T>)
null
的 T:Mono#justOrEmpty(T)
T
,且仍是由 just
方法返回
Mono#fromSupplier
或用 defer
包裝 just
T
,這些元素我能夠明確列舉出來:Flux#just(T...)
Flux#fromArray
Flux#fromIterable
Flux#range
Stream
提供給每個訂閱:Flux#fromStream(Supplier<Stream>)
Supplier<T>
:Mono#fromSupplier
Mono#fromCallable
,Mono#fromRunnable
CompletableFuture<T>
:Mono#fromFuture
empty
error
Throwable
:error(Supplier<Throwable>)
never
defer
using
Flux#generate
Flux#create
Mono#create
也是異步的,只不過只能發一個)2)對序列進行轉化算法
我想轉化一個序列:編程
map
cast
Flux#index
flatMap
+ 使用一個工廠方法handle
flatMap
+ 一個異步的返回類型爲 Publisher
的方法Mono.empty()
Flux#flatMapSequential
(對每一個元素的異步任務會當即執行,但會將結果按照原序列順序排序)Mono#flatMapMany
我想添加一些數據元素到一個現有的序列:數組
Flux#startWith(T...)
Flux#concatWith(T...)
我想將 Flux
轉化爲集合(一下都是針對 Flux
的)緩存
collectList
,collectSortedList
collectMap
,collectMultiMap
collect
count
reduce
scan
all
any
hasElements
hasElement
我想合併 publishers...數據結構
Flux#concat
或 .concatWith(other)
Flux#concatDelayError
Flux#mergeSequential
Flux#merge
/ .mergeWith(other)
Flux#zip
/ Flux#zipWith
Tuple2
:Mono#zipWith
Mono#zip
Mono<Void>
:Mono#and
Mono<Void>
:Mono#when
Flux#zip
Flux#combineLatest
Flux#first
,Mono#first
,mono.or<br/>(otherMono).or(thirdMono)
,`flux.or(otherFlux).or(thirdFlux)flatMap
,不過「喜新厭舊」):switchMap
switchOnNext
我想重複一個序列:repeat
異步
Flux.interval(duration).flatMap(tick -> myExistingPublisher)
我有一個空序列,可是...
defaultIfEmpty
switchIfEmpty
我有一個序列,可是我對序列的元素值不感興趣:ignoreElements
Mono
來表示序列已經結束:then
thenEmpty
Mono
:Mono#then(mono)
Mono#thenReturn(T)
Flux
:thenMany
我有一個 Mono 但我想延遲完成...
Mono#delayUntilOther
Mono#delayUntil(Function)
expand(Function)
expandDeep(Function)
3)「窺視」(只讀)序列
再不對序列形成改變的狀況下,我想:
doOnNext
Flux#doOnComplete
,Mono#doOnSuccess
doOnError
doOnCancel
doOnSubscribe
doOnRequest
doOnTerminate
(Mono的方法可能包含有結果)
doAfterTerminate
Signal
):Flux#doOnEach
doFinally
log
single
對象:doOnEach
single
對象:materialize
dematerialize
log
4)過濾序列
我想過濾一個序列
filter
filterWhen
ofType
ignoreElements
Flux#distinct
Flux#distinctUntilChanged
我只想要一部分序列:
Flux#take(long)
Flux#take(Duration)
Mono
中返回:Flux#next()
request(N)
而不是取消:Flux#limitRequest(long)
Flux#takeLast
Flux#takeUntil
(基於判斷條件),Flux#takeUntilOther
(基於對 publisher 的比較)Flux#takeWhile
Flux#elementAt
.takeLast(1)
Flux#last()
Flux#last(T)
Flux#skip(long)
Flux#skip(Duration)
Flux#skipLast
Flux#skipUntil
(基於判斷條件),Flux#skipUntilOther
(基於對 publisher 的比較)Flux#skipWhile
Flux#sample(Duration)
sampleFirst
Flux#sample(Publisher)
Flux#sampleTimeout
(每個元素會觸發一個 publisher,若是這個 publisher 不被下一個元素觸發的 publisher 覆蓋就發出這個元素)Flux#single()
Flux#single(T)
Flux#singleOrEmpty
5)錯誤處理
我想建立一個錯誤序列:error
...
Flux
:.concat(Flux.error(e))
Mono
:.then(Mono.error(e))
timeout
error(Supplier<Throwable>)
我想要相似 try/catch 的表達方式:
error
onErrorReturn
Flux
或 Mono
:onErrorResume
.onErrorMap(t -> new RuntimeException(t))
doFinally
using
工廠方法我想從錯誤中恢復...
onErrorReturn
Publisher
:Flux#onErrorResume
和 Mono#onErrorResume
retry
retryWhen
IllegalStateException
:Flux#onBackpressureError
Flux#onBackpressureDrop
Flux#onBackpressureLatest
Flux#onBackpressureBuffer
Flux#onBackpressureBuffer
帶有策略 BufferOverflowStrategy
6) 基於時間的操做
我想將元素轉換爲帶有時間信息的 Tuple2<Long, T>
...
elapsed
timestamp
若是元素間延遲過長則停止序列:timeout
以固定的週期發出元素:Flux#interval
在一個給定的延遲後發出 0
:static Mono.delay
.
Mono#delayElement
,Flux#delayElements
delaySubscription
7)拆分 Flux
我想將一個 Flux<T>
拆分爲一個 Flux<Flux<T>>
:
window(int)
window(int, int)
window(Duration)
window(Duration, Duration)
windowTimeout(int, Duration)
windowUntil
cutBefore
變量):.windowUntil(predicate, true)
windowWhile
(不知足條件的元素會被丟棄)window(Publisher)
,windowWhen
我想將一個 Flux<T>
的元素拆分到集合...
List
:buffer(int)
buffer(int, int)
buffer(Duration)
buffer(Duration, Duration)
bufferTimeout(int, Duration)
bufferUntil(Predicate)
.bufferUntil(predicate, true)
bufferWhile(Predicate)
buffer(Publisher)
,bufferWhen
buffer(int, Supplier<C>)
Flux<T>
中具備共同特徵的元素分組到子 Flux:groupBy(Function<T,K>)
(注意返回值是 Flux<GroupedFlux<K, T>>
,每個 GroupedFlux
具備相同的 key 值 K
,能夠經過 key()
方法獲取)。8)回到同步的世界
我有一個 Flux<T>
,我想:
Flux#blockFirst
Flux#blockFirst(Duration)
Flux#blockLast
Flux#blockLast(Duration)
Iterable<T>
:Flux#toIterable
Stream<T>
:Flux#toStream
Mono<T>
,我想:
Mono#block
Mono#block(Duration)
CompletableFuture<T>
:Mono#toFuture