本系列文章索引《響應式Spring的道法術器》
前情提要 響應式編程 | 響應式流 | lambda與函數式
本文源碼html
Project Reactor(如下簡稱「Reactor」)與Spring是兄弟項目,側重於Server端的響應式編程,主要 artifact 是 reactor-core,這是一個基於 Java 8 的實現了響應式流規範 (Reactive Streams specification)的響應式庫。java
本文對Reactor的介紹以基本的概念和簡單的使用爲主,深度以可以知足基本的Spring WebFlux使用爲準。在下一章,我會結合Reactor的設計模式、併發調度模型等原理層面的內容系統介紹Reactor的使用。react
光說不練假把式,咱們先把練習用的項目搭起來。先建立一個maven項目,而後添加依賴:git
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.1.4.RELEASE</version> </dependency>
最新版本可到 http://search.maven.org 查詢,複製過來便可。另外出於測試的須要,添加以下依賴:github
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.1.4.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency>
好了,咱們開始Coding吧。編程
Reactor中的發佈者(Publisher)由Flux
和Mono
兩個類定義,它們都提供了豐富的操做符(operator)。一個Flux對象表明一個包含0..N個元素的響應式序列,而一個Mono對象表明一個包含零/一個(0..1)元素的結果。設計模式
既然是「數據流」的發佈者,Flux和Mono均可以發出三種「數據信號」:元素值、錯誤信號、完成信號,錯誤信號和完成信號都是終止信號,完成信號用於告知下游訂閱者該數據流正常結束,錯誤信號終止數據流的同時將錯誤傳遞給下游訂閱者。api
下圖所示就是一個Flux類型的數據流,黑色箭頭是時間軸。它連續發出「1」 - 「6」共6個元素值,以及一個完成信號(圖中⑥後邊的加粗豎線來表示),完成信號告知訂閱者數據流已經結束。數組
下圖所示是一個Mono類型的數據流,它發出一個元素值後,又發出一個完成信號。緩存
既然Flux具備發佈一個數據元素的能力,爲何還要專門定義一個Mono類呢?舉個例子,一個HTTP請求產生一個響應,因此對其進行「count」操做是沒有多大意義的。表示這樣一個結果的話,應該用Mono<HttpResponse>
而不是 Flux<HttpResponse>
,對於的操做一般只用於處理 0/1 個元素。它們從語義上就原生包含着元素個數的信息,從而避免了對Mono對象進行多元素場景下的處理。
有些操做能夠改變基數,從而須要切換類型。好比,count操做用於Flux,可是操做返回的結果是
Mono<Long>
。
咱們能夠用以下代碼聲明上邊兩幅圖所示的Flux和Mono:
Flux.just(1, 2, 3, 4, 5, 6); Mono.just(1);
Flux和Mono提供了多種建立數據流的方法,just
就是一種比較直接的聲明數據流的方式,其參數就是數據元素。
對於圖中的Flux,還能夠經過以下方式聲明(分別基於數組、集合和Stream生成):
Integer[] array = new Integer[]{1,2,3,4,5,6}; Flux.fromArray(array); List<Integer> list = Arrays.asList(array); Flux.fromIterable(list); Stream<Integer> stream = list.stream(); Flux.fromStream(stream);
不過,這三種信號都不是必定要具有的:
好比,對於只有完成/錯誤信號的數據流:
// 只有完成信號的空數據流 Flux.just(); Flux.empty(); Mono.empty(); Mono.justOrEmpty(Optional.empty()); // 只有錯誤信號的數據流 Flux.error(new Exception("some error")); Mono.error(new Exception("some error"));
你可能會納悶,空的數據流有什麼用?舉個例子,當咱們從響應式的DB中獲取結果的時候(假設DAO層是ReactiveRepository<User>
),就有可能爲空:
Mono<User> findById(long id); Flux<User> findAll();
不管是空仍是發生異常,都須要經過完成/錯誤信號告知訂閱者,已經查詢完畢,可是抱歉沒有獲得值,禮貌問題嘛~
數據流有了,假設咱們想把每一個數據元素原封不動地打印出來:
Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print); System.out.println(); Mono.just(1).subscribe(System.out::println);
輸出以下:
123456 1
可見,subscribe
方法中的lambda表達式做用在了每個數據元素上。此外,Flux和Mono還提供了多個subscribe
方法的變體:
// 訂閱並觸發數據流 subscribe(); // 訂閱並指定對正常數據元素如何處理 subscribe(Consumer<? super T> consumer); // 訂閱並定義對正常數據元素和錯誤信號的處理 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); // 訂閱並定義對正常數據元素、錯誤信號和完成信號的處理 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); // 訂閱並定義對正常數據元素、錯誤信號和完成信號的處理,以及訂閱發生時的處理邏輯 subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
1)若是是訂閱上邊聲明的Flux:
Flux.just(1, 2, 3, 4, 5, 6).subscribe( System.out::println, System.err::println, () -> System.out.println("Completed!"));
輸出以下:
1 2 3 4 5 6 Completed!
2)再舉一個有錯誤信號的例子:
Mono.error(new Exception("some error")).subscribe( System.out::println, System.err::println, () -> System.out.println("Completed!") );
輸出以下:
java.lang.Exception: some error
打印出了錯誤信號,沒有輸出Completed!
代表沒有發出完成信號。
這裏須要注意的一點是,Flux.just(1, 2, 3, 4, 5, 6)
僅僅聲明瞭這個數據流,此時數據元素並未發出,只有subscribe()
方法調用的時候纔會觸發數據流。因此,訂閱前什麼都不會發生。
從命令式和同步式編程切換到響應式和異步式編程有時候是使人生畏的。學習曲線中最陡峭的地方就是出錯時如何分析和調試。
在命令式世界,調試一般都是很是直觀的:直接看 stack trace 就能夠找到問題出現的位置, 以及其餘信息:是否問題責任所有出在你本身的代碼?問題是否是發生在某些庫代碼?若是是, 那你的哪部分代碼調用了庫,是否是傳參不合適致使的問題?等等。
當你切換到響應式的異步代碼,事情就變得複雜的多了。不過咱們先不接觸過於複雜的內容,先了解一個基本的單元測試工具——StepVerifier
。
最多見的測試 Reactor 序列的場景就是定義一個 Flux 或 Mono,而後在訂閱它的時候測試它的行爲。
當你的測試關注於每個數據元素的時候,就很是貼近使用 StepVerifier 的測試場景: 下一個指望的數據或信號是什麼?你是否指望使用 Flux 來發出某一個特別的值?或者是否接下來 300ms 什麼都不作?——全部這些均可以使用 StepVerifier API 來表示。
仍是以那個1-6的Flux以及會發出錯誤信號的Mono爲例:
private Flux<Integer> generateFluxFrom1To6() { return Flux.just(1, 2, 3, 4, 5, 6); } private Mono<Integer> generateMonoWithError() { return Mono.error(new Exception("some error")); } @Test public void testViaStepVerifier() { StepVerifier.create(generateFluxFrom1To6()) .expectNext(1, 2, 3, 4, 5, 6) .expectComplete() .verify(); StepVerifier.create(generateMonoWithError()) .expectErrorMessage("some error") .verify(); }
其中,expectNext
用於測試下一個指望的數據元素,expectErrorMessage
用於校驗下一個元素是否爲錯誤信號,expectComplete
用於測試下一個元素是否爲完成信號。
StepVerifier
還提供了其餘豐富的測試方法,咱們會在後續的介紹中陸續接觸到。
一般狀況下,咱們須要對源發佈者發出的原始數據流進行多個階段的處理,並最終獲得咱們須要的數據。這種感受就像是一條流水線,從流水線的源頭進入傳送帶的是原料,通過流水線上各個工位的處理,逐漸由原料變成半成品、零件、組件、成品,最終成爲消費者須要的包裝品。這其中,流水線源頭的下料機就至關於源發佈者,消費者就至關於訂閱者,流水線上的一道道工序就至關於一個一個的操做符(Operator)。
下面介紹一些咱們經常使用的操做符。
1)map - 元素映射爲新元素
map
操做能夠將數據元素進行轉換/映射,獲得一個新元素。
public final <V> Flux<V> map(Function<? super T,? extends V> mapper) public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)
上圖是Flux的map操做示意圖,上方的箭頭是原始序列的時間軸,下方的箭頭是通過map處理後的數據序列時間軸。
map
接受一個Function
的函數式接口爲參數,這個函數式的做用是定義轉換操做的策略。舉例說明:
StepVerifier.create(Flux.range(1, 6) // 1 .map(i -> i * i)) // 2 .expectNext(1, 4, 9, 16, 25, 36) //3 .expectComplete(); // 4
Flux.range(1, 6)
用於生成從「1」開始的,自增爲1的「6」個整型數據;map
接受lambdai -> i * i
爲參數,表示對每一個數據進行平方;verifyComplete()
至關於expectComplete().verify()
。2)flatMap - 元素映射爲流
flatMap
操做能夠將每一個數據元素轉換/映射爲一個流,而後將這些流合併爲一個大的數據流。
注意到,流的合併是異步的,先來先到,並不是是嚴格按照原始序列的順序(如圖藍色和紅色方塊是交叉的)。
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
flatMap
也是接收一個Function
的函數式接口爲參數,這個函數式的輸入爲一個T類型數據值,對於Flux來講輸出能夠是Flux和Mono,對於Mono來講輸出只能是Mono。舉例說明:
StepVerifier.create( Flux.just("flux", "mono") .flatMap(s -> Flux.fromArray(s.split("\\s*")) // 1 .delayElements(Duration.ofMillis(100))) // 2 .doOnNext(System.out::print)) // 3 .expectNextCount(8) // 4 .verifyComplete();
s
,將其拆分爲包含一個字符的字符串流;doOnNext
方法是「偷窺式」的方法,不會消費數據流);打印結果爲mfolnuox
,緣由在於各個拆分後的小字符串都是間隔100ms發出的,所以會交叉。
flatMap
一般用於每一個元素又會引入數據流的狀況,好比咱們有一串url數據流,須要請求每一個url並收集response數據。假設響應式的請求方法以下:
Mono<HttpResponse> requestUrl(String url) {...}
而url數據流爲一個Flux<String> urlFlux
,那麼爲了獲得全部的HttpResponse,就須要用到flatMap:
urlFlux.flatMap(url -> requestUrl(url));
其返回內容爲Flux<HttpResponse>
類型的HttpResponse流。
3)filter - 過濾
filter
操做能夠對數據元素進行篩選。
public final Flux<T> filter(Predicate<? super T> tester) public final Mono<T> filter(Predicate<? super T> tester)
filter
接受一個Predicate
的函數式接口爲參數,這個函數式的做用是進行判斷並返回boolean。舉例說明:
StepVerifier.create(Flux.range(1, 6) .filter(i -> i % 2 == 1) // 1 .map(i -> i * i)) .expectNext(1, 9, 25) // 2 .verifyComplete();
filter
的lambda參數表示過濾操做將保留奇數;4)zip - 一對一合併
看到zip
這個詞可能會聯想到拉鍊,它可以將多個流一對一的合併起來。zip有多個方法變體,咱們介紹一個最多見的二合一的。
它對兩個Flux/Mono流每次各取一個元素,合併爲一個二元組(Tuple2
):
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2) public static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)
Flux
的zip
方法接受Flux或Mono爲參數,Mono
的zip
方法只能接受Mono類型的參數。
舉個例子,假設咱們有一個關於zip
方法的說明:「Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.」,咱們但願將這句話拆分爲一個一個的單詞並以每200ms一個的速度發出,除了前面flatMap的例子中用到的delayElements
,能夠以下操做:
private Flux<String> getZipDescFlux() { String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2."; return Flux.fromArray(desc.split("\\s+")); // 1 } @Test public void testSimpleOperators() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); // 2 Flux.zip( getZipDescFlux(), Flux.interval(Duration.ofMillis(200))) // 3 .subscribe(t -> System.out.println(t.getT1()), null, countDownLatch::countDown); // 4 countDownLatch.await(10, TimeUnit.SECONDS); // 5 }
CountDownLatch
,初始爲1,則會等待執行1次countDown
方法後結束,不使用它的話,測試方法所在的線程會直接返回而不會等待數據流發出完畢;Flux.interval
聲明一個每200ms發出一個元素的long數據流;由於zip操做是一對一的,故而將其與字符串流zip以後,字符串流也將具備一樣的速度;Tuple2
,使用getT1
方法拿到字符串流的元素;定義完成信號的處理爲countDown
;countDownLatch.await(10, TimeUnit.SECONDS)
會等待countDown
倒數至0,最多等待10秒鐘。除了zip
靜態方法以外,還有zipWith
等非靜態方法,效果與之相似:
getZipDescFlux().zipWith(Flux.interval(Duration.ofMillis(200)))
在異步條件下,數據流的流速不一樣,使用zip可以一對一地將兩個或多個數據流的元素對齊發出。
5)更多
Reactor中提供了很是豐富的操做符,除了以上幾個常見的,還有:
create
和generate
等及其變體方法;doOnNext
、doOnError
、doOncomplete
、doOnSubscribe
、doOnCancel
等及其變體方法;when
、and/or
、merge
、concat
、collect
、count
、repeat
等及其變體方法;take
、first
、last
、sample
、skip
、limitRequest
等及其變體方法;timeout
、onErrorReturn
、onErrorResume
、doFinally
、retryWhen
等及其變體方法;window
、buffer
、group
等及其變體方法;publishOn
和subscribeOn
方法。使用這些操做符,你幾乎能夠搭建出可以進行任何業務需求的數據處理管道/流水線。
抱歉以上這些暫時不能一一介紹,更多詳情請參考JavaDoc,在下一章咱們還會回頭對Reactor從更深層次進行系統的分析。
此外,也可閱讀我翻譯的Reactor參考文檔,我會盡可能及時更新翻譯的內容。文檔源碼位於github,若有翻譯不當,歡迎提交Pull-Request。
在Reactor中,對於多線程併發調度的處理變得異常簡單。
在以往的多線程開發場景中,咱們一般使用Executors
工具類來建立線程池,一般有以下四種類型:
newCachedThreadPool
建立一個彈性大小緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程;newFixedThreadPool
建立一個大小固定的線程池,可控制線程最大併發數,超出的線程會在隊列中等待;newScheduledThreadPool
建立一個大小固定的線程池,支持定時及週期性的任務執行;newSingleThreadExecutor
建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。此外,newWorkStealingPool
還能夠建立支持work-stealing的線程池。
說良心話,Java提供的Executors
工具類使得咱們對ExecutorService
使用已經很是駕輕就熟了。BUT~ Reactor讓線程管理和任務調度更加「傻瓜」——調度器(Scheduler)幫助咱們搞定這件事。Scheduler
是一個擁有多個實現類的抽象接口。Schedulers
類(按照一般的套路,最後爲s
的就是工具類咯)提供的靜態方法可搭建如下幾種線程執行環境:
Schedulers.immediate()
);Schedulers.single()
)。注意,這個方法對全部調用者都提供同一個線程來使用, 直到該調度器被廢棄。若是你想使用獨佔的線程,請使用Schedulers.newSingle()
;Schedulers.elastic()
)。它根據須要建立一個線程池,重用空閒線程。線程池若是空閒時間過長 (默認爲 60s)就會被廢棄。對於 I/O 阻塞的場景比較適用。Schedulers.elastic()
可以方便地給一個阻塞 的任務分配它本身的線程,從而不會妨礙其餘任務和資源;Schedulers.parallel()
),所建立線程池的大小與CPU個數等同;Schedulers.fromExecutorService(ExecutorService)
)基於自定義的ExecutorService建立 Scheduler(雖然不太建議,不過你也可使用Executor來建立)。Schedulers
類已經預先建立了幾種經常使用的線程池:使用single()
、elastic()
和parallel()
方法能夠分別使用內置的單線程、彈性線程池和固定大小線程池。若是想建立新的線程池,可使用newSingle()
、newElastic()
和newParallel()
方法。
Executors
提供的幾種線程池在Reactor中都支持:
Schedulers.single()
和Schedulers.newSingle()
對應Executors.newSingleThreadExecutor()
;Schedulers.elastic()
和Schedulers.newElastic()
對應Executors.newCachedThreadPool()
;Schedulers.parallel()
和Schedulers.newParallel()
對應Executors.newFixedThreadPool()
;Schedulers
提供的以上三種調度器底層都是基於ScheduledExecutorService
的,所以都是支持任務定時和週期性執行的;Flux
和Mono
的調度操做符subscribeOn
和publishOn
支持work-stealing。舉例:將同步的阻塞調用變爲異步的
前面介紹到Schedulers.elastic()
可以方便地給一個阻塞的任務分配專門的線程,從而不會妨礙其餘任務和資源。咱們就能夠利用這一點將一個同步阻塞的調用調度到一個本身的線程中,並利用訂閱機制,待調用結束後異步返回。
假設咱們有一個同步阻塞的調用方法:
private String getStringSync() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello, Reactor!"; }
正常狀況下,調用這個方法會被阻塞2秒鐘,而後同步地返回結果。咱們藉助elastic調度器將其變爲異步,因爲是異步的,爲了保證測試方法所在的線程可以等待結果的返回,咱們使用CountDownLatch
:
@Test public void testSyncToAsync() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.fromCallable(() -> getStringSync()) // 1 .subscribeOn(Schedulers.elastic()) // 2 .subscribe(System.out::println, null, countDownLatch::countDown); countDownLatch.await(10, TimeUnit.SECONDS); }
fromCallable
聲明一個基於Callable的Mono;subscribeOn
將任務調度到Schedulers
內置的彈性線程池執行,彈性線程池會爲Callable的執行任務分配一個單獨的線程。切換調度器的操做符
Reactor 提供了兩種在響應式鏈中調整調度器 Scheduler的方法:publishOn
和subscribeOn
。它們都接受一個 Scheduler
做爲參數,從而能夠改變調度器。可是publishOn
在鏈中出現的位置是有講究的,而subscribeOn
則無所謂。
假設與上圖對應的代碼是:
Flux.range(1, 1000)
.map(...)
.publishOn(Schedulers.elastic()).filter(...)
.publishOn(Schedulers.parallel()).flatMap(...)
.subscribeOn(Schedulers.single())
publishOn
會影響鏈中其後的操做符,好比第一個publishOn調整調度器爲elastic,則filter
的處理操做是在彈性線程池中執行的;同理,flatMap
是執行在固定大小的parallel線程池中的;subscribeOn
不管出如今什麼位置,都隻影響源頭的執行環境,也就是range
方法是執行在單線程中的,直至被第一個publishOn
切換調度器以前,因此range
後的map
也在單線程中執行。關於publishOn
和subscribeOn
爲何會出現如此的調度策略,須要深刻討論Reactor的實現原理,咱們將在下一章展開。
在響應式流中,錯誤(error)是終止信號。當有錯誤發生時,它會致使流序列中止,而且錯誤信號會沿着操做鏈條向下傳遞,直至遇到subscribe中的錯誤處理方法。這樣的錯誤仍是應該在應用層面解決的。不然,你可能會將錯誤信息顯示在用戶界面,或者經過某個REST endpoint發出。因此仍是建議在subscribe時經過錯誤處理方法妥善解決錯誤。
@Test public void testErrorHandling() { Flux.range(1, 6) .map(i -> 10/(i-3)) // 1 .map(i -> i*i) .subscribe(System.out::println, System.err::println); }
輸出爲:
25 100 java.lang.ArithmeticException: / by zero //注:這一行是紅色,表示標準錯誤輸出
subscribe
方法的第二個參數定義了對錯誤信號的處理,從而測試方法exit爲0(即正常退出),可見錯誤沒有蔓延出去。不過這還不夠~
此外,Reactor還提供了其餘的用於在鏈中處理錯誤的操做符(error-handling operators),使得對於錯誤信號的處理更加及時,處理方式更加多樣化。
在討論錯誤處理操做符的時候,咱們藉助命令式編程風格的 try 代碼塊來做比較。咱們都很熟悉在 try-catch 代碼塊中處理異常的幾種方法。常見的包括以下幾種:
以上全部這些在 Reactor 都有相應的基於 error-handling 操做符處理方式。
1. 捕獲並返回一個靜態的缺省值
onErrorReturn
方法可以在收到錯誤信號的時候提供一個缺省值:
Flux.range(1, 6) .map(i -> 10/(i-3)) .onErrorReturn(0) // 1 .map(i -> i*i) .subscribe(System.out::println, System.err::println);
輸出以下:
25 100 0
2. 捕獲並執行一個異常處理方法或計算一個候補值來頂替
onErrorResume
方法可以在收到錯誤信號的時候提供一個新的數據流:
Flux.range(1, 6) .map(i -> 10/(i-3)) .onErrorResume(e -> Mono.just(new Random().nextInt(6))) // 提供新的數據流 .map(i -> i*i) .subscribe(System.out::println, System.err::println);
輸出以下:
25 100 16
舉一個更有業務含義的例子:
Flux.just(endpoint1, endpoint2) .flatMap(k -> callExternalService(k)) // 1 .onErrorResume(e -> getFromCache(k)); // 2
3. 捕獲,並再包裝爲某一個業務相關的異常,而後再拋出業務異常
有時候,咱們收到異常後並不想當即處理,而是會包裝成一個業務相關的異常交給後續的邏輯處理,可使用onErrorMap
方法:
Flux.just("timeout1") .flatMap(k -> callExternalService(k)) // 1 .onErrorMap(original -> new BusinessException("SLA exceeded", original)); // 2
這一功能其實也能夠用onErrorResume
實現,略麻煩一點:
Flux.just("timeout1") .flatMap(k -> callExternalService(k)) .onErrorResume(original -> Flux.error( new BusinessException("SLA exceeded", original) );
4. 捕獲,記錄錯誤日誌,而後繼續拋出
若是對於錯誤你只是想在不改變它的狀況下作出響應(如記錄日誌),並讓錯誤繼續傳遞下去, 那麼能夠用doOnError
方法。前面提到,形如doOnXxx
是隻讀的,對數據流不會形成影響:
Flux.just(endpoint1, endpoint2) .flatMap(k -> callExternalService(k)) .doOnError(e -> { // 1 log("uh oh, falling back, service failed for key " + k); // 2 }) .onErrorResume(e -> getFromCache(k));
5. 使用 finally 來清理資源,或使用 Java 7 引入的 "try-with-resource"
Flux.using( () -> getResource(), // 1 resource -> Flux.just(resource.getAll()), // 2 MyResource::clean // 3 );
另外一方面, doFinally
在序列終止(不管是 onComplete、onError
仍是取消)的時候被執行, 而且可以判斷是什麼類型的終止事件(完成、錯誤仍是取消),以便進行鍼對性的清理。如:
LongAdder statsCancel = new LongAdder(); // 1 Flux<String> flux = Flux.just("foo", "bar") .doFinally(type -> { if (type == SignalType.CANCEL) // 2 statsCancel.increment(); // 3 }) .take(1); // 4
LongAdder
進行統計;doFinally
用SignalType
檢查了終止信號的類型;take(1)
可以在發出1個元素後取消流。重試
還有一個用於錯誤處理的操做符你可能會用到,就是retry
,見文知意,用它能夠對出現錯誤的序列進行重試。
請注意:**retry對於上游Flux是採起的重訂閱(re-subscribing)的方式,所以重試以後實際上已經一個不一樣的序列了, 發出錯誤信號的序列仍然是終止了的。舉例以下:
Flux.range(1, 6) .map(i -> 10 / (3 - i)) .retry(1) .subscribe(System.out::println, System.err::println); Thread.sleep(100); // 確保序列執行完
輸出以下:
5 10 5 10 java.lang.ArithmeticException: / by zero
可見,retry
不過是再一次重新訂閱了原始的數據流,從1開始。第二次,因爲異常再次出現,便將異常傳遞到下游了。
前邊的例子並無進行流量控制,也就是,當執行.subscribe(System.out::println)
這樣的訂閱的時候,直接發起了一個無限的請求(unbounded request),就是對於數據流中的元素不管快慢都「照單全收」。
subscribe
方法還有一個變體:
// 接收一個Subscriber爲參數,該Subscriber能夠進行更加靈活的定義 subscribe(Subscriber subscriber)
注:其實這纔是
subscribe
方法本尊,前邊介紹到的能夠接收0~4個函數式接口爲參數的subscribe
最終都是拼裝爲這個方法,因此按理說前邊的subscribe
方法纔是「變體」。
咱們能夠經過自定義具備流量控制能力的Subscriber進行訂閱。Reactor提供了一個BaseSubscriber
,咱們能夠經過擴展它來定義本身的Subscriber。
假設,咱們如今有一個很是快的Publisher——Flux.range(1, 6)
,而後自定義一個每秒處理一個數據元素的慢的Subscriber,Subscriber就須要經過request(n)
的方法來告知上游它的需求速度。代碼以下:
@Test public void testBackpressure() { Flux.range(1, 6) // 1 .doOnRequest(n -> System.out.println("Request " + n + " values...")) // 2 .subscribe(new BaseSubscriber<Integer>() { // 3 @Override protected void hookOnSubscribe(Subscription subscription) { // 4 System.out.println("Subscribed and make a request..."); request(1); // 5 } @Override protected void hookOnNext(Integer value) { // 6 try { TimeUnit.SECONDS.sleep(1); // 7 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Get value [" + value + "]"); // 8 request(1); // 9 } }); }
Flux.range
是一個快的Publisher;request
的時候打印request個數;BaseSubscriber
的方法來自定義Subscriber;hookOnSubscribe
定義在訂閱的時候執行的操做;hookOnNext
定義每次在收到一個元素的時候的操做;輸出以下(咱們也可使用log()
來打印相似下邊的輸出,以代替上邊代碼中的System.out.println
):
Subscribed and make a request... Request 1 values... Get value [1] Request 1 values... Get value [2] Request 1 values... Get value [3] Request 1 values... Get value [4] Request 1 values... Get value [5] Request 1 values... Get value [6] Request 1 values...
這6個元素是以每秒1個的速度被處理的。因而可知range
方法生成的Flux採用的是緩存的回壓策略,可以緩存下游暫時來不及處理的元素。
以上關於Reactor的介紹主要是概念層面和使用層面的介紹,不過應該也足以應對常見的業務環境了。
從命令式編程到響應式編程的切換並非一件容易的事,須要一個適應的過程。不過相信你經過本節的瞭解和實操,已經能夠體會到使用Reactor編程的一些特色:
request
方法來告知源頭它一次最多可以處理 n 個元素,從而將「推送」模式轉換爲「推送+拉取」混合的模式。後續隨着對Reactor的瞭解咱們還會逐漸瞭解它更多的好玩又好用的特性。
Reactor的開發者中也有來自RxJava的大牛,所以Reactor中甚至許多方法名都是來自RxJava的API的,學習了Reactor以後,很輕鬆就能夠上手Rx家族的庫了。