Rxjava2 Observable的數據過濾詳解及實例(一)

簡要:

需求瞭解:java

對於數據的觀察以及處理過程當中每每有須要過濾一些不須要的數據的需求,好比防抖(防止快速操做),獲取第一項、指定序列項或者最後一項的須要,獲取指定時間內的有效數據等。Rx中提供了豐富的數據過濾處理的操做方法。react

可用於過濾和選擇Observable發射的數據序列的方法:緩存

  • Debounce:過濾發射速率較快的數據項,防抖操做。
  • Throttle: 對數據序列進行限流操做,能夠指定獲取週期內的指定數據項,也能夠用於防抖。
  • Sample: 容許經過將序列劃分爲時間片斷收集數據,並從每片中取出一個值來稀疏序列。
  • Distinct: 過濾掉重複數據。
  • Skip: 跳過指定的N項數據。
  • Filter: 經過函數指定過濾的數據。
  • First: 只發射第一項或者知足某個條件的第一項數據。
  • Single: 與 first 相似,可是若是原始Observable在完成以前不是正好發射一次數據,它會拋出一個NoSuchElementException 的異常通知。
  • ElementAt: 獲取原始Observable發射的數據序列指定索引位置的數據項,而後當作本身的惟一數據發射。
  • ignoreElements: 不發射任何數據,只發射Observable的終止通知。
  • Last: 只發射最後一項(或者知足某個條件的最後一項)數據。
  • Take: 只返回Observable發送數據項序列前面的N項數據,忽略剩餘的數據。
  • TakeLast: 只發射Observable發送數據項序列的後N項數據,忽略其餘數據。
  • ofType: 過濾一個Observable只返回指定類型的數據。

1. Debounce

僅在過了一段指定的時間還沒發射數據時才發射一個數據。Debounce 操做符會過濾掉髮射速率過快的數據項。app

提示: 操做默認在 computation 調度器上執行,可是你能夠指定其它的調度器。ide

1.1 debounce(timeout, unit)

指定每一個數據發射後在 timeout 時間內,原始數據序列中沒有下一個數據發射時,發射此項數據,不然丟棄這項數據。此操做與 throttleWithTimeout 方法相同。函數

注意: 這個操做符會在原始數據的 onCompleted 時候直接發射發射數據,不會由於限流而丟棄數據。線程

img-debounce(timeout, unit)

實例代碼:3d

// 1. debounce(long timeout, TimeUnit unit)
     // 發送一個數據,若是在包含timeout時間內,沒有第二個數據發射,那麼就會發射此數據,不然丟棄此數據
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);  // 下一個數據到此數據發射, 30 < timeout    --> skip            
            Thread.sleep(30);
            emitter.onNext(2);  // 下一個數據到此數據發射, 100 > timeout   --> deliver
            Thread.sleep(100);
            emitter.onNext(3);  // 下一個數據到此數據發射, 50 = timeout    --> skip:           
            Thread.sleep(50);
            emitter.onNext(4);  // 下一個數據到此數據發射, onCompleted     --> deliver
            emitter.onComplete();

        }
    }).debounce(50, TimeUnit.MILLISECONDS)  // 指定防抖丟棄時間段爲50毫秒
    //  .debounce(50, TimeUnit.MILLISECONDS, Schedulers.trampoline())   // 指定調度爲當前線程排隊
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept debounce(1-1): " + t);
            }
        });

輸出:code

--> accept debounce(1-1): 2
--> accept debounce(1-1): 4

Javadoc: debounce(timeout, unit)
Javadoc: debounce(timeout, unit, scheduler)

1.2 debounce(debounceSelector)

原始數據發射每個序列都經過綁定監聽debounceSelector的數據通知,在debounceSelector數據發送前,若是有下一個數據,則丟棄當前項數據,繼續監視下一個數據。

注意: 這個操做符會在原始數據的 onCompleted 時候直接發射發射數據,不會由於限流而丟棄數據。

img-debounce(debounceSelector)

實例代碼:

// 2. debounce(debounceSelector)
    // 原始數據發射每個序列的經過監聽debounceSelector的數據通知,
    // 在debounceSelector數據發送前,若是有下一個數據,則丟棄當前項數據,繼續監視下一個數據
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);      // skip     --> debounceSelector is no emitter(<2s)
            Thread.sleep(1000); 
            emitter.onNext(2);      // skip     --> debounceSelector is no emitter(<2s)
            Thread.sleep(200);
            emitter.onNext(3);      // deliver  --> debounceSelector is emitter(>2s)
            Thread.sleep(2500);
            emitter.onNext(4);      // skip     --> debounceSelector is no emitter(=2s)
            Thread.sleep(2000);
            emitter.onNext(5);      // deliver  --> onComplete
            Thread.sleep(500);
            emitter.onComplete();
        }
    }).debounce(new Function<Integer, ObservableSource<Long>>() {

            @Override
            public ObservableSource<Long> apply(Integer t) throws Exception {
                System.out.println("--> apply(1-2): " + t);
                // 設置過濾延遲時間爲2秒,此時返回的Observable從訂閱到發送數據時間段即爲timeout
                return Observable.timer(2, TimeUnit.SECONDS)
                        .doOnSubscribe(new Consumer<Disposable>() {

                            @Override
                            public void accept(Disposable t) throws Exception {
                                // 開始訂閱,監聽數據的發送來過濾數據
                                System.out.println("--> debounceSelector(1-2) is onSubscribe!");
                            }
                        }).doOnDispose(new Action() {
        
                            @Override
                            public void run() throws Exception {
                                // 發射數據後,丟棄當前的數據,解除當前綁定
                                System.out.println("--> debounceSelector(1-2) is unSubscribe!");
                            }
                        });
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("----------> accept(1-2): " + t);
            }
        });

輸出:

--> apply(1-2): 1
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 2
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 3
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
----------> accept(1-2): 3
--> apply(1-2): 4
--> debounceSelector(1-2) is onSubscribe!
--> debounceSelector(1-2) is unSubscribe!
--> apply(1-2): 5
--> debounceSelector(1-2) is onSubscribe!
----------> accept(1-2): 5
--> debounceSelector(1-2) is unSubscribe!

Javadoc: debounce(debounceSelector)

2. Throttle

主要應用於數據序列的節流操做,在指定的採樣週期內獲取指定的數據。Throttling 也用於稀疏序列。當生產者發出的值超出咱們想要的值時,咱們不須要每一個序列值,咱們能夠經過限制它來稀釋序列。

注意: 時間的劃分不必定是統一的。例如,發射數據的時間間隔與劃分數據的時間間隔一致時,在原始數據發送的一個時間點(此時數據尚未實際發送),此時可能因爲劃分時間已到,劃分的數據片直接關閉了,因此有的時間片數據會有時間間隙差別。

提示: 操做默認在 computation 調度器上執行,可是你能夠指定其它的調度器。

2.1 throttleFirst(windowDuration, unit)

獲取每一個 windowDuration 時間段內的原始數據序列中的第一項數據,直到原始數據所有發送完畢。

img-throttleFirst(windowDuration, unit)

解析: 實際在每一個採樣週期內,先發送第一項接收到的數據,而後丟棄後續週期內的數據項。

實例代碼:

// 1. throttleFirst(long windowDuration, TimeUnit unit)
    // 指定每一個指定時間內取第一項數據, 直到原始數據序列所有發送結束
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource doOnNext : " + t);
            }
        }).throttleFirst(2, TimeUnit.SECONDS)                           // 獲取每隔2秒以內收集的第一項數據
     //   .throttleFirst(2, TimeUnit.SECONDS, Schedulers.newThread())   // 指定調度線程爲newThread()
          .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> throttleFirst onSubscribe");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("-------------> throttleFirst onNext: " + t);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> throttleFirst onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> throttleFirst onComplete");
                }
            });

輸出:

--> throttleFirst onSubscribe
--> DataSource doOnNext : 1
-------------> throttleFirst onNext: 1
--> DataSource doOnNext : 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleFirst onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
--> DataSource doOnNext : 7
-------------> throttleFirst onNext: 7
--> DataSource doOnNext : 8
--> DataSource doOnNext : 9
-------------> throttleFirst onNext: 9
--> DataSource doOnNext : 10
--> throttleFirst onComplete

Javadoc: throttleFirst(windowDuration, unit)
Javadoc: throttleFirst(windowDuration, unit, scheduler)

2.2 throttleLast(intervalDuration, unit)

獲取每一個 windowDuration 時間段內的原始數據序列中的最近的一項數據,直到原始數據所有發送完畢。throttleLast 運算符以固定間隔而不是相對於最後一項來劃分時間。它會在每一個窗口中發出最後一個值,而不是它後面的第一個值。

img-throttleLast(intervalDuration, unit)

解析: 實際在每一個採樣週期內,先緩存收集的數據,等週期結束髮送最後一項數據,丟棄最後數據項前面的數據。

實例代碼:

// 2. throttleLast(long intervalDuration, TimeUnit unit)
    // 指定間隔時間內取最後一項數據,直到原始數據序列所有發送結束
    Observable.intervalRange(1, 10, 0, 1050, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource doOnNext : " + t);
            }
        }).throttleLast(2, TimeUnit.SECONDS)                            // 獲取每隔2秒以內收集的最後一項數據
     //   .throttleLast(2, TimeUnit.SECONDS, Schedulers.newThread())    // 指定調度線程爲newThread()
          .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> throttleLast onSubscribe");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("-------------> throttleLast onNext: " + t);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> throttleLast onError: " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> throttleLast onComplete");
                }
            });

輸出:

--> throttleLast onSubscribe
--> DataSource doOnNext : 1
--> DataSource doOnNext : 2
-------------> throttleLast onNext: 2
--> DataSource doOnNext : 3
--> DataSource doOnNext : 4
-------------> throttleLast onNext: 4
--> DataSource doOnNext : 5
--> DataSource doOnNext : 6
-------------> throttleLast onNext: 6
--> DataSource doOnNext : 7
--> DataSource doOnNext : 8
-------------> throttleLast onNext: 8
--> DataSource doOnNext : 9
--> DataSource doOnNext : 10
--> throttleLast onComplete

Javadoc: throttleLast(intervalDuration, unit)
Javadoc: throttleLast(intervalDuration, unit, scheduler)

2.3 throttleWithTimeout(timeout, unit)

指定每一個數據發射後在 timeout 時間內,原始數據序列中沒有下一個數據發射時,發射此項數據,不然丟棄這項數據。此操做與 debounce 方法相同。

注意: 這個操做符會在原始數據的 onCompleted 時候直接發射發射數據,不會由於限流而丟棄數據。

img-throttleWithTimeout(timeout, unit)

實例代碼:

// 3. throttleWithTimeout(long timeout, TimeUnit unit)
    // 發送一個數據,若是在包含timeout時間內,沒有第二個數據發射,那麼就會發射此數據,不然丟棄此數據
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);  // 下一個數據到此數據發射, --> skip:       30 < timeout
            Thread.sleep(30);
            emitter.onNext(2);  // 下一個數據到此數據發射, --> skip:       50 = timeout
            Thread.sleep(50);
            emitter.onNext(3);  // 下一個數據到此數據發射, --> deliver:    60 > timeout
            Thread.sleep(60);
            emitter.onNext(4);  // onComplete           --> deliver:    onComplete
            emitter.onComplete();
        }
    }).throttleWithTimeout(50, TimeUnit.MILLISECONDS) // 指定防抖丟棄時間段爲50毫秒
 //   .throttleWithTimeout(50, TimeUnit.MILLISECONDS, Schedulers.newThread()) // 指定調度線程爲newThread()
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("--> accept throttleWithTimeout(3): " + t);
            }
        });

輸出:

--> accept throttleWithTimeout(3): 3
--> accept throttleWithTimeout(3): 4

Javadoc: throttleWithTimeout(timeout, unit)
Javadoc: throttleWithTimeout(timeout, unit, scheduler)

3. Sample

sample 容許您經過將序列劃分爲時間片斷,並從每片中取出一個值來稀疏序列。當每片結束時,將發出其中的最後一個值(若是有的話)。

注意: 時間的劃分不必定是統一的。例如,發射數據的時間間隔與劃分數據的時間間隔一致時,在原始數據發送的一個時間點(此時數據尚未實際發送),此時可能因爲劃分時間已到,劃分的數據片直接關閉了,因此有的時間片數據會有時間間隙差別。

3.1 sample(period, unit)

獲取每一個 period 時間片斷內手機收據序列的最後一項,忽略此時間片內收集的其餘數據項。

實例代碼:

// 1. sample(long period, TimeUnit unit)/sample(long period, TimeUnit unit)
    // 將序列分爲 period 的時間片斷,從每片重取出最近的一個數據
    // 等同於throttleLast
    Observable.intervalRange(1, 5, 0, 1100, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource onNext: " + t);
            }
        }).sample(2, TimeUnit.SECONDS)                              // 每3秒時間段數據中取最近一個值
    //    .sample(2, TimeUnit.SECONDS, true)                        // 參數emitLast,設置是否忽略未採樣的最後一個數據
    //    .sample(2, TimeUnit.SECONDS, Schedulers.newThread())      // 指定調度器爲newThread()
          .subscribe(new Consumer<Long>() {
    
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
          });

輸出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(1): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(1): 4
--> DataSource onNext: 5

Javadoc: sample(long period, TimeUnit unit)
Javadoc: sample(long period, TimeUnit unit, emitLast)
Javadoc: sample(long period, TimeUnit unit, scheduler)
Javadoc: sample(long period, TimeUnit unit, scheduler, emitLast)

3.2 sample(sampler)

sample 的這個方法每當第二個 sampler 發射一個數據(或者當它終止)時就對原始 Observable 進行採樣。第二個Observable經過參數傳遞給 sample

img-sample(sampler)

實例代碼:

// 2. sample(ObservableSource sampler)
    // 每當第二個 sampler 發射一個數據(或者當它終止)時就對原始 Observable進行採樣
    Observable.intervalRange(1, 5, 0, 1020, TimeUnit.MILLISECONDS)
        .doOnNext(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> DataSource onNext: " + t);
                }
        }).sample(Observable.interval(2, TimeUnit.SECONDS)) // 每隔2秒進行一次採樣
          .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
          });

輸出:

--> DataSource onNext: 1
--> DataSource onNext: 2
--> accept(2): 2
--> DataSource onNext: 3
--> DataSource onNext: 4
--> accept(2): 4
--> DataSource onNext: 5

Javadoc: sample(sampler)
Javadoc: sample(sampler, emitLast)

4. Distinct

抑制(過濾掉)重複的數據項。Distinct 的過濾規則是:只容許尚未發射過的數據項經過。

在某些實現中,有一些方法中容許你調整斷定兩個數據不一樣( distinct )的標準。還有一些實現只比較一項數據和它的直接前驅,所以只會從序列中過濾掉連續重複的數據。

4.1 distinct()

只容許尚未發射過的數據項經過,過濾數據序列中的全部重複的數據項,保證處理後的數據序列沒有重複。

img-distinct

示例代碼:

// 1. distinct()
    // 去除所有數據中重複的數據
    Observable.just(1, 2, 3, 3, 3, 4, 4, 5, 6, 6)
            .distinct()
            .subscribe(new Consumer<Integer>() {
            
                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinct(1): " + t);
                }
            });

輸出:

--> accept distinct(1): 1
--> accept distinct(1): 2
--> accept distinct(1): 3
--> accept distinct(1): 4
--> accept distinct(1): 5
--> accept distinct(1): 6

Javadoc: distinct()

4.2 distinct(keySelector)

這個操做符接受一個函數。這個函數根據原始Observable發射的數據項產生一個 Key,而後,比較這些Key而不是數據自己,來斷定兩個數據是不是不一樣的

img-distinct(keySelector)

實例代碼:

// 數根據原始Observable發射的數據項產生一個 Key,而後比較這些Key而不是數據自己,來斷定兩個數據是不是不一樣的(去除所有數據中重複的數據)
    Observable.just(1, 2, 3, 3, 4, 5, 6, 6)
            .distinct(new Function<Integer, String>() {

                @Override
                public String apply(Integer t) throws Exception {
                    // 根據奇數或偶數來判斷數據序列的重複的key
                    return t % 2 == 0 ? "even" : "odd";
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinct(2): " + t);
                }
            });

輸出:

--> accept distinct(2): 1
--> accept distinct(2): 2

Javadoc: distinct(keySelector)

4.3 distinctUntilChanged()

distinctUntilChanged 操做符,去除數據序列中的連續重複項。它只斷定一個數據和它的直接前驅是不是不一樣的。

img-distinctUntilChanged

實例代碼:

// 3. distinctUntilChanged()
    // 去除連續重複的數據
    Observable.just(1, 2, 3, 3, 4, 5, 6, 6, 3, 2)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept distinctUntilChanged(3): " + t);
            }
        });

輸出:

--> accept distinctUntilChanged(3): 1
--> accept distinctUntilChanged(3): 2
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 4
--> accept distinctUntilChanged(3): 5
--> accept distinctUntilChanged(3): 6
--> accept distinctUntilChanged(3): 3
--> accept distinctUntilChanged(3): 2

Javadoc: distinctUntilChanged()

4.4 distinctUntilChanged(keySelector)

distinctUntilChanged(keySelector) 操做符,根據一個函數產生的 Key 斷定兩個相鄰的數據項是否是相同的,去除連續重複的數據。

實例代碼:

// 4. distinctUntilChanged(Function<T,K>)
    // 數根據原始Observable發射的數據項產生的 Key,去除連續重複的數據
    Observable.just(8, 2, 3, 5, 9, 5, 6, 6)
            .distinctUntilChanged(new Function<Integer, String>() {

                @Override
                public String apply(Integer t) throws Exception {
                    // 根據原始數據處理後添加key,依據這個key來判斷是否重複(去除連續重複的數據)
                    return t % 2 == 0 ? "even" : "odd";
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept distinctUntilChanged(4): " + t);
                }
            });

輸出:

--> accept distinctUntilChanged(4): 8
--> accept distinctUntilChanged(4): 3
--> accept distinctUntilChanged(4): 6

Javadoc: distinctUntilChanged(keySelector)

5. Skip

主要用於忽略Observable發射的指定的 N 項數據,如跳過數據序列的前面或後面 N 項數據,指定時間段內的數據項。

Skip 操做符的還有一些變體的操做方法以下:

5.1 skip(count)

忽略 Observable 發射的前 N 項數據,只保留以後的數據。

img-skip(count)

實例代碼:

// 1. skip(long count)
    // 跳過前count項數據,保留後面的數據
    Observable.range(1, 10)
        .skip(5) // 過濾數據序列前5項數據
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept skip(1): " + t);
            }
        });

輸出:

--> accept skip(1): 6
--> accept skip(1): 7
--> accept skip(1): 8
--> accept skip(1): 9
--> accept skip(1): 10

Javadoc: skip(count)

5.2 skip(time, unit)

skip 的這個變體接受一個時長參數,它會丟棄原始Observable開始的那段時間段發射的數據,時長和時間單位經過參數指定。

img-skip(time, unit)

實例代碼:

// 2. skip(long time, TimeUnit unit)
    // 跳過開始的time時間段內的數據,保留後面的數據
    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
        .skip(2, TimeUnit.SECONDS)  // 跳過前2秒的數據
        .subscribe(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> accept skip(2): " + t);
            }
        });

輸出:

--> accept skip(2): 4
--> accept skip(2): 5

Javadoc: skip(time, unit)
Javadoc: skip(time, unit, scheduler)

5.3 skipLast(count)

使用 SkipLast 操做符修改原始Observable,你能夠忽略Observable發射的後 N 項數據,只保留前面的數據。

img-skipLast(count)

實例代碼:

// 3. skipLast(int count)
    // 跳過數據後面的count個數據
    Observable.range(1, 10)
        .skipLast(5) // 跳過數據序列的後5項數據
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept skipLast(3): " + t);
            }
        });

輸出:

--> accept skipLast(3): 1
--> accept skipLast(3): 2
--> accept skipLast(3): 3
--> accept skipLast(3): 4
--> accept skipLast(3): 5

Javadoc: skipLast(count)

5.4 skipLast(time, unit)

還有一個 skipLast 變體接受一個時間段參數,它會丟棄在原始 Observable 的生命週期內最後一段時間內發射的數據。時長和時間單位經過參數指定。

注意: 這個機制是這樣實現的:延遲原始 Observable 發射的任何數據項,直到自原始數據發射以後過了給定的時長以後,纔開始發送數據。

img-skipLast(time, unit)

實例代碼:

// 4. skipLast(long time, TimeUnit unit, [boolean delayError])
    // 丟棄在原始Observable的生命周 期內最後time時間內發射的數據
    // 可選參數delayError:延遲異常通知
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .doOnNext(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> DataSource: " + t);
            }
        }).skipLast(2, TimeUnit.SECONDS)
    //    .skipLast(2, TimeUnit.SECONDS, Schedulers.trampoline()) // 經過scheduler指定工做線程
    //    .skipLast(2, TimeUnit.SECONDS, true)                  // 延遲Error的通知,多用於組合Observable的場景
          .subscribe(new Consumer<Long>() {

              @Override
              public void accept(Long t) throws Exception {
                  System.out.println("--> accept skipLast(4): " + t);
              }
          });

輸出:

--> DataSource: 1
--> DataSource: 2
--> DataSource: 3
--> accept skipLast(4): 1
--> DataSource: 4
--> accept skipLast(4): 2
--> DataSource: 5
--> accept skipLast(4): 3
--> DataSource: 6
--> accept skipLast(4): 4
--> DataSource: 7
--> accept skipLast(4): 5
--> DataSource: 8
--> accept skipLast(4): 6
--> DataSource: 9
--> accept skipLast(4): 7
--> DataSource: 10
--> accept skipLast(4): 8

注意: skipLast 的這個操做默認在 computation 調度器上執行,可是你可使用Scheduler參數指定其 它的調度器。
Javadoc: skipLast(time, unit)
Javadoc: skipLast(time, unit, delayError)
Javadoc: skipLast(time, unit, scheduler)
Javadoc: skipLast(time, unit, scheduler, delayError)
Javadoc: skipLast(time, unit, scheduler, delayError, bufferSize)

接續:

後續的Rx相關數據過濾部分請參考: Rxjava2 Observable的數據過濾詳解及實例(二)

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

相關文章
相關標籤/搜索