RxJava(九):背壓

博客主頁java

1. 背壓

在 RxJava 中, 會遇到被觀察者發送消息太快以致於它的操做符或者訂閱者不能及時處理相關的消息,這就是典型的背壓( Back Pressure )場景。react

在 RxJava 官方的維基百科中關於 Back Pressure 是這樣描述的:segmentfault

In ReactiveX it is not difficult to get into a situation in which an Observable is emitting items more rapidly than an operator or observer can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.

Back Pressure 常常被翻譯爲背壓, 背壓的字面意思比較晦澀,難以理解。它是指在異步場景下,被觀察者發送事件速度遠快於觀察者處理的速度,從而致使下游的 buffer 溢出,這種現象叫做背壓。api

首先,背壓必須是在異步的場景下才會出現,即被觀察者和觀察者處於不一樣的線程中。緩存

其次, RxJava 是基於 Push 模型 。對於 Pull 模型而言,當消費者請求數據的時候,若是生產者比較慢 ,則消費者會阻塞等待。若是生產者比較快,生產者會等待消費者處理完後再生產新的數據,因此不會出現背壓的狀況。然而在 RxJava 中,只要生產者數據準備好了就會發射出去。若是生產者比較慢,則消費者會等待新的數據到來。若是生產者比較快,則會有不少數據發射給消費者,而無論消費者當前有沒有能力處理數據,這樣就會致使背壓。app

最後,在 RxJava 2.x 中,只有新增的 Flowable 類型是支持背壓的,而且 Flowable 不少操做
符內部都使用了背壓策略,從而避免過多的數據填滿內部的隊列。異步

在 RxJava l.x 中,有不少事件由於不能被正確地背壓,從而拋出
MissingBackpressureException 。在 RxJava l.x 中的 observeOn ,因爲切換了觀察者的線程,所以內部實現用隊列存儲事件。ide

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; ; i++) {
            subscriber.onNext(i);
        }
    }
}).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "Next: " + integer);
            }
        });

這段代碼其實並不會產生背壓,只會出現 ANR (application not responding),由於被觀察者和訂閱者處在同一個線程中,只有兩者不在同一個線程時,纔會出現背壓。線程

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; ; i++) {
                    subscriber.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "Next: " + integer);
                    }
                });

// 執行結果
Caused by: rx.exceptions.MissingBackpressureException

修改完以後當即引發了 App Crash,查看日誌以後發現,出現 MissingBackpressureException 異常。翻譯

在 RxJava l.x 中, Observable 是支持背壓的,從 Observable 的源碼中能夠看到,在 RxJava
1.x 中的 Buffer 的大小隻有 16

public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
    return buffer(boundary, 16);
}

也就是說,剛纔的代碼無須發無限次,只要發 17 次就能夠引發異常。下面的代碼將原先的無數次改爲了 17 次。

果真能夠拋出 MissingBackpressureException ,符合預期。若是把 17 改爲 16 ,則程序能夠正常運行,打印出 0-15。

在 RxJava l.x 中,不是全部的 Observable 都支持背壓。咱們知 Observable 有 Hot 和 Cold 之分。 Rx.Java 1.x 中, Hot Observables 是不支持背壓的,而 Cold Observables 中也有一部分不支持背壓。

2. RxJava 2.x 的背壓策略

在 RxJava 2.x 中, Observable 再也不支持背壓,而是改用 Flowable 來專門支持背壓。默認隊列大小爲 128 ,而且要求全部的操做符強制支持背壓。

從 BackpressureStrategy 的源碼能夠看到, Flowable 一共有 5 種背壓策略:

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

2.1 MISSING

此策略表示,經過 Create 方法建立的 Flowable 沒有指定背壓策略,不會對經過 OnNext 發射的數據作緩存或丟棄處理,須要下游經過背壓操做符 (onBackpressureBuffer/onBackpressureDrop/onBackpressureLatest)指定背壓策略。

2.2 ERROR

此策略表示,若是放入 Flowable 的異步緩存池中的數據超限了,則會拋出 MissingBackpressureException 異常。

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 129; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

// 執行結果
Caused by: io.reactivex.exceptions.MissingBackpressureException

運行這段代碼後,會馬上引發 App Crash ,查看 LogCat 以後發現,出現 MissingBackpressureException 異常

由於 Flowable 的默認隊列是 128, 因此將上述代碼的 129 改爲 128 程序就能夠正常運行了。

2.3 BUFFER

此策略表示, Flowable 的異步緩存池同 Observable 的同樣,沒有固定大小,能夠無限制添加數據,不會 MissingBackpressureException 異常 但會致使 OOM

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; ; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

在 Android 中運行的話只會引發 ANR。

2.4 DROP

此策略表示,若是 Flowable 的異步緩存池滿了,則會丟掉將要放入緩存池中的數據。

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 129; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.DROP)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

在 Android 中運行這段代碼,不會引發 Crash ,但只會打印 0~127,第 128 則被丟棄,因 Flowable 的內部隊列己經滿了。

2.5 LATEST

此策略表示,若是緩存池滿了,會丟掉將要放入緩存池中的數據。這一點與 DROP 策略同樣,不一樣的是,無論緩存池的狀態如何, LATEST 策略會將最後一條數據強行放入緩存池中。

Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 1000; i++) {
            emitter.onNext(i);
        }
    }
}, BackpressureStrategy.LATEST)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Next: " + integer);
            }
        });

在 Android 中運行這段代碼,也不會引發 Crash,而且會打印出 0-127 以及 999。由於 999 是最後一條數據。

Flowable 不只能夠經過 create 建立時須要指定背壓策略,還能夠在經過其餘建立操做符,例如 just、fromArray 等建立後經過背壓操做符指定背壓策略。例如, onBackpressureBuffer() 對應
BackpressureStrategy.BUFFER , onBackpressureDrop() 對應 BackpressureStrategy.DROP ,
onBackpressureLatest() 對應 BackpressureStrategy.LATEST

Flowable.interval(1, TimeUnit.SECONDS)
        .onBackpressureBuffer()
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next: " + aLong);
            }
        });

若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)

相關文章
相關標籤/搜索