響應式編程 Reactor 學習小記

從響應式編程提及

響應式編程是一種關注於數據流(data streams)和變化傳遞(propagation of change)的異步編程方式。 這意味着它能夠用既有的編程語言表達靜態(如數組)或動態(如事件源)的數據流。html

在響應式編程方面,微軟跨出了第一步,它在 .NET 生態中建立了響應式擴展庫(Reactive Extensions library, Rx)。接着 RxJava 在 JVM 上實現了響應式編程。後來,在 JVM 平臺出現了一套標準的響應式 編程規範,它定義了一系列標準接口和交互規範。並整合到 Java 9 中(Flow 類)。java

響應式編程一般做爲面向對象編程中的「觀察者模式」(Observer design pattern)的一種擴展。 響應式流(reactive streams)與「迭代子模式」(Iterator design pattern)也有相通之處, 由於其中也有 Iterable-Iterator 這樣的對應關係。主要的區別在於,Iterator 是基於 「拉取」(pull)方式的,而響應式流是基於「推送」(push)方式的。react

  • iterator 是一種「命令式」(imperative)編程範式,即便訪問元素的方法是 Iterable 的惟一職責。關鍵在於,何時執行 next() 獲取元素取決於開發者。
  • 響應式流中,相對應的角色是 Publisher-Subscriber,可是當有新的值到來的時候 ,卻反過來由發佈者(Publisher) 通知訂閱者(Subscriber),這種「推送」模式是響應式的關鍵

此外,對推送來的數據的操做是經過一種聲明式(declaratively)而不是命令式(imperatively)的方式表達的:開發者經過描述「控制流程」來定義對數據流的處理邏輯。git

除了數據推送,對錯誤處理(error handling)和完成(completion)信號的定義也很完善。一個 Publisher 能夠推送新的值到它的 Subscriber(調用 onNext 方法), 一樣也能夠推送錯誤(調用 onError 方法)和完成(調用 onComplete 方法)信號。 錯誤和完成信號均可以終止響應式流。能夠用下邊的表達式描述:github

onNext x 0..N [onError | onComplete]
複製代碼

這種方式很是靈活,不管是有/沒有值,仍是 n 個值(包括有無限個值的流,好比時鐘的持續讀秒),均可處理。編程

以上來自 projectreactor.io/docs/core/r… 翻譯數組

Reactive Streams

Reactive Streams 是上面提到的一套標準的響應式編程規範。它由四個核心概念構成:bash

  • 消息發佈者:只有一個 subscribe 接口,是訂閱者調用的,用來訂閱發佈者的消息。發佈者在訂閱者調用 request 以後把消息 push 給訂閱者。
    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    複製代碼
  • 訂閱者:訂閱者包括四個接口,這些接口都由 Publisher 觸發調用的。onSubscribe 告訴訂閱者訂閱成功,並返回了一個 Subscription ;經過 Subscription 訂閱者能夠告訴發佈者發送指定數量的消息(request 完成) ;onNext 是發佈者有消息時,調用訂閱者這個接口來達到發佈消息的目的;onError 通知訂閱者,發佈者出現了錯誤;onComplete 通知訂閱者消息發送完畢。
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    複製代碼
  • 訂閱:包括兩個接口,請求 n 個消息和取消這次訂閱。
    public interface Subscription {
        // request(n)用來發起請求數據,其中n表示請求數據的數量,它必須大於0,
        // 不然會拋出IllegalArgumentException,並觸發onError,request的調用會
        // 累加,若是沒有終止,最後會觸發相應次數的onNext方法.
        public void request(long n);
        // cancel至關於取消訂閱,調用以後,後續不會再收到訂閱,onError 和 
        // onComplete也不會被觸發
        public void cancel();
    }
    複製代碼
  • 處理器:Processor 同時繼承了 Subscriber 和 Publisher;其表明一個處理階段。
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    複製代碼

Reactive Streams 經過上面的四個核心概念和相關的函數,對響應式流進行了一個框架性的約定,它沒有具體實現。簡單來講,它只提供通用的、合適的解決方案,你們都按照這個規約來實現就行了。app

Java 的 Reactive Programming 類庫主要有三個,分別是 Akka-Streams ,RxJava 和 Project Reactor。Spring 5 開始支持 Reactive Programming,其底層使用的是 Project Reactor。本篇主要是對 Project Reactor 中的一些點進行學習總結。框架

Project Reactor

Project Reactor 是一個基於 Java 8 的實現了響應式流規範 (Reactive Streams specification)的響應式庫。

Reactor 引入了實現 Publisher 的響應式類 Flux 和 Mono,以及豐富的操做方式。 一個 Flux 對象表明一個包含 0..N 個元素的響應式序列,而一個 Mono 對象表明一個包含零或者一個(0..1)元素的結果。

Flux 和 Mono

Flux 是生產者,即咱們上面提到的 Publisher,它表明的是一個包含 0-N 個元素的異步序列,Mono能夠看作 Flux 的有一個特例,表明 0-1 個元素,若是不須要生產任何元素,只是須要一個完成任務的信號,可使用 Mono。

Flux-包含 0-N 個元素的異步序列

Flux

先來看這張圖,這裏是直接從官方文檔上貼過來的。就這張圖作下說明,先來關注幾個點:

  • 從左到右的時間序列軸
  • 1-6 爲 Flux enitted(發射)的元素
  • 上面 6 後面的豎線標識已經成功完成了
  • 下面的 1-3 表示轉換的結果
  • ❌ 表示出現了error,對應的是執行了onError
  • operator : 操做符,聲明式的可組裝的響應式方法,其組裝成的鏈稱爲「操做鏈」

那總體來看就是 Flux 產生元數據,經過一系列 operator 操做獲得轉換結果,正常成功就是 onCompleted,出現錯誤就是 onError。看下面的一個小例子:

Flux.just("glmapper","leishu").subscribe(new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription subscription) {
        // subscription 表示訂閱關係
        System.out.println("onSubscribe,"+ subscription.getClass());
        // subscription 經過 request 來觸發 onNext
        subscription.request(2);
    }
    @Override
    public void onNext(String s) {
        System.out.println("currrent value is = " + s);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println("it's error.");
    }
    @Override
    public void onComplete() {
        System.out.println("it's completed.");
    }
});
複製代碼

執行結果:

onSubscribe,class reactor.core.publisher.StrictSubscriber
currrent value is = glmapper
currrent value is = leishu
it's completed. 複製代碼

若是在 onSubscribe 方法中咱們不執行 request,則不會有後續任何操做。關於 request 下面看。

Flux 是一個可以發出 0 到 N 個元素的標準的 Publisher,它會被一個 "error" 或 "completion" 信號終止。所以,一個 Flux 的結果多是一個 value、completion 或 error。 就像在響應式流規範中規定的那樣,這三種類型的信號被翻譯爲面向下游的 onNextonCompleteonError方法。

Mono-異步的 0-1 結果

Mono

這張圖也來自官方文檔,和上面 Flux 的區別就是,Mono 最多隻能 emitted 一個元素。

Mono.just("glmapper").subscribe(System.out::println);
複製代碼

小結

經過上面兩段小的代碼來看,最直觀的感覺是,Flux 至關於一個 List,Mono 至關於 Optional。其實在編程中全部的結果咱們均可以用 List 來 表示,可是當只返回一個或者沒有結果時,用 Optional 可能會更精確些。

Optional 相關概念可自行搜索 jdk Optional

另外,Mono 和 Flux 都提供了一些工廠方法,用於建立相關的實例,這裏簡單羅列一下:

// 能夠指定序列中包含的所有元素。建立出來的 Flux 
// 序列在發佈這些元素以後會自動結束。
Flux.just("glmapper", "leishu");
// 從一個Iterable 對象中建立 Flux 對象,固然還能夠是數組、Stream對象等
Flux.fromIterable(Arrays.asList("glmapper","leishu"));
// 建立一個只包含錯誤消息的序列。
Flux.error(new IllegalStateException());
// 建立一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間
// 隔來發布。除了間隔時間以外,還能夠指定起始元素髮布以前的延遲時間。
Flux.interval(Duration.ofMillis(100)).take(10);
// 建立一個不包含任何消息通知的序列。
Flux.never();
// 建立一個不包含任何元素,只發布結束消息的序列。
Flux.empty(); 
// 建立包含從 start 起始的 count 個數量的 Integer 對象的序列
Flux.range(int start, int count);
// Mono 同上
Mono.empty();
Mono.never();
Mono.just("glmapper");
Mono.error(new IllegalStateException());
複製代碼

上面的這些靜態方法適合於簡單的序列生成,當序列的生成須要複雜的邏輯時,則應該使用 generate() 或 create() 方法。

一些概念

  • Operator:Operator 是一系列函數式的便捷操做,能夠鏈式調用。全部函數調用基本都 是 Reactor 的 Operator ,好比 just,map,flatMap,filter 等。
  • Processor:上面從 Processor 的接口定義能夠看出,它既是一個 Subscriber,又是一個 Publisher;Processor 夾在第一個 Publisher 和最後一個 Subscriber 中間,對數據進行處理。有點相似 stream 裏的 map,filter 等方法。具體在數據流轉中, Processor 以 Subscriber 的身份訂閱 Publisher 接受數據,又以 Publisher 的方式接受其它 Subscriber 的訂閱,它從本身訂閱的 Publisher 收到數據後,作一些處理,而後轉發給訂閱它的 Subscriber。
  • back pressure:背壓。對 MQ 有了解的應該清楚,消息積壓通常是在消費端,也就是說生產端只負責生產,並不會關心消費端的消費能力,這樣就到致使 pressure 積壓在消費端,這個是正向的。從上面對 Reactor 中的一些瞭解,Subscriber 是主動向 Publisher 請求的,這樣當消費端消費的速度沒有生產者快時,這些消息仍是積壓在生產端;這種好處就是生產者能夠根據實際狀況適當的調整生產消息的速度。
  • Hot VS Cold :參考 Hot VS Cold

核心調用過程

Reactor 的核心調用過程大體能夠分爲圖中的幾個階段

  • 聲明:不管是使用 just 或者其餘什麼方式建立反應式流,這個過程均可以稱之爲聲明,由於此時這些代碼不會被實際的執行。
  • subscribe:當調用 subscribe 時,整個執行過程便進入 subscribe 階段,通過一系列的調用以後,subscribe 動做會代理給具體的 Flux 來實現。
  • onSubscribe:onSubscribe 階段指的是 Subscriber#onSubscribe 方法被依次調用的階段。這個階段會讓各 Subscriber 知道 subscribe 方法已被觸發,真正的處理流程立刻就要開始。
  • request:onSubscribe 階段是表示訂閱動做的方式,讓各 Subscriber 知悉,準備開始處理數據。當最終的 Subscriber 作好處理數據的準備以後,它便會調用 Subscription 的 request 方法請求數據。
  • onNext:經過調用 Subscriber 的 onNext 方法,進行真正的響應式的數據處理。
  • onComplete:成功的終端狀態,沒有進一步的事件將被髮送。
  • onError:錯誤的終端狀態(和 onComplete 同樣,當發生時,後面的將不會在繼續執行)。

消息處理

當須要處理 Flux 或 Mono 中的消息時,能夠經過 subscribe 方法來添加相應的訂閱邏輯。在調用 subscribe 方法時能夠指定須要處理的消息類型。能夠只處理其中包含的正常消息,也能夠同時處理錯誤消息和完成消息。

經過 subscribe() 方法處理正常和錯誤消息

Flux.just(1, 2)
    .concatWith(Mono.error(new IllegalStateException()))
    .subscribe(System.out::println, System.err::println);
複製代碼

結果:

1
2
java.lang.IllegalStateException
複製代碼

正常的消息處理相對簡單。當出現錯誤時,有多種不一樣的處理策略:

  • 經過 onErrorReturn() 方法返回一個默認值

    Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);
    複製代碼

    結果:

    1
    2
    0
    複製代碼
  • 經過 onErrorResume()方法來根據不一樣的異常類型來選擇要使用的產生元素的流

    Flux.just(1, 2)
            .concatWith(Mono.error(new IllegalArgumentException()))
            .onErrorResume(e -> {
                if (e instanceof IllegalStateException) {
                    return Mono.just(0);
                } else if (e instanceof IllegalArgumentException) {
                    return Mono.just(-1);
                }
                return Mono.empty();
                }).subscribe(System.out::println);
    複製代碼

    結果:

    1
    2
    -1
    複製代碼
  • 經過 retry 操做符來進行重試,重試的動做是經過從新訂閱序列來實現的。在使用 retry 操做符時能夠指定重試的次數。

    Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);
    複製代碼

    結果:

    1
    2
    1
    2
    Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
    Caused by: java.lang.IllegalStateException
    	at com.glmapper.bridge.boot.reactor.SimpleTest.testFluxSub(SimpleTest.java:75)
    	at com.glmapper.bridge.boot.reactor.SimpleTest.main(SimpleTest.java:23)
    複製代碼

調度器 Scheduler

在 Reactor 中,執行模式以及執行過程取決於所使用的 Scheduler,Scheduler 是一個擁有普遍實現類的抽象接口,Schedulers 類提供的靜態方法用於達成以下的執行環境:

  • 當前線程(Schedulers.immediate())
    Schedulers.immediate().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
     });
     
     // main-11
    複製代碼
  • 可重用的單線程(Schedulers.single())。注意,這個方法對全部調用者都提供同一個線程來使用, 直到該調度器(Scheduler)被廢棄。若是你想使用專注的線程,就對每個調用使用 Schedulers.newSingle()。
    Schedulers.single().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // single-1-11
    複製代碼
  • 彈性線程池(Schedulers.elastic()。它根據須要建立一個線程池,重用空閒線程。線程池若是空閒時間過長 (默認爲 60s)就會被廢棄。對於 I/O 阻塞的場景比較適用。 Schedulers.elastic() 可以方便地給一個阻塞 的任務分配它本身的線程,從而不會妨礙其餘任務和資源。
    Schedulers.elastic().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // elastic-2-11
    複製代碼
  • 固定大小線程池(Schedulers.parallel())。所建立線程池的大小與 CPU 個數等同
    Schedulers.parallel().schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // parallel-1-11
    複製代碼
  • 基於現有的 ExecutorService 建立 Scheduler
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    Schedulers.fromExecutorService(executorService).schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
            
    // pool-4-thread-1-11
    複製代碼
  • 基於 newXXX 方法來建立調度器
    Schedulers.newElastic("test-elastic").schedule(()->{
        System.out.println(Thread.currentThread().getName()+"-"+11);
    });
    
    // test-elastic-4-11
    複製代碼

一些操做符默認會使用一個指定的調度器(一般也容許開發者調整爲其餘調度器)例如, 經過工廠方法 Flux.interval(Duration.ofMillis(100)) 生成的每 100ms 打點一次的 Flux, 默認狀況下使用的是 Schedulers.parallel(),下邊的代碼演示瞭如何將其裝換爲 Schedulers.single()

Flux<String> intervalResult = Flux.interval(Duration.ofMillis(100),
        Schedulers.newSingle("test"))
        .map(i -> Thread.currentThread().getName() +"@"+i);
        intervalResult.subscribe(System.out::println);
複製代碼

結果:

test-1@0
test-1@1
test-1@2
test-1@3
test-1@4
// 省略
複製代碼

publishOn 和 subscribeOn

Reactor 提供了兩種在響應式鏈中調整調度器 Scheduler 的方法:publishOn 和 subscribeOn。 它們都接受一個 Scheduler 做爲參數,從而能夠改變調度器。可是 publishOn 在鏈中出現的位置是有講究的,而 subscribeOn 則無所謂。

  • publishOn 的用法和處於訂閱鏈(subscriber chain)中的其餘操做符同樣。它將上游 信號傳給下游,同時執行指定的調度器 Scheduler 的某個工做線程上的回調。 它會 改變後續的操做符的執行所在線程 (直到下一個 publishOn 出如今這個鏈上)
  • subscribeOn 用於訂閱(subscription)過程,做用於那個向上的訂閱鏈(發佈者在被訂閱 時才激活,訂閱的傳遞方向是向上遊的)。因此,不管你把 subscribeOn 至於操做鏈的什麼位置, 它都會影響到源頭的線程執行環境(context)。 可是,它不會影響到後續的 publishOn,後者仍可以切換其後操做符的線程執行環境。
Flux.create(sink -> {
        sink.next(Thread.currentThread().getName());
        sink.complete();
    })
    .publishOn(Schedulers.single())
    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
    .publishOn(Schedulers.elastic())
    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
    .subscribeOn(Schedulers.parallel())
    .toStream()
    .forEach(System.out::println);
複製代碼

結果:

[elastic-2] [single-1] parallel-1
複製代碼

上面這段代碼使用 create() 方法建立一個新的 Flux 對象,其中包含惟一的元素是當前線程的名稱。

接着是兩對 publishOn() 和 map()方法,其做用是先切換執行時的調度器,再把當前的線程名稱做爲前綴添加。

最後經過 subscribeOn()方法來改變流產生時的執行方式。

最內層的線程名字 parallel-1 來自產生流中元素時使用的 Schedulers.parallel()調度器,中間的線程名稱 single-1 來自第一個 map 操做以前的 Schedulers.single() 調度器,最外層的線程名字 elastic-2 來自第二個 map 操做以前的 Schedulers.elastic()調度器。

先到這裏,剩下的想到再補充...

參考

相關文章
相關標籤/搜索