(13)Reactor的backpressure策略——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 響應式流 | Reactor 3快速上手 | 響應式流規範 | 自定義數據流
本節測試源碼html

2.3 不一樣的回壓策略

許多地方也叫作「背壓」、「負壓」,我在《Reactor參考文檔》中是翻譯爲「背壓」的,後來在看到有「回壓」的翻譯,突然感受從文字上彷佛更加符合。java

這一節討論回壓的問題,有兩個前提:react

  1. 發佈者與訂閱者不在同一個線程中,由於在同一個線程中的話,一般使用傳統的邏輯就能夠,不須要進行回壓處理;
  2. 發佈者發出數據的速度高於訂閱者處理數據的速度,也就是處於「PUSH」狀態下,若是相反,那就是「PUll」狀態,不須要處理回壓。

2.3.1 回壓策略

回壓的處理有如下幾種策略:git

  1. ERROR: 當下遊跟不上節奏的時候發出一個錯誤信號。
  2. DROP:當下遊沒有準備好接收新的元素的時候拋棄這個元素。
  3. LATEST:讓下游只獲得上游最新的元素。
  4. BUFFER:緩存下游沒有來得及處理的元素(若是緩存不限大小的可能致使OutOfMemoryError)。

這幾種策略定義在枚舉類型OverflowStrategy中,不過還有一個IGNORE類型,即徹底忽略下游背壓請求,這可能會在下游隊列積滿的時候致使 IllegalStateException。github

2.3.2 使用create聲明回壓策略

上一節中,用於生成數據流的方法createpush能夠用於異步的場景,並且它們也支持回壓,咱們能夠經過提供一個 OverflowStrategy 來定義背壓行爲。方法簽名:編程

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)

默認(沒有第二個參數的方法)是緩存策略的,咱們來試一下別的策略,好比DROP的策略。緩存

咱們繼續使用2.2節的那個測試例子,下邊是用create建立的「快的發佈者」,不過方便起見拆放到兩個私有方法裏供調用:併發

public class Test_2_3 {
        /**
         * 使用create方法生成「快的發佈者」。
         * @param strategy 回壓策略
         * @return  Flux
         */
        private Flux<MyEventSource.MyEvent> createFlux(FluxSink.OverflowStrategy strategy) {
            return Flux.create(sink -> eventSource.register(new MyEventListener() {
                @Override
                public void onNewEvent(MyEventSource.MyEvent event) {
                    System.out.println("publish >>> " + event.getMessage());
                    sink.next(event);
                }

                @Override
                public void onEventStopped() {
                    sink.complete();
                }
            }), strategy);  // 1
        }
        /**
         * 生成MyEvent。
         * @param count 生成MyEvent的個數。
         * @param millis 每一個MyEvent之間的時間間隔。
         */
        private void generateEvent(int times, int millis) {
            // 循環生成MyEvent,每一個MyEvent間隔millis毫秒
            for (int i = 0; i < times; i++) {
                try {
                    TimeUnit.MILLISECONDS.sleep(millis);
                } catch (InterruptedException e) {
                }
                eventSource.newEvent(new MyEventSource.MyEvent(new Date(), "Event-" + i));
            }
            eventSource.eventStopped();
        }
    }

有了「快的發佈者」,下面是「慢的訂閱者」,以及一些測試準備工做:異步

public class Test_2_3 {
        private final int EVENT_DURATION   = 10;    // 生成的事件間隔時間,單位毫秒
        private final int EVENT_COUNT      = 20;    // 生成的事件個數
        private final int PROCESS_DURATION = 30;    // 訂閱者處理每一個元素的時間,單位毫秒

        private Flux<MyEventSource.MyEvent> fastPublisher;
        private SlowSubscriber slowSubscriber;
        private MyEventSource eventSource;
        private CountDownLatch countDownLatch;

        /**
         * 準備工做。
         */
        @Before
        public void setup() {
            countDownLatch = new CountDownLatch(1);
            slowSubscriber = new SlowSubscriber();
            eventSource = new MyEventSource();
        }

        /**
         * 觸發訂閱,使用CountDownLatch等待訂閱者處理完成。
         */
        @After
        public void subscribe() throws InterruptedException {
            fastPublisher.subscribe(slowSubscriber);
            generateEvent(EVENT_COUNT, EVENT_DURATION);
            countDownLatch.await(1, TimeUnit.MINUTES);
        }

        /**
         * 內部類,「慢的訂閱者」。
         */
        class SlowSubscriber extends BaseSubscriber<MyEventSource.MyEvent> {

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(1);     // 訂閱時請求1個數據
            }

            @Override
            protected void hookOnNext(MyEventSource.MyEvent event) {
                System.out.println("                      receive <<< " + event.getMessage());
                try {
                    TimeUnit.MILLISECONDS.sleep(PROCESS_DURATION);
                } catch (InterruptedException e) {
                }
                request(1);     // 每處理完1個數據,就再請求1個
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.err.println("                      receive <<< " + throwable);
            }

            @Override
            protected void hookOnComplete() {
                countDownLatch.countDown();
            }
        }
    }

下面是測試方法:ide

/**
     * 測試create方法的不一樣OverflowStrategy的效果。
     */
    @Test
    public void testCreateBackPressureStratety() {
        fastPublisher =
                createFlux(FluxSink.OverflowStrategy.BUFFER)    // 1
                        .doOnRequest(n -> System.out.println("         ===  request: " + n + " ==="))    // 2
                        .publishOn(Schedulers.newSingle("newSingle"), 1);   // 3
    }
  1. 調整不一樣的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)觀察效果,create方法默認爲BUFFER;
  2. 打印出每次的請求(也就是後邊.publishOn的請求);
  3. 使用publishOn讓後續的操做符和訂閱者運行在一個單獨的名爲newSingle的線程上,第二個參數1是預取個數,也就是.publishOn做爲訂閱者每次向上遊request的個數,默認爲256,因此必定程度上也起到了緩存的效果,爲了測試,設置爲1。

一般狀況下,發佈者於訂閱者並不在同一個線程上,這裏使用publishOn來模擬這種狀況。

BUFFER策略的輸出以下(來不及處理的數據會緩存下來,這是一般狀況下的默認策略):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
         ===  request: 1 ===
publish >>> Event-3
                      receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
         ===  request: 1 ===
                      receive <<< Event-2
publish >>> Event-7
publish >>> Event-8
...

DROP策略的輸出以下(有新數據就緒的時候,看是否有request,有的話就發出,沒有就丟棄):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
         ===  request: 1 ===
publish >>> Event-4
                      receive <<< Event-4
publish >>> Event-5
publish >>> Event-6
publish >>> Event-7
         ===  request: 1 ===
publish >>> Event-8
                      receive <<< Event-8
...

能夠看到,第1/2/3/5/6/7/...的數據被丟棄了,當有request以後的數據會被髮出。調整一下publishOn方法的第二個參數(預取個數)爲2,輸出以下:

===  request: 2 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
                      receive <<< Event-1
publish >>> Event-4
publish >>> Event-5
publish >>> Event-6
         ===  request: 2 ===
publish >>> Event-7
                      receive <<< Event-7
publish >>> Event-8
publish >>> Event-9
publish >>> Event-10
                      receive <<< Event-8
publish >>> Event-11
publish >>> Event-12

可見,每次request(請求2個數據)以後的2個數據發出,更多就緒的數據因爲沒有request就丟棄了。

LATEST的輸出以下(request到來的時候,將最新的數據發出):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
publish >>> Event-3
         ===  request: 1 ===
                      receive <<< Event-3
publish >>> Event-4
publish >>> Event-5
         ===  request: 1 ===
                      receive <<< Event-5
publish >>> Event-6
publish >>> Event-7
publish >>> Event-8
         ===  request: 1 ===
                      receive <<< Event-8

ERROR的輸出以下(當訂閱者來不及處理時候發出一個錯誤信號):

===  request: 1 ===
publish >>> Event-0
                      receive <<< Event-0
publish >>> Event-1
publish >>> Event-2
         ===  request: 1 ===
                      receive <<< reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

IGNORE的輸出以下:

...
         ===  request: 2 ===
                      receive <<< Event-10
                      receive <<< Event-11
         ===  request: 2 ===
                      receive <<< Event-12
                      receive <<< reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure

2.3.3 調整回壓策略的操做符

Reactor提供了響應的onBackpressureXxx操做符,調整回壓策略。測試方法以下:

/**
     * 測試不一樣的onBackpressureXxx方法的效果。
     */
    @Test
    public void testOnBackPressureXxx() {
        fastPublisher = createFlux(FluxSink.OverflowStrategy.BUFFER)
                .onBackpressureBuffer()     // BUFFER
//                .onBackpressureDrop()     // DROP
//                .onBackpressureLatest()   // LATEST
//                .onBackpressureError()    // ERROR
                .doOnRequest(n -> System.out.println("         ===  request: " + n + " ==="))
                .publishOn(Schedulers.newSingle("newSingle"), 1);
    }

經過打開某一個操做符的註釋能夠觀察輸出。這裏就不貼輸出內容了,Reactor文檔的示意圖更加直觀:

onBackpressureBuffer,對於來自其下游的request採起「緩存」策略。

onBackpressureBuffer

onBackpressureDrop,元素就緒時,根據下游是否有未知足的request來判斷是否發出當前元素。

onBackpressureDrop

onBackpressureLatest,當有新的request到來的時候,將最新的元素髮出。

onBackpressureLatest

onBackpressureError,當有多餘元素就緒時,發出錯誤信號。

onBackpressureError

真是一圖勝千言啊,上邊的這些圖片都是來自Reactor官方文檔。

當進行異步編程時,一般會面臨相互協做的各個組件不在同一個線程的狀況,好比一個生產者不斷生成消息,而一個消費者不斷處理這些產生的消息,兩者一般不在一個線程甚至是兩個不一樣的組件。當有人不當心採用了×××資源(好比無上限的彈性線程池、×××隊列等),那麼在高併發或任務繁重時就有可能形成線程數爆炸增加,或隊列堆積,所以backpressure這種協調機制對於維持系統穩定具備重要做用。

相關文章
相關標籤/搜索