今天,咱們來整理如下幾個你們容易弄混的概念,並用實際例子來演示,能夠從 RxSample 的第十二章中獲取:java
publish
reply
ConnectableObservable
connect
share
refCount
autoConnect
對於以上這些概念,能夠用一幅圖來歸納: git
從圖中能夠看出,這裏面能夠供使用者訂閱的Observable
能夠分爲四類,下面咱們將逐一介紹這幾種
Observable
的特色:
Cold Observable
,就是咱們經過Observable.create
、Observable.interval
等建立型操做符生成的Observable
。Cold Observable
通過publish()
或者replay(int N)
操做符轉換成的ConnectableObservable
。ConnectableObservable
通過refCount()
,或者由Cold Observable
通過share()
轉換成的Observable
。ConnectableObservable
通過autoConnect(int N)
轉換成的Observable
。Cold Observable
就是咱們經過Observable.create
、Observable.interval
等建立型操做符生成的Observable
,它具備如下幾個特色:github
Cold Observable
時,Cold Observable
會從新開始發射數據給該訂閱者。Cold Observable
,它們收到的數據是相互獨立的。Cold Observable
後,Cold Observable
會中止發射數據給該訂閱者,但不會中止發射數據給其它訂閱者。下面,咱們演示一個例子,首先咱們建立一個Cold Observable
:緩存
//直接訂閱Cold Observable。
private void createColdSource() {
mConvertObservable = getSource();
}
private Observable<Integer> getSource() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
try {
int i = 0;
while (true) {
Log.d(TAG, "源被訂閱者發射數據=" + i + ",發送線程ID=" + Thread.currentThread().getId());
mSourceOut.add(i);
observableEmitter.onNext(i++);
updateMessage();
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io());
}
複製代碼
在建立兩個訂閱者,它們能夠隨時訂閱到Cold Observable
或者取消對它的訂閱:ide
private void startSubscribe1() {
if (mConvertObservable != null && mDisposable1 == null) {
mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "訂閱者1收到數據=" + integer + ",接收線程ID=" + Thread.currentThread().getId());
mSubscribe1In.add(integer);
updateMessage();
}
});
}
}
private void disposeSubscribe1() {
if (mDisposable1 != null) {
mDisposable1.dispose();
mDisposable1 = null;
mSubscribe1In.clear();
updateMessage();
}
}
private void startSubscribe2() {
if (mConvertObservable != null && mDisposable2 == null) {
mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "訂閱者2收到數據=" + integer + ",接收線程ID=" + Thread.currentThread().getId());
mSubscribe2In.add(integer);
updateMessage();
}
});
}
}
private void disposeSubscribe2() {
if (mDisposable2 != null) {
mDisposable2.dispose();
mDisposable2 = null;
mSubscribe2In.clear();
updateMessage();
}
}
複製代碼
爲了驗證以前說到的幾個特色,進入程序以後,咱們會先建立該Cold Observable
,以後進行一系列的操做,效果以下: spa
Cold Observable
,這時候Cold Observable
沒有發送任何數據。Observer1
訂閱Observable
,此時Cold Observable
開始發送數據,Observer1
也能夠收到數據,即 一個訂閱者訂閱 Cold Observable 時, Cold Observable 會開始發射數據給該訂閱者Observer2
訂閱Observable
,此時Observable2
也能夠收到數據,可是它和Observable1
收到的數據是相互獨立的,即 當多個訂閱者訂閱到同一個 Cold Observable ,它們收到的數據是相互獨立的。Observer1
取消對Observable
的訂閱,這時候Observer1
收不到數據,而且Observable
也不會發射數據給它,可是仍然會發射數據給Observer2
,即 當一個訂閱者取消訂閱 Cold Observable 後,Cold Observable 會中止發射數據給該訂閱者,但不會中止發射數據給其它訂閱者。Observer1
從新訂閱Observable
,這時候Observable
從0
開始發射數據給Observer1
,即 一個訂閱者訂閱 Cold Observable 時, Cold Observable 會從新開始發射數據給該訂閱者。在瞭解完Cold Observable
以後,咱們再來看第二類的Observable
,它的類型爲ConnectableObservable
,它是經過Cold Observable
通過下面兩種方式生成的:線程
.publish()
.reply(int N)
若是使用.publish()
建立,那麼訂閱者只能收到在訂閱以後Cold Observable
發出的數據,而若是使用reply(int N)
建立,那麼訂閱者在訂閱後能夠收到Cold Observable
在訂閱以前發送的N
個數據。3d
咱們先以publish()
爲例,介紹ConnectableObservable
的幾個特色:code
ConnectableObservable
有沒有訂閱者,只要調用了ConnectableObservable
的connect
方法,Cold Observable
就開始發送數據。connect
會返回一個Disposable
對象,調用了該對象的dispose
方法,Cold Observable
將會中止發送數據,全部ConnectableObservable
的訂閱者也沒法收到數據。connect
返回的Disposable
對象後,若是從新調用了connect
方法,那麼Cold Observable
會從新發送數據。ConnectableObservable
後,該訂閱者會收到在訂閱以後,Cold Observable
發送給ConnectableObservable
的數據。ConnectableObservable
時,它們收到的數據是相同的。ConnectableObservable
,不會影響其餘訂閱者收到消息。下面,咱們建立一個ConnectableObservable
,兩個訂閱者以後會訂閱到它,而不是Cold Observable
:cdn
//.publish()將源Observable轉換成爲HotObservable,當調用它的connect方法後,不管此時有沒有訂閱者,源Observable都開始發送數據,訂閱者訂閱後將能夠收到數據,而且訂閱者解除訂閱不會影響源Observable數據的發射。
public void createPublishSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish();
mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
}
複製代碼
和上面同樣,仍是用一個例子來演示,該例子的效果爲:
Cold Observable
的publish
方法建立ConnectableObservable
,並調用ConnectableObservable
的connect
方法,能夠看到,此時雖然ConnectableObservable
沒有任何訂閱者,可是Cold Observable
也已經開始發送數據。Observer1
訂閱到ConnectableObservable
,此時它只能收到訂閱以後Cold Observable
發射的數據。Observer2
訂閱到ConnectableObservable
,Cold Observable
只會發射一份數據,而且Observer1
和Observer2
收到的數據是相同的。Observer1
取消對ConnectableObservable
的訂閱,Cold Observable
仍然會發射數據,Observer2
仍然能夠收到Cold Observable
發射的數據。Observer1
從新訂閱ConnectableObservable
,和第三步相同,它仍然只會收到訂閱以後Cold Observable
發射的數據。connect
返回的Disposable
對象,調用dispose
方法,此時Cold Observable
中止發射數據,而且Observer1
和Observer2
都收不到數據。上面這些現象發生的根本緣由在於:如今Observer
和Observer2
都是訂閱到ConnectableObservable
,真正產生數據的Cold Observable
並不知道他們的存在,和它交互的是ConnectableObservable
,ConnectableObservable
至關於一箇中介,它完成下面兩項任務:
connect
和dispose
方法決定是否要訂閱到Cold Observer
,也就是決定了Cold Observable
是否發送數據。Cold Observable
發送的數據轉交給它的訂閱者。由ConnectableObservable
轉換成Observable
有兩種方法,咱們分爲兩節介紹下當訂閱到轉換後的Observable
時的現象:
.refCount()
.autoConnect(int N)
通過refCount
方法,ConnectableObservable
能夠轉換成正常的Observable
,咱們稱爲refObservable
,這裏咱們假設ConnectableObservable
是由Cold Observable
經過publish()
方法轉換的,對於它的訂閱者,有如下幾個特色:
refObservable
後,Cold Observable
開始發送數據。refObservable
後,只能收到在訂閱以後Cold Observable
發送的數據。refObservable
後,假如它是當前refObservable
的惟一一個訂閱者,那麼Cold Observable
會中止發送數據;不然,Cold Observable
仍然會繼續發送數據,其它的訂閱者仍然能夠收到Cold Observable
發送的數據。接着上例子,咱們建立一個refObservable
:
//.share()至關於.publish().refCount(),當有訂閱者訂閱時,源訂閱者會開始發送數據,若是全部的訂閱者都取消訂閱,源Observable就會中止發送數據。
private void createShareSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish().refCount();
}
複製代碼
示例以下:
操做分爲如下幾步:.publish().refCount()
建立由ConnectableObservable
轉換後的refObservable
,此時Cold Observable
沒有發送任何消息。Observer1
訂閱到refObservable
,Cold Observable
開始發送數據,Observer1
接收數據。Observer2
訂閱到refObservable
,它只能收到在訂閱以後Cold Observable
發送的數據。Observer1
取消訂閱,Cold Observable
繼續發送數據,Observer2
仍然能收到數據。Observer2
取消訂閱,Cold Observable
中止發送數據。Observer1
從新訂閱,Cold Observable
從新開始發送數據。最後說明一點:訂閱到Cold Observable
的.publish().refCount()
和Cold Observable
的share()
所返回的Observable
是等價的。
autoConnect(int N)
和refCount
很相似,都是將ConnectableObservable
轉換成普通的Observable
,咱們稱爲autoObservable
,一樣咱們先假設ConnectableObservable
是由Cold Observable
經過publish()
方法生成的,它有如下幾個特色:
N
個訂閱者訂閱到refObservable
後,Cold Observable
開始發送數據。refObservable
後,只能收到在訂閱以後Cold Observable
發送的數據。Cold Observable
開始發送數據,即便全部的autoObservable
的訂閱和都取消了訂閱,Cold Observable
也不會中止發送數據,若是想要Cold Observable
中止發送數據,那麼可使用autoConnect(int numberOfSubscribers, Consumer connection)
中Consumer
返回的Disposable
,它的做用和ConnectableObservable
的connect
方法返回的Disposable
相同。其建立方法以下所示:
//.autoConnect在有指定個訂閱者時開始讓源Observable發送消息,可是訂閱者是否取消訂閱不會影響到源Observable的發射。
private void createAutoConnectSource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
mConvertDisposable = disposable;
}
});
}
複製代碼
示例效果以下:
咱們進行了以下幾步操做:autoConnect
轉換後的autoObservable
。Observer1
訂閱到autoObservable
,此時知足條件,Cold Observable
開始發送數據。Observer2
訂閱到autoObservable
,它只能收到訂閱後發生的數據。Observer1
取消訂閱,Cold Observable
繼續發送數據,Observer2
仍然能夠收到數據。Observer2
取消訂閱,Cold Observable
仍然繼續發送數據。Observer2
訂閱到autoObservable
,它只能收到訂閱後發送的消息了。mConvertDisposable
的dispose
,Cold Observable
中止發送數據。在上面的例子當中,全部總結的特色都是創建在ConnectableObservable
是由publish()
生成,只因此這麼作,是爲了方便你們理解,不管是訂閱到ConnectableObservable
,仍是由ConnectableObservable
轉換的refObservable
和autoObservable
,使用這兩種方式建立的惟一區別就是,訂閱者在訂閱後,若是是經過publish()
建立的,那麼訂閱者以後收到訂閱後Cold Observable
發送的數據;而若是是reply(int N)
建立的,那麼訂閱者還能額外收到N
個以前Cold Observable
發送的數據,咱們用下面一個小例子來演示,訂閱者訂閱到的Observable
以下:
//.reply會讓緩存源Observable的N個數據項,當有新的訂閱者訂閱時,它會發送這N個數據項給它。
private void createReplySource() {
mColdObservable = getSource();
mConvertObservable = mColdObservable.replay(3);
mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
}
複製代碼
示例演示效果:
操做步驟:Cold Observable
的publish
方法建立ConnectableObservable
,並調用ConnectableObservable
的replay(3)
方法,能夠看到,此時雖然ConnectableObservable
沒有任何訂閱者,可是Cold Observable
也已經開始發送數據。Observer1
訂閱到ConnectableObservable
,此時它會先收到以前發射的3
個數據,以後收到訂閱以後Cold Observable
發射的數據。最後再提一下,更詳細的代碼你們能夠從 RxSample 的第十二章中獲取。