RxJava 相信各位已經使用了好久,但大部分人在剛學習 RxJava 感嘆切換線程的方便,調用邏輯清晰的同時,並不知道其中的原理,主要是靠記住運行的順序。 隨着咱們設計出的 RxJava流 愈來愈複雜,一些複雜的問題並不能靠着記住的運行順序就能解決。
下面,就經過最經常使用的操做符的源碼來看看所謂的流
是什麼運行的。java
首先咱們用Single
舉例,設計一個最基本的 RxJava 流,只有一個 Observable(ColdObservable)
和Obsever
:設計模式
Disposable disposable = Single.just("wtf")
.subscribe(it -> Log.i("subscribe", it));
複製代碼
上游發送一個"wtf"
,下游接受時將其打印出來。上游發送端使用 Single.just
做爲建立方法, 看一下 just()
方法裏作了什麼。bash
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "value is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
複製代碼
其中 ObjectHelper.requireNonNull
只是空檢查。
RxJavaPlugins.onAssembly
方法,這個方法其實就是經過一個全局的變量 onSingleAssembly
來對方法進行 Hook ,這一系列xxxAssembly
全局變量默認爲空,因此實際上當咱們沒有設置的時候其實 just
方法是直接返回了一個 新實例化的SingleJust
對象。app
再看看SingleJust
內部:ide
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
複製代碼
實例化的時候只是將值保存了下來,沒有其它操做。
下一步調用subscribe()
來啓動這個流(ColdObservable)
,而後看看subscribe
中作了什麼:函數
public final void subscribe(SingleObserver<? super T> subscriber) {
ObjectHelper.requireNonNull(subscriber, "subscriber is null");
subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
try {
//核心邏輯
subscribeActual(subscriber);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
複製代碼
一樣 RxJavaPlugins.onSubscribe
默認沒有做用,實際的核心邏輯是調用了subscribeActual(SingleObserver)
。
對於咱們上面設計的流,則是調用了 SingleJust 中的 subscribeActual(SingleObserver)
oop
回顧上面 SingleJust
中 subscribeActual(SingleObserver)
的實現:學習
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
複製代碼
獲得兩個信息ui
SingleObserver
的 OnSubscribe
方法並傳遞用於取消操做的 Disposable
OnSuccess
方法並傳遞以前保存下來的 value
如今咱們加入一個經常使用且重要的Map
操做到流中this
Disposable disposable = Single.just("wtf")
.map(it-> 0)
.subscribe(it -> Log.i("subscribe", String.of(it)));
複製代碼
上面這個流包括了三種典型的操做 建立Creation
操做符Transformation
和 訂閱Subscribe
。
依然先檢查map()
方法,能夠看到其中實例化了一個SingleMap
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
複製代碼
再看看 SingleMap
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
複製代碼
類中信息稍微複雜一些:
SingleMap
實例化的時候也是隻作了保存數據的操做,而沒有實際邏輯:將流的上游保存爲 source
將數據轉換的方法保存爲 mapper
SingleObserver
會調用核心邏輯 subscribeActual
方法來啓動流subscribeActual
方法中能夠看到幾個重要的信息
MapSingleObserver
是一個觀察者MapSingleObserver
保存了下游的觀察者 SingleObserver
以及 mapper
source
被 MapSingleObserver
訂閱由此能夠看出在SingleMap
被下游觀察者訂閱了以後,實例化了一個新的觀察者MapSingleObserver
並保存下游觀察者SingleObserver
的信息,再去訂閱上游SingleJust
。
這種模式建立了一個裝飾類,用來包裝原有的類,並在保持類方法簽名完整性的前提下,提供了額外的功能的設計模式稱爲裝飾者模式
。
總結上面的執行順序:
Rx流
的最後一步調用 subscribe
啓動流(ColdObservable)
SingleMap
中的subscribeActual
方法,其中包括生成新的MapSingleObserver
並訂閱 SingleJust
SingleJust
中的subscribeActual
:調用下游MapSingleObserver
的onSubscribe
onSuccess
方法MapSingleObserver
中的onSubsribe``onSuccess
方法也很簡單,分別調用下游 Observer
的 onSubsribe``onSuccess(異常時 onError)
方法Rxjava首先被你們津津樂道之處是能夠方便的切換線程,避免Callback Hell
,如今來看看線程切換操做符。
咱們加入線程切換操做符 observeOn
Disposable disposable = Single.just("wtf")
.map(it-> 0)
.observeOn(Schedulers.io())
.subscribe(it -> Log.i("subscribe", String.of(it)));
複製代碼
一樣的,在 observeOn
方法中實例化了一個SingleObserveOn
public final Single<T> observeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
}
複製代碼
繼續看SingleObserveOn
類中信息
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
}
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> actual;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.actual = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onSuccess(value);
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
複製代碼
相似的
subscribeActual
實例化了一個新的觀察者ObserveOnSingleObserver
不一樣的
ObserveOnSingleObserver
還繼承了AtomicReference<Disposable>
、實現了Disposable``Runnable
接口onSuccess``onError
中都沒有直接調用下游的onSuccess``onError
方法,而是調用了Disposable d = scheduler.scheduleDirect(this);
來執行run
方法中的邏輯,而run
方法中的邏輯則是調用下游的onSuccess``onError
方法查看schedulerDirect
內部信息
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
建立了一個對應線程的Worker
和一個可用於取消的DisposeTask
並執行,對於IoScheduler
則是建立了EventLoopWorker
,再看看EventLoopWorker
中的信息。
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
複製代碼
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 複製代碼
EventLoopWorker
中則是維護了一套包含相應的線程池
、可取消的CompositeDisposable
、以及用於運行Runable
的ThreadWorker
。總的來講就是一套能夠在相應線程運行且可取消的類和邏輯。
observeOn
能夠切換下游的線程(onSuccess``onError
)onSubsribe
的調用線程,由於能夠看到onSubscribe
方法中直接調用了下游的onSucsribe
,並無受到線程切換的影響。如今設計兩個Rx流
Disposable disposable = Single.just("wtf")
.doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
.subscribe(it -> Log.i("subscribe", 4);
複製代碼
Disposable disposable2 = Single.just("wtf")
.doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
.subscribeOn(Schedulers.io())
.doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
.subscribe(it -> Log.i("subscribe", 4);
複製代碼
你可能已經知道並記住了兩個流的打印的順序分別是 01234``23014
,可是爲何doOnSubsribe
方法和RxJava1
中調用順序徹底不同,爲何經過subscribeOn
切換線程會影響執行順序?
先找到 SingleSubscribeOn
類
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
//直接調用下游 onSubscribe
s.onSubscribe(parent);
//再執行訂閱上游的方法
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> actual;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.actual = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable d) {
//沒有繼續調用下游的 onSubscribe 方法
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}
@Override
public void onError(Throwable e) {
actual.onError(e);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
task.dispose();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public void run() {
source.subscribe(this);
}
}
}
複製代碼
一樣的直接看subscribeActual
方法及onSubscribe
方法,和以前的操做符的邏輯區別很大:
SubscribeOnObserver
一樣還繼承了AtomicReference<Disposable>
,實現了Disposable``Runnable
接口subscribe
訂閱上游,而是執行了其它操做符在 onSubscribe
中訂閱下游的操做Disposable f = scheduler.scheduleDirect(parent);
和run
方法能夠知道在新的線程中執行了訂閱上游的操做 source.subscribe(this);
onSubsribe
中並無再繼續調用下游的 onSubsribe
綜合起來能夠知道,原本應該在整個流從下至上訂閱完成後按照從上至下的順序執行 onSubscribe
的流,在使用subsribeOn
操做符的後,在訂閱的時(執行subscribeActual
),就開始執行下游的onSubscribe
且在當前線程!而後纔在指定的io
線程執行之下而上的操做,這也是爲何subsribeOn
影響的是上游的線程。
我認爲實際上 Rx 使用了不少優秀的設計將咱們各類經常使用的操做進行了封裝,讓咱們自由組合使用,其自己並無用什麼黑科技。例如切換線程本質上則是幫咱們啓用了一個新的線程並把接下來的代碼放進去執行。 固然,其中還有不少更深刻的內容須要咱們繼續發現和學習。