RxJava是什麼?根據RxJava在GitHub上給出的描述:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Javajava
大體意思是:
RxJava—一個能夠在JVM上運行的,基於觀察者模式 實現異步操做的java庫。git
RxJava的做用:
就是異步
RxJava的使用,可使「邏輯複雜的代碼」保持極強的閱讀性。github
RxAndorid的做用:
Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了AndroidSchedulers.mainThread()
,Android開發者使用過程當中,能夠輕鬆的將任務post Andorid主線程
中,執行頁面更新操做。網絡
// Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //一、「異步線程」 執行耗時操做 //二、「執行完畢」 調用onNext觸發回調,通知觀察者 e.onNext("1"); e.onComplete(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { // 訂閱線程 訂閱的那一刻在訂閱線程中執行 } @Override public void onNext(String value) { // 「主線程」執行的方法 } @Override public void onError(Throwable e) { // "主線程"執行的方法 } @Override public void onComplete() { // "主線程"執行的方法 } });
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { // IO 線程 // 請求網絡數據 e.onNext("123456"); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) { // IO 線程 // 網絡數據解析(數據轉化) // // throw new RequestFailException("獲取網絡請求失敗"); return 123; } }).doOnNext(new Consumer<Integer>() { //保存登陸結果UserInfo @Override public void accept(@NonNull Integer bean) throws Exception { // IO 線程 // 保存網絡數據 } }).subscribeOn(Schedulers.io()) //IO線程 .observeOn(AndroidSchedulers.mainThread()) //主線程 .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer bean) throws Exception { // 更新UI } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { // 錯誤 顯示錯誤頁面 } });
Flowable是爲了應對Backpressure
產生的。
Flowable是一個被觀察者
,與Subscriber(觀察者)
配合使用異步
// Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> emitter) throws Exception { //一、「異步線程」 執行耗時操做 //二、「執行完畢」 調用onNext觸發回調,通知觀察者 emitter.onNext(0); emitter.onComplete(); } // 若消費者消費能力不足,則拋出MissingBackpressureException異常 }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { // 訂閱時執行,發生在「訂閱線程」 // 這個方法是用來向生產者申請能夠消費的事件數量 // 這裏代表消費者擁有Long.MAX_VALUE的消費能力 s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { // 「主線程」執行的方法 } @Override public void onError(Throwable t) { // "主線程"執行的方法 } @Override public void onComplete() { // "主線程"執行的方法 } });
Backpressure(背壓)
即生產者的生產速度
大於消費者的消費能力
引發的問題。async
在RxJava中有一種狀況就是被觀察者發送消息十分迅速
以致於觀察者不能及時的響應這些消息
。ide
例如:post
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { // 「異步線程」中 生產者有無限的生產能力 while (true){ e.onNext(1); } } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 「主線程」中 消費者消費能力不足,從而形成事件無限堆積,最後致使OOM Thread.sleep(2000); System.out.println(integer); } });
異步線程中
生產者有無限的生產能力;主線程
中 消費者消費能力不足,從而形成事件無限堆積,最後致使OOM。
上述的現象,有個專有的名詞來來形容,即:Backpressure(背壓)
Subscription.request(long n)
方法是用來向生產者申請能夠消費的事件數量
。
request(long n)
方法後,生產者便發送對應數量的事件供消費者消費;不顯示調用request
就表示消費能力爲0
。在異步調用時,RxJava中有個緩存池,用來緩存消費者處理不了暫時緩存下來的數據,緩存池的默認大小爲128,即只能緩存128個事件。
不管request()中傳入的數字比128大或小,緩存池中在剛開始都會存入128個事件;固然若是自己並無這麼多事件須要發送,則不會存128個事件。
BackpressureStrategy.ERROR
策略下,若是生產者生產的事件大於128個,緩存池便會溢出,從而拋出MissingBackpressureException
異常;BackpressureStrategy.BUFFER
策略:將RxJava中默認的128個事件的緩存池換成一個更大的緩存池,這樣,消費者經過request()即便傳入一個很大的數字,生產者也會生產事件。可是這種方式比較消耗內存,除非是咱們比較瞭解消費者的消費能力,可以把握具體狀況,不會產生OOM。總之BUFFER要慎用。BackpressureStrategy.DROP
策略:當消費者處理不了事件,則丟棄。消費者經過request()傳入其需求n,而後生產者把n個事件傳遞給消費者供其消費。其餘消費不掉的事件就丟掉。BackpressureStrategy.LATEST
策略: LATEST與DROP功能基本一致。消費者經過request()傳入其需求n,而後生產者把n個事件傳遞給消費者供其消費。其餘消費不掉的事件就丟掉。惟一的區別就是LATEST總能使消費者可以接收到生產者產生的最後一個事件。注:當前使用的源碼版本 rxjava:2.1.9
從這段不涉及操做符和線程切換的簡單例子開始:
// 建立觀察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String o) { } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError data is :" + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }; // 建立被觀察者 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); // 訂閱 observable.subscribe(observer);
先看一下ObservableOnSubscribe.java
這個類
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; }
由代碼可知 ObservableOnSubscribe
是一個回調接口,回調方法中參數爲ObservableEmitter
,下邊看一下ObservableEmitter
這個類。
ObservableEmitter.java
ObservableEmitter字面意思是被觀察者發射器,看一下源碼:
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); @NonNull ObservableEmitter<T> serialize(); @Experimental boolean tryOnError(@NonNull Throwable t); }
ObservableEmitter
是對Emitter
的擴展,而擴展的方法正是 RxJava2.0 以後引入的。提供了可中途取消等新能力,咱們看 Emitter
源碼:
public interface Emitter<T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); }
Emitter
字面意思是發射器,這裏邊的三個方法,你們都很熟悉了。其對應瞭如下這段代碼:
new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("hello"); e.onNext("world"); e.onComplete(); } }
回調說完,下邊咱們來看Observable.create(ObservableOnSubscribe<T> source)
這段代碼。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
ObservableOnSubscribe
被用來建立ObservableCreate
,其實ObservableCreate
就是Observable
的一個實現類所以 Observable.create(ObservableOnSubscribe<T> source)
這段代碼,實際是:
// ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO線程中執行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } });
ObservableOnSubscribe.subscribe
方法被執行時,用戶經過調用ObservableEmitter.onNext
方法,將數據發送出去(發送給觀察者)下邊咱們看一下ObservableCreate
這個類
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } // 省略部分代碼 ... }
ObservableOnSubscribe.subscribe
方法是在ObservableCreate.subscribeActual
方法中第四行中被執行了;subscribe
方法中,用戶經過調用ObservableEmitter.onNext
方法,將數據發送出去;subscribeActual
方法第二行,調用了observer.onSubscribe(parent);
方法。 訂閱發生時,在訂閱線程主動執行了observer
的onSubscribe
方法;CreateEmitter
是對ObservableCreate.subscribeActual(Observer<? super T> observer)
方法傳入的Observer
的封裝;CreateEmitter
的做用是任務取消時,能夠再也不回調其封裝的觀察者;observer
的onNext
方法,由CreateEmitter.onNext
方法調用;Observable.create(ObservableOnSubscribe<T> source);
方法最終返回一個 ObservableCreate
對象。
下邊看 observable.subscribe(observer);
方法
observable.subscribe(observer);
即 訂閱發生的那一刻。observable.subscribe(observer);
實際是ObservableCreate.subscribe(observer);
下邊查看Observable
的subscribe(observer)
方法
Observable.subscribe(Observer observer)
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // Observable的subscribe方法,實際執行的是subscribeActual方法 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); // NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
observable.subscribe(observer);
方法時,實際是調用了observable.subscribeActual(observer)
方法。observable
爲ObservableCreate
的引用,所以這裏調用的是ObservableCreate.subscribeActual(observer)
方法。咱們又回到 ObservableCreate
這個類的subscribeActual
方法
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } // subscribeActual 方法在 訂閱發生的那一刻被調用 既 observable.subscribe(observer);時被調用 @Override protected void subscribeActual(Observer<? super T> observer) { // 若中途任務取消,經過CreateEmitter 可終止對observer中方法onNext 、onError 等的回調 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法 observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } // 省略部分代碼 ... }
subscribeActual
方法在 訂閱發生的那一刻被調用的;在 observable.subscribe(observer);
時被調用;observer.onSubscribe(parent);
訂閱發生時,在訂閱線程回調observer
的onSubscribe
方法subscribeActual
方法中,傳入的Observer
會被包裝成一個CreateEmitter
;若中途任務取消,經過CreateEmitter
可終止對observer
中方法onNext 、onError
等的回調;subscribeActual 中第二行代碼 observer.onSubscribe(parent);
observer.onSubscribe(parent);
訂閱發生時,執行 觀察者的onSubscribe(Disposable d)
方法,這裏回到瞭如下代碼
// 建立觀察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe"); } // ... 省略onNext、onError、onComplete };
new CreateEmitter<T>(observer)
,其實現了Disposable
接口,若任務取消,則不回調傳入的觀察者observer
對應的onNext 、onError、onComplete
等方法subscribeActual 中第四行代碼 source.subscribe(parent);
source.subscribe(parent);
是ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));
代碼最終回到ObservableOnSubscribe
的 subscribe
:
new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { e.onNext("hello"); e.onNext("world"); e.onComplete(); } }
subscribe
中,調用到 CreateEmitter
類的onNext 、onComplete、onError
方法,將數據發送CreateEmitter
中的觀察者
到此,「這段不涉及操做符和線程切換的簡單例子」 的代碼跟蹤結束。
注:當前使用的源碼版本 rxjava:2.1.9
從這段線程切換的簡單例子開始:
// 建立觀察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { // 訂閱線程 訂閱的那一刻在訂閱線程中執行 } @Override public void onNext(String o) { // Android 主線程中執行 } @Override public void onError(@NonNull Throwable e) { // Android 主線程中執行 } @Override public void onComplete() { // Android 主線程中執行 } }; // 建立被觀察者 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO線程中執行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); // 被觀察者 IO 線程 observable = observable.subscribeOn(Schedulers.io()); // 觀察者 Android主線程 observable = observable.observeOn(AndroidSchedulers.mainThread()); // 訂閱 observable.subscribe(observer);
先來個我總結的RxJava2的整個代碼執行流程:
在 源碼閱讀——簡單例子 (一) 中咱們瞭解到了Observable.create(ObservableOnSubscribe<T> source)
實際是 以下代碼:
// ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO線程中執行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } });
ObservableCreate
中含有一個subscribeActual(observer)
方法,用於執行傳入觀察者的observer.onSubscribe
方法,和間接調用 觀察者的onNext、onComplete
等方法;ObservableCreate
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } // 省略部分代碼 ... }
subscribeActual
方法第二行,調用了傳入的觀察者的observer.onSubscribe(parent);
方法; 訂閱發生時,在訂閱線程主動執行了observer
的onSubscribe
方法;subscribeActual
方法第四行,調用了傳入的觀察者的observer.subscribe
方法;subscribe
方法中,用戶經過調用CreateEmitter.onNext
方法,將數據發送出去;CreateEmitter
是對ObservableCreate.subscribeActual(Observer<? super T> observer)
方法傳入的Observer
的封裝;CreateEmitter
的做用是任務取消時,能夠再也不回調其封裝的觀察者;observer
的onNext
方法,由CreateEmitter.onNext
方法調用;下邊查看observable.subscribeOn(Schedulers.io())相關代碼
注:ObservableEmitter
是CreateEmitter
的引用,是對Observer
的進一步封裝。CreateEmitter
在執行onNext
時,若是任務取消,則再也不回調Observer
的onNext
方法。
下邊咱們查看Observable
類的subscribeOn(Scheduler scheduler)
方法
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); // 生成一個ObservableSubscribeOn對象 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
RxJavaPlugins
ObservableSubscribeOn
對象這裏Observable observable = observableCreate.subscribeOn(Schedulers.io())
代碼實際是
ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
observable.subscribeOn(Schedulers.io())
返回的是一個ObservableSubscribeOn
的引用下邊查看ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } // ... 省略部分代碼 }
看一下ObservableSubscribeOn
中的subscribeActual
方法
subscribeActual
方法第二行代碼中,執行了傳入Observer
的 onSubscribe
方法;subscribeActual
方法第三行: 在 scheduler
對應的IO線程
中,執行observableCreate
的subscribe
方法,傳入參數爲SubscribeOnObserver
,即:IO線程中
執行observableCreate.subscribe(new SubscribeOnObserver(observer));
所以,不管ObservableSubscribeOn.subscribeActual(observer)
在哪一個線程中被調用observableCreate.subscribe(new SubscribeOnObserver<T>(observer))
均在IO線程中執行,所以觀察者的e.onNext("hello"); e.onComplete();
亦在IO線程中執行;
下邊咱們查看Observable
類的observeOn(Scheduler scheduler)
方法
public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } // public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
這裏能夠看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())
實際是:
ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
所以 ,observable.observeOn(AndroidSchedulers.mainThread())
返回的是ObservableObserveOn
的引用。
下邊查看ObservableObserveOn
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } // ... 省略部分代碼 }
看一下ObservableObserveOn
中的subscribeActual
方法
subscribeActual
方法第五行代碼,實際爲observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
ObserveOnObserver
的做用是在ObserveOnObserver
的onNext
方法被實行時;將observer
的onNext
方法post到 Android主線程
中;Observable
的subscribe(Observer<? super T> observer)
方法,實際調用到了Observable
的subscribeActual(Observer<? super T> observer)
方法;observable
實際是ObservableObserveOn
的引用;所以,observable.subscribe(observer)
實際執行的是observableObserveOn.subscribeActual(observer)
到這裏,咱們 線程切換 (二) 的小例子變換爲了如下代碼:
// 建立觀察者 Observer observer = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { // 訂閱線程 訂閱的那一刻在訂閱線程中執行 } @Override public void onNext(String o) { // Android 主線程中執行 } @Override public void onError(@NonNull Throwable e) { // Android 主線程中執行 } @Override public void onComplete() { // Android 主線程中執行 } }; // ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { // IO線程中執行 e.onNext("hello"); e.onNext("world"); e.onComplete(); } }); // ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io()) // ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128); // observableObserveOn.subscribeActual(observer);
下邊咱們查看observableObserveOn.subscribeActual(observer)
ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { // source 爲 observableSubscribeOn super(source); // scheduler 爲AndroidSchedulers.mainThread() this.scheduler = scheduler; // false this.delayError = delayError; // 128 this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { // AndroidSchedulers.mainThread() 爲 HandlerScheduler,所以會走到else部分代碼 if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } // 代碼會走到else 部分 else { Scheduler.Worker w = scheduler.createWorker(); // source 爲 observableSubscribeOn source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } // ... 省略部分代碼 }
subscribeActual
方法中,AndroidSchedulers.mainThread()
爲HandlerScheduler
,所以 if 中的判斷語句直接忽略,直接走到代碼的 else 部分。subscribeActual
方法中,將觀察者observer
封裝成了ObserveOnObserver
;而且調用observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
實際是ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize) // 一、「訂閱線程中」 —— 執行onSubscribe, 實際執行的是observer的onSubscribe方法 observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver)); // 二、「IO程中」 —— 執行subscribe ;IO線程 subscribe方法中,用戶主動調用ObserveOnObserver的onNext、onError、onComplete方法,將數據發出去 observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))
SubscribeOnObserver
的onNext
是將數據發送出去SubscribeOnObserver.onNext
調用了observeOnObserver.onNext
observeOnObserver.onNext
經過HandlerScheduler
將observer.onNext、observer.onError、observer.onComplete
等方法post到Android主線程中執行。最後總結一下RxJava2的整個執行流程: