RxJava之subscribeOn解惑

RxJava之subscribeOn解惑


有一天,我在使用RxJava和Retrofit實現Android上面的網絡請求。忽然,遇到了一個坑,在過了這些坑以後獲得一些經驗,以爲須要和你們分享一二。java

引子

用Retrofit搭配RxJava的朋友應該都知道,通常實現代碼最終大都長得一幅下面的樣子。api

public interface BeanApi {
    @GET("bean/{id}")
    Observable<Bean> getBean(@Path("id") int beanId);
}

BeanApi api = ···經過Retrofit的create實例化···

api.getBean(1)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

上面的代碼形式相信各位都寫得很熟了,可是我實在煩透了每一個api調用的地方都寫一次subscribOn,observeOn。而後,我找到一篇不錯的文章——Don't break the chain: use RxJava's compose() operator,裏面提到了一個方法避免這種重複代碼(其實做者本意是要體現「不要打破鏈式操做」,而非避免重複代碼)。最後改進到的代碼就長下面的樣子了。網絡

// This is a common method.
<T> Transformer<T, T> applySchedulers() {  
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}

api.getBean(1)
  .compose(applySchedulers())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

改進後的代碼比原來的代碼少了一行。可是寫多幾回以後,我仍是煩透了這個applySchedulers()。因而我瘋了,就本身實現一個Retrofit的CallAdapter.Factory,讓Retrotfit在每次調用api的時候自動就給我封裝好subscribeOn和observeOn這些重複的代碼,具體實現能夠參考個人另一篇文章——經過委派模式包裝一個RxJavaCallAdapterFactory。最後,個人代碼就是長下面這個樣子了。app

api.getBean(1)
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

全部的subscribeOn和observeOn不用再寫了。由於每次調用api.getBean(1),Retrotfit就調用我自定義的CallAdapter.Factory把結果封裝成Observable對象的時候就已經把subscribeOn和observeOn添加上去了。async

問題

好,用得很爽。可是問題問題比辦法多,因此問題來了。有幾個特殊的地方我須要網絡加載和結果監聽都在當前線程。相信理解了上面代碼的朋友都已經看出來了,如今我經過api.getBean(1)獲取到的Observable<Bean>最後被訂閱都會是在io線程獲取網絡數據,而後在mainThread線程進行結果處理。因此,我要想個辦法出來,覆蓋原來的Schedulers,包括subscribeOn的Scheduler和observeOn的Scheduler。因而,我寫了下面的代碼。ide

// isAsync is a boolean variable indicate whether the request is a asynchronous or not.
api.getBean(1)
  .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate())
  .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

上面的代碼再結合我以前寫的CallAdapter.Factory,其實就是至關於沒有自定義CallAdapter.Factory以前顯式調用兩次subscribeOn和兩次observeOn,就像下面的樣子。this

api.getBean(1)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate())
  .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate())
  .subscribe(
    bean -> {
      // do something on main thread.
    }, 
    throwable -> {
      // do something on main thread. 
    }
  );

前湊

做爲一名RxJava的標準菜鳥,我被驗證了本身的確很菜,我天真的認爲後面的subscribeOn和observeOn會覆蓋以前的Scheduler,我理所固然的認爲,當isAsync爲true的時候,此次api的調用就會在當前線程執行網絡訪問和結果處理。因而,我被搞瘋了。我就看了subscribeOn的源碼,以下。.net

public final Observable<T> subscribeOn(Scheduler scheduler) {
  ··· 省略一些於邏輯閱讀不重要的代碼
  return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

nest的代碼以下,線程

public final Observable<Observable<T>> nest() {
  return just(this);
}

意思就是新建一個Observable,而且只會向訂閱者發送一個元素——原來api.getBean(1)得到的Observale<Bean>對象。因此nest操做得到的結果是Observable<Observale<Bean>>對象。好,這裏記着這個東東。下面我先分析一下lift操做,而後咱們再回頭把他們串在一塊兒。code

Observable結構

在看lift操做以前,咱們稍微回顧一下Observable的建立方法,

final OnSubscribe<T> onSubscribe;

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

其餘的什麼fromjust等建立方法最後都是把數據轉化爲一個OnSubscribe對象再經過上面的create方法建立。因此咱們只關注這個create方法。上面代碼的意思很簡單,就是new一個Observable對象,而且設置onSubscribe。因此這裏的關鍵是onSubscribe這個對象。這裏我管它作數據源,即Observable對象會用它來產生數據,而且發佈給訂閱者。

看到這裏,咱們能夠發現,Observable其實沒有什麼,只有兩個關鍵點:一、裝載着一個onSubscribe對象,二、有訂閱者註冊時,就調用用這個onSubscribecall(Subscriber)方法。

這裏咱們要看一下這個call(Subscriber)方法。該方法接受一個參數Subscriber,即訂閱者。當有訂閱者註冊到Observable對象時,Observable對象就調用onSubscribe的這個call方法,而且把當前當前註冊的訂閱者做爲參數傳遞過去。因此在call方法的實現中就能夠調用訂閱者的onNext方法來發布數據或者作其餘事(不必定是發佈數據)。

lift操做說明

先把lift操做的代碼貼出來。

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    ··· 省略一些於邏輯閱讀不重要的代碼
                    st.onError(e);
                }
            } catch (Throwable e) {
                ··· 省略一些於邏輯閱讀不重要的代碼
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    });
}

從上面代碼第2行的new Observable<R>能夠看出,lift操做實際上是新建一個Observable對象而後返回。這裏加點高級的標識方便下面的閱讀,被new出來的Observable對象咱們叫它作派生Observable,而當前Observable就叫父級Observable。

在上面Observable結構一節中,咱們知道每一個Observable都持有一個onSubscribe對象做爲數據源。經過lift方法派生所得的Observable也不例外,也有一個,就是上面代碼第二行new OnSubscribe<R>實例化的匿名對象。這個OnSubscribe雖然也是數據源,可是本身卻歷來不產生數據,也不直接向訂閱者直接發佈數據。它作的事就只是把本身的訂閱者包裝成爲一個父級Observable承認的訂閱者,而後委派給父級的數據源(上面代碼第十行)。父級Observable的數據源向本身的訂閱者發佈數據,就是發送到被包裝出來的這個訂閱者中來。

小結:到這裏爲止,要記住很重要的一點,經過lift操做產生的派生Observable對象的數據源(onSubscribe)是不實際產生數據的,它作的事就只是把本身的訂閱者包裝成爲一個父級Observable承認的訂閱者,而後委派給父級的數據源

lift操做實例分析

那麼被包裝出來的這個訂閱者是怎麼處理父級數據源發佈的數據呢?這裏就要回到上面代碼的第6行。那裏經過一個咱們調用lift操做時傳進去的Operator把派生Observable承認的Subscriber包裝成一個父級Observable承認的Subscriber。

下面我看一個lift操做的例子,用lift模擬了兩次map操做。

代碼視圖1:

class Bean {
    int value;

    Bean(int v) {
        this.value = v;
    }
}

Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4))
    .lift(new Observable.Operator<Integer, Bean>() {
        @Override
        public Subscriber<? super Bean> call(Subscriber<? super Integer> subscriber) {
            return new Subscriber<Bean>(subscriber) {
                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }

                @Override
                public void onNext(Bean bean) {
                    subscriber.onNext(bean.value);
                }
            };
        }
    })
    .lift(new Observable.Operator<String, Integer>() {
        @Override
        public Subscriber<? super Integer> call(Subscriber<? super String> subscriber) {
            return new Subscriber<Integer>(subscriber) {
                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }

                @Override
                public void onNext(Integer i) {
                    subscriber.onNext(String.valueOf(i));
                }
            };
        }
    })
    .subscribe(System.out::println);

上面的代碼中Observable的鏈式操做實際上是等價於下面代碼形式的,

代碼視圖2:

Observable<Bean> o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4));
Observable<Integer> o2 = o1.lift(···);
Observable<String> o3 = o2.lift(···);

Subscriber<String> subscriber3 = new Subscriber<String>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }
    };

o3.subscribe(subscriber3);

代碼視圖2中,咱們能夠發現這一連串的操做下來,一共產生了3個Observable對象:o一、o二、o3。以前咱們說過每一個Observable對象都會持有一個onSubscribe對象做爲數據源,用來向訂閱者發佈數據。咱們以不一樣的標識來區分一下上面三個Observable對象對應的onSubscribe對象:o1 => onSubscribe1, o2 => onSubscribe2, o3 => onSubscribe3。

從上面lift操做說明一節,咱們知道lift操做產生的Observable對象的數據源是不產生數據的,它作的事就只是把本身的訂閱者包裝成爲一個父級Observable承認的訂閱者,而後委派給父級的數據源。o3是一個經過lift操做產生的派生Observable,當訂閱者subscriber3註冊到o3時,o3的數據源onSubscribe3就會把subscriber3包裝成一個父級(o2)能夠的訂閱者(這裏命名爲subscriber2),而後委派給父級數據源(onSubscribe2)。 如今請回頭看代碼視圖1的第34-49行。這一段代碼就顯示了把subscriber3包裝成爲subscriber2的過程。能夠發現,被包裝出來的subscriber2只作一件事,就是把onSubscribe2發佈給本身的數據轉化爲subscriber3能夠消化的數據,而後就交給subscriber3,至關於充當了一個subscriber3和onSubscribe2之間的橋樑。

接着分析,onSubscribe2雖說,經過subscriber2間接把數據發佈到了subscriber3。可是實際上,做爲數據源,它的持有者o2,也是經過lift操做產生的派生Observable,因此這個onSubscribe2也是不直接產生數據的。它也是把本身的訂閱者(subscriber2)包裝一個父級(o1)承認的訂閱者(這裏命名爲subscriber1),而後委派給父級數據源(onSubscribe1)。 如今請再回頭看代碼視圖1的第13-28行。這段代碼顯示瞭如何把subscriber2包裝成爲subscriber1的過程。一樣,subscriber1也只作一件事,就是把onSubscribe1發佈給本身的數據轉化爲subscriber2能夠消化的數據,而後就交給subscriber2,至關於充當了一個subscriber2和onSubscribe1之間的橋樑。

最後,整個過程能夠描述爲下面的一個示意圖,

輸入圖片說明

subscribeOn與lift

RxJava中,lift操做幾乎貫穿了整個Observable,由於差很少全部全部的操做符都是經過lift操做來實現的。好比map操做符,其實就是經過lift操做產生的派生Observable而已。因此這個派生Observable的數據源也就如上面我所述,本身不產生數據,而是把本身的訂閱者包裝成一個父級承認的訂閱者。怎麼包裝呢?上面的講述中,這個包裝過程實際上是經過咱們調用lift操做時傳遞的參數Operator來完成的。

咱們再回顧subscribeOn操做符的源碼。首先,經過nest操做產生一個Observable<Observable<Bean>>(咱們原來操做的是Observable<Bean>),而後它上面調用lift操做,那麼Observable<Observable<Bean>>就是父級了,lift操做最終產生的派生Observable就是整個subscribeOn操做產生的結果。看subscribeOn操做的源碼咱們能夠發現,它是經過OperatorSubscribeOn這麼一個Operator來實現Subscriber的包裝。那麼咱們來看一下OperatorSubscribeOn的源碼,分析一下具體的包裝過程。

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {
        
            ···這裏省略了一些於邏輯閱讀無關的代碼。

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }
                            
                            ···這裏省略了一些於邏輯閱讀無關的代碼。

                        });
                    }
                });
            }

        };
    }
}

從上面的分析中,咱們知道subscribeOn操做實際上是先經過nest操做產生一個Observable<Observable<Bean>>對象,再經過lift操做產生派生Observable(即(Observable<Bean>)對象的,因此Observable<Observable<Bean>>對象就是父級。因此OperatorSubscribeOn的職責就是爲包裝一個Observable<Observable<Bean>>承認的訂閱者。被包裝出來的訂立者接受到父級發佈的數據(即一個Observable<Bean>對象)時,它這裏沒有把數據轉換成下級subscriber(即上面代碼第10行傳遞給call方法的參數)可消化的數據,而是經過inner對象來安裝一次訂閱。

小結,通過subscribeOn操做產生了一個派生Observable<Bean>對象,這個對象的數據源(onSubscribe)的工做是爲本身的訂閱者在某個線程上安排訂閱。

樣例分析

代碼視圖3

Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4))
  .subscribeOn(Schedulers.io())
  .subscribeOn(Schedulers.immediate())
  .subscribe(bean -> System.out.println(bean.value));

看過subscribeOn的源碼以後,咱們應該知道上面的代碼幾乎等價於下面的寫法,

代碼視圖4

Observable<Bean> o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4));
Observable<Observable<Bean>> o2 = o1.nest();
Observable<Bean> o3 = o2.lift(new OperatorSubscribeOn<Bean>(Schedulers.io()));
Observable<Observable<Bean>> o4 = o3.nest();
Observable<Bean> o5 = o4.lift(new OperatorSubscribeOn<Bean>(Schedulers.immediate()));

Subscriber<Bean> subscriber4 = new Subscriber<Bean>() {
        @Override
        public void onCompleted() {}

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onNext(Bean s) {
            System.out.println(s.value);
        }
    };

o5.subscribe(subscriber5);

上面代碼的執行過程,能夠表示成以下示意圖,

輸入圖片說明

經過上面示意圖能夠看出,最後整個整個訂閱過程的運行線程是 currentThread -> immediateThread -> ioThread。

聲名

By 啪嗒科技 AtanL(atanl@padakeji.com)

©啪嗒科技版本全部,沒有經做者容許,只能發表於padakeji.com相關域名下。

相關文章
相關標籤/搜索