博客主頁java
RxJava 的條件操做符主要包括如下幾個:segmentfault
RxJava 的布爾操做符主要包括:ide
斷定 Observable 發射的全部數據是否都知足某個條件
函數
傳遞一個謂詞函數給 all 操做符,這個函數接受原始 Observable 發射的數據,根據計算返回一個布爾值。 all 返回一個只發射單個布爾值的 Observable,若是原始 Observable 正常終止而且每一項數據都知足條件,就返回 true。若是原始 Observabl 的任意一項數據不知足條件,就返回falseui
Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 10; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 執行結果 Success: true
判斷 Observable 發射的全部數據是否都大於 3spa
Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 3; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 執行結果 Success: false
斷定一個 Observable 是否發射了一個特定的值
code
給 contains 傳一個指定的值,若是原始 Observable 發射了那個值,那麼返回的 Observable 將發射 true,不然發射 false 。與它相關的一個操做符是 isEmpty ,用於斷定原始 Observable 是否未發射任何數據。對象
Observable.just(2, 30, 22, 5, 60, 1) .contains(22) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 執行結果 Success: true
給定兩個或多個 Observable ,它只發射首先發射數據或通知的那個 Observable 的全部數據
blog
當傳遞多個 Observable 給 amb 時,它只發射其中一個 Observable 數據和通知: 首先發送通知給 amb 的那個 Observable ,無論發射的是一項數據 ,仍是一個 onError 或 onCompleted 通知。 amb 忽略和丟棄其餘全部 Observables 的發射物。ip
在 RxJava 中, amb 還有一個相似的操做符 ambWith。 例如, Observable.amb(ol, o2 )和
ol.ambWith(o2)是等價的
在 RxJava 2.x 中, amb 須要傳遞 Iterable 對象,或者使用 ambArray 來傳遞可變參數。
Observable.ambArray( Observable.just(1, 2, 3), Observable.just(4, 5, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 執行結果 Next: 1 Next: 2 Next: 3
修改一下代碼,第一個 Observable 延遲 ls 後再發射數據
Observable.ambArray( Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS), Observable.just(4, 5, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 執行結果 Next: 4 Next: 5 Next: 6
因爲第一個 Observable 延遲發射,所以咱們只消費了第二個 Observable 的數據,第一個 Observable 發射的數據就再也不處理了。
發射來自原始 Observable 值,若是原始 Observable 沒有發射任何值,就發射一個默認值
defaultIfEmpty 簡單精確地發射原始 Observable 的值,若是原始 Observable 沒有發射任何數據,就正常終止(以 onComplete 形式了),那麼 defaultlfEmpty 返回的 Observable 就發射一個咱們提供的默認值。
Observable.empty() .defaultIfEmpty(8) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } }); // 執行結果 Next: 8
在 defaultIfEmpty 方法內部,其實調用的是 switchIfEmpty 操做符,源碼以下:
public final Observable<T> defaultIfEmpty(T defaultItem) { ObjectHelper.requireNonNull(defaultItem, "defaultItem is null"); return switchIfEmpty(just(defaultItem)); }
defaultIfEmpty 和 switchIfEmpty 的區別是, defaultIfEmpty 操做符只能在被觀察者不發送數據時發送一個默認的數據 ,若是想要發送更多數據,則可使用 switchIfEmpty 操做符,發送自定義的被觀察者做爲替代。
Observable.empty() .switchIfEmpty(Observable.just(1, 2, 3)) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } }); // 執行結果 Next: 1 Next: 2 Next: 3
斷定兩個 Observable 是否發射相同的數據序列
傳遞兩個 Observable 給 sequenceEqual 操做符時,它會比較兩個 Observable 發射物,若是兩個序列相同(相同的數據,相同的順序,相同的終止狀態〉 ,則發射 true 不然發射 false
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5) ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 執行結果 Success: true
將兩個 Observable 改爲不一致
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5, 6) ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 執行結果 Success: false
sequenceEqual 還有一個版本接受第三個參數,能夠傳遞一個函數用於比較兩個數據項是否相同。對於複雜對象的比較,用三個參數的版本更爲合適。
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5), new BiPredicate<Integer, Integer>() { @Override public boolean test(Integer integer, Integer integer2) throws Exception { return integer == integer2; } } ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 執行結果 Success: true
丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一項數據
skipUntil 訂閱原始的 Observable,可是忽略它的發射物,直到第二個 Observable 發射一項數據那一刻,它纔開始發射原始的 Observabl。 skipUntil 默認不在任何特定的調度器上執行。
Observable.intervalRange(1, 9, 0, 1, TimeUnit.MILLISECONDS) .skipUntil(Observable.timer(4, TimeUnit.MILLISECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next: " + aLong); } }); // 執行結果 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Next: 9
上述代碼,原始的 Observable 發射 1 到 9 這 9 個數 ,初始延遲時間是 0,每間隔 lms。因爲使用 skipUntil,所以它會發射原始 Observable 在 3ms 以後的數據。
丟棄 Observable 發射的數據,直到一個指定的條件不成立。
skipWhile 訂閱原始的 Observable ,可是忽略它的發射物,直到指定的某個條件變爲 false。它纔開始發射原始的 Observable。skipWhile 默認不在任何特定的調度器上執行
Observable.just(1, 2, 3, 4, 5) .skipWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer <= 2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 執行結果 Next: 3 Next: 4 Next: 5
當第二個 Observable 發射了一項數據或者終止時,丟棄原始 Observable 發射的任何數據
takeUntil 訂閱並開始發射原始 Observable ,它還監視你提供的第二個 Observable。若是第二個 Observable 發射了一項數據或者發射了一個終止通知,則 takeUntil 返回的 Observable 會中止發射原始 Observable 並終止。
Observable.just(1, 2, 3, 4, 5, 6) .takeUntil(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer == 4; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 執行結果 Next: 1 Next: 2 Next: 3 Next: 4
發射原始 Observable 發射的數據,直到一個指定的條件不成立
takeWhile 發射原始的 Observable 直到某個指定的條件不成立,它會當即中止發射原始 Observable,並終止本身的 Observable。
RxJava 中的 takeWhile 操做符返回一個原始 Observable 行爲的 Observable,直到某項數據,指定的函數返回 false ,這個新的 Observable 就會發射 onComplete 終止通知
Observable.just(1, 2, 3, 4, 5, 6) .takeWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer <= 2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next: 1 Next: 2 Complete.
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)