RxJava2.0實用操做符總結及原理簡析

歡迎關注微信公衆號「隨手記技術團隊」,查看更多隨手記團隊的技術文章。
本文做者:周浩源
原文連接:mp.weixin.qq.com/s/OJCEyH1gJ…html

大概從2015年開始,RxJava1.0開始快速流行起來,短短兩年時間,RxJava在Android開發中已經算是無人不知無人不曉了,加之它與Retrofit等流行框架的完美結合,已經成爲Android項目開發的必備利器。隨手記做爲一個大型項目,引入三方框架一直比較慎重,但也從今年初開始,正式引入了RxJava2.0,並配合Retrofit對項目的網絡框架和繁瑣的異步邏輯進行重構。RxJava雖然好用,但伴隨而來的是不可避免的學習成本,爲了讓你們快速的瞭解RxJava的前因後果以及快速上手使用,特意總結該篇文章。本文將詳細講解如何快速理解RxJava的操做符,並從源碼角度來分析RxJava操做符的原理。java

RxJava的優勢

簡單來說RxJava是一個簡化異步調用的庫,但其實它更是一種優雅的編程方式和編程思想,當你熟悉RxJava的使用方式以後,會很容易愛上它。
我總結它的優勢主要有兩個方面:react

  • 簡潔,免除傳統異步代碼邏輯中的callback hell
  • 增長業務邏輯代碼的可讀性

關於第一點你們應該都會認同,關於第二點可能有人會有疑惑,由於不少人以爲RxJava大量不明因此的操做符會讓代碼的可讀性變得更差,其實產生這種印象偏偏就是由於沒有掌握RxJava操做符的使用和原理所致使的。
好比隨手記項目中綁定用戶QQ帳號的業務邏輯,這段邏輯的代碼涉及三個異步接口,兩個是QQ登陸SDK的,一個是隨手記後臺的,在使用RxJava重構前,這段代碼使用了3個AsyncTask,也就是三個嵌套的回調,代碼複雜,可讀性很是差。而改造以後,它變成了下面這樣子git

若是你對這裏面的幾個RxJava操做符比較熟悉的話,你會迅速瞭解我這段代碼作了什麼事情,並且不用再去梳理一堆嵌套回調了,這就是RxJava帶來的可讀性。
因此,學習RxJava,理解和掌握操做符是不可避免的第一步。github

RxJava2.0與RxJava1.0的關係

從RxJava1.0到RxJava2.0,基本思想沒有變化,但RxJava2.0按照Reactive-Streams規範對整個架構進行了從新設計,並變動了Maven倉庫依賴地址和包名。因此如今RxJava的github網站中,RxJava1.0和RxJava2.0是兩個獨立的分支,不相互兼容 ,也不能同時使用,並且RxJava1.0再過一段時間也將再也不維護。因此,目前還使用RxJava1.0的,建議儘早切換到RxJava2.0,而若是沒有接觸過RxJava1.0,直接使用和學習RxJava2.0就能夠了。若是想了解RxJava1.0和RxJava2.0的詳細區別,請參考官方文檔
爲行文方便,今後處開始,本文使用Rx來表示RxJava2.x。編程

Rx的操做符有哪些

剛接觸Rx的人面對一堆各式各樣的操做符會以爲不知如何去學習記憶,其實你只須要從總體上了解Rx操做符的類別和掌握一些使用頻率較高的操做符就足夠了,至於其餘的操做符,你只須要知道它的使用場景和掌握如何快速理解一個操做符的方法,就能夠在須要的時候快速拿來用了。
下圖是我根據官方文檔總結的Rx操做符的分類及每一個類別下的表明性操做符
數組

Rx主要操做符
Rx主要操做符

從上圖能夠看出,Rx的操做符主要十個大類別,每一個類別下經常使用的操做符也就三五個左右,因此只要掌握這些,你就能夠應付大部分的業務場景了。

如何快速理解一個Rx操做符

提到Rx操做符,相信不少人都會對描述Rx操做符的花花綠綠的寶石圖有很大印象。
bash

Rx寶石圖
Rx寶石圖

要快速理解Rx操做符,看懂寶石圖是個快捷有效的方式,如今咱們就來詳細分析一下構成寶石圖的各個主要元素。
首先,咱們有必要回顧一下Rx中的幾個主要的基類

  • io.reactivex.Flowable : 事件源(0..N個元素), 支持 Reactive-Streams and 背壓
    • io.reactivex.Observable:事件源(0..N個元素), 不支持背壓
    • io.reactivex.Single: 僅發射一個元素或產生error的事件源,
    • io.reactivex.Completable: 不發射任何元素,只產生completion或error的事件源
    • io.reactivex.Maybe: 不發射任何元素,或只發射一個元素,或產生error的事件源
    • Subject: 既是事件源,也是事件接受者
      能夠看到Rx中最重要的概念就是事件源了,基本上全部的操做符都是針對事件源來進行一些轉換、組合等操做,而咱們最經常使用的事件源就是Observable了。

本文中咱們就以Observable事件源爲例來說解Rx的操做符,Observable發射的事件咱們統一稱之爲item。首先咱們須要詳細瞭解一下寶石圖中各個圖像元素的含義:微信

  • —>Observable的時間線,從左至右流動
  • :星星、圓、方塊等表示Observable發射的item
  • |:時間線最後的小豎線表示Observable的事件流已經成功發射完畢了
  • X:時間線最後的X符合表示因爲某種緣由Observable非正常終止發射,產生了error

上面幾種元素組合在一塊兒表明一個完整的Observable,也能夠稱爲源Observable網絡

-->方向朝下的虛線箭頭表示以及中間的長方框表示正在對上面的源Observable進行某種轉換。長方框裏的文字展現了轉換的性質。下面的Observable是對上面的源Observable轉換後的結果。

掌握了寶石圖的含義,咱們就能夠根據某個操做符的寶石圖快速理解這個操做符了。舉幾個例子:

1. map

map
map

能夠看到,這幅圖表達的意思是一個源 Observable前後發射了一、二、3的三個item,而通過 map操做符一轉換,就變成了一個發射了十、20、30三個item的新的 Observable。描述操做符的長方框中也清楚的說明了該 map操做符進行了何種具體的轉換操做(圖中的10*x只是一個例子,這個具體的轉換函數是能夠自定義的)。
因而,咱們就很快速地理解了 map操做符的含義和用法,簡單來說,它就是經過一個函數將一個 Observable發射的item逐個進行某種轉換。
示例代碼:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer integer) throws Exception {
        return integer * 10;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer result) throws Exception {
        Log.i(TAG, "accept : " + result +"\n" );
    }
});複製代碼

輸出結果:

2. zip


根據 zip的寶石圖,能夠知道zip操做符的做用是把多個源 Observable發射的item經過特定函數組合在一塊兒,而後發射組合後的item。從圖中還能夠看到一個重要的信息是,最終發射的item是對上面的兩個源 Observable發射的item按照發射順序逐個組合的結果,並且最終發射的 1A等item的發射時間是由組合它的 1A等item中發射時間較晚的那個item決定的,也正是如此, zip操做符常常能夠用在須要同時組合處理多個網絡請求的結果的業務場景中。
示例代碼:

Observable.zip(Observable.just(1, 2, 3),
        Observable.just("A", "B", "C"),
        new BiFunction<Integer, String, String>() {
            @Override
            public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
                return integer + s;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, "zip : accept : " + s + "\n");
            }
        });複製代碼

輸出結果:

3. concat


從寶石圖能夠看出, concat操做符的做用就是將兩個源 Observable發射的item鏈接在一塊兒發射出來。這裏的鏈接指的是總體鏈接,被 concat操做後產生的 Observable會先發射第一個源 Observable的全部item,而後緊接着再發射第二個源 Observable的全部的item。
示例代碼:

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, "concat : " + integer + "\n");
            }
        });複製代碼

輸出結果:

大部分操做符都配有這樣的寶石圖,經過官方文檔或者直接在Rx源碼中查看JavaDoc就能夠找到,再也不過多舉例。你也能夠在rxmarbles這樣的網站上查看更多能夠動態交互的寶石圖。

Rx操做符的原理

要了解操做符的原理,確定要從源碼入手嘍。因此咱們先來簡單擼一遍Rx的最基本的Create操做符的源碼。
Rx的源碼目錄結構是比較清晰的,咱們先從Observable.create方法來分析

Observable.create(new ObservableOnSubscribe<String>() {
  @Override
  public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
      e.onNext("s");
  }
}).subscribe(new Observer<String>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
    // 建立的Observer中多了一個回調方法onSubscribe,傳遞參數爲Disposable ,Disposable至關於RxJava1.x中的Subscription,用於解除訂閱。
  }

  @Override
  public void onNext(@NonNull String s) {

  }

  @Override
  public void onError(@NonNull Throwable e) {

  }

  @Override
  public void onComplete() {

  }
});複製代碼

create方法以下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}複製代碼

代碼很簡單,第一行判空不用管,第二行調用RxJavaPlugins的方法是爲了實現Rx的hook功能,咱們暫時也無需關注,在通常狀況下,第二行代碼會直接返回它的入參即ObservableCreate對象,ObservableCreateObservable的子類,實現了Observable的一些抽象方法好比subscribeActual。事實上Rx的每一個操做符都對應Observable的一個子類。
這裏create方法接受的是一個ObservableOnSubscribe的接口實現類:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}複製代碼

經過註釋能夠知道這個接口的做用是經過一個subscribe方法接受一個ObservableEmitter類型的實例,俗稱發射器。
Observable.create方法執行時,咱們傳入的就是一個ObservableOnSubscribe類型的匿名內部類,並實現了它的subscribe方法,而後它又被傳入create方法的返回對象ObservableCreate,最終成爲ObservableCreate的成員source

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ...複製代碼

接着咱們來看Observablesubscribe方法,它的入參是一個Observer(即觀察者,也就是事件接收者)

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

       subscribeActual(observer);
   } catch (NullPointerException e) { // NOPMD
       throw e;
   } catch (Throwable e) {
       Exceptions.throwIfFatal(e);
       // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
       RxJavaPlugins.onError(e);

       NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
       npe.initCause(e);
       throw npe;
   }
}複製代碼

最終它會調用它的子類ObservableCreatesubscribeActual方法:

@Override
protected void subscribeActual(Observer<? super T> observer) {
   CreateEmitter<T> parent = new CreateEmitter<T>(observer);
   observer.onSubscribe(parent);

   try {
       source.subscribe(parent);
   } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
   }
}複製代碼

subscribeActual裏首先建立了用於發射事件的CreateEmitter對象parentCreateEmitter實現了接口EmitterDisposable,並持有observer
這段代碼的關鍵語句是source.subscribe(parent),這行代碼執行後,就會觸發事件源進行發射事件,即e.onNext("s")會被調用。細心的同窗也會注意到這行代碼以前,parent先被傳入了observeronSubscribe()方法,而在上面咱們說過,observeronSubscribe()方法接受一個Disposable類型的參數,能夠用於解除訂閱,之因此可以解除訂閱,正是由於在觸發事件發射以前調用了observeronSubscribe(),給了咱們調用CreateEmitter的解除訂閱的方法dispose()的機會。
繼續來看CreateEmitteronNext()方法,它最終是經過調用observeronNext()方法將事件發射出去的

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


   private static final long serialVersionUID = -3434801548987643227L;

   final Observer<? super T> observer;

   CreateEmitter(Observer<? super T> observer) {
       this.observer = observer;
   }

   @Override
   public void onNext(T t) {
       if (t == null) {
           onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
           return;
       }
       // 在真正發射以前,會先判斷該CreateEmitter是否已經解除訂閱
       if (!isDisposed()) {
           observer.onNext(t);
       }
   }
   ...
}複製代碼

至此,Rx事件源的建立和訂閱的流程就走通了。

下面咱們從map操做符來入手看一下Rx操做符的原理,map方法以下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}複製代碼

map方法接受一個Function類型的參數mapper,返回了一個ObservableMap對象,它也是繼承自Observable,而mapper被傳給了ObservableMap的成員function,同時當前的源Observable被傳給ObservableMap的成員source,進入ObservableMap

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}複製代碼

能夠看到這裏用到了裝飾者模式,ObservableMap持有來自它上游的事件源sourceMapObserver持有來自它下游的事件接收者和咱們實現的轉換方法function,在subscribeActual()方法中完成ObservableMapsource的訂閱,觸發MapObserveronNext()方法,繼而未來自source的原始數據通過函數mapper轉換後再發射給下游的事件接收者,從而實現map這一功能。

如今咱們終於可以來總結一下包含多個操做符時的訂閱流程了,如下面這段代碼爲例

Observable.
        create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("holen");
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(@NonNull String s) throws Exception {
                return s.length();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });複製代碼

執行代碼時,自上而下每一步操做符都會建立一個新的Observable(均爲Observable的子類,對應不一樣的操做符),當執行create時,建立並返回了ObservableCreate,當執行map時,建立並返回了ObservableMap,而且每個新的Observable都持有它上游的源Observable(即source)及當前涉及到的操做函數function。當最後一步執行訂閱方法subscribe時會觸發ObservableMapsubscribeActual()方法,並將最下游的Observer包裝成MapObserver,同時該方法又會繼續調用它所持有ObservableCreate的訂閱方法(即執行source.subscribe),由此也會觸發ObservableCreatesubscribeActual()方法,此時咱們的發射器CreateEmitter纔會調用它的onNext()方法發射事件,再依次調用MapObserver的操做函數mapperonNext()方法,最終將事件傳遞給了最下游的ObserveronNext()方法。

我簡單的將這段邏輯用下面這幅圖來表示

操做符liftcompose

liftcompose在Rx中是兩個比較特殊的操做符。
lift讓咱們能夠對Observer進行封裝,在RxJava1.0中大部分變換都基於lift這個神奇的操做符。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
   ObjectHelper.requireNonNull(lifter, "onLift is null");
   return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}複製代碼

lift操做符接受一個ObservableOperator對象

/**
 * Interface to map/wrap a downstream observer to an upstream observer.
 *
 * @param <Downstream> the value type of the downstream
 * @param <Upstream> the value type of the upstream
 */
public interface ObservableOperator<Downstream, Upstream> {
    /**
     * Applies a function to the child Observer and returns a new parent Observer.
     * @param observer the child Observer instance
     * @return the parent Observer instance
     * @throws Exception on failure
     */
    @NonNull
    Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}複製代碼

看註釋能夠知道,這是一個將下游訂閱者包裝成一個上游訂閱者的接口。相似Map操做符中的MapObserver。

compose操做符讓咱們能夠對Observable進行封裝

@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
   return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}複製代碼

wrap方法以下,僅僅是走了RxJavaPlugins的流程

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   if (source instanceof Observable) {
       return RxJavaPlugins.onAssembly((Observable<T>)source);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}複製代碼

compose方法接受一個ObservableTransformer對象

/**
 * Interface to compose Observables.
 *
 * @param <Upstream> the upstream value type
 * @param <Downstream> the downstream value type
 */
public interface ObservableTransformer<Upstream, Downstream> {
    /**
     * Applies a function to the upstream Observable and returns an ObservableSource with
     * optionally different element type.
     * @param upstream the upstream Observable instance
     * @return the transformed ObservableSource instance
     */
    @NonNull
    ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}複製代碼

ObservableSource即爲咱們的基類Observable繼承的惟一接口。看註釋能夠知道,ObservableTransformer是一個組合多個Observable的接口,它經過一個apply()方法接收上游的Observable,進行一些操做後,返回新的Observable
這裏組合多個Observable的意思其實就是組合多個操做符,好比咱們常常會須要在使用Rx進行網絡異步請求時進行線程變化,這個操做通常都是差很少的,每次都寫會比較煩,這時咱們就可使用compose把經常使用的線程變換的幾個操做符組合起來

private final ObservableTransformer schedulersObservable = new ObservableTransformer() {
   @Override
   public ObservableSource apply(Observable upstream) {
       return upstream.subscribeOn(Schedulers.io())
               .unsubscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread());
   }
};  

protected void testCompose() {
   getNetObservable()
           .compose(schedulersObservable)
           .subscribe(new Consumer<String>() {
               @Override
               public void accept(@NonNull String s) throws Exception {
                   mRxOperatorsText.append(s);
               }
           });
}複製代碼

關於compose的典型應用,你們有興趣還能夠去看一下開源項目RxLifecycle,它就是巧妙地利用compose操做符來解決了使用Rx可能會出現的內存泄露問題。

Rx操做符的應用場景

說了這麼多,其實咱們最關心的仍是Rx操做符的應用場景。其實只要存在異步的地方,均可以優雅地使用Rx操做符。好比不少流行的Rx周邊開源項目

而針對本身想要實現的功能情景,如何去選擇特定的操做符,官網的文檔中也列出了一些指導——Rx操做符決策樹

固然除了這些,咱們在開發項目時,還會有各類具體的業務場景須要選擇合適的操做符,這裏我總結了一些常常遇到的場景以及適合它們的操做符

只要咱們理解了Rx操做符的原理,熟練掌握了一些使用頻率較高的操做符,就可以在以上場景中輕鬆地使用,再也不讓本身的代碼被複雜的業務邏輯搞得混亂。


以上就是本文的所有內容,關於Rx還有不少東西值得深刻地學習研究,後續有機會再跟你們分享更多Rx的使用心得。

參考

相關文章
相關標籤/搜索