函數式編程對於Reactive Programming很重要,但我不會在這篇文章中深刻探討函數式編程。html
在這篇文章中,我想看看Java中的總體Reactive發展環境。java
有了這些新的流行語,就很容易對它們的含義感到困惑。react
反應式編程是一種編程範式,但我不會稱之爲新的。它實際上已經存在了一段時間。程序員
就像面向對象編程,函數式編程或過程式編程同樣,反應式編程只是另外一種編程範式。算法
另外一方面,Reactive Streams是一種規範。對於Java程序員,Reactive Streams是一個API。Reactive Streams爲咱們提供了Java中的Reactive Programming的通用API。spring
Reactive Streams API是Kaazing,Netflix,Pivotal,Red Hat,Twitter,Typesafe等衆多工程師之間合做的產物。數據庫
Reactive Streams很是相似於JPA或JDBC。二者都是API規範。您須要使用API規範的實現。編程
例如,從JDBC規範中,您具備Java DataSource接口。Oracle JDBC實現將爲您提供DataSource接口的實現。正如Microsoft的SQL Server JDBC實現也將提供DataSource接口的實現。服務器
如今,您的高級程序能夠接受DataSource對象,而且應該可以使用數據源,而沒必要擔憂它是由Oracle提供仍是由Microsoft提供。多線程
就像JPA或JDBC同樣,Reactive Streams爲咱們提供了一個咱們能夠編寫代碼的API接口,而無需擔憂底層實現。
關於Reactive programming是什麼,有不少意見。Reactive programming也有不少炒做!
開始學習Reactive programming範例的最佳起點是閱讀「Reactive Manifesto」。
Reactive Manifesto描述了reactive systems的四個關鍵屬性:
若是可能的話,系統及時響應。響應性是可用性和實用性的基石,但更重要的是,響應性意味着能夠快速檢測到問題並有效地處理問題。reactive systems專一於提供快速一致的響應時間,創建可靠的上限,以便提供一致的服務質量。這種一致的行爲反過來簡化了錯誤處理,創建了最終用戶的信心,並鼓勵進一步的交互。
系統在出現故障時保持響應。這不只適用於高可用性,關鍵任務系統 - 任何無彈性的系統在發生故障後都不會響應。經過複製,遏制,隔離和委派來實現彈性。故障包含在每一個組件中,將組件彼此隔離,從而確保系統的各個部分能夠在不損害整個系統的狀況下發生故障和恢復。將每一個組件的恢復委派給另外一個(外部)組件,並在必要時經過複製確保高可用性。組件的客戶端不會負擔處理其故障的負擔。
系統在不一樣的工做負載下保持響應。反應系統能夠經過增長或減小分配用於服務這些輸入的資源來對輸入速率的變化做出反應。這意味着設計沒有爭用點或中心瓶頸,從而可以分片或複製組件並在它們之間分配輸入。Reactive Systems經過提供相關的實時性能測量來支持預測和反應式擴展算法。它們在商用硬件和軟件平臺上以經濟高效的方式實現彈性。
Reactive Systems依靠異步消息傳遞在組件之間創建邊界,以確保鬆散耦合,隔離和位置透明性。此邊界還提供將故障委派爲消息的方法。使用顯式消息傳遞能夠經過整形和監視系統中的消息隊列並在必要時應用反壓來實現負載管理,彈性和流量控制。做爲通訊手段的位置透明消息傳遞使得管理失敗能夠在羣集內或單個主機內使用相同的構造和語義。非阻塞通訊容許接收者僅在活動時消耗資源,從而減小系統開銷。
前三個屬性(Responsive, Resilient, Elastic)與您的架構選擇更相關。很容易理解爲何微服務,Docker和Kubernetes等技術是Reactive系統的重要方面。在單個服務器上運行LAMP堆棧顯然不符合Reactive Manifesto的目標。
做爲Java開發人員,它是咱們最感興趣的最後一個屬性,即Message Driven屬性。
消息驅動的架構固然不是革命性的。若是你須要一個關於消息驅動系統的入門讀物,我想建議閱讀Enterprise Integration Patterns - 一本真正標誌性的計算機科學書籍。本書中的概念爲Spring Integration和Apache Camel奠基了基礎。
咱們對Java開發人員感興趣的Reactive Manifesto的幾個方面是:消息失敗,背壓和非阻塞。這些是Java中反應式編程的微妙但重要的方面。
一般在Reactive編程中,您將處理消息流。
不但願的是拋出異常並結束消息流的處理。首選方法是優雅地處理故障。
也許你須要執行一個Web服務而且它已經關閉了。也許你可使用備份服務?或者可能在10ms內重試?
我不會在這裏解決每個邊緣案例。關鍵的一點是,您不但願因運行時異常而大聲失敗。理想狀況下,您須要記錄失敗,並具備某種類型的重試或恢復邏輯。一般,故障是經過回調來處理的。
JavaScript開發人員習慣於使用回調。但回調可能會變得難以使用。JavaScript開發人員將此稱爲回調地獄。
在Reactive Steams中,異常是一等公民。不是粗暴拋出的。錯誤處理內置於Reactive Streams API規範中。
你據說過「從消防中喝水」這句話嗎?
背壓是反應式編程中很是重要的概念。它爲下游客戶提供了一種說法,「我還有更多,請。」
想象一下,若是您正在查詢數據庫,結果集將返回1000萬行。傳統上,數據庫將以客戶端接受它們的速度吐出全部1000萬行。
當客戶端不能再接受時,它會阻塞。數據庫焦急地等待着。阻止。鏈中的線程耐心地等待解除阻塞。
在Reactive世界中,咱們但願咱們的客戶可以說給我前1000個。而後咱們能夠給他們1000並繼續咱們的業務 - 直到客戶回來並要求另外一組記錄。
這與客戶沒有發言權的傳統系統造成鮮明對比。經過阻塞線程來完成限制,而不是以編程方式。
對Java開發人員來講重要的Reactive架構的最後,也許是最重要的方面是非阻塞的。
直到Reactive來了很長時間,沒有阻塞彷佛並無那麼重要。
做爲Java開發人員,咱們已經被教會經過使用線程來利用強大的現代硬件。愈來愈多的核心意味着咱們可使用愈來愈多的線程。所以,若是咱們須要等待數據庫或Web服務返回,則不一樣的線程能夠利用CPU。這彷佛對咱們有意義。當咱們被阻塞的線程等待某種類型的I / O時,不一樣的線程可使用CPU。
所以,阻塞並非什麼大問題。對?
系統中的每一個線程都將消耗資源。每次線程被阻塞時,都會消耗資源。雖然CPU在維護不一樣線程方面很是有效,但仍然存在成本問題。咱們Java開發人員多是一羣傲慢的人。
他們老是看不起JavaScript,有點討厭的小語言。事實上,JavaScript共享「Java」這個詞老是讓咱們Java程序員感受有點髒。若是您是Java開發人員,當您必須指出Java和JavaScript是兩種不一樣的語言時,您有多少次感到煩惱?而後Node.js出現了。Node.js在吞吐量方面提出了瘋狂的基準測試。而後Java社區注意到了。是的,腳本小子已經長大,正在侵佔咱們的地盤。並非在谷歌的V8 JavaScript引擎中運行的JavaScript對於編程來講是一個很是快速的天賜之物。Java曾經在性能方面有其瑕疵,但與現代本地語言相比,它很是高效。
Node.js表現的祕訣是無阻塞。
Node.js使用具備有限數量線程的事件循環。雖然在Java世界中的阻塞一般被視爲沒什麼大不了的,但在Node.js世界中,它將成爲性能的死亡之吻。
這些圖形能夠幫助您可視化差別。在Node.js中,有一個非阻塞事件循環。請求以非阻塞方式處理。線程不會卡在等待其餘進程。
將Node.js模型與Java中使用的典型多線程服務器進行對比。併發是經過使用多個線程實現的。因爲多核處理器的增加,這被廣泛接受。
我我的認爲這兩種方法之間的區別在於高速公路和許多帶燈光的城市街道之間的區別。
經過單線程事件循環,您的過程能夠在超級高速公路上快速巡航。在多線程服務器中,您的進程會停在城市街道上並中止流量。二者均可以帶來大量流量。可是,我寧願在高速公路上巡航!
當您轉向非阻塞範例時,您的代碼會在CPU上停留更長時間。線程切換較少。您不只要管理許多線程,還要刪除線程之間的上下文切換。
您將在系統容量中看到更多的空間,供您的程序使用。非阻塞不是聖盃,你不會看到事情變得更快。是的,管理阻塞須要付出代價。但考慮到全部事情,它相對有效。事實上,在一個適度使用的系統上,我不肯定差別會有多大可衡量。
可是,隨着系統負載的增長,您能夠期待看到的是,您將擁有額外的容量來爲更多請求提供服務。您將得到更高的併發性。多少?好問題。用例很是具體。與全部基準同樣,您的里程也會有所不一樣。
咱們來看看Reactive Streams API for Java。Reactive Streams API僅包含四個接口。
發佈者是可能無限數量的有序元素的提供者,根據從其訂閱者收到的需求發佈它們。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
在將訂閱者實例傳遞給Publisher.subscribe(訂閱者)以後,將接收對Subscriber.onSubscribe(訂閱)的調用。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
訂閱表示訂閱發佈者的訂閱者的一對一輩子命週期。
public interface Subscription {
public void request(long n);
public void cancel();
}
Processor表明一個處理階段 - 既是訂閱者又是發佈者,並遵照二者的合同。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Java中的Reactive環境正在不斷髮展和成熟。David Karnok在Advanced Reactive Java上有一篇很棒的博客文章Advanced Reactive Java,他將各類反應項目分解成幾代。我會注意到下面每一代(隨着新版本的發佈,它可能會隨時改變)。
RxJava是ReactiveX項目中的Java實現。在撰寫本文時,ReactiveX項目實現了Java,JavaScript,.NET(C#),Scala,Clojure,C ++,Ruby,Python,PHP,Swift等等。
ReactiveX在GoF Observer模式上提供了反應性扭曲,這是一種很好的方法。ReactiveX調用他們的方法'Observer Pattern Done Right'
RxJava早於Reactive Streams規範。雖然RxJava 2.0+確實實現了Reactive Streams API規範,但您會注意到術語略有不一樣。David Karnok是RxJava的關鍵提交者,他認爲RxJava是第三代reactive library。
Reactor是Pivotal的Reactive Streams兼容實現。
從Reactor 3.0開始,Java 8或更高版本是必需的。Spring Framework 5中的Reactive功能基於Reactor 3.0構建。Reactor是reactive library。(David Karnok也是項目Reactor的提交者)
Akka Streams還徹底實現了Reactive Streams規範。Akka使用Actors處理流數據。雖然Akka Streams符合Reactive Streams API規範,但Akka Streams API與Reactive Streams接口徹底分離。Akka Streams被認爲是reactive library。
Ratpack是一組用於構建現代高性能HTTP應用程序的Java庫。Ratpack使用Java 8,Netty和Reactive原則。Ratpack提供了Reactive Stream API的基本實現,但並非一個功能齊全的reactive工具包。(可選)您能夠將RxJava或Reactor與Ratpack一塊兒使用。
Vert.x是一個Eclipse Foundation項目。它是JVM的多語言事件驅動的應用程序框架。Vert.x中的反應支持與Ratpack相似。Vert.x容許您使用RxJava或其本機實現的Reactive Streams API。
使用Java 1.8,您將得到對Reactive Streams規範的強大支持。
在Java 1.8中,Reactive stream不是Java API的一部分。可是,它能夠做爲一個單獨的罐子提供。
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.0</version>
</dependency>
雖然您能夠直接包含此依賴項,但您正在使用的任何Reactive Streams實現都應將其自動包含爲依賴項。
當你轉向Java 1.9時,事情會發生一些變化。Reactive Streams已成爲官方Java 9 API的一部分。
您會注意到Reactive Streams接口在Java 9中的Flow類下。但除此以外,API與Java 1.8中的Reactive Streams 1.0相同。
在Java 9中,Reactive Streams正式成爲Java API的一部分。在研究這篇文章時,很明顯各類反應性庫已經在不斷髮展和成熟(即David Karnok世代分類)。
在Reactive Streams以前,各類reactive庫沒法實現互操做性。他們沒法互相交談。早期版本的RxJava與項目Reactor的早期版本不兼容。可是在Java 9發佈前夕,主要的反應庫採用了Reactive Streams規範。如今,不一樣的庫能夠互操做。
互操做性是一個重要的多米諾骨牌。例如,MongoDB實現了Reactive Streams驅動程序。
如今,在咱們的應用程序中,咱們可使用Reactor或RxJava來使用MongoDB中的數據。咱們仍然在適應Reactive Streams的早期階段。可是在接下來的一年左右的時間裏,咱們能夠期待愈來愈多的開源項目提供Reactive Streams兼容性。
我但願在不久的未來咱們會看到更多的Reactive Streams。這是一個成爲Java開發人員的有趣時間!
下節再續!
原文:https://dzone.com/articles/what-are-reactive-streams-in-java
有什麼討論的內容,能夠加我公衆號: