Java 9 揭祕(17. Reactive Streams)

Tips
作一個終身學習的人。java

Java 9

在本章中,主要介紹如下內容:react

  • 什麼是流(stream)
  • 響應式流(Reactive Streams)的倡議是什麼,以及規範和Java API
  • 響應式流在JDK 中的API以及如何使用它們
  • 如何使用JDK 9中的響應式流的Java API來建立發佈者,訂閱者和處理者

一. 什麼是流

流是由生產者生產並由一個或多個消費者消費的元素(item)的序列。 這種生產者——消費者模型也被稱爲source/sink模型或發佈者——訂閱者(publisher-subscriber )模型。 在本章中,將其稱爲發佈者訂閱者模型。git

有幾種流處理機制,其中pull模型和push模型是最多見的。 在push模型中,發佈者將元素推送給訂閱者。 在pull模式中,訂閱者將元素推送給發佈者。 發佈者和訂閱者都以一樣的速率工做,這是一個理想的狀況,這些模式很是有效。 咱們會考慮一些狀況,若是他們不按一樣的速率工做,這種狀況下涉及的問題以及對應的解決辦法。github

當發佈者比訂閱者快的時候,後者必須有一個無邊界緩衝區來保存快速傳入的元素,或者它必須丟棄它沒法處理的元素。 另外一個解決方案是使用一種稱爲背壓(backpressure )的策略,其中訂閱者告訴發佈者減慢速率並保持元素,直到訂閱者準備好處理更多的元素。 使用背壓可確保更快的發佈者不會壓制較慢的訂閱者。 使用背壓可能要求發佈者擁有無限制的緩衝區,若是它要一直生成和保存元素。 發佈者能夠實現有界緩衝區來保存有限數量的元素,若是緩衝區已滿,能夠選擇放棄它們。 可使用另外一策略,其中發佈者將發佈元素從新發送到訂閱者,這些元素髮布時訂閱者不能接受。正則表達式

訂閱者在請求發佈者的元素而且元素不可用時,該作什麼? 在同步請求中訂閱者戶必須等待,無限期地,直到有元素可用。 若是發佈者同步地向訂閱者發送元素,而且訂閱者同步處理它們,則發佈者必須阻塞直到數據處理完成。 解決方案是在兩端進行異步處理,訂閱者能夠在從發佈者請求元素以後繼續處理其餘任務。 當更多的元素準備就緒時,發佈者將它們異步發送給訂閱者。小程序

二. 什麼是響應式流(Reactive Streams)

響應式流從2013年開始,做爲提供非阻塞背壓的異步流處理標準的倡議。 它旨在解決處理元素流的問題——如何將元素流從發佈者傳遞到訂閱者,而不須要發佈者阻塞,或訂閱者有無限制的緩衝區或丟棄。api

響應式流模型很是簡單——訂閱者向發佈者發送多個元素的異步請求。 發佈者向訂閱者異步發送多個或稍少的元素。併發

Tips
響應式流在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。dom

在2015年,出版了用於處理響應式流的規範和Java API。 有關響應式流的更多信息,請訪問http://www.reactive-streams.org/。 Java API 的響應式流只包含四個接口:異步

Publisher<T>
Subscriber<T>
Subscription
Processor<T,R>

發佈者(publisher)是潛在無限數量的有序元素的生產者。 它根據收到的要求向當前訂閱者發佈(或發送)元素。

訂閱者(subscriber)從發佈者那裏訂閱並接收元素。 發佈者向訂閱者發送訂閱令牌(subscription token)。 使用訂閱令牌,訂閱者從發佈者哪裏請求多個元素。 當元素準備就緒時,發佈者向訂閱者發送多個或更少的元素。 訂閱者能夠請求更多的元素。 發佈者可能有多個來自訂閱者的元素待處理請求。

訂閱(subscription)表示訂閱者訂閱的一個發佈者的令牌。 當訂閱請求成功時,發佈者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與發佈者進行交互,例如請求更多的元素或取消訂閱。

下圖顯示了發佈者和訂閱者之間的典型交互順序。 訂閱令牌未顯示在圖表中。 該圖沒有顯示錯誤和取消事件。

發佈者和訂閱者之間的交互

處理者(processor)充當訂閱者和發佈者的處理階段。 Processor接口繼承了PublisherSubscriber接口。 它用於轉換髮布者——訂閱者管道中的元素。 Processor<T,R>訂閱類型T的數據元素,接收並轉換爲類型R的數據,併發布變換後的數據。 下圖顯示了處理者在發佈者——訂閱和管道中做爲轉換器的做用。 能夠擁有多個處理者。

處理者做爲轉化器

下面顯示了響應式流倡導所提供的Java API。全部方法的返回類型爲void。 這是由於這些方法表示異步請求或異步事件通知。

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
public interface Subscription {
    public void request(long n);
    public void cancel();
}
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

用於響應式流的Java API彷佛很容易理解。 可是,實現起來並不簡單。 發佈者和訂閱者之間的全部交互的異步性質以及處理背壓使得實現變得複雜。 做爲應用程序開發人員,會發現實現這些接口很複雜。 類庫應該提供實現來支持普遍的用例。 JDK 9提供了Publisher接口的簡單實現,能夠將其用於簡單的用例或擴展以知足本身的需求。 RxJava是響應式流的Java實現之一。

三. JDK 9 中響應式流的API

JDK 9在java.util.concurrent包中提供了一個與響應式流兼容的API,它在java.base模塊中。 API由兩個類組成:

Flow
SubmissionPublisher<T>

Flow類是final的。 它封裝了響應式流Java API和靜態方法。 由響應式流Java API指定的四個接口做爲嵌套靜態接口包含在Flow類中:

Flow.Processor<T,R>
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription

這四個接口包含與上面代碼所示的相同的方法。 Flow類包含defaultBufferSize()靜態方法,它返回發佈者和訂閱者使用的緩衝區的默認大小。 目前,它返回256。

SubmissionPublisher<T>類是Flow.Publisher<T>接口的實現類。 該類實現了AutoCloseable接口,所以可使用try-with-resources塊來管理其實例。 JDK 9不提供Flow.Subscriber<T>接口的實現類; 須要本身實現。 可是,SubmissionPublisher<T>類包含可用於處理此發佈者發佈的全部元素的consume(Consumer<? super T> consumer)方法。

1. 發佈者——訂閱者交互

在開始使用JDK API以前,瞭解使用響應式流的典型發佈者——訂閱者會話中發生的事件順序很重要。 包括在每一個事件中使用的方法。 發佈者能夠擁有零個或多個訂閱者。 這裏只使用一個訂閱者。

  • 建立發佈者和訂閱者,它們分別是Flow.PublisherFlow.Subscriber接口的實例。
  • 訂閱者經過調用發佈者的subscribe()方法來嘗試訂閱發佈者。 若是訂閱成功,發佈者用Flow.Subscription異步調用訂閱者的onSubscribe()方法。 若是嘗試訂閱失敗,則使用調用訂閱者的onError()方法,並拋出IllegalStateException異常,而且發佈者——訂閱者交互結束。
  • 訂閱者經過調用Subscriptionrequest(N)方法向發佈者發送多個元素的請求。 訂閱者能夠向發佈者發送更多元素的多個請求,而沒必要等待其先前請求是否完成。
  • 訂閱者在全部先前的請求中調用訂閱者的onNext(T item)方法,直到訂閱者戶請求的元素數量上限——在每次調用中向訂閱者發送一個元素。 若是發佈者沒有更多的元素要發送給訂閱者,則發佈者調用訂閱者的onComplete()方法來發信號通知流,從而結束髮布者——訂閱者交互。 若是訂閱者請求Long.MAX_VALUE元素,則它其實是無限制的請求,而且流其實是推送流。
  • 若是發佈者隨時遇到錯誤,它會調用訂閱者的onError()方法。
  • 訂閱者能夠經過調用其Flow.Subscriptioncancel()方法來取消訂閱。 一旦訂閱被取消,發佈者——訂閱者交互結束。 然而,若是在請求取消以前存在未決請求,訂閱者能夠在取消訂閱以後接收元素。

總結上述結束條件的步驟,一旦在訂閱者上調用了onComplete()onError()方法,訂閱者就再也不收到發佈者的通知。

在發佈者的subscribe()方法被調用以後,若是訂閱者不取消其訂閱,則保證如下訂閱方法調用序列:

onSubscribe onNext* (onError | onComplete)?

這裏,符號*?在正則表達式中被用做關鍵字,一個*表示零個或多個出現, ?意爲零或一次。

在訂閱者上的第一個方法調用是onSubscribe()方法,它是成功訂閱發佈者的通知。訂閱者的onNext()方法能夠被調用零次或屢次,每次調用指示元素髮布。onComplete()onError()方法能夠被調用爲零或一次來指示終止狀態; 只要訂閱者不取消其訂閱,就會調用這些方法。

2. 建立發佈者

建立發佈者取決於Flow.Publisher<T>接口的實現類。該類包含如下構造函數:

SubmissionPublisher()
SubmissionPublisher(Executor executor, int maxBufferCapacity)
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)

SubmissionPublisher使用提供的Executor向其訂閱者提供元素。 若是使用多個線程來生成要發佈的元素而且能夠估計訂閱者數量,則可使用具備固定線程池的newFixedThreadPool(int nThread),這可使用Executors類的newFixedThreadPool(int nThread)靜態方法獲取。 不然,使用默認的Executor,它使用ForkJoinPool類的commonPool()方法獲取。

SubmissionPublisher類爲每一個訂閱者使用一個獨立的緩衝區。 緩衝區大小由構造函數中的maxBufferCapacity參數指定。 默認緩衝區大小是Flow類的defaultBufferSize()靜態方法返回的值,該值爲256。若是發佈的元素數超過了訂戶的緩衝區大小,則額外的元素將被刪除。 可使用SubmissionPublisher類的getMaxBufferCapacity()方法獲取每一個訂閱者的當前緩衝區大小。

當訂閱者的方法拋出異常時,其訂閱被取消。 當訂閱者的onNext()方法拋出異常時,在其訂閱被取消以前調用構造函數中指定的處理程序。 默認狀況下,處理程序爲null。

如下代碼片斷會建立一個SubmissionPublisher,它發佈全部屬性設置爲默認值的Long類型的元素:

// Create a publisher that can publish Long values
SubmissionPublisher<Long> pub = new SubmissionPublisher<>();

SubmissionPublisher類實現了AutoCloseable接口。 調用其close()方法調用其當前訂閱者上的onComplete()方法。 調用close()方法後嘗試發佈元素會拋出IllegalStateException異常。

3. 發佈元素

SubmissionPublisher<T>類包含如下發布元素的方法:

int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
int submit(T item)

submit()方法阻塞,直到當前訂閱者的資源可用於發佈元素。 考慮每一個訂閱者的緩衝區容量爲10的狀況。 訂閱者訂閱了發佈者而且不請求任何元素。 發佈者發佈了10個元素並所有緩衝全部元素。 嘗試使用submit()方法發佈另外一個元素將阻塞,由於訂閱者的緩衝區已滿。

offer()方法是非阻塞的。 該方法的第一個版本容許指定超時,以後刪除該項。 能夠指定一個刪除處理器,它是一個BiPredicate。 在刪除訂閱者的元素以前調用刪除處理器的test()方法。 若是test()方法返回true,則再次重試該項。 若是test()方法返回false,則在不重試的狀況下刪除該項。 從offer()方法返回的負整數表示向訂閱者發送元素失敗的嘗試次數;正整數表示在全部當前訂閱者中提交但還沒有消費的最大元素數量的估計。

應該使用哪一種方法發佈一個元素:submit()offer()? 這取決於你的要求。 若是每一個已發佈的元素必須發給全部訂閱者,則submit()方法是最好選擇。 若是要等待發布一段特定時間的元素進行重試,則可使用offer()方法。

4. 舉個例子

來看一個使用SubmissionPublisher做爲發佈者的例子。 SubmissionPublisher可使用其submit(T item)方法發佈元素。 如下代碼片斷生成併發布五個整數(1,2,3,4和5),假設pub是對SubmissionPublisher對象的引用:

// Generate and publish 10 integers
LongStream.range(1L, 6L)
          .forEach(pub::submit);

須要訂閱者才能使用發佈者發佈的元素。 SubmissionPublisher類包含一個consume(Consumer<? super T> consumer)方法,它容許添加一個但願處理全部已發佈元素的訂閱者,而且對任何其餘通知(如錯誤和完成通知)不感興趣。 該方法返回一個CompletedFuture<Void>,當發佈者調用訂閱者的onComplete()方法時,表示完成。 如下代碼片斷將一個Consumer做爲訂閱者添加到發佈者中:

// Add a subscriber that prints the published items
CompletableFuture<Void> subTask = pub.consume(System.out::println);

本章中的代碼是com.jdojo.stream的模塊的一部分,其聲明以下所示。

// module-info.java
module com.jdojo.stream {
    exports com.jdojo.stream;
}

下面包含了NumberPrinter類的代碼,它顯示瞭如何使用SubmissionPublisher類來發布整數。 示例代碼的詳細說明遵循NumberPrinter類的輸出。

// NumberPrinter.java
package com.jdojo.stream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.LongStream;
public class NumberPrinter {
    public static void main(String[] args) {        
        CompletableFuture<Void> subTask = null;
        // The publisher is closed when the try block exits
        try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
            // Print the buffer size used for each subscriber
            System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());
            // Add a subscriber to the publisher. The subscriber prints the published elements
            subTask = pub.consume(System.out::println);
            // Generate and publish five integers
            LongStream.range(1L, 6L)
                      .forEach(pub::submit);
        }
        if (subTask != null) {
            try {
                // Wait until the subscriber is complete
                subTask.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

輸出結果爲:

Subscriber Buffer Size: 256
1
2
3
4
5

main()方法聲明一個subTask的變量來保存訂閱者任務的引用。 subTask.get()方法將阻塞,直到訂閱者完成。

CompletableFuture<Void> subTask = null;

發佈類型爲Long的元素髮布者是在資源塊中建立的。 發佈者是SubmissionPublisher<Long>類的實例。 當try塊退出時,發佈者將自動關閉。

try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
  //...
}

該程序打印將訂閱發佈者的每一個訂閱者的緩衝區大小。

// Print the buffer size used for each subscriber
System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());

訂閱者將使用consume()方法添加到發佈者。 請注意,該方法容許指定一個Consumer,它在內部轉換爲Subscriber。 每一個發佈的元素會通知訂閱者。 訂閱者只需打印它接收的元素。

// Add a subscriber to the publisher. The subscriber prints the published elements
subTask = pub.consume(System.out::println);

如今是發佈整數的時候了。 該程序生成五個整數,1到5,並使用發佈者的submit()方法發佈它們。

// Generate and publish five integers
LongStream.range(1L, 6L)
          .forEach(pub::submit);

已發佈的整數以異步方式發送給訂閱者。 當try塊退出時,發佈者關閉。 要保持程序運行,直到訂閱者完成處理全部已發佈的元素,必須調用subTask.get()。 若是不調用此方法,則可能不會在輸出中看到五個整數。

4. 建立訂閱者

要有訂閱者,須要建立一個實現Flow.Subscriber<T>接口的類。 實現接口方法的方式取決於具體的需求。 在本節中,將建立一個SimpleSubscriber類,該類實現Flow.Subscriber<Long>接口。 下面包含此類的代碼。

// SimpleSubscriber.java
package com.jdojo.stream;
import java.util.concurrent.Flow;
public class SimpleSubscriber implements Flow.Subscriber<Long> {    
    private Flow.Subscription subscription;
    // Subscriber name
    private String name = "Unknown";
    // Maximum number of items to be processed by this subscriber
    private final long maxCount;
    // keep track of number of items processed
    private long counter;
    public SimpleSubscriber(String name, long maxCount) {
        this.name = name;
        this.maxCount = maxCount <= 0 ? 1 : maxCount;
    }
    public String getName() {
        return name;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.printf("%s subscribed with max count %d.%n", name, maxCount);        
        // Request all items in one go
        subscription.request(maxCount);
    }
    @Override
    public void onNext(Long item) {
        counter++;
        System.out.printf("%s received %d.%n", name, item);
        if (counter >= maxCount) {
            System.out.printf("Cancelling %s. Processed item count: %d.%n", name, counter);            
            // Cancel the subscription
            subscription.cancel();
        }
    }
    @Override
    public void onError(Throwable t) {
        System.out.printf("An error occurred in %s: %s.%n", name, t.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.printf("%s is complete.%n", name);
    }
}

SimpleSubscriber類的實例表示一個訂閱者,它有一個名稱和要處理的最大數量的items (maxCount)方法。 須要將其名稱和maxCount傳遞給其構造函數。 若是maxCount小於1,則在構造函數中設置爲1。

onSubscribe()方法中,它保存發佈者在名爲subscription的實例變量中傳遞的Flow.Subscription。 它打印有關Flow.Subscription的消息,並請求一次能夠處理的全部元素。 該訂閱者有效地使用push模型,由於在該請求以後,再也不向發佈者發送更多的元素的請求。 發佈着將推送maxCount或更少的元素數量給該訂閱者。

onNext()方法中,它將counter實例變量遞增1。counter實例變量跟蹤訂閱者接收到的元素數量。 該方法打印詳細說明接收到的元素消息。 若是它已經收到能夠處理的最後一個元素,它將取消訂閱。 取消訂閱後,發佈者再也不收到任何元素。

onError()onComplete()方法中,它打印一個有關其狀態的消息。

如下代碼段建立一個SimpleSubscriber,其名稱爲S1,能夠處理最多10個元素。

SimpleSubscriber sub1 = new SimpleSubscriber("S1", 10);

如今看一下具體使用SimpleSubscriber的例子。 下包含一個完整的程序。 它按期發佈元素。 發佈一個元素後,它等待1到3秒鐘。 等待的持續時間是隨機的。 如下詳細說明本程序的輸出。 該程序使用異步處理可能致使不一樣輸出結果。

// PeriodicPublisher.java
package com.jdojo.stream;
import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class PeriodicPublisher {
    final static int MAX_SLEEP_DURATION = 3;
    // Used to generate sleep time
    final static Random sleepTimeGenerator = new Random();
    public static void main(String[] args) {
        SubmissionPublisher<Long> pub = new SubmissionPublisher<>();
        // Create three subscribers
        SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);
        SimpleSubscriber sub2 = new SimpleSubscriber("S2", 5);
        SimpleSubscriber sub3 = new SimpleSubscriber("S3", 6);
        SimpleSubscriber sub4 = new SimpleSubscriber("S4", 10);
        // Subscriber to the publisher
        pub.subscribe(sub1);
        pub.subscribe(sub2);
        pub.subscribe(sub3);
        // Subscribe the 4th subscriber after 2 seconds
        subscribe(pub, sub4, 2);
        // Start publishing items
        Thread pubThread = publish(pub, 5);
        try {
            // Wait until the publisher is finished
            pubThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static Thread publish(SubmissionPublisher<Long> pub, long count) {
        Thread t = new Thread(() -> {
            for (long i = 1; i <= count; i++) {
                pub.submit(i);
                sleep(i);
            }
            // Close the publisher
            pub.close();
        });
        // Start the thread
        t.start();
        return t;
    }
    private static void sleep(Long item) {
        // Wait for 1 to 3 seconds
        int sleepTime = sleepTimeGenerator.nextInt(MAX_SLEEP_DURATION) + 1;
        try {
            System.out.printf("Published %d. Sleeping for %d sec.%n", item, sleepTime);
            TimeUnit.SECONDS.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private static void subscribe(SubmissionPublisher<Long> pub, Subscriber<Long> sub,
                                  long delaySeconds) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(delaySeconds);
                pub.subscribe(sub);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }            
        }).start();
    }
}

輸出結果爲:

S2 subscribed with max count 5.
Published 1. Sleeping for 1 sec.
S3 subscribed with max count 6.
S1 subscribed with max count 2.
S1 received 1.
S3 received 1.
S2 received 1.
Published 2. Sleeping for 1 sec.
S1 received 2.
S2 received 2.
S3 received 2.
Cancelling S1. Processed item count: 2.
S4 subscribed with max count 10.
Published 3. Sleeping for 1 sec.
S4 received 3.
S3 received 3.
S2 received 3.
Published 4. Sleeping for 2 sec.
S4 received 4.
S3 received 4.
S2 received 4.
Published 5. Sleeping for 2 sec.
S2 received 5.
Cancelling S2. Processed item count: 5.
S4 received 5.
S3 received 5.
S3 is complete.
S4 is complete.

PeriodicPublisher類使用兩個靜態變量。 MAX_SLEEP_DURATION靜態變量保存發佈這等待發佈下一個元素最大秒數。 它設置爲3。sleepTimeGenerator靜態變量Random對象的引用,該對象在sleep()方法中用於生成下一個等待的隨機持續時間。

PeriodicPublisher類的main()方法執行如下操做:

  • 它建立做爲SubmissionPublisher<Long>類的實例的發佈者。
  • 它建立了四個爲S1S2S3S4的訂閱者。每一個訂閱者可以處理不一樣數量的元素。
  • 三個訂閱者當即訂閱。
  • S4的訂閱者在兩秒鐘的最短延遲以後以單獨的線程訂閱。 PeriodicPublisher類的subscribe()方法負責處理此延遲訂閱。注意到在兩個元素(1和2)已經發布以後S4訂閱的輸出中,它將不會收到這兩個元素。
  • 它調用publish()方法,它啓動一個新的線程來發布五個元素,它啓動線程並返回線程引用。
  • main()方法調用發佈元素線程的join()方法,因此在全部元素髮布以前程序不會終止。
  • publish()方法負責發佈五個元素。最後關閉發佈者。它調用sleep()方法,使當前線程休眠一個和MAX_SLEEP_DURATION秒之間的隨機選擇的持續時間。
  • 在輸出中注意到,一些訂閱者取消了訂閱,由於他們從發佈商那裏收到指定數量的元素。

請注意,該程序保證全部元素將在終止以前發佈,但不保證全部訂閱者都將收到這些元素。 在輸出中,會看到訂閱者收到全部已發佈的元素。 這是由於發佈者在發佈最後一個元素後等待至少一秒鐘,這給了訂閱者足夠的時間,在這個小程序中接收和處理最後一個元素。

該程序沒有表現出背壓(backpressure) ,由於全部訂閱者都經過一次性請求元素來使用push模型。 能夠將SimpleSubscriber類修改成分配任務,以查看背壓的效果:

  • onSubscribe()方法中使用subscription.request(1)方法請求一個元素。
  • onNext()方法中,延遲後請求更多的元素。 延遲應使訂閱者的工做速度較慢,發佈者發佈元素的速度較慢。
  • 須要發佈超過256個元素,這是每一個發佈者向訂閱者使用的默認緩衝區,或者使用SubmissionPublisher類的另外一個構造函數使用較小的緩衝區大小。 這將迫使發佈者發佈比訂閱者能夠處理的更多的元素。
  • 訂閱者使用刪除處理程序( drop handler)訂閱,以即可以看到發佈者什麼時候發現背壓。
  • 使用SubmissionPublisher類的offer()方法發佈元素,所以當訂閱者沒法處理更多元素時,發佈者不會無限期地等待。

5. 使用處理者

處理者(Processor)同時是訂閱者也是發佈者。 要使用處理者,須要一個實現Flow.Processor<T,R>接口的類,其中T是訂閱元素類型,R是已發佈的元素類型。 在本節中,建立了一個基於Predicate<T>過濾元素的簡單處理者。 處理者訂閱發佈六個整數——1,2,3,4,5和6的發佈者。訂閱者訂閱處理者。 處理者從其發佈者接收元素,若是它們經過了Predicate<T>指定的標準,則從新發布相同的元素。 下面包含其實例做爲處理者的FilterProcessor<T>類的代碼。

// FilterProcessor.java
package com.jdojo.stream;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;
public class FilterProcessor<T> extends SubmissionPublisher<T> implements Processor<T,T>{
    private Predicate<? super T> filter;
    public FilterProcessor(Predicate<? super T> filter) {
        this.filter = filter;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Request an unbounded number of items
        subscription.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(T item) {
        // If the item passes the filter publish it. Otherwise, no action is needed.
        System.out.println("Filter received: " + item);
        if (filter.test(item)) {            
            this.submit(item);
        }
    }
    @Override
    public void onError(Throwable t) {
        // Pass the onError message to all subscribers asynchronously        
        this.getExecutor().execute(() -> this.getSubscribers()
                                             .forEach(s -> s.onError(t)));
    }
    @Override
    public void onComplete() {
        System.out.println("Filter is complete.");
        // Close this publisher, so all its subscribers will receive a onComplete message
        this.close();
    }
}

FilterProcessor<T>類繼承自SubmissionPublisher<T>類,並實現了Flow.Processor<T,T>接口。 處理者必須是發佈者以及訂閱者。 從SubmissionPublisher<T>類繼承了這個類,因此沒必要編寫代碼來使其成爲發佈者。 該類實現了Processor<T,T>接口的全部方法,所以它將接收和發佈相同類型的元素。

構造函數接受Predicate<? super T> 參數並將其保存在實例變量filter中,將在onNext()方法中使用filter元素。

onNext()方法應用filter。 若是filter返回true,則會將該元素從新發布到其訂閱者。 該類從其超類SubmissionPublisher繼承了用於從新發布元素的submit()方法。

onError()方法異步地將錯誤從新發布給其訂閱者。 它使用SubmissionPublisher類的getExecutor()getSubscribers()方法,該方法返回Executor和當前訂閱者的列表。 Executor用於異步地向當前訂閱者發佈消息。

onComplete()方法關閉處理者的發佈者部分,它將向全部訂閱者發送一個onComplete消息。

讓咱們看看這個處理者具體的例子。 下面包含ProcessorTest類的代碼。 可能會獲得一個不一樣的輸出,由於這個程序涉及到幾個異步步驟。 該程序的詳細說明遵循程序的輸出。

// ProcessorTest.java
package com.jdojo.stream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
public class ProcessorTest {
    public static void main(String[] args) {
        CompletableFuture<Void> subTask = null;
        // The publisher is closed when the try block exits
        try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
            // Create a Subscriber
            SimpleSubscriber sub = new SimpleSubscriber("S1", 10);
            // Create a processor
            FilterProcessor<Long> filter = new FilterProcessor<>(n -> n % 2 == 0);
            // Subscribe the filter to the publisher and a subscriber to the filter
            pub.subscribe(filter);            
            filter.subscribe(sub);
            // Generate and publish 6 integers
            LongStream.range(1L, 7L)
                      .forEach(pub::submit);
        }
        try {
            // Sleep for two seconds to let subscribers finish handling all items
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出的結果爲:

S1 subscribed with max count 10.
Filter received: 1
Filter received: 2
Filter received: 3
S1 received 2.
Filter received: 4
S1 received 4.
Filter received: 5
Filter received: 6
Filter is complete.
S1 received 6.
S1 is complete.

ProcessorTest類的main()方法建立一個發佈者,它將發佈六個整數——1,2,3,4,5和6。該方法作了不少事情:

  • 它建立一個使用try-with-resources塊的發佈者,因此當try塊退出時它將自動關閉。
  • 它建立一個SimpleSubscriber類的實例的訂閱者。訂閱者名爲S1,最多可處理10個元素。
  • 它建立一個處理者,它是FilterProcessor<Long>類的實例。傳遞一個Predicate<Long>,讓處理者從新發布整數並丟棄奇數。
  • 處理者被訂閱發佈者,而且簡單訂閱者被訂閱處處理者。這完成了發佈者到訂閱者的管道——發佈者處處理者到訂閱者。
  • 在第一個try塊的末尾,代碼生成從1到6的整數,並使用發佈者發佈它們。
  • main()方法結束時,程序等待兩秒鐘,以確保處理者和訂閱者有機會處理其事件。若是刪除此邏輯,程序可能沒法打印任何內容。必須包含這個邏輯,由於全部事件都是異步處理的。當第一個try塊退出時,發佈者將完成向處理者發送全部通知。然而,處理者和訂閱者須要一些時間來接收和處理這些通知。

四. 總結

流是生產者生產並由一個或多個消費者消費的元素序列。 這種生產者——消費者模型也被稱爲source/sink模型或發行者——訂閱者模型。

有幾種流處理機制,pull模型和push模型是最多見的。 在push模型中,發佈者將數據流推送到訂閱者。 在pull模型中,定於這從發佈者拉出數據。 當兩端不以相同的速率工做的時,這些模型有問題。 解決方案是提供適應發佈者和訂閱者速率的流。 使用稱爲背壓的策略,其中訂閱者通知發佈者它能夠處理多少個元素,而且發佈者僅向訂閱者發送那些須要處理的元素。

響應式流從2013年開始,做爲提供非阻塞背壓的異步流處理標準的舉措。 它旨在解決處理元素流的問題 ——如何將元素流從發佈者傳遞到訂閱者,而不須要發佈者阻塞,或者訂閱者有無限制的緩衝區或丟棄。 響應式流模型在pull模型和push模型流處理機制之間動態切換。 當訂閱者處理較慢時,它使用pull模型,當訂閱者處理更快時使用push模型。

在2015年,出版了一個用於處理響應式流的規範和Java API。 Java API 中的響應式流由四個接口組成:Publisher<T>Subscriber<T>SubscriptionProcessor<T,R>

發佈者根據收到的要求向訂閱者發佈元素。 用戶訂閱發佈者接收元素。 發佈者向訂閱者發送訂閱令牌。 使用訂閱令牌,訂閱者從發佈者請求多個數據元素。 當數據元素準備就緒時,發佈者向訂閱者發送多個個或稍少的數據元素。 訂閱者能夠請求更多的數據元素。

JDK 9在java.util.concurrent包中提供了與響應式流兼容的API,它在java.base模塊中。 API由兩個類組成:FlowSubmissionPublisher<T>

Flow類封裝了響應式流Java API。 由響應式流Java API指定的四個接口做爲嵌套靜態接口包含在Flow類中:Flow.Processor<T,R>Flow.Publisher<T>Flow.Subscriber<T>Flow.Subscription

相關文章
相關標籤/搜索