前言:通過前面兩篇文章對RxJava2源碼的分析,咱們已經對RxJava2的基本流程及操做符的原理有了必定程度的認識。這篇文章將在前面兩篇文章的基礎上,對RxJava2的線程調度進行分析,建議先閱讀前面兩篇的文章,再閱讀本文。html
注:文章內容過多,建議在空閒時閱讀。java
爲了更好的理解RxJava2的線程調度原理,不被其餘的代碼所幹擾,這裏就只貼出與線程調度有關的代碼,以下oop
private void threadScheduleCode() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e("wizardev", "上游所在的線程: "+Thread.currentThread().getName());
Thread.sleep(2*1000);
emitter.onNext("wizardev");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("wizardev", "onSubscribe: "+Thread.currentThread().getName() );
}
@Override
public void onNext(String s) {
Log.e("wizardev", "接收到上游發射的數據爲: " + s);
Log.e("wizardev", "下游所在的線程: "+ Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
複製代碼
能夠看下執行這段代碼後打印的日誌,以下源碼分析
能夠發現上游和下游確實不在同一個線程中,那麼RxJava2是怎麼進行線程切換的呢?想知道答案,請繼續閱讀本文。post
本文要解決的問題其實就一個,就是RxJava2是如何進行線程調度的?可是,圍繞着這個問題又會有兩個小的問題須要解決:ui
爲了可以更容易理解線程調度的原理,這裏對源碼分析的順序將會按照代碼的執行順序進行分析。this
由於前面的文章已經分析過了create
方法,因此就直接分析subscribeOn
這個方法,直接上源碼,以下spa
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
有了前面分析源碼的經驗,能夠知道,subscribeOn
方法其實就是返回了ObservableSubscribeOn類的實例並將上游的ObservableCreate和subscribeOn
方法的參數注入到了它的構造方法中。 繼續看下ObservableSubscribeOn類的源碼,以下
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//...
//省略部分源碼
}
複製代碼
從源碼中能夠看到,這裏分別將ObservableCreate類的實例以及subscribeOn
方法的參數即Schedulers.io()
做爲了ObservableSubscribeOn類的成員變量。 好了,上面的這些就是執行subscribeOn(Schedulers.io())
這句代碼所作的事情了,下面來看下observeOn(AndroidSchedulers.mainThread())
這句代碼所作的事情。
直接看源碼,以下
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
從上面的代碼能夠看出,observeOn
方法最終調用的是含有三個參數的observeOn
方法,而這個方法的做用是返回了ObservableObserveOn類的實例並將observeOn
方法的參數scheduler注入其中。 如今來看實例化ObservableObserveOn類的時候都作了什麼,ObservableObserveOn類的代碼以下
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//...
//省略部分代碼
}
複製代碼
能夠看到,實例化ObservableObserveOn類的時候,將ObservableSubscribeOn的實例及AndroidSchedulers.mainThread()
還有其餘的兩個默認參數都做爲了它的成員變量保存。
由前面的兩篇文章可知,下游的subscribe
方法最終會調用上游的subscribeActual
方法,因此會調用這裏的subscribeActual
方法,代碼以下
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
如今來一句句的分析上面的代碼,首先if條件確定是不成立的,由於這裏scheduler
實際上是HandlerScheduler
,爲何是HandlerScheduler呢?咱們來一點點的分析,由observeOn(AndroidSchedulers.mainThread())
這句代碼能夠知道,observeOn方法的參數是AndroidSchedulers.mainThread()
,那這個AndroidSchedulers.mainThread()
又是什麼呢?看代碼
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
//上面的方法,返回的就是MAIN_THREAD,而MAIN_THREAD最終返回的是
//MainHolder.DEFAULT
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
//上面的MainHolder.DEFAULT就是實例化了HandlerScheduler
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
複製代碼
上面貼出的代碼都是與AndroidSchedulers.mainThread()
有關的代碼,從上面的代碼中能夠得出結論,AndroidSchedulers.mainThread()
最終是實例化了HandlerScheduler
,因此,subscribeActual方法中的scheduler是HandlerScheduler,因此,if語句的條件不成立,這裏會執行subscribeActual方法中的else語句,即執行下面的代碼
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼
一樣,按照代碼的執行順序來分析,看第一句代碼,Scheduler.Worker w = scheduler.createWorker();
從前文中的分析能夠知道,這裏的scheduler
是HandlerScheduler,因此,這裏是調用HandlerScheduler類中的createWorker
方法,HandlerScheduler類中的createWorker
方法的代碼以下
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
複製代碼
從上面的代碼能夠得出,HandlerScheduler類中的createWorker
方法返回了HandlerWorker
類的實例,這裏傳入HandlerWorker
構造方法中的兩個參數是在上面已經分析過的方法中進行初始化的,以下
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
複製代碼
從這句代碼中能夠得知,HandlerWorker構造方法中的handler
是實例化在主線程中的Handler,async
的值是false。 好了,到這裏咱們知道了Scheduler.Worker w = scheduler.createWorker();
這句代碼的做用是實例化了HandlerWorker,而實例化HandlerWorker的同時,在其構造方法中初始化了兩個成員變量。
下面繼續看這句代碼source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
,先看這句代碼中的這段new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
代碼作了什麼,代碼以下
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
複製代碼
根據上文的分析能夠得出這裏的幾個參數分別表明什麼
this.downstream就是這段代碼
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("wizardev", "onSubscribe: "+Thread.currentThread().getName() );
}
@Override
public void onNext(String s) {
Log.e("wizardev", "接收到上游發射的數據爲: " + s);
Log.e("wizardev", "下游所在的線程: "+ Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
複製代碼
this.worker就是new HandlerWorker(handler, async);
this.delayError的值是false
this.bufferSize就是一個int型的數字
好了,如今繼續來看source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
這裏的source
就是上游的Observable,這裏就是ObservableSubscribeOn類的實例,因此這句代碼實際就是調用了ObservableSubscribeOn類中的subscribe
方法,而ObservableSubscribeOn沒有這個方法,因此是調用其父類的subscribr
方法,由以前的文章可知,最終調用的就是ObservableSubscribeOn類中的subscribeActual
方法。因此,如今須要把思路切換到ObservableSubscribeOn類中的subscribeActual
方法了
仍是看代碼,subscribeActual方法的以下
public void subscribeActual(final Observer<? super T> observer) {
//1
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//2
observer.onSubscribe(parent);
//3
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
根據前面的分析可知,這個方法中的參數就是new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
這段代碼。照舊,按照代碼的執行順序分析,代碼中已經標註了1,2,3的執行步驟,
如今來分析「1」處代碼,看下SubscribeOnObserver類,代碼以下
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
//這裏的downstream就是new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
複製代碼
重要部分已在代碼中註釋。
接着分析「2」處的代碼,這裏的observe就是ObserveOnObserver的實例,調用的就是ObserveOnObserver類中的onSubscribe
方法,onSubscribe
方法的代碼以下
public void onSubscribe(Disposable d) {
//這裏會直接進入if方法中
if (DisposableHelper.validate(this.upstream, d)) {
//這句代碼的做用就是將new SubscribeOnObserver<T>(observer);賦值給了this.upstream
this.upstream = d;
//d的值是SubscribeOnObserver的實例,這裏if條件不成立
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
//實例化大小爲bufferSize的隊列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//上文已經分析了downstream的值,最下游的onSubscribe與線程調度
//無關,在那個線程調用的subscribe就在哪一個線程回調
downstream.onSubscribe(this);
}
}
複製代碼
主要的代碼已在文中註釋,下面來分析「3」處的代碼
如今一步步的分析「3」處的代碼,new SubscribeTask(parent)
代碼以下
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
能夠看出SubscribeTask直接實現了的Runnable,並將new SubscribeOnObserver<T>(observer);
做爲成員變量。
繼續看scheduler.scheduleDirect(…)
這裏的scheduler是這句代碼Schedulers.io()
,Schedulers.io()代碼以下
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
//IO是實例化的IOTask
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
//最終會調用這個
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
複製代碼
上面的代碼能夠看出,Schedulers.io()
最終返回的是IoScheduler,因此scheduler.scheduleDirect(…)
這句代碼中的scheduler就是IoScheduler,而scheduleDirect
方法是IOTask父類中的方法,代碼以下
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
//最終調用的是這個方法
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//這裏的createWorker調用的是IoScheduler中的方法
final Worker w = createWorker();
//仍然是Runnable
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//實例化DisposeTask並將decoratedRun及w注入
DisposeTask task = new DisposeTask(decoratedRun, w);
//調用的w的schedule方法,將上面的三個值做爲參數
w.schedule(task, delay, unit);
return task;
}
複製代碼
這裏,詳細看下final Worker w = createWorker();
這句代碼,createWorker()
方法的代碼以下
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
複製代碼
能夠看到這句代碼的做用是實例化了EventLoopWorker並返回。 接着看w.schedule(task, delay, unit);
這句代碼,w爲EventLoopWorker,因此這裏是EventLoopWorker類中的schedule
方法,代碼以下
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//最終調用的是NewThreadWorker類中的方法
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
//最終會調用這個方法
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//實例化了ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
//不會進入這個方法
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
//重點,這裏是把ScheduledRunnable放進了線程池中,關於java線程
//池你們能夠自行研究
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
複製代碼
上面中的代碼已經有了一些註釋,最重要的就是這句代碼
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
複製代碼
這段代碼的做用就是將任務放進了線程池中等待執行。 一樣,這段代碼就是
subscribeOn是怎樣將要處理的數據放到到工做線程的?
這個問題的答案。
經過分析「3」處的代碼,能夠發現其實就是將SubscribeTask實例進行了一層層的包裝,而後丟到線程隊列中等待執行,爲了便於理解,我畫了一下包裝層級圖,以下
根據上面的層級關係,會發現最後會調到SubscribeTask的run
方法,這個方法又調用了上游的subscribe
方法,而這個上游就是ObservableCreate
類,因爲前面兩篇已經分析過了這個類,這裏就再也不分析。根據前面兩篇的分析,知道最後會調用發射數據的方法,而這時這個發射數據其實就已是在工做線程中了。
根據前面兩篇的分析,能夠知道emitter.onNext("wizardev");
這句代碼就是調用下游的onNext
方法,這裏就會調用SubscribeOnObserver類的onNext
方法,SubscribeOnObserver類的onNext
方法的源碼以下
public void onNext(T t) {
downstream.onNext(t);
}
複製代碼
這裏直接調用了下游的onNext
方法,這個下游就是ObserveOnObserver類即這裏會調用ObserveOnObserver類中的onNext
方法,ObserveOnObserver類中的onNext
方法代碼以下
public void onNext(T t) {
//這裏的done爲初始值false
if (done) {
return;
}
//sourceMode爲初始值0
if (sourceMode != QueueDisposable.ASYNC) {
//將上游發射的數據放入隊列中,這個queue就是在onSubscribe方法中實例化的
queue.offer(t);
}
//調用方法
schedule();
}
複製代碼
繼續看schedule
方法的源碼。以下
void schedule() {
if (getAndIncrement() == 0) {
//根據上文的分析能夠知道這個worker就是HandlerScheduler的內部類
//HandlerWorker的實例
worker.schedule(this);
}
}
複製代碼
這裏講解一下這句worker.schedule(this);
代碼,這裏的worker
就是HandlerScheduler內部類HandlerWorker的實例,因此這裏調用了HandlerScheduler內部類HandlerWorker的schedule
方法並將ObserveOnObserver實例做爲參數傳入。 如今,來看HandlerWorker類中的schedule
方法,代碼以下
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
複製代碼
這個方法中重要的就是下面這段代碼
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
//將message放入messageQueue中等待輪詢,這裏的handler在主線程中,
//因此這裏執行scheduled的run方法,其實已經切換到主線程中了
handler.sendMessageDelayed(message, unit.toMillis(delay));
複製代碼
瞭解Handler原理的同窗就會知道上面的這段代碼最終會調用scheduled的run
方法。不瞭解Handler原理的同窗,能夠看下個人這篇文章。這裏的scheduled的run
方法會調用run = RxJavaPlugins.onSchedule(run);
這句代碼的run
方法,即調用的是ObserveOnObserver類中的run
方法,看下ObserveOnObserver類中的run
方法的代碼,以下
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
複製代碼
上面代碼中的outputFused
的初始值爲false,因此會執行else語句中的代碼,看下drainNormal
的代碼,以下
void drainNormal() {
int missed = 1;
//這個queue是在onSubscribe初始化的,在onNext中將上游的數據添加進去的
final SimpleQueue<T> q = queue;
//將下游的observe賦值給a
final Observer<? super T> a = downstream;
//
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//開始輪詢
for (;;) {
boolean d = done;
T v;
try {
//輪詢取出q中的值,這裏的值就是在上游發射的
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//取出的值,傳遞給下游的onNext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
複製代碼
重要的代碼,已經在代碼中進行了註釋,這裏就再也不講解,上面代碼的做用就是,不斷取出上游發射的數據,而後調用下游的onNext
方法並將取出的值傳遞進去。
分析到這裏,算是將RxJava2線程調度的源碼理清楚了。能夠發如今進行線程調度的時候大量的使用Runnable,一層層的包裝,而後在一層層的來調用。首先,將線程切換到工做線程中的方法是將調用上游subscribe
方法放在了Runnable類中的run
方法中,而後將這個Runnable層層包裝後放進線程隊列中等待執行,最後在工做線程中處理髮射的數據。
將線程切換到主線程中的方法是利用Handler,將處理好的數據放進一個隊列中,放進隊列中的這個動做仍是在工做線程中完成的,而後,利用Handler將線程切換到主線程,最後不斷的取出隊列中的數據,不斷調用下游的onNext
方法。經過這種方式來完成線程的調度。
經過上面的分析,能夠發現RxJava2線程調度仍是挺複雜的,牽涉到的知識點也是比較多的,爲了更簡單,更有條理的講解RxJava2線程調度的原理,同時爲了讓你們不至於在源碼中迷失,因此這裏分析源碼按照代碼的執行順序一步步的進行的。由於,代碼執行的時候會在不一樣的類之間來回切換,因此,你們會發現分析的時候在各個類中跳來跳去。
因爲篇幅的緣由,文章的一些知識沒有詳細的來說解,如Java的線程池,Handler原理等,你們能夠本身查閱相關資料或者留言一塊兒討論,若是發現文中有不對的地方,也歡迎指正。