附2:Reactor 3 之選擇合適的操做符——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor Operatorshtml

本節的內容來自我翻譯的Reactor 3 參考文檔——如何選擇操做符。因爲部分朋友打開github.io網速比較慢或上不去,貼出來方便你們查閱。react

若是一個操做符是專屬於 FluxMono 的,那麼會給它註明前綴。
公共的操做符沒有前綴。若是一個具體的用例涉及多個操做符的組合,這裏以方法調用的方式展示,
會以一個點(.)開頭,並將參數置於圓括號內,好比: .methodCall(parameter)git

1)建立一個新序列,它...github

  • 發出一個 T,我已經有了:just
    • ...基於一個 Optional<T>Mono#justOrEmpty(Optional<T>)
    • ...基於一個可能爲 null 的 T:Mono#justOrEmpty(T)
  • 發出一個 T,且仍是由 just 方法返回
    • ...可是「懶」建立的:使用 Mono#fromSupplier 或用 defer 包裝 just
  • 發出許多 T,這些元素我能夠明確列舉出來:Flux#just(T...)
  • 基於迭代數據結構:
    • 一個數組:Flux#fromArray
    • 一個集合或 iterable:Flux#fromIterable
    • 一個 Integer 的 range:Flux#range
    • 一個 Stream 提供給每個訂閱:Flux#fromStream(Supplier<Stream>)
  • 基於一個參數值給出的源:
    • 一個 Supplier<T>Mono#fromSupplier
    • 一個任務:Mono#fromCallableMono#fromRunnable
    • 一個 CompletableFuture<T>Mono#fromFuture
  • 直接完成:empty
  • 當即生成錯誤:error
    • ...可是「懶」的方式生成 Throwableerror(Supplier<Throwable>)
  • 什麼都不作:never
  • 訂閱時才決定:defer
  • 依賴一個可回收的資源:using
  • 可編程地生成事件(可使用狀態):
    • 同步且逐個的:Flux#generate
    • 異步(也可同步)的,每次儘量多發出元素:Flux#create
      Mono#create 也是異步的,只不過只能發一個)

2)對序列進行轉化算法

  • 我想轉化一個序列:編程

    • 1對1地轉化(好比字符串轉化爲它的長度):map
    • ...類型轉化:cast
    • ...爲了得到每一個元素的序號:Flux#index
    • 1對n地轉化(如字符串轉化爲一串字符):flatMap + 使用一個工廠方法
    • 1對n地轉化可自定義轉化方法和/或狀態:handle
    • 對每個元素執行一個異步操做(如對 url 執行 http 請求):flatMap + 一個異步的返回類型爲 Publisher 的方法
    • ...忽略一些數據:在 flatMap lambda 中根據條件返回一個 Mono.empty()
    • ...保留原來的序列順序:Flux#flatMapSequential(對每一個元素的異步任務會當即執行,但會將結果按照原序列順序排序)
    • ...當 Mono 元素的異步任務會返回多個元素的序列時:Mono#flatMapMany
  • 我想添加一些數據元素到一個現有的序列:數組

    • 在開頭添加:Flux#startWith(T...)
    • 在最後添加:Flux#concatWith(T...)
  • 我想將 Flux 轉化爲集合(一下都是針對 Flux 的)緩存

    • 轉化爲 List:collectListcollectSortedList
    • 轉化爲 Map:collectMapcollectMultiMap
    • 轉化爲自定義集合:collect
    • 計數:count
    • reduce 算法(將上個元素的reduce結果與當前元素值做爲輸入執行reduce方法,如sum) reduce
    • ...將每次 reduce 的結果當即發出:scan
    • 轉化爲一個 boolean 值:
    • 對全部元素判斷都爲true:all
    • 對至少一個元素判斷爲true:any
    • 判斷序列是否有元素(不爲空):hasElements
    • 判斷序列中是否有匹配的元素:hasElement
  • 我想合併 publishers...數據結構

    • 按序鏈接:Flux#concat.concatWith(other)
    • ...即便有錯誤,也會等全部的 publishers 鏈接完成:Flux#concatDelayError
    • ...按訂閱順序鏈接(這裏的合併仍然能夠理解成序列的鏈接):Flux#mergeSequential
    • 按元素髮出的順序合併(不管哪一個序列的,元素先到先合併):Flux#merge / .mergeWith(other)
    • ...元素類型會發生變化:Flux#zip / Flux#zipWith
    • 將元素組合:
    • 2個 Monos 組成1個 Tuple2Mono#zipWith
    • n個 Monos 的元素都發出來後組成一個 Tuple:Mono#zip
    • 在終止信號出現時「採起行動」:
    • 在 Mono 終止時轉換爲一個 Mono<Void>Mono#and
    • 當 n 個 Mono 都終止時返回 Mono<Void>Mono#when
    • 返回一個存放組合數據的類型,對於被合併的多個序列:
      • 每一個序列都發出一個元素時:Flux#zip
      • 任何一個序列發出元素時:Flux#combineLatest
    • 只取各個序列的第一個元素:Flux#firstMono#firstmono.or<br/>(otherMono).or(thirdMono),`flux.or(otherFlux).or(thirdFlux)
    • 由一個序列觸發(相似於 flatMap,不過「喜新厭舊」):switchMap
    • 由每一個新序列開始時觸發(也是「喜新厭舊」風格):switchOnNext
  • 我想重複一個序列:repeat異步

    • ...可是以必定的間隔重複:Flux.interval(duration).flatMap(tick -&gt; myExistingPublisher)
  • 我有一個空序列,可是...

    • 我想要一個缺省值來代替:defaultIfEmpty
    • 我想要一個缺省的序列來代替:switchIfEmpty
  • 我有一個序列,可是我對序列的元素值不感興趣:ignoreElements

    • ...而且我但願用 Mono 來表示序列已經結束:then
    • ...而且我想在序列結束後等待另外一個任務完成:thenEmpty
    • ...而且我想在序列結束以後返回一個 MonoMono#then(mono)
    • ...而且我想在序列結束以後返回一個值:Mono#thenReturn(T)
    • ...而且我想在序列結束以後返回一個 FluxthenMany
  • 我有一個 Mono 但我想延遲完成...

    • ...當有1個或N個其餘 publishers 都發出(或結束)時才完成:Mono#delayUntilOther
    • ...使用一個函數式來定義如何獲取「其餘 publisher」:Mono#delayUntil(Function)
  • 我想基於一個遞歸的生成序列的規則擴展每個元素,而後合併爲一個序列發出:
    • ...廣度優先:expand(Function)
    • ...深度優先:expandDeep(Function)

3)「窺視」(只讀)序列

  • 再不對序列形成改變的狀況下,我想:

    • 獲得通知或執行一些操做:
    • 發出元素:doOnNext
    • 序列完成:Flux#doOnCompleteMono#doOnSuccess
    • 因錯誤終止:doOnError
    • 取消:doOnCancel
    • 訂閱時:doOnSubscribe
    • 請求時:doOnRequest
    • 完成或錯誤終止:doOnTerminate(Mono的方法可能包含有結果)
      • 可是在終止信號向下遊傳遞 以後doAfterTerminate
    • 全部類型的信號(Signal):Flux#doOnEach
    • 全部結束的狀況(完成complete、錯誤error、取消cancel):doFinally
    • 記錄日誌:log
  • 我想知道全部的事件:
    • 每個事件都體現爲一個 single 對象:
    • 執行 callback:doOnEach
    • 每一個元素轉化爲 single 對象:materialize
      • ...在轉化回元素:dematerialize
    • 轉化爲一行日誌:log

4)過濾序列

  • 我想過濾一個序列

    • 基於給定的判斷條件:filter
    • ...異步地進行判斷:filterWhen
    • 僅限於指定類型的對象:ofType
    • 忽略全部元素:ignoreElements
    • 去重:
    • 對於整個序列:Flux#distinct
    • 去掉連續重複的元素:Flux#distinctUntilChanged
  • 我只想要一部分序列:

    • 只要 N 個元素:
    • 從序列的第一個元素開始算:Flux#take(long)
      • ...取一段時間內發出的元素:Flux#take(Duration)
      • ...只取第一個元素放到 Mono 中返回:Flux#next()
      • ...使用 request(N) 而不是取消:Flux#limitRequest(long)
    • 從序列的最後一個元素倒數:Flux#takeLast
    • 直到知足某個條件(包含):Flux#takeUntil(基於判斷條件),Flux#takeUntilOther(基於對 publisher 的比較)
    • 直到知足某個條件(不包含):Flux#takeWhile
    • 最多隻取 1 個元素:
    • 給定序號:Flux#elementAt
    • 最後一個:.takeLast(1)
      • ...若是爲序列空則發出錯誤信號:Flux#last()
      • ...若是序列爲空則返回默認值:Flux#last(T)
    • 跳過一些元素:
    • 從序列的第一個元素開始跳過:Flux#skip(long)
      • ...跳過一段時間內發出的元素:Flux#skip(Duration)
    • 跳過最後的 n 個元素:Flux#skipLast
    • 直到知足某個條件(包含):Flux#skipUntil(基於判斷條件),Flux#skipUntilOther (基於對 publisher 的比較)
    • 直到知足某個條件(不包含):Flux#skipWhile
    • 採樣:
    • 給定採樣週期:Flux#sample(Duration)
      • 取採樣週期裏的第一個元素而不是最後一個:sampleFirst
    • 基於另外一個 publisher:Flux#sample(Publisher)
    • 基於 publisher「超時」:Flux#sampleTimeout (每個元素會觸發一個 publisher,若是這個 publisher 不被下一個元素觸發的 publisher 覆蓋就發出這個元素)
  • 我只想要一個元素(若是多於一個就返回錯誤)...
    • 若是序列爲空,發出錯誤信號:Flux#single()
    • 若是序列爲空,發出一個缺省值:Flux#single(T)
    • 若是序列爲空就返回一個空序列:Flux#singleOrEmpty

5)錯誤處理

  • 我想建立一個錯誤序列:error...

    • ...替換一個完成的 Flux.concat(Flux.error(e))
    • ...替換一個完成的 Mono.then(Mono.error(e))
    • ...若是元素超時未發出:timeout
    • ...「懶」建立:error(Supplier&lt;Throwable&gt;)
  • 我想要相似 try/catch 的表達方式:

    • 拋出異常:error
    • 捕獲異常:
    • 而後返回缺省值:onErrorReturn
    • 而後返回一個 FluxMonoonErrorResume
    • 包裝異常後再拋出:.onErrorMap(t -&gt; new RuntimeException(t))
    • finally 代碼塊:doFinally
    • Java 7 以後的 try-with-resources 寫法:using 工廠方法
  • 我想從錯誤中恢復...

    • 返回一個缺省的:
    • 的值:onErrorReturn
    • PublisherFlux#onErrorResumeMono#onErrorResume
    • 重試:retry
    • ...由一個用於伴隨 Flux 觸發:retryWhen
  • 我想處理回壓錯誤(向上遊發出「MAX」的 request,若是下游的 request 比較少,則應用策略)...
    • 拋出 IllegalStateExceptionFlux#onBackpressureError
    • 丟棄策略:Flux#onBackpressureDrop
    • ...可是不丟棄最後一個元素:Flux#onBackpressureLatest
    • 緩存策略(有限或無限):Flux#onBackpressureBuffer
    • ...當有限的緩存空間用滿則應用給定策略:Flux#onBackpressureBuffer 帶有策略 BufferOverflowStrategy

6) 基於時間的操做

  • 我想將元素轉換爲帶有時間信息的 Tuple2&lt;Long, T&gt;...

    • 從訂閱時開始:elapsed
    • 記錄時間戳:timestamp
  • 若是元素間延遲過長則停止序列:timeout

  • 以固定的週期發出元素:Flux#interval

  • 在一個給定的延遲後發出 0:static Mono.delay.

  • 我想引入延遲:
    • 對每個元素:Mono#delayElementFlux#delayElements
    • 延遲訂閱:delaySubscription

7)拆分 Flux

  • 我想將一個 Flux&lt;T&gt; 拆分爲一個 Flux&lt;Flux&lt;T&gt;&gt;

    • 以個數爲界:window(int)
    • ...會出現重疊或丟棄的狀況:window(int, int)
    • 以時間爲界:window(Duration)
    • ...會出現重疊或丟棄的狀況:window(Duration, Duration)
    • 以個數或時間爲界:windowTimeout(int, Duration)
    • 基於對元素的判斷條件:windowUntil
    • ...觸發判斷條件的元素會分到下一波(cutBefore 變量):.windowUntil(predicate, true)
    • ...知足條件的元素在一波,直到不知足條件的元素髮出開始下一波:windowWhile (不知足條件的元素會被丟棄)
    • 經過另外一個 Publisher 的每個 onNext 信號來拆分序列:window(Publisher)windowWhen
  • 我想將一個 Flux&lt;T&gt; 的元素拆分到集合...

    • 拆分爲一個一個的 List:
    • 以個數爲界:buffer(int)
      • ...會出現重疊或丟棄的狀況:buffer(int, int)
    • 以時間爲界:buffer(Duration)
      • ...會出現重疊或丟棄的狀況:buffer(Duration, Duration)
    • 以個數或時間爲界:bufferTimeout(int, Duration)
    • 基於對元素的判斷條件:bufferUntil(Predicate)
      • ...觸發判斷條件的元素會分到下一個buffer:.bufferUntil(predicate, true)
      • ...知足條件的元素在一個buffer,直到不知足條件的元素髮出開始下一buffer:bufferWhile(Predicate)
    • 經過另外一個 Publisher 的每個 onNext 信號來拆分序列:buffer(Publisher)bufferWhen
    • 拆分到指定類型的 "collection":buffer(int, Supplier&lt;C&gt;)
  • 我想將 Flux&lt;T&gt; 中具備共同特徵的元素分組到子 Flux:groupBy(Function&lt;T,K&gt;)(注意返回值是 Flux&lt;GroupedFlux&lt;K, T&gt;&gt;,每個 GroupedFlux 具備相同的 key 值 K,能夠經過 key() 方法獲取)。

8)回到同步的世界

  • 我有一個 Flux&lt;T&gt;,我想:

    • 在拿到第一個元素前阻塞:Flux#blockFirst
    • ...並給出超時時限:Flux#blockFirst(Duration)
    • 在拿到最後一個元素前阻塞(若是序列爲空則返回 null):Flux#blockLast
    • ...並給出超時時限:Flux#blockLast(Duration)
    • 同步地轉換爲 Iterable&lt;T&gt;Flux#toIterable
    • 同步地轉換爲 Java 8 Stream&lt;T&gt;Flux#toStream
  • 我有一個 Mono&lt;T&gt;,我想:
    • 在拿到元素前阻塞:Mono#block
    • ...並給出超時時限:Mono#block(Duration)
    • 轉換爲 CompletableFuture&lt;T&gt;Mono#toFuture
相關文章
相關標籤/搜索