JDK9特性-Reactive Stream 響應式流

初識Reactive Stream

Reactive Stream (響應式流/反應流) 是JDK9引入的一套標準,是一套基於發佈/訂閱模式的數據處理規範。響應式流從2013年開始,做爲提供非阻塞背壓的異步流處理標準的倡議。 它旨在解決處理元素流的問題——如何將元素流從發佈者傳遞到訂閱者,而不須要發佈者阻塞,或訂閱者有無限制的緩衝區或丟棄。更確切地說,Reactive流目的是「找到最小的一組接口,方法和協議,用來描述必要的操做和實體以實現這樣的目標:以非阻塞背壓方式實現數據的異步流」。java

「背壓(反壓)back pressure」概念很關鍵。首先異步消費者會向生產者訂閱接收消息,而後當有新的信息可用時,消費者會經過以前訂閱時提供的回調函數被再次激活調用。若是生產者發出的信息比消費者可以處理消息最大量還要多,消費者可能會被迫一直在抓消息,耗費愈來愈多的資源,埋下潛在的崩潰風險。爲了防止這一點,須要有一種機制使消費者能夠通知生產者,下降消息的生成速度。生產者能夠採用多種策略來實現這一要求,這種機制稱爲背壓。數組

響應式流模型很是簡單——訂閱者向發佈者發送多個元素的異步請求,發佈者向訂閱者異步發送多個或稍少的元素。響應式流會在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。併發

簡單來講,在響應式流下訂閱者能夠與發佈者溝通,若是使用JMS就應該知道,訂閱者只能被動接收發布者所產生的消息數據。這就比如沒有水龍頭的水管同樣,我只能被動接收水管裏流過來的水,沒法關閉也沒法減小。而響應式流就至關於給水管加了個水龍頭,在消費者這邊能夠控制水流的增長、減小及關閉。框架

響應式流模型圖:
JDK9特性-Reactive Stream 響應式流異步

發佈者(Publisher)是潛在的無限數量的有序元素的生產者。發佈者可能有多個來自訂閱者的待處理請求。ide

  • 根據收到的要求向當前訂閱者發佈(或發送)元素。

訂閱者(Subscriber)從發佈者那裏訂閱並接收元素。訂閱者能夠請求更多的元素。函數

  • 發佈者向訂閱者發送訂閱令牌(Subscription)。
  • 使用訂閱令牌,訂閱者從發佈者那裏請求多個元素。
  • 當元素準備就緒時,發佈者向訂閱者發送多個或更少的元素。

Reactive Stream主要接口

JDK9 經過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現響應式流。在JDK9裏Reactive Stream的主要接口聲明在Flow類裏,Flow 類中定義了四個嵌套的靜態接口,用於創建流量控制的組件,發佈者在其中生成一個或多個供訂閱者使用的數據項:this

  • Publisher:數據項發佈者、生產者
  • Subscriber:數據項訂閱者、消費者
  • Subscription:發佈者與訂閱者之間的關係紐帶,訂閱令牌
  • Processor:數據處理器

Flow類結構以下:
JDK9特性-Reactive Stream 響應式流.net

Publisher是可以發出元素的發佈者,Subscriber是接收元素並作出響應的訂閱者。當執行Publisher裏的subscribe方法時,發佈者會回調訂閱者的onSubscribe方法,這個方法中,一般訂閱者會藉助傳入的Subscription向發佈者請求n個數據。而後發佈者經過不斷調用訂閱者的onNext方法向訂閱者發出最多n個數據。若是數據所有發完,則會調用onComplete告知訂閱者流已經發完;若是有錯誤發生,則經過onError發出錯誤數據,一樣也會終止流。線程

其中,Subscription至關因而鏈接PublisherSubscriber的「紐帶」。由於當發佈者調用subscribe方法註冊訂閱者時,會經過訂閱者的回調方法onSubscribe傳入Subscription對象,以後訂閱者就可使用這個Subscription對象的request方法向發佈者「要」數據了。背壓機制正是基於此來實現的。

以下圖:
JDK9特性-Reactive Stream 響應式流

Processor則是集PublisherSubscriber於一身,至關因而發佈者與訂閱者之間的一個」中間人「,能夠經過Processor進行一些中間操做:

/**
 * A component that acts as both a Subscriber and Publisher.
 *
 * @param <T> the subscribed item type
 * @param <R> the published item type
 */
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

以下圖:
JDK9特性-Reactive Stream 響應式流

參考:

https://blog.csdn.net/rickiyeat/article/details/78175962


響應流使用示例

1.如下代碼簡單演示了SubmissionPublisher 和這套發佈-訂閱框架的基本使用方式:

package com.example.demo;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * @program: demo
 * @description: Flow Demo
 * @author: 01
 * @create: 2018-10-04 13:25
 **/
public class FlowDemo {

    public static void main(String[] args) throws Exception {
        // 1. 定義發佈者, 發佈的數據類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

        // 2. 定義訂閱者
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存訂閱關係, 須要用它來給發佈者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者已經達到了目標, 能夠調用cancel告訴發佈者再也不接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 咱們能夠告訴發佈者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 所有數據處理完了(發佈者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 3. 發佈者和訂閱者 創建訂閱關係
        publiser.subscribe(subscriber);

        // 4. 生產數據, 併發布
        // 這裏忽略數據生產過程
        for (int i = 0; i < 3; i++) {
            System.out.println("生成數據:" + i);
            // submit是個block方法
            publiser.submit(i);
        }

        // 5. 結束後 關閉發佈者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主線程延遲中止, 不然數據沒有消費就會退出
        Thread.currentThread().join(1000);
        // debug的時候, 下面這行須要有斷點
        // 不然主線程結束沒法debug
        System.out.println();
    }
}

運行結果以下:
JDK9特性-Reactive Stream 響應式流

上文中提到過能夠調節發佈者的數據產出速度,那麼這個速度是如何調節的呢?關鍵就在於submit方法,該方法是一個阻塞方法。須要先說明的是SubmissionPublisher裏有一個數據緩衝區,用於緩衝發佈者產生的數據,而這個緩衝區是利用一個Object數組實現的,緩衝區最大長度爲256。咱們能夠在onSubscribe方法裏打上斷點,查看到這個緩衝區:
JDK9特性-Reactive Stream 響應式流

當這個緩衝區的數據滿了以後,submit方法就會進入阻塞狀態,發佈者數據的產生速度就會變慢,以此實現調節發佈者的數據產出速度。


2.第二個例子演示告終合Processor的使用方式,代碼以下:

package com.example.demo;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
 * Processor, 須要繼承SubmissionPublisher並實現Processor接口
 *
 * 輸入源數據 integer, 過濾掉小於0的, 而後轉換成字符串發佈出去
 */
class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存訂閱關係, 須要用它來給發佈者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個數據, 處理
        System.out.println("處理器接受到數據: " + item);

        // 過濾掉小於0的, 而後發佈出去
        if (item > 0) {
            this.submit("轉換後的數據:" + item);
        }

        // 處理完調用request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現了異常(例如處理數據的時候產生了異常)
        throwable.printStackTrace();

        // 咱們能夠告訴發佈者, 後面不接受數據了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 所有數據處理完了(發佈者關閉了)
        System.out.println("處理器處理完了!");
        // 關閉發佈者
        this.close();
    }

}

/**
 * 帶 process 的 flow demo
 * @author 01
 */
public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定義發佈者, 發佈的數據類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

        // 2. 定義處理器, 對數據進行過濾, 並轉換爲String類型
        MyProcessor processor = new MyProcessor();

        // 3. 發佈者 和 處理器 創建訂閱關係
        publiser.subscribe(processor);

        // 4. 定義最終訂閱者, 消費 String 類型數據
        Subscriber<String> subscriber = new Subscriber<>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關係, 須要用它來給發佈者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 咱們能夠告訴發佈者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 所有數據處理完了(發佈者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 5. 處理器 和 最終訂閱者 創建訂閱關係
        processor.subscribe(subscriber);

        // 6. 生產數據, 併發布
        // 這裏忽略數據生產過程
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 結束後 關閉發佈者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主線程延遲中止, 不然數據沒有消費就退出
        Thread.currentThread().join(1000);
    }
}

運行結果以下:
JDK9特性-Reactive Stream 響應式流

相關文章
相關標籤/搜索