需求瞭解:java
Rxjava中的普通的 Observable
在觀察者訂閱的時候就會發射數據,可是有的時候咱們想本身控制數據的發射,好比在有指定的觀察者或者所有的觀察者訂閱後開始發射數據,這個時候咱們就要要用到Rxjava中的可鏈接的Observable來完成這個需求。react
這一節主要介紹 ConnectableObservable
和它的子類以及它們的操做符:git
一個可鏈接的Observable(ConnectableObservable
)與普通的Observable差很少。不一樣之處:可鏈接的Observable在被訂閱時並不開始發射數據,只有在它的 connect()
被調用時纔開始。用這種方法,你能夠等部分或者全部的潛在訂閱者都訂閱了這個Observable以後纔開始發射數據。github
注意: ConnectableObservable 的線程切換隻能經過 replay
操做符來實現,普通 Observable 的 subscribeOn()
和 observerOn()
在 ConnectableObservable 中不起做用。能夠經過 replay 操做符的指定線程調度器的方式來進行線程的切換。緩存
Javadoc: ConnectableObservableapp
將普通的Observable轉換爲可鏈接的Observable(ConnectableObservable
)。ide
若是要使用可鏈接的Observable,可使用Observable的 publish
操做符,來將相應轉換爲ConnectableObservable對象。函數
有一個變體接受一個函數做爲參數(publish(Function selector)
)。這個函數用原始Observable發射的數據做爲參數,產生 一個新的數據做爲 ConnectableObservable 給發射,替換原位置的數據項。實質是在簽名的基礎上添加一個 Map 操做。this
簡單實例:
// 1. publish() // 建立ConnectableObservable ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5) .publish(); // publish操做將Observable轉化爲一個可鏈接的Observable // 2. publish(Function<Observable<T>, ObservableSource<R>> selector) // 接受原始Observable的數據,產生一個新的Observable,能夠對這個Observable進行函數處理 Observable<String> publish = Observable.range(1, 5) .publish(new Function<Observable<Integer>, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Observable<Integer> integerObservable) throws Exception { System.out.println("--> apply(4): " + integerObservable.toString()); Observable<String> map = integerObservable.map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "[this is map value]: " + integer * integer; } }); return map; } }); publish.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("--> accept(4): " + s); } });
輸出:
--> apply(4): io.reactivex.subjects.PublishSubject@3fb4f649 --> accept(4): [this is map value]: 1 --> accept(4): [this is map value]: 4 --> accept(4): [this is map value]: 9 --> accept(4): [this is map value]: 16 --> accept(4): [this is map value]: 25
Javadoc: Observable.publish()
Javadoc: Observable.publish(Function<Observable<T>,ObservableSource<R> selector)
讓一個可鏈接的Observable開始發射數據給訂閱者。
connect
方法會讓它後面的Observable開始給發射數據給訂閱 者。connect 方法返回一個 Subscription 對象,能夠調用它的 unsubscribe 方法讓Observable停 止發射數據給觀察者。實例代碼:
// 1. publish() // 建立ConnectableObservable ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5) .publish(); // publish操做將Observable轉化爲一個可鏈接的Observable // 建立普通的Observable Observable<Integer> range = Observable.range(1, 5); // 1.1 connectableObservable在被訂閱時並不開始發射數據,只有在它的 connect() 被調用時纔開始 connectableObservable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(1)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(1): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(1): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(1)"); } }); // 1.2 connectableObservable在被訂閱時並不開始發射數據,只有在它的 connect() 被調用時纔開始 connectableObservable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(2): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(2): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(2)"); } }); // 1.3 普通Observable在被訂閱時就會發射數據 range.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(3): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(3)"); } }); System.out.println("----------------start connect------------------"); // 可鏈接的Observable在被訂閱時並不開始發射數據,只有在它的connect()被調用時纔開始發射數據 // connectableObservable.connect(); // 可選參數Consumer,返回一個Disposable對象,能夠獲取訂閱狀態和取消當前的訂閱 connectableObservable.connect(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("--> connect accept: " + disposable.isDisposed()); // disposable.dispose(); } });
輸出:
--> onSubscribe(1) --> onSubscribe(2) --> onSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> onNext(3): 3 --> onNext(3): 4 --> onNext(3): 5 --> onComplete(3) ----------------start connect------------------ --> connect accept: false --> onNext(1): 1 --> onNext(2): 1 --> onNext(1): 2 --> onNext(2): 2 --> onNext(1): 3 --> onNext(2): 3 --> onNext(1): 4 --> onNext(2): 4 --> onNext(1): 5 --> onNext(2): 5 --> onComplete(1) --> onComplete(2)
Javadoc: ConnectableObservable.connect()
Javadoc: ConnectableObservable.connect(Consumer<Disposable> connection)
RefCount
的做用是讓一個可鏈接的Observable行爲像普通的Observable。
RefCount 操做符把從一個可鏈接的Observable鏈接和斷開的過程自動化了。它操做一個可鏈接的Observable,返回一個普通的Observable。當第一個訂閱者訂閱這個Observable 時, RefCount 鏈接到下層的可鏈接Observable。 RefCount 跟蹤有多少個觀察者訂閱它,直到最後一個觀察者完成才斷開與下層可鏈接Observable的鏈接。
解析: refCount()
把 ConnectableObservable 變爲一個普通的 Observable 但又保持了 ConnectableObservable 的特性。若是出現第一個 Observer,它就會自動調用 connect()
,若是全部的 Observer 所有 dispose
,那麼它也會中止接受上游 Observable 的數據。
實例代碼:
/** * refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler) * * 具備如下可選參數: * subscriberCount: 指定須要鏈接到上游的訂閱者數量。注意:當訂閱者知足此數量後纔會處理 * timeout: 全部訂閱用戶退訂後斷開鏈接前的等待時間 * unit: 時間單位 * scheduler: 斷開鏈接以前要等待的目標調度器 */ Observable<Long> refCountObservable = Observable .intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS) .publish() .refCount() .subscribeOn(Schedulers.newThread()) // 指定訂閱調度在子線程 .observeOn(Schedulers.newThread()); // 指定觀察者調度在子線程 // .refCount(1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread()); // 第1個訂閱者 refCountObservable.subscribe(new Observer<Long>() { private Disposable disposable; private int buff = 0; @Override public void onSubscribe(Disposable d) { System.out.println("----> onSubscribe(1): "); disposable = d; } @Override public void onNext(Long value) { if (buff == 3) { disposable.dispose(); // 解除當前的訂閱 System.out.println("----> Subscribe(1) is dispose! "); } else { System.out.println("--> onNext(1): " + value); } buff++; } @Override public void onError(Throwable e) { System.out.println("--> onError(1): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(1): "); } }); // 第2個訂閱者 refCountObservable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> onSubscribe(2): "); } }) .delaySubscription(2, TimeUnit.SECONDS) // 延遲2秒後訂閱 .subscribe(new Consumer<Long>() { @Override public void accept(Long value) throws Exception { System.out.println("--> accept(2): " + value); } }); System.in.read();
輸出:
----> onSubscribe(1): --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 3 ----> onSubscribe(2): ----> Subscribe(1) is dispose! --> accept(2): 4 --> accept(2): 5
Javadoc: ConnectableObservable.refCount(subscriberCount, timeout, unit, scheduler)
一個普通的Observable能夠經過 publish
來將其轉換爲ConnectableObservable,而後能夠調用其 refCount()
的方法將其轉換爲一個具備 ConnectableObservable 特性的Observable。
其實Observable中還有一個操做方法,能夠直接完成此步驟的操做,這就是 Observable.share()
操做符。
能夠來看一下share操做符的源碼:
public final Observable<T> share() { return publish().refCount(); }
經過源碼能夠知道,share() 方法能夠直接將Observable轉換爲一個具備ConnectableObservable特性的Observable對象,即Observable.publish().refCount() == Observable.share()
。
實例代碼:
// share() // 經過share() 同時應用 publish 和 refCount 操做 Observable<Long> share = Observable .intervalRange(1, 5, 0, 500, TimeUnit.MILLISECONDS) // .publish().refCount() .share() // 等價於上面的操做 .subscribeOn(Schedulers.newThread()) // 指定訂閱調度在子線程 .observeOn(Schedulers.newThread()); // 指定觀察者調度在子線程 // 1. 第一個訂閱者 share.subscribe(new Observer<Long>() { private Disposable disposable; private int buff = 0; @Override public void onSubscribe(Disposable d) { System.out.println("----> onSubscribe(1): "); disposable = d; } @Override public void onNext(Long value) { if (buff == 3) { disposable.dispose(); // 解除當前的訂閱 System.out.println("----> Subscribe(1) is dispose! "); } else { System.out.println("--> onNext(1): " + value); } buff++; } @Override public void onError(Throwable e) { System.out.println("--> onError(1): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(1): "); } }); // 2. 第二個訂閱者 share.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> onSubscribe(2): "); } }) .delaySubscription(1, TimeUnit.SECONDS) // 延遲1秒後訂閱 .subscribe(new Consumer<Long>() { @Override public void accept(Long value) throws Exception { System.out.println("--> accept(2): " + value); } }); System.in.read();
輸出:
----> onSubscribe(1): --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 3 ----> onSubscribe(2): ----> Subscribe(1) is dispose! --> accept(2): 4 --> accept(2): 5
Javadoc: Observable.share()
保證全部的觀察者收到相同的數據序列,即便它們在Observable開始發射數據以後才訂閱。
若是在將一個Observable轉換爲可鏈接的Observable以前對它使用 Replay 操做符,產生的這個可鏈接Observable將老是發射完整的數據序列給任何將來的觀察者,能夠緩存發射過的數據,即便那些觀察者在這 個Observable開始給其它觀察者發射數據以後才訂閱。
注意: replay操做符生成的 connectableObservable
,若是沒有對緩存進行限定,那麼不管觀察者什麼時候去訂閱,均可以收到 Observable 完整的數據序列項。
replay
操做符最好根據實際狀況限定緩存的大小,不然數據發射過快或者較多時會佔用很高的內存。replay
操做符有能夠接受不一樣參數的變體,有的能夠指定 replay
的最大緩存數量或者指定緩存時間,還能夠指定調度器。
實例代碼:
// 建立發射數據的Observable Observable<Long> observable = Observable .intervalRange(1, 10, 1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread()); /** * 1.1 replay(Scheduler scheduler) * 可選參數:scheduler, 指定線程調度器 * 接受原始數據的全部數據 */ // ConnectableObservable<Long> replay1 = observable.replay(); /** * 1.2 replay(int bufferSize, Scheduler scheduler) * 可選參數:scheduler, 指定線程調度器 * 只緩存 bufferSize 個最近的原始數據 */ // ConnectableObservable<Long> replay1 = observable.replay(1); // 設置緩存大小爲1, 從原數據中緩存最近的1個數據 /** * 1.3 replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) * 可選參數:scheduler, 指定線程調度器 * 在訂閱前指定的時間段內緩存 bufferSize 個數據, 注意計時開始是原始數據發射第1個數據項以後開始 */ // ConnectableObservable<Long> replay1 = observable.replay(5, 1000, TimeUnit.MILLISECONDS); /** * 1.4 replay(long time, TimeUnit unit, Scheduler scheduler) * 可選參數:scheduler, 指定線程調度器 * 在訂閱前指定的時間段內緩存數據, 注意計時開始是原始數據發射第1個數據項以後開始 */ ConnectableObservable<Long> replay1 = observable.replay( 1000, TimeUnit.MILLISECONDS); // 進行 connect 操做 replay1.connect(); // 第一個觀察者 replay1.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> onSubScribe(1-1)"); } }).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println("--> accept(1-1): " + aLong); } }); // 第二個觀察者(延遲1秒後訂閱) replay1.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> onSubScribe(1-2)"); } }).delaySubscription(1, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println("--> accept(1-2): " + aLong); } }); // 第三個觀察者(延遲2秒後訂閱) replay1.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> onSubScribe(1-3)"); } }).delaySubscription(2, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println("--> accept(1-3): " + aLong); } }); System.in.read(); System.out.println("----------------------------------------------------------"); /** * 2. replay(Function<Observable<T>, ObservableSource<R>> selector, * int bufferSize, 可選參數: 指定從元數據序列數據的緩存大小 * long time, TimeUnit unit, 可選參數: 指定緩存指定時間段的數據序列 * Scheduler scheduler) 可選參數: 指定線程調度器 * * 接受一個變換函數 function 爲參數,這個函數接受原始Observable發射的數據項爲參數 * 經過指定的函數處理後,返回一個處理後的Observable */ Observable<String> replayObservable = observable.replay(new Function<Observable<Long>, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Observable<Long> longObservable) throws Exception { // 對原始數據進行處理 Observable<String> map = longObservable.map(new Function<Long, String>() { @Override public String apply(Long aLong) throws Exception { return aLong + "² = " + aLong * aLong; // 將原始數據進行平方處理,並轉換爲字符串數據類型 } }); return map; } }, 1, Schedulers.newThread()); replayObservable.subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()); // 第一個觀察者 replayObservable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("--> onSubScribe(2-1)"); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("--> accept(2-1): " + s); } }); // 訂閱第二個觀察者 (延遲2秒後訂閱) replayObservable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("--> onSubScribe(2-2)"); } }).delaySubscription(2, TimeUnit.SECONDS) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("--> accept(2-2): " + s); } }); System.in.read();
輸出:
----> onSubScribe(1-1) --> accept(1-1): 1 --> accept(1-1): 2 --> accept(1-1): 3 ----> onSubScribe(1-2) --> accept(1-2): 2 --> accept(1-2): 3 --> accept(1-1): 4 --> accept(1-2): 4 --> accept(1-1): 5 --> accept(1-2): 5 ----> onSubScribe(1-3) --> accept(1-3): 4 --> accept(1-3): 5 --> accept(1-1): 6 --> accept(1-2): 6 --> accept(1-3): 6 --> accept(1-1): 7 --> accept(1-2): 7 --> accept(1-3): 7 --> accept(1-1): 8 --> accept(1-2): 8 --> accept(1-3): 8 --> accept(1-1): 9 --> accept(1-2): 9 --> accept(1-3): 9 --> accept(1-1): 10 --> accept(1-2): 10 --> accept(1-3): 10 ---------------------------------------------------------- --> onSubScribe(2-1) --> accept(2-1): 1² = 1 --> accept(2-1): 2² = 4 --> accept(2-1): 3² = 9 --> accept(2-1): 4² = 16 --> onSubScribe(2-2) --> accept(2-1): 5² = 25 --> accept(2-2): 1² = 1 --> accept(2-2): 2² = 4 --> accept(2-1): 6² = 36 --> accept(2-2): 3² = 9 --> accept(2-1): 7² = 49 --> accept(2-1): 8² = 64 --> accept(2-2): 4² = 16 --> accept(2-2): 5² = 25 --> accept(2-1): 9² = 81 --> accept(2-2): 6² = 36 --> accept(2-1): 10² = 100 --> accept(2-2): 7² = 49 --> accept(2-2): 8² = 64 --> accept(2-2): 9² = 81 --> accept(2-2): 10² = 100
Javadoc: Observable.replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: Observable.replay(Function<Observable,ObservableSource > selector, int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
Rxjava 的鏈接操做符主要的核心是 ConnectableObservable
這個可鏈接的Observable對象的概念。可鏈接的 Observable 在被訂閱時並不會直接發射數據,只有在他的 connect() 方法被調用時纔會發射數據。便於更好的對數據的發射行爲的控制,同時也對數據有很好的操做能力,能夠緩存數據,指定緩存大小,時間片斷緩存等。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: