RxJava學習入門

RxJava是什麼

一個詞:異步。java

RxJava 在 GitHub 主頁上的自我介紹是 「a library for composing asynchronous and event-based programs using observable sequences for the Java VM」(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫)。這就是 RxJava ,歸納得很是精準。react

然而,對於初學者來講,這太難看懂了。由於它是一個『總結』,而初學者更須要一個『引言』。android

其實, RxJava 的本質能夠壓縮爲異步這一個詞。說到根上,它就是一個實現異步操做的庫,而別的定語都是基於這之上的。數據庫

引入RxJava

在build.gradle在dependencies加入json

dependencies { compile 'io.reactivex:rxandroid:1.1.0' compile 'io.reactivex:rxjava:1.1.0' }

異步網絡請求:

場景:異步網絡請求一個User數據,並在TextView展現。api

日常代碼:數組

TextView textView = ...;

Map<String, String> params = new HashMap<>();
params.put("user_id", userid);// 請求參數

UserHttp client = new UserHttp();
client.post("http://server.com/user", params, new CallBack<String>() { // 異步請求
     @Override
     protected void onSuccess(String result) { // 在UI線程回調
         // 返回的字符串(一般是一個json),解析成User對象
         User user = parse(result); 

          textView.setText(user.getName());
     }
});

大概就是這樣子了吧,固然通常都會再封裝一下。markdown

用RxJava大概是這樣子:網絡

TextView textView = ...;

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(final Subscriber<? super String> subscriber) {// 下面subscribeOn(Schedulers.newThread()) 把這方法設定在新線程回調
        Map<String, String> params = new HashMap<>();
        params.put("user_id", userid);// 請求參數

        UserHttp client   = new UserHttp();
        Response response = client.post("http://kkmike999.com/user", params);// 同步請求

        if (response.status == 200) { // 請求成功
            String result = response.getResult();
            subscriber.onNext(result);
            subscriber.onCompleted();
        } else {
            // 請求失敗
            subscriber.onError(new Throwable(response.getMessage()));
        }
    }
})
.subscribeOn(Schedulers.newThread()) // 設置call(...)方法,在新線程回調;

// 可封裝得更美觀 Observable<String> observable = UserHttp.create(userid);
observable
          .observeOn(AndroidSchedulers.mainThread())// 讓下面onNext()、onError()、onComplete()在UI線程(主線程)回調
          .subscribe(new Subscriber<String>() {
              @Override
              public void onNext(String result) { // 上面 subscriber.onNext(result)在這裏回調
                  // 返回的字符串(一般是一個json),解析成User對象
                  User user = parse(result);

                  textView.setText(user.getName());
              }

              @Override
              public void onError(Throwable e) {} // 上面subscriber.onError(new Throwable(msg))在這裏回調

              @Override
              public void onCompleted() {}
});

雖然代碼增多了,RxJava 好在哪?就好在簡潔,好在那把什麼複雜邏輯都能穿成一條線的簡潔。數據結構

API 介紹和原理簡析

RxJava 的異步實現,是經過一種擴展的觀察者模式來實現的

RxJava 的觀察者模式

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer。

與傳統觀察者模式不一樣, RxJava 的事件回調方法除了普通事件 onNext() (至關於 onClick() / onEvent())以外,還定義了兩個特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
  • onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。
  • 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,而且是事件序列中的最後一個。須要注意的是,onCompleted() 和 onError() 兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。

RxJava 的觀察者模式大體以下圖:

基本實現

基於以上的概念, RxJava 的基本實現主要有三點:

1) 建立 Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行爲。 RxJava 中的 Observer 接口的實現方式:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口以外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是徹底同樣的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不只基本使用方式同樣,實質上,在 RxJava 的 subscribe 過程當中,Observer 也老是會先被轉換成一個 Subscriber 再使用。因此若是你只想使用基本功能,選擇 Observer 和 Subscriber 是徹底同樣的。它們的區別對於使用者來講主要有兩點:

  • onStart(): 這是 Subscriber 增長的方法。它會在 subscribe 剛開始,而事件還未發送以前被調用,能夠用於作一些準備工做,例如數據的清零或重置。這是一個可選方法,默認狀況下它的實現爲空。須要注意的是,若是對準備工做的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,由於它老是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來作準備工做,可使用 doOnSubscribe() 方法,具體能夠在後面的文中看到。
  • unsubscribe(): 這是 Subscriber 所實現的另外一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將再也不接收事件。通常在這個方法調用前,可使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,由於在 subscribe() 以後, Observable 會持有 Subscriber 的引用,這個引用若是不能及時被釋放,將有內存泄露的風險。因此最好保持一個原則:要在再也不使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關係,以免內存泄露的發生。

2) 建立 Observable

Observable 即被觀察者,它決定何時觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併爲它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

能夠看到,這裏傳入了一個 OnSubscribe 對象做爲參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的做用至關於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

這個例子很簡單:事件的內容是字符串,而不是一些複雜的對象;事件的內容是已經定好了的,而不像有的觀察者模式同樣是待肯定的(例如網絡請求的結果在請求返回以前是未知的);全部事件在一瞬間被所有發送出去,而不是夾雜一些肯定或不肯定的時間間隔或者通過某種觸發器來觸發的。總之,這個例子看起來毫無實用價值。但這是爲了便於說明,實質上只要你想,各類各樣的事件發送規則你均可以本身來寫。至於具體怎麼作,後面都會講到,但如今不行。只有把基礎原理先說明白了,上層的運用才能更容易說清楚。

create() 方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷建立事件隊列,例如:

just(T…): 將傳入的參數依次發送出來。

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

from(T[]) / from(Iterable

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

上面 just(T…) 的例子和 from(T[]) 的例子,都和以前的 create(OnSubscribe) 的例子是等價的。

3) Subscribe (訂閱)

建立了 Observable 和 Observer 以後,再用 subscribe() 方法將它們聯結起來,整條鏈子就能夠工做了。代碼形式很簡單:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

有人可能會注意到, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜誌訂閱了讀者』同樣顛倒了對象關係。這讓人讀起來有點彆扭,不過若是把 API 設計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思惟邏輯,但對流式 API 的設計就形成影響了,比較起來明顯是得不償失的。

Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
// 若是須要看源碼,能夠去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

能夠看到,subscriber() 作了3件事:

調用 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。
調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這裏,事件發送的邏輯開始運行。從這也能夠看出,在 RxJava 中, Observable 並非在建立的時候就當即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。
將傳入的 Subscriber 做爲 Subscription 返回。這是爲了方便 unsubscribe().
整個過程當中對象間的關係以下圖:

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調,RxJava 會自動根據定義建立出 Subscriber 。形式以下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

簡單解釋一下這段代碼中出現的 Action1 和 Action0。 Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;因爲 onCompleted() 方法也是無參無返回值的,所以 Action0 能夠被當成一個包裝對象,將 onCompleted() 的內容打包起來將本身做爲一個參數傳入 subscribe() 以實現不完整定義的回調。這樣其實也能夠看作將 onCompleted() 方法做爲參數傳進了 subscribe(),至關於其餘某些語言中的『閉包』。 Action1 也是一個接口,它一樣只有一個方法 call(T param),這個方法也無返回值,但有一個參數;與 Action0 同理,因爲 onNext(T obj) 和 onError(Throwable error) 也是單參數無返回值的,所以 Action1 能夠將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回調。事實上,雖然 Action0 和 Action1 在 API 中使用最普遍,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的,它們能夠被用以包裝不一樣的無返回值的方法。

注:正如前面所提到的,Observer 和 Subscriber 具備相同的角色,並且 Observer 在 subscribe() 過程當中最終會被轉換成 Subscriber 對象,所以,從這裏開始,後面的描述我將用 Subscriber 來代替 Observer ,這樣更加嚴謹。

4) 場景示例

下面舉兩個例子:

爲了把原理用更清晰的方式表述出來,本文中挑選的都是功能儘量簡單的例子,以致於有些示例代碼看起來會有『多此一舉』『明明不用 RxJava 能夠更簡便地解決問題』的感受。當你看到這種狀況,不要以爲是由於 RxJava 太囉嗦,而是由於在過早的時候舉出真實場景的例子並不利於原理的解析,所以我刻意挑選了簡單的情景。

a. 打印字符串數組

將字符串數組 names 中的全部字符串依次打印出來:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });

b. 由 id 取得圖片並顯示

由指定的一個 drawable 文件 id drawableRes 取得圖片,並顯示在 ImageView 中,並在出現異常的時候打印 Toast 報錯:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

正如上面兩個例子這樣,建立出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來,一次 RxJava 的基本使用就完成了。很是簡單。

線程控制 —— Scheduler (一)

在不指定線程的狀況下, RxJava 遵循的是線程不變的原則,即:在哪一個線程調用 subscribe(),就在哪一個線程生產事件;在哪一個線程生產事件,就在哪一個線程消費事件。若是須要切換線程,就須要用到 Scheduler (調度器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——調度器,至關於線程控制器,RxJava 經過它來指定每一段代碼應該運行在什麼樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:

  • Schedulers.immediate(): 直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler。
  • Schedulers.newThread(): 老是啓用新線程,並在新線程執行操做。
  • Schedulers.io(): I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread() 更有效率。不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。

另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操做將在 Android 主線程運行。
有了這幾個 Scheduler ,就可使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫作事件產生的線程。 * observeOn(): 指定 Subscriber 所運行在的線程。或者叫作事件消費的線程。

文字敘述總歸難理解,上代碼:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面這段代碼中,因爲 subscribeOn(Schedulers.io()) 的指定,被建立的事件的內容 一、二、三、4 將會在 IO 線程發出;而因爲 observeOn(AndroidScheculers.mainThread()) 的指定,所以 subscriber 數字的打印將發生在主線程 。事實上,這種在 subscribe() 以前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式很是常見,它適用於多數的 『後臺線程取數據,主線程顯示』的程序策略。

而前面提到的由圖片 id 取得圖片並顯示的例子,若是也加上這兩句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

那麼,加載圖片將會發生在 IO 線程,而設置圖片則被設定在了主線程。這就意味着,即便加載圖片耗費了幾十甚至幾百毫秒的時間,也不會形成絲毫界面的卡頓。

2) Scheduler 的原理 (一)

RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把線程切換了,怎麼作到的?並且 subscribe() 不是最外層直接調用的方法嗎,它居然也能被指定線程?)。然而 Scheduler 的原理須要放在後面講,由於它的原理是如下一節《變換》的原理做爲基礎的。

好吧這一節其實我屁也沒說,只是爲了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。

變換

終於要到牛逼的地方了,無論你激動不激動,反正我是激動了。

RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大緣由。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不一樣的事件或事件序列。概念說着老是模糊難懂的,來看 API。

1) API

首先看一個 map() 的例子:

Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 參數類型 String
            return getBitmapFromPath(filePath); // 返回類型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 參數類型 Bitmap
            showBitmap(bitmap);
        }
    });

這裏出現了一個叫作 Func1 的類。它和 Action1 很是類似,也是 RxJava 的一個接口,用於包裝含有一個參數的方法。 Func1 和 Action 的區別在於, Func1 包裝的是有返回值的方法。另外,和 ActionX 同樣, FuncX 也有多個,用於不一樣參數個數的方法。FuncX 和 ActionX 的區別在 FuncX 包裝的是有返回值的方法。

能夠看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象後返回,而在通過 map() 方法後,事件的參數類型也由 String 轉爲了 Bitmap。這種直接變換對象並返回的,是最多見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不只能夠針對事件對象,還能夠針對整個事件隊列,這使得 RxJava 變得很是靈活。我列舉幾個經常使用的變換:

map(): 事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava 最經常使用的變換。 map() 的示意圖:

flatmap

這是一個頗有用但很是難理解的變換,所以我決定花多些篇幅來介紹它。 首先假設這麼一種需求:假設有一個數據結構『學生』,如今須要打印出一組學生的名字。實現方式很簡單:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

很簡單。那麼再假設:若是要打印出每一個學生所須要修的全部課程的名稱呢?(需求的區別在於,每一個學生只有一個名字,但卻有多個課程。)首先能夠這樣實現:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

依然很簡單。那麼若是我不想在 Subscriber 中使用 for 循環,而是但願 Subscriber 中直接傳入單個的 Course 對象呢(這對於代碼複用很重要)?用 map() 顯然是不行的,由於 map() 是一對一的轉化,而我如今的要求是一對多的轉化。那怎麼才能把一個 Student 轉化成多個 Course 呢?

這個時候,就須要用 flatMap() 了:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

從上面的代碼能夠看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數轉化以後返回另外一個對象。但須要注意,和 map() 不一樣的是, flatMap() 中返回的是個 Observable 對象,而且這個 Observable 對象並非被直接發送到了 Subscriber 的回調方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件對象建立一個 Observable 對象;2. 並不發送這個 Observable, 而是將它激活,因而它開始發送事件;3. 每個建立出來的 Observable 發送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,經過一組新建立的 Observable 將初始的對象『鋪平』以後經過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。

flatMap() 示意圖:

3) 延伸:doOnSubscribe()

然而,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程以前倒是能夠利用的。

在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 能夠用做流程開始前的初始化。然而 onStart() 因爲在 subscribe() 發生時就被調用了,所以不能指定線程,而是隻能執行在 subscribe() 被調用時的線程。這就致使若是 onStart() 中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar,這必須在主線程執行),將會有線程非法的風險,由於有時你沒法預測 subscribe() 將會在什麼線程執行。

而與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 一樣是在 subscribe() 調用後並且在事件發送前執行,但區別在於它能夠指定線程。默認狀況下, doOnSubscribe() 執行在 subscribe() 發生的線程;而若是在 doOnSubscribe() 以後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。

進度條示例代碼:

ProgressDialog progress = ...;

    ...
    .observeOn(AndroidSchedulers.mainThread()) // 規定Subscriber在主線程回調
    .doOnSubscribe(new Action0() { //  主線程
        @Override
        public void call() {
            progress.show();
        }
    })
    .doOnCompleted(new Action0() { // 主線程
        @Override
        public void call() {
            progress.dismiss();
        }
    })
    .subscribe(new Subscriber<List<String>>() { // 主線程
        @Override
        public void onNext(List<String> strings) {...}
        @Override
        public void onComplete() {...}
    }

參考連接

給 Android 開發者的 RxJava 詳解

RxJava快速入門 - 簡書

相關文章
相關標籤/搜索