重學Rx(一)


一.爲何重學?
java

       從開始接觸rx到如今也有幾年時間,但僅僅侷限於簡單的用,一直處於看似懂其實不懂的狀態,相比較讓本身思想更加rx仍是有必定的距離。關於Flowable/Observable、cold/hot、subject/Processor,操做符結合使用,都須要去梳理清楚。android

      其次,mvvm開發模式是我心裏嚮往的一種開發模式,但同時又對Databinding那種蠻橫的數據綁定方式以及難定位語法的錯誤和運行時的崩潰緣由望而卻步。當知道mvvm模式除了依賴Databinding以外還能夠依靠rx去實現以後,更加堅決了我擁抱rx的決心。數據庫

Tips:本文主要基於Rxjava 2.0複製代碼

二.從哪裏學呢?緩存

        從操做符的基本用法開始學?立刻我否認了這個想法。就好像自定義控件API不是重點同樣,Rx的操做符也不該該是重點。重點應該放在比較異同,感覺設計目的,最後落腳點應該纔是學習操做符。Rx試圖統一同步和異步回調這類總結性的語言先不去說就從rx是一個擴展的觀察者模式提及。既然是觀察者模式就應該有被觀察者(Observable)觀察(Observer)兩個角色和訂閱(Subscribe)一個動做。第一個任務應該先熟知Observable的分類。bash

三.被觀察者(Observable)分類網絡

3.1按照熱冷分類異步

cold Observablehot Observable,它們的區別是:mvvm

Hot Observable 不管有沒有訂閱,事件始終都會發生(發生的前提是調用connect,ps:connect方法是ConnectableObservable類內的方法)。當 Hot Observable 有多個訂閱者時,Hot Observable 與訂閱者們的關係是一對多的關係,能夠與多個訂閱者共享信息學習

Cold Observable 只有訂閱纔開始發送事件。而且Cold Observable 和訂閱者只能是一對一的關係,當有多個不一樣的訂閱者時,消息是從新完整發送的。ui

Rx提供常見建立的Observable默認狀況下都是冷流----訂閱才發送。以下入建立方式


首先怎麼建立熱流

方式一:經過冷流轉換成熱流。

冷流調用publish方法轉換成熱流——實質Observable轉換成ConnectableObservable。要觸發一個熱流發送數據並非去訂閱,而是調用connect方法。若是不調用connect方法,即便訂閱了熱流也不會觸發上游發送事件。

方式二:直接建立熱流

咱們點進去publish方法,發現以下代碼:

public final ConnectableObservable<T> publish() {
        return ObservablePublish.create(this);
    } 複製代碼

能夠看到其本質仍是用過ObservablePublish$create方法去建立熱流的。由此能夠得到直接建立熱流的方法。直接建立熱流也須要調用connect方法,觸發上游事件

方式三: 藉助 Subject(它自己是hot) 轉換爲 Hot Observable

subject便是被觀察者又是觀察者,當 Subject 做爲觀察者時,它能夠訂閱目標 Cold Observable 使對方開始發送事件。同時它又做爲被觀察者轉發或者發送新的事件,讓 Cold Observable 藉助 Subject 轉換爲 Hot Observable。

  作爲觀察者接受數據:

Observable<Object> objectObservable = Observable.create(e -> {
                Observable.interval(500, TimeUnit.MILLISECONDS).subscribe(e::onNext);
            });

            PublishSubject<Object> subject = PublishSubject.create();
            objectObservable.subscribe(subject);複製代碼

 作爲被觀察者發送數據 :

subject.subscribe(e->{});複製代碼

讓 Cold Observable 藉助 Subject 轉換爲 Hot Observable。關於Subject另開一篇闡述。

方式四:replay方法

replay()方法和 publish()同樣,會返回一個 ConnectableObservable,區別是, replay()會爲新的subscriber重放他以前所收到的上游數據(指定緩存以前數量)。

冷流能轉換成熱流,那麼反過來,熱流能轉換成冷流嘛?

不能!

       不過發現熱流中有一個refCount操做符,被他調用事後返回一個Observable對象,看似轉化成了冷流,其實和冷流仍是不一樣的,返回的它擁有獨有的特性:只要有訂閱者,數據流就不會中止,若是全部的訂閱者都取消訂閱了,則數據流中止。更加相似android中綁定服務的特性。-----我本身叫它伴熱不冷流。

一樣的冷流中也提供了share方法變成一個伴熱不冷流。

public final Observable<T> share() {
        return publish().refCount();
    } 複製代碼

3.2按照是否支持背壓分類

        背壓是指在異步場景中,被觀察者發送事件速度遠快於觀察者的處理速度的狀況下,一種請求才去發送的策略。在rx1中有的操做符支持背壓,有的不支持背壓,混在一塊兒。rx2中它們分開了。


  • Observeable流的建立方法,Flowable也一樣擁有(Create、just  ...)。
  • Flowable默認建立的也是冷流(想要轉換成熱流一樣有四種方法)。
    1. 經過publish方法。經過調用publish方法。
    2. 經過 FlowablePublish$create方法直接建立
    3. 經過Processor類,它和Subject是一個道理,無非是它支持背壓。
    4. 經過replay()方法。

咱們去對比Observeable/Flowable,從表面上觀察他們的流的思路:

不支持背壓:被觀察者是主動的推送數據給觀察者,觀察者是被動接收的。

支持背壓:觀察者主動從被觀察者那裏去拉取數據,而被觀察者變成被動的等待通知再發送數據。

3.3其餘的被觀察者

3.3.1 Single特色

建立的是一個cold流不能直接轉換成hot流,同時也不支持背壓,只發射單個數據錯誤事件,只有onSuccess、onError回調方法,可是能夠經過toXXX方法轉換成Observable、Flowable、Completable以及Maybe。


3.3.2 Completable特色

建立的是一個cold流,也不能直接轉換成hot流,同時也不支持背壓,不發射數據,只有onComplete、onError回調方法,可是能夠經過toXXX方法轉換成Observable、Flowable、Single以及Maybe。


3.3.3 Maybe特色

建立的是一個cold流,也不能直接轉換成hot流,同時也不支持背壓。是Single和Completable的結合體。可是能夠經過toXXX方法轉換成Observable、Flowable、Single。


那麼問題來了

1.爲何要這麼多類型的被觀察者?這不是畫蛇添足嘛?

   這個問題以我目前的水平還不能用簡單直白的語言描述清楚,只能經過場景去闡述:爲何用Single?仔細思考一個網絡請求場景,咱們根本不須要onComplete回調,爲了使用意圖更加明確,因此提供了Single被觀察者。爲何用Completable?好比向本地數據庫存數據,而後存完成後,流繼續往下發送,當咱們使用Completable而後結合andThen回讓使用意圖包括封裝更加明確。Maybe同樣的道理--------使用意圖更加明確

2.這些被觀察者和觀察者之間有什麼繼承關係嗎?

上述五對被觀察者和觀察者以前功能類似、方法相同。可是經過查看源碼發現:被觀察者都是各自實現本身的接口,同時觀察者也是各自實現本身的接口——保證了他們各自的拓展或者配套的操做符不會相互影響。這就意味着不少操做符都有多套------沒有繼承關係

四. 訂閱方法重載

每個訂閱方法都有多個重載方法:

public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
  
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete) {
        return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
    }

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return ls;
    }
    public final void subscribe(Observer<? super T> observer) { 
    }複製代碼

咱們能夠經過返回的Disposable取消訂閱,那麼有一個問題來了,void返回值這個怎麼取消訂閱?經過觀察者回調方法返回Disposable用於取消訂閱。可是Flowable並無從觀察者中返回這個Disposable對象。那麼怎麼取消訂閱防止內存泄漏呢?Flowable提供了subscribeWith這個方法能夠返回當前訂閱的觀察者,而且經過ResourceSubscriber DisposableSubscriber等觀察者來提供 Disposable的接口。

小彩蛋

tips1:

observeOn 後面的全部操做都會在它指定線程工做。subscribeOn 指定的線程是從這個Observable生成一直到遇到其餘 observeOn。若是程序須要屢次切換線程,使用屢次observeOn是徹底能夠的。 而subscribeOn只有最上方的subscribeOn會起做用

tips2:

儘可能避免過多的使用操做符,由於每一個操做符都會根據操做符的特性生成新的Observable,訂閱他的上游而後給下游發送數據,避免使用過多的操做符能夠下降內存抖動。

tips3

connect方法返回一個Disposable對象,用來取消熱流發送數據。

相關文章
相關標籤/搜索