RxJava2 實戰知識梳理(12) 實戰講解 publish & replay & share & refCount & autoCo

1、前言

今天,咱們來整理如下幾個你們容易弄混的概念,並用實際例子來演示,能夠從 RxSample 的第十二章中獲取:java

  • publish
  • reply
  • ConnectableObservable
  • connect
  • share
  • refCount
  • autoConnect

對於以上這些概念,能夠用一幅圖來歸納: git

從圖中能夠看出,這裏面能夠供使用者訂閱的 Observable能夠分爲四類,下面咱們將逐一介紹這幾種 Observable的特色:

  • 第一類:Cold Observable,就是咱們經過Observable.createObservable.interval等建立型操做符生成的Observable
  • 第二類:由Cold Observable通過publish()或者replay(int N)操做符轉換成的ConnectableObservable
  • 第三類:由ConnectableObservable通過refCount(),或者由Cold Observable通過share()轉換成的Observable
  • 第四類:由ConnectableObservable通過autoConnect(int N)轉換成的Observable

2、Cold Observable

Cold Observable就是咱們經過Observable.createObservable.interval等建立型操做符生成的Observable,它具備如下幾個特色:github

  • 當一個訂閱者訂閱Cold Observable時,Cold Observable會從新開始發射數據給該訂閱者。
  • 當多個訂閱者訂閱到同一個Cold Observable,它們收到的數據是相互獨立的。
  • 當一個訂閱者取消訂閱Cold Observable後,Cold Observable會中止發射數據給該訂閱者,但不會中止發射數據給其它訂閱者。

下面,咱們演示一個例子,首先咱們建立一個Cold Observable緩存

//直接訂閱Cold Observable。
    private void createColdSource() {
        mConvertObservable = getSource();
    }

    private Observable<Integer> getSource() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                try {
                    int i = 0;
                    while (true) {
                        Log.d(TAG, "源被訂閱者發射數據=" + i + ",發送線程ID=" + Thread.currentThread().getId());
                        mSourceOut.add(i);
                        observableEmitter.onNext(i++);
                        updateMessage();
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.io());
    }
複製代碼

在建立兩個訂閱者,它們能夠隨時訂閱到Cold Observable或者取消對它的訂閱:ide

private void startSubscribe1() {
        if (mConvertObservable != null && mDisposable1 == null) {
            mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "訂閱者1收到數據=" + integer + ",接收線程ID=" + Thread.currentThread().getId());
                    mSubscribe1In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe1() {
        if (mDisposable1 != null) {
            mDisposable1.dispose();
            mDisposable1 = null;
            mSubscribe1In.clear();
            updateMessage();
        }
    }

    private void startSubscribe2() {
        if (mConvertObservable != null && mDisposable2 == null) {
            mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "訂閱者2收到數據=" + integer + ",接收線程ID=" + Thread.currentThread().getId());
                    mSubscribe2In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe2() {
        if (mDisposable2 != null) {
            mDisposable2.dispose();
            mDisposable2 = null;
            mSubscribe2In.clear();
            updateMessage();
        }
    }
複製代碼

爲了驗證以前說到的幾個特色,進入程序以後,咱們會先建立該Cold Observable,以後進行一系列的操做,效果以下: spa

在上面的圖中,咱們作了一下幾步操做:

  • 第一步:啓動應用,建立Cold Observable,這時候Cold Observable沒有發送任何數據。
  • 第二步:Observer1訂閱Observable,此時Cold Observable開始發送數據,Observer1也能夠收到數據,即 一個訂閱者訂閱 Cold Observable 時, Cold Observable 會開始發射數據給該訂閱者
  • 第三步:Observer2訂閱Observable,此時Observable2也能夠收到數據,可是它和Observable1收到的數據是相互獨立的,即 當多個訂閱者訂閱到同一個 Cold Observable ,它們收到的數據是相互獨立的
  • 第四步:Observer1取消對Observable的訂閱,這時候Observer1收不到數據,而且Observable也不會發射數據給它,可是仍然會發射數據給Observer2,即 當一個訂閱者取消訂閱 Cold Observable 後,Cold Observable 會中止發射數據給該訂閱者,但不會中止發射數據給其它訂閱者
  • 第五步:Observer1從新訂閱Observable,這時候Observable0開始發射數據給Observer1,即 一個訂閱者訂閱 Cold Observable 時, Cold Observable 會從新開始發射數據給該訂閱者

3、由 Cold Observable 轉換的 ConnectableObservable

在瞭解完Cold Observable以後,咱們再來看第二類的Observable,它的類型爲ConnectableObservable,它是經過Cold Observable通過下面兩種方式生成的:線程

  • .publish()
  • .reply(int N)

若是使用.publish()建立,那麼訂閱者只能收到在訂閱以後Cold Observable發出的數據,而若是使用reply(int N)建立,那麼訂閱者在訂閱後能夠收到Cold Observable在訂閱以前發送的N個數據。3d

咱們先以publish()爲例,介紹ConnectableObservable的幾個特色:code

  • 不管ConnectableObservable有沒有訂閱者,只要調用了ConnectableObservableconnect方法,Cold Observable就開始發送數據。
  • connect會返回一個Disposable對象,調用了該對象的dispose方法,Cold Observable將會中止發送數據,全部ConnectableObservable的訂閱者也沒法收到數據。
  • 在調用connect返回的Disposable對象後,若是從新調用了connect方法,那麼Cold Observable會從新發送數據。
  • 當一個訂閱者訂閱到ConnectableObservable後,該訂閱者會收到在訂閱以後,Cold Observable發送給ConnectableObservable的數據。
  • 當多個訂閱者訂閱到同一個ConnectableObservable時,它們收到的數據是相同的。
  • 當一個訂閱者取消對ConnectableObservable,不會影響其餘訂閱者收到消息。

下面,咱們建立一個ConnectableObservable,兩個訂閱者以後會訂閱到它,而不是Cold Observablecdn

//.publish()將源Observable轉換成爲HotObservable,當調用它的connect方法後,不管此時有沒有訂閱者,源Observable都開始發送數據,訂閱者訂閱後將能夠收到數據,而且訂閱者解除訂閱不會影響源Observable數據的發射。
    public void createPublishSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish();
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }
複製代碼

和上面同樣,仍是用一個例子來演示,該例子的效果爲:

  • 第一步:啓動應用,經過Cold Observablepublish方法建立ConnectableObservable,並調用ConnectableObservableconnect方法,能夠看到,此時雖然ConnectableObservable沒有任何訂閱者,可是Cold Observable也已經開始發送數據。
  • 第二步:Observer1訂閱到ConnectableObservable,此時它只能收到訂閱以後Cold Observable發射的數據。
  • 第三步:Observer2訂閱到ConnectableObservableCold Observable只會發射一份數據,而且Observer1Observer2收到的數據是相同的。
  • 第三步:Observer1取消對ConnectableObservable的訂閱,Cold Observable仍然會發射數據,Observer2仍然能夠收到Cold Observable發射的數據。
  • 第四步:Observer1從新訂閱ConnectableObservable,和第三步相同,它仍然只會收到訂閱以後Cold Observable發射的數據。
  • 第五步:經過connect返回的Disposable對象,調用dispose方法,此時Cold Observable中止發射數據,而且Observer1Observer2都收不到數據。

上面這些現象發生的根本緣由在於:如今ObserverObserver2都是訂閱到ConnectableObservable,真正產生數據的Cold Observable並不知道他們的存在,和它交互的是ConnectableObservableConnectableObservable至關於一箇中介,它完成下面兩項任務:

  • 對於上游:經過connectdispose方法決定是否要訂閱到Cold Observer,也就是決定了Cold Observable是否發送數據。
  • 對於下游:將Cold Observable發送的數據轉交給它的訂閱者。

4、由 ConnectableObservable 轉換成 Observable

ConnectableObservable轉換成Observable有兩種方法,咱們分爲兩節介紹下當訂閱到轉換後的Observable時的現象:

  • .refCount()
  • .autoConnect(int N)

4.1 ConnectableObservable 由 refCount 轉換成 Observable

通過refCount方法,ConnectableObservable能夠轉換成正常的Observable,咱們稱爲refObservable,這裏咱們假設ConnectableObservable是由Cold Observable經過publish()方法轉換的,對於它的訂閱者,有如下幾個特色:

  • 第一個訂閱者訂閱到refObservable後,Cold Observable開始發送數據。
  • 以後的訂閱者訂閱到refObservable後,只能收到在訂閱以後Cold Observable發送的數據。
  • 若是一個訂閱者取消訂閱到refObservable後,假如它是當前refObservable的惟一一個訂閱者,那麼Cold Observable會中止發送數據;不然,Cold Observable仍然會繼續發送數據,其它的訂閱者仍然能夠收到Cold Observable發送的數據。

接着上例子,咱們建立一個refObservable

//.share()至關於.publish().refCount(),當有訂閱者訂閱時,源訂閱者會開始發送數據,若是全部的訂閱者都取消訂閱,源Observable就會中止發送數據。
    private void createShareSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().refCount();
    }
複製代碼

示例以下:

操做分爲如下幾步:

  • 第一步:經過.publish().refCount()建立由ConnectableObservable轉換後的refObservable,此時Cold Observable沒有發送任何消息。
  • 第二步:Observer1訂閱到refObservableCold Observable開始發送數據,Observer1接收數據。
  • 第三步:Observer2訂閱到refObservable,它只能收到在訂閱以後Cold Observable發送的數據。
  • 第四步:Observer1取消訂閱,Cold Observable繼續發送數據,Observer2仍然能收到數據。
  • 第五步:Observer2取消訂閱,Cold Observable中止發送數據。
  • 第六步:Observer1從新訂閱,Cold Observable從新開始發送數據。

最後說明一點:訂閱到Cold Observable.publish().refCount()Cold Observableshare()所返回的Observable是等價的。

4.2 ConnectableObservable 由 autoConnect(int N) 轉換成 Observable

autoConnect(int N)refCount很相似,都是將ConnectableObservable轉換成普通的Observable,咱們稱爲autoObservable,一樣咱們先假設ConnectableObservable是由Cold Observable經過publish()方法生成的,它有如下幾個特色:

  • 當有N個訂閱者訂閱到refObservable後,Cold Observable開始發送數據。
  • 以後的訂閱者訂閱到refObservable後,只能收到在訂閱以後Cold Observable發送的數據。
  • 只要Cold Observable開始發送數據,即便全部的autoObservable的訂閱和都取消了訂閱,Cold Observable也不會中止發送數據,若是想要Cold Observable中止發送數據,那麼可使用autoConnect(int numberOfSubscribers, Consumer connection)Consumer返回的Disposable,它的做用和ConnectableObservableconnect方法返回的Disposable相同。

其建立方法以下所示:

//.autoConnect在有指定個訂閱者時開始讓源Observable發送消息,可是訂閱者是否取消訂閱不會影響到源Observable的發射。
    private void createAutoConnectSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                mConvertDisposable = disposable;
            }
        });
    }
複製代碼

示例效果以下:

咱們進行了以下幾步操做:

  • 第一步:啓動應用,建立autoConnect轉換後的autoObservable
  • 第二步:Observer1訂閱到autoObservable,此時知足條件,Cold Observable開始發送數據。
  • 第三步:Observer2訂閱到autoObservable,它只能收到訂閱後發生的數據。
  • 第四步:Observer1取消訂閱,Cold Observable繼續發送數據,Observer2仍然能夠收到數據。
  • 第五步:Observer2取消訂閱,Cold Observable仍然繼續發送數據。
  • 第六步:Observer2訂閱到autoObservable,它只能收到訂閱後發送的消息了。
  • 第七步:調用mConvertDisposabledisposeCold Observable中止發送數據。

5、publish 和 reply(int N) 的區別

在上面的例子當中,全部總結的特色都是創建在ConnectableObservable是由publish()生成,只因此這麼作,是爲了方便你們理解,不管是訂閱到ConnectableObservable,仍是由ConnectableObservable轉換的refObservableautoObservable,使用這兩種方式建立的惟一區別就是,訂閱者在訂閱後,若是是經過publish()建立的,那麼訂閱者以後收到訂閱後Cold Observable發送的數據;而若是是reply(int N)建立的,那麼訂閱者還能額外收到N個以前Cold Observable發送的數據,咱們用下面一個小例子來演示,訂閱者訂閱到的Observable以下:

//.reply會讓緩存源Observable的N個數據項,當有新的訂閱者訂閱時,它會發送這N個數據項給它。
    private void createReplySource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.replay(3);
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }
複製代碼

示例演示效果:

操做步驟:

  • 第一步:啓動應用,經過Cold Observablepublish方法建立ConnectableObservable,並調用ConnectableObservablereplay(3)方法,能夠看到,此時雖然ConnectableObservable沒有任何訂閱者,可是Cold Observable也已經開始發送數據。
  • 第二步:Observer1訂閱到ConnectableObservable,此時它會先收到以前發射的3個數據,以後收到訂閱以後Cold Observable發射的數據。

最後再提一下,更詳細的代碼你們能夠從 RxSample 的第十二章中獲取。


更多文章,歡迎訪問個人 Android 知識梳理系列:

相關文章
相關標籤/搜索