Reactive Stream (響應式流/反應流) 是JDK9引入的一套標準,是一套基於發佈/訂閱模式的數據處理規範。響應式流從2013年開始,做爲提供非阻塞背壓的異步流處理標準的倡議。 它旨在解決處理元素流的問題——如何將元素流從發佈者傳遞到訂閱者,而不須要發佈者阻塞,或訂閱者有無限制的緩衝區或丟棄。更確切地說,Reactive流目的是「找到最小的一組接口,方法和協議,用來描述必要的操做和實體以實現這樣的目標:以非阻塞背壓方式實現數據的異步流」。java
「背壓(反壓)back pressure」概念很關鍵。首先異步消費者會向生產者訂閱接收消息,而後當有新的信息可用時,消費者會經過以前訂閱時提供的回調函數被再次激活調用。若是生產者發出的信息比消費者可以處理消息最大量還要多,消費者可能會被迫一直在抓消息,耗費愈來愈多的資源,埋下潛在的崩潰風險。爲了防止這一點,須要有一種機制使消費者能夠通知生產者,下降消息的生成速度。生產者能夠採用多種策略來實現這一要求,這種機制稱爲背壓。數組
響應式流模型很是簡單——訂閱者向發佈者發送多個元素的異步請求,發佈者向訂閱者異步發送多個或稍少的元素。響應式流會在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。併發
簡單來講,在響應式流下訂閱者能夠與發佈者溝通,若是使用JMS就應該知道,訂閱者只能被動接收發布者所產生的消息數據。這就比如沒有水龍頭的水管同樣,我只能被動接收水管裏流過來的水,沒法關閉也沒法減小。而響應式流就至關於給水管加了個水龍頭,在消費者這邊能夠控制水流的增長、減小及關閉。框架
響應式流模型圖:異步
發佈者(Publisher)是潛在的無限數量的有序元素的生產者。發佈者可能有多個來自訂閱者的待處理請求。ide
訂閱者(Subscriber)從發佈者那裏訂閱並接收元素。訂閱者能夠請求更多的元素。函數
JDK9 經過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現響應式流。在JDK9裏Reactive Stream的主要接口聲明在Flow類裏,Flow 類中定義了四個嵌套的靜態接口,用於創建流量控制的組件,發佈者在其中生成一個或多個供訂閱者使用的數據項:this
Flow類結構以下:.net
Publisher
是可以發出元素的發佈者,Subscriber
是接收元素並作出響應的訂閱者。當執行Publisher
裏的subscribe
方法時,發佈者會回調訂閱者的onSubscribe
方法,這個方法中,一般訂閱者會藉助傳入的Subscription
向發佈者請求n個數據。而後發佈者經過不斷調用訂閱者的onNext
方法向訂閱者發出最多n個數據。若是數據所有發完,則會調用onComplete
告知訂閱者流已經發完;若是有錯誤發生,則經過onError
發出錯誤數據,一樣也會終止流。線程
其中,Subscription
至關因而鏈接Publisher
和Subscriber
的「紐帶」。由於當發佈者調用subscribe
方法註冊訂閱者時,會經過訂閱者的回調方法onSubscribe
傳入Subscription
對象,以後訂閱者就可使用這個Subscription
對象的request
方法向發佈者「要」數據了。背壓機制正是基於此來實現的。
以下圖:
Processor
則是集Publisher
和Subscriber
於一身,至關因而發佈者與訂閱者之間的一個」中間人「,能夠經過Processor
進行一些中間操做:
/** * A component that acts as both a Subscriber and Publisher. * * @param <T> the subscribed item type * @param <R> the published item type */ public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
以下圖:
參考:
1.如下代碼簡單演示了SubmissionPublisher 和這套發佈-訂閱框架的基本使用方式:
package com.example.demo; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; /** * @program: demo * @description: Flow Demo * @author: 01 * @create: 2018-10-04 13:25 **/ public class FlowDemo { public static void main(String[] args) throws Exception { // 1. 定義發佈者, 發佈的數據類型是 Integer // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 接口 SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>(); // 2. 定義訂閱者 Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者已經達到了目標, 能夠調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理完了!"); } }; // 3. 發佈者和訂閱者 創建訂閱關係 publiser.subscribe(subscriber); // 4. 生產數據, 併發布 // 這裏忽略數據生產過程 for (int i = 0; i < 3; i++) { System.out.println("生成數據:" + i); // submit是個block方法 publiser.submit(i); } // 5. 結束後 關閉發佈者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主線程延遲中止, 不然數據沒有消費就會退出 Thread.currentThread().join(1000); // debug的時候, 下面這行須要有斷點 // 不然主線程結束沒法debug System.out.println(); } }
運行結果以下:
上文中提到過能夠調節發佈者的數據產出速度,那麼這個速度是如何調節的呢?關鍵就在於submit方法,該方法是一個阻塞方法。須要先說明的是SubmissionPublisher裏有一個數據緩衝區,用於緩衝發佈者產生的數據,而這個緩衝區是利用一個Object數組實現的,緩衝區最大長度爲256。咱們能夠在onSubscribe方法裏打上斷點,查看到這個緩衝區:
當這個緩衝區的數據滿了以後,submit方法就會進入阻塞狀態,發佈者數據的產生速度就會變慢,以此實現調節發佈者的數據產出速度。
2.第二個例子演示告終合Processor的使用方式,代碼以下:
package com.example.demo; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; /** * Processor, 須要繼承SubmissionPublisher並實現Processor接口 * * 輸入源數據 integer, 過濾掉小於0的, 而後轉換成字符串發佈出去 */ class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("處理器接受到數據: " + item); // 過濾掉小於0的, 而後發佈出去 if (item > 0) { this.submit("轉換後的數據:" + item); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理器處理完了!"); // 關閉發佈者 this.close(); } } /** * 帶 process 的 flow demo * @author 01 */ public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定義發佈者, 發佈的數據類型是 Integer // 直接使用jdk自帶的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>(); // 2. 定義處理器, 對數據進行過濾, 並轉換爲String類型 MyProcessor processor = new MyProcessor(); // 3. 發佈者 和 處理器 創建訂閱關係 publiser.subscribe(processor); // 4. 定義最終訂閱者, 消費 String 類型數據 Subscriber<String> subscriber = new Subscriber<>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理完了!"); } }; // 5. 處理器 和 最終訂閱者 創建訂閱關係 processor.subscribe(subscriber); // 6. 生產數據, 併發布 // 這裏忽略數據生產過程 publiser.submit(-111); publiser.submit(111); // 7. 結束後 關閉發佈者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主線程延遲中止, 不然數據沒有消費就退出 Thread.currentThread().join(1000); } }
運行結果以下: