經過前一篇的RxJava2 是如何實現線程切換的 (上)咱們已經知道了在RxJava中,subscribeOn 將上游線程切換到指定的子線程是如何實現的。這裏就接着來看,observeOn 是如何將下游線程切換到指定線程的。java
這裏能夠經過UML圖簡單回顧一下subscribeOn的原理。git
經過 subscribeOn 咱們完成了如下操做:github
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼
將真正的 subscribe 操做安置在了SubscribeTask這樣個一個Runnable當中,這個 Runnable 將由scheduler 這個調度器負責啓動,所以就把上游操做放到了 scheduler 所在的線程中。app
簡單回顧完 subscribeOn 以後,咱們就來看看 observeOn 是如何工做的。ide
其實,瞭解 subscribeOn 的原理以後,再來看 observeOn 就簡單多了,類的命名及實現思路都有不少類似之處,能夠對照着理解。函數
RxJava的代碼寫的很是巧妙,能夠說是百讀不厭,能夠學習的地方特別多。爲了不陷入只見樹木不見森林的噩夢,咱們就帶着如下問題去探索 observeOn 的奧祕。oop
private void multiThread() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This msg from work thread :" + Thread.currentThread().getName());
sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: s= " + s);
}
});
}
複製代碼
咱們仍是以這段代碼爲例,來看看 observeOn 的工做原理。這裏經過observeOn(AndroidSchedulers.mainThread())將下游線程切換到了咱們很是熟悉的 Android UI 線程。這樣就能夠確保咱們在下游全部的操做都是在 UI 線程中完成。這裏和討論 subscribeOn 同樣,咱們就從這句代碼出發,看看這背後到底發生了什麼。post
有了上一篇的經驗,咱們知道 AndroidSchedulers.mainThread() 必定去建立了某種類型的調度器,爲了方便後面的敘述,這一次咱們先從調度器的建立提及,後面再看 observeOn() 的具體實現。學習
須要注意的是 AndroidSchedulers 並非 RxJava 的一部分,是爲了在 Android 中方便的使用 RxJava 而專門設計的一個調度器實現,源碼RxAndroid 設計很是巧妙;使用前記得在gradle文件中配置依賴。gradle
下面就來看看 AndroidSchedulers.mainThread() 這個咱們很是熟悉的 Scheduler 是如何建立的。
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
}
複製代碼
這裏咱們能夠認爲,當調用AndroidSchedulers.mainThread() 時,返回了一個HandlerScheduler 的實例,而這個實例使用到了咱們很是熟悉的 Handler。那麼重點就來到HandlerScheduler 了。
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
複製代碼
這個類雖然很簡單,可是設計很是巧妙。
首先 HandlerScheduler 是一個 Scheduler ,經過構造函數他獲取到了主線程所在的 Handler實例。而在他的 createWorker() 方法中,他又經過這個 Handler 實例建立了一個HandlerWorker 的實例,這個HandlerWorker 本質上就是一個 Worker。在他的 schedule 方法中,建立了一個 ScheduleRunnable 對象,並會把這個Runnable對象經過 handler 的 sendMessageDelayed 方法發送出去,而咱們知道這個 Handler 是主線程,這樣在下游中,就把任務從某個子線程轉移到了UI線程。
ScheduleRunnable 不但實現了 Runnable ,並且實現了咱們看到過無數次的 Disposable 。
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
複製代碼
這樣,正確狀況下 run 方法會正常執行線程中的任務,而一旦 disposable 對象執行了dispose()方法,那麼 handler.removeCallbacks(this),就可確保在 handler 的 dispatchMessage 方法中,不會在執行任何操做,從而達到了 dispose 的效果。
下面就來看看 Observable 中的 observeOn 方法
Observable.java --- observeOn
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
這個方法的實現和 subscribeOn 的實現很是類似,多了兩個參數 delayError 和 buffersize 。 buffersize 能夠認爲是RxJava內部的一個靜態變量,默認狀況下他的值是128。經過咱們以前的經驗,這裏能夠把 observeOn 的過程簡化以下:
new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
複製代碼
也就是說 observeOn 這個操做符給咱們返回了一個 ObservableObserveOn 對象。很容易想到他也是一個 Observeable。那麼咱們就去看看這個 ObservableObserveOn 究竟是什麼?咱們最關心的 subscribeActual 方法他又是怎樣實現的。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
複製代碼
和 ObservableSubscribeOn 同樣,他也繼承了 AbstractObservableWithUpstream ,這樣他也是一個擁有上游的 Observeable,他的構造函數很簡單,沒什麼能夠說。這裏咱們重點關注一下 subscribeActual 方法的實現。這裏咱們的使用的Scheduler 實例是 AndroidSchedulers.mainThread(),所以就按 else的邏輯分析。
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼
經過 scheduler.createWorker() 建立了 Worker 這個對象。這裏結合以前對 AndroidSchedulers.mainThread() 的分析,此處的 worker 對象是就是一個持有主線程 handler 引用的 Worker。
接着用這個worker又建立了一個ObserveOnObserver對象。看看這個類的實現。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ....}
複製代碼
這個類功能很是強大,首先是一個 Observer ,同時也是一個Runnable,而且還繼承了 BasicIntQueueDisposable(保證原子性、擁有操做隊列功能和 Disposable功能)。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼
咱們關注一下這行代碼,根據以前的說法這裏的 source 是其父類(AbstractObservableWithUpstream)中的成員變量,也就是說是上游,那麼當前ObservableObserveOn 的上游是誰呢? 就是咱們上一篇所說的 ObservableSubscribeOn 。
所以,當這裏開始執行訂閱方法 subscribe() 後,將以以下順序響應:
Observable.subscribe--->Observable.subscribeActual---> ObservableObserveOn.subscribeActual---> ObservableSubscribeOn.subscribeActual--->ObservableCreate.subscribeActual
這些方法的參數均爲 observer,經過層層回調,最後的 subscribeActual(Observer<? super T> observer) 執行時,這個 observer 持有以前幾個 observer 的引用。
咱們再看一下 ObservableCreate.subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
能夠看到,這裏首先會觸發 observer.onSubscribe ,咱們再看一下 ObservableSubscribeOn.subscribeActual
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
好了,這樣咱們又回到了原點:
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼
回到了最初的 Observer:ObserveOnObserver
這個 ObserveOnObserver 持有咱們一開始建立的observer,也就是一個Consumer對象。
下面就來看看這個 ObserveOnObserver
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
複製代碼
這裏指的注意的一點 ,actual 其實就是observer
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
// 現階段,咱們用到的Disposable 都是單個的,暫時不討論其
//爲QueueDisposable的狀況
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
複製代碼
在ObservableCreate.subscribeActual 中咱們知道,當執行subscribe 方法後,首先會執行 observer的 onSubscribe 方法。這裏的實現很是簡單,就是建立了一個queue,並觸發了這個 observer 本身的 onSubscribe 方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
複製代碼
在 onNext 中會執行 scheule() 方法。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
複製代碼
這個地方就有意思了,前面說過這裏的 worker 是一個持有主線程handler 的Worker對象,當他的 schedule 執行時,就會把特定的線程任務經過Handler.postDelay 方法轉移到主線中去執行 。
那麼這裏的this 又是什麼呢?前面咱們說過,ObserveOnObserver 這個類功能很是強大,他是一個Runnable,那麼這裏就是執行他本身的run方法嘍,咱們趕忙看看。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
複製代碼
這裏有一個參數 outputFused 他默認是false,至於他何時爲true,不做爲這裏討論的重點。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
複製代碼
這裏大概就是經過一個死循環,不斷從 onSubscribe 方法中建立的隊列中取出事件,執行observer 的 onNext方法。而當爲例爲空時,就會執行worker.dispose 取消整個事件流,同時從Handler中移除全部消息。
最後在看一眼 onComplete ,onError 和整個相似
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
複製代碼
能夠看到這裏的處理也很簡單,done 設置爲 true .這樣最後便完成了下游事件的執行。
好了,因爲一些無以訴說的緣由,經歷了好久終於把 RxJava 線程切換的下篇給完成了。