來看一下基本代碼:java
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -> System.out.println("onNext : i= " + i));
複製代碼
很簡單,即訂閱時將task交給子線程去作,而數據的回調則在Android主線程中執行。ide
點擊查看源碼:學習
public final Observable<T> subscribeOn(Scheduler scheduler) {
//非空判斷和hook
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
實際上這個方法返回了一個ObservableSubscribeOn對象。咱們有理由猜想這個ObservableSubscribeOn應該和上文的ObservableMap及ObservableDoOnEach類似,都是Observable的一個包裝類(裝飾器):ui
//1.ObservableSubscribeOn也是Observable的一個裝飾器
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//2.存儲上游的ObservableSource和調度器
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//3.new 一個SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//4.回調方法,這說明下游的onSubscribe回調方法所在線程和線程調度無關
// 是訂閱時所在的線程
s.onSubscribe(parent);
//5.當即執行線程調度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
複製代碼
前兩步咱們不須要 再多解釋,直接看第三點,咱們看看SubscribeOnObserver這個類:this
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
//下游的Observer
final Observer<? super T> actual;
//保存上游的Disposable,自身dispose時,連同上游一塊兒dispose
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
複製代碼
相似Observable和ObservableMap,SubscribeOnObserver一樣是Disposable和Observer的一個裝飾器,提供了對下游數據的傳遞,以及將task dispose的接口。spa
第4步咱們以前就講過了,直接看第5步:線程
//5.當即執行線程調度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼
咱們看看SubscribeTask這個類:code
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
難以置信的簡單,SubscribeTask 僅僅是一個Runnable 接口的實現類而已,經過將SubscribeOnObserver做爲參數存起來,在run()方法中添加了上游Observable的被訂閱事件,就沒有了別的操做,cdn
接下來咱們看一下scheduler.scheduleDirect(SubscribeTask)中的代碼:server
public abstract class Scheduler {
//...
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// Worker 自己就是Disposable 的實現類
// 請注意, createWorker()所建立的worker,
// 實際就是Schdulers.io()所提供的IoScheduler所建立的worker
final Worker w = createWorker();
//hook相關
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//即 worker.schedule(task, 0, TimeUnit.NANOSECONDS): 當即執行task
w.schedule(task, delay, unit);
return task;
}
//...
}
複製代碼
咱們不要追究過深,咱們看一下這個createWorker方法的註釋說明:
/** * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions. * 檢索或建立一個新的{@link Scheduler.Worker}表示一系列的action * * When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}. * 當work完成後,應使用{@link Scheduler.Worker#dispose()}取消訂閱。 * * Work on a {@link Scheduler.Worker} is guaranteed to be sequential. * {@link Scheduler.Worker} 上面的work保證是順序執行的 */
複製代碼
如今咱們知道了:咱們經過調用subscribeOn()傳入Scheduler,當下遊ObservableSource被訂閱時(請注意,訂閱順序是由下到上的),距離最近的線程調度subscribeOn()方法中,保存的Scheduler會建立一個worker(對應相應的線程,本文中爲IoScheduler),在其對應的線程中,當即執行task
如今考慮一個問題,假如在咱們的代碼中,屢次使用了subscribeOn()代碼,到線程會怎麼處理呢?
上文已經講到了,無論咱們怎麼經過subscribeOn()方法切換線程,因爲訂閱執行順序是由下到上,所以當最上游的ObservableSource被訂閱時,所在線程固然是距離上游最近的subscribeOn()所提供的線程,即最終Observable老是在第一個subscribeOn()所在的線程中執行。
先看observeOn()內部,果真是hook+Observable的包裝類:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//實例化ObservableObserveOn對象並返回
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
再看ObservableObserveOn:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
//1.相關依賴注入
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//2.建立主線程的worker
Scheduler.Worker w = scheduler.createWorker();
//3.上游數據源被訂閱
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
複製代碼
和subscribeOn()不一樣的是,咱們並非當即在對應的線程執行task,而是將對應的線程(其實是worker)做爲參數,實例化ObserveOnObserver並存儲起來。
當上遊的數據傳遞過來時,ObserveOnObserver執行對應的方法,好比onNext(T),再切換到對應線程中,並交由下游的Observer去接收:
ObserveOnObserver中代碼極多,咱們簡單瞭解原理後,以onNext(T)爲例:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
//...省略其餘代碼
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//隊列
SimpleQueue<T> queue;
@Override
public void onNext(T t) {
if (done) {
return;
}
//將數據存入隊列
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//對應線程取出數據並交由下游的Observer
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//...省略其餘代碼
}
複製代碼
由上文得知,與subscribeOn()相反,observerOn()操做會將切換到對應的線程,而後交由下游的Observer處理,所以observerOn()僅對下游的Observer生效,而且,若是屢次調用,observerOn()的線程調度會持續到下一個observerOn()操做以前。
訂閱順序當從下到上,上游的ObservableSource被訂閱時,先切換線程,而後當即執行task;
當存在多個subscribeOn()方法時,僅第一個subscribeOn()有效。
訂閱順序當從下到上,上游的ObservableSource被訂閱時,會將對應的worker建立並做爲構造參數存儲在Observer的裝飾器中,並不會當即切換線程;
當數據由上游發送過來時,先將數據存儲到隊列中,而後切換線程,而後在新的線程中將數據發送給下游的Observer;
當存在多個observerOn()方法時,僅對距下游下一個observerOn()以前的observer有效
有興趣能夠關注個人小專欄,學習更多知識:小專欄