博客主頁java
在 RxJava 中, 會遇到被觀察者發送消息太快以致於它的操做符或者訂閱者不能及時處理相關的消息,這就是典型的背壓( Back Pressure )場景。react
在 RxJava 官方的維基百科中關於 Back Pressure 是這樣描述的:segmentfault
In ReactiveX it is not difficult to get into a situation in which an Observable is emitting items more rapidly than an operator or observer can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.
Back Pressure 常常被翻譯爲背壓, 背壓的字面意思比較晦澀,難以理解。它是指在異步場景下,被觀察者發送事件速度遠快於觀察者處理的速度,從而致使下游的 buffer 溢出,這種現象叫做背壓。api
首先,背壓必須是在異步的場景下才會出現,即被觀察者和觀察者處於不一樣的線程中。緩存
其次, RxJava 是基於 Push 模型 。對於 Pull 模型而言,當消費者請求數據的時候,若是生產者比較慢 ,則消費者會阻塞等待。若是生產者比較快,生產者會等待消費者處理完後再生產新的數據,因此不會出現背壓的狀況。然而在 RxJava 中,只要生產者數據準備好了就會發射出去。若是生產者比較慢,則消費者會等待新的數據到來。若是生產者比較快,則會有不少數據發射給消費者,而無論消費者當前有沒有能力處理數據,這樣就會致使背壓。app
最後,在 RxJava 2.x 中,只有新增的 Flowable 類型是支持背壓的,而且 Flowable 不少操做
符內部都使用了背壓策略,從而避免過多的數據填滿內部的隊列。異步
在 RxJava l.x 中,有不少事件由於不能被正確地背壓,從而拋出
MissingBackpressureException 。在 RxJava l.x 中的 observeOn ,因爲切換了觀察者的線程,所以內部實現用隊列存儲事件。ide
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; ; i++) { subscriber.onNext(i); } } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d(TAG, "Next: " + integer); } });
這段代碼其實並不會產生背壓,只會出現 ANR (application not responding),由於被觀察者和訂閱者處在同一個線程中,只有兩者不在同一個線程時,纔會出現背壓。線程
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; ; i++) { subscriber.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d(TAG, "Next: " + integer); } }); // 執行結果 Caused by: rx.exceptions.MissingBackpressureException
修改完以後當即引發了 App Crash,查看日誌以後發現,出現 MissingBackpressureException 異常。翻譯
在 RxJava l.x 中, Observable 是支持背壓的,從 Observable 的源碼中能夠看到,在 RxJava
1.x 中的 Buffer 的大小隻有 16
public final <B> Observable<List<T>> buffer(Observable<B> boundary) { return buffer(boundary, 16); }
也就是說,剛纔的代碼無須發無限次,只要發 17 次就能夠引發異常。下面的代碼將原先的無數次改爲了 17 次。
果真能夠拋出 MissingBackpressureException ,符合預期。若是把 17 改爲 16 ,則程序能夠正常運行,打印出 0-15。
在 RxJava l.x 中,不是全部的 Observable 都支持背壓。咱們知 Observable 有 Hot 和 Cold 之分。 Rx.Java 1.x 中, Hot Observables 是不支持背壓的,而 Cold Observables 中也有一部分不支持背壓。
在 RxJava 2.x 中, Observable 再也不支持背壓,而是改用 Flowable 來專門支持背壓。默認隊列大小爲 128 ,而且要求全部的操做符強制支持背壓。
從 BackpressureStrategy 的源碼能夠看到, Flowable 一共有 5 種背壓策略:
public enum BackpressureStrategy { /** * OnNext events are written without any buffering or dropping. * Downstream has to deal with any overflow. * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators. */ MISSING, /** * Signals a MissingBackpressureException in case the downstream can't keep up. */ ERROR, /** * Buffers <em>all</em> onNext values until the downstream consumes it. */ BUFFER, /** * Drops the most recent onNext value if the downstream can't keep up. */ DROP, /** * Keeps only the latest onNext value, overwriting any previous value if the * downstream can't keep up. */ LATEST }
此策略表示,經過 Create 方法建立的 Flowable 沒有指定背壓策略,不會對經過 OnNext 發射的數據作緩存或丟棄處理,須要下游經過背壓操做符 (onBackpressureBuffer/onBackpressureDrop/onBackpressureLatest)指定背壓策略。
此策略表示,若是放入 Flowable 的異步緩存池中的數據超限了,則會拋出 MissingBackpressureException 異常。
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 129; i++) { emitter.onNext(i); } } }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 執行結果 Caused by: io.reactivex.exceptions.MissingBackpressureException
運行這段代碼後,會馬上引發 App Crash ,查看 LogCat 以後發現,出現 MissingBackpressureException 異常
由於 Flowable 的默認隊列是 128, 因此將上述代碼的 129 改爲 128 程序就能夠正常運行了。
此策略表示, Flowable 的異步緩存池同 Observable 的同樣,沒有固定大小,能夠無限制添加數據,不會 MissingBackpressureException 異常 但會致使 OOM
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }, BackpressureStrategy.BUFFER) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } });
在 Android 中運行的話只會引發 ANR。
此策略表示,若是 Flowable 的異步緩存池滿了,則會丟掉將要放入緩存池中的數據。
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 129; i++) { emitter.onNext(i); } } }, BackpressureStrategy.DROP) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } });
在 Android 中運行這段代碼,不會引發 Crash ,但只會打印 0~127,第 128 則被丟棄,因 Flowable 的內部隊列己經滿了。
此策略表示,若是緩存池滿了,會丟掉將要放入緩存池中的數據。這一點與 DROP 策略同樣,不一樣的是,無論緩存池的狀態如何, LATEST 策略會將最後一條數據強行放入緩存池中。
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 1000; i++) { emitter.onNext(i); } } }, BackpressureStrategy.LATEST) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } });
在 Android 中運行這段代碼,也不會引發 Crash,而且會打印出 0-127 以及 999。由於 999 是最後一條數據。
Flowable 不只能夠經過 create 建立時須要指定背壓策略,還能夠在經過其餘建立操做符,例如 just、fromArray 等建立後經過背壓操做符指定背壓策略。例如, onBackpressureBuffer() 對應
BackpressureStrategy.BUFFER , onBackpressureDrop() 對應 BackpressureStrategy.DROP ,
onBackpressureLatest() 對應 BackpressureStrategy.LATEST
Flowable.interval(1, TimeUnit.SECONDS) .onBackpressureBuffer() .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next: " + aLong); } });
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)