關於 響應式 Reactive,前面的兩篇文章談了很多概念,基本都離不開下面兩點:前端
有興趣的朋友能夠看看這兩篇文章:java
Reactive(1) 從響應式編程到「好萊塢」
Reactive(2) 響應式流與制奶廠業務react
此次,咱們把目光轉向 SpringBoot,在SpringBoot 2.0版本以後,提供了對響應式編程的全面支持。
所以在升級到 2.x版本以後,便能方便的實現事件驅動模型的後端編程,這其中離不開 webflux這個模塊。
其同時也被 Spring 5 用做開發響應式 web 應用的核心基礎。 那麼, webflux 是一個怎樣的東西?web
Webfluxspring
Webflux 模塊的名稱是 spring-webflux,名稱中的 Flux 來源於 Reactor 中的類 Flux。
該模塊中包含了對 響應式 HTTP、服務器推送 和 WebSocket 的支持。編程
Webflux 支持兩種不一樣的編程模型:後端
@RestController public class EchoController { @GetMapping("/echo") public Mono<String> sayHelloWorld() { return Mono.just("Echo!"); } }
這兩種編程模型只是在代碼編寫方式上存在不一樣,但底層的基礎模塊仍然是同樣的。
除此以外,Webflux 能夠運行在支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其餘異步運行時環境,如 Netty 和 Undertow。數組
關於Webflux 與 SpringMVC 的區別,能夠參考下圖:服務器
SpringBoot、Webflux、Reactor 能夠說是層層包含的關係,其中,響應式能力的核心仍然是來自 Reactor組件。
因而可知,掌握Reactor的用法 必然是熟練進行 Spring 響應式編程的重點。
在理解響應式Web編程以前,咱們須要對Reactor 兩個核心概念作一些澄清,一個是Mono,另外一個是Flux。
Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中能夠包含三種不一樣類型的消息通知:
當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。
Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中一樣能夠包含與 Flux 相同的三種類型的消息通知。
Flux 和 Mono 之間能夠進行轉換,好比對一個 Flux 序列進行計數操做,獲得的結果是一個 Mono
Reactor提供了很是方便的API來建立 Flux、Mono 對象,以下:
使用靜態工廠類建立Flux
Flux.just("Hello", "World").subscribe(System.out::println); Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println); Flux.empty().subscribe(System.out::println); Flux.range(1, 10).subscribe(System.out::println); Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
除了上述的方式以外,還可使用 generate()、create()方法來自定義流數據的產生過程:
generate()
Flux.generate(sink -> { sink.next("Echo"); sink.complete(); }).subscribe(System.out::println);
generate 只提供序列中單個消息的產生邏輯(同步通知),其中的 sink.next()最多隻能調用一次,好比上面的代碼中,產生一個Echo消息後就結束了。
create()
Flux.create(sink -> { for (char i = 'a'; i <= 'z'; i++) { sink.next(i); } sink.complete(); }).subscribe(System.out::print);
create 提供的是整個序列的產生邏輯,sink.next()能夠調用屢次(異步通知),如上面的代碼將會產生a-z的小寫字母。
使用靜態工廠類建立Mono
Mono 的建立方式與 Flux 是很類似的。 除了Flux 所擁有的構造方式以外,還能夠支持與Callable、Runnable、Supplier 等接口集成。
參考下面的代碼:
Mono.fromSupplier(() -> "Mono1").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("Mono2")).subscribe(System.out::println); Mono.create(sink -> sink.success("Mono3")).subscribe(System.out::println);
在Reactive(1) 從響應式編程到「好萊塢」 一文中曾經提到過緩衝(buffer)的概念。
buffer 是流處理中很是經常使用的一種處理,意思就是將流的一段截停後再作處理。
好比下面的代碼:
Flux.range(1, 100).buffer(20).subscribe(System.out::println); Flux.interval(Duration.of(0, ChronoUnit.SECONDS), Duration.of(1, ChronoUnit.SECONDS)) .buffer(Duration.of(5, ChronoUnit.SECONDS)). take(2).toStream().forEach(System.out::println); Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println); Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
第一個buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組數據(按20分組)
第二個buffer(Duration duration)是指湊足一段時間後的數據再近些處理,這裏是5秒鐘作一次處理
第三個bufferUtil(Predicate p)是指等到某個元素知足斷言(條件)時進行收集處理,這裏將會輸出[1,2],[3,4]..這樣的奇偶數字對
第四個bufferWhile(Predicate p)則僅僅是收集知足斷言(條件)的元素,這裏將會輸出2,4,6..這樣的偶數
與 buffer 相似的是window函數,後者的不一樣在於其在緩衝截停後並不會輸出一些元素列表,而是直接轉換爲Flux對象,以下:
Flux.range(1, 100).window(20) .subscribe(flux -> flux.buffer(5).subscribe(System.out::println));
window(20)返回的結果是一個Flux
所以上面的代碼會按5個一組輸出:
[1, 2, 3, 4, 5] [6, 7, 8, 9, 10] [11, 12, 13, 14, 15] ...
上面的bufferWhile 其實充當了過濾的做用,固然,對於流元素的過濾也可使用filter函數來處理:
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
take 函數能夠用來提取想要的元素,這與filter 過濾動做是偏偏相反的,來看看take的用法:
Flux.range(1, 10).take(2).subscribe(System.out::println); Flux.range(1, 10).takeLast(2).subscribe(System.out::println); Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println); Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);
第一個take(2)指提取前面的兩個元素;
第二個takeLast(2)指提取最後的兩個元素;
第三個takeWhile(Predicate p)指提取知足條件的元素,這裏是1-4
第四個takeUtil(Predicate p)指一直提取直到知足條件的元素出現爲止,這裏是1-6
使用map函數能夠將流中的元素進行個體轉換,以下:
Flux.range(1, 10).map(x -> x*x).subscribe(System.out::println);
這裏的map使用的JDK8 所定義的 Function接口
某些狀況下咱們須要對兩個流中的元素進行合併處理,這與合併兩個數組有點類似,但結合流的特色又會有不一樣的需求。
使用zipWith函數能夠實現簡單的流元素合併處理:
Flux.just("I", "You") .zipWith(Flux.just("Win", "Lose")) .subscribe(System.out::println); Flux.just("I", "You") .zipWith(Flux.just("Win", "Lose"), (s1, s2) -> String.format("%s!%s!", s1, s2)) .subscribe(System.out::println);
上面的代碼輸出爲:
[I,Win] [You,Lose] I!Win! You!Lose!
第一個zipWith輸出的是Tuple對象(不可變的元祖),第二個zipWith增長了一個BiFunction來實現合併計算,輸出的是字符串。
注意到zipWith是分別按照元素在流中的順序進行兩兩合併的,合併後的流長度則最短的流爲準,遵循最短對齊原則。
用於實現合併的還有 combineLastest函數,combinLastest 會動態的將流中新產生元素(末位)進行合併,注意是隻要產生新元素都會觸發合併動做併產生一個結果元素,以下面的代碼:
Flux.combineLatest( Arrays::toString, Flux.interval(Duration.of(0, ChronoUnit.MILLIS), Duration.of(100, ChronoUnit.MILLIS)).take(2), Flux.interval(Duration.of(50, ChronoUnit.MILLIS), Duration.of(100, ChronoUnit.MILLIS)).take(2) ).toStream().forEach(System.out::println);
輸出爲:
[0, 0] [1, 0] [1, 1]
與合併比較相似的處理概念是合流,合流的不一樣之處就在於元素之間不會產生合併,最終流的元素個數(長度)是兩個源的個數之和。
合流的計算可使用 merge或mergeSequential 函數,這二者的區別在於:
merge後的元素是按產生時間排序的,而mergeSequential 則是按整個流被訂閱的時間來排序,以下面的代碼:
Flux.merge(Flux.interval( Duration.of(0, ChronoUnit.MILLIS), Duration.of(100, ChronoUnit.MILLIS)).take(2), Flux.interval( Duration.of(50, ChronoUnit.MILLIS), Duration.of(100, ChronoUnit.MILLIS)).take(2)) .toStream() .forEach(System.out::println); System.out.println("---"); Flux.mergeSequential(Flux.interval( Duration.of(0, ChronoUnit.MILLIS), Duration.of(100, ChronoUnit.MILLIS)).take(2), Flux.interval( Duration.of(50, ChronoUnit.MILLIS), Duration.of(100, ChronoUnit.MILLIS)).take(2)) .toStream() .forEach(System.out::println);
輸出爲:
0 0 1 1 --- 0 1 0 1
merge 是直接將Flux 元素進行合流以外,而flatMap則提供了更加高級的處理:
flatMap 函數會先將Flux中的元素轉換爲 Flux(流),而後再新產生的Flux進行合流處理,以下:
Flux.just(1, 2) .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS), Duration.of(10, ChronoUnit.MILLIS)).take(x)) .toStream() .forEach(System.out::println);
flatMap也存在flatMapSequential的一個兄弟版本,後者決定了合併流元素的順序是與流的訂閱順序一致的。
reduce 和 reduceWith 操做符對流中包含的全部元素進行累積操做,獲得一個包含計算結果的 Mono 序列。累積操做是經過一個 BiFunction 來表示的。reduceWith 容許在在操做時指定一個起始值(與第一個元素進行運算)
以下面的代碼:
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println); Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
這裏經過reduce計算出1-100的累加結果(1+2+3+...100),結果輸出爲:
5050 5150
在前面所說起的這些功能基本都屬於正常的流處理,然而對於異常的捕獲以及採起一些修正手段也是一樣重要的。
利用Flux/Mono 框架能夠很方便的作到這點。
將正常消息和錯誤消息分別打印
Flux.just(1, 2) .concatWith(Mono.error(new IllegalStateException())) .subscribe(System.out::println, System.err::println);
當產生錯誤時默認返回0
Flux.just(1, 2) .concatWith(Mono.error(new IllegalStateException())) .onErrorReturn(0) .subscribe(System.out::println);
自定義異常時的處理
Flux.just(1, 2) .concatWith(Mono.error(new IllegalArgumentException())) .onErrorResume(e -> { if (e instanceof IllegalStateException) { return Mono.just(0); } else if (e instanceof IllegalArgumentException) { return Mono.just(-1); } return Mono.empty(); }) .subscribe(System.out::println);
當產生錯誤時重試
Flux.just(1, 2) .concatWith(Mono.error(new IllegalStateException())) .retry(1) .subscribe(System.out::println);
這裏的retry(1)表示最多重試1次,並且重試將從訂閱的位置開始從新發送流事件
咱們說過,響應式是異步化的,那麼就會涉及到多線程的調度。
Reactor 提供了很是方便的調度器(Scheduler)工具方法,能夠指定流的產生以及轉換(計算)發佈所採用的線程調度方式。
這些方式包括:
類別 | 描述 |
---|---|
immediate | 採用當前線程 |
single | 單一可複用的線程 |
elastic | 彈性可複用的線程池(IO型) |
parallel | 並行操做優化的線程池(CPU計算型) |
timer | 支持任務調度的線程池 |
fromExecutorService | 自定義線程池 |
下面,以一個簡單的實例來演示不一樣的線程調度:
Flux.create(sink -> { sink.next(Thread.currentThread().getName()); sink.complete(); }) .publishOn(Schedulers.single()) .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x)) .publishOn(Schedulers.elastic()) .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x)) .subscribeOn(Schedulers.parallel()) .toStream() .forEach(System.out::println);
在這段代碼中,使用publishOn指定了流發佈的調度器,subscribeOn則指定的是流產生的調度器。
首先是parallel調度器進行流數據的生成,接着使用一個single單線程調度器進行發佈,此時通過第一個map轉換爲另外一個Flux流,其中的消息疊加了當前線程的名稱。最後進入的是一個elastic彈性調度器,再次進行一次一樣的map轉換。
最終,通過多層轉換後的輸出以下:
[elastic-2] [single-1] parallel-1
SpringBoot 2.x、Spring 5 對於響應式的Web編程(基於Reactor)都提供了全面的支持,藉助於框架的能力能夠快速的完成一些簡單的響應式代碼開發。
本文提供了較多 Reactor API的代碼樣例,旨在幫助讀者能快速的理解 響應式編程的概念及方式。
對於習慣了傳統編程範式的開發人員來講,熟練使用 Reactor 仍然須要一些思惟上的轉變。
就筆者的自身感受來看,Reactor 存在一些學習和適應的成本,但一旦熟悉使用以後便能體會它的先進之處。 就如 JDK8 引入的Stream API以後,許多開發者則漸漸拋棄forEach的方式..
自己這就是一種生產效率的提高機會,何樂而不爲? 更況且,從應用框架的發展前景來看,響應式的前景是明朗的。
使用 Reactor 進行反應式編程
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html
Spring 5 的 WebFlux 開發介紹 https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html