目錄html
接續上篇: Rxjava2 Observable的數據過濾詳解及實例(一)java
只發射經過了函數過濾的數據項。react
實例代碼:git
// filter(Predicate<? super Integer> predicate) // 驗證數據,決定是否發射數據 Observable.range(1, 10) .filter(new Predicate<Integer>() { @Override public boolean test(Integer t) throws Exception { // 進行測試驗證是否須要發射數據 return t > 5 ? true : false; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept filter: " + t); } });
輸出:github
--> accept filter: 6 --> accept filter: 7 --> accept filter: 8 --> accept filter: 9 --> accept filter: 10
Javadoc: filter(predicate)數據庫
只發射第一項或者知足某個條件的第一項數據。若是你只對Observable發射的第一項數據,或者知足某個條件的第一項數據感興趣,你可使用 First
操做符。緩存
Frist
操做符有如下幾種操做:網絡
只發射第一個數據,當數據存在的狀況。併發
實例代碼:ide
// 1. firstElement() // 只發射第一個數據 Observable.range(1, 10) .firstElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept firstElement(1): " + t); } });
輸出:
--> accept firstElement(1): 1
Javadoc: firstElement()
first(defaultItem)
與 firstElement()
相似,可是在Observagle沒有發射任何數據時發射一個你在參數中指定的 defaultItem
默認值。
實例代碼:
// 2. first(Integer defaultItem) // 發射第一個數據項,若是沒有數據項,發送默認的defaultItem Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).first(999) // 沒有數據發送時,發送默認值999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept first(2): " + t); } });
輸出:
--> accept first(2): 999
Javadoc: first(defaultItem)
發射第一個數據項,若是沒有數據項,會發送 NoSuchElementException
通知。
實例代碼:
// 3. first(Integer defaultItem) // 發射第一個數據項,若是沒有數據項,會有Error: NoSuchElementException Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).firstOrError() // 沒有數據發送時,將會發送NoSuchElementException通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe: "); } @Override public void onSuccess(Integer t) { System.out.println("--> accept onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> acctpt onError(3): " + e); } });
輸出:
--> onSubscribe: --> acctpt onError(3): java.util.NoSuchElementException
Javadoc: firstOrError()
single
與 first
相似,可是若是原始Observable在完成以前不是正好發射一次數據,它會拋出一個NoSuchElementException
的通知。
Single 有如下幾種操做:
發射單例數據,超過一個就會發送 NoSuchElementException
通知。
實例代碼:
// 1.singleElement() // 發射單例數據,超過一個就會NoSuchElementException Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); } }).singleElement() // 發送單個數據,大於1項數據就會有Error通知 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept singleElement(1): " + t); } },new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.out.println("--> OnError(1): " + t); } });
輸出:
--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!
Javadoc: singleElement()
發射單例數據,沒有接收到數據項則發送指定默認 defaultItem
數據。
實例代碼:
// 2. single(Integer defaultItem) // 發射單例數據,沒有數據項發送指定默認defaultItem Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).single(999) // 沒有接受到數據則發送默認數據999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept single(2): " + t); } });
輸出:
--> accept single(2): 999
Javadoc: single(defaultItem)
發射一個單例的數據,若是數據源沒有數據項,則發射一個 NoSuchElementException
通知。
實例代碼:
// 3.singleOrError() // 發射一個單例的數據,若是數據源 沒有數據項,則發送一個NoSuchElementException異常通知 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).singleOrError() // 若是沒有數據項發送,則發送一個NoSuchElementException異常通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3): "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
輸出:
--> onSubscribe(3): --> onError(3): java.util.NoSuchElementException
Javadoc: singleOrError()
ElementAt
操做符獲取原始Observable發射的數據序列指定索引位置的數據項,而後當作本身的惟一數據發射。
ElementAt 操做符有如下幾種操做:
發射索引位置第 index
項數據(從0開始計數),若是數據不存在,會 IndexOutOfBoundsException
異常。
實例代碼:
// 1. elementAt(long index) // 指定發射第N項數據(從0開始計數),若是數據不存在,會IndexOutOfBoundsException異常 Observable.range(1, 10) .elementAt(5) // 發射數據序列中索引爲5的數據項,索引從0開始 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept ElementAt(1): " + t); } });
輸出:
--> accept ElementAt(1): 6
Javadoc: elementAt(index)
發射索引位置第 index
項數據(從0開始計數),若是數據不存在,發送默認 defaultItem
數據。
實例代碼:
// 2. elementAt(long index, Integer defaultItem) // 指定發射第N項數據(從0開始計數),若是數據不存在,發送默認defaultItem Observable.range(1, 10) .elementAt(20, 0) // 發射索引第20項數據,不存在此項數據時,發送默認數據0 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept elementAt(2): " + t); } });
輸出:
--> accept elementAt(2): 0
Javadoc: elementAt(index, defaultItem)
發射索引位置第 index
項數據(從0開始計數),若是指定發射的數據不存在,會發射NoSuchElementException
異常通知。
實例代碼:
// 3. elementAtOrError(long index) // 若是指定發射的數據不存在,會拋出NoSuchElementException Observable.range(1, 10) .elementAtOrError(50) // 發射索引爲50的數據,不存在則發送NoSuchElementException異常通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3): "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
輸出:
--> onSubscribe(3): --> onError(3): java.util.NoSuchElementException
Javadoc: elementAtOrError(index)
不發射任何數據,只發射Observable的終止通知。
IgnoreElements
操做符抑制原始Observable發射的全部數據,只容許它的終止通知 (onError 或 onCompleted )經過。
解析: 若是你不關心一個Observable發射的數據,可是但願在它完成時或遇到錯誤終止時收到通知,你能夠對Observable使用 ignoreElements 操做符,它會確保永遠不會調用觀察者的 onNext() 方法。
實例代碼:
// ignoreElements() // 只接受onError或onCompleted通知,攔截onNext事件(不關心發射的數據,只但願在成功或者失敗的時候收到通知) Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // int i = 1/0; emitter.onComplete(); } }).ignoreElements() .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
輸出:
--> onSubscribe --> onComplete
Javadoc: ignoreElements()
只發射最後一項(或者知足某個條件的最後一項)數據。
若是你只對Observable發射的最後一項數據,或者知足某個條件的最後一項數據感興趣,你可使用 Last
操做符。
Last
有如下幾種操做:
只發射最後一項數據,使用沒有參數的 last
操做符,若是Observable中沒有數據發送,則一樣沒有數據發送。
實例代碼:
// 1. lastElement() // 接受最後一項數據 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).lastElement() // 存在數據發送的話,即發射最後一項數據,不然沒有數據發射 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept lastElement(1): " + t); } });
輸出:
--> accept lastElement(1): 3
Javadoc: lastElement()
只發射最後一項數據,若是Observable中沒有數據發送,則發送指定的默認值 defaultItem
。
實例代碼:
// 2. last(Integer defaultItem) // 接受最後一項數據,若是沒有數據發送,發送默認數據:defaultItem Observable.range(0, 0) .last(999) // 接受最後一項數據,沒有數據則發送默認數據999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: last(2): " + t); } });
輸出:
--> accept: last(2): 999
Javadoc: last(defaultItem)
接受最後一項數據,若是沒有數據發送,拋出 NoSuchElementException
異常通知。
實例代碼:
// 3. lastOrError() // 接受最後一項數據,若是沒有數據發送,拋出onError: NoSuchElementException Observable.range(0, 0) .lastOrError() // 接受最後一項數據,若是沒有數據,則反射NoSuchElementException異常通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe: "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3)"); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
輸出:
--> onSubscribe: --> onError(3): java.util.NoSuchElementException
Javadoc: lastOrError()
使用Take
操做符讓你能夠修改Observable的行爲,只返回前面的N項數據,而後發射完成通知,忽略剩餘的數據。
Take 操做符有如下幾種操做:
若是你對一個Observable使用 take(n)
操做符,而那個Observable發射的數據少於N項,那麼 take
操做生成的Observable不會拋異常或發射 onError
通知,在完成前它只會發射相同的少許數據。
實例代碼:
// 1. take(long count) // 返回前count項數據 Observable.range(1, 100) .take(5) // 返回前5項數據 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept take(1): " + t); } });
輸出:
--> accept take(1): 1 --> accept take(1): 2 --> accept take(1): 3 --> accept take(1): 4 --> accept take(1): 5
Javadoc: take(count)
取必定時間間隔內的數據,有可選參數 scheduler
指定線程調度器。
實例代碼:
// 2. take(long time, TimeUnit unit,[Scheduler] scheduler) // 取必定時間間隔內的數據,可選參數scheduler指定線程調度器 Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS) .take(5, TimeUnit.SECONDS) // 返回前5秒的數據項 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept take(2): " + t); } });
輸出:
--> accept take(2): 1 --> accept take(2): 2 --> accept take(2): 3 --> accept take(2): 4 --> accept take(2): 5
Javadoc: take(timeout, TimeUnit)
Javadoc: take(timeout, TimeUnit, Scheduler)
使用 TakeLast
操做符修改原始Observable,你能夠只發射Observable發射的後N項數據,忽略前面的數據。
takeLast
的這個變體默認在 computation
調度器上執行,可是你可使用第三個參數指定其它的調度器。
TakeLast 通常有下面幾種操做:
使用 takeLast(count)
操做符,你能夠只發射原始Observable發射的後 count
項數據(或者原始Observable發射onCompleted()
前的 count
項數據),忽略以前的數據。 注意:這會延遲原始Observable發射的任何數據項,直到它所有完成。
實例代碼:
// 1. takeLast(int count) // 接受Observable數據發射完成前的Count項數據, 忽略前面的數據 Observable.range(1, 10) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(1): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(1): "); } }) .takeLast(5) // 發送數據發射完成前的5項數據 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept takeLast(1): " + t); } });
輸出:
--> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 --> accept(1): 6 --> accept(1): 7 --> accept(1): 8 --> accept(1): 9 --> accept(1): 10 --> onCompleted(1): --> accept takeLast(1): 6 --> accept takeLast(1): 7 --> accept takeLast(1): 8 --> accept takeLast(1): 9 --> accept takeLast(1): 10
Javadoc: takeLast(count)
還有一個 takeLast
變體接受一個時長而不是數量參數。它會發射在原始Observable的生命週期內最後一段時間內發射的數據。時長和時間單位經過參數指定。
注意: 這會延遲原始Observable發射的任何數據項,直到它所有完成。
實例代碼:
// 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) // 可選參數 scheduler:指定工做調度器 delayError:延遲Error通知 bufferSize:指定緩存大小 // 接受Observable數據發射完成前指定時間間隔發射的數據項 Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(2): "); } }) .takeLast(3, TimeUnit.SECONDS) // 發送數據發射完成前3秒時間段內的數據 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept takeLast(2): " + t); } });
輸出:
--> accept(2): 1 --> accept(2): 2 --> accept(2): 3 --> accept(2): 4 --> accept(2): 5 --> onCompleted(2): --> accept takeLast(2): 3 --> accept takeLast(2): 4 --> accept takeLast(2): 5
Javadoc: takeLast(long time, TimeUnit unit)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
接受 Observable 發射完成前 time
時間段內收集 count
項數據併發射。
示例代碼:
// 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) // 可選參數 scheduler:指定工做調度器 delayError:延遲Error通知 bufferSize:指定緩存大小 // 接受Observable數據發射完成前time時間段內收集count項數據併發射 Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(3): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(3): "); } }) .takeLast(2, 500, TimeUnit.MILLISECONDS) // 在原數據發射完成前500毫秒內接受2項數據 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept takeLast(3): " + t); } });
輸出:
--> accept(3): 1 --> accept(3): 2 --> accept(3): 3 --> accept(3): 4 --> accept(3): 5 --> accept(3): 6 --> accept(3): 7 --> accept(3): 8 --> accept(3): 9 --> accept(3): 10 --> onCompleted(3): --> accept takeLast(3): 9 --> accept takeLast(3): 10
Javadoc: takeLast(long count, long time, TimeUnit unit)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
ofType
是 filter 操做符的一個特殊形式。它過濾一個Observable只返回指定類型的數據。
示例代碼:
Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", new Integer(5)}; // ofType(Class clazz) // 過濾數據,只返回特定類型的數據 Observable.fromArray(dataObjects) .ofType(Integer.class) // 過濾Integer類型的數據 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept ofType: " + t); } });
輸出:
--> accept ofType: 1 --> accept ofType: 5
Javadoc: ofType(Class clazz)
數據過濾的操做符主要是過濾被觀察者(Observable)發射的數據序列,按照指定的規則過濾數據項,忽略並丟棄其餘的數據。實際開發場景如網絡數據的過濾,數據庫數據的過濾等,是開發中重要且常見的操做之一。
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: