擁抱 RxJava(三):關於 Observable 的冷熱,常見的封裝方式以及誤區

這一系列文章原本我發表在簡書。最近開始轉移到掘金。之後也會在掘金髮表(慢慢拋棄簡書了應該,掘金的技術環境確實比簡書好些)。javascript


前言: 不少朋友誤會我文章的意思。我寫這個系列文章的意思主要是幫助瞭解一下RxJava的常見用法。而不是使用一下本身或別人封裝好的RxBus就以爲本身的項目使用RxJava了。可是這也僅僅是我的口味問題,不少狀況下確實RxBus/EventBus會很方便,很刺激,很上癮。因此從這篇文章開始,我把標題中的"放棄RxBus"去除。java

不管在簡書,微信平臺,GitHub,掘金等等分享平臺。一個名字上寫着 "MVP(MVVM) + RxJava + Retrofit + Dagger2 + ........"這樣的名字,再熟悉不過了。然而,大多數狀況進去看一下RxJava部分。要麼就是簡單的把取到的數據用Observable.just()直接傳給下一層,要麼就是直接使用Retrofit的Adapter來直接得到Observable,而app中其餘部分並無reactive。並且還有不少Observable用法錯誤,好比冷熱不分,連續太多的Map/FlatMap等等。react

0. RxBus/Retrofit 足夠用了,我爲何要讓本身的App 更加的Reactive?

爲何不用RxBus我已經寫了兩篇文章了,可能因爲我不常寫文,不少人並無理解。在這裏我再解釋一次:EventBus若是是一輛穿梭在全部代碼之間的公交車。那麼Observable就是穿梭在少量人之間的Uber專車。他比起EventBus有不少優點,好比類型安全,異常處理,線程切換,強大的操做符等等。你固然能夠作出一輛超級Uber來當全局公交車(RxBus)使用,然而這卻損失了RxJava原本的許多優點,而且又給本身挖了許多坑,得不償失。android

0.1 一個常見誤區,過多的operator

剛開始使用RxJava的時候,咱們會以爲operator的鏈式調用會很是的爽,一個簡單的例子:git

Observable.just("1", "2", "3", "4", "5", "6", "7")
          .map(x -> Integer.valueOf(x))
          .map(x -> x * 2)
          .map(x -> x + 4)
          .filter(x -> x >2)
          // and much more operators
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());複製代碼

當你只有不多數據的時候,這樣固然能夠,可是你數據量上來的時候,這就會有不少的overhead。 其實幾乎全部的operator都會給你生成一個新的Observable。因此在上面這個例子中,咱們在過程當中生成了至少7個Observable。然而咱們徹底能夠將中間的.map().map().map().filter合併在一個FlatMap中,減小不少的overhead。github

1. Observable.just()的侷限性。

  1. 使用Observable.just() 即便你沒有調用subscribe方法。just()括號裏面的代碼也已經執行了。顯然,Observable.just()不適合封裝網絡數據,由於咱們一般不想在subscribe以前作網絡請求。
    舉個例子:
    class TestClass{
    TestClass(){
     System.out.println("I'm created!");
    }
    }
    Observable.just(new TestClass());複製代碼
    這時你運行代碼,你就看到確實你的TestClass 已經被建立了:
    I/System.out: I'm created!複製代碼
    同理,fromIterable也和just有一樣的缺點。固然,這個能夠簡單的用defer()/fromCallable()/create()操做符來是實現只有subscribe只有才加載。
    好比:
    // use fromCallable
    Observable.fromCallable(TestClass::new);
    //or
    Observable.defer(() -> Observable.just(new TestClass()));複製代碼
  2. Observable.just()不夠靈活。雖說設計模式上咱們追求 "Minimize Mutability" 可是若是咱們的程序愈來愈 reactive的時候。一個 ObservableJust 每每是不知足需求的。好比以前必定訂閱的subscriber。若是數據更新了,你不能夠同過ObservableJust 來通知全部的Observable 新數據更新了,須要你的subscriber主動更新。這顯然有悖於咱們追求的reactive programming。 主動pull數據而不是數據告訴你,我更新了而後再作出反應。

固然ObservableJust在不少狀況下,確實不錯。若是你不須要監聽後續的更新,那麼ObservableJust能夠知足你的需求。設計模式

2. Hot Observable 和 cold Observable

這部分是本篇文章的重點!緩存

不少人在封裝數據的時候,並無太多考慮冷熱的問題,一般狀況下並不會出錯。由於目前不少開源項目(Demo)裏除了RxBus,並無太多的RxJava的實時狀況。然而,當你的App愈來愈Reactive的時候,冷熱即是一個必須考慮的問題。
Hot Observable 意思是若是他開始傳輸數據,你不主動喊停(dispose()/cancel()),那麼他就不會停,一直髮射數據,即便他已經沒有Subscriber了。而Cold Observable則是subscribe時纔會發射數據。
然而,問題來了。我上篇文章講過,只有subscribeActual方法調用了的時候,Observable發射數據,那爲何Hot Observable沒有Subscriber也會發射數據,他把數據發射給誰了呢?咱們在解決這個問題以前,先看一下Cold Observable:安全

2.1 Cold Observable

咱們常見的工廠方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer()。 他們的共同點是當你有多個Subscriber的時候,他們的事件是獨立的,舉個例子:微信

Observable interval = Observable.interval(1,TimeUnit.SECONDS);複製代碼

若是咱們有兩個subscriber,那麼他們會各自有本身的計時器,而且互不干擾。效果以下圖:

Hot Observable

2.2 Hot Observable

不一樣於Cold Observable, Hot Observable是共享數據的。對於Hot Observable的全部subscriber,他們會在同一時刻收到相同的數據。咱們一般使用publish()操做符來將ColdObservable變爲Hot。或者咱們在RxBus中經常用到的Subjects 也是Hot Observable。
剛剛咱們剛剛提出了一個問題,

既然Hot Observable在沒有subscriber的時候,還會繼續發送數據,那麼數據究竟發給誰了呢?

其實Hot Observable其實並無發送數據,而是他上層的Observable 發送數據給這個hot Observable。不信?咱們來分別看一下:

2.2.1 ConnectableObservable

咱們在上面的誤區中知道了,幾乎全部operator都會生成一個新的Observable。publish固然不例外。可是有區別的是,publish會給你一個ConnectableObservable。具體實現類是ObservablePublish。這個Observable的區別是他提供一個connect()方法,若是你調用connect()方法,ConnectableObservable就會開始接收上游Observable的數據。咱們來測試一下:

ConnectableObservable interval = Observable.interval(1, TimeUnit.SECONDS).publish();
//connect even when no subscribers
interval.connect();複製代碼

ConnectableObservable

果真,因爲咱們subscribe晚了一些。0這個數據沒有收到,當咱們兩個 Subscriber 都dispose的時候,ConnectableObservable 也仍在接受數據,致使咱們6這個數據沒有接收到。
ConnectableObservable 其實在內部,有一個PublishObserver,他有兩個做用。一個是當咱們調用 connect()方法時, PublishObserver開始接受上游的數據,咱們的例子裏即是 Observable.interval(1, TimeUnit.SECONDS) 。因此才能在咱們沒有調用 subscribe方法時,他也能開始發送數據。第二個做用是 PublishObserver存儲全部的下游Subscriber, 也就是咱們例子中的Subscriber1 和Subscriber2,在 PublishObserver 每次接到一個上游數據,就會將接收到的結果,依次分發給他存儲的全部 Subscribers ,若是下游 Subscriber 調用了 dispose方法,那麼他就會在本身的緩存中刪除這個 Subscriber,下次接受到上游數據便不會傳給這個Subscriber
那麼這時候,有同窗應該要問了:

咱們可不能夠中止從上游接受數據?

咱們固然能夠。 connect()方法會返回一個 Disposable 給咱們來控制是否繼續接受上游的數據。

2.2.2 ConnectableObservable的經常使用操做符

咱們固然不但願每次都手動控制 ConnectableObservable的開關。RxJava給咱們提供了一些經常使用的控制操做符

  1. refCount()
    refCount()能夠說是最經常使用的操做符了。他會把 ConnectableObservable變爲一個一般的Observable但又保持了HotObservable的特性。也就是說,若是出現第一個Subscriber,他就會自動調用 connect()方法,若是他開始接受以後,下游的 Subscribers所有dispose,那麼他也會中止接受上游的數據。具體看圖:

refCount()

每一個 Subscriber 每次都會接受一樣的數據,可是當全部 subscriber 都 dispose時候,他也會自動dipose上游的 Observable 。因此咱們從新subscribe的時候,又從新從0開始。
這個操做符經常使用到,RxJava將他和publish合併爲一個操做符 :share()

  1. autoConnect()
    autoConnect()看名字就知道,他會自動連接,若是你單純調用 autoConnect() ,那麼,他會在你連接第一個 Subscriber 的時候調用 connect(),或者你調用 autoConnect(int Num),那麼他將會再收到Num個 subscriber的時候連接。
    可是,這個操做符的關鍵在於,因爲咱們爲了鏈式調用,autoConnect會返回Observable給你,你不會在返回方法裏得到一個 Disposable來控制上游的開關。 不過沒問題,autoConnect提供了另外一種重載方法 :
    autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)
    他會在這個 Consumer傳給你 你須要的那個總開關。並且,autoConnect並不會autoDisconnect, 也就是若是他即便沒有subscriber了。他也會繼續接受數據。
  2. replay()
    replay()方法和 publish()同樣,會返回一個 ConnectableObservable,區別是, replay()會爲新的subscriber重放他以前所收到的上游數據,咱們再來舉個例子:

    //only replay 3 values
    Observable.interval(1, TimeUnit.SECONDS).replay(3).refCount();複製代碼

    replay()

    果真,Subscriber2在subscribe時候,當即收到了以前已經錯過的三個數據,而後繼續接受後面的數據。
    可是,這裏有幾點須要考慮:replay() 會緩存上游發過來的數據,因此並不須要擔憂從新生成新數據給新的 Subscriber。

  3. ReplayingShare()
    其實ReplayingShare並不能算是ConnectableObservable的一個操做符,他是JakeWhaton的一個開源庫,只有百來行。實現的功能是幾乎和replay(1).refCount()差很少。可是若是中斷 Conncection以後,從新開始subscribe,他仍然會給你一個重放他上一次的結果。 具體看圖:

ReplayingShare()

咱們看到和剛纔的replay不一樣,即便兩個Subscriber都 dispose, 從新開始仍然會接收到咱們緩存過的一個數據。

2.3 Subjects

Subjects 做爲一個Reactive世界中的特殊存在,他特殊在於他本身既是一個Observable又是一個Observer(Subscriber)。你既能夠像普通Observable同樣讓別的Subscriber來訂閱,也能夠用Subjects來訂閱別人。更方便的是他甚至暴露了OnXX(),方法給你。你直接調用能夠通知全部的Subscriber。 這也是RxBus的基礎,RxBus幾乎離不開Subjects。 蜘蛛俠的老爹告訴咱們,力量越大,責任就也大。Subjects也同樣。 Subjects由於暴露了OnXX()方法,使得Subjects的數據來源變得難以控制。並且,Subjects一直是HotObservable,咱們來看下Subject的OnNext()方法的實現:

@Override
public void onNext(T t) {
    if (subscribers.get() == TERMINATED) {
        return;
    }
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    for (PublishDisposable<T> s : subscribers.get()) {
        s.onNext(t);
    }
}複製代碼

能夠看出來Subjects只要調用了OnNext()方法就會當即發送數據。因此,使用時必定要注意Subjects和Subscriber的連接時序問題。具體Subjects的用法我想介紹帖子已經足夠多了。這裏就不贅述了。

3. 在Android中常見的幾種封裝和注意事項

1.封裝View 的Listener

View 的各類Listener 咱們經常使用create方法來封裝,好比OnClickListener:

Observable.create(emitter -> {
    button.setOnClickListener(v -> emitter.onNext("I'm Clicked"));
    emitter.setCancellable(() -> button.setOnClickListener(null));
});複製代碼

這裏很是關鍵的一點是必定要設置解除綁定,不然你將持續使用這個會形成內存泄漏。並且最好配合使用share()。不然只有最後一個Subscriber能收到OnClick。固然,若是不考慮方法數的話,推薦配合使用RxBinding。

並且,用create()方法封裝Listener適合幾乎全部的callback, 而且安全。

2.封裝簡單的數據源

設想一個場景,咱們有一個User類。裏面有咱們的用戶名,頭像,各類信息。然而在咱們的app中,可能有三四個Fragment/Activity須要根據這個User作出不一樣的反應。這時咱們就能夠簡單的使用Subject來封裝User類。

public class UserRepository {
    private User actualUser;

    private Subject<User> subject = ReplaySubject.createWithSize(1);

    /** * *Get User Data from wherever you want Network/Database etc */

    public Observable<User> getUpdate(){
        return subject;
    }

    public void updateUser(User user){
        actualUser = user;
        subject.onNext(actualUser);
    }
}複製代碼

若是咱們某些模塊須要這個User,那麼只須要subscribe到這個Repository,若是User有更新,每個Subscriber都會收到更新後的User而且互相不影響。並且咱們使用ReplaySubject,即便有新的Subscriber,也會收到最新的一個User的緩存。
可是使用的時候必定要注意,由於用的是Subject.因此在onNext方法中一旦出現了error。那麼全部的Subscriber都將和這個subject斷開了連接。這裏也能夠用 RxRelay 代替Subject,簡單來講Relay就是一個沒有onError和onComplete的Subject。

3.簡單的使用concat().first()來處理多來源

Dan Lew在他的博客Loading data from multiple sources with RxJava 中介紹過他這種處理方法,

// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;  
Observable<Data> disk = ...;  
Observable<Data> network = ...;

// Retrieve the first source with data
Observable<Data> source = Observable  
  .concat(memory, disk, network)
  .first();複製代碼

而後在每次作不一樣請求的時候刷新緩存

Observable<Data> networkWithSave = network.doOnNext(data -> {  
  saveToDisk(data);
  cacheInMemory(data);
});

Observable<Data> diskWithCache = disk.doOnNext(data -> {  
  cacheInMemory(data);
});複製代碼

具體也能夠看這篇簡書,我也不在過多贅述 :RxJava(八)concat符操做處理多數據源

這裏也說一下這個方法的缺點。 首先,這個只適合一個Item的時候。若是咱們有多個Item從這個Observable中流出。 fisrt()操做符只會取第一個。

4.本身繼承Observable 手動寫subscribeActual()方法

這多是最靈活的寫法?若是你想用RxJava封裝本身的庫,推薦這種方法封裝。由於這樣不只僅能夠有效的進行錯誤處理,而且不會暴露過多邏輯給外面,許多優秀的RxJava相關庫都是這樣封裝,就連RxJava本身也是把一個個的operator封裝成一個個不一樣的Observable。可是這種方法確實要求很高,要作不少考慮,好比異步,多線程衝突,錯誤處理。對新手不是很推薦。
仍是稍微講一個例子: 仍是咱們的onClickLisnter的封裝:
RxBinding 的 封裝:

final class ViewClickObservable extends Observable<Object> {
  private final View view;

  ViewClickObservable(View view) {
    this.view = view;
  }

  @Override protected void subscribeActual(Observer<? super Object> observer) {
    if (!checkMainThread(observer)) {
      return;
    }
    Listener listener = new Listener(view, observer);
    observer.onSubscribe(listener);
    view.setOnClickListener(listener);
  }

  static final class Listener extends MainThreadDisposable implements OnClickListener {
    private final View view;
    private final Observer<? super Object> observer;

    Listener(View view, Observer<? super Object> observer) {
      this.view = view;
      this.observer = observer;
    }

    @Override public void onClick(View v) {
      if (!isDisposed()) {
        observer.onNext(Notification.INSTANCE);
      }
    }

    @Override protected void onDispose() {
      view.setOnClickListener(null);
    }
  }
}複製代碼

其實這裏雖然代碼更多,可是實質上是剛纔咱們說到的用Observable.create()來封裝沒有不少區別。那咱們爲何還要這麼麻煩本身寫Observable? 由於與create相比,減小了對象個數。 Observable封裝一個OnClickListener 須要 ObservableCreate, ObservableOnSubscribe,ObservableEmitter 這三個類的實例。來確保你封裝出來的Observable 是遵照 Observable Contract的。 而若是你默認本身遵照Observable Contract, 你只須要一個 CustomObservable來實現。減小了兩個對象的生成。 這個觀點也獲得了證明,我在StackOverFlow問過相關問題。很幸運獲得了RxJava 2.x 做者 David 的回覆:

若是直接繼承與Observable來封裝。大概分以下幾步:

  1. 將你須要監聽的類/接口,經過構造方法傳入這個Observable。好比將咱們須要監聽OnClick的View傳入:
    ViewClickObservable(View view) {
     this.view = view;
    }複製代碼
  2. 將你須要使用的Listener/CallBack 和 Disposable/Observer接口結合成爲一個實現類,實現監聽。而且經過構造方法,將下游的Observer傳入,以實現傳輸數據。好比OnClickListener:

    static final class Listener extends MainThreadDisposable implements OnClickListener {
     private final View view;
     private final Observer<? super Object> observer;
    
     Listener(View view, Observer<? super Object> observer) {
       this.view = view;
       this.observer = observer;
     }
    
     @Override public void onClick(View v) {
       if (!isDisposed()) {
         observer.onNext(Notification.INSTANCE);
       }
     }
    
     @Override protected void onDispose() {
       view.setOnClickListener(null);
     }
    }複製代碼

這裏推薦使用內部類的形式,下降內部可見性。並且這裏須要注意的點很是多。
首先Disposable的 兩個方法都須要實現,isDisposed()通常使用AtomicBoolean來控制監聽是否已經取消訂閱,好比:

private final AtomicBoolean unsubscribed = new AtomicBoolean();

    @Override
    public final boolean isDisposed() {
        return unsubscribed.get();
    }複製代碼

dispose()方法通常會放一些取消監聽等方法。好比咱們上面看到的view.setOnClickListener(null);。 這裏 onDispose正常是沒有這個方法的。 這個是Jake Wharton爲了方便封裝出來的接口,放在dispose方法裏運行的。

public abstract class MainThreadDisposable implements Disposable {
    @Override
    public final void dispose() {
        onDispose();
    }
    protected abstract void onDispose();
}複製代碼

他固然還在dipose方法裏作了其餘安全檢查,消除了一些Boilerplate。

其次,這裏的對於Observer的控制是十分寬鬆的。因此你的行爲最好必定遵循Observable Contract。不然出現其餘問題,好比下游取消訂閱上游還在發送數據。 又或者onComplete/onError以後還有onNext出發。 又或者出現異常並不會在onError中獲得等等。 這裏推薦看一下RxBinding其餘實現類的源碼。或者是Retrofit RxJava 2 的 Adapter。 都會頗有幫助。

最後,若是你是封裝一個生產Observable的方法,那麼使用Disposable。若是你是想封裝一個自定義Operator。那麼須要實現Observer接口。使用這個observer來監聽上游數據。自定義Operator實在複雜。這裏我就不講了,我本身也沒精通這個。

  1. 在SubscribeActual裏,註冊監聽方法,仍是咱們剛纔的例子:
@Override protected void subscribeActual(Observer<? super Object> observer) {
    if (!checkMainThread(observer)) {
      return;
    }
    Listener listener = new Listener(view, observer);
    observer.onSubscribe(listener);
    view.setOnClickListener(listener);
  }複製代碼

看了我第二篇文章的朋友這裏應該對subscribeActual方法不陌生。這裏的參數是下游的observer。將他直接傳入咱們剛纔設計好的Listener/Disposable/Observer。 而後經過observer.onSubscribe註冊咱們的disposable。 而後咱們開始註冊監聽。這裏順序很重要。必定要先調用onSubscribe方法再註冊監聽。不然可能會出現下游disposable空指針異常。

總結

這篇文章在簡書上也發了有一陣子了。轉到掘金時我又從新檢查了下,補充了一些內容。可能有些地方看起來與我以前的文章有些許重複。但願見諒。

相關文章
相關標籤/搜索