Rxjava 2.x 源碼系列 - 線程切換 (下)

Rxjava 2.x 源碼系列 - 基礎框架分析java

Rxjava 2.x 源碼系列 - 線程切換 (上)bash

Rxjava 2.x 源碼系列 - 線程切換 (下)框架

Rxjava 2.x 源碼系列 - 變換操做符 Map(上)ide

前言

在上一篇博客 Rxjava 2.x 源碼系列 - 線程切換 (上) 咱們講解到,Observable#subscribeOn 是如何控制上游 Observable 的執行線程的,他的實質是將 Observable#subscribe(Observer) 的操做放在了指定線程,當咱們調用 subcribe 的時候,它的過程是從下往上的,即下面的 Observable 調用上面的 Observable。用下面的流程圖表示以下。oop

接下來,咱們先來回顧一下,Observable 與 Observer 之間是如何訂閱的ui

簡單來講就是,當咱們調用 Observable 的 subsribe 方法的時候,會調用當前對應 observbale 的 subscribeActual 方法,在該方法裏面,會調用 observer 的 onSubeciber 方法,並調用對應 ObservableOnSubscirbe 的 subcribe 的方法,並將 ObservableEmitter 做爲方法參數暴露出去。而 ObservableEmitter 持有咱們的 Observer 的引用,當咱們調用 ObservableEmitter 的 onNext,onErrot,onComplete 方法的時候,會調用他持有的 Observer 的相應的方法。this

這篇博客主要講解如下問題:spa

  • observeOn 是如何控制 Observer 的回調線程的

Observable#observeOn 方法

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
複製代碼

observeOn 的套路跟 Observable.create 方法的套路基本同樣,都是先判斷是否爲空,不爲 null,用一個新的類包裝起來,並持有上游的引用 source。這裏咱們的包裝類是 ObservableObserveOn。.net

這裏咱們來看一下 ObservableObserveOn線程

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 若是是當前線程,直接低啊用
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 不然,經過 worker 的形式調用
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}
複製代碼

從第一篇博客 Rxjava 2.x 源碼系列 - 基礎框架分析,咱們知道,當咱們調用 Observable.subscibe(observer) 方法的時候,會調用到 對應的 Observable 實例的 subscribeActual 方法,而這裏咱們的 Observable 爲 ObservableObserveOn 。

在 ObservableObserveOn.subscribeActual 方法中,首先會判斷 scheduler instanceof TrampolineScheduler (是不是當前線程),true 的話,會直接調用 source.subscribe(observer)。不然,先用 ObserveOnObserver 包裝 observer,再調用 source.subscribe 方法

接下來,咱們一塊兒來看一下 ObserveOnObserver 類

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    
     final Observer<? super T> actual;
        
    
    }
    
public abstract class BasicIntQueueDisposable<T>
extends AtomicInteger
implements QueueDisposable<T> {
複製代碼

ObserveOnObserver 繼承於 BasicIntQueueDisposable,實現 Observer, Runnable 接口,而 BasicIntQueueDisposable extends AtomicInteger ,是原子操做類。

其中,還有一個很重要的屬性 actual ,便是實際的 observer。
複製代碼

接下來,咱們來看一下幾個重要的方法:

onNext,onError,onComplete,onSubscribition

public void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {
    
            -------

            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                actual.onSubscribe(this);
                schedule();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                actual.onSubscribe(this);
                return;
            }
        }

        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        actual.onSubscribe(this);
    }
}

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

     ------
    schedule();
}

@Override
public void onError(Throwable t) {
    if (done) {
        RxJavaPlugins.onError(t);
        return;
    }
    ----
    
    schedule();
}

@Override
public void onComplete() {
    if (done) {
        return;
    }
     ----
     
    schedule();
}


複製代碼

在 onNext,onError,onComplete 方法中首先都會先判斷是否 done,若是沒有的話,會調用 schedule() 方法。

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
複製代碼

而在 schedule() 方法中,直接調用 Worker 的 schedule 方法,這樣就會執行咱們當前 ObserveOnObserver 的 run 方法,

public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
複製代碼

在 drainFused 和 drainNormal 方法中,會根據狀態去調用 actual(外部傳入的 observer) 的 onNext、onError、onComplete 方法。所以 observer 的回調所在的線程將取決於外部傳入的 scheduler 的 schedule 方法所在的線程。

假設咱們傳入的是 observeOn(AndroidSchedulers.mainThread())

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    ---
}

複製代碼
private static final class HandlerWorker extends Worker {
    private final Handler handler;

    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } } 複製代碼

從上面的分析咱們知道 observer 的回調所在的線程將取決於外部傳入的 scheduler 的 schedule 方法所在的線程。即 指定 observeOn(AndroidSchedulers.mainThread()) 以後,將取決於 HandlerWorker 的 schedule 方法執行的線程,在該方法中,很明顯執行於主線程。


總結

控制 Observer 的回調實際是放到 ObservableObserveOn 的 run 方法中,即 ObservableObserveOn 的 run 執行在主線程, Observer 的回調也發生在主線程,而 ObservableObserveOn 的 run 執行在哪一個線程,取決於 外部傳入的 scheduler。所以, 當外部傳入的 scheduler 的 schedule 方法在主線程,那麼 observer 也在主線程回調。

Android 技術人

掃一掃,歡迎關注個人公衆號。若是你有好的文章,也歡迎你的投稿。

相關文章
相關標籤/搜索