給初學者的 RxJava2.0 教程 (八)

Outlinejavascript

[TOC]java

前言

在上一節中, 咱們學習了FLowable的一些基本知識, 同時也挖了許多坑, 這一節就讓咱們來填坑吧.react

正題

在上一節中最後咱們有個例子, 當上遊一次性發送128個事件的時候是沒有任何問題的, 一旦超過128就會拋出MissingBackpressureException異常, 提示你上游發太多事件了, 下游處理不過來, 那麼怎麼去解決呢? 併發

咱們先來思考一下, 發送128個事件沒有問題是由於FLowable內部有一個大小爲128的水缸, 超過128就會裝滿溢出來, 那既然你水缸這麼小, 那我給你換一個大水缸如何, 聽上去頗有道理的樣子, 來試試: ide

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });複製代碼

此次咱們直接讓上游發送了1000個事件,下游仍然不調用request去請求, 與以前不一樣的是, 此次咱們用的策略是BackpressureStrategy.BUFFER, 這就是咱們的新水缸啦, 這個水缸就比原來的水缸牛逼多了,若是說原來的水缸是95式步槍, 那這個新的水缸就比如黃金AK , 它沒有大小限制, 所以能夠存放許許多多的事件. 性能

因此此次的運行結果就是:學習

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
...
zlc.season.rxjava2demo D/TAG: emit 997
zlc.season.rxjava2demo D/TAG: emit 998
zlc.season.rxjava2demo D/TAG: emit 999複製代碼

不知道你們有沒有發現, 換了水缸的FLowable和Observable好像是同樣的嘛...測試

不錯, 這時的FLowable表現出來的特性的確和Observable如出一轍, 所以, 若是你像這樣單純的使用FLowable, 一樣須要注意OOM的問題, 例以下面這個例子:spa

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });複製代碼

按照咱們之前學習Observable同樣, 讓上游無限循環發送事件, 下游一個也不去處理, 來看看運行結果吧:3d

flowable.gif

一樣能夠看到, 內存迅速增加, 直到最後拋出OOM. 因此說不要迷戀FLowable, 它只是個傳說.

可能有朋友也注意到了, 以前使用Observable測試的時候內存增加很是迅速, 幾秒鐘就OOM, 但這裏增加速度卻比較緩慢, 能夠翻回去看以前的文章中的GIF圖進行對比, 這也看出FLowable相比Observable, 在性能方面有些不足, 畢竟FLowable內部爲了實現響應式拉取作了更多的操做, 性能有所丟失也是在所不免, 所以單單只是說由於FLowable是新興產物就盲目的使用也是不對的, 也要具體分場景,

那除了給FLowable換一個大水缸還有沒有其餘的辦法呢, 由於更大的水缸也只是緩兵之計啊, 動不動就OOM給你看.

想一想看咱們以前學習Observable的時候說到的如何解決上游發送事件太快的, 有一招叫從數量上取勝, 一樣的FLowable中也有這種方法, 對應的就是BackpressureStrategy.DROPBackpressureStrategy.LATEST這兩種策略.

從名字上就能猜到它倆是幹啥的, Drop就是直接把存不下的事件丟棄,Latest就是隻保留最新的事件, 來看看它們的實際效果吧.

先來看看Drop:

public static void request() {
        mSubscription.request(128);
    }

public static void demo3() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

咱們仍然讓上游無限循環發送事件, 此次的策略選擇了Drop, 同時把Subscription保存起來, 待會咱們在外部調用request(128)時, 即可以看到運行的結果.

咱們先來猜一下運行結果, 這裏爲何request(128)呢, 由於以前不是已經說了嗎, FLowable內部的默認的水缸大小爲128, 所以, 它剛開始確定會把0-127這128個事件保存起來, 而後丟棄掉其他的事件, 當咱們request(128)的時候,下游便會處理掉這128個事件, 那麼上游水缸中又會從新裝進新的128個事件, 以此類推, 來看看運行結果吧:

drop.gif

從運行結果中咱們看到的確是如此, 第一次request的時候, 下游的確收到的是0-127這128個事件, 但第二次request的時候就不肯定了, 由於上游一直在發送事件. 內存佔用也很正常, drop的做用相信你們也很清楚了.

再來看看Latest吧:

public static void request() {
        mSubscription.request(128);
    }

public static void demo4() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }複製代碼

一樣的, 上游無限循環發送事件, 策略選擇Latest, 同時把Subscription保存起來, 方便在外部調用request(128).來看看此次的運行結果:

latest.gif

誒, 看上去好像和Drop差很少啊, Latest也首先保存了0-127這128個事件, 等下游把這128個事件處理了以後才進行以後的處理, 光從這裏沒有看出有任何區別啊...

古人云,師者,因此傳道受業解惑也。人非生而知之者,孰能無惑?惑而不從師,其爲惑也,終不解矣.複製代碼

做爲初學者的入門導師, 是不能給你們留下一點點疑惑的, 來讓咱們繼續揭開這個疑問.

咱們把上面兩段代碼改良一下, 先來看看DROP的改良版:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000; i++) {  //只發1w個事件
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(128);  //一開始就處理掉128個事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });複製代碼

這段代碼和以前有兩點不一樣, 一是上游只發送了10000個事件, 二是下游在一開始就立馬處理掉了128個事件, 而後咱們在外部再調用request(128)試試, 來看看運行結果:

drop_1.gif

此次能夠看到, 一開始下游就處理掉了128個事件, 當咱們再次request的時候, 只獲得了第3317的事件, 後面的事件直接被拋棄了.

再來看看Latest的運行結果吧:

latest_1.gif

從運行結果中能夠看到, 除去前面128個事件, 與Drop不一樣, Latest老是能獲取到最後最新的事件, 例如這裏咱們老是能得到最後一個事件9999.

好了, 關於FLowable的策略咱們也講完了, 有些朋友要問了, 這些FLowable是我本身建立的, 因此我能夠選擇策略, 那面對有些FLowable並非我本身建立的, 該怎麼辦呢? 好比RxJava中的interval操做符, 這個操做符並非咱們本身建立的, 來看下面這個例子吧:

Flowable.interval(1, TimeUnit.MICROSECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);  //延時1秒
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });複製代碼

interval操做符發送Long型的事件, 從0開始, 每隔指定的時間就把數字加1併發送出來, 在這個例子裏, 咱們讓它每隔1毫秒就發送一次事件, 在下游延時1秒去接收處理, 不用猜也知道結果是什麼:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo W/TAG: onError: 
                              io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
                                  at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
                                  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
                                  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
                                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
                                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                  at java.lang.Thread.run(Thread.java:761)複製代碼

一運行就拋出了MissingBackpressureException異常, 提醒咱們發太多了, 那麼怎麼辦呢, 這個又不是咱們本身建立的FLowable啊...

別慌, 雖然不是咱們本身建立的, 可是RxJava給咱們提供了其餘的方法:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

熟悉嗎? 這跟咱們上面學的策略是同樣的, 用法也簡單, 拿剛纔的例子現學現用:

Flowable.interval(1, TimeUnit.MICROSECONDS)
                .onBackpressureDrop()  //加上背壓策略
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });複製代碼

其他的我就不一一列舉了.

好了, 今天的教程就到這裏吧, 這一節咱們學習瞭如何使用內置的BackpressureStrategy來解決上下游事件速率不均衡的問題. 這些策略其實以前咱們將Observable的時候也提到過, 其實大差不差, 只要理解了爲何會上游發事件太快, 下游處理太慢這一點, 你就好處理了, FLowable無非就是給你封裝好了, 確實對初學者友好一點, 可是不少初學者每每只知道How, 殊不知道Why, 最重要的實際上是知道why, 而不是How.

(其他的教程大多數到這裏就結束了, 可是, 你覺得FLowable就這麼點東西嗎, 騷年, Too young too simple, sometimes naive! 這僅僅是開始, 真正牛逼的還沒來呢. 敬請關注下一節, 下節見 ! )

相關文章
相關標籤/搜索