先貼上系列的連接:
rxjava2源碼解析(一)基本流程分析
rxjava2源碼解析(二)線程切換分析
rxjava2源碼解析(三)線程池原理分析java
上一篇介紹了rxjava2源碼解析(一)基本流程分析,這篇準備說說線程切換。bash
仍是先從最基本的使用開始看:異步
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onNext("4");
}
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "s = " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
相較於前面的實例,多了兩段代碼,subscribeOn
,observeOn
。咱們知道,subscribeOn
是用來調整被觀察者(發射源)的線程,而observeOn
是調整觀察者(處理器)的線程。ide
咱們先從observeOn
的源碼來看它是如何控制處理器的線程:post
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//判空代碼,和hock相關機制,咱們能夠忽略,直接看ObservableObserveOn
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
首先咱們知道,observeOn
、subscribeOn
都是observable
這個裝飾器的方法,他們的返回值也都是observable
(前面講過這是裝飾器模式)。這裏仍是老樣子,直接看ObservableObserveOn
這個對象就好了。ui
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
//將上游source存在本地
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//若是是當前線程,就不作處理
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
重點仍是subscribeActual
這個方法。咱們看到,初始化ObservableObserveOn
的時候傳入了咱們設置的scheduler
。因此在subscribeActual
裏,先判斷scheduler
是不是TrampolineScheduler
。TrampolineScheduler
是什麼東西呢?咱們看官方註釋:this
/**
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
* after the current unit of work is completed.
*計劃在當前線程上工做,但不會當即執行。 將工做放入隊列並在當前工做單元完成後執行。
*/
複製代碼
OK,一目瞭然,若是是當前線程,不作任何處理,直接用綁定起來。不然,新建一個ObserveOnObserver
對象,將上游裝飾器(這裏的上游是代碼流程上的上游,即調用observeOn
的裝飾器)先於這個對象綁定。這個看起來是否是很眼熟?咱們回看上一篇裏面說的ObservableCreate
裏的subscribeActual
方法,對比一下有什麼不一樣。spa
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
能夠看到,二者都有在內部新建一個對象,並將上游裝飾器(或者是發射源)與之綁定。區別在於,ObservableCreate
內部是新建一個發射器CreateEmitter
對象,而ObservableObserveOn
內部是新建一個處理器ObserveOnObserver
對象。ObservableCreate
是將以前存儲的上游發射源與發射器綁定,ObservableObserveOn
是將上游裝飾器與處理器綁定。
根據這一點,咱們能夠將裝飾器分爲兩種樣式:線程
ObservableCreate
相似,屬於每一條流水線的開端,自己是裝飾器,上游是發射源,內部生成一個發射器,處理最開始的發射流程。咱們稱之爲起始裝飾器ObservableObserveOn
相似,屬於流水線中間流程,自己是裝飾器,上游是裝飾器,內部新建一個處理器來處理上游事件,下游是處理器或者其餘裝飾器。咱們稱之爲流程裝飾器。下面咱們能夠看看ObserveOnObserver
的源碼。code
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
//同步異步相關,暫時無論
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//重點是這個
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver繼承了runnable接口,意味着能夠當作是線程任務來執行。這裏表明着在新線程中執行run方法。
worker.schedule(this);
}
}
//ObserveOnObserver繼承了runnable接口
@Override
public void run() {
//同步異步相關,咱們直接看drainNormal()
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
····//省略一些判斷的代碼
v = q.poll();
//這裏就能夠看到,將下游的onNext方法,切換到新線程執行。
a.onNext(v);
}
···
}
}
}
複製代碼
這裏咱們就不深刻細究了(會在下一篇中詳細來講),只須要知道,這是上游的處理器執行onNext
,傳到這裏,使用以前設置的線程執行下游的onNext
方法。因此這裏就完成了線程切換功能。而且這個切換,延續到下游全部處理的onNext
。若是在下游再次調用ObserverOn
,就會將後面的處理器切換到另一個線程。
因此,咱們能夠獲得結論,ObserverOn
能夠屢次調用,每次調用會做用於下游的全部處理器,直到遇到新的ObserverOn
。
接下來能夠看看,SubscribeOn
的源碼內容。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
//判空和hock機制代碼,忽略。直接看ObservableSubscribeOn
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
同樣的配方,同樣的味道,咱們能夠直接看ObservableSubscribeOn
類的subscribeActual
方法。
@Override
public void subscribeActual(final Observer<? super T> observer) {
//建立SubscribeOnObserver內部類對象
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//這裏調用了下游的onSubscribe
observer.onSubscribe(parent);
//scheduler.scheduleDirect方法返回一個disposable
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
這裏能夠看到,ObservableSubscribeOn
一樣是一個流程裝飾器,在調用subscribe
的時候,內部新建一個處理器,這個處理器與下游處理器相互持有。這裏與ObservableObserveOn
有所不一樣,並無當即執行subscribe
將上游裝飾器與內部處理器鏈接起來,而是執行了:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼
咱們看看SubscribeTask
是啥:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//這裏的source是上游裝飾器,parent是內部處理器
source.subscribe(parent);
}
}
複製代碼
很簡單,SubscribeTask
繼承了Runnable
,其中run
方法是執行上游裝飾器的subscribe
方法。咱們再看scheduleDirect
方法。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
這裏createWorker
方法,是每一個不一樣的scheduler
自行重寫的。咱們不深究這塊的代碼,只須要理解爲,在subscribeActual
中,根據前面設置的scheduler
,新建一個線程,在線程中當即執行上游source
的subscribe
方法,與內部處理器SubscribeOnObserver
綁定。SubscribeOnObserver
處理器與通常處理器沒什麼區別,就不貼代碼了。
那麼上面就實現了,在另一個線程執行subscribe
方法。看過前面基本流程的都知道,這裏基本上就肯定了,上游全部subscribe
方法執行的線程。因此咱們知道了,subscribeOn()
方法會切換上游全部的subscribe
方法,至於發射源所在的線程,只跟離它最近的subscribeOn()
方法中所切換的線程有關。因此說,subscribeOn()
方法只須要執行一次,且只有第一次是生效的。
咱們看到,終端Observer
這裏有一個onSubscribe
方法,咱們通常在這裏進行一些初始化的操做,而前面的源碼中也有不少地方有onSubscribe
方法。那這個方法究竟是執行在哪一個線程呢?咱們從源碼中尋找答案。
想要知道onSubscribe
方法在哪一個線程,只須要看,在離終端處理器最近的上游裝飾器中,是在哪一個線程調用onSubscribe()
的。咱們來看看subscribeOn
和observeOn
有沒有更改它所運行的線程。 先看ObserveOn
,畢竟這是做用於下游處理器的線程切換方法。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
從上面的源碼能夠看到,ObservableObserveOn
類中的subscribeActual
並無onSubscribe
相關的內容。咱們看看ObserveOnObserver
的源碼:
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
·····//不相干代碼
//這裏的downstream是指下游處理器
downstream.onSubscribe(this);
}
}
複製代碼
咱們看到,在ObserveOnObserver
被調用onSubscribe
的時候,會調用下游的onSubscribe
,參數是自己。也就是說,ObserveOn
並無切換onSubscribe
方法的線程。
再看subscribeOn
方法,很明顯是直接在subscribeActual
中執行下游處理器的onSubscribe
方法。
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
這兩個線程切換的方法,都沒有更改onSubscribe()
方法的線程。因此咱們能肯定,在終端的處理器Observer
裏面的onSubscribe()
方法,是跟外部在同一個線程上。
observeOn
做用於下游的全部處理器,能夠屢次調用。每個處理器所運行的線程,決定於它最近的上游observeOn
方法中指定的線程。subscribeOn
做用於上游的發射源,主要是用來指定subscribe
方法所在的線程。針對於發射源,只有離它最近的下游subscribeOn
方法中所指定的線程才生效。因此subscribeOn
方法屢次調用並無效果。onSubscribe()
方法並不會跟隨內部線程切換而切換線程。運行在哪一個線程,只跟外部建立這一整套觀察者模式的線程一致。OK,上面就是本篇的所有內容了,下一篇咱們深刻看一看,rxjava2 裏面的線程池和線程切換究竟是怎麼實現的!
每週更新,敬請期待~