外行人都能看懂的WebFlux,錯過了血虧!

前言

只有光頭才能變強。html

文本已收錄至個人GitHub倉庫,歡迎Star:https://github.com/ZhongFuCheng3y/3y前端

本文知識點架構:java

知識點架構

知識點架構react

若是有關注我公衆號文章的同窗就會發現,最近我不定時轉發了一些比較好的WebFlux的文章,由於我最近在學。git

我以前也說過,學習一項技術以前,先要了解爲何要學這項技術。其實此次學習WebFlux也沒有多大的原生動力,主要是在咱們組內會輪流作一次技術分享,而我又不知道分享什麼比較好…程序員

以前在初學大數據相關的知識,可是這一塊的時間線會拉得比較長,感受趕不及小組內分享(而組內的同窗又大部分都懂大數據,就只有我一個菜雞,淚目)。因此,想的是:「要不我學點新東西搞搞?」。因而就花了點時間學WebFlux啦~github

這篇文章主要講解什麼是WebFlux,帶領你們入個門,但願對你們有所幫助(至少看完這篇文章,知道WebFlux是幹嗎用的)web

1、什麼是WebFlux?

咱們從Spring的官網拉下一點點就能夠看到介紹WebFlux的地方了spring

WebFlux的簡介

WebFlux的簡介編程

從官網的簡介中咱們能得出什麼樣的信息?

  • 咱們程序員每每根據不一樣的應用場景選擇不一樣的技術,有的場景適合用於同步阻塞的,有的場景適合用於異步非阻塞的。而Spring5提供了一整套響應式(非阻塞)的技術棧供咱們使用(包括Web控制器、權限控制、數據訪問層等等)。
  • 而左側的圖則是技術棧的對比啦;
  • 響應式通常用Netty或者Servlet 3.1的容器(由於支持異步非阻塞),而Servlet技術棧用的是Servlet容器
  • 在Web端,響應式用的是WebFlux,Servlet用的是SpringMVC
  • …..

總結起來,WebFlux只是響應式編程中的一部分(在Web控制端),因此通常咱們用它與SpringMVC來對比。

2、如何理解響應式編程?

在上面提到了響應式編程(Reactive Programming),而WebFlux只是響應式編程的其中一個技術棧而已,因此咱們先來探討一下什麼是響應式編程

從維基百科裏邊咱們獲得的定義:

reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change

響應式編程(reactive programming)是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程範式

在維基百科上也舉了個小例子:

例子

例子

意思大概以下:

  • 在命令式編程(咱們的平常編程模式)下,式子a=b+c,這就意味着a的值是由bc計算出來的。若是b或者c後續有變化,不會影響a的值
  • 在響應式編程下,式子a:=b+c,這就意味着a的值是由bc計算出來的。但若是b或者c的值後續有變化,會影響a的值

我認爲上面的例子已經能夠幫助咱們理解變化傳遞(propagation of change)

那數據流(data stream)和聲明式(declarative)怎麼理解呢?那能夠提一提咱們的Stream流了。以前寫過Lambda表達式和Stream流的文章,你們能夠先去看看:

Lambda的語法是這樣的(Stream流的使用會涉及到不少Lambda表達式的東西,因此通常先學Lambda再學Stream流):

語法

語法

Stream流的使用分爲三個步驟(建立Stream流、執行中間操做、執行最終操做):

三步走

三步走

執行中間操做實際上就是給咱們提供了不少的API去操做Stream流中的數據(求和/去重/過濾)等等

中間操做 解釋

中間操做 解釋

說了這麼多,怎麼理解數據流和聲明式呢?實際上是這樣的:

  • 原本數據是咱們自行處理的,後來咱們把要處理的數據抽象出來(變成了數據流),而後經過API去處理數據流中的數據(是聲明式的)

好比下面的代碼;將數組中的數據變成數據流,經過顯式聲明調用.sum()來處理數據流中的數據,獲得最終的結果:

public static void main(String[] args) {
    int[] nums = { 1, 2, 3 };
    int sum2 = IntStream.of(nums).parallel().sum();
    System.out.println("結果爲:" + sum2);
}

如圖下所示:

數據流與聲明式

數據流與聲明式

2.1 響應式編程->異步非阻塞

上面講了響應式編程是什麼:

響應式編程(reactive programming)是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程範式

也講解了數據流/變化傳遞/聲明式是什麼意思,但說到響應式編程就離不開異步非阻塞

從Spring官網介紹WebFlux的信息咱們就能夠發現asynchronous, nonblocking 這樣的字樣,由於響應式編程它是異步的,也能夠理解成變化傳遞它是異步執行的。

以下圖,合計的金額會受其餘的金額影響(更新的過程是異步的):

合計的錢會由於其餘的金額影響

合計的錢會由於其餘的金額影響

咱們的JDK8 Stream流是同步的,它就不適合用於響應式編程(但基礎的用法是須要懂的,由於響應式流編程都是操做嘛)

而在JDK9 已經支持響應式流了,下面咱們來看一下

3、JDK9 Reactive

響應式流的規範早已經被提出了:裏面提到了:

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure ----->http://www.reactive-streams.org/

翻譯再加點信息:

響應式流(Reactive Streams)經過定義一組實體,接口和互操做方法,給出了實現異步非阻塞背壓的標準。第三方遵循這個標準來實現具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。

規範裏頭實際上就是定義了四個接口:

規範的四個接口

規範的四個接口

Java 平臺直到 JDK 9才提供了對於Reactive的完整支持,JDK9也定義了上述提到的四個接口,在java.util.concurrent包上

Java的響應式流接口

Java的響應式流接口

一個通用的流處理架構通常會是這樣的(生產者產生數據,對數據進行中間處理,消費者拿到數據消費):

流式處理架構

流式處理架構

  • 數據來源,通常稱爲生產者(Producer)
  • 數據的目的地,通常稱爲消費者(Consumer)
  • 在處理時,對數據執行某些操做一個或多個處理階段。(Processor)

到這裏咱們再看回響應式流的接口,咱們應該就能懂了:

  • Publisher(發佈者)至關於生產者(Producer)
  • Subscriber(訂閱者)至關於消費者(Consumer)
  • Processor就是在發佈者與訂閱者之間處理數據用的

在響應式流上提到了back pressure(背壓)這麼一個概念,其實很是好理解。在響應式流實現異步非阻塞是基於生產者和消費者模式的,而生產者消費者很容易出現的一個問題就是:生產者生產數據多了,就把消費者給壓垮了

而背壓說白了就是:消費者能告訴生產者本身須要多少許的數據。這裏就是Subscription接口所作的事。

下面咱們來看看JDK9接口的方法,或許就更加能理解上面所說的話了:

// 發佈者(生產者)
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
// 訂閱者(消費者)
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
// 用於發佈者與訂閱者之間的通訊(實現背壓:訂閱者可以告訴生產者須要多少數據)
public interface Subscription {
    public void request(long n);
    public void cancel();
}
// 用於處理髮布者 發佈消息後,對消息進行處理,再交由消費者消費
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

3.1 看個例子

代碼中有大量的註釋,我就很少BB了,建議直接複製跑一下看看:

class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存訂閱關係, 須要用它來給發佈者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個數據, 處理
        System.out.println("處理器接受到數據: " + item);

        // 過濾掉小於0的, 而後發佈出去
        if (item > 0) {
            this.submit("轉換後的數據:" + item);
        }

        // 處理完調用request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現了異常(例如處理數據的時候產生了異常)
        throwable.printStackTrace();

        // 咱們能夠告訴發佈者, 後面不接受數據了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 所有數據處理完了(發佈者關閉了)
        System.out.println("處理器處理完了!");
        // 關閉發佈者
        this.close();
    }

}

public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定義發佈者, 發佈的數據類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();

        // 2. 定義處理器, 對數據進行過濾, 並轉換爲String類型
        MyProcessor processor = new MyProcessor();

        // 3. 發佈者 和 處理器 創建訂閱關係
        publiser.subscribe(processor);

        // 4. 定義最終訂閱者, 消費 String 類型數據
        Subscriber<String> subscriber = new Subscriber<String>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關係, 須要用它來給發佈者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 咱們能夠告訴發佈者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 所有數據處理完了(發佈者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 5. 處理器 和 最終訂閱者 創建訂閱關係
        processor.subscribe(subscriber);

        // 6. 生產數據, 併發布
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 結束後 關閉發佈者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主線程延遲中止, 不然數據沒有消費就退出
        Thread.currentThread().join(1000);
    }

}

輸出的結果以下:

輸出的結果

輸出的結果

流程實際上很是簡單的:

流程

流程

參考資料:

Java 8 的 Stream 主要關注在流的過濾,映射,合併,而 Reactive Stream 更進一層,側重的是流的產生與消費,即流在生產與消費者之間的協調

說白了就是:響應式流是異步非阻塞+流量控制的(能夠告訴生產者本身須要多少的量/取消訂閱關係)

展望響應式編程的場景應用:

好比一個日誌監控系統,咱們的前端頁面將再也不須要經過「命令式」的輪詢的方式不斷向服務器請求數據而後進行更新,而是在創建好通道以後,數據流從系統源源不斷流向頁面,從而展示實時的指標變化曲線;

再好比一個社交平臺,朋友的動態、點贊和留言不是手動刷出來的,而是當後臺數據變化的時候自動體現到界面上的。

4、入門WebFlux

扯了一大堆,終於回到WebFlux了。通過上面的基礎,咱們如今已經可以得出一些結論的了:

  • WebFlux是Spring推出響應式編程的一部分(web端)
  • 響應式編程是異步非阻塞的(是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程範式)

咱們再回來看官網的圖:

mvc or webflux

mvc or webflux

4.1 簡單體驗WebFlux

Spring官方爲了讓咱們更加快速/平滑到WebFlux上,以前SpringMVC那套都是支持的。也就是說:咱們能夠像使用SpringMVC同樣使用着WebFlux

支持SpringMVC那套

支持SpringMVC那套

WebFlux使用的響應式流並非用JDK9平臺的,而是一個叫作Reactor響應式流庫。因此,入門WebFlux其實更可能是瞭解怎麼使用Reactor的API,下面咱們來看看~

Reactor是一個響應式流,它也有對應的發佈者(Publisher ),Reactor的發佈者用兩個類來表示:

  • Mono(返回0或1個元素)
  • Flux(返回0-n個元素)

而訂閱者則是Spring框架去完成

下面咱們來看一個簡單的例子(基於WebFlux環境構建):

// 阻塞5秒鐘
private String createStr() {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
    }
    return "some string";
}

// 普通的SpringMVC方法
@GetMapping("/1")
private String get1() {
    log.info("get1 start");
    String result = createStr();
    log.info("get1 end.");
    return result;
}

// WebFlux(返回的是Mono)
@GetMapping("/2")
private Mono<String> get2() {
    log.info("get2 start");
    Mono<String> result = Mono.fromSupplier(() -> createStr());
    log.info("get2 end.");
    return result;
}

首先,值得說明的是,咱們構建WebFlux環境啓動時,應用服務器默認是Netty的:

基於Netty

基於Netty

咱們分別來訪問一下SpringMVC的接口和WebFlux的接口,看一下有什麼區別:

SpringMVC:

SpringMVC

SpringMVC

WebFlux:

WebFlux

WebFlux

從調用者(瀏覽器)的角度而言,是感知不到有什麼變化的,由於都是得等待5s才返回數據。可是,從服務端的日誌咱們能夠看出,WebFlux是直接返回Mono對象的(而不是像SpringMVC一直同步阻塞5s,線程才返回)。

這正是WebFlux的好處:可以以固定的線程來處理高併發(充分發揮機器的性能)。

WebFlux還支持服務器推送(SSE - >Server Send Event),咱們來看個例子:

/**
     * Flux : 返回0-n個元素
     * 注:須要指定MediaType
     * @return
     */
@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
private Flux<String> flux() {
    Flux<String> result = Flux
        .fromStream(IntStream.range(1, 5).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
            return "flux data--" + i;
        }));
    return result;
}

效果就是每秒會給瀏覽器推送數據:

服務器推送

服務器推送

很是感謝人才們能看到這裏,若是這個文章寫得還不錯,以爲「三歪」我有點東西的話 求點贊 求關注️ 求分享👥 求留言💬 對暖男我來講真的 很是有用!!!

WebFlux我還沒寫完,這篇寫了WebFlux支持SpringMVC那套註解來開發,下篇寫寫如何使用WebFlux另外一種模式(Functional Endpoints)來開發以及一些常見的問題還須要補充一下~

本已收錄至個人GitHub精選文章,歡迎Starhttps://github.com/ZhongFuCheng3y/3y

樂於輸出乾貨的Java技術公衆號:Java3y。公衆號內有300多篇原創技術文章、海量視頻資源、精美腦圖,關注便可獲取!

轉發到朋友圈是對我最大的支持!

轉發到朋友圈是對我最大的支持!

創做不易,各位的支持和承認,就是我創做的最大動力,咱們下篇文章見! 求點贊 求關注️ 求分享👥 求留言💬

相關文章
相關標籤/搜索