目錄html
需求瞭解:java
在使用 Rxjava 開發中,常常有一些各類條件的操做 ,如比較兩個 Observable 誰先發射了數據、跳過指定條件的 Observable 等一系列的條件操做需求,那麼很幸運, Rxjava 中已經有了不少條件操做符,一塊兒來了解一下吧。react
下面列出了一些Rxjava
的用於條件操做符:git
Amb
:給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的全部數據。DefaultIfEmpty
:發射來自原始Observable的值,若是原始 Observable 沒有發射任何數據項,就發射一個默認值。SwitchIfEmpty
:若是原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。SkipUntil
:丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,而後發射原始 Observable 的剩餘數據。SkipWhile
:丟棄原始 Observable 發射的數據,直到一個特定的條件爲假,而後發射原始 Observable 剩餘的數據。TakeUntil
:發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。給定兩個或多個Observables,它只發射首先發射數據或通知的那個Observable的全部數據。github
解析: 對多個Observable進行監聽,首先發射通知(包括數據)的Observable將會被觀察者觀察,發射這個Observable的全部數據。數組
示例代碼:ide
// 建立Observable Observable<Integer> delayObservable = Observable.range(1, 5) .delay(100, TimeUnit.MILLISECONDS); // 延遲100毫秒發射數據 Observable<Integer> rangeObservable = Observable.range(6, 5); // 建立Observable的集合 ArrayList<Observable<Integer>> list = new ArrayList<>(); list.add(delayObservable); list.add(rangeObservable); // 建立Observable的數組 Observable<Integer>[] array = new Observable[2]; array[0] = delayObservable; array[1] = rangeObservable; /** * 1. ambWith(ObservableSource<? extends T> other) * 與另一個Observable比較,只發射首先發射通知的Observable的數據 */ rangeObservable.ambWith(delayObservable) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.in.read(); System.out.println("------------------------------------------------"); /** * 2. amb(Iterable<? extends ObservableSource<? extends T>> sources) * 接受一個Observable類型的集合, 只發射集合中首先發射通知的Observable的數據 */ Observable.amb(list) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(2): " + integer); } }); System.in.read(); System.out.println("------------------------------------------------"); /** * 3. ambArray(ObservableSource<? extends T>... sources) * 接受一個Observable類型的數組, 只發射數組中首先發射通知的Observable的數據 */ Observable.ambArray(array) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(3): " + integer); } }); System.in.read();
輸出:函數
--> accept(1): 6 --> accept(1): 7 --> accept(1): 8 --> accept(1): 9 --> accept(1): 10 ------------------------------------------------ --> accept(2): 6 --> accept(2): 7 --> accept(2): 8 --> accept(2): 9 --> accept(2): 10 ------------------------------------------------ --> accept(3): 6 --> accept(3): 7 --> accept(3): 8 --> accept(3): 9 --> accept(3): 10
Javadoc: ambWith(ObservableSource other)
Javadoc: amb(Iterable sources)
Javadoc: ambArray(ObservableSource... sources).net
發射來自原始Observable的值,若是原始 Observable 沒有發射數據項,就發射一個默認值。
3d
解析: DefaultIfEmpty
簡單的精確地發射原始Observable的值,若是原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式), DefaultIfEmpty
返回的Observable就發射一個你提供的默認值。若是你須要發射更多的數據,或者切換備用的Observable,你能夠考慮使用 switchIfEmpty
操做符 。
示例代碼:
/** * defaultIfEmpty(@NotNull T defaultItem) * 若是原始Observable沒有發射任何數據正常終止(以 onCompleted 的形式), * DefaultIfEmpty 返回的Observable就發射一個你提供的默認值defaultItem。 */ Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onComplete(); // 不發射任何數據,直接發射完成通知 } }).defaultIfEmpty("No Data emitter!!!") .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(String s) { System.out.println("--> onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
輸出:
--> onSubscribe --> onNext: No Data emitter!!! --> onComplete
Javadoc: defaultIfEmpty(T defaultItem)
若是原始Observable沒有發射數據時,發射切換一個指定的Observable繼續發射數據。
解析: 若是原始 Observable 沒有發射數據時,發射切換指定的 other
繼續發射數據。
示例代碼:
/** * switchIfEmpty(ObservableSource other) * 若是原始Observable沒有發射數據時,發射切換指定的other繼續發射數據 */ Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); // 不發射任何數據,直接發射完成通知 } }).switchIfEmpty(Observable.just(888)) // 若是原始Observable沒有發射數據項,默認發射備用的Observable,發射數據項888 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext: " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
輸出:
--> onSubscribe --> onNext: 888 --> onComplete
丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,而後發射原始 Observable 的剩餘數據。
示例代碼:
/** * skipUntil(ObservableSource other) * 丟棄原始Observable發射的數據,直到other發射了一個數據,而後發射原始Observable的剩餘數據。 */ Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS) // 丟棄2000毫秒的原始Observable發射的數據,接受後面的剩餘部分數據 .skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS)) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext: " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); System.in.read();
輸出:
--> onSubscribe --> onNext: 5 --> onNext: 6 --> onNext: 7 --> onNext: 8 --> onNext: 9 --> onNext: 10 --> onComplete
Javadoc: skipUntil(ObservableSource other)
丟棄原始 Observable 發射的數據,直到一個特定的條件爲假,而後發射原始 Observable 剩餘的數據。
示例代碼:
/** * skipWhile(Predicate<? super T> predicate) * 丟棄原始 Observable 發射的數據,直到函數predicate的條件爲假,而後發射原始Observable剩餘的數據。 */ Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS) .skipWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { if (aLong > 5) { return false; // 當原始數據大於5時,發射後面的剩餘部分數據 } return true; // 丟棄原始數據項 } }).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext: " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); System.in.read();
輸出:
--> onSubscribe --> onNext: 6 --> onNext: 7 --> onNext: 8 --> onNext: 9 --> onNext: 10 --> onComplete
Javadoc: skipWhile(Predicate predicate)
發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知。
TakeUntil
訂閱並開始發射原始 Observable,它還監視你提供的第二個 Observable。若是第二個 Observable 發射了一項數據或者發射了一個終止通知,TakeUntil
返回的 Observable 會中止發射原始 Observable 並終止。
解析: 第二個Observable發射一項數據或一個 onError
通知或一個 onCompleted
通知都會致使 takeUntil
中止發射數據。
示例代碼:
// 建立Observable,發送數字1~10,每間隔200毫秒發射一個數據 Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS); /** * 1. takeUntil(ObservableSource other) * 發射來自原始Observable的數據,直到other發射了一個數據或一個通知後中止發射原始Observable並終止。 */ observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒後中止發射原始數據 .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(1)"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext(1): " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError(1): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(1)"); } }); System.in.read();
輸出:
--> onSubscribe(1) --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 3 --> onNext(1): 4 --> onNext(1): 5 --> onComplete(1)
Javadoc: takeUntil(ObservableSource other)
每次發射數據後,經過一個謂詞函數來斷定是否須要終止發射數據。
解析: 每次發射數據後,經過一個謂詞函數 stopPredicate
來斷定是否須要終止發射數據,若是 stopPredicate
返回 true
怎表示中止發射原始Observable後面的數據,不然繼續發射後面的數據。
示例代碼:
/** * 2. takeUntil(Predicate<? super T> stopPredicate) * 每次發射數據後,經過一個謂詞函數stopPredicate來斷定是否須要終止發射數據 * 若是stopPredicate返回true怎表示中止發射後面的數據,不然繼續發射後面的數據 */ observable.takeUntil(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { // 函數返回false則爲繼續發射原始數據,true則中止發射原始數據 if(aLong > 5){ return true; // 知足條件後,中止發射數據 } return false; // 繼續發射數據 } }).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext(2): " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError(2): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(2)"); } }); System.in.read();
輸出:
--> onSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 --> onNext(2): 3 --> onNext(2): 4 --> onNext(2): 5 --> onNext(2): 6 --> onComplete(2)
Javadoc: takeUntil(Predicate stopPredicate)
發射原始Observable的數據,直到一個特定的條件,而後跳過剩餘的數據。
解析: 發射原始 Observable 的數據,直到 predicate
的條件爲 false
,而後跳過剩餘的數據。
示例代碼:
// 建立Observable,發送數字1~10,每間隔200毫秒發射一個數據 Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS); /** * takeWhile(Predicate predicate) * 發射原始Observable的數據,直到predicate的條件爲false,而後跳過剩餘的數據 */ observable.takeWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { // 函數返回值決定是否繼續發射後續的數據 if(aLong > 5){ return false; // 知足條件後跳事後面的數據 } return true; // 繼續發射數據 } }).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext: " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); System.in.read();
輸出:
--> onSubscribe(1) --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 3 --> onNext(1): 4 --> onNext(1): 5 --> onComplete(1)
Javadoc: takeWhile(Predicate predicate)
本節主要介紹了Rxjava
條件操做符能夠根據不一樣的條件進行數據的發射,變換等相關行爲。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: