RxJava2源碼分析——線程切換

本文章主要是對RxJava2線程切換流程進行源碼分析,在閱讀以前,能夠先閱讀如下文章:java

RxJava2源碼分析——訂閱react

本文章用的RxJavaRxAndroid版本以下:android

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
複製代碼

咱們先寫段示例代碼,代碼以下:git

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan");
    emitter.onNext("Jia");
    emitter.onNext("Jun");
    emitter.onComplete();
  
    Log.i("TanJiaJun", "subscribe方法所在的線程:" + Thread.currentThread().getName());
})
        // 切換上游Observable到io線程
        .subscribeOn(Schedulers.io())
        // 切換下游Observer到主線程,使用AndroidSchedulers.mainThread須要使用RxAndroid這個庫
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("TanJiaJun", "onSubscribe方法所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", "onNext方法所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TanJiaJun", "onError所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                Log.i("TanJiaJun", "onComplete方法所在的線程:" + Thread.currentThread().getName());
            }
        });
複製代碼

源碼分析

首先咱們看下Schedulers這個類。github

Schedulers

閱讀源碼後,咱們能夠得知,總共有5種類型。框架

computation

@NonNull
public static Scheduler computation() {
    return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
複製代碼

該方法返回一個默認、共享的調度器實例用於計算工做,這能夠用於事件循環處理回調其餘計算工做異步

io

@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
複製代碼

該方法返回一個默認、共享的調度器實例用於IO綁定的工做,這能夠用於異步執行阻塞IO,默認是由單線程實例池實現的,能夠重用已經啓動的線程,要注意的是,這個調度器的線程數量可能會無限制增加,從而致使內存溢出(OOM)ide

trampoline

@NonNull
public static Scheduler trampoline() {
    return TRAMPOLINE;
}
複製代碼

該方法返回一個默認、共享的調度器實例,用於隊列工做,並以FIFO方式在一個參與線程中執行它們,也就是說會等到當前線程執行完畢纔會執行下個線程。oop

newThread

÷@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
複製代碼

該方法返回一個默認、共享的調度器實例,該實例爲每一個工做單元建立一個新線程,默認實現是建立一個新的單線程,要注意的是,每次調用Scheduler.scheduleDirect方法(及其重載方法)和Scheduler.createWorker方法均可以建立數目無限制的線程,從而形成內存溢出(OOM)源碼分析

single

@NonNull
public static Scheduler single() {
    return RxJavaPlugins.onSingleScheduler(SINGLE);
}
複製代碼

該方法返回一個默認、共享的調度器實例,該實例會建立一個單獨的線程。

負責線程切換有兩個方法:subscribeOnobserveOn

subscribeOn

這個方法負責切換上游Observable的線程,代碼以下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼

根據上篇文章閱讀subscribe方法源碼的經驗,咱們只看ObservableSubscribeOn類就能夠了,要注意的點我都寫上註釋了,代碼以下:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // source是上游Observable
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 建立SubscribeOnObserver對象,傳入下游Observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        // 建立SubscribeTask任務,使用指定的調度器進行調度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        // 省略部分代碼
    }

    // SubscribeTask繼承Runnable,因此咱們能夠看下它的run方法
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            // 這裏已經切換到想要的線程了,source是上游Observable,調用它的subscribe方法,而且傳入下游observer,根據上篇文章的經驗,上游Observable的subscribeActual方法會被執行
            source.subscribe(parent);
        }
    }
}
複製代碼

咱們的示例代碼中調用subscribeOn方法傳入的是Schedulers.io(),看下這個方法對應的源碼,代碼以下:

// Schedulers.java
@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
複製代碼

IO是一個final的靜態變量,它是經過Schedulers這個類的靜態代碼塊賦值的,代碼以下:

static {
    // 省略部分代碼

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    // 省略部分代碼
}
複製代碼

它會建立一個IOTask對象,代碼以下:

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
複製代碼

這個類實現了Callable接口,而且重寫了call方法,返回IoHolder.DEFAULT,代碼以下:

// DEFAULT是final的靜態類IoHolder裏的final的靜態變量
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
複製代碼

咱們看到這裏建立了一個IoScheduler對象,代碼以下:

// IoScheduler.java
static final RxThreadFactory WORKER_THREAD_FACTORY;

static {
        // 省略部分代碼

        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_IO_PRIORITY, Thread.NORM_PRIORITY)));

        // RxThreadFactory是一個線程工廠,能夠刪除對new Thread調用的硬鏈接
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

        // 省略部分代碼
        // 建立CachedWorkerPool對象,第二個參數是傳入TimeUnit,若是是null的話,是不會建立線程池的,下面會講到
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        NONE.shutdown();
    }

// IoScheduler的構造方法
public IoScheduler() {
    // 這裏會調用下面那個方法
    this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
    // 賦值給成員變量threadFactory
    this.threadFactory = threadFactory;
    // 用CachedWorkerPool建立一個原子引用
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    // 調用start方法
    start();
}

@Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    // compareAndSet方法第一個參數是預期值,第二個參數是新值,若是NONE==update的話,就會將值原子性地設置會更新值,而且返回true,不然不會更新,而且返回false,而後調用shutdown方法
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

static final class CachedWorkerPool implements Runnable {
    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
        this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
        this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
        this.allWorkers = new CompositeDisposable();
        this.threadFactory = threadFactory;

        ScheduledExecutorService evictor = null;
        Future<?> task = null;
        if (unit != null) {
            // 當unit不是null的話,就會建立一個newScheduledThreadPool線程池
            evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
            task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
        }
        evictorService = evictor;
        evictorTask = task;
    }
}
複製代碼

咱們再回到上面說的ObservableSubscribeOn類,看到以下這段代碼:

// ObservableSubscribeOn.java
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    // 調用了scheduler的scheduleDirect方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
複製代碼

咱們再看下scheduleDirect方法,代碼以下:

// Scheduler.java
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 調用createWorker方法,createWorker是個抽象方法,剛纔咱們所說的IoScheduler是Scheduler的實現類,它重寫了createWorker方法
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    // DisposeTask實現了Runnable接口
    DisposeTask task = new DisposeTask(decoratedRun, w);

    // 調用worker的scheduler方法
    w.schedule(task, delay, unit);

    return task;
}
複製代碼

咱們再看下createWorker方法,代碼以下:

// IoScheduler.java
@NonNull
@Override
public Worker createWorker() {
    // 建立EventLooperWork,而且傳入從原子引用獲得的當前的值
    return new EventLoopWorker(pool.get());
}
複製代碼

EventLoopWorkerIoScheduler的一個final靜態內部類,繼承Scheduler.Worker,代碼以下:

// IoScheduler.java
static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    @Override
    public void dispose() {
        if (once.compareAndSet(false, true)) {
            tasks.dispose();

            // releasing the pool should be the last action
            pool.release(threadWorker);
        }
    }

    @Override
    public boolean isDisposed() {
        return once.get();
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        // 若是已經取消訂閱了,就再也不調度
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        // 調用了ThreadWorker的scheduleActual方法
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

// ThreadWorker繼承NewThreadWorker
static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}
複製代碼

咱們看下NewThreadWorkerscheduleActual方法,代碼以下:

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    // ScheduledExecutorService是個接口,繼承ExecutorService接口
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        // 調用SchedulerPoolFactory的create方法,建立線程池
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    // 省略部分代碼

    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            // executor.submit和excutor.schedule其實最後會調用同一個方法,執行這個方法後任務就提交上去了
            if (delayTime <= 0L) {
                // 若是不須要延遲就調用submit方法,提交一個有返回結果的任務
                f = executor.submit(task);
            } else {
                // 若是須要延遲就調用schedule方法,提交一個有返回結果的任務
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

    // 省略部分代碼
}
複製代碼

到這裏,上游Observable的代碼就會被切換到對應的線程了,咱們這裏是拿**Schedulers.io()**做爲例子來說解,其餘類型你們能夠本身看下源碼。

結論:訂閱事件從下往上傳遞的,最終傳遞到上游Observablesubscribe方法。

observeOn

這個方法負責切換下游Observer的線程,代碼以下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    // 調用下面那個方法
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼

也像以前那樣,咱們只須要看ObservableObserveOn這個方法,代碼以下:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        // source是上游Observable
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 判斷要指定的調度器是否是TrampolineScheduler,也就是是否是傳入Schedulers.trampoline()
        if (scheduler instanceof TrampolineScheduler) {
            // 若是是,就直接調用subscribe方法,由於TrampolineScheduler是在當前線程調度的,上面也說起過
            source.subscribe(observer);
        } else {
            // 若是不是,就經過調度器建立worker,而後調用subscribe方法傳入建立的ObserveOnObserver對象
            Scheduler.Worker w = scheduler.createWorker();

            // 與subscribeOn不一樣,subscribe方法不是在已經切換好的線程中執行
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    // ObserveOnObserver是一個final的靜態內部類,實現了Runnable接口,因此咱們看下它的run方法
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        // 省略部分代碼

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                // 若是checkTerminated方法返回false就會return
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    // 最後調用下游Observer的onNext方法
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        // 省略部分代碼

        @Override
        public void run() {
            // 到這裏已經切換到想要的線程了,outputFused變量是經過requestFusion設置的
            if (outputFused) {
                drainFused();
            } else {
                // 咱們主要看這個方法
                drainNormal();
            }
        }

        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                // delayError在咱們調用的observeOn方法中是傳入false的
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        queue.clear();
                        // 若是Throwable不是null的話,就會調用下游Observer的onError方法
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        // 若是任務隊列是空的話,證實任務執行完畢,就會調用下游Observer的onComplete方法
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

        // 這個方法和背壓(Backpressure)有關係,不是本文章的主要內容,暫時不討論
        @Override
        public int requestFusion(int mode) {
            if ((mode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }

        // 省略部分代碼
    }
}
複製代碼

結論:觀察事件從上往下傳遞的,最終傳遞到下游Observer的回調方法,例如:onNext方法、onComplete方法、onError方法,注意onSubscribe方法所在的線程是當前的線程,不會隨着訂閱線程或者觀察線程的切換而改變。

屢次調用subscribeOn方法,切換訂閱線程

咱們試下屢次調用subscribeOn方法,把示例代碼改爲以下:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan");
    emitter.onNext("Jia");
    emitter.onNext("Jun");
    emitter.onComplete();

    Log.i("TanJiaJun", "subscribe方法所在的線程:" + Thread.currentThread().getName());
})
        .subscribeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("TanJiaJun", "onSubscribe方法所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", "onNext方法所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TanJiaJun", "onError所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                Log.i("TanJiaJun", "onComplete方法所在的線程:" + Thread.currentThread().getName());
            }
        });
複製代碼

Log以下:

subscribeOnLog.png

根據以前的源碼分析,其實它像以下代碼:

new Thread("AndroidSchedulers.mainThread()") {
    @Override
    public void run() {
        new Thread("Schedulers.io()") {
            @Override
            public void run() {
                System.out.println("上游Observable的subscribe方法所在的線程:" + getName());
            }
        }.start();
    }
}.start();
複製代碼

Log以下:

subscribeOnDemoLog.png

結論:若是咱們屢次調用subscribeOn方法,切換訂閱線程的話,上游Observablesubscribe方法所在的線程只會是在第一次切換的線程,上面也提到過了,由於訂閱事件從下往上傳遞的,最終傳遞到上游Observablesubscribe方法。

屢次調用observeOn方法,切換觀察線程

咱們試下屢次調用obsesrveOn方法,把示例代碼改爲以下:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan");
    emitter.onNext("Jia");
    emitter.onNext("Jun");
    emitter.onComplete();

    Log.i("TanJiaJun", "subscribe方法所在的線程:" + Thread.currentThread().getName());
})
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("TanJiaJun", "onSubscribe方法所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", "onNext方法所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TanJiaJun", "onError所在的線程:" + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                Log.i("TanJiaJun", "onComplete方法所在的線程:" + Thread.currentThread().getName());
            }
        });
複製代碼

Log以下:

observeOnLog.png

根據以前的源碼分析,其實它像以下代碼:

new Thread("AndroidSchedulers.mainThread()") {
    @Override
    public void run() {
        new Thread("Schedulers.io()") {
            @Override
            public void run() {
                System.out.println("下游Observer的回調方法所在的線程:" + getName());
            }
        }.start();
    }
}.start();
複製代碼

Log以下:

observeOnDemoLog.png

結論:若是咱們屢次調用observeOn方法,切換觀察線程的話,下游Observer的回調方法,例如:onNext方法、onComplete方法、onError方法,它們所在的線程會隨着每次切換而切換,由於觀察事件從上往下傳遞的,最終傳遞到下游Observer的回調方法。

Demo:RxJavaDemo

個人GitHub:TanJiaJunBeyond

Android通用框架:Android通用框架(Kotlin-MVVM)

個人掘金:譚嘉俊

個人簡書:譚嘉俊

相關文章
相關標籤/搜索