本文的全部分析都是基於 RxJava2 進行的。如下的 RxJava 指 RxJava2 閱讀本文你將會知道:java
從 RxJava1.0 到 RxJava2.0,在項目開發中已經使用了很長時間這個庫了。鏈式調用,絲滑的線程切換很香,可是若是沒弄清楚其中的奧妙很容易掉進線程調度的坑裏。這篇文章咱們就來對 RxJava 的訂閱過程、時間發送過程、線程調度進行分析git
先說結論github
subscribe()
開始自下向上訂閱,這也是整個事件流的起點,當訂閱開始整個操做纔會生效執行爲了更便於理解訂閱的流轉方向,我將Observable調用 subscribe()
訂閱描述爲了 Observer beSubscribed()
網絡
此過程對應圖中黑色箭頭
部分,以操做符中的map()
操做爲例:app
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
調用map
操做符時,RxJavaPliguns 會註冊一個新的 ObservableMap
對象,查看其它操做符會發現都有對應的 Observable
對象產生。同時,上游的 Observabe
會做爲 source
參數傳入賦值給這個新的 Observable
的 source
屬性。層層向下,能夠對這個新生成的 Observable
又能夠繼續使用操做符。ide
當調用最後一個 Observable
的 subscribe()
方法時,即開始訂閱過程。此過程對應圖中紅色箭頭
部分源碼分析
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
複製代碼
在調用subscribe(Observer)
時實際上會去調用各個 Observable
實現子類中的 subscribeActual()
方法:ui
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼
而在這個subscribeActual()
方法也很簡單,調用了 source
去訂閱一個新生成的 Observer
對象,同時這個新的MapObserver
會將調用subscribe()
時傳入的 observer
,賦值給downstream
屬性。這樣每一級訂閱都會將上級的 Observable
、本級生成的 Observer
、訂閱下級傳入的Observer
聯繫起來,直到達到 Observable 最初建立的地方整個訂閱過程結束。this
此過程對應圖中綠色箭頭
部分Observable 事件起點建立有不少中操做符,他們都會建立出最初發送的事件/數據,以 ObservableCreate
爲例:spa
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
訂閱時會調用source.subscrebe(parent)
,而這個source
又是從哪兒來的呢?
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
複製代碼
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("data")
}
})
複製代碼
從代碼中咱們能夠看出,這個 source 即爲咱們建立時傳入的 ObservableOnSubscribe
,所以emitter.onNext("data")
便是事件發送的起點。咱們再繼續看emitter
的 onNext()
作了什麼:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
複製代碼
源碼中現實調用了observer.onNext()
,而這個observer
則是前面訂閱過程當中 source.subscribe(new MapObserver<T, U>(t, function))
傳入的那個 observer
,從而將事件發送到了下一級,下一級的 Observer 一樣在 onNext()
將事件發送到更下一級,一直到最終咱們 subscribe()
時傳入的那個Observer
實例完畢。
事件訂閱發送流程經過上面的文章基本已經可以摸清了,咱們接下來關注另外一個重點 線程調度
問題。
RxJava 中線程變換經過 subscribeOn()
和 observeOn()
兩個操做來進行。其中 subscribeOn()
改變的是訂閱線程的執行線程,即事件發生的線程。observeOn()
改變的是事件結果觀察者回調所在線程,即 onNext()
方法所在的線程。
subscribeOn()
指定的是網絡請求的線程,
observeOn()
指定的是網絡請求後事件流的執行線程。
前面說過,每次操做符的使用,RxJava 都會生成一個對應的新的 Observable
對象。observeOn()
與 subscribeOn()
也不例外。線程調度的核心邏輯都在 ObservableSubscribeOn
與 ObservableObserveOn
兩個類中
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
調用 subscribeOn()
時會產生一個新的ObservableSubscribeOn
並把當前這個Observable
和傳入的 Scheduler
做爲參數傳入。前面分析過當最終調用 subscribe()
時會引發整個觀察鏈的 Observable
自下而上調用 subscribe()
,而這個subscribe()
方法中實際爲調用抽象類 Observable
的各個實現子類的 subscribeActual()
方法 。
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
主要看這句 scheduler.scheduleDirect(new SubscribeTask(parent));
,SubscribeTask
前面內容已經分析過,就是調用上級 Observable
來訂閱生成的這個 SubscribeOnObserver
。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
scheduleDirect
方法,會使用傳入的 scheduler
在指定的線程建立一個 Worker
對象來執行SubscribeTask
,從而達到了切換訂閱線程的目的。因此多個subscribeOn()
疊加時,最終線程仍是會回到最後執行的(代碼第一次出現的)subscribeOn()
指定的線程。
調用 observeOn(Scheduler)
方法,會調用內部的同名方法生成一個新的 ObservableObserveOn
對象,並把當前這個Observable
和傳入的 Scheduler
做爲參數傳入。訂閱過程與ObservableSubscribeOn
不同,會直接在當前線程調用上級Observable
訂閱本身,,咱們主要看ObservableObserveOn
的ObserveOnObserver
是如何調度結果數據發送的線程的。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
複製代碼
從源碼中能夠發現,最終會使用 worker
去向下游發送事件。這個 worker
就是咱們observeOn()
方法中指定的線程建立的 worker。從而達到切換線程的目的,因爲事件又是自上而下的,因此每次切換都能在下游事件中感覺到線程的變化。
把subscribeOn()
和 observeOn()
放一塊兒來講不太容易說明白其中的線程變換,我先看看單獨使用其中的一個操做符的時候,致使的線程變化。
Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
.subscribeOn(Schedulers.io())
.doOnSubscribe {
Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
}
.map {
Log.d("Map 2 ", Thread.currentThread().name)
return@map it
}
.subscribeOn(Schedulers.newThread())
.doOnSubscribe {
Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
}
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
})
複製代碼
執行結果:
從日誌能夠看出:subscribeOn
訂閱線程將會發生改變,直到下次調用 subscribeOn
RxCachedThreadScheduler-1
Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
// }
// .subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 2 ", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
// }
// .subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
})
複製代碼
執行結果:
從日誌能夠看出:observeOn
觀察結果回調線程都將切換一次(main -->RxNewThreadScheduler-1 -->RxNewThreadScheduler-2)咱們把上述代碼中註釋部分都打開,獲得的日誌以下:
經過上面的三第二天志打印咱們能夠看出:訂閱鏈的日誌自下而上打印完畢後,再自上而下打印觀察結果。subscribeOn
會切換線程,並非像有的文章所說只有第一次指定線程(即自下而上的最後一次)有效。第一次有效只是咱們的錯覺,由於訂閱是自下而上的,無論前面的線程怎樣切換追蹤都會切換到 subscribeOn
第一次指定線程(即自下而上的最後一次)。咱們在回調結果中未進行線程切換操做時,只能感知到這一次線程切換 (Map1 與 doOnSubscribe 1 所在線程一致)。observeOn
的每次指定線程都會讓事件流切換到對應的線程中去。完整的事件訂閱和發送流程以下圖所示,從咱們調用 subscribe()
將觀察者和觀察對象關聯起來開始,subscribe()
中傳入的 Observer 的 onNext
或 onError
結束,造成了一個逆時針的 n
形的鏈條。右邊部分的觀察鏈中,每次 subscribeOn
都會切換觀察線程。左邊部分的事件發送鏈,會從觀察鏈的最後一次指定的線程開始發送事件,每次調用 observeOn
都會指定新的事件發送線程。
參照上面的源碼和日誌分析,再結合本圖相信你們會對 RxJava 的現場調度有一個更立體的認識