Rxjava(七):條件操做符和布爾操做符

博客主頁java

RxJava 的條件操做符主要包括如下幾個:segmentfault

  • amb :給定多個 Observable ,只讓第一個發射數據的 Observable 發射所有數據
  • defaultlfEmpty :發射來自原始 Observable 的數據,若是原始 Observable 沒有發射數據,則發射一個默認數據
  • skipUntil :丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一個數據,而後發射原始 Observable 的剩餘數據
  • skipWhile :丟棄原始 Observable 發射的數據,直到一個特定的條件爲假,而後發射原始 Observable 剩餘的數據
  • takeUntil :發射來自原始 Observable 的數據,直到第二個 Observable 發射了一個數據或一個通知
  • takeWhile and takeWhileWithIndex:發射原始 Observable 的數據,直到一個特定的條件爲真,而後跳過剩餘的數據

RxJava 的布爾操做符主要包括:ide

  • all :判斷是否全部的數據項都知足某個條件
  • contains :判斷 Observable 是否會發射一個指定的值
  • exists and isEmpty :判斷 Observable 是否發射了一個值
  • sequenceEqual :判斷兩個 Observables 發射的序列是否相等

1. all、contains 和 amb

1.1 all 操做符

斷定 Observable 發射的全部數據是否都知足某個條件
函數

傳遞一個謂詞函數給 all 操做符,這個函數接受原始 Observable 發射的數據,根據計算返回一個布爾值。 all 返回一個只發射單個布爾值的 Observable,若是原始 Observable 正常終止而且每一項數據都知足條件,就返回 true。若是原始 Observabl 的任意一項數據不知足條件,就返回falseui

Observable.just(1, 2, 3, 4, 5)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 10;
            }
        }).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
});

// 執行結果
 Success: true

判斷 Observable 發射的全部數據是否都大於 3spa

Observable.just(1, 2, 3, 4, 5)
        .all(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer > 3;
            }
        }).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
});

// 執行結果
 Success: false

1.2 contains 操做符

斷定一個 Observable 是否發射了一個特定的值
code

給 contains 傳一個指定的值,若是原始 Observable 發射了那個值,那麼返回的 Observable 將發射 true,不然發射 false 。與它相關的一個操做符是 isEmpty ,用於斷定原始 Observable 是否未發射任何數據。對象

Observable.just(2, 30, 22, 5, 60, 1)
        .contains(22)
        .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG, "Success: " + aBoolean);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "Error: " + throwable);
            }
        });

// 執行結果
 Success: true

1.3 amb 操做符

給定兩個或多個 Observable ,它只發射首先發射數據或通知的那個 Observable 的全部數據
blog

當傳遞多個 Observable 給 amb 時,它只發射其中一個 Observable 數據和通知: 首先發送通知給 amb 的那個 Observable ,無論發射的是一項數據 ,仍是一個 onError 或 onCompleted 通知。 amb 忽略和丟棄其餘全部 Observables 的發射物。ip

在 RxJava 中, amb 還有一個相似的操做符 ambWith。 例如, Observable.amb(ol, o2 )和
ol.ambWith(o2)是等價的

在 RxJava 2.x 中, amb 須要傳遞 Iterable 對象,或者使用 ambArray 來傳遞可變參數。

Observable.ambArray(
        Observable.just(1, 2, 3),
        Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 執行結果
 Next: 1
 Next: 2
 Next: 3

修改一下代碼,第一個 Observable 延遲 ls 後再發射數據

Observable.ambArray(
        Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS),
        Observable.just(4, 5, 6)
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 執行結果
 Next: 4
 Next: 5
 Next: 6

因爲第一個 Observable 延遲發射,所以咱們只消費了第二個 Observable 的數據,第一個 Observable 發射的數據就再也不處理了。

2. defaultlfEmpty

發射來自原始 Observable 值,若是原始 Observable 沒有發射任何值,就發射一個默認值

defaultIfEmpty 簡單精確地發射原始 Observable 的值,若是原始 Observable 沒有發射任何數據,就正常終止(以 onComplete 形式了),那麼 defaultlfEmpty 返回的 Observable 就發射一個咱們提供的默認值。

Observable.empty()
        .defaultIfEmpty(8)
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                Log.d(TAG, "Next: " + o);
            }
        });

// 執行結果
 Next: 8

在 defaultIfEmpty 方法內部,其實調用的是 switchIfEmpty 操做符,源碼以下:

public final Observable<T> defaultIfEmpty(T defaultItem) {
    ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
    return switchIfEmpty(just(defaultItem));
}

defaultIfEmpty 和 switchIfEmpty 的區別是, defaultIfEmpty 操做符只能在被觀察者不發送數據時發送一個默認的數據 ,若是想要發送更多數據,則可使用 switchIfEmpty 操做符,發送自定義的被觀察者做爲替代。

Observable.empty()
        .switchIfEmpty(Observable.just(1, 2, 3))
        .subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                Log.d(TAG, "Next: " + o);
            }
        });

// 執行結果
 Next: 1
 Next: 2
 Next: 3

3. sequenceEqual

斷定兩個 Observable 是否發射相同的數據序列

傳遞兩個 Observable 給 sequenceEqual 操做符時,它會比較兩個 Observable 發射物,若是兩個序列相同(相同的數據,相同的順序,相同的終止狀態〉 ,則發射 true 不然發射 false

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5)
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 執行結果
 Success: true

將兩個 Observable 改爲不一致

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5, 6)
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 執行結果
 Success: false

sequenceEqual 還有一個版本接受第三個參數,能夠傳遞一個函數用於比較兩個數據項是否相同。對於複雜對象的比較,用三個參數的版本更爲合適。

Observable.sequenceEqual(
        Observable.just(1, 2, 3, 4, 5),
        Observable.just(1, 2, 3, 4, 5),
        new BiPredicate<Integer, Integer>() {
            @Override
            public boolean test(Integer integer, Integer integer2) throws Exception {
                return integer == integer2;
            }
        }
).subscribe(new Consumer<Boolean>() {
    @Override
    public void accept(Boolean aBoolean) throws Exception {
        Log.d(TAG, "Success: " + aBoolean);
    }
});

// 執行結果
 Success: true

4. skipUntil 和 skipWhile

4.1 skipUntil 操做符

丟棄原始 Observable 發射的數據,直到第二個 Observable 發射了一項數據

skipUntil 訂閱原始的 Observable,可是忽略它的發射物,直到第二個 Observable 發射一項數據那一刻,它纔開始發射原始的 Observabl。 skipUntil 默認不在任何特定的調度器上執行。

Observable.intervalRange(1, 9, 0, 1, TimeUnit.MILLISECONDS)
        .skipUntil(Observable.timer(4, TimeUnit.MILLISECONDS))
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.d(TAG, "Next: " + aLong);
            }
        });

// 執行結果
 Next: 4
 Next: 5
 Next: 6
 Next: 7
 Next: 8
 Next: 9

上述代碼,原始的 Observable 發射 1 到 9 這 9 個數 ,初始延遲時間是 0,每間隔 lms。因爲使用 skipUntil,所以它會發射原始 Observable 在 3ms 以後的數據。

4.2 skipWhile 操做符

丟棄 Observable 發射的數據,直到一個指定的條件不成立。

skipWhile 訂閱原始的 Observable ,可是忽略它的發射物,直到指定的某個條件變爲 false。它纔開始發射原始的 Observable。skipWhile 默認不在任何特定的調度器上執行

Observable.just(1, 2, 3, 4, 5)
        .skipWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer <= 2;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 執行結果
 Next: 3
 Next: 4
 Next: 5

5. takeUntil 和 takeWhile

5.1 takeUntil 操做符

當第二個 Observable 發射了一項數據或者終止時,丟棄原始 Observable 發射的任何數據

takeUntil 訂閱並開始發射原始 Observable ,它還監視你提供的第二個 Observable。若是第二個 Observable 發射了一項數據或者發射了一個終止通知,則 takeUntil 返回的 Observable 會中止發射原始 Observable 並終止。

Observable.just(1, 2, 3, 4, 5, 6)
        .takeUntil(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer == 4;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
});

// 執行結果
 Next: 1
 Next: 2
 Next: 3
 Next: 4

5.2 takeWhile 操做符

發射原始 Observable 發射的數據,直到一個指定的條件不成立

takeWhile 發射原始的 Observable 直到某個指定的條件不成立,它會當即中止發射原始 Observable,並終止本身的 Observable。

RxJava 中的 takeWhile 操做符返回一個原始 Observable 行爲的 Observable,直到某項數據,指定的函數返回 false ,這個新的 Observable 就會發射 onComplete 終止通知

Observable.just(1, 2, 3, 4, 5, 6)
        .takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer <= 2;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 執行結果
 Next: 1
 Next: 2
 Complete.

若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)

相關文章
相關標籤/搜索