這是一篇譯文,原文出處 戳這裏。其實好久之前我就看完了這篇文章,只不過我的對響應式編程研究的不夠深刻,羞於下筆翻譯,在加上這類譯文加了原創還有爭議性,因此一直沒有動力。恰逢今天交流羣裏兩個大佬對響應式編程的話題辯得不可開交,趁印象還算深入,藉機把這篇文章翻譯一下。說道辯論的點,不妨也在這裏拋出來:html
響應式編程在單機環境下是否雞肋?java
結論是:沒有結論,我以爲只能抱着懷疑的眼光審視這個問題了。另外還聊到了 RSocket 這個最近在 SpringOne 大會上比較火爆的響應式"新「網絡協議,github 地址戳這裏,爲何給」新「字打了個引號,仔細觀察下 RSocket 的 commit log,其實三年前就有了。有興趣的同窗自行翻閱,說不定就是今年這最後兩三個月的熱點技術哦。react
Java 圈子有一個怪事,那就是對 RxJava,Reactor,WebFlux 這些響應式編程的名詞、框架永遠處於渴望瞭解,感到新鮮,卻又不甚瞭解,使用貧乏的狀態。以前轉載小馬哥的那篇《Reactive Programming 一種技術,各自表述》時,就已經聊過這個關於名詞之爭的話題了,今天羣裏的討論更是加深了個人映像。Java 圈子裏面不少朋友一直對響應式編程處於一個瞭解名詞,知道基本原理,而不是深度用戶的狀態(我也是之一)。可能真的和圈子有關,按石衝兄的說法,其實 Scala 圈子裏面的那幫人,不知道比我們高到哪裏去了(就響應式編程而言)。git
實在是很久沒發文章了,向你們說聲抱歉,之後的更新頻率確定是沒有之前那麼勤了(說的好像之前很勤快似的),一部分緣由是在公司內網寫的文章無法貼到公衆號中和你們分享討論,另外一部分是目前我也處於學習公司內部框架的階段,不太方便提煉成文章,最後,最大的一部分緣由仍是我這段時間須要學(tou)習(lan)其(da)他(you)東(xi)西啦。好了,廢話也說完了,下面是譯文的正文部分。github
關於響應式編程(Reactive Programming),你可能有過這樣的疑問:咱們已經有了 Java8 的 Stream, CompletableFuture, 以及 Optional,爲何還必要存在 RxJava 和 Reactor?編程
回答這個問題並不難,若是在響應式編程中處理的問題很是簡單,你的確不須要那些第三方類庫的支持。 但隨着複雜問題的出現,你寫出了一堆難看的代碼。而後這些代碼變得愈來愈複雜,難以維護,而 RxJava 和 Reactor 具備許多方便的功能,能夠解決你當下問題,並保障了將來一些可預見的需求。本文從響應式編程模型中抽象出了8個標準,這將有助於咱們理解標準特性與這些庫之間的區別:緩存
咱們將會對如下這些類進行這些特性的對比:安全
讓咱們開始吧~微信
這些類都是支持 Composable 特性的,使得各位使用者很便利地使用函數式編程的思想去思考問題,這也正是咱們擁躉它們的緣由。網絡
CompletableFuture - 衆多的 .then*()
方法使得咱們能夠構建一個 pipeline, 用以傳遞空值,單一的值,以及異常.
Stream - 提供了許多鏈式操做的編程接口,支持在各個操做之間傳遞多個值。
Optional - 提供了一些中間操做 .map()
, .flatMap()
, .filter()
.
Observable, Flowable, Flux - 和 Stream 相同
CompletableFuture - 不具有惰性執行的特性,它本質上只是一個異步結果的容器。這些對象的建立是用來表示對應的工做,CompletableFuture 建立時,對應的工做已經開始執行了。但它並不知道任何工做細節,只關心結果。因此,沒有辦法從上至下執行整個 pipeline。當結果被設置給 CompletableFuture 時,下一個階段纔開始執行。
Stream - 全部的中間操做都是延遲執行的。全部的終止操做(terminal operations),會觸發真正的計算(譯者注:如 collect() 就是一個終止操做)。
Optional - 不具有惰性執行的特性,全部的操做會馬上執行。
Observable, Flowable, Flux - 惰性執行,只有當訂閱者出現時纔會執行,不然不執行。
CompletableFuture - 能夠複用,它僅僅是一個實際值的包裝類。但須要注意的是,這個包裝是可更改的。.obtrude*()
方法會修改它的內容,若是你肯定沒有人會調用到這類方法,那麼重用它仍是安全的。
Stream - 不能複用。Java Doc 註釋道:
A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.
(譯者注:Stream 只能被調用一次。若是被校測到流被重複使用了,它會跑出拋出一個 IllegalStateException 異常。可是某些流操做會返回他們的接受者,而不是一個新的流對象,因此沒法在全部狀況下檢測出是否能夠重用)
Optional - 徹底可重用,由於它是不可變對象,並且全部操做都是馬上執行的。
Observable, Flowable, Flux - 生而重用,專門設計成如此。當存在訂閱者時,每一次執行都會從初始點開始完整地執行一邊。
CompletableFuture - 這個類的要點在於它異步地把多個操做鏈接了起來。CompletableFuture
表明一項操做,它會跟一個 Executor
關聯起來。若是不明確指定一個 Executor
,那麼會默認使用公共的 ForkJoinPool
線程池來執行。這個線程池能夠用 ForkJoinPool.commonPool()
獲取到。默認設置下它會建立系統硬件支持的線程數同樣多的線程(一般和 CPU 的核心數相等,若是你的 CPU 支持超線程(hyperthreading),那麼會設置成兩倍的線程數)。不過你也可使用 JVM 參數指定 ForkJoinPool 線程池的線程數,
-Djava.util.concurrent.ForkJoinPool.common.parallelism=?
複製代碼
或者在建立 CompletableFuture
時提供一個指定的 Executor。
Stream - 不支持建立異步執行流程,可是可使用 stream.parallel()
等方式建立並行流。
Optional - 不支持,它只是一個容器。
Observable, Flowable, Flux - 專門設計用以構建異步系統,但默認狀況下是同步的。subscribeOn
和 observeOn
容許你來控制訂閱以及接收(這個線程會調用 observer 的 onNext
/ onError
/ onCompleted
方法)。
subscribeOn
方法使得你能夠決定由哪一個 Scheduler
來執行 Observable.create
方法。即使你沒有調用建立方法,系統內部也會作一樣的事情。例如:
Observable
.fromCallable(() -> {
log.info("Reading on thread: " + currentThread().getName());
return readFile("input.txt");
})
.map(text -> {
log.info("Map on thread: " + currentThread().getName());
return text.length();
})
.subscribeOn(Schedulers.io()) // <-- setting scheduler
.subscribe(value -> {
log.info("Result on thread: " + currentThread().getName());
});
複製代碼
輸出:
Reading file on thread: RxIoScheduler-2
Map on thread: RxIoScheduler-2
Result on thread: RxIoScheduler-2
複製代碼
相反的,observeOn()
控制在 observeOn()
以後,用哪一個 Scheduler
來運行下游的執行階段。例如:
Observable
.fromCallable(() -> {
log.info("Reading on thread: " + currentThread().getName());
return readFile("input.txt");
})
.observeOn(Schedulers.computation()) // <-- setting scheduler
.map(text -> {
log.info("Map on thread: " + currentThread().getName());
return text.length();
})
.subscribeOn(Schedulers.io()) // <-- setting scheduler
.subscribe(value -> {
log.info("Result on thread: " + currentThread().getName());
});
複製代碼
輸出:
Reading file on thread: RxIoScheduler-2
Map on thread: RxComputationScheduler-1
Result on thread: RxComputationScheduler-1
複製代碼
可緩存和可複用之間的區別是什麼?假如咱們有 pipeline A
,重複使用它兩次,來建立兩個新的 pipeline B = A + X
以及 C = A + Y
CompletableFuture - 跟可重用的答案同樣。
Stream - 不能緩存中間操做的結果,除非調用了終止操做。
Optional - 可緩存,全部操做馬上執行,而且進行了緩存。
Observable, Flowable, Flux - 默認不可緩存的,可是能夠調用 .cache()
把這些類變成可緩存的。例如:
Observable<Integer> work = Observable.fromCallable(() -> {
System.out.println("Doing some work");
return 10;
});
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);
複製代碼
輸出:
Doing some work
10
Doing some work
20
複製代碼
使用 .cache()
:
Observable<Integer> work = Observable.fromCallable(() -> {
System.out.println("Doing some work");
return 10;
}).cache(); // <- apply caching
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);
複製代碼
輸出:
Doing some work
10
20
複製代碼
Stream 和 Optional - 拉模型。調用不一樣的方法(.get()
, .collect()
等)從 pipeline 拉取結果。拉模型一般和阻塞、同步關聯,那也是公平的。當調用方法時,線程會一直阻塞,直到有數據到達。
CompletableFuture, Observable, Flowable, Flux - 推模型。當訂閱一個 pipeline ,而且某些事件被執行後,你會獲得通知。推模型一般和非阻塞、異步這些詞關聯在一塊兒。當 pipeline 在某個線程上執行時,你能夠作任何事情。你已經定義了一段待執行的代碼,當通知到達的時候,這段代碼就會在下個階段被執行。
支持回壓的前提是 pipeline 必須是推模型。
Backpressure(回壓) 描述了 pipeline 中的一種場景:某些異步階段的處理速度跟不上,須要告訴上游生產者放慢速度。直接失敗是不能接受的,這會致使大量數據的丟失。
Stream & Optional - 不支持回壓,由於它們是拉模型。
CompletableFuture - 不存在這個問題,由於它只產生 0 個或者 1 個結果。
Observable(RxJava 1), Flowable, Flux - 支持。經常使用策略以下:
Buffering - 緩衝全部的 onNext
的值,直到下游消費它們。
Drop Recent - 若是下游處理速率跟不上,丟棄最近的 onNext
值。
Use Latest - 若是下游處理速率跟不上,只提供最近的 onNext
值,以前的值會被覆蓋。
None - onNext
事件直接被觸發,不作緩衝和丟棄。
Exception - 若是下游處理跟不上的話,拋出異常。
Observable(RxJava 2) - 不支持。不少 RxJava 1 的使用者用 Observable
來處理不適用回壓的事件,或者是使用 Observable
的時候沒有配置任何策略,致使了不可預知的異常。因此,RxJava 2 明確地區分兩種狀況,提供支持回壓的 Flowable
和不支持回壓的 Observable
。
操做融合的內涵在於,它使得生命週期的不一樣點上的執行階段得以改變,從而消除類庫的架構因素所形成的系統開銷。全部這些優化都在內部被處理完畢,從而讓外部用戶以爲這一切都是透明的。
只有 RxJava 2 和 Reactor 支持這個特性,但支持的方式不一樣。總的來講,有兩種類型的優化:
Macro-fusion - 用一個操做替換 2 個或更多的相繼的操做
Micro-fusion - 一個輸出隊列的結束操做,和在一個輸入隊列的開始操做,可以共享一個隊列的實例。好比說,與其調用 request(1)
而後處理 onNext()`:
否則讓訂閱者直接從父 observable
拉取值。
一圖勝千言
Stream
,CompletableFuture
和 Optional
這些類的建立,都是爲了解決特定的問題。 而且他們很是適合用於解決這些問題。 若是它們知足你的需求,你能夠立馬使用它們。
然而,不一樣的問題具備不一樣的複雜度,而且某些問題只有新技術才能很好的解決,新技術的出現也是爲了解決那些高複雜度的問題。 RxJava 和 Reactor 是通用的工具,它們幫助你以聲明方式來解決問題,而不是使用那些不夠專業的工具,生搬硬套的使用其餘的工具來解決響應式編程的問題,只會讓你的解決方案變成一種 hack 行爲。
歡迎關注個人微信公衆號:「Kirito的技術分享」,關於文章的任何疑問都會獲得回覆,帶來更多 Java 相關的技術分享。