有一天,我在使用RxJava和Retrofit實現Android上面的網絡請求。忽然,遇到了一個坑,在過了這些坑以後獲得一些經驗,以爲須要和你們分享一二。java
用Retrofit搭配RxJava的朋友應該都知道,通常實現代碼最終大都長得一幅下面的樣子。api
public interface BeanApi { @GET("bean/{id}") Observable<Bean> getBean(@Path("id") int beanId); } BeanApi api = ···經過Retrofit的create實例化··· api.getBean(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );
上面的代碼形式相信各位都寫得很熟了,可是我實在煩透了每一個api調用的地方都寫一次subscribOn,observeOn。而後,我找到一篇不錯的文章——Don't break the chain: use RxJava's compose() operator,裏面提到了一個方法避免這種重複代碼(其實做者本意是要體現「不要打破鏈式操做」,而非避免重複代碼)。最後改進到的代碼就長下面的樣子了。網絡
// This is a common method. <T> Transformer<T, T> applySchedulers() { return observable -> observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } api.getBean(1) .compose(applySchedulers()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );
改進後的代碼比原來的代碼少了一行。可是寫多幾回以後,我仍是煩透了這個applySchedulers()。因而我瘋了,就本身實現一個Retrofit的CallAdapter.Factory,讓Retrotfit在每次調用api的時候自動就給我封裝好subscribeOn和observeOn這些重複的代碼,具體實現能夠參考個人另一篇文章——經過委派模式包裝一個RxJavaCallAdapterFactory。最後,個人代碼就是長下面這個樣子了。app
api.getBean(1) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );
全部的subscribeOn和observeOn不用再寫了。由於每次調用api.getBean(1),Retrotfit就調用我自定義的CallAdapter.Factory把結果封裝成Observable對象的時候就已經把subscribeOn和observeOn添加上去了。async
好,用得很爽。可是問題問題比辦法多,因此問題來了。有幾個特殊的地方我須要網絡加載和結果監聽都在當前線程。相信理解了上面代碼的朋友都已經看出來了,如今我經過api.getBean(1)獲取到的Observable<Bean>
最後被訂閱都會是在io線程獲取網絡數據,而後在mainThread線程進行結果處理。因此,我要想個辦法出來,覆蓋原來的Schedulers,包括subscribeOn的Scheduler和observeOn的Scheduler。因而,我寫了下面的代碼。ide
// isAsync is a boolean variable indicate whether the request is a asynchronous or not. api.getBean(1) .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate()) .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );
上面的代碼再結合我以前寫的CallAdapter.Factory,其實就是至關於沒有自定義CallAdapter.Factory以前顯式調用兩次subscribeOn和兩次observeOn,就像下面的樣子。this
api.getBean(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate()) .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );
做爲一名RxJava的標準菜鳥,我被驗證了本身的確很菜,我天真的認爲後面的subscribeOn和observeOn會覆蓋以前的Scheduler,我理所固然的認爲,當isAsync爲true的時候,此次api的調用就會在當前線程執行網絡訪問和結果處理。因而,我被搞瘋了。我就看了subscribeOn的源碼,以下。.net
public final Observable<T> subscribeOn(Scheduler scheduler) { ··· 省略一些於邏輯閱讀不重要的代碼 return nest().lift(new OperatorSubscribeOn<T>(scheduler)); }
nest的代碼以下,線程
public final Observable<Observable<T>> nest() { return just(this); }
意思就是新建一個Observable
,而且只會向訂閱者發送一個元素——原來api.getBean(1)得到的Observale<Bean>
對象。因此nest操做得到的結果是Observable<Observale<Bean>>
對象。好,這裏記着這個東東。下面我先分析一下lift操做,而後咱們再回頭把他們串在一塊兒。code
在看lift操做以前,咱們稍微回顧一下Observable
的建立方法,
final OnSubscribe<T> onSubscribe; public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f)); } protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
其餘的什麼from
、just
等建立方法最後都是把數據轉化爲一個OnSubscribe
對象再經過上面的create方法建立。因此咱們只關注這個create方法。上面代碼的意思很簡單,就是new一個Observable對象,而且設置onSubscribe
。因此這裏的關鍵是onSubscribe
這個對象。這裏我管它作數據源,即Observable對象會用它來產生數據,而且發佈給訂閱者。
看到這裏,咱們能夠發現,Observable其實沒有什麼,只有兩個關鍵點:一、裝載着一個onSubscribe
對象,二、有訂閱者註冊時,就調用用這個onSubscribe
的call(Subscriber)
方法。
這裏咱們要看一下這個call(Subscriber)
方法。該方法接受一個參數Subscriber,即訂閱者。當有訂閱者註冊到Observable對象時,Observable對象就調用onSubscribe的這個call方法,而且把當前當前註冊的訂閱者做爲參數傳遞過去。因此在call方法的實現中就能夠調用訂閱者的onNext方法來發布數據或者作其餘事(不必定是發佈數據)。
先把lift操做的代碼貼出來。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { ··· 省略一些於邏輯閱讀不重要的代碼 st.onError(e); } } catch (Throwable e) { ··· 省略一些於邏輯閱讀不重要的代碼 // as we don't have the operator available to us o.onError(e); } } }); }
從上面代碼第2行的new Observable<R>
能夠看出,lift操做實際上是新建一個Observable對象而後返回。這裏加點高級的標識方便下面的閱讀,被new出來的Observable對象咱們叫它作派生Observable,而當前Observable就叫父級Observable。
在上面Observable結構一節中,咱們知道每一個Observable都持有一個onSubscribe對象做爲數據源。經過lift方法派生所得的Observable也不例外,也有一個,就是上面代碼第二行new OnSubscribe<R>
實例化的匿名對象。這個OnSubscribe雖然也是數據源,可是本身卻歷來不產生數據,也不直接向訂閱者直接發佈數據。它作的事就只是把本身的訂閱者包裝成爲一個父級Observable承認的訂閱者,而後委派給父級的數據源(上面代碼第十行)。父級Observable的數據源向本身的訂閱者發佈數據,就是發送到被包裝出來的這個訂閱者中來。
小結:到這裏爲止,要記住很重要的一點,經過lift操做產生的派生Observable對象的數據源(onSubscribe)是不實際產生數據的,它作的事就只是把本身的訂閱者包裝成爲一個父級Observable承認的訂閱者,而後委派給父級的數據源。
那麼被包裝出來的這個訂閱者是怎麼處理父級數據源發佈的數據呢?這裏就要回到上面代碼的第6行。那裏經過一個咱們調用lift操做時傳進去的Operator
把派生Observable承認的Subscriber包裝成一個父級Observable承認的Subscriber。
下面我看一個lift操做的例子,用lift模擬了兩次map操做。
代碼視圖1:
class Bean { int value; Bean(int v) { this.value = v; } } Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)) .lift(new Observable.Operator<Integer, Bean>() { @Override public Subscriber<? super Bean> call(Subscriber<? super Integer> subscriber) { return new Subscriber<Bean>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(Bean bean) { subscriber.onNext(bean.value); } }; } }) .lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(Subscriber<? super String> subscriber) { return new Subscriber<Integer>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(Integer i) { subscriber.onNext(String.valueOf(i)); } }; } }) .subscribe(System.out::println);
上面的代碼中Observable的鏈式操做實際上是等價於下面代碼形式的,
代碼視圖2:
Observable<Bean> o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)); Observable<Integer> o2 = o1.lift(···); Observable<String> o3 = o2.lift(···); Subscriber<String> subscriber3 = new Subscriber<String>() { @Override public void onCompleted() {} @Override public void onError(Throwable e) {} @Override public void onNext(String s) { System.out.println(s); } }; o3.subscribe(subscriber3);
從代碼視圖2中,咱們能夠發現這一連串的操做下來,一共產生了3個Observable對象:o一、o二、o3。以前咱們說過每一個Observable對象都會持有一個onSubscribe對象做爲數據源,用來向訂閱者發佈數據。咱們以不一樣的標識來區分一下上面三個Observable對象對應的onSubscribe對象:o1 => onSubscribe1, o2 => onSubscribe2, o3 => onSubscribe3。
從上面lift操做說明一節,咱們知道lift操做產生的Observable對象的數據源是不產生數據的,它作的事就只是把本身的訂閱者包裝成爲一個父級Observable承認的訂閱者,而後委派給父級的數據源。o3是一個經過lift操做產生的派生Observable,當訂閱者subscriber3註冊到o3時,o3的數據源onSubscribe3就會把subscriber3包裝成一個父級(o2)能夠的訂閱者(這裏命名爲subscriber2),而後委派給父級數據源(onSubscribe2)。 如今請回頭看代碼視圖1的第34-49行。這一段代碼就顯示了把subscriber3包裝成爲subscriber2的過程。能夠發現,被包裝出來的subscriber2只作一件事,就是把onSubscribe2發佈給本身的數據轉化爲subscriber3能夠消化的數據,而後就交給subscriber3,至關於充當了一個subscriber3和onSubscribe2之間的橋樑。
接着分析,onSubscribe2雖說,經過subscriber2間接把數據發佈到了subscriber3。可是實際上,做爲數據源,它的持有者o2,也是經過lift操做產生的派生Observable,因此這個onSubscribe2也是不直接產生數據的。它也是把本身的訂閱者(subscriber2)包裝一個父級(o1)承認的訂閱者(這裏命名爲subscriber1),而後委派給父級數據源(onSubscribe1)。 如今請再回頭看代碼視圖1的第13-28行。這段代碼顯示瞭如何把subscriber2包裝成爲subscriber1的過程。一樣,subscriber1也只作一件事,就是把onSubscribe1發佈給本身的數據轉化爲subscriber2能夠消化的數據,而後就交給subscriber2,至關於充當了一個subscriber2和onSubscribe1之間的橋樑。
最後,整個過程能夠描述爲下面的一個示意圖,
RxJava中,lift操做幾乎貫穿了整個Observable,由於差很少全部全部的操做符都是經過lift操做來實現的。好比map操做符,其實就是經過lift操做產生的派生Observable而已。因此這個派生Observable的數據源也就如上面我所述,本身不產生數據,而是把本身的訂閱者包裝成一個父級承認的訂閱者。怎麼包裝呢?上面的講述中,這個包裝過程實際上是經過咱們調用lift操做時傳遞的參數Operator
來完成的。
咱們再回顧subscribeOn操做符的源碼。首先,經過nest操做產生一個Observable<Observable<Bean>>
(咱們原來操做的是Observable<Bean>
),而後它上面調用lift操做,那麼Observable<Observable<Bean>>
就是父級了,lift操做最終產生的派生Observable就是整個subscribeOn操做產生的結果。看subscribeOn操做的源碼咱們能夠發現,它是經過OperatorSubscribeOn這麼一個Operator來實現Subscriber的包裝。那麼咱們來看一下OperatorSubscribeOn的源碼,分析一下具體的包裝過程。
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> { private final Scheduler scheduler; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; } @Override public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); return new Subscriber<Observable<T>>(subscriber) { ···這裏省略了一些於邏輯閱讀無關的代碼。 @Override public void onNext(final Observable<T> o) { inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } ···這裏省略了一些於邏輯閱讀無關的代碼。 }); } }); } }; } }
從上面的分析中,咱們知道subscribeOn操做實際上是先經過nest操做產生一個Observable<Observable<Bean>>
對象,再經過lift操做產生派生Observable(即(Observable<Bean>
)對象的,因此Observable<Observable<Bean>>
對象就是父級。因此OperatorSubscribeOn的職責就是爲包裝一個Observable<Observable<Bean>>
承認的訂閱者。被包裝出來的訂立者接受到父級發佈的數據(即一個Observable<Bean>
對象)時,它這裏沒有把數據轉換成下級subscriber(即上面代碼第10行傳遞給call方法的參數)可消化的數據,而是經過inner對象來安裝一次訂閱。
小結,通過subscribeOn操做產生了一個派生Observable<Bean>
對象,這個對象的數據源(onSubscribe)的工做是爲本身的訂閱者在某個線程上安排訂閱。
代碼視圖3
Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.immediate()) .subscribe(bean -> System.out.println(bean.value));
看過subscribeOn的源碼以後,咱們應該知道上面的代碼幾乎等價於下面的寫法,
代碼視圖4
Observable<Bean> o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)); Observable<Observable<Bean>> o2 = o1.nest(); Observable<Bean> o3 = o2.lift(new OperatorSubscribeOn<Bean>(Schedulers.io())); Observable<Observable<Bean>> o4 = o3.nest(); Observable<Bean> o5 = o4.lift(new OperatorSubscribeOn<Bean>(Schedulers.immediate())); Subscriber<Bean> subscriber4 = new Subscriber<Bean>() { @Override public void onCompleted() {} @Override public void onError(Throwable e) {} @Override public void onNext(Bean s) { System.out.println(s.value); } }; o5.subscribe(subscriber5);
上面代碼的執行過程,能夠表示成以下示意圖,
經過上面示意圖能夠看出,最後整個整個訂閱過程的運行線程是 currentThread -> immediateThread -> ioThread。
By 啪嗒科技 AtanL(atanl@padakeji.com)
©啪嗒科技版本全部,沒有經做者容許,只能發表於padakeji.com相關域名下。