關於RxJava最友好的文章——背壓(Backpressure)

前言

背壓(Backpressure)多是全部想要深刻運用RxJava的朋友必須理解的一個概念java

關於它的介紹,我本意是想寫在RxJava2.0更新介紹的文章裏的,但是寫着寫着發現,要完整介紹這個概念須要花費的篇幅太長,剛好目前對於背壓的介紹文章比較少,因此決定單獨拿出來,自成一篇。而關於RxJava2.0的文章修改以後就會發出來和你們探討。react

若是對於RxJava不是很熟悉,那麼在這篇文章以前,我但願你們先看看那篇關於Rxjava最友好的文章,能夠幫助你們很順暢的瞭解RxJava。緩存


從場景出發

讓咱們先忘掉背壓(Backpressure)這個概念,從RxJava一個比較常見的工做場景提及。bash

RxJava是一個觀察者模式的架構,當這個架構中被觀察者(Observable)和觀察者(Subscriber)處在不一樣的線程環境中時,因爲者各自的工做量不同,致使它們產生事件和處理事件的速度不同,這就會出現兩種狀況:架構

  • 被觀察者產生事件慢一些,觀察者處理事件很快。那麼觀察者就會等着被觀察者發送事件,(比如觀察者在等米下鍋,程序等待,這沒有問題)
  • 被觀察者產生事件的速度很快,而觀察者處理很慢。那就出問題了,若是不做處理的話,事件會堆積起來,最終擠爆你的內存,致使程序崩潰。(比如被觀察者生產的大米沒人吃,堆積最後就會爛掉)

下面咱們用代碼演示一下這種崩潰的場景:異步

//被觀察者在主線程中,每1ms發送一個事件
Observable.interval(1, TimeUnit.MILLISECONDS)
                //.subscribeOn(Schedulers.newThread())
                //將觀察者的工做放在新線程環境中
                .observeOn(Schedulers.newThread())
                //觀察者處理每1000ms才處理一個事件
                .subscribe(new Action1<Long>() {
                      @Override
                      public void call(Long aLong) {
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          Log.w("TAG","---->"+aLong);
                      }
                  });
複製代碼

在上面的代碼中,被觀察者發送事件的速度是觀察者處理速度的1000倍ide

這段代碼運行以後:post

...
    Caused by: rx.exceptions.MissingBackpressureException
    ...
    ...
複製代碼

拋出MissingBackpressureException每每就是由於,被觀察者發送事件的速度太快,而觀察者處理太慢,並且你尚未作相應措施,因此報異常。學習

而這個MissingBackpressureException異常裏面就包含了Backpressure這個單詞,看來背壓確定和這種異常狀況有關係。spa

那麼背壓(Backpressure)究竟是什麼呢?


關於背壓(Backpressure)

我這兩天翻閱了大量的中文和英文資料,我發現中文資料中,不少人對於背壓(Backpressure)的理解是有很大問題的,有的人把它看做一個須要避免的問題,或者程序的異常,有的人則乾脆避而不談,模棱兩可,着實讓人尷尬。

經過參考和對比大量的相關資料,我在這裏先對背壓(Backpressure)作一個明確的定義:背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種告訴上游的被觀察者下降發送速度的策略

簡而言之,背壓是流速控制的一種策略

須要強調兩點:

  • 背壓策略的一個前提是異步環境,也就是說,被觀察者和觀察者處在不一樣的線程環境中。
  • 背壓(Backpressure)並非一個像flatMap同樣能夠在程序中直接使用的操做符,他只是一種控制事件流速的策略。

那麼咱們再回看上面的程序異常就很好理解了,就是當被觀察者發送事件速度過快的狀況下,咱們沒有作流速控制,致使了異常。

那麼背壓(Backpressure)策略具體是哪如何實現流速控制的呢?


響應式拉取(reactive pull)

首先咱們回憶以前那篇關於Rxjava最友好的文章,裏面其實提到,在RxJava的觀察者模型中,被觀察者是主動的推送數據給觀察者,觀察者是被動接收的。而響應式拉取則反過來,觀察者主動從被觀察者那裏去拉取數據,而被觀察者變成被動的等待通知再發送數據

結構示意圖以下:

觀察者能夠根據自身實際狀況按需拉取數據,而不是被動接收(也就至關於告訴上游觀察者把速度慢下來),最終實現了上游被觀察者發送事件的速度的控制,實現了背壓的策略。

代碼實例以下:

//被觀察者將產生100000個事件
Observable observable=Observable.range(1,100000);
class MySubscriber extends Subscriber<T> {
    @Override
    public void onStart() {
    //必定要在onStart中通知被觀察者先發送一個事件
      request(1);
    }
 
    @Override
    public void onCompleted() {
        ...
    }
 
    @Override
    public void onError(Throwable e) {
        ...
    }
 
    @Override
    public void onNext(T n) {
        ...
        ...
        //處理完畢以後,在通知被觀察者發送下一個事件
        request(1);
    }
}

observable.observeOn(Schedulers.newThread())
            .subscribe(MySubscriber);
複製代碼

在代碼中,傳遞事件開始前的onstart()中,調用了request(1),通知被觀察者先發送一個事件,而後在onNext()中處理完事件,再次調用request(1),通知被觀察者發送下一個事件....

注意在onNext()方法中,最好最後再調用request()方法.

若是你想取消這種backpressure 策略,調用quest(Long.MAX_VALUE)便可。

實際上,在上面的代碼中,你也能夠不須要調用request(n)方法去拉取數據,程序依然能完美運行,這是由於range --> observeOn,這一段中間過程自己就是響應式拉取數據,observeOn這個操做符內部有一個緩衝區,Android環境下長度是16,它會告訴range最多發送16個事件,充滿緩衝區便可。不過話說回來,在觀察者中使用request(n)這個方法可使背壓的策略表現得更加直觀,更便於理解

若是你足夠細心,會發現,在開頭展現異常狀況的代碼中,使用的是interval這個操做符,可是在這裏使用了range操做符,爲何呢?

這是由於interval操做符自己並不支持背壓策略,它並不響應request(n),也就是說,它發送事件的速度是不受控制的,而range這類操做符是支持背壓的,它發送事件的速度能夠被控制。

那麼到底什麼樣的Observable是支持背壓的呢?


Hot and Cold Observables

須要說明的時,Hot Observables 和cold Observables並非嚴格的概念區分,它只是對於兩類Observable形象的描述

  • Cold Observables:指的是那些在訂閱以後纔開始發送事件的Observable(每一個Subscriber都能接收到完整的事件)。
  • Hot Observables:指的是那些在建立了Observable以後,(不論是否訂閱)就開始發送事件的Observable

其實也有建立了Observable以後調用諸如publish()方法就能夠開始發送事件的,這裏我們暫且忽略。

咱們通常使用的都是Cold Observable,除非特殊需求,纔會使用Hot Observable,在這裏,Hot Observable這一類是不支持背壓的,而是Cold Observable這一類中也有一部分並不支持背壓(好比interval,timer等操做符建立的Observable)。

懵逼了吧?

Tips: 都是Observable,結果有的支持背壓,有的不支持,這就是RxJava1.X的一個問題。在2.0中,這種問題已經解決了,之後談到2.0時再細說。

在那些不支持背壓策略的操做符中使用響應式拉取數據的話,仍是會拋出MissingBackpressureException。

那麼,不支持背壓的Observevable如何作流速控制呢?


流速控制相關的操做符

過濾(拋棄)

就是雖然生產者產生事件的速度很快,可是把大部分的事件都直接過濾(浪費)掉,從而間接的下降事件發送的速度。

相關相似的操做符:Sample,ThrottleFirst.... 以sample爲例,

Observable.interval(1, TimeUnit.MILLISECONDS)

                .observeOn(Schedulers.newThread())
                //這個操做符簡單理解就是每隔200ms發送裏時間點最近那個事件,
                //其餘的事件浪費掉
                  .sample(200,TimeUnit.MILLISECONDS)
                  .subscribe(new Action1<Long>() {
                      @Override
                      public void call(Long aLong) {
                          try {
                              Thread.sleep(200);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          Log.w("TAG","---->"+aLong);
                      }
                  });
複製代碼

這是以殺敵一千,自損八百的方式解決這個問題,由於拋棄了絕大部分的事件,而在咱們使用RxJava 時候,咱們本身定義的Observable產生的事件可能都是咱們須要的,通常來講不會拋棄,因此這種方案有它的缺陷。

緩存

就是雖然被觀察者發送事件速度很快,觀察者處理不過來,可是能夠選擇先緩存一部分,而後慢慢讀。

相關相似的操做符:buffer,window... 以buffer爲例,

Observable.interval(1, TimeUnit.MILLISECONDS)

                .observeOn(Schedulers.newThread())
                //這個操做符簡單理解就是把100毫秒內的事件打包成list發送
                .buffer(100,TimeUnit.MILLISECONDS)
                  .subscribe(new Action1<List<Long>>() {
                      @Override
                      public void call(List<Long> aLong) {
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          Log.w("TAG","---->"+aLong.size());
                      }
                  });
                  
複製代碼

兩個特殊操做符

對於不支持背壓的Observable除了使用上述兩類生硬的操做符以外,還有更好的選擇:onBackpressurebuffer,onBackpressureDrop

  • onBackpressurebuffer:把observable發送出來的事件作緩存,當request方法被調用的時候,給下層流發送一個item(若是給這個緩存區設置了大小,那麼超過了這個大小就會拋出異常)。
  • onBackpressureDrop:將observable發送的事件拋棄掉,直到subscriber再次調用request(n)方法的時候,就發送給它這以後的n個事件。

下面,咱們以onBackpressureDrop爲例說說用法:

Observable.interval(1, TimeUnit.MILLISECONDS)
                .onBackpressureDrop()
                .observeOn(Schedulers.newThread())
               .subscribe(new Subscriber<Long>() {

                    @Override
                    public void onStart() {
                        Log.w("TAG","start");
//                        request(1);
                    }

                    @Override
                      public void onCompleted() {

                      }
                      @Override
                      public void onError(Throwable e) {
                            Log.e("ERROR",e.toString());
                      }

                      @Override
                      public void onNext(Long aLong) {
                          Log.w("TAG","---->"+aLong);
                          try {
                              Thread.sleep(100);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  });
複製代碼

這段代碼的輸出:

W/TAG: start
W/TAG: ---->0
W/TAG: ---->1
W/TAG: ---->2
W/TAG: ---->3
W/TAG: ---->4
W/TAG: ---->5
W/TAG: ---->6
W/TAG: ---->7
W/TAG: ---->8
W/TAG: ---->9
W/TAG: ---->10
W/TAG: ---->11
W/TAG: ---->12
W/TAG: ---->13
W/TAG: ---->14
W/TAG: ---->15
W/TAG: ---->1218
W/TAG: ---->1219
W/TAG: ---->1220
...
複製代碼

之因此出現0-15這樣連貫的數據,就是是由於observeOn操做符內部有一個長度爲16的緩存區,它會首先請求16個事件緩存起來....

你可能會以爲這兩個操做符和上面講的過濾和緩存很相似,確實,功能上是有些相似,可是這兩個操做符提供了更多的特性,那就是能夠響應下游觀察者的request(n)方法了,也就是說,使用了這兩種操做符,可讓本來不支持背壓的Observable「支持」背壓了


勘誤

1, 本文以前對於Hot Observables和Cold observables的描述寫反了,是我太大意,目前已改正,大家如今看到的是正確的,感謝@jaychang0917的提醒


後記

講了這麼多終於要到尾聲了。

下面咱們總結一下:

  • 背壓是一種策略,具體措施是下游觀察者通知上游的被觀察者發送事件
  • 背壓策略很好的解決了異步環境下被觀察者和觀察者速度不一致的問題
  • 在RxJava1.X中,一樣是Observable,有的不支持背壓策略,致使某些狀況下,顯得特別麻煩,出了問題也很難排查,使得RxJava的學習曲線變得十份陡峭。

這篇文章並非爲了讓你學習在RxJava1.0中使用背壓(若是你以前不瞭解背壓的話),由於在1.0中,背壓的設計並不十分完美。而是但願你對背壓有一個全面清晰的認識,對於它在RxJava1.0中的設計缺陷有所瞭解便可。由於這篇文章自己是爲了2.0作一個鋪墊,後續的文章中我會繼續談到背壓和使用背壓的正確姿式。

相關文章
相關標籤/搜索