RxJava 是如何實現線程切換的(上)

前言

經過前一篇的從觀察者模式出發,聊聊RxJava,咱們大體理解了RxJava的實現原理,在RxJava中能夠很是方便的實現不一樣線程間的切換。subscribeOn 用於指定上游線程,observeOn 用於指定下游線程,屢次用 subscribeOn 指定上游線程只有第一次有效,屢次用 observeOn 指定下次線程,每次都有效;簡直太方便了,比直接使用Handler省了很多力氣,同時也不用去關注內存泄漏的問題了。本篇就來看看在RxJava中上游是如何實現線程切換。java

RxJava 基礎原理

爲了方便後面的敘述,這裏經過下面的UML圖簡單回顧一下上一篇的內容。網絡

此圖並無完整的展示圖中各個接口和類之間的各類關係,由於那樣會致使整個圖錯綜複雜,不便於查看,這裏只繪製出了RxJava各個類之間核心關係網絡app

從上面的UML圖中能夠看出,具體的實現類只有ObservableCreate和CreateEmitter。CreateEmitter是ObservableCreate的內部類(PlantUML 怎麼繪製內部類,沒搞懂,玩的轉的同窗請賜教呀(^▽^))。ide

上篇說過Observable建立的過程,能夠簡化以下:函數

Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())
複製代碼

結合圖能夠更直觀的體現出這一點。ObservableCreate 內部持有ObservableOnSubscribe的引用。工具

當觀察者訂閱主題後:oop

mObservable.subscribe(mObserver);
複製代碼

ObservableCreate 中的subscribeActual()方法就會執行,post

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

複製代碼

在這個過程當中會建立CreateEmitter 的實例,而這個CreateEmitter實現了Emitter和Disposable接口,同時又持有Observer的引用(固然這個引用是ObservableCreate傳遞給他的)。接着就會執行ObservableOnSubscribe的subscribe 方法,方法的參數即爲剛剛建立的CreateEmitter 的實例,接着一系列連鎖反應,Emitter 接口中的方法(onNext,onComplete等)開始執行,在CreateEmitter內部,Observer接口中對應的方法依次執行,這樣就實現了一次從主題(上游)到觀察者(下游)的事件傳遞。ui

source.subscribe(parent)this

這裏的 source 是ObservableOnSubscribe的實例,parent是CreateEmitter的實例。上面加粗文本敘述的內容,就是這行代碼,能夠說這是整個訂閱過程最核心的實現。

好了,回顧完基礎知識後,立刻進入正題,看看RxJava是如何實現線程切換的。

RxJava 之 subscribeOn

咱們知道正常狀況下,全部的內容都是在主線程執行,既然這裏提到了線程切換,那麼必然是切換到了子線程,所以,這裏須要關注線程的問題,咱們就帶着下面這幾個問題去閱讀代碼。

  • 1.是哪一個對象在何時建立了子線程,是一種怎樣的方式建立的?
  • 2.子線程又是如何啓動的?
  • 3.上游事件是怎麼跑到子線程裏執行的?
  • 4.屢次用 subscribeOn 指定上游線程爲何只有第一次有效 ?

示例

首先看一下,平常開發中實現線程切換的具體實現

private void multiThread() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("This msg from work thread :" + Thread.currentThread().getName());
                sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: s= " + s);
                    }
                });
    }

複製代碼

這段代碼,使用過RxJava的同窗再熟悉不過了,上游事件會在一個名爲 RxNewThreadScheduler-1 的線程執行,下游線程會切換回咱們熟悉的Android UI線程。

咱們就從subscribeOn(Schedulers.newThread()) 出發,看看這個代碼的背後,到底發生了什麼。

subscribeOn

這裏咱們先無論Schedulers.newThread() 是什麼鬼,首先看看這個subscribeOn()方法。

Observable.java--- subscribeOn(Scheduler scheduler)

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
複製代碼

能夠看到,這個方法須要一個Scheduler 類型的參數。

RxJavaPlugins.java--- onAssembly(@NonNull Observable source)

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
複製代碼

O(∩_∩)O哈哈~,是否是以爲似曾相識,和create操做符一個套路呀。所以,observeOn也能夠簡化以下:

new ObservableSubscribeOn<T>(this, Schedulers.newThread());
複製代碼

這裏你也許會有疑問,這個this是什麼呢?其實這個this就是Observable,具體到上面的代碼來講就是ObservableCreate,總之就是一個具體的Observable。

接着看ObservableSubscribeOn 這個類

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
}

複製代碼

看一下 AbstractObservableWithUpstream.java

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}
複製代碼

再看一下 HasUpstreamObservableSource.java

/** * Interface indicating the implementor has an upstream ObservableSource-like source available * via {@link #source()} method. * * @param <T> the value type */
public interface HasUpstreamObservableSource<T> {
    /** * Returns the upstream source of this Observable. * <p>Allows discovering the chain of observables. * @return the source ObservableSource */
    ObservableSource<T> source();
}
複製代碼

饒了半天,ObservableSubscribeOn 原來和上一篇說的ObservableCreate同樣,也是Observable的一個子類。只不過比ObservableCreate多實現了一個接口HasUpstreamObservableSource,這個接口頗有意思,他的source()方法返回類型是ObservableSource(還記得這個類的角色嗎?)。也就是說ObservableSubscribeOn這個Observable是一個擁有上游的Observable。他有一個很是關鍵的屬性source,這個source就表明了他的上游。

咱們接着看ObservableSubscribeOn的具體實現。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
		// observer 調用onSubscribe方法,獲取上游的控制權
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}
複製代碼
  • 首先看他的構造函數,參數source就是咱們以前提到過的this,scheduler就是Schedulers.newThread()。同時調用了父類AbstractObservableWithUpstream的構造函數,這裏結合以前的結論,咱們能夠肯定經過這個構造函數,就建立出來了一個包含上游的ObservableSubscribeOn實例。
  • 再看實現訂閱關係的關鍵方法subscribeActual(),在這裏建立了一個SubscribeOnObserver的實例,SubscribeOnObserver 是AtomicReference的子類(保證原子性),同時實現了 Observer接口 和 Disposable 接口;你能夠把他理解成一個Observer。

咱們以前說過,subscribeActual()是實現上下游之間訂閱關係的重要方法。由於只有真正實現了訂閱關係,上下游之間才能鏈接起來。咱們看這個方法的最後一句代碼。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼

這句代碼,能夠說就是很是關鍵,由於從這裏開始了一系列的連鎖反應。首先看一下SubscribeTask

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
複製代碼

看到這句 source.subscribe(parent),是否是以爲似曾相識呢?

SubscribeTask 實現了是Runnable接口,在其run方法中,定義了一個須要在線程中執行的任務。按照類的繼承關係,很明顯source 就是ObservableSubscribeOn 的上游Observable,parent是一個Observer。也就是說這個run方法要執行的內容就是實現ObservableSubscribeOn的上游和Observer的訂閱。一旦某個線程執行了這個Runnable(SubscribeTask),就會觸發了這個run方法,從而實現訂閱,而一旦這個訂閱實現,那麼後面的流程就是上節所說的事情了。

這裏能夠解答第三個問題了,上游事件是怎麼給弄到子線程裏去的,這裏很明顯了,就是直接把訂閱方法放在了一個Runnable中去執行,這樣就一旦這個Runnable在某個子線程執行,那麼上游全部事件只能在這個子線程中執行了。

好了,線程要執行的任務彷佛建立完了,下面就接着找看看子線程是怎麼建立的。回過頭繼續看剛纔的方法,

scheduler.scheduleDirect(new SubscribeTask(parent))
複製代碼

Scheduler.java----scheduleDirect

public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }


    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
		// 對run進行了一次裝飾
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

	@NonNull
	// 抽象方法
    public abstract Worker createWorker();
複製代碼

首先看一下Worker類

/** * Sequential Scheduler for executing actions on a single thread or event loop. * <p> * Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup. */
    public abstract static class Worker implements Disposable {
  
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

  
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

        
    }

複製代碼

Worker是Scheduler內部的一個靜態抽象類,實現了Disposable接口,其schedule()方法也是抽象的。

再看一下DisposeTask

static final class DisposeTask implements Runnable, Disposable {
        final Runnable decoratedRun;
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }
    }

複製代碼

DisposeTask 又是一個Runnable,同時也實現了Disposable接口。能夠看到在他的run方法中會執行decoratedRun的run方法,這個decoratedRun其實就是參數中傳遞進來的run,也就是說,執行了這個DisposeTask的run方法,就會觸發SubscribeTask中的run方法,所以,咱們就要關注是誰執行了這個DisposeTask。

回到scheduleDirect()方法

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
		// 對run進行了一次裝飾
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
複製代碼

scheduleDirect()方法的實現咱們總結一下:

  1. 建立一個Worker對象w,而在Scheduler類中createWorker()方法被定義爲抽象方法,所以咱們須要去Scheduler的具體實現中瞭解這個Worker的具體實現。
  2. 對參數run經過RxJavaPlugins進行一次裝飾,生成一個decoratedRun的Runnable(經過源碼能夠發現,其實什麼也沒幹,就是原樣返回)
  3. 經過decoratedRun和w生成一個DisposeTask對象task
  4. 經過Worker的schedule方法開始執行這個task。

ε=(´ο`*)))唉,說了這麼久,子線程是如何建立的依然不清楚,不管是SubscribeTask仍是DisposeTask只是定義會在某個子線程中執行的任務,並不表明子線程已被建立。可是經過以上代碼,咱們也能夠收穫一些有價值的結論:

  • 最終的Runnable任務,將由某個具體的Worker對象的scheduler()方法執行。
  • 這個scheduleDirect會返回一個Disposable對象,這樣咱們就能夠經過Observer去控制整個上游的執行了。

好了,到這裏對於subscribeOn()方法的分析已經到了盡頭,咱們找了最終須要運行子任務的對象Worker,而這個Worker是個抽象類,所以咱們須要關注Worker的具體實現了。

下面咱們就從剛纔丟下的Schedulers.newThread() 換個角度來分析,看看能不能找到這個Worker的具體實現。

Schedulers.newThread()

前面說了subscribeOn()方法須要一個Scheduler 類型的參數,然而經過前面的分析咱們知道Scheduler是個抽象類,是沒法被實例化的。所以,這裏就從Schedulers類出發。

/** * Static factory methods for returning standard Scheduler instances. */
public final class Schedulers {
}
複製代碼

註釋很清楚,這個Schedulers就是一個用於生成Scheduler實例的靜態工廠。

下面咱們就來看看,在這個工廠中newThread() 生成了一個什麼樣的Scheduler實例。

@NonNull
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

	NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }
複製代碼

newThread() 方法通過層層委託處理(最終的建立方式,有點單例模式的意味),最終咱們須要的就是一個NewThreadScheduler的實例。

NewThreadScheduler.java

public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}
複製代碼

不出所料NewThreadScheduler 是Scheduler的一個子類,在他的靜態代碼塊中構造了一個Priority=5的線程工廠。而在咱們最最關注的createWorker()方法中他又用這個線程工廠建立了一個NewThreadWorker 的實例。下面就讓咱們看看最終的NewThreadWorker 作了些什麼工做。

NewThreadWorker.java(節選關鍵內容)

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    

    @Override
    public void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

}
複製代碼

衆裏尋他千百度,終於找到了Worker的實現了,同時再一次不出所料的又一次實現了Disposable接口,o(╥﹏╥)o。

在其構造函數中,經過NewThreadScheduler中提供的線程工廠threadFactory建立了一個ScheduledExecutorService。

ScheduledExecutorService.java ---create

public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }
複製代碼

用大名鼎鼎的Executors(Executor的工具類),建立了一個核心線程爲1的線程。

至此,咱們終於找到了第一個問題的答案,子線程是誰如何建立的;在NewThreadScheduler的createWorker()方法中,經過其構建好的線程工廠,在Worker實現類的構造函數中建立了一個ScheduledExecutorService的實例,是經過SchedulerPoolFactory建立的。

同時能夠看到,經過執行dispose 方法,可使用ScheduledExecutorService的shutdown()方法,中止線程的執行。

線程已經建立好了,下面就來看看究竟是誰啓動了這個線程。前面咱們說過,Worker的schedule()方法若是執行了,就會執行咱們定義好的Runnable,經過這個Runnable中run方法的執行,就能夠實現上下游訂閱關係。下面就來看看這個scheduler()方法。

@NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
複製代碼

到這裏,已經很明顯了,在schedulerActual方法中,會經過剛纔建立好的子線程對象executor經過submit或schedule執行一個Runnable任務(雖然這個Runnable對象再一次通過了各類裝飾和包裝,但其本質沒有發生變化),並將執行結果封裝後返回。而這個Runnable對象追根溯源來講,就是咱們在ObservableSubscribeOn類中建立的一個SubscribeTask對象。所以,當這個子線程開始運行的時候就是執行SubscribeTask中run()方法的時機;一旦這個run方法執行,那麼

source.subscribe(parent)
複製代碼

這句最關鍵的代碼就開始執行了,一切的一切又回到了咱們上一篇那熟悉的流程了。

好了,按照上面的流程捋下來,感受仍是有點分散,那麼就用UML圖看看總體的結構。

咱們看最下面的ObservableSubscribeOn,他是subscribeOn 返回的Observable對象,他持有一個Scheduler 實例的引用,而這個Scheduler實例就是NewThreadScheduler(即Schedulers.newThreade())的一個實例。ObservableSubscribeOn 的subscribeActual方法,會觸發NewThreadScheduler去執行SubscribeTask中定義的任務,而這個具體的任務又將由Worker類建立的子線程去執行。這樣就把上游事件放到了一個子線程中實現。

至於最後一個問題,屢次用 subscribeOn 指定上游線程爲何只有第一次有效?,看完通篇其實也很好理解了,由於上游Observable只有一個任務,就是subscribe(準確的來講是subscribeActual()),而subscribeOn 要作的事情就是把上游任務切換到一個指定線程裏,那麼一旦被切換到了某個指定的線程裏,後面的切換不就是沒有意義了嗎。

好了,至此上游事件切換到子線程的過程咱們就明白了。下游事件又是如何切換的且聽下回分解,原本想一篇寫完的,結果發現越寫越多,只能分紅兩篇了!!!o(╯□╰)o。

寫在後面的話

關於Disposable

在RxJava的分析中,咱們常常會遇到Disposable這個單詞,確切的說是接口,這裏簡單說一說這個接口。

/** * Represents a disposable resource. */
public interface Disposable {
    void dispose();
    boolean isDisposed();
}
複製代碼

咱們知道,在Java中,類實現某個接口,通俗來講就是表明這個類多了一項功能,好比一個類實現Serializable接口,表明這個類是能夠序列化的。這裏Disposable也是表明一種能力,這個能力就是Disposable,就是表明一次性的,用後就丟棄的,好比一次性筷子,還有那啥。

在RxJava中不少類都實現了這個接口,這個接口有兩個方法,isDisposed()顧名思義返回當前類是否被拋棄,dispose()就是主動拋棄。所以,全部實現了這個接口的類,都擁有了這樣一種能力,就是能夠判斷本身是否被拋棄,同時也能夠主動拋棄本身。

上一篇咱們說了,Observer經過onSubscribe(@NonNull Disposable d),會得到一個Disposable,這樣就有能力控制上游的事件發送了。這樣,咱們就不難理解,爲何那麼多類實現了這個接口,由於下游獲取到的是一個擁有Disposable的對象,而一旦擁有了一個這樣的對象,那麼就能夠經過下游控制上游了。能夠說,這是RxJava對常規的觀察者模式所作的最給力的改變。

關於各類ObservableXXX ,subscribeXXX,ObserverXXX

在查看RxJava的源碼時,可能不少人都和我同樣,有一個巨大的困擾,就是這些類的名字好他媽難記,感受長得都差很少,關鍵念起來好像也差很少。但其實本質上來講,RxJava對類的命名仍是很是規範的,只是咱們不太習慣而已。按照英文單詞翻譯:

  • Observable 可觀察的
  • Observer 觀察者
  • Subscribe 訂閱

其實就這麼三個主語,其餘的什麼ObservableCreate,ObservableSubscribeOn,AbstractObservableWithUpstream,還有上面提到的Disposable,都是對各類各樣的Observable和Observer的變形和修飾結果,只要理解這個類的核心含義是什麼,就不會被這些名字搞暈了。

RxJava 能夠說是博大精深,以上全部分析徹底是我的平時使用時的總結與感悟,有任何錯誤之處,還望各位讀者提出,共同進步。

關於RxJava 這裏牆裂推薦一篇文章一篇不太同樣的RxJava介紹,感受是自扔物線那篇以後,對RxJava思想感悟最深的一篇了。對RxJava 有興趣的同窗,能夠多度幾遍,每次都會有收穫!!

相關文章
相關標籤/搜索