RxJava學習(一)

注意:文字和圖片轉載自拋物線博客

參考:http://gank.io/post/560e15be2dca930e00da1083html

RxJava 究竟是什麼

一個詞:異步java

RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫)。這就是 RxJava ,歸納得很是精準。git

然而,對於初學者來講,這太難看懂了。由於它是一個『總結』,而初學者更須要一個『引言』。數組

其實, RxJava 的本質能夠壓縮爲異步這一個詞。說到根上,它就是一個實現異步操做的庫,而別的定語都是基於這之上的。閉包

RxJava 好在哪

換句話說,『一樣是作異步,爲何人們用它,而不用現成的 AsyncTask / Handler / XXX / ... ?』異步

一個詞:簡潔async

異步操做很關鍵的一點是程序的簡潔性,由於在調度過程比較複雜的狀況下,異步代碼常常會既難寫也難被讀懂。 Android 創造的AsyncTask 和Handler ,其實都是爲了讓異步代碼更加簡潔。RxJava 的優點也是簡潔,但它的簡潔的不同凡響之處在於,隨着程序邏輯變得愈來愈複雜,它依然可以保持簡潔。ide

RxJava 的觀察者模式

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和Observer 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer函數

與傳統觀察者模式不一樣, RxJava 的事件回調方法除了普通事件 onNext() (至關於 onClick() / onEvent())以外,還定義了兩個特殊的事件:onCompleted() 和 onError()post

  • onCompleted(): 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
  • onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。
  • 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,而且是事件序列中的最後一個。須要注意的是,onCompleted() 和 onError() 兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。

RxJava 的觀察者模式大體以下圖:

RxJava 的觀察者模式

2. 基本實現

基於以上的概念, RxJava 的基本實現主要有三點:

1) 建立 Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行爲。

除了 Observer 接口以外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是徹底同樣的:

不只基本使用方式同樣,實質上,在 RxJava 的 subscribe 過程當中,Observer 也老是會先被轉換成一個 Subscriber 再使用。因此若是你只想使用基本功能,選擇 Observer 和 Subscriber 是徹底同樣的。它們的區別對於使用者來講主要有兩點:

  1. onStart(): 這是 Subscriber 增長的方法。它會在 subscribe 剛開始,而事件還未發送以前被調用,能夠用於作一些準備工做,例如數據的清零或重置。這是一個可選方法,默認狀況下它的實現爲空。須要注意的是,若是對準備工做的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,由於它老是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來作準備工做,可使用 doOnSubscribe() 方法,具體能夠在後面的文中看到。
  2. unsubscribe(): 這是 Subscriber 所實現的另外一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber將再也不接收事件。通常在這個方法調用前,可使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,由於在 subscribe() 以後, Observable 會持有 Subscriber 的引用,這個引用若是不能及時被釋放,將有內存泄露的風險。因此最好保持一個原則:要在再也不使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關係,以免內存泄露的發生。
2) 建立 Observable

Observable 即被觀察者,它決定何時觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併爲它定義事件觸發規則。能夠看到,參數這裏傳入了一個 OnSubscribe 對象做爲參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的做用至關於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。

create() 方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷建立事件隊列,例如:

  • just(T...): 將傳入的參數依次發送出來。
  • from(T[]) / from(Iterable<? extends T>) : 將傳入的數組或 Iterable 拆分紅具體對象後,依次發送出來。

下面展現了建立Observable的各類方法。

  • just( ) — 將一個或多個對象轉換成發射這個或這些對象的一個Observable
  • from( ) — 將一個Iterable, 一個Future, 或者一個數組轉換成一個Observable
  • repeat( ) — 建立一個重複發射指定數據或數據序列的Observable
  • repeatWhen( ) — 建立一個重複發射指定數據或數據序列的Observable,它依賴於另外一個Observable發射的數據
  • create( ) — 使用一個函數從頭建立一個Observable
  • defer( ) — 只有當訂閱者訂閱才建立Observable;爲每一個訂閱建立一個新的Observable
  • range( ) — 建立一個發射指定範圍的整數序列的Observable
  • interval( ) — 建立一個按照給定的時間間隔發射整數序列的Observable
  • timer( ) — 建立一個在給定的延時以後發射單個數據的Observable
  • empty( ) — 建立一個什麼都不作直接通知完成的Observable
  • error( ) — 建立一個什麼都不作直接通知錯誤的Observable
  • never( ) — 建立一個不發射任何數據的Observable
3) Subscribe (訂閱)

建立了 Observable 和 Observer 以後,再用 subscribe() 方法將它們聯結起來,整條鏈子就能夠工做了。

有人可能會注意到, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer /subscriber 訂閱了 observalbe』,這看起來就像『雜誌訂閱了讀者』同樣顛倒了對象關係。這讓人讀起來有點彆扭,不過若是把 API 設計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思惟邏輯,但對流式 API 的設計就形成影響了,比較起來明顯是得不償失的。

Observable.subscribe(Subscriber) 的內部實現是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
// 若是須要看源碼,能夠去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
能夠看到, 作了3件事:subscriber()
  1. 調用 Subscriber.onStart() 。這個方法在前面已經介紹過,是一個可選的準備方法。
  2. 調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這裏,事件發送的邏輯開始運行。從這也能夠看出,在 RxJava 中,Observable 並非在建立的時候就當即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。
  3. 將傳入的 Subscriber 做爲 Subscription 返回。這是爲了方便 unsubscribe().

整個過程當中對象間的關係以下圖:

關係靜圖

或者能夠看動圖:

關係靜圖

完整3步的代碼:

public static void learnRxJava(){
        //生成Observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        };

        //生成Subscriber
        //多了onStart和unsubscribe兩個很是有用的方法
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onStart() {
                super.onStart();
            }

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String o) {

            }
        };

        if(subscriber.isUnsubscribed()){
            subscriber.unsubscribe();
        }

        //生成Observable的三種方法

        //1.create
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onNext("2");
                subscriber.onNext("3");
                subscriber.onCompleted();
            }
        });

        //2.just
        // 將會依次調用:
        // onNext(1);
        // onNext(2);
        // onNext(3);
        // onNext(4);
        // onCompleted();
        Observable<Integer> observable1 = Observable.just(1, 2, 3, 4);

        //3.from 參數傳入一個集合
        // 將會依次調用:
        // onNext(1);
        // onNext(2);
        // onNext(3);
        // onNext(4);
        // onCompleted();
        Integer[] numbers = new Integer[]{1,2,3,4};
        Observable<Integer> observable2 = Observable.from(numbers);
        observable.subscribe(observer);
}

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調,RxJava 會自動根據定義建立出Subscriber 。形式以下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

簡單解釋一下這段代碼中出現的 Action1 和 Action0。 Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;因爲 onCompleted() 方法也是無參無返回值的,所以 Action0 能夠被當成一個包裝對象,將 onCompleted() 的內容打包起來將本身做爲一個參數傳入 subscribe() 以實現不完整定義的回調。這樣其實也能夠看作將 onCompleted() 方法做爲參數傳進了subscribe(),至關於其餘某些語言中的『閉包』。 Action1 也是一個接口,它一樣只有一個方法 call(T param),這個方法也無返回值,但有一個參數;與 Action0 同理,因爲 onNext(T obj) 和 onError(Throwable error) 也是單參數無返回值的,所以 Action1能夠將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現不完整定義的回調。事實上,雖然 Action0 和 Action1在 API 中使用最普遍,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2Action3) 的,它們能夠被用以包裝不一樣的無返回值的方法。

注:正如前面所提到的,Observer 和 Subscriber 具備相同的角色,並且 Observer 在 subscribe() 過程當中最終會被轉換成 Subscriber對象,所以,從這裏開始,後面的描述我將用 Subscriber 來代替 Observer ,這樣更加嚴謹。

相關文章
相關標籤/搜索