本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範 | 自定義數據流
本節測試源碼html
許多地方也叫作「背壓」、「負壓」,我在《Reactor參考文檔》中是翻譯爲「背壓」的,後來在看到有「回壓」的翻譯,突然感受從文字上彷佛更加符合。java
這一節討論回壓的問題,有兩個前提:react
回壓的處理有如下幾種策略:git
這幾種策略定義在枚舉類型OverflowStrategy
中,不過還有一個IGNORE類型,即徹底忽略下游背壓請求,這可能會在下游隊列積滿的時候致使 IllegalStateException。github
上一節中,用於生成數據流的方法create
和push
能夠用於異步的場景,並且它們也支持回壓,咱們能夠經過提供一個 OverflowStrategy 來定義背壓行爲。方法簽名:編程
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
默認(沒有第二個參數的方法)是緩存策略的,咱們來試一下別的策略,好比DROP的策略。緩存
咱們繼續使用2.2節的那個測試例子,下邊是用create
建立的「快的發佈者」,不過方便起見拆放到兩個私有方法裏供調用:併發
public class Test_2_3 { /** * 使用create方法生成「快的發佈者」。 * @param strategy 回壓策略 * @return Flux */ private Flux<MyEventSource.MyEvent> createFlux(FluxSink.OverflowStrategy strategy) { return Flux.create(sink -> eventSource.register(new MyEventListener() { @Override public void onNewEvent(MyEventSource.MyEvent event) { System.out.println("publish >>> " + event.getMessage()); sink.next(event); } @Override public void onEventStopped() { sink.complete(); } }), strategy); // 1 } /** * 生成MyEvent。 * @param count 生成MyEvent的個數。 * @param millis 每一個MyEvent之間的時間間隔。 */ private void generateEvent(int times, int millis) { // 循環生成MyEvent,每一個MyEvent間隔millis毫秒 for (int i = 0; i < times; i++) { try { TimeUnit.MILLISECONDS.sleep(millis); } catch (InterruptedException e) { } eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i)); } eventSource.eventStopped(); } }
有了「快的發佈者」,下面是「慢的訂閱者」,以及一些測試準備工做:異步
public class Test_2_3 { private final int EVENT_DURATION = 10; // 生成的事件間隔時間,單位毫秒 private final int EVENT_COUNT = 20; // 生成的事件個數 private final int PROCESS_DURATION = 30; // 訂閱者處理每一個元素的時間,單位毫秒 private Flux<MyEventSource.MyEvent> fastPublisher; private SlowSubscriber slowSubscriber; private MyEventSource eventSource; private CountDownLatch countDownLatch; /** * 準備工做。 */ @Before public void setup() { countDownLatch = new CountDownLatch(1); slowSubscriber = new SlowSubscriber(); eventSource = new MyEventSource(); } /** * 觸發訂閱,使用CountDownLatch等待訂閱者處理完成。 */ @After public void subscribe() throws InterruptedException { fastPublisher.subscribe(slowSubscriber); generateEvent(EVENT_COUNT, EVENT_DURATION); countDownLatch.await(1, TimeUnit.MINUTES); } /** * 內部類,「慢的訂閱者」。 */ class SlowSubscriber extends BaseSubscriber<MyEventSource.MyEvent> { @Override protected void hookOnSubscribe(Subscription subscription) { request(1); // 訂閱時請求1個數據 } @Override protected void hookOnNext(MyEventSource.MyEvent event) { System.out.println(" receive <<< " + event.getMessage()); try { TimeUnit.MILLISECONDS.sleep(PROCESS_DURATION); } catch (InterruptedException e) { } request(1); // 每處理完1個數據,就再請求1個 } @Override protected void hookOnError(Throwable throwable) { System.err.println(" receive <<< " + throwable); } @Override protected void hookOnComplete() { countDownLatch.countDown(); } } }
下面是測試方法:ide
/** * 測試create方法的不一樣OverflowStrategy的效果。 */ @Test public void testCreateBackPressureStratety() { fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER) // 1 .doOnRequest(n -> System.out.println(" === request: " + n + " ===")) // 2 .publishOn(Schedulers.newSingle("newSingle"), 1); // 3 }
.publishOn
的請求);publishOn
讓後續的操做符和訂閱者運行在一個單獨的名爲newSingle
的線程上,第二個參數1是預取個數,也就是.publishOn
做爲訂閱者每次向上遊request的個數,默認爲256,因此必定程度上也起到了緩存的效果,爲了測試,設置爲1。一般狀況下,發佈者於訂閱者並不在同一個線程上,這裏使用
publishOn
來模擬這種狀況。
BUFFER
策略的輸出以下(來不及處理的數據會緩存下來,這是一般狀況下的默認策略):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 === request: 1 === publish >>> Event-3 receive <<< Event-1 publish >>> Event-4 publish >>> Event-5 publish >>> Event-6 === request: 1 === receive <<< Event-2 publish >>> Event-7 publish >>> Event-8 ...
DROP
策略的輸出以下(有新數據就緒的時候,看是否有request,有的話就發出,沒有就丟棄):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 publish >>> Event-3 === request: 1 === publish >>> Event-4 receive <<< Event-4 publish >>> Event-5 publish >>> Event-6 publish >>> Event-7 === request: 1 === publish >>> Event-8 receive <<< Event-8 ...
能夠看到,第1/2/3/5/6/7/...的數據被丟棄了,當有request以後的數據會被髮出。調整一下publishOn
方法的第二個參數(預取個數)爲2,輸出以下:
=== request: 2 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 publish >>> Event-3 receive <<< Event-1 publish >>> Event-4 publish >>> Event-5 publish >>> Event-6 === request: 2 === publish >>> Event-7 receive <<< Event-7 publish >>> Event-8 publish >>> Event-9 publish >>> Event-10 receive <<< Event-8 publish >>> Event-11 publish >>> Event-12
可見,每次request(請求2個數據)以後的2個數據發出,更多就緒的數據因爲沒有request就丟棄了。
LATEST
的輸出以下(request到來的時候,將最新的數據發出):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 publish >>> Event-3 === request: 1 === receive <<< Event-3 publish >>> Event-4 publish >>> Event-5 === request: 1 === receive <<< Event-5 publish >>> Event-6 publish >>> Event-7 publish >>> Event-8 === request: 1 === receive <<< Event-8
ERROR
的輸出以下(當訂閱者來不及處理時候發出一個錯誤信號):
=== request: 1 === publish >>> Event-0 receive <<< Event-0 publish >>> Event-1 publish >>> Event-2 === request: 1 === receive <<< reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
IGNORE
的輸出以下:
... === request: 2 === receive <<< Event-10 receive <<< Event-11 === request: 2 === receive <<< Event-12 receive <<< reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
Reactor提供了響應的onBackpressureXxx
操做符,調整回壓策略。測試方法以下:
/** * 測試不一樣的onBackpressureXxx方法的效果。 */ @Test public void testOnBackPressureXxx() { fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER) .onBackpressureBuffer() // BUFFER // .onBackpressureDrop() // DROP // .onBackpressureLatest() // LATEST // .onBackpressureError() // ERROR .doOnRequest(n -> System.out.println(" === request: " + n + " ===")) .publishOn(Schedulers.newSingle("newSingle"), 1); }
經過打開某一個操做符的註釋能夠觀察輸出。這裏就不貼輸出內容了,Reactor文檔的示意圖更加直觀:
onBackpressureBuffer,對於來自其下游的request採起「緩存」策略。
onBackpressureDrop,元素就緒時,根據下游是否有未知足的request來判斷是否發出當前元素。
onBackpressureLatest,當有新的request到來的時候,將最新的元素髮出。
onBackpressureError,當有多餘元素就緒時,發出錯誤信號。
真是一圖勝千言啊,上邊的這些圖片都是來自Reactor官方文檔。
當進行異步編程時,一般會面臨相互協做的各個組件不在同一個線程的狀況,好比一個生產者不斷生成消息,而一個消費者不斷處理這些產生的消息,兩者一般不在一個線程甚至是兩個不一樣的組件。當有人不當心採用了×××資源(好比無上限的彈性線程池、×××隊列等),那麼在高併發或任務繁重時就有可能形成線程數爆炸增加,或隊列堆積,所以backpressure這種協調機制對於維持系統穩定具備重要做用。