Reactive(3)5分鐘理解 SpringBoot 響應式的核心-Reactor

1、前言

關於 響應式 Reactive,前面的兩篇文章談了很多概念,基本都離不開下面兩點:前端

  • 響應式編程是面向流的、異步化的開發方式
  • 響應式是很是通用的概念,不管在前端領域、仍是實時流、離線處理場景中都是適用的。

有興趣的朋友能夠看看這兩篇文章:java

Reactive(1) 從響應式編程到「好萊塢」
Reactive(2) 響應式流與制奶廠業務react

此次,咱們把目光轉向 SpringBoot,在SpringBoot 2.0版本以後,提供了對響應式編程的全面支持。
所以在升級到 2.x版本以後,便能方便的實現事件驅動模型的後端編程,這其中離不開 webflux這個模塊。
其同時也被 Spring 5 用做開發響應式 web 應用的核心基礎。 那麼, webflux 是一個怎樣的東西?web

Webfluxspring

Webflux 模塊的名稱是 spring-webflux,名稱中的 Flux 來源於 Reactor 中的類 Flux。
該模塊中包含了對 響應式 HTTP、服務器推送 和 WebSocket 的支持。編程

Webflux 支持兩種不一樣的編程模型:後端

  • 第一種是 Spring MVC 中使用的基於 Java 註解的方式,一個使用Reactive風格的Controller以下所示:
@RestController
public class EchoController {
    @GetMapping("/echo")
    public Mono<String> sayHelloWorld() {
        return Mono.just("Echo!");
    }
}
  • 第二種是 基於 Java 8 的 lambda 表達式的函數式編程模型。

這兩種編程模型只是在代碼編寫方式上存在不一樣,但底層的基礎模塊仍然是同樣的。
除此以外,Webflux 能夠運行在支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其餘異步運行時環境,如 Netty 和 Undertow。數組

關於Webflux 與 SpringMVC 的區別,能夠參考下圖:服務器

SpringBoot、Webflux、Reactor 能夠說是層層包含的關係,其中,響應式能力的核心仍然是來自 Reactor組件。
因而可知,掌握Reactor的用法 必然是熟練進行 Spring 響應式編程的重點。

2、 Mono 與 Flux

在理解響應式Web編程以前,咱們須要對Reactor 兩個核心概念作一些澄清,一個是Mono,另外一個是Flux。

Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中能夠包含三種不一樣類型的消息通知:

  • 正常的包含元素的消息
  • 序列結束的消息
  • 序列出錯的消息

當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。

Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中一樣能夠包含與 Flux 相同的三種類型的消息通知。
Flux 和 Mono 之間能夠進行轉換,好比對一個 Flux 序列進行計數操做,獲得的結果是一個 Mono 對象,或者把兩個 Mono 序列合併在一塊兒,獲得的是一個 Flux 對象。

構造器

Reactor提供了很是方便的API來建立 Flux、Mono 對象,以下:

使用靜態工廠類建立Flux

Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
  • just():能夠指定序列中包含的所有元素。建立出來的 Flux 序列在發佈這些元素以後會自動結束。
  • fromArray():能夠從一個數組、Iterable 對象或 Stream 對象中建立 Flux 對象。
  • empty():建立一個不包含任何元素,只發布結束消息的序列。
  • range(int start, int count):建立包含從 start 起始的 count 個數量的 Integer 對象的序列。
  • interval(Duration period)和 interval(Duration delay, Duration period):建立一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間以外,還能夠指定起始元素髮布以前的延遲時間。

除了上述的方式以外,還可使用 generate()、create()方法來自定義流數據的產生過程:

generate()

Flux.generate(sink -> {
    sink.next("Echo");
    sink.complete();
}).subscribe(System.out::println);

generate 只提供序列中單個消息的產生邏輯(同步通知),其中的 sink.next()最多隻能調用一次,好比上面的代碼中,產生一個Echo消息後就結束了。

create()

Flux.create(sink -> {
    for (char i = 'a'; i <= 'z'; i++) {
        sink.next(i);
    }
    sink.complete();
}).subscribe(System.out::print);

create 提供的是整個序列的產生邏輯,sink.next()能夠調用屢次(異步通知),如上面的代碼將會產生a-z的小寫字母。

使用靜態工廠類建立Mono

Mono 的建立方式與 Flux 是很類似的。 除了Flux 所擁有的構造方式以外,還能夠支持與Callable、Runnable、Supplier 等接口集成。

參考下面的代碼:

Mono.fromSupplier(() -> "Mono1").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Mono2")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Mono3")).subscribe(System.out::println);

3、 流計算

1. 緩衝

Reactive(1) 從響應式編程到「好萊塢」 一文中曾經提到過緩衝(buffer)的概念。
buffer 是流處理中很是經常使用的一種處理,意思就是將流的一段截停後再作處理。

好比下面的代碼:

Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.interval(Duration.of(0, ChronoUnit.SECONDS),
              Duration.of(1, ChronoUnit.SECONDS))
        .buffer(Duration.of(5, ChronoUnit.SECONDS)).
        take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

第一個buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組數據(按20分組)
第二個buffer(Duration duration)是指湊足一段時間後的數據再近些處理,這裏是5秒鐘作一次處理
第三個bufferUtil(Predicate p)是指等到某個元素知足斷言(條件)時進行收集處理,這裏將會輸出[1,2],[3,4]..這樣的奇偶數字對
第四個bufferWhile(Predicate p)則僅僅是收集知足斷言(條件)的元素,這裏將會輸出2,4,6..這樣的偶數

與 buffer 相似的是window函數,後者的不一樣在於其在緩衝截停後並不會輸出一些元素列表,而是直接轉換爲Flux對象,以下:

Flux.range(1, 100).window(20)
      .subscribe(flux -> 
           flux.buffer(5).subscribe(System.out::println));

window(20)返回的結果是一個Flux 類型的對象,咱們進而對其進行了緩衝處理。
所以上面的代碼會按5個一組輸出:

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
[11, 12, 13, 14, 15]
...

2. 過濾/提取

上面的bufferWhile 其實充當了過濾的做用,固然,對於流元素的過濾也可使用filter函數來處理:

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

take 函數能夠用來提取想要的元素,這與filter 過濾動做是偏偏相反的,來看看take的用法:

Flux.range(1, 10).take(2).subscribe(System.out::println);
Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);

第一個take(2)指提取前面的兩個元素;
第二個takeLast(2)指提取最後的兩個元素;
第三個takeWhile(Predicate p)指提取知足條件的元素,這裏是1-4
第四個takeUtil(Predicate p)指一直提取直到知足條件的元素出現爲止,這裏是1-6

3. 轉換

使用map函數能夠將流中的元素進行個體轉換,以下:

Flux.range(1, 10).map(x -> x*x).subscribe(System.out::println);

這裏的map使用的JDK8 所定義的 Function接口

4. 合併

某些狀況下咱們須要對兩個流中的元素進行合併處理,這與合併兩個數組有點類似,但結合流的特色又會有不一樣的需求。

使用zipWith函數能夠實現簡單的流元素合併處理:

Flux.just("I", "You")
        .zipWith(Flux.just("Win", "Lose"))
        .subscribe(System.out::println);
Flux.just("I", "You")
        .zipWith(Flux.just("Win", "Lose"), 
        (s1, s2) -> String.format("%s!%s!", s1, s2))
        .subscribe(System.out::println);

上面的代碼輸出爲:

[I,Win]
[You,Lose]
I!Win!
You!Lose!

第一個zipWith輸出的是Tuple對象(不可變的元祖),第二個zipWith增長了一個BiFunction來實現合併計算,輸出的是字符串。

注意到zipWith是分別按照元素在流中的順序進行兩兩合併的,合併後的流長度則最短的流爲準,遵循最短對齊原則。

用於實現合併的還有 combineLastest函數,combinLastest 會動態的將流中新產生元素(末位)進行合併,注意是隻要產生新元素都會觸發合併動做併產生一個結果元素,以下面的代碼:

Flux.combineLatest(
        Arrays::toString,
        Flux.interval(Duration.of(0, ChronoUnit.MILLIS), 
            Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(Duration.of(50, ChronoUnit.MILLIS), 
            Duration.of(100, ChronoUnit.MILLIS)).take(2)
).toStream().forEach(System.out::println);

輸出爲:

[0, 0]
[1, 0]
[1, 1]

5. 合流

與合併比較相似的處理概念是合流,合流的不一樣之處就在於元素之間不會產生合併,最終流的元素個數(長度)是兩個源的個數之和。
合流的計算可使用 merge或mergeSequential 函數,這二者的區別在於:

merge後的元素是按產生時間排序的,而mergeSequential 則是按整個流被訂閱的時間來排序,以下面的代碼:

Flux.merge(Flux.interval(
            Duration.of(0, ChronoUnit.MILLIS),
            Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
                Duration.of(50, ChronoUnit.MILLIS),
                Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
        .forEach(System.out::println);
System.out.println("---");
Flux.mergeSequential(Flux.interval(
        Duration.of(0, ChronoUnit.MILLIS),
        Duration.of(100, ChronoUnit.MILLIS)).take(2),
        Flux.interval(
                Duration.of(50, ChronoUnit.MILLIS),
                Duration.of(100, ChronoUnit.MILLIS)).take(2))
        .toStream()
        .forEach(System.out::println);

輸出爲:

0
0
1
1
---
0
1
0
1

merge 是直接將Flux 元素進行合流以外,而flatMap則提供了更加高級的處理:
flatMap 函數會先將Flux中的元素轉換爲 Flux(流),而後再新產生的Flux進行合流處理,以下:

Flux.just(1, 2)
        .flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS),
                Duration.of(10, ChronoUnit.MILLIS)).take(x))
        .toStream()
        .forEach(System.out::println);

flatMap也存在flatMapSequential的一個兄弟版本,後者決定了合併流元素的順序是與流的訂閱順序一致的。

6. 累積

reduce 和 reduceWith 操做符對流中包含的全部元素進行累積操做,獲得一個包含計算結果的 Mono 序列。累積操做是經過一個 BiFunction 來表示的。reduceWith 容許在在操做時指定一個起始值(與第一個元素進行運算)

以下面的代碼:

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

這裏經過reduce計算出1-100的累加結果(1+2+3+...100),結果輸出爲:

5050
5150

4、異常處理

在前面所說起的這些功能基本都屬於正常的流處理,然而對於異常的捕獲以及採起一些修正手段也是一樣重要的。

利用Flux/Mono 框架能夠很方便的作到這點。

將正常消息和錯誤消息分別打印

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .subscribe(System.out::println, System.err::println);

當產生錯誤時默認返回0

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);

自定義異常時的處理

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalArgumentException()))
        .onErrorResume(e -> {
            if (e instanceof IllegalStateException) {
                return Mono.just(0);
            } else if (e instanceof IllegalArgumentException) {
                return Mono.just(-1);
            }
            return Mono.empty();
        })
        .subscribe(System.out::println);

當產生錯誤時重試

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);

這裏的retry(1)表示最多重試1次,並且重試將從訂閱的位置開始從新發送流事件

5、線程調度

咱們說過,響應式是異步化的,那麼就會涉及到多線程的調度。

Reactor 提供了很是方便的調度器(Scheduler)工具方法,能夠指定流的產生以及轉換(計算)發佈所採用的線程調度方式。
這些方式包括:

類別 描述
immediate 採用當前線程
single 單一可複用的線程
elastic 彈性可複用的線程池(IO型)
parallel 並行操做優化的線程池(CPU計算型)
timer 支持任務調度的線程池
fromExecutorService 自定義線程池

下面,以一個簡單的實例來演示不一樣的線程調度:

Flux.create(sink -> {
    sink.next(Thread.currentThread().getName());
    sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);

在這段代碼中,使用publishOn指定了流發佈的調度器,subscribeOn則指定的是流產生的調度器。
首先是parallel調度器進行流數據的生成,接着使用一個single單線程調度器進行發佈,此時通過第一個map轉換爲另外一個Flux流,其中的消息疊加了當前線程的名稱。最後進入的是一個elastic彈性調度器,再次進行一次一樣的map轉換。

最終,通過多層轉換後的輸出以下:

[elastic-2] [single-1] parallel-1

小結

SpringBoot 2.x、Spring 5 對於響應式的Web編程(基於Reactor)都提供了全面的支持,藉助於框架的能力能夠快速的完成一些簡單的響應式代碼開發。
本文提供了較多 Reactor API的代碼樣例,旨在幫助讀者能快速的理解 響應式編程的概念及方式。

對於習慣了傳統編程範式的開發人員來講,熟練使用 Reactor 仍然須要一些思惟上的轉變。
就筆者的自身感受來看,Reactor 存在一些學習和適應的成本,但一旦熟悉使用以後便能體會它的先進之處。 就如 JDK8 引入的Stream API以後,許多開發者則漸漸拋棄forEach的方式..

自己這就是一種生產效率的提高機會,何樂而不爲? 更況且,從應用框架的發展前景來看,響應式的前景是明朗的。

參考閱讀

使用 Reactor 進行反應式編程
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html

Spring 5 的 WebFlux 開發介紹 https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html

相關文章
相關標籤/搜索