RxJava2.x源碼解析(一):訂閱流程

如今網上已經有大量的源碼分析文章,各類技術的都有。但我以爲不少文章對初學者並不友好,讓人讀起來雲裏霧裏的,比源碼還源碼。究其緣由,是根本沒有從學習者的角度去分析。在本身完成了源碼閱讀以後,卻忘記了本身是如何一步步提出問題,進而走到這裏的。php

因此,我想在本篇及之後的文章中,花更多的精力去進行源碼的分析,爭取用淺顯易懂的語言,用適合的邏輯去組織內容。這樣不至於陷入源碼裏,致使文章難懂。儘可能讓更多的人願意去讀源碼。css

閱讀本文,你須要對 RxJava2 的一些基本使用有所瞭解,不過不用太深。這裏推薦下Season_zlc給初學者的RxJava2.0教程(一) ,比較淺顯易懂。 java

提到 RxJava,你第一個想到的詞是什麼?react

「異步」。android

RxJava 在 GitHub 上的官網主頁也說了,「RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.」(RxJava是一個使用可觀測序列來組建異步、基於事件的程序的庫,它是 Reactive Extensions 在Java虛擬機上的一個實現)。它的優勢嘛,用扔物線凱哥的話講,就是「簡潔」,而且「隨着程序邏輯變得愈來愈複雜,它依然可以保持簡潔」。nginx

這裏要注意一點,雖然對大多數人來說,更多的是使用 RxJava 來配合 Retrofit、OkHttp 進行網絡請求框架的封裝及數據的異步處理,可是,RxJava和網絡請求本質上沒有半毛錢的關係。它的本質,官網已經說的很明白了,就是「異步」。git

RxJava 基於觀察者模式實現,基於事件流進行鏈式調用。github

首先,咱們須要添加必要的依賴,這裏以最新的2.2.8版本爲例:安全

    implementation "io.reactivex.rxjava2:rxjava:2.2.8"
複製代碼

固然,對於 Android 項目來說,咱們通常還須要添加一個補充庫:網絡

    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
複製代碼

這個庫其實就是提供了 Android 相關的主線程的支持。

而後寫個簡單的代碼,就能夠開始咱們的源碼分析啦。

        // 上游 observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });

        // 下游 observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // onSubscribe 方法會最早被執行
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };

        // 將上游和下游進行關聯
        observable.subscribe(observer);
複製代碼

爲便於理解,我故意將能夠鏈式調用的代碼,拆成了三部分。你徹底能夠寫成下面的鏈式風格:

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // onSubscribe 方法會最早被執行
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
複製代碼

一樣,爲了便於理解,我會借用i/o流裏面常常用到的水流進行類比。將被觀察者 observable 稱爲上游(upstream),將觀察者 observer 稱爲下游(downstream)。讀源碼其實也能看出,做者自己也正是這麼類比的。

經過將整個過程拆分紅三個步驟,能更清晰的理清邏輯。咱們須要作的,本質上就是建立一個上游和一個下游,最終經過上游對象的subscribe方法將兩者關聯起來:

  1. 建立一個 Observable 的實現類
  2. 建立一個 Observer 的實現類
  3. 將兩者經過 Observable 的 subscribe(…) 方法將兩者進行關聯

明白了這三點,之後咱們就不會被各類實現類搞的眼花繚亂。

這三個步驟,裏面的核心是第三部,也就是訂閱過程,畢竟,這屬於一個動做,而咱們進行源碼分析的時候,每每就是從動做開始的。這時候,咱們Ctrl/Command + 鼠標左鍵,進入該方法看看,裏面作了下什麼。

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // RxJavaPlugins是個鉤子函數,用來在代碼的執行先後插入進行一些操做
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            // 關鍵點是這行代碼
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
複製代碼

這裏將this(上游Observable類型)的和下游observer做爲參數傳給了 RxJavaPlugins 的 onSubscribe(…)方法,並返回一個Observer,同時,將原來的observer指向這個返回值,那麼咱們看看這個函數中到底進行了什麼操做:

    //  RxJavaPlugins.java
    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
複製代碼

這裏判斷onObservableSubscribe是否爲 null,不爲 null 則調用其 apply(…) 方法。若爲 null ,則直接返回原來的observer。而該變量須要經過RxJavaPlugin的setOnSingleSubscribe(...)方法來指定的,顯然,咱們並無指定,因此忽略無論(後面遇到相似問題,基本也均可以忽略)。

回到以前的訂閱流程,就能夠簡化爲下面這樣:

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            ...
            // 調用到具體實現子類的 subscribeActual(observer) 方法
            subscribeActual(observer);
        } catch (
            ...
        }
    }
複製代碼

從上面代碼能夠看出,訂閱過程,即調用Observable的subscribe(...)的過程,其實就是直接調用了其實現類的subscribeActual(observer)方法(該方法在 Observable 中是個抽象方法)。之後咱們遇到這個方法,就直接去 Observable 的實現類中找便可,就不會亂了。

一些熟悉RxJava的朋友可能會說,有時候咱們經過subscribe(...)訂閱的並非Observer對象,而是consumer對象,有各類重載。以下:

當你傳入的是Consumer的時候,無論你傳遞了幾個參數,最終都會代用到如下方法,那些你沒傳遞的 onError或者 onComplete 回調等等,會自動使用默認建立的值。

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe)
 
{
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        // 最終都會封裝成一個 LambdaObserver,並做爲參數傳入subscribe(...)方法中
        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }
複製代碼

能夠看出,這裏最終仍是將這些 Consumer 對象包裝在了一個 LambdaObserver 類型的變量中,而後又調用了subscribe(...)方法,將其做爲變量傳入,以後的分析,就跟上面是同樣的了。

訂閱方法講完了,咱們也知道最終調用到了 Observable 的實現類的subscribeActual(...)方法。那接下來確定就是要弄懂在這個方中作了什麼事。咱們例子中是使用Observable.create(...)方法建立的 observable:

        // 上游 observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });
複製代碼

其中,Observable.create(...)方法的實現是這樣的:

    public static <T> Observable<T> create(ObservableOnSubscribe<Tsource) {
        ObjectHelper.requireNonNull(source"source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

咱們傳進去了一個實現了ObservableOnSubscribe接口的匿名內部類,該接口類也很簡單,就定義了一個void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception抽象方法。

而後咱們將傳進來的source(剛剛提到的匿名內部類ObservableOnSubscribe)封裝進一個ObservableCreate對象中,又傳進了RxJavaPlugins.onAssembly(...)中,這個RxJavaPlugins類剛纔咱們說過,其實就是一個hook類,暫時直接忽略,通常就是直接把傳進來的參數返回了(不放心的話能夠本身點進去,之後遇到該方法再也不贅述)。

也就是說Observable.create(...)方法最終建立了一個ObservableCreate對象。注意,該對象是Observable抽象類的具體實現類。

特別注意!
特別注意!
特別注意!

重要事情說三遍。咱們這裏經過create(...)方法建立的Observable的具體實現子類是ObservableCreate。該子類的命名是有規律可言的。我在分析源碼的時候有時候就想,這麼多看起來名字都同樣的類,RxJava的開發者本人不會懵逼嗎?做爲一個用戶量這麼大的庫,確定各類都有講究,確定有貴了。嗯。規律就是生成的子類的命名方法爲「Observable+建立該類的方法名」,即:在建立該類的方法名稱前面加上個Observable,以此來做爲新的類的名稱。

不信?

咱們還能夠經過Observable.just(...)這種方式來建立Observable,點進去看看具體子類名字是啥:

其餘的本身就去驗證吧。

因此,咱們之後遇到Observable開頭的類名,就能夠猜想它是一個Observable類型的變量,類名後面的部分,就是建立該變量的方法(確保嚴謹,倒推可能不成立,要仔細確認)。

一樣的,各類Observer的實現類也是相似,只不過各類它們是把建立的方法放在了前面,而後以Observer結尾而已,這點以後遇到的時候會再說起。

回到剛纔講的。咱們經過create(…)方法,建立出來的是ObservableCreate,它是個Observable,那咱們就直接看它的subscribeActual(...)方法究竟作了什麼:

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 首先調用下游 observer 的 onSubscribe方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

首先,將observer包裝進CreateEmitter對象中。

而後當即調用 Observer 的onSubscribe(parent)方法,表示訂閱過程完成。(當咱們經過subscribe(...)進行訂閱的時候,會當即調用下游Observer 的onSubscribe(...)方法。經過查看其它實現類,能夠總結出該結論)。

這裏,會將咱們的封裝類CreateEmitter做爲參數傳進onSubscribe(...)方法中。

以後,又在代碼source.subscribe(parent)中將其做爲參數傳遞。這裏的source,是源的意思,其實也就是上游。此例子中具體指咱們傳入Observable.create(...)中的ObservableOnSubscribe類型的匿名內部類。

而咱們已經實現了該抽象方法:

        // 上游 observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "subscribe: ");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });
複製代碼

咱們以後就是調用的傳進來的ObservableEmitteronNext/onError/OnComplete來發送事件的。等等,咱們建立的時候不是傳進來的是CreateEmitter嗎,怎麼又變成了ObservableEmitter?其實,CreateEmitter是ObservableCreate的一個 static final 類型的內部類,而且實現了ObservableEmitter接口。由於是由create方法建立的,因此這樣命名咯,同時,又做爲內部類定義在 ObservableCreate 中,這樣,用到的時候是否是就不那麼凌亂啦?

到這裏,咱們知道了會經過回調emitter的各類方法來發送事件,這些事件又是怎麼被observer 正確接收並處理的呢?

咱們繼續回到 ObservableCreate 的subscribeActual(...)方法:

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

咱們發送事件,最終調用的實際上是 parent(即 CreateEmitter) 中的相應方法,而 CreateEmitter 裏又封裝了 observer。咱們到 CreateEmitter 這個類的源碼中,看看發送事件的時候,都幹嗎了:

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable 
{

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
        // 這裏返回了一個 SerializedEmitter,並傳入 this,也就是 CreateEmitter 對象
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            // 這裏判斷,是否已經處於 disposed 狀態,
            // 注意 get() 是定義在 AtomicReference 中的方法
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }
複製代碼

這裏的代碼也是比較簡單的,就是將發送的事件中的參數直接傳遞給 observer 中的相應方法。只不過中間多了背壓的判斷(該類實現了Disposable 接口)。同時注意,該類仍是 AtomicReference 的子類,能夠實現原子操做。而且在覆寫的 ObservableEmitter 的serialize()接口中建立並返回了一個SerializedEmitter,這些都是跟線程安全以及背壓相關的,不是本文的重點。

還有一點,須要你們注意,從RxJava2.x開始,已經不容許向onNext/onError中傳null值,不然會報空指針,這點在上面的源碼中也能看到。這就會對封裝網絡請求的時候產生影響,好比請求驗證驗證碼接口成功,可是後臺返回的 result 字段爲 null,咱們此時可能仍然想要它調用 onNext 方法去執行成功的回調。那這就須要額外的處理了。網上也有一些解決方案,可是總以爲不夠優雅,有大佬有比較好的建議,也能夠指點下。

好啦,本篇文章就寫到這裏,帶你們完成了訂閱、事件的發送及處理的整個流程。

關於線程切換的內容,放在下一篇文章RxJava2.x 源碼解析(二): 線程切換中講。畢竟,不談線程切換,談什麼 RxJava源碼 分析,哈哈。

歡迎關注公衆號來獲取其餘最新消息,有趣的靈魂在等你。

相關文章
相關標籤/搜索