Cold Observable and Hot Observable

Types of Observable

Observables come in two kinds: Cold and Hot.

[Image: hot&cold observable.jpg]

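A Hot Observable emits its events whether or not any Subscriber has subscribed. When a Hot Observable has multiple subscribers, its relationship to them is one-to-many: all subscribers share the same events.

A Cold Observable, by contrast, starts running the code that emits its data stream only when a Subscriber subscribes. A Cold Observable and its Subscriber are in a one-to-one relationship: when there are several different subscribers, the complete sequence is sent again to each of them. In other words, with a Cold Observable, each Subscriber's events are independent of the others'.

If the explanation above is a bit dry, the following analogy illustrates the difference between Cold and Hot more vividly: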

Think of a hot Observable as a radio station. All of the listeners that are listening to it at this moment listen to the same song.
A cold Observable is a music CD. Many people can buy it and listen to it independently.
by Nickolay Tsvetinov
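
The radio/CD analogy can be sketched in plain Java, with no RxJava dependency. The types below (ColdSource, HotSource) are hypothetical toy classes made up for this illustration, not RxJava APIs, and they ignore threading, completion, and errors. They only show that a late subscriber to a cold source still receives everything, while a late subscriber to a hot source misses what was already emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class HotColdSketch {

    /** Cold (the "music CD"): every subscriber triggers its own full replay. */
    static final class ColdSource {
        private final List<Integer> items;

        ColdSource(List<Integer> items) {
            this.items = items;
        }

        void subscribe(Consumer<Integer> subscriber) {
            for (Integer item : items) {
                subscriber.accept(item);
            }
        }
    }

    /** Hot (the "radio station"): items go to whoever is listening right now. */
    static final class HotSource {
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(int item) {
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    /** What a second, later subscriber to a cold source receives. */
    static List<Integer> coldLateSubscriber() {
        ColdSource cold = new ColdSource(Arrays.asList(1, 2, 3));
        List<Integer> first = new ArrayList<>();
        List<Integer> late = new ArrayList<>();
        cold.subscribe(first::add);
        cold.subscribe(late::add); // gets its own complete replay
        return late;
    }

    /** What a subscriber receives if it joins a hot source after the first item. */
    static List<Integer> hotLateSubscriber() {
        HotSource hot = new HotSource();
        List<Integer> first = new ArrayList<>();
        List<Integer> late = new ArrayList<>();
        hot.subscribe(first::add);
        hot.emit(1);               // only "first" is listening at this point
        hot.subscribe(late::add);
        hot.emit(2);
        hot.emit(3);
        return late;
    }

    public static void main(String[] args) {
        System.out.println("cold, late subscriber: " + coldLateSubscriber());
        System.out.println("hot, late subscriber:  " + hotLateSubscriber());
    }
}
```

Running main prints [1, 2, 3] for the cold case and [2, 3] for the hot case.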

Cold Observable

Operators of Observable such as just, create, range, and fromXXX all produce a Cold Observable.

        // Two independent consumers; subscriber2's output is indented so the two streams are easy to tell apart.
        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Output:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
   subscriber2: 3
subscriber1: 3
subscriber1: 4
   subscriber2: 4
   subscriber2: 5
subscriber1: 5
subscriber1: 6
   subscriber2: 6
subscriber1: 7
   subscriber2: 7
subscriber1: 8
   subscriber2: 8
subscriber1: 9
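   subscriber2: 9

As the output shows, subscriber1 and subscriber2 do not necessarily receive their values in step with each other; the two are completely independent.

Cold Observables are useful, but when it is uncertain when events will occur or how many items the Observable will emit, a Hot Observable is needed. Examples include UI interaction events, changes in network conditions, changes in location, and the arrival of server push messages.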

How do you convert a Cold Observable into a Hot Observable?

1. Use publish to create a ConnectableObservable

The publish operator converts a Cold Observable into a Hot Observable. It turns the original Observable into a ConnectableObservable.

Here is the previous example again:

        // Each consumer indents its output differently so the three streams are easy to tell apart.
        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        observable.connect();

        observable.subscribe(subscriber1);
        observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        observable.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Note that the resulting ConnectableObservable does not actually start emitting until connect() is called.

Output:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
subscriber1: 2
   subscriber2: 2
      subscriber3: 2
subscriber1: 3
   subscriber2: 3
      subscriber3: 3
subscriber1: 4
   subscriber2: 4
      subscriber3: 4
subscriber1: 5
   subscriber2: 5
      subscriber3: 5
subscriber1: 6
   subscriber2: 6
      subscriber3: 6
subscriber1: 7
   subscriber2: 7
      subscriber3: 7
subscriber1: 8
   subscriber2: 8
      subscriber3: 8
subscriber1: 9
   subscriber2: 9
      subscriber3: 9
subscriber1: 10
   subscriber2: 10
      subscriber3: 10
subscriber1: 11
   subscriber2: 11
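      subscriber3: 11

As you can see, all of the subscribed Subscribers share the same events.
Here, the ConnectableObservable is thread-safe.

2. Use Subject/Processor

Subject and Processor serve the same purpose. Processor is a class added in RxJava 2.x; it extends Flowable and supports backpressure, whereas Subject does not.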

        // Each consumer indents its output differently so the three streams are easy to tell apart.
        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread());

        PublishSubject<Long> subject = PublishSubject.create();
        observable.subscribe(subject);

        subject.subscribe(subscriber1);
        subject.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        subject.subscribe(subscriber3);

        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

The output is the same as with the publish operator above.

A Subject is both an Observable and an Observer (Subscriber). This can be seen in the source code of Subject.

import io.reactivex.*;
import io.reactivex.annotations.*;

/**
 * Represents an Observer and an Observable at the same time, allowing
 * multicasting events from a single source to multiple child Subscribers.
 * <p>All methods except the onSubscribe, onNext, onError and onComplete are thread-safe.
 * Use {@link #toSerialized()} to make these methods thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if the subject has any Observers.
     * <p>The method is thread-safe.
     * @return true if the subject has any Observers
     */
    public abstract boolean hasObservers();

    /**
     * Returns true if the subject has reached a terminal state through an error event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through an error event
     * @see #getThrowable()
     * @see #hasComplete()
     */
    public abstract boolean hasThrowable();

    /**
     * Returns true if the subject has reached a terminal state through a complete event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();

    /**
     * Returns the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet.
     * <p>The method is thread-safe.
     * @return the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}

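When a Subject acts as a Subscriber, it can subscribe to the target Cold Observable so that the latter starts emitting events. At the same time, as an Observable, it forwards those events or emits new ones, so the Cold Observable becomes a Hot Observable by way of the Subject.

Note that Subject is not thread-safe: its onSubscribe, onNext, onError, and onComplete methods must not be called concurrently. To make it thread-safe, call toSerialized(). (In RxJava 1.x you could also use SerializedSubject in place of Subject, but since RxJava 2.x SerializedSubject is no longer a public class.)
However, many RxBus implementations adapted from EventBus do not do this, including ones I wrote myself in the past :( . This is very dangerous, because an event bus will sooner or later be called from several threads at once.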
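
To make the concurrency concern concrete, here is a minimal plain-Java sketch of the guarantee that serialization provides: only one thread at a time is inside the downstream callback. The serialized helper is a hypothetical illustration built on a simple lock. It is not RxJava's implementation of toSerialized(), so treat it as a model of the contract only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SerializedSketch {

    /** Wraps a consumer so that calls from different threads never overlap. */
    static <T> Consumer<T> serialized(Consumer<T> downstream) {
        final Object lock = new Object();
        return item -> {
            synchronized (lock) {
                downstream.accept(item);
            }
        };
    }

    /** Emits from several threads into a list that is deliberately NOT thread-safe. */
    static int emitFromManyThreads(int threads, int perThread) {
        List<Integer> received = new ArrayList<>();
        Consumer<Integer> sink = received::add;
        Consumer<Integer> onNext = serialized(sink);

        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    onNext.accept(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // Every emission arrived because access to the list was serialized.
        return received.size();
    }

    public static void main(String[] args) {
        System.out.println(emitFromManyThreads(4, 1000));
    }
}
```

If the serialized wrapper were removed and the threads called sink directly, the unsynchronized ArrayList could lose items or throw. That is the kind of failure an RxBus built on a bare Subject is exposed to.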

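How do you convert a Hot Observable into a Cold Observable?

1. The refCount operator of ConnectableObservable

The official ReactiveX site describes it as:

make a Connectable Observable behave like an ordinary Observable

[Image: RefCount.png]

The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. It operates on a connectable Observable and returns an ordinary Observable. When the first subscriber subscribes to this Observable, RefCount connects to the underlying connectable Observable. RefCount then keeps track of how many observers are subscribed to it, and it does not disconnect from the underlying connectable Observable until the last observer is done.

If all subscribers unsubscribe, the data stream stops. If they subscribe again, the data stream starts over.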

        // Two consumers; subscriber2's output is indented so the two streams are easy to tell apart.
        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();

        Observable<Long> observable = connectableObservable.refCount();

        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        disposable1.dispose();
        disposable2.dispose();

        System.out.println("Restarting the data stream");

        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Output:

subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1
Restarting the data stream
subscriber1: 0
   subscriber2: 0
subscriber1: 1
   subscriber2: 1

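If only some of the subscribers unsubscribe rather than all of them, and those subscribers then subscribe again, the data stream does not start over from the beginning.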

        // Each consumer indents its output differently so the three streams are easy to tell apart.
        Consumer<Long> subscriber1 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("subscriber1: "+aLong);
            }
        };

        Consumer<Long> subscriber2 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("   subscriber2: "+aLong);
            }
        };

        Consumer<Long> subscriber3 = new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("      subscriber3: "+aLong);
            }
        };

        ConnectableObservable<Long> connectableObservable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Long> e) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(e::onNext);
            }
        }).observeOn(Schedulers.newThread()).publish();
        connectableObservable.connect();

        Observable<Long> observable = connectableObservable.refCount();

        Disposable disposable1 = observable.subscribe(subscriber1);
        Disposable disposable2 = observable.subscribe(subscriber2);
        observable.subscribe(subscriber3);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        disposable1.dispose();
        disposable2.dispose();

        System.out.println("subscriber1 and subscriber2 resubscribe");

        disposable1 = observable.subscribe(subscriber1);
        disposable2 = observable.subscribe(subscriber2);

        try {
            Thread.sleep(20L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Output:

subscriber1: 0
   subscriber2: 0
      subscriber3: 0
subscriber1: 1
   subscriber2: 1
      subscriber3: 1
subscriber1 and subscriber2 resubscribe
      subscriber3: 2
subscriber1: 2
   subscriber2: 2
      subscriber3: 3
subscriber1: 3
   subscriber2: 3
      subscriber3: 4
subscriber1: 4
   subscriber2: 4

Here, subscriber1 and subscriber2 unsubscribed first, while subscriber3 did not. subscriber1 and subscriber2 then subscribed again. In the end, subscriber1, subscriber2, and subscriber3 all see the same values.
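
The bookkeeping behind both runs can be modeled in plain Java. This is a sketch under stated assumptions: CountingSource and RefCounted are hypothetical toy classes, emissions are driven by hand through emitNext() instead of a timer, and nothing here reflects how RxJava implements refCount internally. It only shows the rule: connect on the first subscriber, disconnect after the last one leaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RefCountSketch {

    /** Hypothetical upstream that restarts from 0 on every connect. */
    static final class CountingSource {
        private boolean connected = false;
        private long next = 0;

        void connect() { connected = true; next = 0; }
        void disconnect() { connected = false; }
        boolean isConnected() { return connected; }
        long nextValue() { return next++; }
    }

    /** Connects when the first subscriber arrives, disconnects when the last one leaves. */
    static final class RefCounted {
        private final CountingSource source;
        private final List<Consumer<Long>> subscribers = new ArrayList<>();

        RefCounted(CountingSource source) { this.source = source; }

        /** Returns a handle that unsubscribes; it plays the role of a Disposable. */
        Runnable subscribe(Consumer<Long> subscriber) {
            subscribers.add(subscriber);
            if (subscribers.size() == 1) {
                source.connect();
            }
            return () -> {
                if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
                    source.disconnect();
                }
            };
        }

        /** Drives one emission by hand; a real source would emit on its own schedule. */
        void emitNext() {
            if (!source.isConnected()) {
                return;
            }
            long value = source.nextValue();
            for (Consumer<Long> subscriber : new ArrayList<>(subscribers)) {
                subscriber.accept(value);
            }
        }
    }

    /** Everyone leaves, then a subscriber comes back: the stream starts over. */
    static List<Long> restartAfterAllDispose() {
        RefCounted shared = new RefCounted(new CountingSource());
        Runnable d1 = shared.subscribe(v -> { });
        Runnable d2 = shared.subscribe(v -> { });
        shared.emitNext();
        shared.emitNext();
        d1.run();
        d2.run(); // last subscriber gone, so the source is disconnected
        List<Long> afterResubscribe = new ArrayList<>();
        shared.subscribe(afterResubscribe::add);
        shared.emitNext();
        shared.emitNext();
        return afterResubscribe;
    }

    /** One subscriber stays, so a returning subscriber joins the running stream. */
    static List<Long> continueWhileOneRemains() {
        RefCounted shared = new RefCounted(new CountingSource());
        Runnable d1 = shared.subscribe(v -> { });
        shared.subscribe(v -> { }); // stays subscribed, like subscriber3
        shared.emitNext();
        shared.emitNext();
        d1.run();
        List<Long> afterResubscribe = new ArrayList<>();
        shared.subscribe(afterResubscribe::add);
        shared.emitNext();
        shared.emitNext();
        return afterResubscribe;
    }

    public static void main(String[] args) {
        System.out.println("all disposed, then resubscribed: " + restartAfterAllDispose());
        System.out.println("one stayed, then resubscribed:   " + continueWhileOneRemains());
    }
}
```

The first scenario yields [0, 1] after resubscribing and the second yields [2, 3], mirroring the two outputs above.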

2. The share operator of Observable

The share operator wraps the publish().refCount() call, as its source code shows.

    /**
     * Returns a new {@link ObservableSource} that multicasts (shares) the original {@link ObservableSource}. As long as
     * there is at least one {@link Observer} this {@link ObservableSource} will be subscribed and emitting data.
     * When all subscribers have disposed it will dispose the source {@link ObservableSource}.
     * <p>
     * This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
     * <p>
     * ![](https://user-gold-cdn.xitu.io/2017/5/24/13f4b200f1918f0e92b457852181b676)
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code share} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @return an {@code ObservableSource} that upon connection causes the source {@code ObservableSource} to emit items
     *         to its {@link Observer}s
     * @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX operators documentation: RefCount</a>
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> share() {
        return publish().refCount();
    }

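Summary

Understanding the difference between Hot Observable and Cold Observable is what makes it possible to write better Rx code. The same reasoning applies to Hot and Cold Flowable. The concepts of Hot Observable and Cold Observable also exist in the Rx implementations for other languages, including RxSwift and RxJS.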
