Spring船新版推出的WebFlux,是兄弟就來學我

初識SpringWebFlux

Spring WebFlux是Spring Framework 5.0中引入的新的響應式Web框架。 與Spring MVC不一樣,它不須要Servlet API,徹底異步和非阻塞, 並經過Reactor項目實現Reactive Streams規範,因此性能更高。 而且能夠在諸如Netty,Undertow和Servlet 3.1+容器的服務器上運行。javascript

Spring WebFlux特性:html

  1. 異步非阻塞:前端

    衆所周知,SpringMVC是同步阻塞的IO模型,資源浪費相對來講比較嚴重,當咱們在處理一個比較耗時的任務時,例如:上傳一個比較大的文件,首先,服務器的線程一直在等待接收文件,在這期間它就像個傻子同樣等在那,什麼都幹不了,好不容易等到文件來了而且接收完畢,咱們又要將文件寫入磁盤,在這寫入的過程當中,這根線程又再次懵bi了,又要等到文件寫完才能去幹其它的事情。這一前一後的等待,不浪費資源麼?java

    沒錯,Spring WebFlux就是來解決這問題的,Spring WebFlux能夠作到異步非阻塞。仍是上面那上傳文件的例子,Spring WebFlux是這樣作的:線程發現文件還沒準備好,就先去作其它事情,當文件準備好以後,通知這根線程來處理,當接收完畢寫入磁盤的時候(根據具體狀況選擇是否作異步非阻塞),寫入完畢後通知這根線程再來處理(異步非阻塞狀況下)。相對SpringMVC而言,能夠節省系統資源以及支持更高的併發量。react

  2. 響應式(reactive)函數編程:git

    Spring WebFlux支持函數式編程,得益於對於reactive-stream的支持(經過reactor框架來實現的)github

  3. 再也不拘束於Servlet容器:

    之前,咱們的應用都運行於Servlet容器之中,例如咱們你們最爲熟悉的Tomcat, Jetty...等等。而如今Spring WebFlux不只能運行於傳統的Servlet容器中(前提是容器要支持Servlet3.1,由於非阻塞IO是使用了Servlet3.1的特性),還能運行在支持NIO的Netty和Undertow中。web

Spring WebFlux與Spring MVC的對比圖:
Spring船新版推出的WebFlux,是兄弟就來學我spring

Spring WebFlux支持兩種編程方式:
Spring船新版推出的WebFlux,是兄弟就來學我編程


異步servlet

在學習webflux以前,咱們首先要學習一下異步的servlet。咱們須要瞭解同步servlet阻塞了什麼?爲何須要異步servlet?異步servlet能支持高吞吐量的原理是什麼?

servlet容器(如tomcat)裏面,每處理一個請求會佔用一個線程,同步servlet裏面,業務代碼處理多久,servlet容器的線程就會等(阻塞)多久,而servlet容器的線程是由上限的,當請求多了的時候servlet容器線程就會所有用完,就沒法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!

而異步serlvet裏面,servlet容器的線程不會傻等業務代碼處理完畢,而是直接返回(繼續處理其餘請求),給業務代碼一個回調函數(asyncContext.complete()),業務代碼處理完了再通知我!這樣就可使用少許的線程處理更加高的請求,從而實現高吞吐量!

咱們來看一個同步Servlet的示例代碼:

package org.example.servlet;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @program: servlet-demo
 * @description: 同步的Servlet Demo
 * @author: 01
 * @create: 2018-10-04 17:02
 **/
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        long timeMillis = System.currentTimeMillis();

        // 執行業務代碼
        doSometing(req, resp);

        System.out.println("sync use: " + (System.currentTimeMillis() - timeMillis));
    }

    private void doSometing(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        // 模擬耗時操做
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        resp.getWriter().append("done");
    }
}

運行結果以下:

sync use: 5000

從運行結果能夠看到,業務代碼花了5 秒,但servlet容器的線程幾乎沒有任何耗時。而若是是同步servlet的,線程就會傻等5秒,這5秒內這個線程只處理了這一個請求。

而後咱們來看一下異步Servlet的示例代碼:

package org.example.servlet;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @program: servlet-demo
 * @description: 異步的Servlet Demo
 * @author: 01
 * @create: 2018-10-04 17:16
 **/
@WebServlet(asyncSupported = true, urlPatterns = "/AsyncServlet")
public class AsyncServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        long timeMillis = System.currentTimeMillis();

        // 1.開啓異步上下文
        AsyncContext asyncContext = req.startAsync();

        // 2.異步執行業務代碼,放到另外一個線程去處理
        CompletableFuture.runAsync(() -> doSometing(asyncContext, asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use: " + (System.currentTimeMillis() - timeMillis));
    }

    private void doSometing(AsyncContext asyncContext, ServletRequest req, ServletResponse resp) {
        // 模擬耗時操做
        try {
            TimeUnit.SECONDS.sleep(5);
            resp.getWriter().append("done");
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }

        // 3.業務代碼處理完畢,通知請求結束
        asyncContext.complete();
    }
}

運行結果以下:

async use: 8

能夠看到,異步的Servlet不會阻塞Tomcat的線程,異步Servlet能夠把耗時的操做交給另外一個線程去處理,從而使得Tomcat的線程可以繼續接收下一個請求。這就是異步Servlet的工做方式,得益於非阻塞的特性,可以大大提升服務器的吞吐量。


Webflux開發

瞭解了同步的Servlet和異步Servlet之間的區別以及異步Servlet的工做方式以後,咱們就能夠開始嘗試使用一下Spring的webflux了。

建立一個Spring Boot工程,選擇以下依賴:
Spring船新版推出的WebFlux,是兄弟就來學我

關於reactor:

spring webflux是基於reactor來實現響應式的。那麼reactor是什麼呢?我是這樣理解的 reactor = jdk8的stream + jdk9的flow響應式流。理解了這句話,reactor就很容易掌握。reactor裏面Flux和Mono就是stream,它的最終操做就是 subscribe/block 2種。

Reactor中的Mono和Flux:

Flux 和 Mono 是 Reactor 中的兩個基本概念。Flux 表示的是包含 0 到 N 個元素的異步序列。 在該序列中能夠包含三種不一樣類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。 當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。Mono 表示的是包含 0 或者 1 個元素的異步序列。 該序列中一樣能夠包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間能夠進行轉換。 對一個 Flux 序列進行計數操做,獲得的結果是一個 Mono對象。把兩個 Mono 序列合併在一塊兒,獲得的是一個 Flux 對象。瞭解更多>>

咱們來看一段代碼,理解一下reactor的概念:

package org.example.spring.webflux;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

/**
 * @program: webflux
 * @description: Reactor Demo
 * @author: 01
 * @create: 2018-10-04 17:58
 **/
public class ReactorDemo {

    public static void main(String[] args) {
        // Mono 0-1個元素
        // Flux 0-N 個元素
        String[] strings = {"1", "2", "3"};

        // 定義訂閱者
        Subscriber<Integer> subscriber = new Subscriber<>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關係, 須要用它來給發佈者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 咱們能夠告訴發佈者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 所有數據處理完了(發佈者關閉了)
                System.out.println("處理完了!");
            }

        };

        // reactor = jdk8 stream + jdk9 reactive stream
        // 這裏就是jdk8的stream
        Flux.fromArray(strings).map(Integer::parseInt)
                // 最終操做,這裏就是jdk9的reactive stream
                .subscribe(subscriber);
    }
}

在以上例子中,咱們能夠像JDK9那樣實現訂閱者,而且直接就能夠用在reactor的subscribe方法上。調用了subscribe方法就至關於調用了stream的最終操做。有了 reactor = jdk8 stream + jdk9 reactive stream 概念後,在掌握了jdk8的stream和jkd9的flow以後,reactor也不難掌握。

若是對 jdk8 stream 和 jdk9 reactive stream不瞭解的話,能夠參考我另外兩篇文章:

瞭解了reactor的概念後,咱們來編寫一段測試代碼,對比一下webflux的兩種開發方式:

package org.example.spring.webflux.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

/**
 * @program: webflux
 * @description: webflux demo
 * @author: 01
 * @create: 2018-10-04 17:47
 **/
@Slf4j
@RestController
public class TestController {

    /**
     * 傳統的 spring mvc 開發方式
     */
    @GetMapping("/mvc")
    public String mvc() {
        long timeMillis = System.currentTimeMillis();
        log.info("mvc() start");
        String result = createStr();
        log.info("mvc() end use time {}/ms", System.currentTimeMillis() - timeMillis);

        return result;
    }

    /**
     * spring webflux 的開發方式
     */
    @GetMapping("/webflux")
    public Mono<String> webflux() {
        long timeMillis = System.currentTimeMillis();
        log.info("webflux() start");
        Mono<String> result = Mono.fromSupplier(this::createStr);
        log.info("webflux() end use time {}/ms", System.currentTimeMillis() - timeMillis);

        return result;
    }

    private String createStr() {
        // 模擬耗時操做
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "some string";
    }
}

訪問/mvc,控制檯輸出日誌以下:
Spring船新版推出的WebFlux,是兄弟就來學我

訪問/webflux,控制檯輸出日誌以下:
Spring船新版推出的WebFlux,是兄弟就來學我

以上的例子中,只演示了reactor 裏的mono操做,返回了0-1個元素。如下示例則簡單演示了flux操做,返回0-N個元素,代碼以下:

/**
 * 使用flux,像流同樣返回0-N個元素
 */
@GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> flux() {
    long timeMillis = System.currentTimeMillis();
    log.info("webflux() start");
    Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "flux data--" + i;
    }));
    log.info("webflux() end use time {}/ms", System.currentTimeMillis() - timeMillis);

    return result;
}

訪問/flux接口後,控制檯輸出日誌以下:
Spring船新版推出的WebFlux,是兄弟就來學我

在瀏覽器上會每隔一秒接收一行數據:
Spring船新版推出的WebFlux,是兄弟就來學我


SSE(Server-Sent Events)

在上一小節的例子中咱們使用flux返回數據時,能夠屢次返回數據(其實和響應式沒有關係),實際上使用的技術就是H5的SSE。咱們學習一個技術,API的使用只是最初級也是最簡單的,更加劇要的是須要知其然並知其因此然,不然就只能死記硬背不用就忘!咱們不知足在spring裏面能實現sse效果,更加須要知道spring是如何作到的。

其實SSE很簡單,咱們花一點點時間就能夠掌握,咱們在純servlet環境裏面實現。以下示例:

package org.example.servlet;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @program: servlet-demo
 * @description: SSE Demo
 * @author: 01
 * @create: 2018-10-04 19:37
 **/
@WebServlet("/ServerSentEventsServlet")
public class ServerSentEventsServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        // 設置返回的數據類型及字符編碼
        resp.setContentType("text/event-stream");
        resp.setCharacterEncoding("UTF-8");

        for (int i = 0; i < 5; i++) {
            // 自定義事件標識(非必須)
            resp.getWriter().write("event:me\n");

            // 需特定格式:data: + 數據 + 2個回車符
            resp.getWriter().write("data:" + i + "\n\n");
            resp.getWriter().flush();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

其中最爲關鍵的是 ContentType 需爲 "text/event-stream",而後返回的數據符合固定的要求格式便可。

使用瀏覽器訪問以下:
Spring船新版推出的WebFlux,是兄弟就來學我

若是前端須要進行一些處理的話,咱們也能夠編寫js代碼來獲取數據,以下示例:

<script type="text/javascript">
    // 初始化sse,參數爲url
    var sse = new EventSource("ServerSentEventsServlet");

    // 無自定義事件標識時,經過onmessage事件獲取返回的數據
    sse.onmessage = function (evt) {
        console.log("message", evt.data, evt)
    };

    // 如有自定義的事件標識時,經過添加事件監聽獲取返回的數據
    sse.addEventListener("me", function (evt) {
        console.log("message", evt.data);
        if (evt.data === 3) {
            // 關閉sse
            sse.close()
        }
    });
</script>

因爲篇幅所限,文中只結合了部分示例介紹了主要的理論知識,因此我另外使用webflux開發了CRUD完整示例demo(非RouterFunction模式),GitHub地址以下:

https://github.com/Binary-ZeroOne/webflux-demo

RouterFunction模式的CRUD完整示例demo,GitHub地址以下:

https://github.com/Binary-ZeroOne/webflux-routerfunction-demo

相關文章
相關標籤/搜索