RxJava 是如何實現線程切換的(下)

前言

經過前一篇的RxJava2 是如何實現線程切換的 (上)咱們已經知道了在RxJava中,subscribeOn 將上游線程切換到指定的子線程是如何實現的。這裏就接着來看,observeOn 是如何將下游線程切換到指定線程的。java

RxJava - subscribeOn

這裏能夠經過UML圖簡單回顧一下subscribeOn的原理。git

經過 subscribeOn 咱們完成了如下操做:github

  • 建立了一個 ObservableSubscribeOn 對象,本質上來講他就是一個Observable,他同時實現了 AbstractObservableWithUpstream(HasUpstreamObservableSource )這樣一個接口,是他變了一個擁有上游的Observeable。
  • 在 ObservableSubscribeOn 的 subscribeActual 方法中
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼

將真正的 subscribe 操做安置在了SubscribeTask這樣個一個Runnable當中,這個 Runnable 將由scheduler 這個調度器負責啓動,所以就把上游操做放到了 scheduler 所在的線程中。app

  • Schedulers.newThread()或者Schedulers.io() 都是經過工廠方法的模式建立了某種指定類型的線程, 當這個特定的線程執行是,就是執行真實的 subscribe 方法,這樣就把上游操做放到了一個特定的線程中去執行。

RxJava - observeOn

簡單回顧完 subscribeOn 以後,咱們就來看看 observeOn 是如何工做的。ide

其實,瞭解 subscribeOn 的原理以後,再來看 observeOn 就簡單多了,類的命名及實現思路都有不少類似之處,能夠對照着理解函數

ObserveOn

RxJava的代碼寫的很是巧妙,能夠說是百讀不厭,能夠學習的地方特別多。爲了不陷入只見樹木不見森林的噩夢,咱們就帶着如下問題去探索 observeOn 的奧祕。oop

  1. 在 Android 中線程間傳遞消息會使用 Handler,這裏是否使用?又是如何使用的?
  2. AndroidSchedulers.mainThread() 作了什麼 ?
  3. 下游任務是如何保證被分配到指定線程的。

示例

private void multiThread() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("This msg from work thread :" + Thread.currentThread().getName());
                sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: s= " + s);
                    }
                });
    }

複製代碼

咱們仍是以這段代碼爲例,來看看 observeOn 的工做原理。這裏經過observeOn(AndroidSchedulers.mainThread())將下游線程切換到了咱們很是熟悉的 Android UI 線程。這樣就能夠確保咱們在下游全部的操做都是在 UI 線程中完成。這裏和討論 subscribeOn 同樣,咱們就從這句代碼出發,看看這背後到底發生了什麼。post

有了上一篇的經驗,咱們知道 AndroidSchedulers.mainThread() 必定去建立了某種類型的調度器,爲了方便後面的敘述,這一次咱們先從調度器的建立提及,後面再看 observeOn() 的具體實現。學習

須要注意的是 AndroidSchedulers 並非 RxJava 的一部分,是爲了在 Android 中方便的使用 RxJava 而專門設計的一個調度器實現,源碼RxAndroid 設計很是巧妙;使用前記得在gradle文件中配置依賴。gradle

AndroidSchedulers.mainThread()

下面就來看看 AndroidSchedulers.mainThread() 這個咱們很是熟悉的 Scheduler 是如何建立的。

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
}
複製代碼

這裏咱們能夠認爲,當調用AndroidSchedulers.mainThread() 時,返回了一個HandlerScheduler 的實例,而這個實例使用到了咱們很是熟悉的 Handler。那麼重點就來到HandlerScheduler 了。

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
複製代碼

這個類雖然很簡單,可是設計很是巧妙。

  • 首先 HandlerScheduler 是一個 Scheduler ,經過構造函數他獲取到了主線程所在的 Handler實例。而在他的 createWorker() 方法中,他又經過這個 Handler 實例建立了一個HandlerWorker 的實例,這個HandlerWorker 本質上就是一個 Worker。在他的 schedule 方法中,建立了一個 ScheduleRunnable 對象,並會把這個Runnable對象經過 handler 的 sendMessageDelayed 方法發送出去,而咱們知道這個 Handler 是主線程,這樣在下游中,就把任務從某個子線程轉移到了UI線程。

  • ScheduleRunnable 不但實現了 Runnable ,並且實現了咱們看到過無數次的 Disposable 。

@Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {

            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }
複製代碼

這樣,正確狀況下 run 方法會正常執行線程中的任務,而一旦 disposable 對象執行了dispose()方法,那麼 handler.removeCallbacks(this),就可確保在 handler 的 dispatchMessage 方法中,不會在執行任何操做,從而達到了 dispose 的效果。

observeOn

下面就來看看 Observable 中的 observeOn 方法

Observable.java --- observeOn

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
複製代碼

這個方法的實現和 subscribeOn 的實現很是類似,多了兩個參數 delayError 和 buffersize 。 buffersize 能夠認爲是RxJava內部的一個靜態變量,默認狀況下他的值是128。經過咱們以前的經驗,這裏能夠把 observeOn 的過程簡化以下:

new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)
複製代碼

也就是說 observeOn 這個操做符給咱們返回了一個 ObservableObserveOn 對象。很容易想到他也是一個 Observeable。那麼咱們就去看看這個 ObservableObserveOn 究竟是什麼?咱們最關心的 subscribeActual 方法他又是怎樣實現的。

ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

複製代碼

和 ObservableSubscribeOn 同樣,他也繼承了 AbstractObservableWithUpstream ,這樣他也是一個擁有上游的 Observeable,他的構造函數很簡單,沒什麼能夠說。這裏咱們重點關注一下 subscribeActual 方法的實現。這裏咱們的使用的Scheduler 實例是 AndroidSchedulers.mainThread(),所以就按 else的邏輯分析。

Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼

經過 scheduler.createWorker() 建立了 Worker 這個對象。這裏結合以前對 AndroidSchedulers.mainThread() 的分析,此處的 worker 對象是就是一個持有主線程 handler 引用的 Worker。

接着用這個worker又建立了一個ObserveOnObserver對象。看看這個類的實現。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { ....}
複製代碼

這個類功能很是強大,首先是一個 Observer ,同時也是一個Runnable,而且還繼承了 BasicIntQueueDisposable(保證原子性、擁有操做隊列功能和 Disposable功能)。

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼

咱們關注一下這行代碼,根據以前的說法這裏的 source 是其父類(AbstractObservableWithUpstream)中的成員變量,也就是說是上游,那麼當前ObservableObserveOn 的上游是誰呢? 就是咱們上一篇所說的 ObservableSubscribeOn 。

所以,當這裏開始執行訂閱方法 subscribe() 後,將以以下順序響應:

Observable.subscribe--->Observable.subscribeActual---> ObservableObserveOn.subscribeActual---> ObservableSubscribeOn.subscribeActual--->ObservableCreate.subscribeActual

這些方法的參數均爲 observer,經過層層回調,最後的 subscribeActual(Observer<? super T> observer) 執行時,這個 observer 持有以前幾個 observer 的引用。

咱們再看一下 ObservableCreate.subscribeActual

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

能夠看到,這裏首先會觸發 observer.onSubscribe ,咱們再看一下 ObservableSubscribeOn.subscribeActual

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
複製代碼

好了,這樣咱們又回到了原點:

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼

回到了最初的 Observer:ObserveOnObserver

這個 ObserveOnObserver 持有咱們一開始建立的observer,也就是一個Consumer對象。

下面就來看看這個 ObserveOnObserver

  • 構造函數
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
複製代碼

這裏指的注意的一點 ,actual 其實就是observer

  • onSubscribe
@Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
				// 現階段,咱們用到的Disposable 都是單個的,暫時不討論其
				//爲QueueDisposable的狀況

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                actual.onSubscribe(this);
            }
        }
複製代碼

ObservableCreate.subscribeActual 中咱們知道,當執行subscribe 方法後,首先會執行 observer的 onSubscribe 方法。這裏的實現很是簡單,就是建立了一個queue,並觸發了這個 observer 本身的 onSubscribe 方法。

  • onNext
@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
複製代碼

在 onNext 中會執行 scheule() 方法。

void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
複製代碼

這個地方就有意思了,前面說過這裏的 worker 是一個持有主線程handler 的Worker對象,當他的 schedule 執行時,就會把特定的線程任務經過Handler.postDelay 方法轉移到主線中去執行

那麼這裏的this 又是什麼呢?前面咱們說過,ObserveOnObserver 這個類功能很是強大,他是一個Runnable,那麼這裏就是執行他本身的run方法嘍,咱們趕忙看看。

@Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
複製代碼

這裏有一個參數 outputFused 他默認是false,至於他何時爲true,不做爲這裏討論的重點。

void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
複製代碼

這裏大概就是經過一個死循環,不斷從 onSubscribe 方法中建立的隊列中取出事件,執行observer 的 onNext方法。而當爲例爲空時,就會執行worker.dispose 取消整個事件流,同時從Handler中移除全部消息。

最後在看一眼 onComplete ,onError 和整個相似

@Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
複製代碼

能夠看到這裏的處理也很簡單,done 設置爲 true .這樣最後便完成了下游事件的執行。

最後

好了,因爲一些無以訴說的緣由,經歷了好久終於把 RxJava 線程切換的下篇給完成了。

相關文章
相關標籤/搜索