需求瞭解:java
對於數據的觀察以及處理過程當中每每有須要過濾一些不須要的數據的需求,好比防抖(防止快速操做),獲取第一項、指定序列項或者最後一項的須要,獲取指定時間內的有效數據等。Rx中提供了豐富的數據過濾處理的操做方法。react
可用於過濾和選擇Observable發射的數據序列的方法:緩存
僅在過了一段指定的時間還沒發射數據時才發射一個數據。Debounce
操做符會過濾掉髮射速率過快的數據項。app
提示: 操做默認在 computation 調度器上執行,可是你能夠指定其它的調度器。ide
指定每一個數據發射後在 timeout
時間內,原始數據序列中沒有下一個數據發射時,發射此項數據,不然丟棄這項數據。此操做與 throttleWithTimeout
方法相同。函數
注意: 這個操做符會在原始數據的 onCompleted
時候直接發射發射數據,不會由於限流而丟棄數據。線程
實例代碼:3d
// 1. debounce(long timeout, TimeUnit unit) // 發送一個數據,若是在包含timeout時間內,沒有第二個數據發射,那麼就會發射此數據,不然丟棄此數據 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // 下一個數據到此數據發射, 30 < timeout --> skip Thread.sleep(30); emitter.onNext(2); // 下一個數據到此數據發射, 100 > timeout --> deliver Thread.sleep(100); emitter.onNext(3); // 下一個數據到此數據發射, 50 = timeout --> skip: Thread.sleep(50); emitter.onNext(4); // 下一個數據到此數據發射, onCompleted --> deliver emitter.onComplete(); } }).debounce(50, TimeUnit.MILLISECONDS) // 指定防抖丟棄時間段爲50毫秒 // .debounce(50, TimeUnit.MILLISECONDS, Schedulers.trampoline()) // 指定調度爲當前線程排隊 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept debounce(1-1): " + t); } });
輸出:code
--> accept debounce(1-1): 2 --> accept debounce(1-1): 4
Javadoc: debounce(timeout, unit)
Javadoc: debounce(timeout, unit, scheduler)
原始數據發射每個序列都經過綁定監聽debounceSelector
的數據通知,在debounceSelector
數據發送前,若是有下一個數據,則丟棄當前項數據,繼續監視下一個數據。
注意: 這個操做符會在原始數據的 onCompleted
時候直接發射發射數據,不會由於限流而丟棄數據。
實例代碼:
// 2. debounce(debounceSelector) // 原始數據發射每個序列的經過監聽debounceSelector的數據通知, // 在debounceSelector數據發送前,若是有下一個數據,則丟棄當前項數據,繼續監視下一個數據 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // skip --> debounceSelector is no emitter(<2s) Thread.sleep(1000); emitter.onNext(2); // skip --> debounceSelector is no emitter(<2s) Thread.sleep(200); emitter.onNext(3); // deliver --> debounceSelector is emitter(>2s) Thread.sleep(2500); emitter.onNext(4); // skip --> debounceSelector is no emitter(=2s) Thread.sleep(2000); emitter.onNext(5); // deliver --> onComplete Thread.sleep(500); emitter.onComplete(); } }).debounce(new Function<Integer, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Integer t) throws Exception { System.out.println("--> apply(1-2): " + t); // 設置過濾延遲時間爲2秒,此時返回的Observable從訂閱到發送數據時間段即爲timeout return Observable.timer(2, TimeUnit.SECONDS) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { // 開始訂閱,監聽數據的發送來過濾數據 System.out.println("--> debounceSelector(1-2) is onSubscribe!"); } }).doOnDispose(new Action() { @Override public void run() throws Exception { // 發射數據後,丟棄當前的數據,解除當前綁定 System.out.println("--> debounceSelector(1-2) is unSubscribe!"); } }); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("----------> accept(1-2): " + t); } });
輸出:
--> apply(1-2): 1 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! --> apply(1-2): 2 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! --> apply(1-2): 3 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! ----------> accept(1-2): 3 --> apply(1-2): 4 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! --> apply(1-2): 5 --> debounceSelector(1-2) is onSubscribe! ----------> accept(1-2): 5 --> debounceSelector(1-2) is unSubscribe!
Javadoc: debounce(debounceSelector)
主要應用於數據序列的節流操做,在指定的採樣週期內獲取指定的數據。Throttling
也用於稀疏序列。當生產者發出的值超出咱們想要的值時,咱們不須要每一個序列值,咱們能夠經過限制它來稀釋序列。
注意: 時間的劃分不必定是統一的。例如,發射數據的時間間隔與劃分數據的時間間隔一致時,在原始數據發送的一個時間點(此時數據尚未實際發送),此時可能因爲劃分時間已到,劃分的數據片直接關閉了,因此有的時間片數據會有時間間隙差別。
提示: 操做默認在 computation 調度器上執行,可是你能夠指定其它的調度器。
獲取每一個 windowDuration
時間段內的原始數據序列中的第一項數據,直到原始數據所有發送完畢。
解析: 實際在每一個採樣週期內,先發送第一項接收到的數據,而後丟棄後續週期內的數據項。
實例代碼:
// 1. throttleFirst(long windowDuration, TimeUnit unit) // 指定每一個指定時間內取第一項數據, 直到原始數據序列所有發送結束 Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource doOnNext : " + t); } }).throttleFirst(2, TimeUnit.SECONDS) // 獲取每隔2秒以內收集的第一項數據 // .throttleFirst(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定調度線程爲newThread() .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> throttleFirst onSubscribe"); } @Override public void onNext(Long t) { System.out.println("-------------> throttleFirst onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("--> throttleFirst onError: " + e); } @Override public void onComplete() { System.out.println("--> throttleFirst onComplete"); } });
輸出:
--> throttleFirst onSubscribe --> DataSource doOnNext : 1 -------------> throttleFirst onNext: 1 --> DataSource doOnNext : 2 --> DataSource doOnNext : 3 --> DataSource doOnNext : 4 -------------> throttleFirst onNext: 4 --> DataSource doOnNext : 5 --> DataSource doOnNext : 6 --> DataSource doOnNext : 7 -------------> throttleFirst onNext: 7 --> DataSource doOnNext : 8 --> DataSource doOnNext : 9 -------------> throttleFirst onNext: 9 --> DataSource doOnNext : 10 --> throttleFirst onComplete
Javadoc: throttleFirst(windowDuration, unit)
Javadoc: throttleFirst(windowDuration, unit, scheduler)
獲取每一個 windowDuration
時間段內的原始數據序列中的最近的一項數據,直到原始數據所有發送完畢。throttleLast
運算符以固定間隔而不是相對於最後一項來劃分時間。它會在每一個窗口中發出最後一個值,而不是它後面的第一個值。
解析: 實際在每一個採樣週期內,先緩存收集的數據,等週期結束髮送最後一項數據,丟棄最後數據項前面的數據。
實例代碼:
// 2. throttleLast(long intervalDuration, TimeUnit unit) // 指定間隔時間內取最後一項數據,直到原始數據序列所有發送結束 Observable.intervalRange(1, 10, 0, 1050, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource doOnNext : " + t); } }).throttleLast(2, TimeUnit.SECONDS) // 獲取每隔2秒以內收集的最後一項數據 // .throttleLast(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定調度線程爲newThread() .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> throttleLast onSubscribe"); } @Override public void onNext(Long t) { System.out.println("-------------> throttleLast onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("--> throttleLast onError: " + e); } @Override public void onComplete() { System.out.println("--> throttleLast onComplete"); } });
輸出:
--> throttleLast onSubscribe --> DataSource doOnNext : 1 --> DataSource doOnNext : 2 -------------> throttleLast onNext: 2 --> DataSource doOnNext : 3 --> DataSource doOnNext : 4 -------------> throttleLast onNext: 4 --> DataSource doOnNext : 5 --> DataSource doOnNext : 6 -------------> throttleLast onNext: 6 --> DataSource doOnNext : 7 --> DataSource doOnNext : 8 -------------> throttleLast onNext: 8 --> DataSource doOnNext : 9 --> DataSource doOnNext : 10 --> throttleLast onComplete
Javadoc: throttleLast(intervalDuration, unit)
Javadoc: throttleLast(intervalDuration, unit, scheduler)
指定每一個數據發射後在 timeout
時間內,原始數據序列中沒有下一個數據發射時,發射此項數據,不然丟棄這項數據。此操做與 debounce
方法相同。
注意: 這個操做符會在原始數據的 onCompleted
時候直接發射發射數據,不會由於限流而丟棄數據。
實例代碼:
// 3. throttleWithTimeout(long timeout, TimeUnit unit) // 發送一個數據,若是在包含timeout時間內,沒有第二個數據發射,那麼就會發射此數據,不然丟棄此數據 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // 下一個數據到此數據發射, --> skip: 30 < timeout Thread.sleep(30); emitter.onNext(2); // 下一個數據到此數據發射, --> skip: 50 = timeout Thread.sleep(50); emitter.onNext(3); // 下一個數據到此數據發射, --> deliver: 60 > timeout Thread.sleep(60); emitter.onNext(4); // onComplete --> deliver: onComplete emitter.onComplete(); } }).throttleWithTimeout(50, TimeUnit.MILLISECONDS) // 指定防抖丟棄時間段爲50毫秒 // .throttleWithTimeout(50, TimeUnit.MILLISECONDS, Schedulers.newThread()) // 指定調度線程爲newThread() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { // TODO Auto-generated method stub System.out.println("--> accept throttleWithTimeout(3): " + t); } });
輸出:
--> accept throttleWithTimeout(3): 3 --> accept throttleWithTimeout(3): 4
Javadoc: throttleWithTimeout(timeout, unit)
Javadoc: throttleWithTimeout(timeout, unit, scheduler)
sample
容許您經過將序列劃分爲時間片斷,並從每片中取出一個值來稀疏序列。當每片結束時,將發出其中的最後一個值(若是有的話)。
注意: 時間的劃分不必定是統一的。例如,發射數據的時間間隔與劃分數據的時間間隔一致時,在原始數據發送的一個時間點(此時數據尚未實際發送),此時可能因爲劃分時間已到,劃分的數據片直接關閉了,因此有的時間片數據會有時間間隙差別。
獲取每一個 period
時間片斷內手機收據序列的最後一項,忽略此時間片內收集的其餘數據項。
實例代碼:
// 1. sample(long period, TimeUnit unit)/sample(long period, TimeUnit unit) // 將序列分爲 period 的時間片斷,從每片重取出最近的一個數據 // 等同於throttleLast Observable.intervalRange(1, 5, 0, 1100, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource onNext: " + t); } }).sample(2, TimeUnit.SECONDS) // 每3秒時間段數據中取最近一個值 // .sample(2, TimeUnit.SECONDS, true) // 參數emitLast,設置是否忽略未採樣的最後一個數據 // .sample(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定調度器爲newThread() .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(1): " + t); } });
輸出:
--> DataSource onNext: 1 --> DataSource onNext: 2 --> accept(1): 2 --> DataSource onNext: 3 --> DataSource onNext: 4 --> accept(1): 4 --> DataSource onNext: 5
Javadoc: sample(long period, TimeUnit unit)
Javadoc: sample(long period, TimeUnit unit, emitLast)
Javadoc: sample(long period, TimeUnit unit, scheduler)
Javadoc: sample(long period, TimeUnit unit, scheduler, emitLast)
sample
的這個方法每當第二個 sampler
發射一個數據(或者當它終止)時就對原始 Observable 進行採樣。第二個Observable經過參數傳遞給 sample
。
實例代碼:
// 2. sample(ObservableSource sampler) // 每當第二個 sampler 發射一個數據(或者當它終止)時就對原始 Observable進行採樣 Observable.intervalRange(1, 5, 0, 1020, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource onNext: " + t); } }).sample(Observable.interval(2, TimeUnit.SECONDS)) // 每隔2秒進行一次採樣 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } });
輸出:
--> DataSource onNext: 1 --> DataSource onNext: 2 --> accept(2): 2 --> DataSource onNext: 3 --> DataSource onNext: 4 --> accept(2): 4 --> DataSource onNext: 5
Javadoc: sample(sampler)
Javadoc: sample(sampler, emitLast)
抑制(過濾掉)重複的數據項。Distinct 的過濾規則是:只容許尚未發射過的數據項經過。
在某些實現中,有一些方法中容許你調整斷定兩個數據不一樣( distinct )的標準。還有一些實現只比較一項數據和它的直接前驅,所以只會從序列中過濾掉連續重複的數據。
只容許尚未發射過的數據項經過,過濾數據序列中的全部重複的數據項,保證處理後的數據序列沒有重複。
示例代碼:
// 1. distinct() // 去除所有數據中重複的數據 Observable.just(1, 2, 3, 3, 3, 4, 4, 5, 6, 6) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinct(1): " + t); } });
輸出:
--> accept distinct(1): 1 --> accept distinct(1): 2 --> accept distinct(1): 3 --> accept distinct(1): 4 --> accept distinct(1): 5 --> accept distinct(1): 6
Javadoc: distinct()
這個操做符接受一個函數。這個函數根據原始Observable發射的數據項產生一個 Key
,而後,比較這些Key而不是數據自己,來斷定兩個數據是不是不一樣的。
實例代碼:
// 數根據原始Observable發射的數據項產生一個 Key,而後比較這些Key而不是數據自己,來斷定兩個數據是不是不一樣的(去除所有數據中重複的數據) Observable.just(1, 2, 3, 3, 4, 5, 6, 6) .distinct(new Function<Integer, String>() { @Override public String apply(Integer t) throws Exception { // 根據奇數或偶數來判斷數據序列的重複的key return t % 2 == 0 ? "even" : "odd"; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinct(2): " + t); } });
輸出:
--> accept distinct(2): 1 --> accept distinct(2): 2
Javadoc: distinct(keySelector)
distinctUntilChanged
操做符,去除數據序列中的連續重複項。它只斷定一個數據和它的直接前驅是不是不一樣的。
實例代碼:
// 3. distinctUntilChanged() // 去除連續重複的數據 Observable.just(1, 2, 3, 3, 4, 5, 6, 6, 3, 2) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinctUntilChanged(3): " + t); } });
輸出:
--> accept distinctUntilChanged(3): 1 --> accept distinctUntilChanged(3): 2 --> accept distinctUntilChanged(3): 3 --> accept distinctUntilChanged(3): 4 --> accept distinctUntilChanged(3): 5 --> accept distinctUntilChanged(3): 6 --> accept distinctUntilChanged(3): 3 --> accept distinctUntilChanged(3): 2
Javadoc: distinctUntilChanged()
distinctUntilChanged(keySelector)
操做符,根據一個函數產生的 Key
斷定兩個相鄰的數據項是否是相同的,去除連續重複的數據。
實例代碼:
// 4. distinctUntilChanged(Function<T,K>) // 數根據原始Observable發射的數據項產生的 Key,去除連續重複的數據 Observable.just(8, 2, 3, 5, 9, 5, 6, 6) .distinctUntilChanged(new Function<Integer, String>() { @Override public String apply(Integer t) throws Exception { // 根據原始數據處理後添加key,依據這個key來判斷是否重複(去除連續重複的數據) return t % 2 == 0 ? "even" : "odd"; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinctUntilChanged(4): " + t); } });
輸出:
--> accept distinctUntilChanged(4): 8 --> accept distinctUntilChanged(4): 3 --> accept distinctUntilChanged(4): 6
Javadoc: distinctUntilChanged(keySelector)
主要用於忽略Observable發射的指定的 N 項數據,如跳過數據序列的前面或後面 N 項數據,指定時間段內的數據項。
Skip
操做符的還有一些變體的操做方法以下:
忽略 Observable
發射的前 N
項數據,只保留以後的數據。
實例代碼:
// 1. skip(long count) // 跳過前count項數據,保留後面的數據 Observable.range(1, 10) .skip(5) // 過濾數據序列前5項數據 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept skip(1): " + t); } });
輸出:
--> accept skip(1): 6 --> accept skip(1): 7 --> accept skip(1): 8 --> accept skip(1): 9 --> accept skip(1): 10
Javadoc: skip(count)
skip
的這個變體接受一個時長參數,它會丟棄原始Observable開始的那段時間段發射的數據,時長和時間單位經過參數指定。
實例代碼:
// 2. skip(long time, TimeUnit unit) // 跳過開始的time時間段內的數據,保留後面的數據 Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) .skip(2, TimeUnit.SECONDS) // 跳過前2秒的數據 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept skip(2): " + t); } });
輸出:
--> accept skip(2): 4 --> accept skip(2): 5
Javadoc: skip(time, unit)
Javadoc: skip(time, unit, scheduler)
使用 SkipLast
操做符修改原始Observable,你能夠忽略Observable發射的後 N
項數據,只保留前面的數據。
實例代碼:
// 3. skipLast(int count) // 跳過數據後面的count個數據 Observable.range(1, 10) .skipLast(5) // 跳過數據序列的後5項數據 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept skipLast(3): " + t); } });
輸出:
--> accept skipLast(3): 1 --> accept skipLast(3): 2 --> accept skipLast(3): 3 --> accept skipLast(3): 4 --> accept skipLast(3): 5
Javadoc: skipLast(count)
還有一個 skipLast
變體接受一個時間段參數,它會丟棄在原始 Observable 的生命週期內最後一段時間內發射的數據。時長和時間單位經過參數指定。
注意: 這個機制是這樣實現的:延遲原始 Observable 發射的任何數據項,直到自原始數據發射以後過了給定的時長以後,纔開始發送數據。
實例代碼:
// 4. skipLast(long time, TimeUnit unit, [boolean delayError]) // 丟棄在原始Observable的生命周 期內最後time時間內發射的數據 // 可選參數delayError:延遲異常通知 Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource: " + t); } }).skipLast(2, TimeUnit.SECONDS) // .skipLast(2, TimeUnit.SECONDS, Schedulers.trampoline()) // 經過scheduler指定工做線程 // .skipLast(2, TimeUnit.SECONDS, true) // 延遲Error的通知,多用於組合Observable的場景 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept skipLast(4): " + t); } });
輸出:
--> DataSource: 1 --> DataSource: 2 --> DataSource: 3 --> accept skipLast(4): 1 --> DataSource: 4 --> accept skipLast(4): 2 --> DataSource: 5 --> accept skipLast(4): 3 --> DataSource: 6 --> accept skipLast(4): 4 --> DataSource: 7 --> accept skipLast(4): 5 --> DataSource: 8 --> accept skipLast(4): 6 --> DataSource: 9 --> accept skipLast(4): 7 --> DataSource: 10 --> accept skipLast(4): 8
注意: skipLast 的這個操做默認在 computation 調度器上執行,可是你可使用Scheduler參數指定其 它的調度器。
Javadoc: skipLast(time, unit)
Javadoc: skipLast(time, unit, delayError)
Javadoc: skipLast(time, unit, scheduler)
Javadoc: skipLast(time, unit, scheduler, delayError)
Javadoc: skipLast(time, unit, scheduler, delayError, bufferSize)
後續的Rx相關數據過濾部分請參考: Rxjava2 Observable的數據過濾詳解及實例(二)
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例