只有光頭才能變強。html
文本已收錄至個人GitHub倉庫,歡迎Star:https://github.com/ZhongFuCheng3y/3y前端
本文知識點架構:java
知識點架構react
若是有關注我公衆號文章的同窗就會發現,最近我不定時轉發了一些比較好的WebFlux的文章,由於我最近在學。git
我以前也說過,學習一項技術以前,先要了解爲何要學這項技術。其實此次學習WebFlux
也沒有多大的原生動力,主要是在咱們組內會輪流作一次技術分享,而我又不知道分享什麼比較好…程序員
以前在初學大數據相關的知識,可是這一塊的時間線會拉得比較長,感受趕不及小組內分享(而組內的同窗又大部分都懂大數據,就只有我一個菜雞,淚目)。因此,想的是:「要不我學點新東西搞搞?」。因而就花了點時間學WebFlux
啦~github
這篇文章主要講解什麼是WebFlux
,帶領你們入個門,但願對你們有所幫助(至少看完這篇文章,知道WebFlux是幹嗎用的)web
咱們從Spring
的官網拉下一點點就能夠看到介紹WebFlux
的地方了spring
WebFlux的簡介編程
從官網的簡介中咱們能得出什麼樣的信息?
Spring5
提供了一整套響應式(非阻塞)的技術棧供咱們使用(包括Web控制器、權限控制、數據訪問層等等)。總結起來,WebFlux只是響應式編程中的一部分(在Web控制端),因此通常咱們用它與SpringMVC來對比。
在上面提到了響應式編程(Reactive Programming),而WebFlux只是響應式編程的其中一個技術棧而已,因此咱們先來探討一下什麼是響應式編程
從維基百科裏邊咱們獲得的定義:
reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change
響應式編程(reactive programming)是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程範式
在維基百科上也舉了個小例子:
例子
意思大概以下:
a=b+c
,這就意味着a
的值是由b
和c
計算出來的。若是b
或者c
後續有變化,不會影響到a
的值a:=b+c
,這就意味着a
的值是由b
和c
計算出來的。但若是b
或者c
的值後續有變化,會影響到a
的值我認爲上面的例子已經能夠幫助咱們理解變化傳遞(propagation of change)
那數據流(data stream)和聲明式(declarative)怎麼理解呢?那能夠提一提咱們的Stream流了。以前寫過Lambda表達式和Stream流的文章,你們能夠先去看看:
Lambda的語法是這樣的(Stream流的使用會涉及到不少Lambda表達式的東西,因此通常先學Lambda再學Stream流):
語法
Stream流的使用分爲三個步驟(建立Stream流、執行中間操做、執行最終操做):
三步走
執行中間操做實際上就是給咱們提供了不少的API去操做Stream流中的數據(求和/去重/過濾)等等
中間操做 解釋
說了這麼多,怎麼理解數據流和聲明式呢?實際上是這樣的:
好比下面的代碼;將數組中的數據變成數據流,經過顯式聲明調用.sum()
來處理數據流中的數據,獲得最終的結果:
public static void main(String[] args) { int[] nums = { 1, 2, 3 }; int sum2 = IntStream.of(nums).parallel().sum(); System.out.println("結果爲:" + sum2); }
如圖下所示:
數據流與聲明式
上面講了響應式編程是什麼:
響應式編程(reactive programming)是一種基於數據流(data stream)和變化傳遞(propagation of change)的聲明式(declarative)的編程範式
也講解了數據流/變化傳遞/聲明式是什麼意思,但說到響應式編程就離不開異步非阻塞。
從Spring官網介紹WebFlux的信息咱們就能夠發現asynchronous, nonblocking
這樣的字樣,由於響應式編程它是異步的,也能夠理解成變化傳遞它是異步執行的。
以下圖,合計的金額會受其餘的金額影響(更新的過程是異步的):
合計的錢會由於其餘的金額影響
咱們的JDK8 Stream流是同步的,它就不適合用於響應式編程(但基礎的用法是須要懂的,由於響應式流編程都是操做流嘛)
而在JDK9 已經支持響應式流了,下面咱們來看一下
響應式流的規範早已經被提出了:裏面提到了:
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure ----->http://www.reactive-streams.org/
翻譯再加點信息:
響應式流(Reactive Streams)經過定義一組實體,接口和互操做方法,給出了實現異步非阻塞背壓的標準。第三方遵循這個標準來實現具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。
規範裏頭實際上就是定義了四個接口:
規範的四個接口
Java 平臺直到 JDK 9才提供了對於Reactive的完整支持,JDK9也定義了上述提到的四個接口,在java.util.concurrent
包上
Java的響應式流接口
一個通用的流處理架構通常會是這樣的(生產者產生數據,對數據進行中間處理,消費者拿到數據消費):
流式處理架構
到這裏咱們再看回響應式流的接口,咱們應該就能懂了:
在響應式流上提到了back pressure(背壓)這麼一個概念,其實很是好理解。在響應式流實現異步非阻塞是基於生產者和消費者模式的,而生產者消費者很容易出現的一個問題就是:生產者生產數據多了,就把消費者給壓垮了。
而背壓說白了就是:消費者能告訴生產者本身須要多少許的數據。這裏就是Subscription接口所作的事。
下面咱們來看看JDK9接口的方法,或許就更加能理解上面所說的話了:
// 發佈者(生產者) public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } // 訂閱者(消費者) public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } // 用於發佈者與訂閱者之間的通訊(實現背壓:訂閱者可以告訴生產者須要多少數據) public interface Subscription { public void request(long n); public void cancel(); } // 用於處理髮布者 發佈消息後,對消息進行處理,再交由消費者消費 public interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
代碼中有大量的註釋,我就很少BB了,建議直接複製跑一下看看:
class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("處理器接受到數據: " + item); // 過濾掉小於0的, 而後發佈出去 if (item > 0) { this.submit("轉換後的數據:" + item); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理器處理完了!"); // 關閉發佈者 this.close(); } } public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定義發佈者, 發佈的數據類型是 Integer // 直接使用jdk自帶的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定義處理器, 對數據進行過濾, 並轉換爲String類型 MyProcessor processor = new MyProcessor(); // 3. 發佈者 和 處理器 創建訂閱關係 publiser.subscribe(processor); // 4. 定義最終訂閱者, 消費 String 類型數據 Subscriber<String> subscriber = new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理完了!"); } }; // 5. 處理器 和 最終訂閱者 創建訂閱關係 processor.subscribe(subscriber); // 6. 生產數據, 併發布 publiser.submit(-111); publiser.submit(111); // 7. 結束後 關閉發佈者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主線程延遲中止, 不然數據沒有消費就退出 Thread.currentThread().join(1000); } }
輸出的結果以下:
輸出的結果
流程實際上很是簡單的:
流程
參考資料:
Java 8 的 Stream 主要關注在流的過濾,映射,合併,而 Reactive Stream 更進一層,側重的是流的產生與消費,即流在生產與消費者之間的協調
說白了就是:響應式流是異步非阻塞+流量控制的(能夠告訴生產者本身須要多少的量/取消訂閱關係)
展望響應式編程的場景應用:
好比一個日誌監控系統,咱們的前端頁面將再也不須要經過「命令式」的輪詢的方式不斷向服務器請求數據而後進行更新,而是在創建好通道以後,數據流從系統源源不斷流向頁面,從而展示實時的指標變化曲線;
再好比一個社交平臺,朋友的動態、點贊和留言不是手動刷出來的,而是當後臺數據變化的時候自動體現到界面上的。
扯了一大堆,終於回到WebFlux了。通過上面的基礎,咱們如今已經可以得出一些結論的了:
咱們再回來看官網的圖:
mvc or webflux
Spring官方爲了讓咱們更加快速/平滑到WebFlux上,以前SpringMVC那套都是支持的。也就是說:咱們能夠像使用SpringMVC同樣使用着WebFlux。
支持SpringMVC那套
WebFlux使用的響應式流並非用JDK9平臺的,而是一個叫作Reactor響應式流庫。因此,入門WebFlux其實更可能是瞭解怎麼使用Reactor的API,下面咱們來看看~
Reactor是一個響應式流,它也有對應的發佈者(Publisher
),Reactor的發佈者用兩個類來表示:
而訂閱者則是Spring框架去完成
下面咱們來看一個簡單的例子(基於WebFlux環境構建):
// 阻塞5秒鐘 private String createStr() { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } return "some string"; } // 普通的SpringMVC方法 @GetMapping("/1") private String get1() { log.info("get1 start"); String result = createStr(); log.info("get1 end."); return result; } // WebFlux(返回的是Mono) @GetMapping("/2") private Mono<String> get2() { log.info("get2 start"); Mono<String> result = Mono.fromSupplier(() -> createStr()); log.info("get2 end."); return result; }
首先,值得說明的是,咱們構建WebFlux環境啓動時,應用服務器默認是Netty的:
基於Netty
咱們分別來訪問一下SpringMVC的接口和WebFlux的接口,看一下有什麼區別:
SpringMVC:
SpringMVC
WebFlux:
WebFlux
從調用者(瀏覽器)的角度而言,是感知不到有什麼變化的,由於都是得等待5s才返回數據。可是,從服務端的日誌咱們能夠看出,WebFlux是直接返回Mono對象的(而不是像SpringMVC一直同步阻塞5s,線程才返回)。
這正是WebFlux的好處:可以以固定的線程來處理高併發(充分發揮機器的性能)。
WebFlux還支持服務器推送(SSE - >Server Send Event),咱們來看個例子:
/** * Flux : 返回0-n個元素 * 注:須要指定MediaType * @return */ @GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE) private Flux<String> flux() { Flux<String> result = Flux .fromStream(IntStream.range(1, 5).mapToObj(i -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } return "flux data--" + i; })); return result; }
效果就是每秒會給瀏覽器推送數據:
服務器推送
很是感謝人才們能看到這裏,若是這個文章寫得還不錯,以爲「三歪」我有點東西的話 求點贊 求關注️ 求分享👥 求留言💬 對暖男我來講真的 很是有用!!!
WebFlux我還沒寫完,這篇寫了WebFlux支持SpringMVC那套註解來開發,下篇寫寫如何使用WebFlux另外一種模式(Functional Endpoints)來開發以及一些常見的問題還須要補充一下~
本已收錄至個人GitHub精選文章,歡迎Star:https://github.com/ZhongFuCheng3y/3y
樂於輸出乾貨的Java技術公衆號:Java3y。公衆號內有300多篇原創技術文章、海量視頻資源、精美腦圖,關注便可獲取!
轉發到朋友圈是對我最大的支持!
創做不易,各位的支持和承認,就是我創做的最大動力,咱們下篇文章見! 求點贊 求關注️ 求分享👥 求留言💬