RxJava2源碼分析(二):操做符原理分析

前言:上一篇文章RxJava2源碼分析(一):基本流程分析,是對RxJava2基本流程的分析,有了上一篇的基礎,這篇就再深刻一點,開始分析一下RxJava2操做符的原理。html

  爲了方便理解RxJava2操做符的原理,這裏選擇最經常使用的map操做符來說解操做符的原理,示例代碼以下java

private void basicUseRxJava() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "我是數字" + integer;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("wizardev", "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

    }
複製代碼

內容回顧

  由於這一篇的內容是在上一篇的基礎上進行講解的,因此在講解操做符以前,先回顧一下前一篇主要的知識,以下:app

  1. Observable執行的create方法後返回的是ObservableCreate實例。
  2. create方法的參數,實際是注入到ObservableCreate類中,做爲它的成員變量。
  3. 調用Observable的subscribe方法最終調用的是ObservableCreate類中的subscribeActual方法。

操做符分析

  一樣,這裏分析源碼的順序依然按照代碼的執行順序,create方法前文已經分析過了,這裏就直接看map方法,map方法的代碼以下ide

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
複製代碼

有了前一篇的經驗,能夠直接從這個方法中得出結論map方法返回的是ObservableMap實例,同時將map方法的參數及Observable自身注入了其構造方法中。源碼分析

  如今看下ObservableMap類的源碼,以下ui

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);//調用了其父類的構造方法
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    //省略部分無關代碼
    //...
}
複製代碼

從上面的代碼中能夠看出,ObservableMap繼承至AbstractObservableWithUpstream,繼續進入AbstractObservableWithUpstream類中看下源碼,以下this

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}
複製代碼

從上面的一段代碼,能夠知道AbstractObservableWithUpstream類其實就是Observable類的裝飾類,這個類的做用就是將實例化的Observable注入進來,做爲其成員變量。分析到這裏能夠得出這幾個類的關係以下spa

 注:ObservableMap中的成員變量function就是咱們寫的這段代碼code

new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "我是數字" + integer;
            }
        }
複製代碼

好了,到這裏算是將map方法所作的事情分析完了,下面來看實例代碼的最後一個方法subscribe.cdn

subscribe方法分析

  經過上一篇文章能夠知道subscribe方法實際調用的是Observable子類的subscribeActual方法,而這裏調用subscribe方法的類是ObservableMap,因此這裏調用的就是ObservableMap類的subscribeActual方法。如今來看下ObservableMap類的subscribeActual方法的源碼,以下(爲了分析方便,這裏將與subscribeActual方法有關的代碼一塊兒貼了出來)

final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
複製代碼

從上面的代碼能夠發現ObservableMap類的subscribeActual方法又調用了其上游的subscribe方法,爲了便於理解這裏解釋一下文中所說的上游下游

文中說的「上游」及「下游」實際上是相對而言的,這裏的「上游」是靠近Observable的,如示例代碼中的subscribe是最下游,map是其上游,而map操做符又是crate方法的下游。

這裏上游的subscribe方法就是ObservableCreate調用的subscribe方法,實際就是調用ObservableCreate的subscribeActual方法,接着就是前一篇講過的流程了。

結論

  分析到如今能夠得出如下結論

  • subscribe方法的調用流程是從下往上的,就是從下游往上游分別調用其subscribe方法。

爲了方便理解,這裏我用時序圖表示subscribe的調用順序,以下圖

接收數據流程分析

  上面的內容分析了RxJava2基本流程加入操做符後的subscribe方法的執行順序,接着就來看下,數據的接收順序。通過上面的分析能夠知道最終調用的是ObservableCreate類的subscribeActual方法,這裏與前一篇文章subscribeActual方法不一樣的就是subscribeActual方法的參數改變了,這裏的參數是MapObserver類的實例,再來看下ObservableCreate類的subscribeActual方法的源碼,以下

//這裏的參數實際是MapObserver類的實例
protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            //這句代碼的最終調用的就是MapObserver類的onNext方法。
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

一些重要的內容已經在上面的代碼中進行了註釋,可能這句source.subscribe(parent);代碼,很差理解,這裏就來解釋一下這句代碼,由上一篇文章可知,這裏的source就是示例代碼中的new ObservableOnSubscribe<Integer>()...實例,這裏就是調用了這個實例的subscribe方法,而這個方法中的代碼就是調用了其參數的onNext方法,**最終調用的就是MapObserver類的onNext方法。**如今,來看下MapObserver類的onNext方法的代碼,以下

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
            //一、
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //二、
            downstream.onNext(v);
        }
複製代碼

主要來看下v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");這句代碼,這句代碼中的mapper就是注入到ObservableMap中的成員變量function,詳細內容能夠查看源碼。調用的apply方法,就是示例代碼中的這句代碼

@Override
            public String apply(Integer integer) throws Exception {
                return "我是數字" + integer;
            }
複製代碼

接着,能夠發現又調用了「2」處的代碼downstream.onNext(v);,這句代碼中的downstream就是示例代碼中最下游的subscribe方法中的參數便是下面的代碼

new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("wizardev", "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        }
複製代碼

因此這句downstream.onNext(v);調用的就是new Observer<String>()匿名類中的onNext方法。關於downstream的是什麼時候初始化的,能夠從MapObserver的父類BasicFuseableObserver類中知曉。

結論

  分析到這裏,又能夠得出一些結論

  • 關於數據的處理上從上游到下游一級級的處理的。
  • 在MapObserver類中的onNext方法,首先調用的是function中的apply方法,而後再調用下游的onNext方法並將處理後的參數傳入。

總結

  經過分析map操做符,能夠知道訂閱方法(subscribe)是從下游到上游進行訂閱的,而數據的發射是從上游到下游進行的。這兩個特性不單單是map操做符的特性,對其餘的操做符一樣適用。爲了講清楚這兩個特性,本文就選了比較具備表明性的map操做符,若是想了解其餘操做符的原理,就順着這兩個特性分析就好了。

  授之以魚,不如授之以漁。本文的目的就是解釋清楚操做符的思想及原理,理解了這種思想及原理,分析其餘的操做符也就不在話下了。

歡迎關注個人公衆號
掃碼關注公衆號,回覆「獲取資料」有驚喜
相關文章
相關標籤/搜索