Rxjava2 Observable的數據過濾詳解及實例(二)

接續上篇: Rxjava2 Observable的數據過濾詳解及實例(一)java

6. Filter

只發射經過了函數過濾的數據項。react

img-filter

實例代碼:git

// filter(Predicate<? super Integer> predicate)
    // 驗證數據,決定是否發射數據
    Observable.range(1, 10)
            .filter(new Predicate<Integer>() {

                @Override
                public boolean test(Integer t) throws Exception {
                    // 進行測試驗證是否須要發射數據
                    return t > 5 ? true : false;
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept filter: " + t);
                }
            });

輸出:github

--> accept filter: 6
--> accept filter: 7
--> accept filter: 8
--> accept filter: 9
--> accept filter: 10

Javadoc: filter(predicate)數據庫

7. Frist

只發射第一項或者知足某個條件的第一項數據。若是你只對Observable發射的第一項數據,或者知足某個條件的第一項數據感興趣,你可使用 First 操做符。緩存

Frist 操做符有如下幾種操做:網絡

7.1 firstElement()

只發射第一個數據,當數據存在的狀況。併發

img-firstElement()

實例代碼:ide

// 1. firstElement()
    // 只發射第一個數據
    Observable.range(1, 10)
        .firstElement()
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept firstElement(1): "  + t);
            }
        });

輸出:

--> accept firstElement(1): 1

Javadoc: firstElement()

7.2 first(defaultItem)

first(defaultItem)firstElement() 相似,可是在Observagle沒有發射任何數據時發射一個你在參數中指定的 defaultItem 默認值。

img-first(defaultItem)

實例代碼:

// 2. first(Integer defaultItem)
    // 發射第一個數據項,若是沒有數據項,發送默認的defaultItem
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).first(999) // 沒有數據發送時,發送默認值999
      .subscribe(new Consumer<Integer>() {

          @Override
          public void accept(Integer t) throws Exception {
              System.out.println("--> accept first(2): " + t);
          }
      });

輸出:

--> accept first(2): 999

Javadoc: first(defaultItem)

7.3 firstOrError()

發射第一個數據項,若是沒有數據項,會發送 NoSuchElementException 通知。

img-firstOrError

實例代碼:

// 3. first(Integer defaultItem)
    // 發射第一個數據項,若是沒有數據項,會有Error: NoSuchElementException
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).firstOrError() // 沒有數據發送時,將會發送NoSuchElementException通知
      .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe: ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> accept onSuccess(3): " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> acctpt onError(3): " + e);
            }
      });

輸出:

--> onSubscribe: 
--> acctpt onError(3): java.util.NoSuchElementException

Javadoc: firstOrError()

8. Single

singlefirst 相似,可是若是原始Observable在完成以前不是正好發射一次數據,它會拋出一個NoSuchElementException 的通知。

Single 有如下幾種操做:

8.1 singleElement()

發射單例數據,超過一個就會發送 NoSuchElementException 通知。

img-singleElement

實例代碼:

// 1.singleElement()
    // 發射單例數據,超過一個就會NoSuchElementException  
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
        }
    }).singleElement() // 發送單個數據,大於1項數據就會有Error通知
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept singleElement(1): " + t);
            }
      },new Consumer<Throwable>() {

        @Override
        public void accept(Throwable t) throws Exception {
            System.out.println("--> OnError(1): " + t);
        }
    });

輸出:

--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!

Javadoc: singleElement()

8.2 single(defaultItem)

發射單例數據,沒有接收到數據項則發送指定默認 defaultItem 數據。

img-single(defaultItem)

實例代碼:

// 2. single(Integer defaultItem)
    // 發射單例數據,沒有數據項發送指定默認defaultItem
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).single(999) // 沒有接受到數據則發送默認數據999
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept single(2): " + t);
            }
      });

輸出:

--> accept single(2): 999

Javadoc: single(defaultItem)

8.3 singleOrError()

發射一個單例的數據,若是數據源沒有數據項,則發射一個 NoSuchElementException 通知。

img-singleOrError

實例代碼:

// 3.singleOrError()
    // 發射一個單例的數據,若是數據源 沒有數據項,則發送一個NoSuchElementException異常通知
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).singleOrError() // 若是沒有數據項發送,則發送一個NoSuchElementException異常通知
      .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(3): ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> onSuccess(3): " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(3): " + e);
            }
        });

輸出:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: singleOrError()

9. ElementAt

ElementAt 操做符獲取原始Observable發射的數據序列指定索引位置的數據項,而後當作本身的惟一數據發射。

ElementAt 操做符有如下幾種操做:

9.1 elementAt(index)

發射索引位置第 index 項數據(從0開始計數),若是數據不存在,會 IndexOutOfBoundsException 異常。

img-elementAt(index)

實例代碼:

// 1. elementAt(long index)
    // 指定發射第N項數據(從0開始計數),若是數據不存在,會IndexOutOfBoundsException異常
    Observable.range(1, 10)
        .elementAt(5) // 發射數據序列中索引爲5的數據項,索引從0開始
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept ElementAt(1): " + t);
            }
        });

輸出:

--> accept ElementAt(1): 6

Javadoc: elementAt(index)

9.2 elementAt(index, defaultItem)

發射索引位置第 index 項數據(從0開始計數),若是數據不存在,發送默認 defaultItem 數據。

img-elementAt(index, defaultItem)

實例代碼:

// 2. elementAt(long index, Integer defaultItem)
    // 指定發射第N項數據(從0開始計數),若是數據不存在,發送默認defaultItem
    Observable.range(1, 10)
        .elementAt(20, 0) // 發射索引第20項數據,不存在此項數據時,發送默認數據0
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept elementAt(2): " + t);
            }
        });

輸出:

--> accept elementAt(2): 0

Javadoc: elementAt(index, defaultItem)

9.3 elementAtOrError(index)

發射索引位置第 index 項數據(從0開始計數),若是指定發射的數據不存在,會發射NoSuchElementException 異常通知。

img-elementAtOrError(index)

實例代碼:

// 3. elementAtOrError(long index)
    // 若是指定發射的數據不存在,會拋出NoSuchElementException
    Observable.range(1, 10)
        .elementAtOrError(50) // 發射索引爲50的數據,不存在則發送NoSuchElementException異常通知
        .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(3): ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> onSuccess(3): " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(3): " + e);
            }
        });

輸出:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: elementAtOrError(index)

10. ignoreElements

不發射任何數據,只發射Observable的終止通知。

IgnoreElements 操做符抑制原始Observable發射的全部數據,只容許它的終止通知 (onError 或 onCompleted )經過。

img-ignoreElements

解析: 若是你不關心一個Observable發射的數據,可是但願在它完成時或遇到錯誤終止時收到通知,你能夠對Observable使用 ignoreElements 操做符,它會確保永遠不會調用觀察者的 onNext() 方法。

實例代碼:

// ignoreElements()
    // 只接受onError或onCompleted通知,攔截onNext事件(不關心發射的數據,只但願在成功或者失敗的時候收到通知)
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            //  int i = 1/0;
            emitter.onComplete();
        }
    }).ignoreElements()
      .subscribe(new CompletableObserver() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete");
            }
      });

輸出:

--> onSubscribe
--> onComplete

Javadoc: ignoreElements()

11. Last

只發射最後一項(或者知足某個條件的最後一項)數據。

若是你只對Observable發射的最後一項數據,或者知足某個條件的最後一項數據感興趣,你可使用 Last 操做符。

Last 有如下幾種操做:

11.1 lastElement()

只發射最後一項數據,使用沒有參數的 last 操做符,若是Observable中沒有數據發送,則一樣沒有數據發送。

img-lastElement

實例代碼:

// 1. lastElement()
    // 接受最後一項數據
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }).lastElement() // 存在數據發送的話,即發射最後一項數據,不然沒有數據發射
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept lastElement(1): " + t);
            }
        });

輸出:

--> accept lastElement(1): 3

Javadoc: lastElement()

11.2 last(defaultItem)

只發射最後一項數據,若是Observable中沒有數據發送,則發送指定的默認值 defaultItem

img-last(defaultItem)

實例代碼:

// 2. last(Integer defaultItem)
    // 接受最後一項數據,若是沒有數據發送,發送默認數據:defaultItem
    Observable.range(0, 0)
        .last(999) // 接受最後一項數據,沒有數據則發送默認數據999
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept: last(2): " + t);
            }
        });

輸出:

--> accept: last(2): 999

Javadoc: last(defaultItem)

11.3 lastOrError()

接受最後一項數據,若是沒有數據發送,拋出 NoSuchElementException 異常通知。

img-lastOrError

實例代碼:

// 3. lastOrError()
    // 接受最後一項數據,若是沒有數據發送,拋出onError: NoSuchElementException
    Observable.range(0, 0)
        .lastOrError() // 接受最後一項數據,若是沒有數據,則反射NoSuchElementException異常通知
        .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe: ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> onSuccess(3)");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(3): " + e);
            }
        });

輸出:

--> onSubscribe: 
--> onError(3): java.util.NoSuchElementException

Javadoc: lastOrError()

12. Take

使用Take操做符讓你能夠修改Observable的行爲,只返回前面的N項數據,而後發射完成通知,忽略剩餘的數據。

Take 操做符有如下幾種操做:

12.1 take(count)

若是你對一個Observable使用 take(n) 操做符,而那個Observable發射的數據少於N項,那麼 take 操做生成的Observable不會拋異常或發射 onError 通知,在完成前它只會發射相同的少許數據。

img-take(count)

實例代碼:

// 1. take(long count)
    // 返回前count項數據
    Observable.range(1, 100)
        .take(5) // 返回前5項數據
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept take(1): " + t);
            }
        });

輸出:

--> accept take(1): 1
--> accept take(1): 2
--> accept take(1): 3
--> accept take(1): 4
--> accept take(1): 5

Javadoc: take(count)

12.2 take(timeout, TimeUnit)

取必定時間間隔內的數據,有可選參數 scheduler 指定線程調度器。

img-Take(timeout, TimeUnit)

實例代碼:

// 2. take(long time, TimeUnit unit,[Scheduler] scheduler)
    //  取必定時間間隔內的數據,可選參數scheduler指定線程調度器
    Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS)
        .take(5, TimeUnit.SECONDS) // 返回前5秒的數據項
        .subscribe(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> accept take(2): " + t);
            }
        });

輸出:

--> accept take(2): 1
--> accept take(2): 2
--> accept take(2): 3
--> accept take(2): 4
--> accept take(2): 5

Javadoc: take(timeout, TimeUnit)
Javadoc: take(timeout, TimeUnit, Scheduler)

13. TakeLast

使用 TakeLast 操做符修改原始Observable,你能夠只發射Observable發射的後N項數據,忽略前面的數據。

takeLast 的這個變體默認在 computation 調度器上執行,可是你可使用第三個參數指定其它的調度器。

TakeLast 通常有下面幾種操做:

13.1 takeLast(count)

使用 takeLast(count) 操做符,你能夠只發射原始Observable發射的後 count 項數據(或者原始Observable發射onCompleted() 前的 count 項數據),忽略以前的數據。 注意:這會延遲原始Observable發射的任何數據項,直到它所有完成。

img-takeLast(count)

實例代碼:

// 1. takeLast(int count)
    // 接受Observable數據發射完成前的Count項數據, 忽略前面的數據
    Observable.range(1, 10)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(1): ");
                }
            })
            .takeLast(5) // 發送數據發射完成前的5項數據
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept takeLast(1): " + t);
                }
            });

輸出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
--> onCompleted(1): 
--> accept takeLast(1): 6
--> accept takeLast(1): 7
--> accept takeLast(1): 8
--> accept takeLast(1): 9
--> accept takeLast(1): 10

Javadoc: takeLast(count)

13.2 takeLast(time, TimeUnit)

還有一個 takeLast 變體接受一個時長而不是數量參數。它會發射在原始Observable的生命週期內最後一段時間內發射的數據。時長和時間單位經過參數指定。

注意: 這會延遲原始Observable發射的任何數據項,直到它所有完成。

img-takeLast(time, TimeUnit)

實例代碼:

// 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // 可選參數 scheduler:指定工做調度器  delayError:延遲Error通知  bufferSize:指定緩存大小
    // 接受Observable數據發射完成前指定時間間隔發射的數據項
    Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(2): ");
                }
            })
            .takeLast(3, TimeUnit.SECONDS) // 發送數據發射完成前3秒時間段內的數據
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(2): " + t);
                }
            });

輸出:

--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
--> onCompleted(2): 
--> accept takeLast(2): 3
--> accept takeLast(2): 4
--> accept takeLast(2): 5

Javadoc: takeLast(long time, TimeUnit unit)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

13.3 takeLast(count, time, TimeUnit)

接受 Observable 發射完成前 time 時間段內收集 count 項數據併發射。

img-takeLast(count, time, TimeUnit)

示例代碼:

// 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // 可選參數 scheduler:指定工做調度器  delayError:延遲Error通知  bufferSize:指定緩存大小
    // 接受Observable數據發射完成前time時間段內收集count項數據併發射
    Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(3): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(3): ");
                }
            })
            .takeLast(2, 500, TimeUnit.MILLISECONDS) // 在原數據發射完成前500毫秒內接受2項數據
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(3): " + t);
                }
            });

輸出:

--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10
--> onCompleted(3): 
--> accept takeLast(3): 9
--> accept takeLast(3): 10

Javadoc: takeLast(long count, long time, TimeUnit unit)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

14. OfType

ofType 是 filter 操做符的一個特殊形式。它過濾一個Observable只返回指定類型的數據。

img-OfType

示例代碼:

Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", new Integer(5)};
    // ofType(Class clazz)
    // 過濾數據,只返回特定類型的數據
    Observable.fromArray(dataObjects)
            .ofType(Integer.class) // 過濾Integer類型的數據
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept ofType: " + t);
                }
            });

輸出:

--> accept ofType: 1
--> accept ofType: 5

Javadoc: ofType(Class clazz)

小結:

數據過濾的操做符主要是過濾被觀察者(Observable)發射的數據序列,按照指定的規則過濾數據項,忽略並丟棄其餘的數據。實際開發場景如網絡數據的過濾,數據庫數據的過濾等,是開發中重要且常見的操做之一。

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

相關文章
相關標籤/搜索