Rxjava2 可鏈接的Observable(ConnectableObservable)操做詳解及實例

簡要:

需求瞭解:java

Rxjava中的普通的 Observable 在觀察者訂閱的時候就會發射數據,可是有的時候咱們想本身控制數據的發射,好比在有指定的觀察者或者所有的觀察者訂閱後開始發射數據,這個時候咱們就要要用到Rxjava中的可鏈接的Observable來完成這個需求。react

這一節主要介紹 ConnectableObservable 和它的子類以及它們的操做符:git

  • ConnectableObservable: 一個可鏈接的Observable,在訂閱後不發射數據,調用 connect() 方法後開始發射數據。
  • Observable.publish():將一個Observable轉換爲一個可鏈接的Observable 。
  • ConnectableObservable.connect():指示一個可鏈接的Observable開始發射數據。
  • Observable.replay():確保全部的訂閱者看到相同的數據序列,即便它們在Observable開始發射數據以後才訂閱。
  • ConnectableObservable.refCount():讓一個可鏈接的Observable表現得像一個普通的Observable。
  • Observable.share():能夠直接將Observable轉換爲一個具備ConnectableObservable特性的Observable對象,等價於Observable.publish().refCount()
  • Observable.replay():保證全部的觀察者收到相同的數據序列,即便它們在Observable開始發射數據以後才訂閱。

1. ConnectableObservable

一個可鏈接的Observable(ConnectableObservable)與普通的Observable差很少。不一樣之處:可鏈接的Observable在被訂閱時並不開始發射數據,只有在它的 connect() 被調用時纔開始。用這種方法,你能夠等部分或者全部的潛在訂閱者都訂閱了這個Observable以後纔開始發射數據。github

img-ConnectableObservable
注意: ConnectableObservable 的線程切換隻能經過 replay 操做符來實現,普通 Observable 的 subscribeOn()observerOn() 在 ConnectableObservable 中不起做用。能夠經過 replay 操做符的指定線程調度器的方式來進行線程的切換。緩存

Javadoc: ConnectableObservableapp

2. Publish

將普通的Observable轉換爲可鏈接的Observable(ConnectableObservable)。ide

若是要使用可鏈接的Observable,可使用Observable的 publish 操做符,來將相應轉換爲ConnectableObservable對象。函數

有一個變體接受一個函數做爲參數(publish(Function selector))。這個函數用原始Observable發射的數據做爲參數,產生 一個新的數據做爲 ConnectableObservable 給發射,替換原位置的數據項。實質是在簽名的基礎上添加一個 Map 操做。this

簡單實例:

// 1. publish()
  // 建立ConnectableObservable
  ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
          .publish();    // publish操做將Observable轉化爲一個可鏈接的Observable

    // 2. publish(Function<Observable<T>, ObservableSource<R>> selector)
  // 接受原始Observable的數據,產生一個新的Observable,能夠對這個Observable進行函數處理
  Observable<String> publish = Observable.range(1, 5)
          .publish(new Function<Observable<Integer>, ObservableSource<String>>() {

              @Override
              public ObservableSource<String> apply(Observable<Integer> integerObservable) throws Exception {
                  System.out.println("--> apply(4): " + integerObservable.toString());

                  Observable<String> map = integerObservable.map(new Function<Integer, String>() {

                      @Override
                      public String apply(Integer integer) throws Exception {
                          return "[this is map value]: " + integer * integer;
                      }
                  });
                  return map;
              }
          });
          
    publish.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(4): " + s);
        }
    });

輸出:

--> apply(4): io.reactivex.subjects.PublishSubject@3fb4f649
--> accept(4): [this is map value]: 1
--> accept(4): [this is map value]: 4
--> accept(4): [this is map value]: 9
--> accept(4): [this is map value]: 16
--> accept(4): [this is map value]: 25

Javadoc: Observable.publish()
Javadoc: Observable.publish(Function<Observable<T>,ObservableSource<R> selector)

3. Connect

讓一個可鏈接的Observable開始發射數據給訂閱者。

  • 可鏈接的Observable (connectableObservable)與普通的Observable差很少,不過它並不會在被訂閱時開始發射數據,而是直到使用了 Connect 操做符時纔會開始。
  • RxJava中 connect 是 ConnectableObservable 接口的一個方法,使用 publish 操做符能夠將一個普通的Observable轉換爲一個 ConnectableObservable 。
  • 調用 ConnectableObservable 的 connect 方法會讓它後面的Observable開始給發射數據給訂閱 者。connect 方法返回一個 Subscription 對象,能夠調用它的 unsubscribe 方法讓Observable停 止發射數據給觀察者。
  • 即便沒有任何訂閱者訂閱它,你也可使用 connect 方法讓一個Observable開始發射數據 (或者開始生成待發射的數據)。這樣,你能夠將一個"冷"的Observable變爲"熱"的。

實例代碼:

// 1. publish()
    // 建立ConnectableObservable
    ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
            .publish();    // publish操做將Observable轉化爲一個可鏈接的Observable

    // 建立普通的Observable
    Observable<Integer> range = Observable.range(1, 5);

    // 1.1 connectableObservable在被訂閱時並不開始發射數據,只有在它的 connect() 被調用時纔開始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(1)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(1): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1)");
        }
    });

    // 1.2 connectableObservable在被訂閱時並不開始發射數據,只有在它的 connect() 被調用時纔開始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(2)");
        }
    });

    // 1.3 普通Observable在被訂閱時就會發射數據
    range.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(3)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(3): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(3): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(3)");
        }
    });

    System.out.println("----------------start connect------------------");
    // 可鏈接的Observable在被訂閱時並不開始發射數據,只有在它的connect()被調用時纔開始發射數據
    // connectableObservable.connect();
    
    // 可選參數Consumer,返回一個Disposable對象,能夠獲取訂閱狀態和取消當前的訂閱
    connectableObservable.connect(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> connect accept: " + disposable.isDisposed());
            // disposable.dispose();
        }
    });

輸出:

--> onSubscribe(1)
--> onSubscribe(2)
--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 3
--> onNext(3): 4
--> onNext(3): 5
--> onComplete(3)
----------------start connect------------------
--> connect accept: false
--> onNext(1): 1
--> onNext(2): 1
--> onNext(1): 2
--> onNext(2): 2
--> onNext(1): 3
--> onNext(2): 3
--> onNext(1): 4
--> onNext(2): 4
--> onNext(1): 5
--> onNext(2): 5
--> onComplete(1)
--> onComplete(2)

Javadoc: ConnectableObservable.connect()
Javadoc: ConnectableObservable.connect(Consumer<Disposable> connection)

4. RefCount

RefCount 的做用是讓一個可鏈接的Observable行爲像普通的Observable。

RefCount 操做符把從一個可鏈接的Observable鏈接和斷開的過程自動化了。它操做一個可鏈接的Observable,返回一個普通的Observable。當第一個訂閱者訂閱這個Observable 時, RefCount 鏈接到下層的可鏈接Observable。 RefCount 跟蹤有多少個觀察者訂閱它,直到最後一個觀察者完成才斷開與下層可鏈接Observable的鏈接。

解析: refCount() 把 ConnectableObservable 變爲一個普通的 Observable 但又保持了 ConnectableObservable 的特性。若是出現第一個 Observer,它就會自動調用 connect(),若是全部的 Observer 所有 dispose,那麼它也會中止接受上游 Observable 的數據。

實例代碼:

/**
     * refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler)
     *
     * 具備如下可選參數:
     * subscriberCount: 指定須要鏈接到上游的訂閱者數量。注意:當訂閱者知足此數量後纔會處理
     * timeout:         全部訂閱用戶退訂後斷開鏈接前的等待時間
     * unit:            時間單位
     * scheduler:        斷開鏈接以前要等待的目標調度器
     */
    Observable<Long> refCountObservable = Observable
            .intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .publish()
            .refCount()
            .subscribeOn(Schedulers.newThread())    // 指定訂閱調度在子線程
            .observeOn(Schedulers.newThread());     // 指定觀察者調度在子線程
        //  .refCount(1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread());

    // 第1個訂閱者
    refCountObservable.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除當前的訂閱
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 第2個訂閱者
    refCountObservable.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(2, TimeUnit.SECONDS)   // 延遲2秒後訂閱
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

輸出:

----> onSubscribe(1): 
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2): 
----> Subscribe(1) is dispose! 
--> accept(2): 4
--> accept(2): 5

Javadoc: ConnectableObservable.refCount(subscriberCount, timeout, unit, scheduler)

5. Share

一個普通的Observable能夠經過 publish 來將其轉換爲ConnectableObservable,而後能夠調用其 refCount() 的方法將其轉換爲一個具備 ConnectableObservable 特性的Observable。

其實Observable中還有一個操做方法,能夠直接完成此步驟的操做,這就是 Observable.share() 操做符。

能夠來看一下share操做符的源碼:

public final Observable<T> share() {
        return publish().refCount();
    }

經過源碼能夠知道,share() 方法能夠直接將Observable轉換爲一個具備ConnectableObservable特性的Observable對象,即Observable.publish().refCount() == Observable.share()

實例代碼:

// share()
    // 經過share() 同時應用 publish 和 refCount 操做
    Observable<Long> share = Observable
            .intervalRange(1, 5, 0, 500, TimeUnit.MILLISECONDS)
      //    .publish().refCount()
            .share()  // 等價於上面的操做
            .subscribeOn(Schedulers.newThread())    // 指定訂閱調度在子線程
            .observeOn(Schedulers.newThread());     // 指定觀察者調度在子線程

    // 1. 第一個訂閱者
    share.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除當前的訂閱
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 2. 第二個訂閱者
    share.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(1, TimeUnit.SECONDS)    // 延遲1秒後訂閱
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

輸出:

----> onSubscribe(1): 
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2): 
----> Subscribe(1) is dispose! 
--> accept(2): 4
--> accept(2): 5

Javadoc: Observable.share()

6. Replay

保證全部的觀察者收到相同的數據序列,即便它們在Observable開始發射數據以後才訂閱。

img-Replay

若是在將一個Observable轉換爲可鏈接的Observable以前對它使用 Replay 操做符,產生的這個可鏈接Observable將老是發射完整的數據序列給任何將來的觀察者,能夠緩存發射過的數據,即便那些觀察者在這 個Observable開始給其它觀察者發射數據以後才訂閱。

注意: replay操做符生成的 connectableObservable ,若是沒有對緩存進行限定,那麼不管觀察者什麼時候去訂閱,均可以收到 Observable 完整的數據序列項。

replay 操做符最好根據實際狀況限定緩存的大小,不然數據發射過快或者較多時會佔用很高的內存。replay 操做符有能夠接受不一樣參數的變體,有的能夠指定 replay 的最大緩存數量或者指定緩存時間,還能夠指定調度器。

  • replay不只能夠緩存Observable的全部數據序列,也能夠進行限定緩存大小的操做。
  • 還有有一種 replay 返回一個普通的Observable。它能夠接受一個變換函數爲參數,這個函數接受原始Observable發射的數據項爲參數,返回結果Observable要發射的一項數據。所以,這個操做符實際上是 replay 變換以後的數據項。

實例代碼:

// 建立發射數據的Observable
    Observable<Long> observable = Observable
            .intervalRange(1,
                    10,
                    1,
                    500,
                    TimeUnit.MILLISECONDS,
                    Schedulers.newThread());

    /**
     * 1.1 replay(Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 接受原始數據的全部數據
     */
//  ConnectableObservable<Long> replay1 = observable.replay();

    /**
     * 1.2 replay(int bufferSize, Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 只緩存 bufferSize 個最近的原始數據
     */
//  ConnectableObservable<Long> replay1 = observable.replay(1); // 設置緩存大小爲1, 從原數據中緩存最近的1個數據

    /**
     * 1.3 replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 在訂閱前指定的時間段內緩存 bufferSize 個數據, 注意計時開始是原始數據發射第1個數據項以後開始
     */
//  ConnectableObservable<Long> replay1 = observable.replay(5, 1000, TimeUnit.MILLISECONDS);

    /**
     * 1.4 replay(long time, TimeUnit unit, Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 在訂閱前指定的時間段內緩存數據, 注意計時開始是原始數據發射第1個數據項以後開始
     */
   ConnectableObservable<Long> replay1 = observable.replay( 1000, TimeUnit.MILLISECONDS);

    // 進行 connect 操做
    replay1.connect();

    // 第一個觀察者
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-1)");
        }
    }).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println("--> accept(1-1): " + aLong);
        }
    });

    // 第二個觀察者(延遲1秒後訂閱)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-2)");
        }
    }).delaySubscription(1, TimeUnit.SECONDS)
      .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("--> accept(1-2): " + aLong);
            }
      });

    // 第三個觀察者(延遲2秒後訂閱)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-3)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
       .subscribe(new Consumer<Long>() {
           @Override
           public void accept(Long aLong) throws Exception {
               System.out.println("--> accept(1-3): " + aLong);
           }
       });

    System.in.read();
    System.out.println("----------------------------------------------------------");
    /**
     * 2. replay(Function<Observable<T>, ObservableSource<R>> selector,
     * int bufferSize,                              可選參數: 指定從元數據序列數據的緩存大小
     * long time, TimeUnit unit,        可選參數: 指定緩存指定時間段的數據序列
     * Scheduler scheduler)                 可選參數: 指定線程調度器
     *
     * 接受一個變換函數 function 爲參數,這個函數接受原始Observable發射的數據項爲參數
     * 經過指定的函數處理後,返回一個處理後的Observable
     */
    Observable<String> replayObservable = observable.replay(new Function<Observable<Long>, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Observable<Long> longObservable) throws Exception {
            // 對原始數據進行處理
            Observable<String> map = longObservable.map(new Function<Long, String>() {
                @Override
                public String apply(Long aLong) throws Exception {
                    return aLong + "² = " + aLong * aLong;  // 將原始數據進行平方處理,並轉換爲字符串數據類型
                }
            });

            return map;
        }
    }, 1, Schedulers.newThread());

    replayObservable.subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread());

    // 第一個觀察者
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-1)");
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(2-1): " + s);
        }
    });

    // 訂閱第二個觀察者 (延遲2秒後訂閱)
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-2)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
      .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("--> accept(2-2): " + s);
            }
       });

    System.in.read();

輸出:

----> onSubScribe(1-1)
--> accept(1-1): 1
--> accept(1-1): 2
--> accept(1-1): 3
----> onSubScribe(1-2)
--> accept(1-2): 2
--> accept(1-2): 3
--> accept(1-1): 4
--> accept(1-2): 4
--> accept(1-1): 5
--> accept(1-2): 5
----> onSubScribe(1-3)
--> accept(1-3): 4
--> accept(1-3): 5
--> accept(1-1): 6
--> accept(1-2): 6
--> accept(1-3): 6
--> accept(1-1): 7
--> accept(1-2): 7
--> accept(1-3): 7
--> accept(1-1): 8
--> accept(1-2): 8
--> accept(1-3): 8
--> accept(1-1): 9
--> accept(1-2): 9
--> accept(1-3): 9
--> accept(1-1): 10
--> accept(1-2): 10
--> accept(1-3): 10
----------------------------------------------------------
--> onSubScribe(2-1)
--> accept(2-1): 1² = 1
--> accept(2-1): 2² = 4
--> accept(2-1): 3² = 9
--> accept(2-1): 4² = 16
--> onSubScribe(2-2)
--> accept(2-1): 5² = 25
--> accept(2-2): 1² = 1
--> accept(2-2): 2² = 4
--> accept(2-1): 6² = 36
--> accept(2-2): 3² = 9
--> accept(2-1): 7² = 49
--> accept(2-1): 8² = 64
--> accept(2-2): 4² = 16
--> accept(2-2): 5² = 25
--> accept(2-1): 9² = 81
--> accept(2-2): 6² = 36
--> accept(2-1): 10² = 100
--> accept(2-2): 7² = 49
--> accept(2-2): 8² = 64
--> accept(2-2): 9² = 81
--> accept(2-2): 10² = 100

Javadoc: Observable.replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: Observable.replay(Function<Observable ,ObservableSource > selector, int bufferSize, long time, TimeUnit unit, Scheduler scheduler)

小結

Rxjava 的鏈接操做符主要的核心是 ConnectableObservable 這個可鏈接的Observable對象的概念。可鏈接的 Observable 在被訂閱時並不會直接發射數據,只有在他的 connect() 方法被調用時纔會發射數據。便於更好的對數據的發射行爲的控制,同時也對數據有很好的操做能力,能夠緩存數據,指定緩存大小,時間片斷緩存等。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

相關文章
相關標籤/搜索