上篇文章咱們簡單的介紹了Reactor的發展史和基本的Flux和Mono的使用,本文將會進一步挖掘Reactor的高級用法,一塊兒來看看吧。java
以前的文章咱們提到了4個Flux的subscribe的方法:react
Disposable subscribe(); Disposable subscribe(Consumer<? super T> consumer); Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
這四個方法,須要咱們使用lambda表達式來自定義consumer,errorConsumer,completeSonsumer和subscriptionConsumer這四個Consumer。git
寫起來比較複雜,看起來也不太方便,咱們考慮一下,這四個Consumer是否是和Subscriber接口中定義的4個方法是一一對應的呢?github
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
對的,因此咱們有一個更加簡單點的subscribe方法:多線程
public final void subscribe(Subscriber<? super T> actual)
這個subscribe方法直接接收一個Subscriber類。從而實現了全部的功能。異步
本身寫Subscriber太麻煩了,Reactor爲咱們提供了一個BaseSubscriber的類,它實現了Subscriber中的全部功能,還附帶了一些其餘的方法。 ide
咱們看下BaseSubscriber的定義:fetch
public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription, Disposable
注意,BaseSubscriber是單次使用的,這就意味着,若是它首先subscription到Publisher1,而後subscription到Publisher2,那麼將會取消對第一個Publisher的訂閱。
由於BaseSubscriber是一個抽象類,因此咱們須要繼承它,而且重寫咱們須要本身實現的方法。this
下面看一個自定義的Subscriber:線程
public class CustSubscriber<T> extends BaseSubscriber<T> { public void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); } public void hookOnNext(T value) { System.out.println(value); request(1); } }
BaseSubscriber中有不少以hook開頭的方法,這些方法都是咱們能夠重寫的,而Subscriber原生定義的on開頭的方法,在BaseSubscriber中都是final的,都是不能重寫的。
咱們看一個定義:
@Override public final void onSubscribe(Subscription s) { if (Operators.setOnce(S, this, s)) { try { hookOnSubscribe(s); } catch (Throwable throwable) { onError(Operators.onOperatorError(s, throwable, currentContext())); } } }
能夠看到,它內部實際上調用了hook的方法。
上面的CustSubscriber中,咱們重寫了兩個方法,一個是hookOnSubscribe,在創建訂閱的時候調用,一個是hookOnNext,在收到onNext信號的時候調用。
在這些方法中,給了咱們足夠的自定義空間,上面的例子中咱們調用了request(1),表示再請求一個元素。
其餘的hook方法還有: hookOnComplete, hookOnError, hookOnCancel 和 hookFinally。
咱們以前講過了,reactive stream的最大特徵就是能夠處理Backpressure。
什麼是Backpressure呢?就是當consumer處理過不來的時候,能夠通知producer來減小生產速度。
咱們看下BaseSubscriber中默認的hookOnSubscribe實現:
protected void hookOnSubscribe(Subscription subscription){ subscription.request(Long.MAX_VALUE); }
能夠看到默認是request無限數目的值。 也就是說默認狀況下沒有Backpressure。
經過重寫hookOnSubscribe方法,咱們能夠自定義處理速度。
除了request以外,咱們還能夠在publisher中限制subscriber的速度。
public final Flux<T> limitRate(int prefetchRate) { return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate)); }
在Flux中,咱們有一個limitRate方法,能夠設定publisher的速度。
好比subscriber request(100),而後咱們設置limitRate(10),那麼最多producer一次只會產生10個元素。
接下來,咱們要講解一下怎麼建立Flux,一般來說有4種方法來建立Flux。
第一種方法就是最簡單的同步建立的generate.
先看一個例子:
public void useGenerate(){ Flux<String> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; }); flux.subscribe(System.out::println); }
輸出結果:
3 x 0 = 0 3 x 1 = 3 3 x 2 = 6 3 x 3 = 9 3 x 4 = 12 3 x 5 = 15 3 x 6 = 18 3 x 7 = 21 3 x 8 = 24 3 x 9 = 27 3 x 10 = 30
上面的例子中,咱們使用generate方法來同步的生成元素。
generate接收兩個參數:
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
第一個參數是stateSupplier,用來指定初始化的狀態。
第二個參數是一個generator,用來消費SynchronousSink,並生成新的狀態。
上面的例子中,咱們每次將state+1,一直加到10。
而後使用subscribe來將全部的生成元素輸出。
Flux也提供了一個create方法來建立Flux,create能夠是同步也能夠是異步的,而且支持多線程操做。
由於create沒有初始的state狀態,因此能夠用在多線程中。
create的一個很是有用的地方就是能夠將第三方的異步API和Flux關聯起來,舉個例子,咱們有一個自定義的EventProcessor,當處理相應的事件的時候,會去調用註冊到Processor中的listener的一些方法。
interface MyEventListener<T> { void onDataChunk(List<T> chunk); void processComplete(); }
咱們怎麼把這個Listener的響應行爲和Flux關聯起來呢?
public void useCreate(){ EventProcessor myEventProcessor = new EventProcessor(); Flux<String> bridge = Flux.create(sink -> { myEventProcessor.register( new MyEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } }); }); }
使用create就夠了,create接收一個consumer參數:
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
這個consumer的本質是去消費FluxSink對象。
上面的例子在MyEventListener的事件中對FluxSink對象進行消費。
push和create同樣,也支持異步操做,可是同時只能有一個線程來調用next, complete 或者 error方法,因此它是單線程的。
Handle和上面的三個方法不一樣,它是一個實例方法。
它和generate很相似,也是消費SynchronousSink對象。
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
不一樣的是它的參數是一個BiConsumer,是沒有返回值的。
看一個使用的例子:
public void useHandle(){ Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20) .handle((i, sink) -> { String letter = alphabet(i); if (letter != null) sink.next(letter); }); alphabet.subscribe(System.out::println); } public String alphabet(int letterNumber) { if (letterNumber < 1 || letterNumber > 26) { return null; } int letterIndexAscii = 'A' + letterNumber - 1; return "" + (char) letterIndexAscii; }
本文的例子learn-reactive
本文做者:flydean程序那些事本文連接:http://www.flydean.com/reactor-core-in-depth/
本文來源:flydean的博客
歡迎關注個人公衆號:「程序那些事」最通俗的解讀,最深入的乾貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!