RxJava2.0(轉載)

零、來源

來源:Carson_Ho-簡書java

1、基礎知識

角色 做用 類比
被觀察者(Observable) 產生事件 顧客
觀察者(Observer) 接收事件,並給出響應動做 廚房
訂閱(Subscribe) 鏈接 被觀察者 & 觀察者 服務員
事件(Event) 被觀察者 & 觀察者 溝通的載體 菜式


2、基礎使用

1.導入鏈接

implementation 'io.reactivex.rxjava2:rxjava:2.2.19'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2.建立被觀察者

//建立被觀察者,產生事件
    public Observable<Integer> createObservable() {
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });
        return observable;
    }

3.建立觀察者

//建立觀察者
    public Observer<Integer> createObserver() {
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "onSubscribe 鏈接");
            }

            @Override
            public void onNext(Integer value) {
                Log.v("lanjiabinRx", "onNext " + value + " 事件");
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "onComplete 事件");
            }
        };
        return observer;
    }

4.創建subscribe()鏈接

//觀察者訂閱被觀察者
    public void createSubscribe() {
        createObservable().subscribe(createObserver());
    }

5.調用和結果

createSubscribe();

image.png

6.鏈式調用

//鏈式調用
    public void chainCall() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "onSubscribe 鏈接");
            }

            @Override
            public void onNext(Integer value) {
                Log.v("lanjiabinRx", "onNext " + value + " 事件");
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "onComplete 事件");
            }
        });
    }

6.切斷鏈接

即觀察者 沒法繼續 接收 被觀察者的事件,但被觀察者仍是能夠繼續發送事件react

Disposable mDisposable; //1.定義

@Override
public void onSubscribe(Disposable d) {
       mDisposable=d; //2.賦值
       Log.v("lanjiabinRx", "onSubscribe 鏈接");
}

@Override
public void onNext(Integer value) {
      if (value==2) mDisposable.dispose(); //3.在第二個next事件斷開鏈接
      Log.v("lanjiabinRx", "onNext " + value + " 事件");
}

3、建立操做符

0.總圖


1. create (基礎發送)

最基礎的建立android

//1.create
    public void chainCall() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "onSubscribe 鏈接");
            }

            @Override
            public void onNext(Integer value) {
                Log.v("lanjiabinRx", "onNext " + value + " 事件");
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "onError 事件");
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "onComplete 事件");
            }
        });
    }

2. just (馬上發送10如下)

  • 快速建立1個被觀察者對象(Observable)
  • 發送事件的特色:直接發送傳入的事件
  • 最多隻能發送十個參數
  • 應用場景:快速建立 被觀察者對象(Observable) & 發送10個如下事件
//2.just
    public void justDo() {
        Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:
image.png編程

3. fromArray (數組發送)

  • 快速建立1個被觀察者對象(Observable)
  • 發送事件的特色:直接發送 傳入的數組數據
  • 會將數組中的數據轉換爲Observable對象

應用場景:
1.快速建立 被觀察者對象(Observable) & 發送10個以上事件(數組形式)
2.數組元素遍歷數組

//3.fromArray
    public void fromArrayDo() {
        Integer[] items = {0, 1, 2, 3, 4};
        Observable.fromArray(items).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:ide

image.png

4. fromIterable (集合發送)

  • 快速建立1個被觀察者對象(Observable)
  • 發送事件的特色:直接發送 傳入的集合List數據
  • 會將數組中的數據轉換爲Observable對象

應用場景:
1.快速建立 被觀察者對象(Observable) & 發送10個以上事件(集合形式)
2.集合元素遍歷測試

//4.fromIterable
    public void fromIterableDo(){
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        Observable.fromIterable(list).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:線程

image.png

// 下列方法通常用於測試使用

<-- empty()  -->
// 該方法建立的被觀察者對象發送事件的特色:僅發送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即觀察者接收後會直接調用onCompleted()

<-- error()  -->
// 該方法建立的被觀察者對象發送事件的特色:僅發送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接調用onError()

<-- never()  -->
// 該方法建立的被觀察者對象發送事件的特色:不發送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不調用

5. defer (獲取最新數據)

  • 直到有觀察者(Observer )訂閱時,才動態建立被觀察者對象(Observable) & 發送事件
  • 經過 Observable工廠方法建立被觀察者對象(Observable)
  • 每次訂閱後,都會獲得一個剛建立的最新的Observable對象,這能夠確保Observable對象裏的數據是最新的

應用場景:
動態建立被觀察者對象(Observable) & 獲取最新的Observable對象數據3d

//5.defer
    Integer i = 10; //第一次賦值

    public void deferDo() {
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });

        i = 15; //第二次賦值
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:獲得最新賦值的數字,說明取到了最新的數據code

image.png

6. timer (延遲發送)

  • 快速建立1個被觀察者對象(Observable)
  • 發送事件的特色:延遲指定時間後,發送1個數值0(Long類型)
  • 本質 = 延遲指定時間後,調用一次 onNext(0)

應用場景:
延遲指定事件,發送一個0,通常用於檢測

//6.timer
    public void timerDo() {
        // 注:timer操做符默認運行在一個新線程上
        // 也可自定義線程調度器(第3個參數):timer(long,TimeUnit,Scheduler)
        //TimeUnit.SECONDS延遲2s後,發送一個0
     
        /**
         * timer(long delay, TimeUnit unit)
         * delay 數值
         * unit 單位
         * 下面就是 2數值,單位爲秒,因此是2秒
         * */
        Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Long aLong) {
                /*
                * 獲得的結果爲0 通常用於檢測
                * */
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

7. interval (週期發送,無限)

  • 快速建立1個被觀察者對象(Observable)
  • 發送事件的特色:每隔指定時間 就發送 事件
  • 發送的事件序列 = 從0開始、無限遞增1的的整數序列
//7.interval
    public void intervalDo() {
        /**
         * 從0開始遞增
         *
         * @param initialDelay (Long)
         *          初始延遲時間(第一次延遲時間)
         * @param period (Long)
         *          後續數字發射之間的時間間隔(一個週期時間)
         * @param unit
         *          時間單位
         * */
        Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Long aLong) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

8. intervalRange (週期發送,有限,指定數據)

  • 做用相似於interval(),但可指定發送的數據的數量
//8.intervalRange
    public void intervalRangeDo() {
        /**
         *
         * @param start 起始值
         * @param count 總共要發送的值的數量,若是爲零,則運算符將在初始延遲後發出onComplete
         * @param initialDelay 發出第一個值(開始)以前的初始延遲
         * @param period 後續值之間的時間段
         * @param unit 時間單位
         * */
        Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Long aLong) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:3-12 經歷10個數

image.png

9. range (無延遲,Integer類型指定數據)

  • 做用相似於intervalRange(),但區別在於:無延遲發送事件
//9.range
    public void rangeDo(){
        /**
         * @param start
         *            序列中第一個Integer的值
         * @param count
         *           要生成的順序整數的數量
         * */
        Observable.range(3,5).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Integer integer) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

10. rangeLong (無延遲,Long類型指定數據)

//10.rangeLong
    public void rangeLongDo(){
        /**
         * @param start
         *            Long類型,序列中第一個Integer的值
         * @param count
         *           Long類型,要生成的順序整數的數量
         * */
        Observable.rangeLong(3,8).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.v("lanjiabinRx", "開始採用subscribe鏈接");
            }

            @Override
            public void onNext(Long aLong) {
                Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.v("lanjiabinRx", "接受的事件 onComplete");
            }
        });
    }

結果:

image.png

編程中咱們會遇到多少挫折?表放棄,沙漠盡頭必是綠洲。

相關文章
相關標籤/搜索