RxJava的基礎使用和基礎流程能夠看看上一篇的文章《Android的三方庫 - RxJava:RxJava的使用和基本訂閱流程》緩存
實際項目中常常會有一些數據獲取操做,這就須要使用到RxJava的線程了。 因此讓咱們來看看RxJava的線程切換。bash
首先看一個小的案例:ide
Log.i(TAG, "當前線程 : " + Thread.currentThread().getName());
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe: currentThread : " + Thread.currentThread().getName());
emitter.onNext("ONE");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: currentThread : " + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: currentThread : " + Thread.currentThread().getName());
Log.i(TAG, "onNext: s : " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: currentThread : " + Thread.currentThread().getName());
}
});
複製代碼
輸出結果:oop
當前線程 : main
onSubscribe: currentThread : main
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main
複製代碼
而後咱們在線程中來執行這段代碼:源碼分析
new Thread(new Runnable() {
@Override
public void run() {
RxObservable3(); // 這個就是上面那段代碼
}
}).start();
複製代碼
輸出結果:post
當前線程 : Thread-5
onSubscribe: currentThread : Thread-5
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main
複製代碼
咱們能夠很明顯得出幾個結論:ui
1. onSubscribe運行的線程和代碼運行所在的線程是一致的。
2. Observable 運行的線程是subscribeOn指定的線程。
3. Observer 運行的線程是 observeOn 指定的線程。
複製代碼
如今咱們對源碼來進行分析。this
在咱們的案例中的調用:spa
subscribeOn(Schedulers.io())
複製代碼
進入到它的源碼中:線程
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
在這裏傳入的參數 this 其實就是 當前建立的Observable
( 就是 ObservableCreate
)將它封裝成 ObservableSubscribeOn
,並返回它的對象。
scheduler
就是你使用的 Schedulers.io()
。
咱們進入到 io()
中
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
複製代碼
在這裏的 IO
是一個對象,它的具體實現要看 IOTask
:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
--------
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
複製代碼
能夠看到咱們就這樣獲得了IoScheduler
的對象。先靜置這裏,等會使用。
咱們根據上一篇的文章的分析,知道subscribeActual()
是一個抽象方法,以前的實現是ObservableCreate
中,
如今ObservableCreate
對象被包裝成爲一個新的ObservableSubscribeOn
對象
所以咱們來看看 ObservableSubscribeOn
中的 subscribeActual
方法:
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
目前看到都是沒有使用過任何其餘的線程,因此 觀察者( observer
) 的 onSubscribe()
運行的線程就是當前的線程。
因此 onSubscribe()
執行的線程就是 當前的線程。
即:
onSubscribe: currentThread : Thread-5
複製代碼
繼續來看下一行代碼,執行了 SubscribeTask
這個類。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
這是一個Runnable
類,在run
方法中 執行 subscribe
方法,
這裏的source
其實就是上一個Observable
,也就是ObservableCreate
的對象。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
-----------------------------------
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 這個createWorker 是一個抽象方法,具體的實現是在 Schedule的子類中,在這裏也就是IoScheduler
final Worker w = createWorker();
// decoratedRun實際上仍是SubscribeTask這個對象
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 將 Workder 和 Runnable 包裝成爲一個DisposeTask
DisposeTask task = new DisposeTask(decoratedRun, w);
// 執行
w.schedule(task, delay, unit);
return task;
}
複製代碼
再繼續看一下IoSchedule
r中的createWorker()
和 worker
的schedule()
方法。
final AtomicReference<CachedWorkerPool> pool; // AtomicReference 就是實現對象引用的原子更新
@NonNull
@Override
public Worker createWorker() {
// 獲得一個EventLoopWorker對象,並傳入一個緩存池
return new EventLoopWorker(pool.get());
}
--------------------------
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
.... //省略無關代碼
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } // Runnable 最終是交給了threadWorker 去執行 return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 複製代碼
咱們能夠看下這個 ThreadWorker
,它是沒有實現 scheduleActual()
這個方法的,
咱們來到它的父類 NewThreadWorker
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// 這個run 是SubscribeTask
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 繼續封裝成爲ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
.... // 省略無關代碼
Future<?> f;
try {
// 在這裏用線程池執行
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
複製代碼
最後就是 executor
這個線程池對象來執行任務,SubscribeTask
會被線程池執行,
也就是說 Observable
的subscribe()
方法會在IO線程中被調用。
以前的輸出結果:
subscribe: currentThread : RxCachedThreadScheduler-1
複製代碼
因此 subscribe()
執行的線程就是 subscribeOn
指定的線程(在這裏就是IoScheduler
)。
案例:
//省略先後代碼,看重點部分
.subscribeOn(Schedulers.io())//第一次
.subscribeOn(Schedulers.newThread())//第二次
.subscribeOn(AndroidSchedulers.mainThread())//第三次
複製代碼
輸出結果:
subscribe: currentThread : RxCachedThreadScheduler-1
複製代碼
就是第一次的 subscribeOn()
的設置起做用了,這是爲何呢?
由於 每一次調用 subscribeOn()
都會將以前的Observable
從新包裝。
咱們來看這張圖: 引用 玉剛說的 詳解 RxJava 的消息訂閱和線程切換原理中的圖
從第三次的ObservableSubscribeOn
每次都會通知它的上一個Obsevable
最後都會上傳到第一次的ObservableSubscribeOn
中,
因此不過設置多少,都只有第一次的subscribeOn()
纔是生效的。
在案例中的設置:
.observeOn(AndroidSchedulers.mainThread())
複製代碼
進入源碼:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
-----------------
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
這個也是從新包裝了一個ObservableObserveOn
對象,
這裏的this
就是以前被封裝過的 ObservableSubscribeOn
對象。
而後咱們繼續進入到ObservableObserveOn
中去。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 若是是當前線程的話,直接調用ObservableSubscribeOn的subscribe()方法
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 這裏的 scheduler 其實就是 AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
// 將worker封裝成ObserveOnObserver
// 這裏的source.subscribe是在IO線程中執行
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
接下來咱們來看看 ObserveOnObserver
中的源碼:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t); // 加入隊列中
}
schedule();
}
複製代碼
主要是執行 schedule()
。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
複製代碼
其實 ObserveOnObserver
本身也實現了 Runnable
,因此就是調用了本身。
這裏的worker
就是 你傳入的 主線程(mainThread
)
而後咱們來看看 run
方法:
@Override
public void run() {
if (outputFused) { // outputFused 默認是false
drainFused();
} else {
drainNormal();
}
}
複製代碼
進入到 drainNormal()
中:
void drainNormal() {
int missed = 1;
// 這裏就是存儲消息的隊列
final SimpleQueue<T> q = queue;
// 這裏是 downstream 就是 自定義的Observer
final Observer<? super T> a = downstream;
.... //省略部分代碼
// 隊列中取出消息
v = q.poll();
.... //省略部分代碼
// 這裏就是調用自定義的Observer#onNext()
a.onNext(v);
....
}
}
複製代碼
這樣咱們最終會調用到 咱們自定義的Observer#onNext()
因此Observer#onNext()
是在 observeOn()
指定的線程中調用的。
在這裏的包裝順序是
CreateEmitter
包裝了SubscribeOnObserver
;
SubscribeOnObserver
包裝了ObserveOnObserver
;
ObserveOnObserver
包裝了自定義的Observer
。
對於調用onNext()
方法:
咱們在subscribe
中使用 ObservableEmitter
中調用了onNext()
;
會在SubscribeOnObserver
中調用onNext()
;
而後會繼續調用到ObserveOnObserver#onNext()
;
因此在run
中調用的Observer
的對象實際上是自定義的Observer
。
以上就是我對於RxJava線程切換的源碼分析,若有不對,請指正。