給初學者的RxJava2.0教程(六)

Outlinejava

[TOC]app

前言

在上一節中, 咱們找到了上下游流速不均衡從而致使BackPressureException出現的源頭 , 在這一節裏咱們將學習如何去治理它 . 可能不少看過其餘人寫的文章的朋友都會以爲只有Flowable才能解決 , 因此你們對這個Flowable都抱有很大的期許 , 其實吶 , 大家畢竟圖樣圖森破 , 今天咱們先拋開Flowable, 僅僅依靠咱們本身的雙手和智慧 , 來看看咱們如何去治理 , 經過本節的學習以後咱們再來看Flowable, 你會發現它其實並無想象中那麼牛叉, 它只是被其餘人過分神化了. ide

正題

咱們接着來看上一節的這個例子:工具

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {  //無限循環發送事件
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });複製代碼

上一節中咱們看到了它的運行結果是直接爆掉了內存, 也明白它爲何就爆掉了內存, 那麼咱們能作些什麼, 才能不讓這種狀況發生呢. 學習

以前咱們說了, 上游發送的全部事件都放到水缸裏了, 因此瞬間水缸就滿了, 那咱們能夠只放咱們須要的事件到水缸裏呀, 只放一部分數據到水缸裏, 這樣不就不會溢出來了嗎, 所以, 咱們把上面的代碼修改一下:優化

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer % 10 == 0;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });複製代碼

在這段代碼中咱們增長了一個filter, 只容許能被10整除的事件經過, 再來看看運行結果:spa

filter.gif

能夠看到, 雖然內存依然在增加, 可是增加速度相比以前, 已經減小了太多了, 至少在我錄完GIF以前尚未爆掉內存, 你們能夠試着改爲能被100整除試試.3d

能夠看到, 經過減小進入水缸的事件數量的確能夠緩解上下游流速不均衡的問題, 可是力度還不夠, 咱們再來看一段代碼:code

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .sample(2, TimeUnit.SECONDS)  //sample取樣
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });複製代碼

這裏用了一個sample操做符, 簡單作個介紹, 這個操做符每隔指定的時間就從上游中取出一個事件發送給下游. 這裏咱們讓它每隔2秒取一個事件給下游, 來看看此次的運行結果吧:cdn

sample.gif

此次咱們能夠看到, 雖然上游仍然一直在不停的發事件, 可是咱們只是每隔必定時間取一個放進水缸裏, 並無所有放進水缸裏, 所以此次內存僅僅只佔用了5M.

你們之後能夠出去吹牛逼了: 我曾經經過技術手段去優化一個程序, 最終使得內存佔用從300多M變成不到5M. ~(≧▽≦)/~

前面這兩種方法歸根到底其實就是減小放進水缸的事件的數量, 是以數量取勝, 可是這個方法有個缺點, 就是丟失了大部分的事件.

那麼咱們換一個角度來思考, 既然上游發送事件的速度太快, 那咱們就適當減慢發送事件的速度, 從速度上取勝, 聽上去不錯, 咱們來試試:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //每次發送完事件延時2秒
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });複製代碼

此次咱們讓上游每次發送完事件後都延時了2秒, 來看看運行結果:

sleep.gif

完美 ! 一切都是那麼完美 !

能夠看到, 咱們給上游加上延時了以後, 瞬間一頭髮情的公牛就變得跟只小綿羊同樣, 如此溫順, 如此平靜, 如此平穩的內存線, 美妙極了. 並且事件也沒有丟失, 上游經過適當的延時, 不但減緩了事件進入水缸的速度, 也可讓下游充足的時間從水缸裏取出事件來處理 , 這樣一來, 就不至於致使大量的事件涌進水缸, 也就不會OOM啦.

到目前爲止, 咱們沒有依靠任何其餘的工具, 就輕易解決了上下游流速不均衡的問題.

所以咱們總結一下, 本節中的治理的辦法就兩種:

  • 一是從數量上進行治理, 減小發送進水缸裏的事件
  • 二是從速度上進行治理, 減緩事件發送進水缸的速度

你們必定沒忘記, 在上一節還有個Zip的例子, 這個例子也爆了咱們的內存, 現學現用, 咱們用剛學到的辦法來試試能不能懲奸除惡, 先來看看第一種辦法.

先來減小進入水缸的事件的數量:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //進行sample採樣

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });複製代碼

來試試運行結果吧:

zip_sample.gif

哈哈, 成功了吧, 再來用第二種辦法試試.

此次咱們來減緩速度:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //發送事件以後延時2秒
                }
            }
        }).subscribeOn(Schedulers.io());

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });複製代碼

來看看運行結果吧:

zip_sleep.gif

果真也成功了, 這裏只打印出了下游收到的事件, 因此只有一個. 若是你對這個結果看不懂, 請自覺掉頭看前面幾篇文章.

經過本節的學習, 你們應該對如何處理上下游流速不均衡已經有了基本的認識了, 你們也能夠看到, 咱們並無使用Flowable, 因此不少時候仔細去分析問題, 找到問題的緣由, 從源頭去解決纔是最根本的辦法. 後面咱們講到Flowable的時候, 你們就會發現它其實沒什麼神祕的, 它用到的辦法和咱們本節所講的基本上是同樣的, 只是它稍微作了點封裝.

好了, 今天的教程就到這裏吧, 下一節中咱們就會來學習大家喜聞樂見的Flowable.

相關文章
相關標籤/搜索