(二)Flux入門學習:流的概念,特性和基本操做react
(三)Flux深刻學習:流的高級特性和進階用法web
(四)reactor-core響應式api如何測試和調試?數據庫
(五)Spring reactive: Spring WebFlux的使用編程
(六)Spring reactive: webClient的使用api
Spring framework 5 的一大新特性:響應式編程(Reactive Programming)。那麼什麼是響應式?他能給咱們帶來什麼?如何優雅地使用?本系列會從最基礎的概念和簡單的api講起,再慢慢深刻探討響應式的一些高級特性,最後講解實戰內容,例如WebFlux和WebClient等在Spring boot中的使用,如何測試和調試。網絡
想要了解原理的話,美團點評的這篇博客 Java NIO淺析 很是適合入門。多線程
簡單地說:異步
當咱們調用socket.read()、socket.write()這類阻塞函數的時候,這類函數不能當即返回,也沒法中斷,須要等待socket可讀或者可寫,纔會返回,所以一個線程只能處理一個請求。在這等待的過程當中,cpu並不幹活,(即阻塞住了),那麼cpu的資源就沒有很好地利用起來。所以對於這種狀況,咱們使用多線程來提升cpu資源的利用率:在等待的這段時間,就能夠切換到別的線程去處理事件,直到socket可讀或可寫了,經過中斷信號通知cpu,再切換回來繼續處理數據。例如線程A正在等待socket可讀,而線程B已經就緒了,那麼就能夠先切換到線程B去處理。雖然上下文切換也會花一些時間,可是遠比阻塞在線程A這裏空等要好。固然計算機內部實際的狀況比這複雜得多。socket
而NIO的讀寫函數能夠馬上返回,這就給了咱們不開線程利用CPU的最好機會:若是一個鏈接不能讀寫(socket.read()返回0或者socket.write()返回0),咱們能夠把這件事記下來。所以只須要一個線程不斷地輪詢這些事件,一旦有就緒的時間,處理便可。不須要多線程。
阻塞型IO
非阻塞型IO
響應式編程就是基於reactor的思想,當你作一個帶有必定延遲的纔可以返回的io操做時,不會阻塞,而是馬上返回一個流,而且訂閱這個流,當這個流上產生了返回數據,能夠馬上獲得通知並調用回調函數處理數據。
咱們首先須要理解響應式編程的基本模型:
Reactor中的發佈者(Publisher)由Flux和Mono兩個類定義,它們都提供了豐富的操做符(operator)。一個Flux對象表明一個包含0..N個元素的響應式序列,元素能夠是普通對象、數據庫查詢的結果、http響應體,甚至是異常。而一個Mono對象表明一個包含零/一個(0..1)元素的結果。上圖就是一個Flux類型的數據流,Flux往流上發送了3個元素,Subscriber經過訂閱這個流來接收通知。
如何建立一個流?最簡單的方式有如下幾種:
//建立一個流,並直接往流上發佈一個值爲value數據 Flux.just(value); //經過list建立一個流,往流上依次發佈list中的數據 Flux.fromIterable(list); //建立一個流,並向流上從i開始連續發佈n個數據,數據類型爲Integer Flux.range(i, n); //建立一個流,並定時向流上發佈一個數據,數據從0開始遞增,數據類型爲Long Flux.interval(Duration.ofSeconds(n));
既然是「數據流」的發佈者,Flux和Mono均可以發出三種「數據信號」:元素值、錯誤信號、完成信號,錯誤信號和完成信號都是終止信號,完成信號用於告知下游訂閱者該數據流正常結束,錯誤信號終止數據流的同時將錯誤傳遞給下游訂閱者。
subscriber是一個訂閱者,他只有很是簡單的4個接口:
public interface Subscriber<T> { void onSubscribe(Subscription var1); //收到下一個元素值信號時的行爲 void onNext(T var1); //收到錯誤信號時的行爲 void onError(Throwable var1); //收到終止信號時的行爲 void onComplete(); }
Subscriber必需要訂閱一個Flux纔可以接收通知:
flux.subscribe( value -> handleData(value), error -> handleError(error), () -> handleComplete() );
上面這個例子經過lambda表達式,定義了Subscriber分別在收到消息,收到錯誤,和消息流結束時的行爲,當Subscriber接收到一個新數據,就會異步地執行handleData方法處理數據。
接下來咱們建立幾個最簡單的流來試一下:
首先咱們新建一個maven項目,引入reactor的類庫:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.2.3.RELEASE</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.2.3.RELEASE</version> <scope>test</scope> </dependency> </dependencies>
編寫代碼以下:
public class ReactorTests { @After public void after() { sleep(30_000); } @Test public void testJust() { Flux.just("hello", "world") .subscribe(System.out::println); } @Test public void testList() { List<String> words = Arrays.asList( "hello", "reactive", "world" ); Flux.fromIterable(words) .subscribe(System.out::println); } @Test public void testRange() { Flux.range(1, 10) .subscribe(System.out::println); } @Test public void testInterval() { Flux.interval(Duration.ofSeconds(1)) .subscribe(System.out::println); } }
訂閱這些流,收到數據以後只是簡單地把它打印出來,運行這些Test,就可以看到訂閱者在接收到流上的數據時,異步地去處理這些數據。