RxJava 系列-2:背壓和 Flowable

背壓(Back Pressure)的概念最初並非在響應式編程中提出的,它最初用在流體力學中,指的是後端的壓力, 一般用於描述系統排出的流體在出口處或二次側受到的與流動方向相反的壓力。git

在響應式編程中,咱們能夠將產生信息的部分叫作上游或者叫生產者,處理產生的信息的部分叫作下游或者消費者。 試想若是在異步的環境中,生產者的生產速度大於消費者的消費速度的時候,明顯會出現生產過剩的情景,這時候就須要消費者對多餘的數據進行緩存, 但若是生產的信息數量過多,以致於超出緩存大小,就會出現緩存溢出,甚至可能形成內存耗盡。github

咱們能夠制定一個數據丟失的規則,來丟失那些「能夠丟失的數據」,以減輕緩存的壓力。 在以前咱們介紹了一些方法,好比throttleXXXdebouncesample等,都是用來解決在生產速度過快的狀況下的數據過濾的,它們指定了數據取捨的規則。 而在Flowable,咱們能夠經過onBackpressureXXX一系列的方法來制定當數據生產過快狀況下的數據取捨的規則,編程

咱們能夠把這種處理方式理解成背壓,所謂背壓,在Rx中就是經過一種下游用來控制上游事件發射頻率的機制(就像流體在出口受到了阻力同樣)。 因此,如何理解背壓呢?筆者認爲,在力學中它是一種現象,在Rx中它是一種機制。後端

在這篇文章中,咱們會先介紹背壓的相關內容,而後咱們再介紹一下onBackpressureXXX系列的方法。緩存

關於RxJava2的基礎使用和方法梳理能夠參考:RxJava2 系列 (1):一篇的比較全面的 RxJava2 方法總結異步

說明:如下文章部分翻譯自RxJava官方文檔Backpressure (2.0)post

一、背壓機制

若是將生產和消費總體看做一個管道,生成看做上游,消費看做下游; 那麼當異步的應用場景下,當生產者生產過快而消費者消費很慢的時候,能夠經過背壓來告知上游減慢生成的速度。spa

一般在進行異步的操做的時候會經過緩存來存儲發射出的數據。在早期的RxJava中,這些緩存是無界的。 這意味着當須要緩存的數據很是多的時候,它們可能會佔用很是多的存儲空間,並有可能由於虛擬機不斷GC而致使程序執行過慢,甚至直接拋出OOM。 在最新的RxJava中,大多數的異步操做內部都存在一個有界的緩存,當超出這個緩存的時候就會拋出MissingBackpressureException異常並結束整個序列。翻譯

然而,某些狀況下的表現會有所不一樣,它們不會拋出MissingBackpressureException異常。好比下面的range操做:code

private static void compute(int i) throws InterruptedException {
    Thread.sleep(500);
    System.out.println("computing : " + i);
}

private static void testFlowable() throws InterruptedException {
    Flowable.range(1, MAX_LENGTH).observeOn(Schedulers.computation()).subscribe(FlowableTest::compute);

    Thread.sleep(500 * MAX_LENGTH);
}
複製代碼

在這段代碼中咱們生成一段整數,而後每隔500毫秒執行依次計算操做。從輸出的結果來看,在程序的實際執行過程當中,數據的發射是串行的。 也就是發射完一個數據以後進入compute進行計算,等待500毫秒以後才發射下一個。 所以,在程序的執行過程當中沒有拋出異常,也沒有過多的內存消耗。

而下面的這段代碼就會在程序運行的時候馬上拋出MissingBackpressureException異常:

PublishProcessor<Integer> source = PublishProcessor.create();
source.observeOn(Schedulers.computation()).subscribe(v -> compute(v), Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) source.onNext(i);
Thread.sleep(10_000);
複製代碼

這是由於PublishProcessor底層會調用PublishSubscription,然後者實現了AtomicLong,它會經過判斷引用的long是否爲0來拋出異常,這個long型整數會在調用PublishSubscription.request()的時候被改寫。前面的一個例子的原理就是當每次調用了觀察者的onNext以後會調用PublishSubscription.request()來請求數據,這樣至關於消費者會在消費完事件以後向生產者請求,所以整個序列的執行看上去是串行的,從而不會拋出異常。

二、onBackpressureXXX

大多數開發者在遇到MissingBackpressureException一般是由於使用observeOn方法監聽了非背壓的PublishProcessor, timer()interval()或者自定義的create()。咱們有如下幾種方式來解決這個問題:

2.1 增長緩存大小

observeOn方法的默認緩存大小是16,當生產的速率過快的時候,那麼可能很快會超出該緩存大小,從而致使緩存溢出。 一種簡單的解決辦法是經過提高該緩存的大小來防止緩存溢出,咱們可使用observeOn的重載方法來設置緩存的大小。好比:

PublishProcessor<Integer> source = PublishProcessor.create();
source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);
複製代碼

可是這種解決方案只能解決暫時的問題,當生產的速率過快的時候仍是有可能形成緩存溢出,因此這不是根本的解決辦法。

2.2 經過丟棄和過濾來減輕緩存壓力

咱們能夠根據本身的應用的場景和數據的重要性,選擇使用一些方法來過濾和丟棄數據。 好比,丟棄的方式能夠選擇throttleFirst, throttleLast, throttleWithTimeout等,還可使用按照時間採樣的方式來減小接受的數據。

PublishProcessor<Integer> source = PublishProcessor.create();
source.sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);
複製代碼

可是,這種方式僅僅用來減小下游接收的數據,當緩存的數據不斷增長的時候仍是有可能致使緩存溢出,因此,這也不是一種根本的解決辦法。

2.3 onBackpressureBuffer()

這種無參的方法會使用一個無界的緩存,只要虛擬機沒有拋出OOM異常,它就會把全部的數據緩存起來。

Flowable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);
複製代碼

上面的例子即便使用了很小的緩存也不會有異常拋出,由於onBackpressureBuffer會將發射的全部數據緩存起來,只會將一小部分的數據傳遞給observeOn

這種處理方式其實是不存在背壓的,由於onBackpressureBuffer緩存了全部的數據,咱們可使用該方法的4個重載方法來對背壓進行個性化設置。

2.4 onBackpressureBuffer(int capacity)

這個方法使用一個有界的緩存,當達到了緩存大小的時候會拋出一個BufferOverflowError錯誤。 經過這種方法能夠增長默認的緩存大小,可是經過observeOn方法同樣能夠指定緩存的大小,所以,這個方法的應用變得愈來愈少。

2.5 onBackpressureBuffer(int capacity, Action onOverflow)

這方法除了能夠指定一個有界的緩存還提供了一個,當緩存溢出的時候還會回調指定的Action。 可是這種回調的用途比較有限,由於它除了提供當前回調的棧信息之外提供不了任何有用的信息。

2.6 onBackpressureBuffer(int capacity, Action onOverflow, BackpressureOverflowStrategy strategy)

這個重載方法相對比較實用一些,它除了上面的那些功能以外,還指定了當緩存到達指定的緩存時的行爲。 這裏的BackpressureOverflowStrategy顧名思義是一個策略,它是一個枚舉類型,預約義了三種枚舉值,最終會在FlowableOnBackpressureBufferStrategy中根據指定的枚舉類型選擇不一樣的實現策略,所以,咱們可使用它來指定緩存溢出時候的行爲。

下面是該枚舉類型的三個值及其含義:

  1. ERROR:當緩存溢出的時候會拋出一個異常;
  2. DROP_OLDEST:當緩存發生溢出的時候,會丟棄最老的值,並將新的值插入到緩存中;
  3. DROP_LATEST:當緩存發生溢出的時候,最新的值會被忽略,只有比較老的值會被傳遞給下游使用;

須要注意的地方是,後面的兩種策略會形成下游獲取到的值是不連續的,由於有一部分值會由於緩存不夠被丟棄,可是它們不會拋出BufferOverflowException

2.7 onBackpressureDrop()

這個方法會在數據達到緩存大小的時候丟棄最新的數據。能夠將其當作是onBackpressureBuffer+0 capacity+DROP_LATEST的組合。

這個方法特別適用於那種能夠忽略從源中發射出值的那種場景,好比GPS定位問題,定位數據會不斷髮射出來,即便丟失當前數據,等會兒同樣能拿到最新的數據。

component.mouseMoves()
    .onBackpressureDrop()
    .observeOn(Schedulers.computation(), 1)
    .subscribe(event -> compute(event.x, event.y));
複製代碼

該方法還存在一個重載方法onBackpressureDrop(Consumer<? super T> onDrop),它容許咱們傳入一個接口來指定當某個數據被丟失時的行爲。

2.8 onBackpressureLatest()

對應於onBackpressureDrop()的,還有onBackpressureLatest()方法,該方法只會保留最新的數據並會覆蓋較老、沒有分發的數據。 咱們能夠將其當作是onBackpressureBuffer+1 capacity+DROP_OLDEST的組合。

onBackpressureDrop()不一樣的地方在於,當下遊消費過慢的時候,這種方式總會存在一個緩存的值。 這種特別適用於那種數據的生產很是頻繁,可是隻有最新的數據會被消費的那種情形。好比,當用戶點擊了屏幕,那麼咱們傾向於只處理最新按下的位置的事件。

component.mouseClicks()
    .onBackpressureLatest()
    .observeOn(Schedulers.computation())
    .subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);
複製代碼

因此,總結一下:

  1. onBackpressureDrop():不會緩存任何數據,專一於當下,新來的數據來不及處理就丟掉,之後會有更好的;
  2. onBackpressureLatest():會緩存一個數據,當正在執行某個任務的時候有新的數據過來,會把它緩存起來,若是又有新的數據過來,那就把以前的替換掉,緩存裏面的老是最新的。

三、總結

以上就是背壓機制的一些內容,以及咱們介紹了Flowable中的幾個背壓相關的方法。 實際上,RxJava的官方文檔也有說明——Flowable適用於數據量比較大的情景,由於它的一些建立方法自己就使用了背壓機制。 這部分方法咱們就再也不一一進行說明,由於,它們的方法簽名和Observable基本一致,只是多了一層背壓機制。

比較匆匆地整理完了背壓的內容,可是我想這塊還會有更加豐富的內容值得咱們去發現和探索。

以上。

RxJava2 系列文章

相關文章
相關標籤/搜索