Android 入門(十三)Rxjava

要求:會使用 rxjava 進行平常開發,複雜功能能夠經過搜索和查閱官方文檔解決便可java

基礎用法

想要用 RxJava 必需要在 build.gradle 內加入依賴react

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
複製代碼

使用步驟就是:1. 建立被觀察者,2. 建立觀察者,3. 訂閱android

// 建立被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "subscribe: ");
        emitter.onNext("1");
        emitter.onNext("2");
        emitter.onNext("3");
        emitter.onComplete();
    }
})
複製代碼

ObservableEmitter emitter 對象是發射器的意思,有三種發射的方法 void onNext(T value)、void onError(Throwable error)、void onComplete(),onNext 方法能夠無限調用,Observer(觀察者)全部的都能接收到,onError和onComplete是互斥的,Observer(觀察者)只能接收到一個,OnComplete 能夠重複調用,可是Observer(觀察者)只會接收一次,而 onError 不能夠重複調用,第二次調用就會報異常。數據庫

// 建立觀察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe: ");
    }
    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext: " + s);
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: ");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
};
複製代碼

onNext、onError、onComplete 都是跟被觀察者發射的方法一一對應的,這裏就至關於接收了。須要特別說明的是 onSubscribe 方法中的 Disposable 參數,它只有兩個方法 dispose() 和 isDisposed(),前者是取消對 Observable(被觀察者)的訂閱,後面很明顯就是查看訂閱的狀態。編程

// 訂閱
observable.subscribe(observer);
複製代碼

爲了保證鏈式編程,這裏的邏輯好像是 Observable 訂閱了 Observer。其實我認爲還有其餘的緣由,熟悉「觀察者模式」的應該知道「具體被觀察者」會持有一個容器用來存儲觀察者,這樣才能實現數據更新以後通知全部的觀察者。bash

異步

若是認爲 RxJava 只是實現了一個「觀察者模式」那就大錯特錯,其實 RxJava 主要是給咱們提供了一個異步編程的工具。網絡

在介紹異步以前,咱們先看一下 Observable.just() 方法,這個方法最多能夠接受 10 個參數,而且對這些參數依次調用 onNext 方法,執行完 onNext 以後還會調用 onComplete。app

public static <T> Observable<T> just(T item1, T item2)
複製代碼

其實 RxJava 的異步使用也很簡單,在訂閱以前給 Observable 加上 subscribeOn(Scheduler scheduler) 和 observeOn(Scheduler scheduler) 描述,subscribeOn() 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程,或者叫作事件產生的線程。observeOn() 指定 Observer 所運行的線程,或者叫事件消費的線程。異步

  • Schedulers.immediate(): 直接在當前線程運行,至關於不指定線程。這是默認的 Scheduler。
  • Schedulers.newThread(): 老是啓用新線程,並在新線程執行操做。
  • Schedulers.io(): I/O 操做(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行爲模式和 newThread() 差很少,區別在於 io() 的內部實現是是用一個無數量上限的線程池,能夠重用空閒的線程,所以多數狀況下 io() 比 newThread() 更有效率。不要把計算工做放在 io() 中,能夠避免建立沒必要要的線程。
  • Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操做限制性能的操做,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小爲 CPU 核數。不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。
  • 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操做將在 Android 主線程運行。

因此簡單的使用以下:ide

Observable.just(11, 22, 33)
        .subscribeOn(Schedulers.io())   // 指定 subscribe() 發生在 IO 線程
        .observeOn(AndroidSchedulers.mainThread())  // 指定 Observer 的回調發生在主線程
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
複製代碼

因爲 subscribeOn(Schedulers.io()) 的指定,被建立的事件內容 11,22,33 將會在 IO 線程發出。而因爲 observeOn(AndroidScheculers.mainThread()) 的指定,所以 Observer 數字的打印將發生在主線程 。

這種策略很是適合大多數的「後臺線程處理數據,主線程顯示結果」的狀況。

變換

所謂變換,就是將事件序列中的對象或者整個序列進行加工處理,轉換成不一樣的事件或者事件序列。

map() 變換 將對象集合轉化成對象

Observable.just(1, 2, 3)
        .map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "I'm " + integer;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
複製代碼

在這裏面最重要的是 Function 接口,它就一個方法,就是將構造函數中的第一個參數轉爲第二個參數進行返回。

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
複製代碼

flatMap() 變換

private void flatMap() {
        Log.d(TAG, "flatMap: =============");
        List<Person> personList = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            List<Plan> plans = new ArrayList<>();
            // 計劃一
            Plan plan1 = new Plan("8:30", "上班");
            // 填充準備作的事情
            List<String> actions1 = new ArrayList<>();
            actions1.add("1打開電腦");
            actions1.add("1打開AS");
            actions1.add("1打開WX");
            plan1.setActionList(actions1);

            // 計劃二
            Plan plan2 = new Plan("12:00", "吃飯");
            // 填充準備作的事情
            List<String> actions2 = new ArrayList<>();
            actions2.add("2下樓");
            actions2.add("2買飯");
            actions2.add("2開吃");
            plan2.setActionList(actions2);

            // 計劃三
            Plan plan3 = new Plan("18:00", "下班");
            // 填充準備作的事情
            List<String> actions3 = new ArrayList<>();
            actions3.add("3關閉WX");
            actions3.add("3關閉AS");
            actions3.add("3關閉電腦");
            plan3.setActionList(actions3);

            plans.add(plan1);
            plans.add(plan2);
            plans.add(plan3);

            Person person = new Person("tom" + i, plans);
            personList.add(person);
        }

        Observable.fromIterable(personList)
                .flatMap(new Function<Person, ObservableSource<Plan>>() {
                    @Override
                    public ObservableSource<Plan> apply(Person person) throws Exception {
                        if ("tom1".equals(person.getName())) {
                            return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.SECONDS);
                        }
                        return Observable.fromIterable(person.getPlanList());
                    }
                })
                .flatMap(new Function<Plan, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Plan plan) throws Exception {
                        return Observable.fromIterable(plan.getActionList());
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: " + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }
複製代碼

參考

給 Android 開發者的 RxJava 詳解

RxJava2 只看這一篇文章就夠了

相關文章
相關標籤/搜索