系列文章:app
本文 csdn 地址:友好 RxJava2.x 源碼解析(一)基本訂閱流程ide
本文基於 RxJava 2.1.3源碼分析
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe(): ");
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete(): ");
}
});
複製代碼
輸出結果:post
E/TAG: onSubscribe():
E/TAG: onNext(): 1
E/TAG: onNext(): 2
E/TAG: onNext(): 3
E/TAG: onComplete():
複製代碼
咱們知道 subscribe()
方法是 Observable 和 Observer 的鏈接點,因此首先戳進 subscribe(Observer observer)
中,能夠發現該方法是 Observable 類的方法,傳入了一個 Observer 對象,那首先咱們須要弄明白這裏的 Observable 和 Observer 分別是什麼,觀察上方示例代碼咱們能夠知道 Observer 是 new 出來的,因此咱們只須要知道 Observable 是什麼,固然,這裏也很清晰,Observable 就是咱們調用 Observable.create(ObservableOnSubscribe)
所建立出來的 Observable,來一張圖 ——spa
Observable 和 Observer 咱們都弄清楚了,接下來就是查看 subscribe(Observer)
具體的實現了,以下 ——.net
@Override
public final void subscribe(Observer<? super T> observer) {
// 略去其餘源碼
subscribeActual(observer);
// 略去其餘源碼
}
複製代碼
略去非關鍵源碼後咱們發現它只作了一件事,就是調用 Observable#subscribeActual(observer)
,而在 Observable 中該方法是一個抽象方法:線程
protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼
這意味着咱們須要去找它的子類,咱們要看看它的 subscribeActual(Observer)
方法,那咱們就得從 create(ObservableOnSubscribe)
着手,看它是如何將一個 ObservableOnSubscribe 對象轉換成一個 Observable 對象的——3d
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 略去其餘源碼
return new ObservableCreate<T>(source);
}
複製代碼
一樣的,刪除非關鍵源碼以後,咱們就剩下這麼一行代碼,這也就是意味着咱們須要從 ObservableCreate
這個類中去尋找 subscribeActual(Observer)
的實現了,這裏筆者須要說起兩點——code
從上述方法能夠看出 ObservableCreate 是 Observable 的一個子類cdn
咱們自定義的 ObservableOnSubscribe 做爲一個名爲 source
字段被傳入了。事實上在 Observable 的子類實現中,它們都有一個名爲 source
的字段,指代上游 Observable(其實是 ObservableOnSubscribe,可是咱們不妨理解成就是 Observable)。
ObservableCreate#subscribeActual()
實現以下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 觸發 Observer#onSubscribe(Disposable)
observer.onSubscribe(parent);
try {
// 發射事件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
第5行調用了 Observer#onSubscribe(Disposable)
,因此咱們能夠知道 Observer#onSubscribe(Disposable)
是先被調用的,而此時 Observable 甚至尚未開始發射事件!接下來就是調用了 source.subscribe(ObservableEmitter)
,這個方法是交由開發者去實現的,在示例代碼是以下所寫 ——
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
複製代碼
在代碼中咱們調用了 CreateEmitter 對象的 onNext()
方法,因此咱們須要戳入 CreateEmitter 類中看一下 onNext(T)
的具體實現(固然 onComplete()
方法等同,此處就不作擴展了),源碼以下:
@Override
public void onNext(T t) {
// 略去其餘源碼
if (!isDisposed()) {
observer.onNext(t);
}
}
複製代碼
一目瞭然,噹噹前對象並不處於 DISPOSED
狀態時,那麼就將會調用下游 Observer 的 onNext(T)
方法,而下游 Observer 的 onNext(T)
方法也就是咱們上面示例代碼中所寫的——
public void onNext(String s) {
Log.e("TAG", "onNext(): ");
}
複製代碼
至此,基本訂閱流程咱們就理清楚了。咱們從 Observable#subscribe(Observer)
開始,將 Observer 傳給 Observable,而 Observable 又會在 onNext(T)
方法中激活 Observer 的 onNext(T)
方法。咱們在示例只涉及了少許的 Observable/Observer,事實上,咱們在 RxJava 中運用的操做符都會在內部建立一個 Observable 和 Observer,雖然在 Observable#subscribeActual(Observer)
中都有本身特定的實現,可是它們大部分都是作兩個操做,一是將「下游」傳來的 Observer 根據需求進行封裝;二就是讓「上游」的 Observable subscribe()
該 Observer。
通過了如上的分析後,筆者但願讀者可以理解 RxJava2.x 的基本訂閱流程是從 Observable#subscribe(Observer)
開始的,而該方法會觸發「上游」 Observable 的 Observable#subscribeActual(Observer)
方法,而在該「上游」 Observable 中又會觸發「上游的上游」Observable 的 Observable#subscribeActual(Observer)
方法。咱們不妨用如下述源碼舉例:
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
})
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
return Observable.just(s);
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
另附一張圖,圖中標明瞭後面講到的「第一個 Observable」、「第二個 Observable」等名詞:
接下來用圖展現整個訂閱的流程——
在 Observable#subscribe(Observer)
以前:
準備觸發 Observable#subscribe(Observer)
:
Observable#subscribe(Observer)
將會致使其上游 Observable 的 subscribe(Observer)
方法被調用:
上游 Observable 的 subscribe(Observer)
方法內部又會調用上游的上游 Observable 的 subscribe(Observer)
:
Observable#subscribe(Observer)
會調用Observable#subscribeActual(Observer)
,該方法是一個抽象方法,由子類覆寫,因此展示了 Observable 的多態性,並且如何激活上游 Observable 的subscribe(Observer)
/subscribeActual(Observer)
方法的關鍵點也在此。實現方式就在於Observable#subscribeActual(Observer)
方法雖然是一個抽象方法,可是它的子類實現中都包含有一句source.subscribe(Observer)
,其中 source 就是上游 Observable(其實是 ObservableSource,可是咱們此處不妨就理解成 Observable,畢竟咱們對這個對象更熟悉一些,Observable 是 ObservableSource 接口的實現),因此就能夠理解在每個 Observable 的subscribeActual(Observer)
方法中它都會調用上游的subscribe(Observer)/subscribeActual(Observer)
方法,直至到達第一個 Observable 的subscribe(Observer)/subscribeActual(Observer)
中。
訂閱的關係鏈理清了,可是尚未發射事件的流程還沒出來啊,咱們繼續往下走——
到達頂部 Observable 的時候,已經不能再往上走了,就要準備搞事情(準備發射事件了),此處咱們就以示例代碼中的 Observable 爲例,它的 subscribeActual(Observer)
中——
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
它首先封裝了一個 Disposable,接下來將調用 Observer#onSubscribe(Disposable)
將 Disposable 做爲參數傳給下一層 Observer。
到了下一層的 Observer 的 onSubscribe(Disposable)
中,該方法中針對上一層 Disposable 作一些操做(判斷、封裝等),而後再封裝一個 Disposable 做爲參數傳遞給 Observer#onSubscribe(Disposable)
。
而此時的 Observer 就是咱們所自定義的 Observer——
在 Observer#onSubscribe(Disposable)
流程結束後,就執行到第7行代碼 Observeable.subscribe(Observer)
,實質上也就是——
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
})
複製代碼
ps:爲了方便起見,此處只分析
onNext()
執行流程。
在 ObservableEmitter#onNext(T)
的內部實際上會觸發 Observer 的 onNext(T)
方法——
再向下觸發就是咱們所自定義的最底層的 Observer 了——
以示例代碼來講,頂遊 Observable 會觸發 ObservableEmitter#onNext(T)
方法,在該方法的內部又觸發了「下游」 Observer 的 onNext(T)
方法,而在該方法內部又會觸發「下游的下游」 Observer 的 onNext(T)
方法,直至最底層的 Observer —— 咱們所自定義的 Observer ——
到此,一套訂閱流程就執行完畢了。