Android高級開發之【RxJava】詳解(附項目源碼)

文章大綱

1、什麼是RxJava
2、爲何要用RxJava
3、RxJava使用詳解
4、項目源碼下載java

1、什麼是RxJava

Rx(Reactive Extensions)是一個庫,用來處理事件和異步任務,在不少語言上都有實現,RxJava是Rx在Java上的實現。簡單來講,RxJava就是處理異步的一個庫,最基本是基於觀察者模式來實現的。經過Obserable和Observer的機制,實現所謂響應式的編程體驗。react

2、爲何要用RxJava

好比說一個龐大的項目,一個事件傳遞的整個過程可能要經歷不少方法,方法套方法,每一個方法的位置七零八落,一個個方法跳進去看,跳過去跳過來很容易把腦殼弄暈,不夠直觀。可是Rxjava能夠把全部邏輯用鏈式加閉包的方式呈現,作了哪些操做,誰在前誰在後很是直觀,邏輯清晰,維護就會很是輕鬆。就算不是你寫的你也能夠很快的瞭解,你能夠把它看做一條河流,整個過程就是對裏面的水流作進行加工。懂了這個特性咱們才知道在複雜的邏輯中運用Rxjava是多麼的重要。
  假設有這樣一個需求:界面上有一個自定義的視圖 imageCollectorView ,它的做用是顯示多張圖片,並能使用 addImage(Bitmap) 方法來任意增長顯示的圖片。如今須要程序將一個給出的目錄數組 File[] folders 中每一個目錄下的 png 圖片都加載出來並顯示在 imageCollectorView 中。須要注意的是,因爲讀取圖片的這一過程較爲耗時,須要放在後臺執行,而圖片的顯示則必須在 UI 線程執行。經常使用的實現方式有多種,我這裏貼出其中一種:android

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

複製代碼

而若是使用 RxJava ,實現方式是這樣的:面試

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

複製代碼

3、RxJava使用詳解

1. RxJava設計模式

RxJava 的異步實現,是經過一種擴展的觀察者模式來實現的。
  觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,須要在 B 變化的一瞬間作出反應。舉個例子,新聞裏喜聞樂見的警察抓小偷,警察須要在小偷伸手做案的時候實施抓捕。在這個例子裏,警察是觀察者,小偷是被觀察者,警察須要時刻盯着小偷的一舉一動,才能保證不會漏過任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不一樣,觀察者不須要時刻盯着被觀察者(例如 A 不須要每過 2ms 就檢查一次 B 的狀態),而是採用註冊(Register)或者稱爲訂閱(Subscribe)的方式,告訴被觀察者:我須要你的某某狀態,你要在它變化的時候通知我。 Android 開發中一個比較典型的例子是點擊監聽器 OnClickListener 。對設置 OnClickListener 來講, View 是被觀察者, OnClickListener 是觀察者,兩者經過 setOnClickListener() 方法達成訂閱關係。訂閱以後用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發送給已經註冊的 OnClickListener 。採起這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也可以獲得最高的反饋速度。固然,這也得益於咱們能夠隨意定製本身程序中的觀察者和被觀察者,而警察叔叔明顯沒法要求小偷『你在做案的時候務必通知我』。數據庫

OnClickListener 的模式大體以下圖:編程

RxJava 的觀察者模式
  RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。ObservableObserver 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer
  與傳統觀察者模式不一樣, RxJava 的事件回調方法除了普通事件 onNext() (至關於 onClick() / onEvent())以外,還定義了兩個特殊的事件:onCompleted()onError()設計模式

  • onCompleted(): 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted()方法做爲標誌。
  • onError(): 事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。
  • 在一個正確運行的事件序列中, onCompleted()onError() 有且只有一個,而且是事件序列中的最後一個。須要注意的是,onCompleted()onError() 兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。

RxJava 的觀察者模式大體以下圖:數組

開始接入RxJava之間,添加依賴性能優化

dependencies {
  compile 'io.reactivex:rxandroid:1.2.1'
  compile 'io.reactivex:rxjava:1.1.6'
  }

複製代碼

2. 建立RxJava幾種方式

方式1:簡單建立Rxjavabash

/** * 簡單建立Rxjava * * Observable是被觀察者,建立後傳入一個OnSubscribe對象,當Observable(觀察者)調用subscribe進行註冊觀察者時,OnSubscribe的call方法會觸發。 ObservableEmitter: Emitter 是發射器的意思,它能夠發出三種類型的事件,與之對應的。 Observer有三個回調方法: onNext:接受到一個事件 onCompleted:接受完事件後調用,只會調用一次 onError :發生錯誤時調用,並中止接受事件,調用一次 注:onCompleted和onError不會同時調用,只會調用其中之一 */
    public static void createOne() {

        //建立被觀察者
        Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("吳");
                subscriber.onNext("曉暢");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {

                System.out.println("Item: " + s);
            }

            ////事件隊列完結,RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
            @Override
            public void onCompleted() {

                System.out.println("Completed!");
            }

            ////事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。
            @Override
            public void onError(Throwable e) {
                System.out.println("Error!");
            }
        };

        observable.subscribe(subscriber);

    }

複製代碼

運行結果以下所示:

方式2:just(T...): 將傳入的參數依次發送出來

public static void createTwo() {

        //至關於
        // 將會依次調用:
        // onNext("Hello");
        // onNext("Hi");
        // onNext("Aloha");
        // onCompleted();
        Observable observable = Observable.just("Hello", "wu", "xiaochang");

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {

                System.out.println("Item: " + s);
            }

            ////事件隊列完結,RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
            @Override
            public void onCompleted() {

                System.out.println("Completed!");
            }

            ////事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。
            @Override
            public void onError(Throwable e) {
                System.out.println("Error!");
            }
        };

        observable.subscribe(subscriber);

    }

複製代碼

運行結果以下所示:

方式3:將傳入的數組或 Iterable 拆分紅具體對象後,依次發送出來

public static void createThree() {

        String[] words = {"Hello", "wu", "xiaochang"};

        //至關於
        // 將會依次調用:
        // onNext("Hello");
        // onNext("Hi");
        // onNext("Aloha");
        // onCompleted();
        Observable observable = Observable.from(words);

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {

                System.out.println("Item: " + s);
            }

            ////事件隊列完結,RxJava 規定,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。
            @Override
            public void onCompleted() {

                System.out.println("Completed!");
            }

            ////事件隊列異常。在事件處理過程當中出異常時,onError() 會被觸發,同時隊列自動終止,不容許再有事件發出。
            @Override
            public void onError(Throwable e) {
                System.out.println("Error!");
            }
        };

        observable.subscribe(subscriber);
    }

複製代碼

運行結果以下圖所示:

方式4:發送多種類型參數

/** *發送多種類型參數 */
    public static void createFour() {
        //Just相似於From,可是From會將數組或Iterable的元素具取出而後逐個發射,而Just只是簡單的原樣發射,將數組或Iterable當作單個數據。
        //Just接受一至九個參數,返回一個按參數列表順序發射這些數據的Observable
        Observable justObservable = Observable.just(1, "someThing", false, 3.256f, "NewYork");
        justObservable.subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onNext(Object o) {

                System.out.println(o);
            }
        });
    }

複製代碼

運行結果以下所示:

方式5:自定義Subscriber

/** * 自定義Subscriber */
    public static void createFive() {

        Observable observable = Observable.just("Hello", "Hi", "Aloha");

        Action1<String> onNextAction = new Action1<String>() {
            // onNext()
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        };
        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            // onError()
            @Override
            public void call(Throwable throwable) {
                // Error handling
            }
        };
        Action0 onCompletedAction = new Action0() {
            // onCompleted()
            @Override
            public void call() {
                System.out.println("completed");
            }
        };

        // 自動建立 Subscriber ,並使用 onNextAction 來定義 onNext()
                observable.subscribe(onNextAction);
        // 自動建立 Subscriber ,並使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
                observable.subscribe(onNextAction, onErrorAction);
        // 自動建立 Subscriber ,並使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
                observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    }

複製代碼

運行結果以下圖所示:

3. 建立觀察者方法

建立方式以下:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.d(tag, "Item: " + s);
            }

            @Override
            public void onCompleted() {
                Log.d(tag, "Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(tag, "Error!");
            }
        };

        //建立方式2
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                Log.d("MainActivity", "Item: " + s);
            }

            @Override
            public void onCompleted() {
                Log.d("MainActivity", "Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("MainActivity", "Error!");
            }
        };

複製代碼

實質上,在 RxJava 的 subscribe 過程當中,Observer 也老是會先被轉換成一個 Subscriber 再使用。因此若是你只想使用基本功能,選擇 Observer 和 Subscriber 是徹底同樣的。它們的區別對於使用者來講主要有兩點:
  onStart(): 這是 Subscriber 增長的方法。它會在 subscribe 剛開始,而事件還未發送以前被調用,能夠用於作一些準備工做,例如數據的清零或重置。這是一個可選方法,默認狀況下它的實現爲空。須要注意的是,若是對準備工做的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,由於它老是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來作準備工做,可使用 doOnSubscribe() 方法,具體能夠在後面的文中看到。
  unsubscribe(): 這是 Subscriber 所實現的另外一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將再也不接收事件。通常在這個方法調用前,可使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,由於在 subscribe() 以後, Observable 會持有 Subscriber 的引用,這個引用若是不能及時被釋放,將有內存泄露的風險。因此最好保持一個原則:要在再也不使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關係,以免內存泄露的發生。

3. 線程Scheduler (調度器)

在不指定線程的狀況下, RxJava 遵循的是線程不變的原則,即:在哪一個線程調用 subscribe(),就在哪一個線程生產事件;在哪一個線程生產事件,就在哪一個線程消費事件。若是須要切換線程,就須要用到 Scheduler (調度器)。
在RxJava 中,Scheduler ——調度器,至關於線程控制器,RxJava 經過它來指定每一段代碼應該運行在什麼樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:
  Schedulers.immediate(): 直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler。
  Schedulers.newThread(): 老是啓用新線程,並在新線程執行操做。
  Schedulers.io(): I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread() 更有效率。不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程。
  Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。
另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操做將在 Android 主線程運行。
  有了這幾個 Scheduler ,就可使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫作事件產生的線程。 * observeOn(): 指定 Subscriber 所運行在的線程。或者叫作事件消費的線程。

public class RxJavaScheduler {

    public static void showScheduler() {
        Observable.just(1, 2, 3, 4)
// .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程

// .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程

                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer number) {

                        System.out.println("number:" + number);
                    }
                });

    }

    public static void main(String[] args) {

        showScheduler();
    }

}

複製代碼

4、項目源碼下載

連接:pan.baidu.com/s/1Na7DH_N2…
密碼:xvr2

Android前沿技術

關於RXJava的所有學習內容,咱們這邊都有系統的知識體系以及進階視頻資料,有須要的朋友能夠加羣免費領取安卓進階視頻教程,源碼,面試資料,羣內有大牛一塊兒交流討論技術;點擊連接加入羣聊【騰訊@Android高級架構】:
(包括自定義控件、NDK、架構設計、混合式開發工程師(React native,Weex)、性能優化、完整商業項目開發等)

Android高級進階視頻教程
相關文章
相關標籤/搜索