博客主頁java
RxJava 的過濾操做符主要包括如下幾種:segmentfault
只發射第 一 項(或者知足某個條件的第一項)數據ide
若是隻對 Observable 發射的第一項數據,或者知足某個條件的第一項數據感興趣,那麼就可使用 first 操做符。函數
在 RxJava 2.x 中,使用 first() 須要一個默認的 Item ,對於 Observable 而言,使用了 first()會返回 Single 類型。測試
public final Single<T> first(T defaultItem) { return elementAt(0L, defaultItem); } Observable.just(3, 4, 5) .first(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 執行結果 Next-> 3
若是 Observable 不發射任何數據,那麼 first 操做符的默認值就起了做用。ui
Observable.<Integer>empty() .first(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 執行結果 Next-> 8
在 R.Java 2.x 中,還有 firstElement 操做符表示只取第一個數據,沒有默認值。 firstOrError 操做符表示要麼能取到第一個數據,要麼執行 onError 方法,它們分別返回 Maybe 類型和 Single 類型。this
只發射最後一項(或者知足某個條件的最後一項)數據spa
若是隻對 Observable 發射的最後一項數據, 或者知足某個條件的最後一項數據感興趣,那麼就可使用 last 操做符。code
last 操做符跟 first 操做符相似,須要一個默認的 Item ,也是返回 Single 類型。blog
public final Single<T> last(T defaultItem) { ObjectHelper.requireNonNull(defaultItem, "defaultItem is null"); return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem)); } Observable.just(3, 4, 5) .last(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 執行結果 Next-> 5
在 RxJava 2.x 中,有 lastElement 操做符和 lastOrError 操做符。
只發射前面的 N 項數據
使用 take 操做符能夠只修改 Observable 的行爲,返回前面的 N 項數據,發射完成通知,忽略剩餘的數據
Observable.just(1, 2, 3, 4, 5) .take(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 1 Next-> 2 Next-> 3 Complete.
若是對一個 Observable 使用 take 操做符,而那個 Observabl 發射的數據少於 N 項,那麼 take 操做符生成的 Observable 就不會拋出異常或者發射 Error 通知,而是仍然會發射那些數據
Observable.just(1, 2, 3, 4, 5) .take(6) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Next-> 5 Complete.
take 有一個重載方法可以接受一個時長而不是數量參數。它會丟掉髮射 Observable 開始的那段時間發射的數據,時長和時間單位經過參數指定。
Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS) .take(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 0 Next-> 1 Next-> 2 Complete.
上述代碼使用了 intervalRange 操做符表示每隔 ls 會發射一個數據,它們從 0 開始到 9 結束,發射 10 個數據。因爲在這裏使用了 take 操做符,最後只打印前 3 個數據.
take 的這個重載方法默認在 computation 調度器上執行,也可使用參數來指定其餘調度器。
發射 Observable 發射的最後 N 項數據
使用 takeLast 操做符修改原始 Observable,咱們能夠只發射 Observable 發射的最後 N 項數據,忽略前面的數據。
Observable.just(1, 2, 3, 4, 5) .takeLast(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 3 Next-> 4 Next-> 5 Complete.
一樣,若是對一個 Observable 使用 takeLast(n) 操做符,而那個 Observable 發射的數據少於 N 項,那麼 takeLast 操做符生成的 Observable 不會拋出異常或者發射 onError 通知,而是仍然發射那些數據。
takeLast 也有一個重載方法可以接受一個時長而不是數量參數。它會發射在原始 Observable 生命週期內最後一段時間發射的數據,時長和時間單位經過參數指定。
Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS) .takeLast(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 7 Next-> 8 Next-> 9 Complete.
抑制 Observable 發射的前 N 項數據
使用 skip 操做符,能夠忽略 Observable 發射的前 N 項數據,只保留以後的數據
Observable.just(1, 2, 3, 4, 5) .skip(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 4 Next-> 5 Complete.
skip 有一個重載方法可以接受一個時長而不是數量參數。它會丟棄原始 Observable 開始那段時間發射的數據,時長和時間單位經過參數指定。
Observable.interval(1, TimeUnit.SECONDS) .skip(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 3 Next-> 4 Next-> 5 Next-> 6 Next-> 7 Next-> 8 Next-> 9 ......
抑制 Observable 發射的後 N 項數據
使用 skipLast 操做符修改原始 Observable,能夠忽略 Observable 發射後 N 項數據,只保留前面的數據。
Observable.just(1, 2, 3, 4, 5) .skipLast(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 1 Next-> 2 Complete.
一樣, skipLast 也有一個重載方法接受一個時長而不是數量參數。它會丟棄在原始 Observable 生命週期最後一段時間內發射的數據,時長和時間單位經過參數指定。
Observable.interval(1, TimeUnit.SECONDS) .skipLast(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 0 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Next-> 5 Next-> 6 ......
只發射第 N 項數據
elementAt 操做符獲取原始 Observable 發射的數據序列指定索引位置的數據項,而後看成本身的惟一數據發射
它傳遞一個基於 0 的索引值,發射原始 Observable 數據序列對應索引位置的值,若是傳遞給 elementAt 的值爲 5,那麼它會發射第 6 項數據。若是傳遞的是一個負數,則將會拋出 IndexOutOfBoundsException 異常。
Observable.just(1, 2, 3, 4, 5) .elementAt(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next-> 3
elementAt(index)返回一個 Maybe 類型。
public final Maybe<T> elementAt(long index) { if (index < 0) { throw new IndexOutOfBoundsException("index >= 0 required but it was " + index); } return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<T>(this, index)); }
若是原始 Observable 的數據項數小於 index+1 ,那麼會調用 onComplete 方法(在 RxJava l.x 中也會拋出一個 IndexOutOfBoundsException 異常)。因此 elementAt 還提供了一個帶默認值的方法,它返回一個 Single 類型。
Observable.just(1, 2, 3, 4, 5) .elementAt(8, 10) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Success: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 執行結果 Success: 10
若是 index 超出了索引範圍,那麼取默認值
不發射任何數據,只發射 Observable 終止通知
ignoreElements 操做符抑制原始 Observable 發射的全部數據,只容許它的終止通知( onError 或 onComplete )經過。它返回 Completable 類型
若是咱們不關心一個 Observable 發射的數據,可是但願在它完成時或遇到錯誤終止時收到通知,那麼就能夠對 Observable 使用 gnoreElements 操做符,它將確保永遠不會調用觀察者的 onNext 方法。
Observable.just(1, 2, 3, 4, 5) .ignoreElements() .subscribe(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }); // 執行結果 Complete.
過濾掉重複的數據項
distinct 的過濾規則是: 只容許尚未發射過的數據項經過
Observable.just(1, 2, 2, 3, 4, 4, 4, 5) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next->1 Next->2 Next->3 Next->4 Next->5 Complete.
distinct 還能接受 Function 做爲參數,這個函數根據原始 Observable 發射的數據項產生一個 Key ,而後比較這些 Key 而不是數據自己,來斷定兩個數據是否不一樣。
與 distinct 相似的是 distinctUntilChanged 操做符,該操做符與 distinct 的區別是:它只斷定一個數據和它的直接前驅是否不一樣
Observable.just(1, 2, 1, 2, 3, 4, 4, 4, 5) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next->1 Next->2 Next->1 Next->2 Next->3 Next->4 Next->5 Complete.
只發射經過謂詞測試的數據項
filter 操做符使用你指定的一個謂詞函數測試數據項,只有經過測試的數據纔會被髮射。
Observable.just(2, 30, 22, 5, 60, 1) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 10; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next->30 Next->22 Next->60 Complete.
僅在過了一段指定的時間還沒發射數據的才發射一個數據
debounce 操做符會過濾掉髮射速率過快的數據項
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { if (emitter.isDisposed()) return; try { for (int i = 0; i < 10; i++) { emitter.onNext(i); Thread.sleep(i * 100); } emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }).debounce(500, TimeUnit.MILLISECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next->6 Next->7 Next->8 Next->9 Complete.
debounce 還有另一種形式,使用一個 Function 函數來限制發送的數據。
跟 debounce 相似的是由throttleWithTimeout 操做符,它與只使用時間參數來限流的 debounce 的功能相同。
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)