上一篇文章Rxjava2.x源碼解析(一): 訂閱流程中咱們講了 RxJava2 的訂閱部分的源碼。但 RxJava2 最強大的部分實際上是在異步。默認狀況下,下游接收事件所在的線程和上游發送事件所在的線程是同一個線程。接下來咱們在上一篇文章的示例代碼中加入線程切換相關代碼:php
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
});
// 下游 observer
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// onSubscribe 方法會最早被執行
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: ");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
// 在子線程中進行事件的發送
observable.subscribeOn(Schedulers.newThread())
// 切換到UI線程進行監聽
.observeOn(AndroidSchedulers.mainThread())
// 將上游和下游進行關聯
.subscribe(observer);
複製代碼
咱們經過subscribeOn(Schedulers.newThread())
這行代碼,就能夠將咱們上游的代碼切換到子線程中去執行,經過observeOn(AndroidSchedulers.mainThread())
又能指定下游監聽的代碼執行在主線程(這裏的 AndroidSchedulers 並非RxJava2 默認提供的,而是屬於Android領域的,由RxAndroid這個庫實現)。一行代碼,就能自由切換上下游的代碼執行的線程,這麼騷的操做,究竟是怎麼實現的呢?css
咱們上面兩個方法中傳入的都是一個Scheduler
實例,翻譯過來就是「調度器」,負責線程相關的調度。java
那接下來咱們就先從上游相關的subscribeOn(Schedulers.newThread())
開始分析。
先從參數入手,看看這個Schedulers.newThread()
中執行了什麼:編程
public final class Schedulers {
static final Scheduler SINGLE;
static final Scheduler COMPUTATION;
static final Scheduler IO;
static final Scheduler TRAMPOLINE;
// 這裏是 NEW_THREAD
static final Scheduler NEW_THREAD;
static final class SingleHolder {...}
static final class ComputationHolder {...}
static final class IoHolder {...}
// 初始化一個默認的 NewThreadScheduler
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
static {
...
// 由一個新建立的 NewThreadTask 來初始化 NEW_THREAD
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
...
static final class IOTask implements Callable<Scheduler> {...}
// 這裏是 NewThreadTask
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class SingleTask implements Callable<Scheduler> {...}
static final class ComputationTask implements Callable<Scheduler> {...}
}
複製代碼
能夠看到,newThread(...)
方法會返回一個Scheduler
類型的靜態變量NEW_THREAD
,而該變量的初始化是在以下的靜態代碼塊中:安全
static {
...
// 由一個新建立的 NewThreadTask 來初始化 NEW_THREAD,類型爲 Scheduler
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
複製代碼
這裏面建立了一個NewThreadTask
實例,該類也比較簡單,就是在call()
方法中返回了NewThreadHolder.DEFAULT
:markdown
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
複製代碼
NewThreadHolder.DEFAULT
則是一個NewThreadScheduler
對象:併發
// 初始化一個默認的 NewThreadScheduler
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
複製代碼
那咱們不由好奇,這個call()
方法又是何時調用的呢?咱們繼續回到RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())
這行代碼,從名稱來看是初始化NewThreadScheduler對象的,那咱們進去看下是如何進行的:app
public static Scheduler initNewThreadScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitNewThreadHandler;
if (f == null) {
// 直接看這裏
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
複製代碼
做爲聰明人,咱們直接看callRequireNonNull(defaultScheduler)
這行代碼:異步
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
// 能夠看到,這裏調用了 s.call(),並將結果返回;若爲空,則報異常
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
複製代碼
能夠看到,裏面直接調用了傳入的參數的call()
方法,並返回。
到這裏,就知道了,RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())
這行代碼其實就是初始化一個NewThreadScheduler
對象。ide
繞了這麼遠,其實Schedulers.newThread()
這句就是建立了一個NewThreadScheduler
對象,這裏講的比較細。
咱們繼續回來,看看subscribeOn(Schedulers.newThread())
裏面作了什麼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
根據第一篇文章裏的經驗,咱們知道,這裏又是將上一步生成的 Observable 進一步封裝成一個ObservableSubscribeOn
並返回。其實,RxJava之因此能進行鏈式調用,無外乎就是在每次調用操做符方法的時候,返回一個 Observable 的引用,可是這個 Observable 所具體指向的對象,多是不一樣的。中間可能就建立了新的對象,通過了一層層的包裝。RxJava 裏裝飾器模式用的仍是比較厲害的,因此說,千萬別覺的實際模式都是虛無縹緲的東西。
這裏返回的是一個ObservableSubscribeOn
對象(注意看命名哦!規律以前講過的)
通過上篇文章分析,咱們知道,使用 Observable 的 subscribe 方法進行訂閱的時候,最終會調用到 Observable 的subscribeActual(...)
方法,這裏的Observable
具體就是ObservableSubscribeOn
:
// ObservableSubscribeOn.java
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
能夠看到,這裏將 observer 也進行了包裝,包裝成SubscribeOnObserver
對象。也至關於配套啦,haha。
而後又將這個封裝後的對象傳進了一個新建的 SubscribeTask 對象中。
???
這個SubscribeTask
又是啥?
這個SubscribeTask
是ObservableSubscribeOn
這個類的內部類,其實就是一個Runnable
實現類:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 建立一個新的 Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
// 進行線程任務的建立及分發
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
// 是個 Runnable 實現類
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 注意,此處是關鍵,正是從這裏開始,上游(即:source)在新線程從新對下游進行訂閱。
// 從而達到上游發送事件的線程進行切換的目的
// 這裏提早提醒下,屢次訂閱,並非只有第一次訂閱指定的線程纔有效,那只是普通使用場景下的「湊巧」
source.subscribe(parent);
}
}
}
複製代碼
到這,咱們總算看到了線程相關的東西了。Runnable 你們確定都熟悉吧?在它的run()
方法中,調用了source.subscribe(parent)
,這裏的 parent 咱們知道,是封裝以後的SubscribeOnObserver
,但source
又是啥?其實就是咱們在 ObservableSubscribeOn 的構造函數中傳進來的this
,即上游的 Observable :
// Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 這裏傳進來的 this對象,就是上游 Observable 對象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
抽象類 Observable 實現了 ObservableSource 接口,這個接口就是咱們進行訂閱時候用到的subscribe(...)
:
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
複製代碼
繼續看這個 run() 方法,它至關因而把以前的上游經過subscribe(...)
訂閱到了新的下游。也就是說:
subscribeOn(...)方法的本質是,在指定的線程中將上游和下游進行訂閱`。
複製代碼
這和咱們鏈式調用中最後一步的訂閱本質上是同樣的。
明白了這點,也就能知道,這個線程一旦啓動,新的 observer 接收和處理事件,也是在這個子線程裏。即,默認狀況下它會隨着上游線程的切換而切換,兩者始終在一個線程,除非它經過observeOn(...)
自行指定。
咱們如今明白了上游是如何經過一行代碼就能運行在子線程裏,但還沒看到這個線程是何時、如何啓動起來的。
那咱們就回到以前的位置,繼續看scheduler.scheduleDirect(new SubscribeTask(parent))
這行代碼,scheduler 具體指NewThreadScheduler
,但scheduleDirect(...)
這個方法是在父類中實現的,它沒有進行重寫(其餘類型的 scheduler 有進行重寫,好比 ComputationScheduler 等),那就進父類看看:
// Scheduler.java
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// createWorker()爲抽象方法,由子類實現
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
這個方法的參數中有個 Runnable 對象,那咱們直接啓動個線程不就行了?固然是能夠的。可是做爲一個成熟的庫,它必定要考慮更多的場景。須要考慮到線程安全問題,以及對線程的控制,好比,經過 Dispose 來截斷上下游之間事件的事件流。
咱們先看final Worker w = createWorker();
這行代碼,它建立了一個 Worker,具體點就是NewThreadWorker
,這裏貼下NewThreadScheduler.java
的源碼:
/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
複製代碼
繼續回到scheduleDirect(...)
方法的第 8 行:
DisposeTask task = new DisposeTask(decoratedRun, w);
複製代碼
它將咱們要執行的 runnable 和 Worker,又封裝進了一個DisposeTask
中,便於對流進行控制。DisposeTask
是 Scheduler 的靜態內部類,實現了Disposable
, Runnable
, SchedulerRunnableIntrospection
這三個接口:
public abstract class Scheduler {
...
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
複製代碼
建立了 DisposeTask 以後,就將它傳遞給了worker
執行:
w.schedule(task, delay, unit);
複製代碼
這行代碼就是開始執行指定任務,咱們能夠進入NewThreadWorker.java
源碼中查看詳細細節:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
// 最終會調用到 scheduleActual(...)方法
return scheduleActual(action, delayTime, unit, null);
}
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {...}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
/**********************************************
*** 將咱們的runnable對象,又通過了一層封裝 *****
*********************************************/
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
/*********************************************************************************
*** 最終會經過 executor 線程池去執行相應的任務,經過Future,來獲取線程執行後的返回值 *****
********************************************************************************/
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
/**
* Shuts down the underlying executor in a non-interrupting fashion.
*/
public void shutdown() {
if (!disposed) {
disposed = true;
executor.shutdown();
}
}
@Override
public boolean isDisposed() {
return disposed;
}
}
複製代碼
w.schedule(task, delay, unit)
最終會調用到第 46 行的scheduleActual(...)
方法。在該方法中,又將新傳進來的runnable對象封裝進 ScheduledRunnable ,封裝了這麼多層…~~(>_<)~~。而後就直接將這個 ScheduledRunnable
交給線程池去執行了。爲了能在線程執行完以後,接收返回值,使用了Future
。再往下,就徹底是線程池相關的知識點了,此處再也不贅述。
到這,咱們就徹底分析完了 RxJava2 是如何經過一行subscribeOn(...)
代碼切換上游發送事件所在線程的。接下來咱們就來分析observeOn(...)
是如何切換下游處理事件的線程的。
線程的建立,這裏跟以前是相同的。該方法最終會調用到以下重載方法:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
// 建立了一個 ObservableObserveOn 並返回
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
直接進ObservableObserveOn
的subscribeActual(...)
方法:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
這個方法就比較簡單了,直接將上游和新建立的ObserveOnObserver
進行綁定。而且在建立的ObserveOnObserver
的同時,也將 worker 傳進去,進行線程任務的相關處理。到這裏,咱們能夠猜測下,封裝以後的新的 ObserveOnObserver 是如何作到使原observer中的任務在指定的線程中執行的。其實就是重寫對應的方法,將以前的邏輯經過worker來指定執行線程。邊追源碼邊猜測,才能更好的理解。
接下來就來看ObservableObserveOn.java#ObserveOnObserver
的源碼:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
...
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
// 注意,這裏調用了 requestFusion 來獲取 mode,以後會用到
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
// 若是是sync,會當即調用 schedule()
// 執行線程任務,查看run方法
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 執行線程任務,查看run方法
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
// 執行線程任務,查看run方法
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
// 執行線程任務,查看run方法
schedule();
}
@Override
public void dispose() {... }
@Override
public boolean isDisposed() {
return disposed;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
// checkTerminated 方法會檢查任務是否執行結束。
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
// checkTerminated 方法會檢查任務是否執行結束。
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {...}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
// outputFused 是跟背壓及操做符相關,這裏直接分析 drainNormal()
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
// 是否設置了超時錯誤,是在 observeOn(scheduler, delayError, bufferSize()) 的第二個參數傳入的,
// 默認傳了false
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
// 根據是否報了異常,來決定是執行 onError 仍是 onComplete
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
...
}
複製代碼
爲了驗證咱們的猜測,咱們看看在onSubscribe/onNext/onError/onComplete
這些函數中都調用了什麼。
咱們發現,在這些函數中,差很少都調用了schedule();
(調用 requestFusion(…)相關邏輯暫時忽略)。查看該函數的調用出,在第93行:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
複製代碼
這裏直接將this
傳遞給了 worker 進行線程任務的執行,這裏的this
指的就是ObserveOnObserver
,上面說道,它實現了 runnable 接口。而onSubscribe/onNext/onError/onComplete
這些函數中都調用了同一個函數schedule();
,有理由猜測,對各個函數的區分處理,確定就在重寫的run()
方法裏了,查看第150行:
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
複製代碼
outputFused 涉及背壓及操做符的相關處理,這裏咱們直接看drainNormal();
:
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
// checkTerminated 方法會檢查任務是否執行結束。
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
// checkTerminated 方法會檢查任務是否執行結束。
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
// 若是沒結束,就調用新的Observer的 onNext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
複製代碼
在該方法中,首先經過checkTerminated(...)
判斷線程任務是否執行結束(complete或者error),若是沒有,就去執行新的下游Observer的onNext()方法。若是執行完了,就直接返回。
那啥時候調用了新的下游Observer的onComplete/onError
方法呢?固然是在checkTerminated(...)
方法中啦:
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
// 是否設置了超時錯誤,是在 observeOn(scheduler, delayError, bufferSize()) 的第二個參數傳入的,
// 默認傳了false
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
// 根據是否報了異常,來決定是執行 onError 仍是 onComplete
if (e != null) {
disposed = true;
queue.clear();
// 執行 onError
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
// 執行 onComplete
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
複製代碼
在該方法裏,咱們就看到了對onComplete()/onError
方法的調用了。
好了,到這裏,咱們就把rxjava2 中線程切換的知識講完了,裏面還有不少細節須要你們本身細細研究。
總結:
下游observer
的onSubscribe(...)
方法一直是在它所在的線程調用的。即observable.subscribe(observer)
這行代碼所在的線程。
subscribeOn(...)
指定的是上游發送事件的線程, 好比ObservableOnSubscribe
的subscribe(ObservableEmitter<Integer> emitter){...}
方法執行的線程,在該方法裏咱們每每會調用emitter.onNext(...)/onComplete()/onError(...)
來發送事件。
observeOn(...)
指定的是下游接收事件的線程,即onSubscribe(...)/ onNext(...)/onError(...)/onComplete()
這些回調方法的執行線程。
默認狀況下,下游接收事件的線程和上游發送事件的線程,是同一個線程,下游與上游保持一致。上游經過subscribeOn(…)
切換線程的時候,下游仍會自動與其保持一致。除非下游單獨經過observeOn(…)
來指定下游本身的線程。
此外,還須要特別指出的一點就是,屢次指定上游的線程只有第一次指定的有效
這個結論是:錯誤的 錯誤的 錯誤的
不少文章中也都是這麼說的,可是很遺憾,是錯誤的,由於不少人都只是從表象出發,連續調用兩次subscribeOn
,而後在下游Observer的onSubscribe
回調裏打印線程名稱,發現一直是第一次指定的那個線程,就開始想固然的總結結論了,他們的代碼應該是下面這樣的:
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
Log.d(TAG, "subscribe: 當前線程爲: " + Thread.currentThread().getName());
}
});
// 下游 observer
Observer<Integer> observer = new Observer<Integer>() {...}
observable
// 第一次指定
.subscribeOn(AndroidSchedulers.mainThread())
// 第二次指定
.subscribeOn(Schedulers.newThread())
// 切換到UI線程進行監聽
.observeOn(AndroidSchedulers.mainThread())
// 將上游和下游進行關聯
.subscribe(observer);
複製代碼
打印結果爲:
你不斷調整兩個的位置,發現仍然是指定的第一個有效,彷佛你是對的。不防試試下面的例子:
// 上游 observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
Log.d(TAG, "subscribe: 當前線程爲: " + Thread.currentThread().getName());
}
});
// 下游 observer
Observer<Integer> observer = new Observer<Integer>() {...}
observable
// 第一次指定
.subscribeOn(AndroidSchedulers.mainThread())
// 建立第一個 onSubscribe
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "accept1: 當前線程爲:" + Thread.currentThread().getName());
}
})
// 第二次指定
.subscribeOn(Schedulers.newThread())
// 建立第二個 onSubscribe
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "accept2: 當前線程爲:" + Thread.currentThread().getName());
}
})
// 切換到UI線程進行監聽
.observeOn(AndroidSchedulers.mainThread())
// 將上游和下游進行關聯
.subscribe(observer);
複製代碼
結果以下:
doOnSubscribe(...)
內的代碼,運行在它上面離它最近的
subscribeOn()
指定的線程。也就是說,屢次切換都生效了。這點也能夠參考咱們上面的總結裏的第一條:
下游observer的onSubscribe(...)方法一直是在它所在的線程調用的。即observable.subscribe(observer)這行代碼所在的線程。
複製代碼
對doOnSubscribe
操做符就不展開講了。
再仔細看上面的截圖,發現咱們在第二個doOnSubscribe(...)
方法中的代碼反而要比第一個先執行。Why?這實際上是在向上回溯。但願你還能記得,咱們前面說:
subscribeOn(...)方法的本質是,在指定的線程中將上游和下游進行訂閱`。
複製代碼
這個「上游」是個相對概念,上游之上,還有上游,因此就不斷回溯,最終調用到最開始指定的那個線程。
雖然表面上看,確實是第一個指定的有效,可是千萬別被欺騙了。
好了,到這,本篇文章就結束了。文章較長,能夠耐心點,反覆看看。
經過對 RxJava2 的研究,發現裏面涉及到不少知識,我也是一邊讀一遍補其餘知識。好比裏面涉及不少併發編程的知識,而併發編程又須要你對計算機組成原理、操做系統、編譯原理這些有必定的瞭解,還好大學考軟考的時候看過這些方面的書,拾起來相對容易點。
欠的技術債老是要還的,正面剛吧。
歡迎關注公衆號來獲取其餘最新消息,有趣的靈魂在等你。