詳解 RxJava2 的線程切換原理

轉載請標明地址 QuincySx:[www.jianshu.com/p/a9ebf730c… ]java


讀了這篇文章你將會收穫什麼

  • RxJava2 基本的運行流程(並不會詳述)
  • RxJava2 線程切換原理
  • 爲何 subscribeOn() 只有第一次切換有效
  • RxAndroid 簡單分析

PS:建議您對 RxJava 有一些瞭解或使用經驗再看此文章,推薦結合源碼品嚐 RxJava入門文章 [給 Android 開發者的 RxJava 詳解-扔物線(gank.io/post/560e15…)bash

而後貼一下本篇文章分析的示例代碼app

CompositeDisposable comDisposable = new CompositeDisposable();

protected void test() {
        Observable<String> observable = Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws
                            Exception {
                        emitter.onNext("hello");
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                comDisposable.add(d);
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
}
複製代碼

RxJava2 基本的運行流程

根據上述源碼分析出流程圖,這裏顏色相同的表明同一對象。根據流程圖看一遍源碼基本流程就能理通ide

RxJava2 線程切換原理流程圖

RxJava2 線程切換原理

RxJava 切換線程怎麼用我就很少說了請參考個人另外一篇文章 Android:隨筆——RxJava的線程切換oop

1、observeOn() 的線程切換原理

根據運行流程來看 observeOn() 執行後是獲得 ObservableObserveOn 對象,那麼當 ObservableObserveOn 綁定監聽者的時候要運行 subscribe() 方法源碼分析

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //調用 subscribeActual()
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}
複製代碼

接下來咱們看一下 subscribeActual() 方法post

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //scheduler 是傳進來的線程調度對象,如 Schedulers.io() 、AndroidSchedulers.mainThread() 等,這裏調用了 createWorker() 方法暫時看一下就好稍後分析 RxAndroid 會說明 
        Scheduler.Worker w = scheduler.createWorker();
        //咱們看到他把 w 參數傳進去了
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
複製代碼

從上述代碼咱們能夠看到 ObservableObserveOn 是被 ObserveOnObserver 監聽的,因此收到通知也是由 ObserveOnObserver 做出響應,接下來咱們假設當 Rxjava 發送 onNext 通知時會調用 ObserveOnObserver 的 onNext() 方法 ( PS:固然若是是 onComplete()、onError() 等也是同樣的邏輯 ),而後咱們來看一看 ObserveOnObserver 的 onNext() 方法,ui

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //切換線程
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        //直接調用了 worker 的 schedule 方法,須要注意的是這裏他把本身傳了進去
        worker.schedule(this);
    }
}
複製代碼

如今我先把把 schedule(Runnable run) 貼出來this

public Disposable schedule(@NonNull Runnable run) {
    return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
複製代碼
  1. 咱們看到這個他接收的參數是一個 Runnable,這是怎麼回事呢,咱們看一下 ObserveOnObserver 對象,他不但實現了 Observer 接口而且也實現了 Runnable 接口
  2. 接下看,繼續調用 schedule( Runnable action, long delayTime, TimeUnit unit) 方法,可是這個方法是個抽象方法,這裏咱們就假設這裏這個 worker 是 IO 線程,因此我直接貼 IoScheduler 的代碼了
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
     if (tasks.isDisposed()) {
         // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } 複製代碼

而後再貼一下 scheduleActual 的方法spa

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //就是個 Runnable
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        //判斷延遲時間,而後使用線程池運行 Runnable
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}
複製代碼

這樣一來就會在相應的線程中運行 ObserveOnObserver 的 run 方法

public void run() {
    //這個地方具體的我尚未搞明白,大概就是在這個方法裏調用 onNext() ,而後 observeOn() 操做符以後的監聽者的運行線程就變了
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
        }
    }
複製代碼
2、subscribeOn() 的線程切換原理

PS:這個切換原理其實和 observeOn() 原理很像

跟 observeOn() 同樣,只不過這個操做的對象是 ObservableSubscribeOn, 這個對象也是一樣的代碼邏輯,運行 subscribe() 方法,而後調用 subscribeActual() 方法,因此就直接貼 subscribeActual() 的代碼

public void subscribeActual(final Observer<? super T> s) {
    //建立與之綁定的 SubscribeOnObserver
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    //1. 建立 SubscribeTask 實際上就是個 Runnable
    //2. 而後調用 scheduler.scheduleDirect 方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼

咱們看一下 scheduleDirect 的方法

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //一個 Runnable 具體做用沒分析
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //這個代碼看着熟悉嗎  沒錯上面 observeOn 提到過,知道它是運行 Runnable 咱們就直接看 Runnable 裏面的 run() 了
    w.schedule(task, delay, unit);
    return task;
}
複製代碼

咱們看一下 DisposeTask 的 run()

public void run() {
    runner = Thread.currentThread();
    try {
        decoratedRun.run();
    } finally {
        dispose();
        runner = null;
    }
}
複製代碼

調來調去咱們又回到了 SubscribeTask 的 run()

public void run() {
    source.subscribe(parent);
}
複製代碼

這個地方的運行線程已經被切換了,他又開始往上一層層的去訂閱,因此 create(new ObservableOnSubscribe(){})這個匿名實現接口運行 subscribe 的線程運行環境都被改變了,再去調用 onNext() 等方法線程環境也是被改變的

爲何 subscribeOn() 只有第一次切換有效

寫到這裏咱們這個問題也就能回答了 由於 RxJava 最終能影響 ObservableOnSubscribe 這個匿名實現接口的運行環境的只能是最後一次運行的 subscribeOn() ,又由於 RxJava 訂閱的時候是從下往上訂閱,因此從上往下第一個 subscribeOn() 就是最後運行的,這就形成了寫多個 subscribeOn() 並無什麼亂用的現象。


分析一下 RxAndroid

RxAndroid 源碼
其實 RxAndroid 裏面並無什麼複雜的代碼,他其實只是提供一個能切換到 Android 主線程線程調度器。

其實它的原理和 RxJava 自帶的那些線程調度器同樣,若是你想了解 RxJava 的 IO 線程池,什麼的能夠本身看一看,我這裏分析 RxAndroid 主要有如下幾點緣由

  1. 弄清楚 RxAndroid 這個庫的具體做用
  2. 弄清楚他是怎麼就能把線程切換到主線程(他是怎麼提供的主線程環境)
  3. 弄清楚線程調度器的運行原理
  4. 最重要的是它相對於 RxJava 自帶的那些調度器,他比較簡單容易分析

正文開始

首先咱們找一下入口 AndroidSchedulers.mainThread() 這個地方應該是就是入口了,咱們看一下 AndroidSchedulers 這個類的源碼吧,總共也沒幾行

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        }
    );

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }
複製代碼

這個應該不用我多說你們都能看明白,看到這裏咱們基本上明白了 RxAndroid 就是經過 Handler 來拿到主線程的

咱們拿 subscribeOn() 中的一些流程來講

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}
複製代碼

首先咱們看到調用了 createWorker() 這是個抽象方法咱們找到具體實現類 HandlerScheduler

public Worker createWorker() {
    return new HandlerWorker(handler);
}
複製代碼

單純的建立一個 Worker 並把主線程的 Handler 傳進去,而後調用 Worker 的 schedule() 方法

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    /**忽略一些代碼**/
    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables. handler.sendMessageDelayed(message, unit.toMillis(delay)); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } 複製代碼

到這裏看明白 RxJava 如何經過 RxAndroid 來切換到主線程運行,其實 RxAndroid 的核心就是 Handler


總結

本篇參考 RxJava 2.1.12 與 RxAndroid:2.0.2 源碼 不得不說 Handler 在安卓中的地位真的是很牛逼 看法不到的地方歡迎你們指出

相關文章
相關標籤/搜索