RxJava2源碼分析(三):線程調度分析

前言:通過前面兩篇文章對RxJava2源碼的分析,咱們已經對RxJava2的基本流程及操做符的原理有了必定程度的認識。這篇文章將在前面兩篇文章的基礎上,對RxJava2的線程調度進行分析,建議先閱讀前面兩篇的文章,再閱讀本文。html

注:文章內容過多,建議在空閒時閱讀。java

相關文章

示例代碼

  爲了更好的理解RxJava2的線程調度原理,不被其餘的代碼所幹擾,這裏就只貼出與線程調度有關的代碼,以下oop

private void threadScheduleCode() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.e("wizardev", "上游所在的線程: "+Thread.currentThread().getName());
                Thread.sleep(2*1000);
                emitter.onNext("wizardev");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("wizardev", "onSubscribe: "+Thread.currentThread().getName() );
                    }
                    @Override
                    public void onNext(String s) {
                        Log.e("wizardev", "接收到上游發射的數據爲: " + s);
                        Log.e("wizardev", "下游所在的線程: "+ Thread.currentThread().getName());
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onComplete() {

                    }
                });
    }

複製代碼

能夠看下執行這段代碼後打印的日誌,以下源碼分析

能夠發現上游和下游確實不在同一個線程中,那麼RxJava2是怎麼進行線程切換的呢?想知道答案,請繼續閱讀本文。post

本文要解決的問題

  本文要解決的問題其實就一個,就是RxJava2是如何進行線程調度的?可是,圍繞着這個問題又會有兩個小的問題須要解決:ui

  1. subscribeOn是怎樣將要處理的數據放到到工做線程的?
  2. observeOn是怎樣將工做線程切換到主線程的?

爲了可以更容易理解線程調度的原理,這裏對源碼分析的順序將會按照代碼的執行順序進行分析。this

subscribeOn方法分析

  由於前面的文章已經分析過了create方法,因此就直接分析subscribeOn這個方法,直接上源碼,以下spa

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

有了前面分析源碼的經驗,能夠知道,subscribeOn方法其實就是返回了ObservableSubscribeOn類的實例並將上游的ObservableCreate和subscribeOn方法的參數注入到了它的構造方法中。 繼續看下ObservableSubscribeOn類的源碼,以下

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    //...
    //省略部分源碼
}
複製代碼

從源碼中能夠看到,這裏分別將ObservableCreate類的實例以及subscribeOn方法的參數即Schedulers.io()做爲了ObservableSubscribeOn類的成員變量。 好了,上面的這些就是執行subscribeOn(Schedulers.io())這句代碼所作的事情了,下面來看下observeOn(AndroidSchedulers.mainThread())這句代碼所作的事情。

observeOn方法分析

  直接看源碼,以下

public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
複製代碼

從上面的代碼能夠看出,observeOn方法最終調用的是含有三個參數的observeOn方法,而這個方法的做用是返回了ObservableObserveOn類的實例並將observeOn方法的參數scheduler注入其中。 如今來看實例化ObservableObserveOn類的時候都作了什麼,ObservableObserveOn類的代碼以下

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    //...
    //省略部分代碼
}
複製代碼

能夠看到,實例化ObservableObserveOn類的時候,將ObservableSubscribeOn的實例及AndroidSchedulers.mainThread()還有其餘的兩個默認參數都做爲了它的成員變量保存。

  由前面的兩篇文章可知,下游的subscribe方法最終會調用上游的subscribeActual方法,因此會調用這裏的subscribeActual方法,代碼以下

protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
複製代碼

如今來一句句的分析上面的代碼,首先if條件確定是不成立的,由於這裏scheduler實際上是HandlerScheduler,爲何是HandlerScheduler呢?咱們來一點點的分析,由observeOn(AndroidSchedulers.mainThread())這句代碼能夠知道,observeOn方法的參數是AndroidSchedulers.mainThread(),那這個AndroidSchedulers.mainThread()又是什麼呢?看代碼

public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
//上面的方法,返回的就是MAIN_THREAD,而MAIN_THREAD最終返回的是
//MainHolder.DEFAULT
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
//上面的MainHolder.DEFAULT就是實例化了HandlerScheduler
 private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }
複製代碼

上面貼出的代碼都是與AndroidSchedulers.mainThread()有關的代碼,從上面的代碼中能夠得出結論,AndroidSchedulers.mainThread()最終是實例化了HandlerScheduler,因此,subscribeActual方法中的scheduler是HandlerScheduler,因此,if語句的條件不成立,這裏會執行subscribeActual方法中的else語句,即執行下面的代碼

Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
複製代碼

一樣,按照代碼的執行順序來分析,看第一句代碼,Scheduler.Worker w = scheduler.createWorker();從前文中的分析能夠知道,這裏的scheduler是HandlerScheduler,因此,這裏是調用HandlerScheduler類中的createWorker方法,HandlerScheduler類中的createWorker方法的代碼以下

public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
複製代碼

從上面的代碼能夠得出,HandlerScheduler類中的createWorker方法返回了HandlerWorker類的實例,這裏傳入HandlerWorker構造方法中的兩個參數是在上面已經分析過的方法中進行初始化的,以下

private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }
複製代碼

從這句代碼中能夠得知,HandlerWorker構造方法中的handler是實例化在主線程中的Handler,async的值是false。 好了,到這裏咱們知道了Scheduler.Worker w = scheduler.createWorker();這句代碼的做用是實例化了HandlerWorker,而實例化HandlerWorker的同時,在其構造方法中初始化了兩個成員變量。

  下面繼續看這句代碼source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));,先看這句代碼中的這段new ObserveOnObserver<T>(observer, w, delayError, bufferSize)代碼作了什麼,代碼以下

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
複製代碼

根據上文的分析能夠得出這裏的幾個參數分別表明什麼

  • this.downstream就是這段代碼

    new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("wizardev", "onSubscribe: "+Thread.currentThread().getName() );
                        }
    
                        @Override
                        public void onNext(String s) {
                            Log.e("wizardev", "接收到上游發射的數據爲: " + s);
                            Log.e("wizardev", "下游所在的線程: "+ Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    }
    複製代碼
  • this.worker就是new HandlerWorker(handler, async);

  • this.delayError的值是false

  • this.bufferSize就是一個int型的數字

好了,如今繼續來看source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));這裏的source就是上游的Observable,這裏就是ObservableSubscribeOn類的實例,因此這句代碼實際就是調用了ObservableSubscribeOn類中的subscribe方法,而ObservableSubscribeOn沒有這個方法,因此是調用其父類的subscribr方法,由以前的文章可知,最終調用的就是ObservableSubscribeOn類中的subscribeActual方法。因此,如今須要把思路切換到ObservableSubscribeOn類中的subscribeActual方法了

ObservableSubscribeOn類中subscribeActual方法分析

  仍是看代碼,subscribeActual方法的以下

public void subscribeActual(final Observer<? super T> observer) {
    //1
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//2
        observer.onSubscribe(parent);
//3
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
複製代碼

根據前面的分析可知,這個方法中的參數就是new ObserveOnObserver<T>(observer, w, delayError, bufferSize)這段代碼。照舊,按照代碼的執行順序分析,代碼中已經標註了1,2,3的執行步驟,

  • 如今來分析「1」處代碼,看下SubscribeOnObserver類,代碼以下

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer<? super T> downstream;
    
            final AtomicReference<Disposable> upstream;
    //這裏的downstream就是new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
            SubscribeOnObserver(Observer<? super T> downstream) {
                this.downstream = downstream;
                this.upstream = new AtomicReference<Disposable>();
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                DisposableHelper.setOnce(this.upstream, d);
            }
    
            @Override
            public void onNext(T t) {
                downstream.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
            }
    
            @Override
            public void onComplete() {
                downstream.onComplete();
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(upstream);
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            void setDisposable(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
        }
    複製代碼

    重要部分已在代碼中註釋。

  • 接着分析「2」處的代碼,這裏的observe就是ObserveOnObserver的實例,調用的就是ObserveOnObserver類中的onSubscribe方法,onSubscribe方法的代碼以下

    public void onSubscribe(Disposable d) {
        //這裏會直接進入if方法中
                if (DisposableHelper.validate(this.upstream, d)) {
                    //這句代碼的做用就是將new SubscribeOnObserver<T>(observer);賦值給了this.upstream
                    this.upstream = d;
    //d的值是SubscribeOnObserver的實例,這裏if條件不成立
                    if (d instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) d;
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            downstream.onSubscribe(this);
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            sourceMode = m;
                            queue = qd;
                            downstream.onSubscribe(this);
                            return;
                        }
                    }
    //實例化大小爲bufferSize的隊列
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
    //上文已經分析了downstream的值,最下游的onSubscribe與線程調度
     //無關,在那個線程調用的subscribe就在哪一個線程回調
                    downstream.onSubscribe(this);
                }
            }
    複製代碼

    主要的代碼已在文中註釋,下面來分析「3」處的代碼

  • 如今一步步的分析「3」處的代碼,new SubscribeTask(parent)代碼以下

    final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    複製代碼

    能夠看出SubscribeTask直接實現了的Runnable,並將new SubscribeOnObserver<T>(observer);做爲成員變量。

    繼續看scheduler.scheduleDirect(…)這裏的scheduler是這句代碼Schedulers.io(),Schedulers.io()代碼以下

    public static Scheduler io() {
            return RxJavaPlugins.onIoScheduler(IO);
        }
    
    static {
            SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    
            COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    //IO是實例化的IOTask
            IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
            TRAMPOLINE = TrampolineScheduler.instance();
    
            NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
        }
    
    static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        }
    //最終會調用這個
    static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
    
    複製代碼

    上面的代碼能夠看出,Schedulers.io()最終返回的是IoScheduler,因此scheduler.scheduleDirect(…)這句代碼中的scheduler就是IoScheduler,而scheduleDirect方法是IOTask父類中的方法,代碼以下

    public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
    //最終調用的是這個方法
     public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
         //這裏的createWorker調用的是IoScheduler中的方法
            final Worker w = createWorker();
    //仍然是Runnable
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //實例化DisposeTask並將decoratedRun及w注入
            DisposeTask task = new DisposeTask(decoratedRun, w);
    //調用的w的schedule方法,將上面的三個值做爲參數
            w.schedule(task, delay, unit);
    
            return task;
        }
    複製代碼

    這裏,詳細看下final Worker w = createWorker();這句代碼,createWorker()方法的代碼以下

    public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    複製代碼

    能夠看到這句代碼的做用是實例化了EventLoopWorker並返回。 接着看w.schedule(task, delay, unit);這句代碼,w爲EventLoopWorker,因此這裏是EventLoopWorker類中的schedule方法,代碼以下

    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    //最終調用的是NewThreadWorker類中的方法
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
    //最終會調用這個方法
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //實例化了ScheduledRunnable
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                //不會進入這個方法
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
    //重點,這裏是把ScheduledRunnable放進了線程池中,關於java線程
                    //池你們能夠自行研究
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    複製代碼

    上面中的代碼已經有了一些註釋,最重要的就是這句代碼

    if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
    複製代碼

    這段代碼的做用就是將任務放進了線程池中等待執行。 一樣,這段代碼就是

    subscribeOn是怎樣將要處理的數據放到到工做線程的?

    這個問題的答案。

經過分析「3」處的代碼,能夠發現其實就是將SubscribeTask實例進行了一層層的包裝,而後丟到線程隊列中等待執行,爲了便於理解,我畫了一下包裝層級圖,以下

根據上面的層級關係,會發現最後會調到SubscribeTask的run方法,這個方法又調用了上游的subscribe方法,而這個上游就是ObservableCreate類,因爲前面兩篇已經分析過了這個類,這裏就再也不分析。根據前面兩篇的分析,知道最後會調用發射數據的方法,而這時這個發射數據其實就已是在工做線程中了。

發射數據流程分析

  根據前面兩篇的分析,能夠知道emitter.onNext("wizardev");這句代碼就是調用下游的onNext方法,這裏就會調用SubscribeOnObserver類的onNext方法,SubscribeOnObserver類的onNext方法的源碼以下

public void onNext(T t) {
            downstream.onNext(t);
        }
複製代碼

這裏直接調用了下游的onNext方法,這個下游就是ObserveOnObserver類即這裏會調用ObserveOnObserver類中的onNext方法,ObserveOnObserver類中的onNext方法代碼以下

public void onNext(T t) {
     //這裏的done爲初始值false
            if (done) {
                return;
            }
//sourceMode爲初始值0
            if (sourceMode != QueueDisposable.ASYNC) {
//將上游發射的數據放入隊列中,這個queue就是在onSubscribe方法中實例化的
                queue.offer(t);
            }
     //調用方法
            schedule();
        }
複製代碼

繼續看schedule方法的源碼。以下

void schedule() {
            if (getAndIncrement() == 0) {
 //根據上文的分析能夠知道這個worker就是HandlerScheduler的內部類
                //HandlerWorker的實例
                worker.schedule(this);
            }
        }
複製代碼

這裏講解一下這句worker.schedule(this);代碼,這裏的worker就是HandlerScheduler內部類HandlerWorker的實例,因此這裏調用了HandlerScheduler內部類HandlerWorker的schedule方法並將ObserveOnObserver實例做爲參數傳入。 如今,來看HandlerWorker類中的schedule方法,代碼以下

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }
複製代碼

這個方法中重要的就是下面這段代碼

run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }
//將message放入messageQueue中等待輪詢,這裏的handler在主線程中,
//因此這裏執行scheduled的run方法,其實已經切換到主線程中了
            handler.sendMessageDelayed(message, unit.toMillis(delay));
複製代碼

瞭解Handler原理的同窗就會知道上面的這段代碼最終會調用scheduled的run方法。不瞭解Handler原理的同窗,能夠看下個人這篇文章。這裏的scheduled的run方法會調用run = RxJavaPlugins.onSchedule(run);這句代碼的run方法,即調用的是ObserveOnObserver類中的run方法,看下ObserveOnObserver類中的run方法的代碼,以下

public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
複製代碼

上面代碼中的outputFused的初始值爲false,因此會執行else語句中的代碼,看下drainNormal的代碼,以下

void drainNormal() {
            int missed = 1;
//這個queue是在onSubscribe初始化的,在onNext中將上游的數據添加進去的
            final SimpleQueue<T> q = queue;
  //將下游的observe賦值給a
            final Observer<? super T> a = downstream;
//
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
//開始輪詢
                for (;;) {
                    boolean d = done;
                    T v;

                    try {
             //輪詢取出q中的值,這裏的值就是在上游發射的
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }
//取出的值,傳遞給下游的onNext方法
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
複製代碼

重要的代碼,已經在代碼中進行了註釋,這裏就再也不講解,上面代碼的做用就是,不斷取出上游發射的數據,而後調用下游的onNext方法並將取出的值傳遞進去。

總結

  分析到這裏,算是將RxJava2線程調度的源碼理清楚了。能夠發如今進行線程調度的時候大量的使用Runnable,一層層的包裝,而後在一層層的來調用。首先,將線程切換到工做線程中的方法是將調用上游subscribe方法放在了Runnable類中的run方法中,而後將這個Runnable層層包裝後放進線程隊列中等待執行,最後在工做線程中處理髮射的數據。

  將線程切換到主線程中的方法是利用Handler,將處理好的數據放進一個隊列中,放進隊列中的這個動做仍是在工做線程中完成的,而後,利用Handler將線程切換到主線程,最後不斷的取出隊列中的數據,不斷調用下游的onNext方法。經過這種方式來完成線程的調度。

結束語

  經過上面的分析,能夠發現RxJava2線程調度仍是挺複雜的,牽涉到的知識點也是比較多的,爲了更簡單,更有條理的講解RxJava2線程調度的原理,同時爲了讓你們不至於在源碼中迷失,因此這裏分析源碼按照代碼的執行順序一步步的進行的。由於,代碼執行的時候會在不一樣的類之間來回切換,因此,你們會發現分析的時候在各個類中跳來跳去。

  因爲篇幅的緣由,文章的一些知識沒有詳細的來說解,如Java的線程池,Handler原理等,你們能夠本身查閱相關資料或者留言一塊兒討論,若是發現文中有不對的地方,也歡迎指正。

歡迎關注個人公衆號
掃碼關注公衆號,回覆「獲取資料」有驚喜
相關文章
相關標籤/搜索