Spring WebFlux是Spring Framework 5.0中引入的新的響應式Web框架。 與Spring MVC不一樣,它不須要Servlet API,徹底異步和非阻塞, 並經過Reactor項目實現Reactive Streams規範,因此性能更高。 而且能夠在諸如Netty,Undertow和Servlet 3.1+容器的服務器上運行。javascript
Spring WebFlux特性:html
異步非阻塞:前端
衆所周知,SpringMVC是同步阻塞的IO模型,資源浪費相對來講比較嚴重,當咱們在處理一個比較耗時的任務時,例如:上傳一個比較大的文件,首先,服務器的線程一直在等待接收文件,在這期間它就像個傻子同樣等在那,什麼都幹不了,好不容易等到文件來了而且接收完畢,咱們又要將文件寫入磁盤,在這寫入的過程當中,這根線程又再次懵bi了,又要等到文件寫完才能去幹其它的事情。這一前一後的等待,不浪費資源麼?java
沒錯,Spring WebFlux就是來解決這問題的,Spring WebFlux能夠作到異步非阻塞。仍是上面那上傳文件的例子,Spring WebFlux是這樣作的:線程發現文件還沒準備好,就先去作其它事情,當文件準備好以後,通知這根線程來處理,當接收完畢寫入磁盤的時候(根據具體狀況選擇是否作異步非阻塞),寫入完畢後通知這根線程再來處理(異步非阻塞狀況下)。相對SpringMVC而言,能夠節省系統資源以及支持更高的併發量。react
響應式(reactive)函數編程:git
Spring WebFlux支持函數式編程,得益於對於reactive-stream的支持(經過reactor框架來實現的)github
之前,咱們的應用都運行於Servlet容器之中,例如咱們你們最爲熟悉的Tomcat, Jetty...等等。而如今Spring WebFlux不只能運行於傳統的Servlet容器中(前提是容器要支持Servlet3.1,由於非阻塞IO是使用了Servlet3.1的特性),還能運行在支持NIO的Netty和Undertow中。web
Spring WebFlux與Spring MVC的對比圖:spring
Spring WebFlux支持兩種編程方式:編程
在學習webflux以前,咱們首先要學習一下異步的servlet。咱們須要瞭解同步servlet阻塞了什麼?爲何須要異步servlet?異步servlet能支持高吞吐量的原理是什麼?
servlet容器(如tomcat)裏面,每處理一個請求會佔用一個線程,同步servlet裏面,業務代碼處理多久,servlet容器的線程就會等(阻塞)多久,而servlet容器的線程是由上限的,當請求多了的時候servlet容器線程就會所有用完,就沒法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!
而異步serlvet裏面,servlet容器的線程不會傻等業務代碼處理完畢,而是直接返回(繼續處理其餘請求),給業務代碼一個回調函數(asyncContext.complete()),業務代碼處理完了再通知我!這樣就可使用少許的線程處理更加高的請求,從而實現高吞吐量!
咱們來看一個同步Servlet的示例代碼:
package org.example.servlet; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.concurrent.TimeUnit; /** * @program: servlet-demo * @description: 同步的Servlet Demo * @author: 01 * @create: 2018-10-04 17:02 **/ @WebServlet("/SyncServlet") public class SyncServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { long timeMillis = System.currentTimeMillis(); // 執行業務代碼 doSometing(req, resp); System.out.println("sync use: " + (System.currentTimeMillis() - timeMillis)); } private void doSometing(HttpServletRequest req, HttpServletResponse resp) throws IOException { // 模擬耗時操做 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } resp.getWriter().append("done"); } }
運行結果以下:
sync use: 5000
從運行結果能夠看到,業務代碼花了5 秒,但servlet容器的線程幾乎沒有任何耗時。而若是是同步servlet的,線程就會傻等5秒,這5秒內這個線程只處理了這一個請求。
而後咱們來看一下異步Servlet的示例代碼:
package org.example.servlet; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** * @program: servlet-demo * @description: 異步的Servlet Demo * @author: 01 * @create: 2018-10-04 17:16 **/ @WebServlet(asyncSupported = true, urlPatterns = "/AsyncServlet") public class AsyncServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { long timeMillis = System.currentTimeMillis(); // 1.開啓異步上下文 AsyncContext asyncContext = req.startAsync(); // 2.異步執行業務代碼,放到另外一個線程去處理 CompletableFuture.runAsync(() -> doSometing(asyncContext, asyncContext.getRequest(), asyncContext.getResponse())); System.out.println("async use: " + (System.currentTimeMillis() - timeMillis)); } private void doSometing(AsyncContext asyncContext, ServletRequest req, ServletResponse resp) { // 模擬耗時操做 try { TimeUnit.SECONDS.sleep(5); resp.getWriter().append("done"); } catch (InterruptedException | IOException e) { e.printStackTrace(); } // 3.業務代碼處理完畢,通知請求結束 asyncContext.complete(); } }
運行結果以下:
async use: 8
能夠看到,異步的Servlet不會阻塞Tomcat的線程,異步Servlet能夠把耗時的操做交給另外一個線程去處理,從而使得Tomcat的線程可以繼續接收下一個請求。這就是異步Servlet的工做方式,得益於非阻塞的特性,可以大大提升服務器的吞吐量。
瞭解了同步的Servlet和異步Servlet之間的區別以及異步Servlet的工做方式以後,咱們就能夠開始嘗試使用一下Spring的webflux了。
建立一個Spring Boot工程,選擇以下依賴:
關於reactor:
spring webflux是基於reactor來實現響應式的。那麼reactor是什麼呢?我是這樣理解的 reactor = jdk8的stream + jdk9的flow響應式流。理解了這句話,reactor就很容易掌握。reactor裏面Flux和Mono就是stream,它的最終操做就是 subscribe/block 2種。
Reactor中的Mono和Flux:
Flux 和 Mono 是 Reactor 中的兩個基本概念。Flux 表示的是包含 0 到 N 個元素的異步序列。 在該序列中能夠包含三種不一樣類型的消息通知:正常的包含元素的消息、序列結束的消息和序列出錯的消息。 當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。Mono 表示的是包含 0 或者 1 個元素的異步序列。 該序列中一樣能夠包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間能夠進行轉換。 對一個 Flux 序列進行計數操做,獲得的結果是一個 Mono對象。把兩個 Mono 序列合併在一塊兒,獲得的是一個 Flux 對象。瞭解更多>>
咱們來看一段代碼,理解一下reactor的概念:
package org.example.spring.webflux; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; /** * @program: webflux * @description: Reactor Demo * @author: 01 * @create: 2018-10-04 17:58 **/ public class ReactorDemo { public static void main(String[] args) { // Mono 0-1個元素 // Flux 0-N 個元素 String[] strings = {"1", "2", "3"}; // 定義訂閱者 Subscriber<Integer> subscriber = new Subscriber<>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理完了!"); } }; // reactor = jdk8 stream + jdk9 reactive stream // 這裏就是jdk8的stream Flux.fromArray(strings).map(Integer::parseInt) // 最終操做,這裏就是jdk9的reactive stream .subscribe(subscriber); } }
在以上例子中,咱們能夠像JDK9那樣實現訂閱者,而且直接就能夠用在reactor的subscribe方法上。調用了subscribe方法就至關於調用了stream的最終操做。有了 reactor = jdk8 stream + jdk9 reactive stream 概念後,在掌握了jdk8的stream和jkd9的flow以後,reactor也不難掌握。
若是對 jdk8 stream 和 jdk9 reactive stream不瞭解的話,能夠參考我另外兩篇文章:
瞭解了reactor的概念後,咱們來編寫一段測試代碼,對比一下webflux的兩種開發方式:
package org.example.spring.webflux.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; /** * @program: webflux * @description: webflux demo * @author: 01 * @create: 2018-10-04 17:47 **/ @Slf4j @RestController public class TestController { /** * 傳統的 spring mvc 開發方式 */ @GetMapping("/mvc") public String mvc() { long timeMillis = System.currentTimeMillis(); log.info("mvc() start"); String result = createStr(); log.info("mvc() end use time {}/ms", System.currentTimeMillis() - timeMillis); return result; } /** * spring webflux 的開發方式 */ @GetMapping("/webflux") public Mono<String> webflux() { long timeMillis = System.currentTimeMillis(); log.info("webflux() start"); Mono<String> result = Mono.fromSupplier(this::createStr); log.info("webflux() end use time {}/ms", System.currentTimeMillis() - timeMillis); return result; } private String createStr() { // 模擬耗時操做 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return "some string"; } }
訪問/mvc
,控制檯輸出日誌以下:
訪問/webflux
,控制檯輸出日誌以下:
以上的例子中,只演示了reactor 裏的mono操做,返回了0-1個元素。如下示例則簡單演示了flux操做,返回0-N個元素,代碼以下:
/** * 使用flux,像流同樣返回0-N個元素 */ @GetMapping(value = "/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> flux() { long timeMillis = System.currentTimeMillis(); log.info("webflux() start"); Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "flux data--" + i; })); log.info("webflux() end use time {}/ms", System.currentTimeMillis() - timeMillis); return result; }
訪問/flux
接口後,控制檯輸出日誌以下:
在瀏覽器上會每隔一秒接收一行數據:
在上一小節的例子中咱們使用flux返回數據時,能夠屢次返回數據(其實和響應式沒有關係),實際上使用的技術就是H5的SSE。咱們學習一個技術,API的使用只是最初級也是最簡單的,更加劇要的是須要知其然並知其因此然,不然就只能死記硬背不用就忘!咱們不知足在spring裏面能實現sse效果,更加須要知道spring是如何作到的。
其實SSE很簡單,咱們花一點點時間就能夠掌握,咱們在純servlet環境裏面實現。以下示例:
package org.example.servlet; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.concurrent.TimeUnit; /** * @program: servlet-demo * @description: SSE Demo * @author: 01 * @create: 2018-10-04 19:37 **/ @WebServlet("/ServerSentEventsServlet") public class ServerSentEventsServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { // 設置返回的數據類型及字符編碼 resp.setContentType("text/event-stream"); resp.setCharacterEncoding("UTF-8"); for (int i = 0; i < 5; i++) { // 自定義事件標識(非必須) resp.getWriter().write("event:me\n"); // 需特定格式:data: + 數據 + 2個回車符 resp.getWriter().write("data:" + i + "\n\n"); resp.getWriter().flush(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }
其中最爲關鍵的是 ContentType 需爲 "text/event-stream",而後返回的數據符合固定的要求格式便可。
使用瀏覽器訪問以下:
若是前端須要進行一些處理的話,咱們也能夠編寫js代碼來獲取數據,以下示例:
<script type="text/javascript"> // 初始化sse,參數爲url var sse = new EventSource("ServerSentEventsServlet"); // 無自定義事件標識時,經過onmessage事件獲取返回的數據 sse.onmessage = function (evt) { console.log("message", evt.data, evt) }; // 如有自定義的事件標識時,經過添加事件監聽獲取返回的數據 sse.addEventListener("me", function (evt) { console.log("message", evt.data); if (evt.data === 3) { // 關閉sse sse.close() } }); </script>
因爲篇幅所限,文中只結合了部分示例介紹了主要的理論知識,因此我另外使用webflux開發了CRUD完整示例demo(非RouterFunction模式),GitHub地址以下:
RouterFunction模式的CRUD完整示例demo,GitHub地址以下:
https://github.com/Binary-ZeroOne/webflux-routerfunction-demo