本文章主要是對RxJava2的線程切換流程進行源碼分析,在閱讀以前,能夠先閱讀如下文章:java
RxJava2源碼分析——訂閱react
本文章用的RxJava和RxAndroid版本以下:android
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
複製代碼
咱們先寫段示例代碼,代碼以下:git
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
Log.i("TanJiaJun", "subscribe方法所在的線程:" + Thread.currentThread().getName());
})
// 切換上游Observable到io線程
.subscribeOn(Schedulers.io())
// 切換下游Observer到主線程,使用AndroidSchedulers.mainThread須要使用RxAndroid這個庫
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun", "onSubscribe方法所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun", "onNext方法所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun", "onError所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("TanJiaJun", "onComplete方法所在的線程:" + Thread.currentThread().getName());
}
});
複製代碼
首先咱們看下Schedulers這個類。github
閱讀源碼後,咱們能夠得知,總共有5種類型。框架
@NonNull
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
複製代碼
該方法返回一個默認、共享的調度器實例用於計算工做,這能夠用於事件循環、處理回調和其餘計算工做。異步
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
複製代碼
該方法返回一個默認、共享的調度器實例用於IO綁定的工做,這能夠用於異步執行阻塞IO,默認是由單線程實例池實現的,能夠重用已經啓動的線程,要注意的是,這個調度器的線程數量可能會無限制增加,從而致使內存溢出(OOM)。ide
@NonNull
public static Scheduler trampoline() {
return TRAMPOLINE;
}
複製代碼
該方法返回一個默認、共享的調度器實例,用於隊列工做,並以FIFO方式在一個參與線程中執行它們,也就是說會等到當前線程執行完畢纔會執行下個線程。oop
÷@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
複製代碼
該方法返回一個默認、共享的調度器實例,該實例爲每一個工做單元建立一個新線程,默認實現是建立一個新的單線程,要注意的是,每次調用Scheduler.scheduleDirect方法(及其重載方法)和Scheduler.createWorker方法均可以建立數目無限制的線程,從而形成內存溢出(OOM)。源碼分析
@NonNull
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
複製代碼
該方法返回一個默認、共享的調度器實例,該實例會建立一個單獨的線程。
負責線程切換有兩個方法:subscribeOn和observeOn。
這個方法負責切換上游Observable的線程,代碼以下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
根據上篇文章閱讀subscribe方法源碼的經驗,咱們只看ObservableSubscribeOn類就能夠了,要注意的點我都寫上註釋了,代碼以下:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// source是上游Observable
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 建立SubscribeOnObserver對象,傳入下游Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// 建立SubscribeTask任務,使用指定的調度器進行調度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
// 省略部分代碼
}
// SubscribeTask繼承Runnable,因此咱們能夠看下它的run方法
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 這裏已經切換到想要的線程了,source是上游Observable,調用它的subscribe方法,而且傳入下游observer,根據上篇文章的經驗,上游Observable的subscribeActual方法會被執行
source.subscribe(parent);
}
}
}
複製代碼
咱們的示例代碼中調用subscribeOn方法傳入的是Schedulers.io(),看下這個方法對應的源碼,代碼以下:
// Schedulers.java
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
複製代碼
IO是一個final的靜態變量,它是經過Schedulers這個類的靜態代碼塊賦值的,代碼以下:
static {
// 省略部分代碼
IO = RxJavaPlugins.initIoScheduler(new IOTask());
// 省略部分代碼
}
複製代碼
它會建立一個IOTask對象,代碼以下:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
複製代碼
這個類實現了Callable接口,而且重寫了call方法,返回IoHolder.DEFAULT,代碼以下:
// DEFAULT是final的靜態類IoHolder裏的final的靜態變量
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
複製代碼
咱們看到這裏建立了一個IoScheduler對象,代碼以下:
// IoScheduler.java
static final RxThreadFactory WORKER_THREAD_FACTORY;
static {
// 省略部分代碼
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));
// RxThreadFactory是一個線程工廠,能夠刪除對new Thread調用的硬鏈接
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
// 省略部分代碼
// 建立CachedWorkerPool對象,第二個參數是傳入TimeUnit,若是是null的話,是不會建立線程池的,下面會講到
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
// IoScheduler的構造方法
public IoScheduler() {
// 這裏會調用下面那個方法
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
// 賦值給成員變量threadFactory
this.threadFactory = threadFactory;
// 用CachedWorkerPool建立一個原子引用
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
// 調用start方法
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
// compareAndSet方法第一個參數是預期值,第二個參數是新值,若是NONE==update的話,就會將值原子性地設置會更新值,而且返回true,不然不會更新,而且返回false,而後調用shutdown方法
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
static final class CachedWorkerPool implements Runnable {
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
// 當unit不是null的話,就會建立一個newScheduledThreadPool線程池
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
}
複製代碼
咱們再回到上面說的ObservableSubscribeOn類,看到以下這段代碼:
// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// 調用了scheduler的scheduleDirect方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
複製代碼
咱們再看下scheduleDirect方法,代碼以下:
// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 調用createWorker方法,createWorker是個抽象方法,剛纔咱們所說的IoScheduler是Scheduler的實現類,它重寫了createWorker方法
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// DisposeTask實現了Runnable接口
DisposeTask task = new DisposeTask(decoratedRun, w);
// 調用worker的scheduler方法
w.schedule(task, delay, unit);
return task;
}
複製代碼
咱們再看下createWorker方法,代碼以下:
// IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
// 建立EventLooperWork,而且傳入從原子引用獲得的當前的值
return new EventLoopWorker(pool.get());
}
複製代碼
EventLoopWorker是IoScheduler的一個final的靜態的內部類,繼承Scheduler.Worker,代碼以下:
// IoScheduler.java
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
// 若是已經取消訂閱了,就再也不調度
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
// 調用了ThreadWorker的scheduleActual方法
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
// ThreadWorker繼承NewThreadWorker
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
複製代碼
咱們看下NewThreadWorker的scheduleActual方法,代碼以下:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
// ScheduledExecutorService是個接口,繼承ExecutorService接口
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
// 調用SchedulerPoolFactory的create方法,建立線程池
executor = SchedulerPoolFactory.create(threadFactory);
}
// 省略部分代碼
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
// executor.submit和excutor.schedule其實最後會調用同一個方法,執行這個方法後任務就提交上去了
if (delayTime <= 0L) {
// 若是不須要延遲就調用submit方法,提交一個有返回結果的任務
f = executor.submit(task);
} else {
// 若是須要延遲就調用schedule方法,提交一個有返回結果的任務
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
// 省略部分代碼
}
複製代碼
到這裏,上游Observable的代碼就會被切換到對應的線程了,咱們這裏是拿**Schedulers.io()**做爲例子來說解,其餘類型你們能夠本身看下源碼。
結論:訂閱事件是從下往上傳遞的,最終傳遞到上游Observable的subscribe方法。
這個方法負責切換下游Observer的線程,代碼以下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
// 調用下面那個方法
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
也像以前那樣,咱們只須要看ObservableObserveOn這個方法,代碼以下:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// source是上游Observable
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 判斷要指定的調度器是否是TrampolineScheduler,也就是是否是傳入Schedulers.trampoline()
if (scheduler instanceof TrampolineScheduler) {
// 若是是,就直接調用subscribe方法,由於TrampolineScheduler是在當前線程調度的,上面也說起過
source.subscribe(observer);
} else {
// 若是不是,就經過調度器建立worker,而後調用subscribe方法傳入建立的ObserveOnObserver對象
Scheduler.Worker w = scheduler.createWorker();
// 與subscribeOn不一樣,subscribe方法不是在已經切換好的線程中執行
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ObserveOnObserver是一個final的靜態內部類,實現了Runnable接口,因此咱們看下它的run方法
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
// 省略部分代碼
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
// 若是checkTerminated方法返回false就會return
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
// 最後調用下游Observer的onNext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
// 省略部分代碼
@Override
public void run() {
// 到這裏已經切換到想要的線程了,outputFused變量是經過requestFusion設置的
if (outputFused) {
drainFused();
} else {
// 咱們主要看這個方法
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
// delayError在咱們調用的observeOn方法中是傳入false的
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
// 若是Throwable不是null的話,就會調用下游Observer的onError方法
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
// 若是任務隊列是空的話,證實任務執行完畢,就會調用下游Observer的onComplete方法
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
// 這個方法和背壓(Backpressure)有關係,不是本文章的主要內容,暫時不討論
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
// 省略部分代碼
}
}
複製代碼
結論:觀察事件是從上往下傳遞的,最終傳遞到下游Observer的回調方法,例如:onNext方法、onComplete方法、onError方法,注意onSubscribe方法所在的線程是當前的線程,不會隨着訂閱線程或者觀察線程的切換而改變。
咱們試下屢次調用subscribeOn方法,把示例代碼改爲以下:
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
Log.i("TanJiaJun", "subscribe方法所在的線程:" + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun", "onSubscribe方法所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun", "onNext方法所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun", "onError所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("TanJiaJun", "onComplete方法所在的線程:" + Thread.currentThread().getName());
}
});
複製代碼
Log以下:
根據以前的源碼分析,其實它像以下代碼:
new Thread("AndroidSchedulers.mainThread()") {
@Override
public void run() {
new Thread("Schedulers.io()") {
@Override
public void run() {
System.out.println("上游Observable的subscribe方法所在的線程:" + getName());
}
}.start();
}
}.start();
複製代碼
Log以下:
結論:若是咱們屢次調用subscribeOn方法,切換訂閱線程的話,上游Observable的subscribe方法所在的線程只會是在第一次切換的線程,上面也提到過了,由於訂閱事件是從下往上傳遞的,最終傳遞到上游Observable的subscribe方法。
咱們試下屢次調用obsesrveOn方法,把示例代碼改爲以下:
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("Tan");
emitter.onNext("Jia");
emitter.onNext("Jun");
emitter.onComplete();
Log.i("TanJiaJun", "subscribe方法所在的線程:" + Thread.currentThread().getName());
})
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("TanJiaJun", "onSubscribe方法所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i("TanJiaJun", "onNext方法所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("TanJiaJun", "onError所在的線程:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("TanJiaJun", "onComplete方法所在的線程:" + Thread.currentThread().getName());
}
});
複製代碼
Log以下:
根據以前的源碼分析,其實它像以下代碼:
new Thread("AndroidSchedulers.mainThread()") {
@Override
public void run() {
new Thread("Schedulers.io()") {
@Override
public void run() {
System.out.println("下游Observer的回調方法所在的線程:" + getName());
}
}.start();
}
}.start();
複製代碼
Log以下:
結論:若是咱們屢次調用observeOn方法,切換觀察線程的話,下游Observer的回調方法,例如:onNext方法、onComplete方法、onError方法,它們所在的線程會隨着每次切換而切換,由於觀察事件是從上往下傳遞的,最終傳遞到下游Observer的回調方法。
Demo:RxJavaDemo
個人GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架(Kotlin-MVVM)
個人掘金:譚嘉俊
個人簡書:譚嘉俊