友好 RxJava2.x 源碼解析(一)基本訂閱流程

系列文章:app

本文 csdn 地址:友好 RxJava2.x 源碼解析(一)基本訂閱流程ide

本文基於 RxJava 2.1.3源碼分析

前言

本文基於讀者會使用 RxJava 2.x 而講解,基本原理不涉及,示例只純粹爲示例而示例。

示例代碼

示例源碼:
Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                    emitter.onNext("2");
                    emitter.onNext("3");
                    emitter.onComplete();
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("TAG", "onSubscribe():  ");
                }

                @Override
                public void onNext(String s) {
                    Log.e("TAG", "onNext():  " + s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.e("TAG", "onComplete():  ");
                }
            });
複製代碼

輸出結果:post

E/TAG: onSubscribe():  
E/TAG: onNext():  1
E/TAG: onNext():  2
E/TAG: onNext():  3
E/TAG: onComplete():  
複製代碼

訂閱流程解析

咱們知道 subscribe() 方法是 Observable 和 Observer 的鏈接點,因此首先戳進 subscribe(Observer observer) 中,能夠發現該方法是 Observable 類的方法,傳入了一個 Observer 對象,那首先咱們須要弄明白這裏的 Observable 和 Observer 分別是什麼,觀察上方示例代碼咱們能夠知道 Observer 是 new 出來的,因此咱們只須要知道 Observable 是什麼,固然,這裏也很清晰,Observable 就是咱們調用 Observable.create(ObservableOnSubscribe) 所建立出來的 Observable,來一張圖 ——spa

Observable.create(ObservableOnSubscribe)

Observable 和 Observer 咱們都弄清楚了,接下來就是查看 subscribe(Observer) 具體的實現了,以下 ——.net

@Override
public final void subscribe(Observer<? super T> observer) {
	// 略去其餘源碼
    subscribeActual(observer);
	// 略去其餘源碼
}
複製代碼

略去非關鍵源碼後咱們發現它只作了一件事,就是調用 Observable#subscribeActual(observer),而在 Observable 中該方法是一個抽象方法:線程

protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼

這意味着咱們須要去找它的子類,咱們要看看它的 subscribeActual(Observer) 方法,那咱們就得從 create(ObservableOnSubscribe) 着手,看它是如何將一個 ObservableOnSubscribe 對象轉換成一個 Observable 對象的——3d

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
	// 略去其餘源碼
    return new ObservableCreate<T>(source);
}
複製代碼

一樣的,刪除非關鍵源碼以後,咱們就剩下這麼一行代碼,這也就是意味着咱們須要從 ObservableCreate 這個類中去尋找 subscribeActual(Observer) 的實現了,這裏筆者須要說起兩點——code

  1. 從上述方法能夠看出 ObservableCreate 是 Observable 的一個子類cdn

  2. 咱們自定義的 ObservableOnSubscribe 做爲一個名爲 source 字段被傳入了。事實上在 Observable 的子類實現中,它們都有一個名爲 source 的字段,指代上游 Observable(其實是 ObservableOnSubscribe,可是咱們不妨理解成就是 Observable)。

ObservableCreate#subscribeActual() 實現以下:

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 觸發 Observer#onSubscribe(Disposable)
    observer.onSubscribe(parent);

    try {
        // 發射事件
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
複製代碼

第5行調用了 Observer#onSubscribe(Disposable) ,因此咱們能夠知道 Observer#onSubscribe(Disposable) 是先被調用的,而此時 Observable 甚至尚未開始發射事件!接下來就是調用了 source.subscribe(ObservableEmitter),這個方法是交由開發者去實現的,在示例代碼是以下所寫 ——

@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
      emitter.onNext("1");
      emitter.onNext("2");
      emitter.onNext("3");
      emitter.onComplete();
  }
複製代碼

在代碼中咱們調用了 CreateEmitter 對象的 onNext() 方法,因此咱們須要戳入 CreateEmitter 類中看一下 onNext(T) 的具體實現(固然 onComplete() 方法等同,此處就不作擴展了),源碼以下:

@Override
    public void onNext(T t) {
		// 略去其餘源碼
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
複製代碼

一目瞭然,噹噹前對象並不處於 DISPOSED 狀態時,那麼就將會調用下游 Observer 的 onNext(T) 方法,而下游 Observer 的 onNext(T) 方法也就是咱們上面示例代碼中所寫的——

public void onNext(String s) {
    Log.e("TAG", "onNext():  ");
}
複製代碼

至此,基本訂閱流程咱們就理清楚了。咱們從 Observable#subscribe(Observer) 開始,將 Observer 傳給 Observable,而 Observable 又會在 onNext(T) 方法中激活 Observer 的 onNext(T) 方法。咱們在示例只涉及了少許的 Observable/Observer,事實上,咱們在 RxJava 中運用的操做符都會在內部建立一個 Observable 和 Observer,雖然在 Observable#subscribeActual(Observer) 中都有本身特定的實現,可是它們大部分都是作兩個操做,一是將「下游」傳來的 Observer 根據需求進行封裝;二就是讓「上游」的 Observable subscribe() 該 Observer。

訂閱流程

通過了如上的分析後,筆者但願讀者可以理解 RxJava2.x 的基本訂閱流程是從 Observable#subscribe(Observer) 開始的,而該方法會觸發「上游」 Observable 的 Observable#subscribeActual(Observer) 方法,而在該「上游」 Observable 中又會觸發「上游的上游」Observable 的 Observable#subscribeActual(Observer) 方法。咱們不妨用如下述源碼舉例:

Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
        }
    })
    .flatMap(new Function<String, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(String s) throws Exception {
            return Observable.just(s);
        }
    })
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
複製代碼

另附一張圖,圖中標明瞭後面講到的「第一個 Observable」、「第二個 Observable」等名詞:

例圖

接下來用圖展現整個訂閱的流程——

Observable#subscribe(Observer) 流程

Observable#subscribe(Observer) 以前:

Observable#subscribe(Observer) 之前

準備觸發 Observable#subscribe(Observer)

準備觸發 Observable#subscribe(Observer)

Observable#subscribe(Observer) 將會致使其上游 Observable 的 subscribe(Observer) 方法被調用:

Observable#subscribe(Observer) 以後

上游 Observable 的 subscribe(Observer) 方法內部又會調用上游的上游 Observable 的 subscribe(Observer)

觸發上級 Observer

Observable#subscribe(Observer) 會調用 Observable#subscribeActual(Observer) ,該方法是一個抽象方法,由子類覆寫,因此展示了 Observable 的多態性,並且如何激活上游 Observable 的 subscribe(Observer)/subscribeActual(Observer) 方法的關鍵點也在此。實現方式就在於 Observable#subscribeActual(Observer) 方法雖然是一個抽象方法,可是它的子類實現中都包含有一句 source.subscribe(Observer),其中 source 就是上游 Observable(其實是 ObservableSource,可是咱們此處不妨就理解成 Observable,畢竟咱們對這個對象更熟悉一些,Observable 是 ObservableSource 接口的實現),因此就能夠理解在每個 Observable 的 subscribeActual(Observer) 方法中它都會調用上游的 subscribe(Observer)/subscribeActual(Observer) 方法,直至到達第一個 Observable 的 subscribe(Observer)/subscribeActual(Observer) 中。

Observer#onSubscribe(Disposable) 流程

訂閱的關係鏈理清了,可是尚未發射事件的流程還沒出來啊,咱們繼續往下走——

到達頂部 Observable 的時候,已經不能再往上走了,就要準備搞事情(準備發射事件了),此處咱們就以示例代碼中的 Observable 爲例,它的 subscribeActual(Observer) 中——

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
複製代碼

它首先封裝了一個 Disposable,接下來將調用 Observer#onSubscribe(Disposable) 將 Disposable 做爲參數傳給下一層 Observer。

Disposable 初始化

到了下一層的 Observer 的 onSubscribe(Disposable) 中,該方法中針對上一層 Disposable 作一些操做(判斷、封裝等),而後再封裝一個 Disposable 做爲參數傳遞給 Observer#onSubscribe(Disposable)

Disposable 傳遞

而此時的 Observer 就是咱們所自定義的 Observer——

Observer#onSubscribe(Disposable)

Observer#onNext(T) 流程

Observer#onSubscribe(Disposable) 流程結束後,就執行到第7行代碼 Observeable.subscribe(Observer),實質上也就是——

new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("1");
    }
})
複製代碼

ps:爲了方便起見,此處只分析 onNext() 執行流程。

ObservableEmitter#onNext(T) 的內部實際上會觸發 Observer 的 onNext(T) 方法——

發射事件

再向下觸發就是咱們所自定義的最底層的 Observer 了——

發射事件

以示例代碼來講,頂遊 Observable 會觸發 ObservableEmitter#onNext(T) 方法,在該方法的內部又觸發了「下游」 Observer 的 onNext(T) 方法,而在該方法內部又會觸發「下游的下游」 Observer 的 onNext(T) 方法,直至最底層的 Observer —— 咱們所自定義的 Observer ——

Observer#onNext(T)

到此,一套訂閱流程就執行完畢了。

相關文章
相關標籤/搜索