Rxjava2 Observable的條件操做符詳解及實例

簡要:

需求瞭解:java

在使用 Rxjava 開發中,常常有一些各類條件的操做 ,如比較兩個 Observable 誰先發射了數據、跳過指定條件的 Observable 等一系列的條件操做需求,那麼很幸運, Rxjava 中已經有了不少條件操做符,一塊兒來了解一下吧。react

下面列出了一些Rxjava的用於條件操做符:git

  • Amb:給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的全部數據。
  • DefaultIfEmpty:發射來自原始Observable的值,若是原始 Observable 沒有發射任何數據項,就發射一個默認值。
  • SwitchIfEmpty:若是原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。
  • SkipUntil:丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,而後發射原始 Observable 的剩餘數據。
  • SkipWhile:丟棄原始 Observable 發射的數據,直到一個特定的條件爲假,而後發射原始 Observable 剩餘的數據。
  • TakeUntil:發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。

1. Amb

給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的全部數據。github

img-Amb

解析: 對多個Observable進行監聽,首先發射通知(包括數據)的Observable將會被觀察者觀察,發射這個Observable的全部數據。數組

示例代碼:ide

// 建立Observable
   Observable<Integer> delayObservable = Observable.range(1, 5)
                                                                                    .delay(100, TimeUnit.MILLISECONDS); // 延遲100毫秒發射數據
    Observable<Integer> rangeObservable = Observable.range(6, 5);

    // 建立Observable的集合
    ArrayList<Observable<Integer>> list = new ArrayList<>();
    list.add(delayObservable);
    list.add(rangeObservable);

    // 建立Observable的數組
    Observable<Integer>[] array = new Observable[2];
    array[0] = delayObservable;
    array[1] = rangeObservable;

    /**
     *  1. ambWith(ObservableSource<? extends T> other)
     *  與另一個Observable比較,只發射首先發射通知的Observable的數據
     */
    rangeObservable.ambWith(delayObservable)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("------------------------------------------------");
    /**
     *  2. amb(Iterable<? extends ObservableSource<? extends T>> sources)
     *  接受一個Observable類型的集合, 只發射集合中首先發射通知的Observable的數據
     */
    Observable.amb(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.in.read();
    System.out.println("------------------------------------------------");
    /**
     *  3. ambArray(ObservableSource<? extends T>... sources)
     *  接受一個Observable類型的數組, 只發射數組中首先發射通知的Observable的數據
     */
    Observable.ambArray(array)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.in.read();

輸出:函數

--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
------------------------------------------------
--> accept(2): 6
--> accept(2): 7
--> accept(2): 8
--> accept(2): 9
--> accept(2): 10
------------------------------------------------
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10

Javadoc: ambWith(ObservableSource other)
Javadoc: amb(Iterable sources)
Javadoc: ambArray(ObservableSource... sources).net

2. DefaultIfEmpty

發射來自原始Observable的值,若是原始 Observable 沒有發射數據項,就發射一個默認值。
img-DefaultIfEmpty3d

解析: DefaultIfEmpty 簡單的精確地發射原始Observable的值,若是原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式), DefaultIfEmpty 返回的Observable就發射一個你提供的默認值。若是你須要發射更多的數據,或者切換備用的Observable,你能夠考慮使用 switchIfEmpty 操做符 。

示例代碼:

/**
     *   defaultIfEmpty(@NotNull T defaultItem)
     *  若是原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式),
     *  DefaultIfEmpty 返回的Observable就發射一個你提供的默認值defaultItem。
     */
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onComplete();   // 不發射任何數據,直接發射完成通知
        }
    }).defaultIfEmpty("No Data emitter!!!")
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("--> onNext: " + s);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

輸出:

--> onSubscribe
--> onNext: No Data emitter!!!
--> onComplete

Javadoc: defaultIfEmpty(T defaultItem)

3. SwitchIfEmpty

若是原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。

img-SwitchIfEmpty

解析: 若是原始 Observable 沒有發射數據時,發射切換指定的 other 繼續發射數據。

示例代碼:

/**
     *  switchIfEmpty(ObservableSource other)
     *  若是原始Observable沒有發射數據時,發射切換指定的other繼續發射數據
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();           // 不發射任何數據,直接發射完成通知
        }
    }).switchIfEmpty(Observable.just(888))  // 若是原始Observable沒有發射數據項,默認發射備用的Observable,發射數據項888
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext: " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

輸出:

--> onSubscribe
--> onNext: 888
--> onComplete

Javadoc: switchIfEmpty(ObservableSource other)

4. SkipUntil

丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,而後發射原始 Observable 的剩餘數據。

img-SkipUntil
示例代碼:

/**
     *  skipUntil(ObservableSource other)
     *  丟棄原始Observable發射的數據,直到other發射了一個數據,而後發射原始Observable的剩餘數據。
     */
    Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
            // 丟棄2000毫秒的原始Observable發射的數據,接受後面的剩餘部分數據
            .skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS))
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext: " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete");
                }
            });

    System.in.read();

輸出:

--> onSubscribe
--> onNext: 5
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete

Javadoc: skipUntil(ObservableSource other)

5. SkipWhile

丟棄原始 Observable 發射的數據,直到一個特定的條件爲假,而後發射原始 Observable 剩餘的數據。

img-SkipWhile

示例代碼:

/**
         *  skipWhile(Predicate<? super T> predicate)
         *  丟棄原始 Observable 發射的數據,直到函數predicate的條件爲假,而後發射原始Observable剩餘的數據。
         */
        Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS)
                .skipWhile(new Predicate<Long>() {
                    @Override
                    public boolean test(Long aLong) throws Exception {
                        if (aLong > 5) {
                            return false;       // 當原始數據大於5時,發射後面的剩餘部分數據
                        }
                        return true;            // 丟棄原始數據項
                    }
                }).subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("--> onNext: " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("--> onError: " + e);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("--> onComplete");
                    }
                });

        System.in.read();

輸出:

--> onSubscribe
--> onNext: 6
--> onNext: 7
--> onNext: 8
--> onNext: 9
--> onNext: 10
--> onComplete

Javadoc: skipWhile(Predicate predicate)

6. TakeUntil

發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。

img-takeUntil

6.1 takeUntil(ObservableSource other)

TakeUntil 訂閱並開始發射原始 Observable,它還監視你提供的第二個 Observable。若是第二個 Observable 發射了一項數據或者發射了一個終止通知,TakeUntil 返回的 Observable 會中止發射原始 Observable 並終止。

img-takeUntil-other

解析: 第二個Observable發射一項數據或一個 onError 通知或一個 onCompleted 通知都會致使 takeUntil 中止發射數據。

示例代碼:

// 建立Observable,發送數字1~10,每間隔200毫秒發射一個數據
    Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

    /**
     *  1. takeUntil(ObservableSource other)
     *  發射來自原始Observable的數據,直到other發射了一個數據或一個通知後中止發射原始Observable並終止。
     */
    observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒後中止發射原始數據
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Long aLong) {
                    System.out.println("--> onNext(1): " + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(1)");
                }
            });

    System.in.read();

輸出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)

Javadoc: takeUntil(ObservableSource other)

6.2 takeUntil(Predicate stopPredicate)

每次發射數據後,經過一個謂詞函數來斷定是否須要終止發射數據。

img-takeUntil-predicate

解析: 每次發射數據後,經過一個謂詞函數 stopPredicate 來斷定是否須要終止發射數據,若是 stopPredicate 返回 true 怎表示中止發射原始Observable後面的數據,不然繼續發射後面的數據。

示例代碼:

/**
     *  2. takeUntil(Predicate<? super T> stopPredicate)
     *  每次發射數據後,經過一個謂詞函數stopPredicate來斷定是否須要終止發射數據
     *  若是stopPredicate返回true怎表示中止發射後面的數據,不然繼續發射後面的數據
     */
    observable.takeUntil(new Predicate<Long>() {
        @Override
        public boolean test(Long aLong) throws Exception {  // 函數返回false則爲繼續發射原始數據,true則中止發射原始數據
            if(aLong > 5){
                return true;      // 知足條件後,中止發射數據
            }
            return false;         // 繼續發射數據
        }
    }).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Long aLong) {
            System.out.println("--> onNext(2): " + aLong);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(2)");
        }
    });

    System.in.read();

輸出:

--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> onNext(2): 3
--> onNext(2): 4
--> onNext(2): 5
--> onNext(2): 6
--> onComplete(2)

Javadoc: takeUntil(Predicate stopPredicate)

7. TakeWhile

發射原始Observable的數據,直到一個特定的條件,而後跳過剩餘的數據。

img-TakeWhile

解析: 發射原始 Observable 的數據,直到 predicate 的條件爲 false ,而後跳過剩餘的數據。

示例代碼:

// 建立Observable,發送數字1~10,每間隔200毫秒發射一個數據
        Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS);

        /**
         *  takeWhile(Predicate predicate)
         *  發射原始Observable的數據,直到predicate的條件爲false,而後跳過剩餘的數據
         */
        observable.takeWhile(new Predicate<Long>() {
            @Override
            public boolean test(Long aLong) throws Exception {  // 函數返回值決定是否繼續發射後續的數據
                if(aLong > 5){
                    return false;        // 知足條件後跳事後面的數據
                }
                return true;             // 繼續發射數據
            }
        }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe");
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("--> onNext: " + aLong);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete");
            }
        });

        System.in.read();

輸出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
--> onNext(1): 4
--> onNext(1): 5
--> onComplete(1)

Javadoc: takeWhile(Predicate predicate)

小結

本節主要介紹了Rxjava條件操做符能夠根據不一樣的條件進行數據的發射,變換等相關行爲。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

相關文章
相關標籤/搜索