RxJava2.x 從源碼分析原理

RxJava 相信各位已經使用了好久,但大部分人在剛學習 RxJava 感嘆切換線程的方便,調用邏輯清晰的同時,並不知道其中的原理,主要是靠記住運行的順序。 隨着咱們設計出的 RxJava流 愈來愈複雜,一些複雜的問題並不能靠着記住的運行順序就能解決。
下面,就經過最經常使用的操做符的源碼來看看所謂的是什麼運行的。java

首先咱們用Single舉例,設計一個最基本的 RxJava 流,只有一個 Observable(ColdObservable)Obsever設計模式

Disposable disposable = Single.just("wtf")
              			.subscribe(it -> Log.i("subscribe", it));
複製代碼

上游發送一個"wtf" ,下游接受時將其打印出來。上游發送端使用 Single.just 做爲建立方法, 看一下 just() 方法裏作了什麼。bash

public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }
    
    public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
    Function<? super Single, ? extends Single> f = onSingleAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
複製代碼

其中 ObjectHelper.requireNonNull 只是空檢查。
RxJavaPlugins.onAssembly 方法,這個方法其實就是經過一個全局的變量 onSingleAssembly 來對方法進行 Hook ,這一系列xxxAssembly全局變量默認爲空,因此實際上當咱們沒有設置的時候其實 just 方法是直接返回了一個 新實例化的SingleJust對象。app

再看看SingleJust內部:ide

public final class SingleJust<T> extends Single<T> {

    final T value;
    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

複製代碼

實例化的時候只是將值保存了下來,沒有其它操做。
下一步調用subscribe()來啓動這個流(ColdObservable),而後看看subscribe中作了什麼:函數

public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");
        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
 	         //核心邏輯
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }
複製代碼

一樣 RxJavaPlugins.onSubscribe 默認沒有做用,實際的核心邏輯是調用了subscribeActual(SingleObserver)
對於咱們上面設計的流,則是調用了 SingleJust 中的 subscribeActual(SingleObserver)oop

回顧上面 SingleJustsubscribeActual(SingleObserver) 的實現:學習

observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
複製代碼

獲得兩個信息ui

  • 首先調用下游觀察者 SingleObserverOnSubscribe 方法並傳遞用於取消操做的 Disposable
  • 調用OnSuccess 方法並傳遞以前保存下來的 value

Map 操做符

如今咱們加入一個經常使用且重要的Map操做到流中this

Disposable disposable = Single.just("wtf")
				 .map(it-> 0)
                 .subscribe(it -> Log.i("subscribe", String.of(it)));
複製代碼

上面這個流包括了三種典型的操做 建立Creation 操做符Transformation和 訂閱Subscribe

依然先檢查map() 方法,能夠看到其中實例化了一個SingleMap

public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
    }
複製代碼

再看看 SingleMap

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;
    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;
        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}
複製代碼

類中信息稍微複雜一些:

  1. 首先咱們關注在SingleMap實例化的時候也是隻作了保存數據的操做,而沒有實際邏輯:將流的上游保存爲 source 將數據轉換的方法保存爲 mapper
  2. 第二步咱們知道下游觀察者 SingleObserver 會調用核心邏輯 subscribeActual方法來啓動流
  3. 在這裏的subscribeActual方法中能夠看到幾個重要的信息
    • MapSingleObserver是一個觀察者
    • MapSingleObserver 保存了下游的觀察者 SingleObserver 以及 mapper
    • 上游 sourceMapSingleObserver 訂閱

由此能夠看出在SingleMap被下游觀察者訂閱了以後,實例化了一個新的觀察者MapSingleObserver並保存下游觀察者SingleObserver的信息,再去訂閱上游SingleJust
這種模式建立了一個裝飾類,用來包裝原有的類,並在保持類方法簽名完整性的前提下,提供了額外的功能的設計模式稱爲裝飾者模式

總結上面的執行順序:

  1. Rx流的最後一步調用 subscribe啓動流(ColdObservable)
  2. 首先執行SingleMap中的subscribeActual方法,其中包括生成新的MapSingleObserver並訂閱 SingleJust
  3. 執行SingleJust中的subscribeActual:調用下游MapSingleObserveronSubscribe onSuccess方法
  4. MapSingleObserver中的onSubsribe``onSuccess方法也很簡單,分別調用下游 ObserveronSubsribe``onSuccess(異常時 onError)方法

observeOn 操做符

Rxjava首先被你們津津樂道之處是能夠方便的切換線程,避免Callback Hell,如今來看看線程切換操做符。
咱們加入線程切換操做符 observeOn

Disposable disposable = Single.just("wtf")
				 .map(it-> 0)
				 .observeOn(Schedulers.io())
                 .subscribe(it -> Log.i("subscribe", String.of(it)));
複製代碼

一樣的,在 observeOn方法中實例化了一個SingleObserveOn

public final Single<T> observeOn(final Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
    }
複製代碼

繼續看SingleObserveOn類中信息

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;
    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> actual;
        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.actual = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                actual.onError(ex);
            } else {
                actual.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}


複製代碼

相似的

  • 構造函數中保存了上游和線程切換的信息
  • subscribeActual 實例化了一個新的觀察者ObserveOnSingleObserver

不一樣的

  • ObserveOnSingleObserver 還繼承了AtomicReference<Disposable>、實現了Disposable``Runnable接口
  • onSuccess``onError中都沒有直接調用下游的onSuccess``onError方法,而是調用了Disposable d = scheduler.scheduleDirect(this);來執行run方法中的邏輯,而run方法中的邏輯則是調用下游的onSuccess``onError方法

查看schedulerDirect內部信息

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }
複製代碼

建立了一個對應線程的Worker和一個可用於取消的DisposeTask並執行,對於IoScheduler則是建立了EventLoopWorker,再看看EventLoopWorker中的信息。

@Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
複製代碼
static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 複製代碼

EventLoopWorker中則是維護了一套包含相應的線程池、可取消的CompositeDisposable、以及用於運行RunableThreadWorker。總的來講就是一套能夠在相應線程運行且可取消的類和邏輯。

  • 上面則解釋了爲何observeOn能夠切換下游的線程(onSuccess``onError)
  • 一樣解釋了爲何不會改變onSubsribe的調用線程,由於能夠看到onSubscribe方法中直接調用了下游的onSucsribe,並無受到線程切換的影響。

SubscribeOn

如今設計兩個Rx流

Disposable disposable = Single.just("wtf")
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
                 .subscribe(it -> Log.i("subscribe", 4);
複製代碼
Disposable disposable2 = Single.just("wtf")
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
				 .subscribeOn(Schedulers.io())
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
				 .doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
                 .subscribe(it -> Log.i("subscribe", 4);
複製代碼

你可能已經知道並記住了兩個流的打印的順序分別是 01234``23014,可是爲何doOnSubsribe方法和RxJava1中調用順序徹底不同,爲何經過subscribeOn切換線程會影響執行順序?

先找到 SingleSubscribeOn

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;
    final Scheduler scheduler;
    
    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        //直接調用下游 onSubscribe
        s.onSubscribe(parent);
        //再執行訂閱上游的方法
        Disposable f = scheduler.scheduleDirect(parent);
        parent.task.replace(f);
    }

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;
        final SingleObserver<? super T> actual;
        final SequentialDisposable task;
        final SingleSource<? extends T> source;
        
        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.actual = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
        	  //沒有繼續調用下游的 onSubscribe 方法
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            actual.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            actual.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
    }

}
複製代碼

一樣的直接看subscribeActual方法及onSubscribe方法,和以前的操做符的邏輯區別很大:

  • SubscribeOnObserver一樣還繼承了AtomicReference<Disposable>,實現了Disposable``Runnable接口
  • 並無直接調用subscribe訂閱上游,而是執行了其它操做符在 onSubscribe中訂閱下游的操做
  • 而後再結合Disposable f = scheduler.scheduleDirect(parent);run方法能夠知道在新的線程中執行了訂閱上游的操做 source.subscribe(this);
  • onSubsribe中並無再繼續調用下游的 onSubsribe

綜合起來能夠知道,原本應該在整個流從下至上訂閱完成後按照從上至下的順序執行 onSubscribe的流,在使用subsribeOn操做符的後,在訂閱的時(執行subscribeActual),就開始執行下游的onSubscribe且在當前線程!而後纔在指定的io線程執行之下而上的操做,這也是爲何subsribeOn影響的是上游的線程。

小結:

我認爲實際上 Rx 使用了不少優秀的設計將咱們各類經常使用的操做進行了封裝,讓咱們自由組合使用,其自己並無用什麼黑科技。例如切換線程本質上則是幫咱們啓用了一個新的線程並把接下來的代碼放進去執行。 固然,其中還有不少更深刻的內容須要咱們繼續發現和學習。

相關文章
相關標籤/搜索