RxJava是一個異步框架,使用RxJava可使代碼邏輯更加清晰,避免過多的嵌套調用致使代碼可讀性變差。在實際項目中,因爲主線程的繪製機制,咱們常常會使用到線程的切換,將耗時的操做放在工做線程,將結果使用Handler返回給主線程處理。而使用RxJava咱們能夠很方便的實現這一切操做。java
首先咱們先看一個例子:app
Observable.create<Int> {
it.onNext(0)
Log.d(TAG, "Observable: current Thread = ${Thread.currentThread().name}")
}
.subscribeOn(Schedulers.single())
.map {
Log.i(TAG, "${it.inc()} source")
Log.d(TAG, "map: current Thread = ${Thread.currentThread().name}")
it.inc()
}
.subscribeOn(Schedulers.newThread())
.map {
Log.i(TAG, "${it.inc()} source")
Log.d(TAG, "map: current Thread = ${Thread.currentThread().name}")
it.inc()
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.i(TAG, "${it.inc()} source")
Log.d(TAG, "Observer: current Thread = ${Thread.currentThread().name}")
}
複製代碼
首先大概說一下上述例子,能夠看到首開始是建立了一個Observable而後發射了一個int類型的數據源,而後分別進行了兩次map操做,最後交由Observer處理能夠看到分別進行了三次線程切換操做,下面是上述例子的輸出:框架
TAG: 1 source
TAG: map: current Thread = RxSingleScheduler-1
TAG: 2 source
TAG: map: current Thread = RxSingleScheduler-1
TAG: Observable: current Thread = RxSingleScheduler-1
TAG: 3 source
TAG: Observer: current Thread = main
複製代碼
能夠看到,數據發射源,以及兩個map都與運行在single()
線程,可是代碼中確實也是切換了一次newThread(),可是沒有生效,最後的Observer運行在main線程,說明observeOn生效了,這是爲何呢?下面咱們帶着這個問題去看看源碼:異步
在開始源碼閱讀前,咱們首先先說明一個概念,就是RxJava的事件流的訂閱順序其實是自下而上的。按照上述例子來講,首先訂閱的是observeOn -> map -> suscribeOn -> map -> subscribeOn -> ObservableCreate,爲何這麼說呢?咱們知道,真正的訂閱是經過subscribe方法,那麼先看一下subscribe的源碼:ide
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
//調用下面的重載方法
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
...
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
//根據傳入的參數生成一個LambdaObserver
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
//真正的訂閱入口
subscribe(ls);
return ls;
}
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//執行訂閱步驟
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
複製代碼
上面的代碼就是subscribe的調用步驟,咱們能夠看出最後一步調用subscriActual(observer)
是真正執行訂閱的方法:函數
protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼
能夠看到subscribeActual是一個抽象方法,它的實如今哪呢?在上述例子中,最後一步調用的是observeOn(AndroidSchedulers.mainThread()).subscribe()
,也就是說明,observeOn()的返回結果,實現了第一個subscribeActual,那麼來看一下observeOn的實現:oop
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//調用下面的重載方法
return observeOn(scheduler, false, bufferSize());
}
...
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
能夠看到observeOn的最後一行,調用了RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
:ui
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
複製代碼
從RxJavaPlugin.onAssembly()
是將傳入的Observable經過一個轉換函數轉換爲另外一個Observable, 但咱們這裏沒有定義這個轉換函數,所以,最終獲得的仍是開始傳入的參數ObservableObserveOn。因此observeOn返回的是一個ObservableObserveOn對象,也就是說subscribe首先執行了ObservableObserveOn
的subscribeActual
方法,那麼咱們首先看一下它的源碼:this
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
//將傳入的Observable保存起來,也就是事件流中的Observable。
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//建立指定的線程
Scheduler.Worker w = scheduler.createWorker();
//調用上一步生成的Observable的subscribe方法
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
上面就是ObservableObserveOn的源碼,咱們能夠看到這個類首先作的是將source保存起來,這個source就是事件流中上一步生成的Observable,在subscribeActual中,首先是生成了一個線程,這個就是咱們想說的切換線程的關鍵步驟,而後調用source.subscribe去訂閱事件流中上一步生成的Observable。這裏就解釋了爲何RxJava的事件流訂閱順序是自下而上的。spa
咱們能夠看到在source.subscribe
中傳入了一個ObserverOnObserver,將所生成的線程做爲參數傳了進去。這個Observer就是Observable的觀察者,咱們看一下它的onNext方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
...
void schedule() {
if (getAndIncrement() == 0) {
//在工做線程中執行該Observer的方法
worker.schedule(this);
}
}
複製代碼
從上面的源碼來看,onNext經過調用worker.schedule()運行在worker的線程中,onError()、onComplete()都是同樣。這樣就實現了線程的切換,那麼這個線程又是如何生成的呢?咱們來看看scheduler.createWorker()
方法:
@NonNull
public abstract Worker createWorker();
複製代碼
createWorker也是一個抽象方法,它的實現就是咱們傳入的AndroidSchedulers.mainThread()
:
private static final class MainHolder {
//生成主線程
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
//生成一個HandlerScheduler,並將主線程的Looper做爲參數傳入
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
//對MAIN_THREAD進行檢查
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
複製代碼
上述代碼註釋已經說明了生成主線程的步驟,值得注意的是,這裏用到的是一個內部類的方式生成的其實是一個單例的HandlerScheduler對象。當咱們知道了在上面的提到的ObservableObserveOn中的worker是HandlerScheduler對象以後,咱們來分別看看createWorker()和worker.schedule(this)方法在HandlerScheduler中的實現便可:
createWorker():
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
複製代碼
因爲createWorker返回了一個HandlerWorker對象,所以schedule的實現也就是在HandlerWorker中:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//因爲該Handler的Looper是主線程Looper,因此該Handler運行在主線程中
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
複製代碼
上述代碼就是將咱們傳入的run方法的實現(ObservableObserveOn)傳遞給了ScheduleRunnable,而ScheduleRunnable也實現了Runnable接口:
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
複製代碼
到這裏能夠看出來,真正執行的是ObservableObserveOn.ObserveOnObserver
中的run方法:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
...
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
//這裏是從事件隊列中取出訂閱事件
...
//調用Observer的onNext方法,也就是咱們實現的onNext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
複製代碼
到此就完成了一個訂閱的流程,那麼咱們能夠總結一下observeOn的工做順序:
針對subscribeOn來講,流程和observeOn基本差很少,只是區別在於subscribeOn改變的是數據源的運行線程,而Observer是切換Observer所在的線程,這一點咱們能夠在subscribeOn的源碼中看出來:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
這裏調用了一個scheduleDirect方法而且建立了一個SubscribeTask(parent)
:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
scheduleDirect的工做就是建立了指定的線程,而且調用了schedule方法。而後再看看SubscribeTask:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
從這裏看出source.subscribe(parent)
經過run()運行在指定線程中,從而實現了上游數據源的線程切換,而因爲RxJava是由下而上訂閱的順序,所以subscribeOn只有第一個指定的切換有效,屢次設置是無效的。而observeOn則是每次切換都有效。