RxJava 知識梳理(3) RxJava2 基礎知識小結

前言

首先要感謝 Season_zlc 的一系列RxJava2的教程,關於上游、下游、水缸的類比,讓我對於整個RxJava2的基本思想有了更加清晰的認識。你們有興趣的話必定要多看看,寫的通俗易懂,傳送門:給初學者的 RxJava 2.0 教程 (一) ,本文的思想都來源於它的一系列文章。java

文章比較長,爲了不耽誤你們的時間,先列出須要介紹的知識點: react

1、RxJava2 的基本模型

1.1 使用實例

在開始學習以前,咱們先看一下最簡單的例子:android

  • 第一步:導入依賴包:
dependencies {
    //在build.gradle中,導入依賴。
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
複製代碼
  • 第二步:使用最基本的Observable + Observer的最簡單示例,這裏咱們在上游發送了四個onNext(String s)事件以後,最後發送了一個onComplete()事件。
public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }
複製代碼
  • 第三步:運行結果,訂閱成功以後,會依次回調如下三步操做:onSubscribeonNextonComplete

1.2 基本元素

在上面的例子中,涉及到了如下五個類:緩存

  • Observable:上游。
  • ObservableOnSubscribe:上游的create方法所接收的參數。
  • ObservableEmitter:上游事件的發送者。
  • Observer:下游的接收者。
  • Disposable:用於維繫上游、下游之間的聯繫。

對於整個模型,能夠總結爲如下幾點:bash

  • RxJava2簡單的來講,就是一個發送事件、接收事件的過程,咱們能夠將發送事件方類比做上游,而接收事件方類比做下游。
  • 上游每產生一個事件,下游就能收到事件,上游對應Observable,而下游對應Observer
  • 只有當上遊和下游創建鏈接以後,上游纔會開始發送事件,這一關係的創建是經過subscribe方法。

各關鍵元素的UML圖以下: 網絡

1.3 ObservableEmitter

用於 發出事件,它能夠分別發出onNext/onComplete/onError事件:app

  • 上游能夠發送無限個onNext,下游也能夠接收無限個onNext
  • 當上遊發送了一個onComplete/onError後,上游onComplete/onError後的事件將會繼續發送,可是下游在收到onComplete/onError事件後再也不繼續接收事件。
  • 上游能夠不發送onComplete或者onError事件。
  • 調用onError或者onComplete切斷了上游和下游的聯繫,在聯繫切斷後上游再發送onError事件就會報錯,onCompleteonError的調用狀況有如下幾種: (1) onComplete能夠發送屢次,可是隻會收到一次回調。 (2) onError只能夠發送一次,發送屢次會報錯。 (3) onComplete以後不能夠發送onError,不然會報錯。 (4) onError以後能夠發送onComplete,可是隻會收到onError事件。
  • onError的參數不容許爲空。

其繼承關係以下圖所示: ide

1.4 Disposable

理解成爲 水管的機關,當調用它的dispose方法時,將會將上游和下游之間的管道切斷,從而致使 下游接收不到事件函數

  • ObserveronSubscribe回調中,會傳入一個Disposable對象,下游能夠經過該對象的dispose()方法主動切斷和上游的聯繫,在這以後上游的observableEmitter.isDisposed()方法將返回true
  • 當上遊和下游的聯繫切斷以後,下游收不到包括onComplete/onError在內的任何事件,若此時上游再調用onError方法發送事件,那麼將會報錯。

咱們來模擬一下,在下游收到2以後,經過Disposable來切斷上游和下游之間的聯繫:工具

public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
                if ("2".equals(s)) {
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }
複製代碼

最終的運行結果爲:

1.5 Subscribe 的重載方法

經過subscribe肯定上游和下游的聯繫有如下幾種方法:

能夠看到,這裏能夠分爲三類:

  • 不帶參數
  • Consumer<T>
  • Observer
  • Action

對於不使用Observer類做爲形參的subscribe函數,其實實現的功能和使用Observer類做爲參數的方法相同,只不過它們是將Observer的四個回調分解成形參,有參數的回調用Consumer<T>代替,而沒有參數的則用Action代替。

2、線程切換

2.1 基本概念

  • 當咱們在上游建立一個Observable來發送事件,那麼這個上游就默認在主線程發送事件;而當咱們在下游建立一個Observer來接收事件,那麼這個下游就默認在主線程中接收事件。
  • subscribeOn指定的是 上游發送事件 的線程,而observeOn指定的是 下游接收事件 的線程。
  • 屢次調用subscribeOn只有第一次有效,而每調用一次observeOn,那麼下游接收消息的線程就會切換一次。
  • CompositeDisposable能夠用來容納Disposable對象,每當咱們獲得一個Disposable對象時,就經過add方法將它添加進入容器,在退出的時候,調用clear方法,便可切斷全部的水管。

2.2 線程類型

  • Schedulers.io():表明IO操做,一般用於網絡請求、文件讀寫等IO密集型的操做。
  • Schedulers.computation():表明CPU密集型的操做,適用於大量計算。
  • Schedulers.newThread():建立新的常規線程。
  • AndroidSchedulers.mainThread():表明Android的主線程。

2.3 示例

在鏈式調用當中,咱們能夠經過observeOn方法屢次切換管道下游處理消息的線程,例以下面的代碼,咱們對下游進行了兩次線程的切換:

static void mapSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
                observableEmitter.onNext("true");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
                observableEmitter.onNext("false");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
                observableEmitter.onComplete();
            }
        //1.指定了subscribe方法執行的線程,並進行第一次下游線程的切換,將其切換到新的子線程。 
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {

            @Override
            public Boolean apply(String s) throws Exception {
                Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
                return "true".equals(s);
            }
        //2.進行第二次下游線程的切換,將其切換到主線程。 
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {

            @Override
            public void onSubscribe(Disposable disposable) {

            }

            @Override
            public void onNext(Boolean aBoolean) {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
            }
        });
    }
複製代碼

以上代碼的運行的結果爲:

3、Map 和 FlatMap 操做符

3.1 Map

  • Map操做符的做用是對上游發送的每個事件應用一個函數,使得每一個事件按照函數的邏輯進行變換,經過Map就能夠把上游發送的每個事件,轉換成Object或者集合,其英文註釋爲:
  • 如下面使用map的代碼爲例,能夠看到map接收一個Function類,它有兩個泛型變量,分別爲調用map方法的Observable<T><T>泛型,和返回的Obervable<R><R>泛型。
public static void mapVerify() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
        });
        Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        });
        Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
    }
複製代碼

Function爲一個接口:

而且在 map函數調用完畢以後,將返回一個新的 Observable,它的類型爲 ObservableMap

3.2 FlatMap

  • FlatMap用於將一個發送事件的上游Observable變換成多個發送事件的Observable,而後將它們發送的事件合併,放進一個單獨的Observable中,其註釋爲:
  • 上游每發送一個事件,就會針對該事件建立一個單獨的水管,而後發送轉換後的新的事件,下游接收到的就是這些新的水管發送的事件。
  • FlatMap不保證不一樣水管之間事件的順序,若是須要保證順序,則須要使用contactMap

3.2.1 示例

static void flatMapSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                observableEmitter.onNext(1);
                observableEmitter.onNext(2);
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                return Observable.fromArray("a value of " + integer + ",b value of " + integer);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }
複製代碼

map操做符相似,它也接收一個類型爲Function的接口,只不過它的? extends R參數類型換成了? extends Observable<? extends R>

3.2.2 FlatMap 不保證下游接收事件的順序

前面咱們說到,flatMap操做符不會保證下游接收事件的順序,下面,咱們就以一個例子來講明,在flatMapapply函數中,咱們將一個事件轉換成兩個Observable,而且加上了延時:

static void flatMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "flatMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "flatMapOrderSample emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "flatMapOrderSample emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "flatMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }
複製代碼

能夠看到,最終的輸出結果和flatMap收到事件的順序並不相同:

下面,仍是一樣的場景,將 flatMap換成 contactMap

static void contactMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(2);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "contactMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }
複製代碼

最終的運行結果爲:

4、Zip 操做符

4.1 基本概念

  • Zip經過一個函數從多個Observable每次各取出一個事件,合併成一個新的事件發送給下游。
  • 組合的順序是嚴格按照事件發送的順序來的。
  • 最終下游收到的事件數量和上游中發送事件最少的那一根水管的事件數量相同。

4.1.1 兩個 Observable 運行在同一線程當中

static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }
複製代碼

此時的運行結果爲:

4.1.2 兩個 Observable 運行在不一樣的線程

static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        }).subscribeOn(Schedulers.io());
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }
複製代碼

運行結果爲:

5、背壓

「背壓」其實就是一種用於解決問題的工具,那麼咱們的問題又是什麼呢?

  • 問題:當上遊發送事件的速度很快,下游消費事件的速度又很慢,而系統又必須緩存這些上游發送的消息以便下游處理,那麼就會致使系統中堆積了不少的資源。
  • 工具:下游告知上游目前本身的處理能力,上游根據下游的處理能力,進行適當的調整。

想必你們在不少文章中都聽過這個一句話:在RxJava2中,Observable不支持「背壓」,而Flowable支持背壓。

5.1 不支持背壓的 Observable

關於Observable不支持背壓,咱們應當從兩種狀況去考慮,即上游、下游是否位於相同的線程。

5.1.1 Observable 之上游、下游位於相同線程

首先,咱們不調用observeOnsubscribeOn方法來改變上游、下游的工做線程,這樣,上游和下游就位於同一線程,同時,咱們在下游的處理函數中,每收到一個消息就休眠2000ms,以模擬上游處理速度大於下游的場景。

static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }
複製代碼

從下面的打印結果能夠看到,當「使用 Observable,而且上游、下游位於相同線程」時,並不會出現消息堆積的狀況,由於上游發射完一條消息後,必需要等到下游處理完該消息,纔會發射一條新的消息。

5.1.2 Observable 之上游、下游位於不一樣線程

接着,咱們採用subscribeOnobserveOn來使得上游和下游位於不一樣的工做線程,其它均和2.2中相同。

static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }
複製代碼

2.2中不一樣,當上遊和下游位於不一樣的工做線程,那麼上游發送消息時,不會考慮下游是否已經處理了以前的消息,它會直接發送,而這些發送的消息被存放在水缸當中,下游每處理完一條消息,就去水缸中取下一條數據,那麼隨着水缸中數據愈來愈多,那麼系統中的無用資源就會急劇增長。

5.1.3 關於 Observable 不支持背壓的小結

咱們之因此說Observable不支持「背壓」,就是在2.1介紹的整個族譜中,沒有一個類,一種方法能讓下游通知上游說:不要再發消息到水缸裏了,我已經處理不過來了!

那是否是說Flowable支持「背壓」,而Observable不支持,那麼Observable就要被取代了呢,其實否則,Flowable對於「背壓」的支持是以性能爲代價的,咱們應當只在有可能出現2.3中上游下游速率不匹配的問題時,纔去使用Flowable,不然就應當使用Observable,也就是知足兩點條件:

  • 上游和下游位於不一樣的工做線程
  • 上游發送消息的速度,要遠遠大於下游處理消息的速度,有可能形成消息的堆積。

5.2 支持背壓的 Flowable

5.2.1 基本概念

  • FlowableSubscriber分別對應於以前討論的ObservableObserver,它們直接的鏈接仍然是經過subscribe方法。
  • Flowable在設計的時候採用了 響應式拉取 的思想,當下遊調用了Subscriptionrequest方法時,就代表了下游處理事件的能力,這樣上游就能夠根據這個值來控制事件發送的頻率,避免出現前面談到的上游發送太快,而下游處理太慢從而致使OOM的發生。
  • 只有上游根據下游的處理能力來發送事件,才能達到理想的效果。

5.2.2 基本使用

static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR);

        sourceFlow.subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
複製代碼

其類結構圖和Observable幾乎徹底一致:

5.3 Flowable 支持背壓的策略

從上面的類圖能夠看出,FlowableObservable最大的不一樣,就是在create方法中,須要傳入額外的參數,它表示的是「背壓」的策略,這裏可選的值包括:

  • ERROR
  • BUFFER
  • DROP
  • LATEST

5.3.1 使用 ERROR 的策略

  • 當上遊和下游位於同一個線程時,若是上游發送的事件超過了下游聲明的request(n)的值,那麼會拋出MissingBackpressureException異常。
  • 當上遊和下游位於不一樣線程時,若是上游發送的事件超過了下游的聲明,事件會被放在水缸當中,這個水缸默認的大小是128,只有當下遊調用request時,才從水缸中取出事件發送給下游,若是水缸中事件的個數超過了128,那麼也會拋出MissingBackpressureException異常。

下面這段代碼,咱們先將三個事件放入到水缸當中,以後每次調用request方法就會從水缸當中取出一個事件發送給下游。

static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

    static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(1);
        }
    }
複製代碼

當上遊和下游位於不一樣的線程,每次經過Subscription調用request就會從水缸中取出一個事件,發送給下游:

5.3.2 BUFFER 策略

  • 使用BUFFER策略時,至關於在上游放置了一個容量無限大的水缸,全部下游暫時沒法處理的消息都放在水缸當中,這裏再也不像ERROR策略同樣,區分上游和下游是否位於同一線程。
  • 所以,若是下游一直沒有處理消息,那麼將會致使內存一直增加,從而引發OOM
static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(10);
        }
    }

    static void flowBufferSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000;i ++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }

        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
複製代碼

在上面的例子中,咱們先把10000條消息放入到水缸當中,以後經過Subscription每次從水缸中取出10條消息發送給下游,演示結果爲:

5.3.3 DROP 策略

  • 使用DROP策略時,會把水缸沒法存放的事件丟棄掉,這裏一樣不會受到下游和下游是否處於同一個線程的限制。
static void flowDropSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
複製代碼

咱們先往水缸中放入130條消息,以後每次經過Subscription取出60條消息發送給下游,能夠看到,最後最多隻取到了第128條消息,第129/130條消息被丟棄了。

5.3.4 LATEST 策略

  • DROP相似,當水缸沒法容納下消息時,會將它丟棄,可是除此以外,上游還會緩存最新的一條消息,實例以下:
static void flowLatestSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }
複製代碼

從下面的運行結果能夠看出,當取出最後一批數據的時候,上游除了收到存儲在水缸當中的數據,還額外收到了最後一條消息,也就是第130條數據,這就是DROP策略和LATEST策略的區別:


更多文章,歡迎訪問個人 Android 知識梳理系列:

相關文章
相關標籤/搜索