轉載自:https://www.cnblogs.com/lixinjie/p/a-reactive-streams-on-jvm-is-reactor.htmljavascript
響應式編程html
做爲響應式編程方向上的第一步,微軟在.NET生態系統中建立了Rx庫(Reactive Extensions)。RxJava是在JVM上對它的實現。
響應式編程是一個異步編程範式,一般出如今面向對象的語言中,做爲觀察者模式的一個擴展。
它關注數據的流動、變化的傳播。這意味着能夠輕易地使用編程語言表示靜態(如數組)或動態(如事件發射源)數據流。
java
響應式流react
隨着時間的推移,一個專門爲Java的標準化出現了。它是一個規範,定義了一些接口和交互規則,用於JVM平臺上的響應式庫。
它就是響應式流(Reactive Streams),它的這些接口已經被集成到Java 9裏,在java.util.concurrent.Flow這個父類裏。
響應式流和迭代器較類似,不過迭代器是基於「拉」(pull)的,而響應式流是基於「推」(push)的。
迭代器的使用實際上是命令式編程,由於由開發者決定何時調用next()獲取下一個元素。
在響應式流中,與上面等價的是發佈者-訂閱者。但當有新的可用元素時,是由發佈者推給訂閱者的。這個「推」就是響應式的關鍵所在。
另外,對被推過來元素的操做也是以聲明的方式進行的,程序員只需表達作什麼就好了,不須要管怎麼作。
發佈者使用onNext方法向訂閱者推送新元素,使用onError方法告知一個錯誤,使用onComplete方法告知已經結束。
可見,錯誤處理和完成(結束)也是以一個良好的方式被處理。錯誤和結束均可以終止序列。
這種方式很是靈活。這種模式支持0個(沒有)元素/1個元素/n(多)個元素(包括無限序列,若是滴答的鐘表)這些狀況。
程序員
Reactor粉墨登場數據庫
Reactor是第四代響應式庫,是一個響應式編程範式的實現,用於在JVM平臺上基於響應式流規範構建非阻塞異步應用。
它極大地實現了JVM上響應式流的規範(http://www.reactive-streams.org/)。
它是一個徹底非阻塞響應式編程的基石,帶有高效需求管理(以管理「後壓」的形式)。
它直接集成Java函數式API,特別是CompletableFuture,Stream和Duration。
它支持使用reactor-netty工程實現非阻塞跨進程通訊,適合微服務架構,支持HTTP(包括Websockets),TCP和UDP。
注:Reactor要求Java 8+
講了這麼多,是否是要首先思考下,爲何咱們須要這樣一個異步的響應式庫?
編程
阻塞就是浪費數組
現代的應用能達到很是多的併發用戶,即便現代硬件的能力被持續改進,現代軟件的性能仍然是一個關鍵的關注點。
大致上有兩種方式能夠改進一個程序的性能:
一、並行化,使用更多的線程和更多的硬件資源
二、提升效率,在當前資源用量的狀況下尋求更高效率
一般,Java開發者使用阻塞代碼來寫程序。這種實踐性很好,直到遇到性能瓶頸。
此時會引入額外線程,運行類似的阻塞代碼。可是這種擴展方法在資源利用方面會引發爭論和致使併發問題。
更糟糕的是,阻塞浪費資源。若是你仔細看,一旦一個程序涉及到一些延遲(特別是I/O,像數據庫請求或網絡調用),資源就被浪費,由於線程如今是空閒的,在等待數據。
因此並行化方式不是銀彈。咱們有必要讓硬件發揮徹底的力量,可是關於資源浪費的影響和緣由也是很是複雜的。
網絡
異步性來營救架構
前面提到的第二種方式是尋求更高效率,能夠做爲資源浪費問題的一個解決方案。
經過寫異步非阻塞代碼,你能讓執行切換到其它活動的任務,使用相同的底層資源,稍後再回到當前的處理上。
可是如何產生異步代碼到JVM上呢?Java提供兩種異步編程模型:
一、Callbacks,異步方法沒有返回值,可是會帶一個回調,當結果可用時回調會被調用。
二、Futures,異步方法當即返回一個Future<T>,異步處理過程就是計算一個T值,使用Future對象包裝了對它的訪問。這個值不是當即可用的,該對象能夠被輪詢來查看T值是否可用。
這兩種技術都足夠好嗎?並非對每種狀況都是的,兩種方式都有侷限性。
回調比較難於組合在一塊兒,很快就會致使代碼難以閱讀和維護(衆所周知的「回調地獄」)。
看個回調示例,展現一個用戶的前5個最愛,若是沒有的話就推薦5個給他:
這麼簡單的功能須要如此多的代碼,並且嵌套不少、且難懂。
下面是等價的用Reactor的示例:
從代碼的數量、寫法上是否是清爽了不少。
與回調相比,Futures稍微好一點,可是仍然在組合方面作得很差。組合多個Futures對象到一塊兒是可行的可是並不容易。
Future也有其它問題,很容易由於調用了get()方法形成了另外一個阻塞。
另外,它不支持延遲計算,缺少對多個值的支持,缺少高級錯誤處理。
從命令式到響應式編程
像Reactor這樣的響應式庫的目標就是解決在JVM上「傳統」異步方式的弊端,同時也關注一些額外方面:
可組合性和可讀性
數據做爲流,被豐富的操做符操做
什麼都不會發生,直到你訂閱
後壓,消費者通知生產者發射的速率太快了
高級別而不是高數值抽象
可組合性和可讀性
可組合性,其實就是編排多個異步任務的能力,使前一個任務的結果做爲後續任務的輸入,或以fork-join(分叉-合併)的方式執行若干個任務,或在更高的級別重複利用這些異步任務。
任務編排的能力和代碼的可讀性和可維護性緊密地耦合在一塊兒。隨着異步處理在數量和複雜度上的增長,組合和閱讀代碼變得更加困難。
就像咱們看到的,回調模型雖然簡單,可是當回調裏嵌套回調,達到多層時就會變成回調地獄。
Reactor提供豐富的組合選項,使嵌套級別最小,讓代碼的組織結構能反映出在進行什麼樣的抽象處理,且一般保持在同級別上。
裝配線類比
你能夠認爲響應式應用處理數據就像經過一個裝配(生產)線。Reactor既是傳送帶又是工做站。
原材料從一個源(原始發佈者)持續不斷地獲取,以一個完成的產品被推送給消費者(訂閱者)結束。
原材料能夠通過許多不一樣的轉換,如其它的中間步驟,或者是一個更大裝配線的一部分。
若是在某個地方出現一個小故障或阻塞了,出問題的工做站能夠向上遊發出通知來限制原材料的流動(速率)。
操做符
在Reactor裏,操做符就是裝配線類比中的工做站。每個操做符都向一個發佈者添加某些行爲,把上一步的發佈者包裝到一個新的實例裏。整個鏈就是這樣被連接起來的。
因此數據一開始從第一個發佈者出來,而後沿着鏈往下游移動,且被每個連接轉換。最後,一個訂閱者結束了這個處理。
響應式流規範並無明確規定操做符,不過Reactor就提供了豐富的操做符,它們涉及到不少方面,從簡單的轉換、過濾到複雜的編排、錯誤處理。
只要不訂閱,就什麼都不發生
當你寫一個發佈者鏈時,默認,數據是不會開始進入鏈中的。相反,你只是建立了異步處理的一個抽象描述。
經過訂閱這個行爲(動做),才把發佈者和訂閱者鏈接起來,而後纔會觸發數據在鏈裏流動。
這是在內部實現好的,經過來自於訂閱者的request信號往上游傳播,一路逆流而上直到最開始的發佈者那裏。
Reactor核心特性
Reactor引入可組合響應式的類型,實現了發佈者接口,但也提供了豐富的操做符,就是Flux和Mono。
Flux,流動,表示0到N個元素。
Mono,單個,表示0或1個元素。
它們之間的不一樣主要在語義上,表示異步處理的粗略基數。
如一個http請求只會產生一個響應,把它表示爲Mono<HttpResponse>顯然更有意義,且它只提供相對於0/1這樣上下文的操做符,由於此時count操做顯然沒有太大意義。
操做符能夠改變處理的最大基數,也會切換到相關類型上。如count操做符雖然存在於Flux<T>上,但它的返回值倒是一個Mono<Long>。
Flux<T>
一個Flux<T>是一個標準的Publisher<T>,表示一個異步序列,能夠發射0到N個元素,能夠經過一個完成信號或錯誤信號終止。
就像在響應式流規範裏那樣,這3種類型的信號轉化爲對一個下游訂閱者的onNext,onComplete,onError3個方法的調用。
這3個方法也能夠理解爲事件/回調,且它們都是可選的。
如沒有onNext但有onComplete,表示一個空的有限序列。既沒有onNext也沒有onComplete,表示一個空的無限序列(沒有什麼實際用途,可用於測試)。
無限序列也沒有必要是空的,如Flux.interval(Duration)產生一個Flux<Long> ,它是無限的,從鐘錶裏發射出的規則的「嘀嗒」。
Mono<T>
一個Mono<T>是一個特殊的Publisher<T>,最多發射一個元素,可使用onComplete信號或onError信號來終止。
它提供的操做符只是Flux提供的一個子集,一樣,一些操做符(如把Mono和Publisher結合起來)能夠把它切換到一個Flux。
如Mono#concatWith(Publisher)返回一個Flux,然而Mono#then(Mono)返回的是另外一個Mono。
Mono能夠用於表示沒有返回值的異步處理(與Runnable類似),用Mono<Void>表示。
建立Flux或Mono,並訂閱它們
最容易的方式就是使用它們各自的工廠方法:
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("foo");
當談到訂閱時,可使用Java 8的lambda表達式,訂閱方法有多種不一樣的變體,帶有不一樣的回調。
下面是方法簽名:
//訂閱並觸發序列
subscribe();
//能夠對每個產生的值進行處理
subscribe(Consumer<? super T> consumer);
//還能夠響應一個錯誤
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
//還能夠在成功結束後執行一些代碼
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
//還能夠對Subscription執行一些操做
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
使用Disposable取消訂閱
這些基於lambda的訂閱方法都返回一個Disposable類型,經過調用它的dispose()來取消這個訂閱。 對於Flux和Mono,取消就是一個信號,代表源應該中止生產元素。然而,不保證當即生效,一些源可能生產元素很是快,以至於尚未收到取消信號就已經生產完了。