Observable
)發送的事件 & 觀察者 (Observer
)接收的事件
3.1 filter() ----- 見rxdocs.pdf第103頁java
做用:過濾 特定條件的事件ide
public static void filter() { Observable.just(1, 2, 3, 7, 6, 9) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 5; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:spa
08-09 17:01:19.982 28378 28378 D Operation: onSubscribe
08-09 17:01:19.982 28378 28378 D Operation: onNext: value = 7
08-09 17:01:19.982 28378 28378 D Operation: onNext: value = 6
08-09 17:01:19.982 28378 28378 D Operation: onNext: value = 9
08-09 17:01:19.982 28378 28378 D Operation: onComplete
3.2 ofType() ----- 見rxdocs.pdf第105頁3d
做用:過濾 特定數據類型的數據code
public static void ofType() { Observable.just(1, "A", 1.0f) .ofType(String.class) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(String value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:server
08-09 17:13:24.019 28953 28953 D Operation: onSubscribe 08-09 17:13:24.019 28953 28953 D Operation: onNext: value = A 08-09 17:13:24.019 28953 28953 D Operation: onComplete
3.3 skip() / skipLast() ----- 見rxdocs.pdf第120頁blog
做用:跳過前面或後面某些事件,某幾個或某段時間的事件索引
public static void skip() { Observable.just(1, 2, 3, 4, 5, 6, 7) .skip(2) .skipLast(3) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:事件
08-09 17:20:42.436 29210 29210 D Operation: onSubscribe
08-09 17:20:42.437 29210 29210 D Operation: onNext: value = 3
08-09 17:20:42.437 29210 29210 D Operation: onNext: value = 4
08-09 17:20:42.437 29210 29210 D Operation: onComplete
3.4 distinct() / distinctUntilChanged()----- 見rxdocs.pdf第97頁ip
做用:過濾事件序列中重複的事件 / 連續重複的事件
public static void distinct() { Observable.just(1, 2, 2, 3, 5, 6, 8, 8) .distinct() .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 17:29:09.744 29578 29578 D Operation: onSubscribe
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 1
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 2
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 3
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 5
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 6
08-09 17:29:09.744 29578 29578 D Operation: onNext: value = 8
08-09 17:29:09.744 29578 29578 D Operation: onComplete
3.5 take()/ takeLast()----- 見rxdocs.pdf第124頁
做用:指定觀察者最多能接收到最前面或最後面的事件數量
public static void take() { Observable.just(1, 2, 3, 4) .take(2) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 17:36:23.703 29843 29843 D Operation: onSubscribe
08-09 17:36:23.703 29843 29843 D Operation: onNext: value = 1
08-09 17:36:23.703 29843 29843 D Operation: onNext: value = 2
08-09 17:36:23.703 29843 29843 D Operation: onComplete
3.6 throttleFirst()/ throttleLast()----- 見rxdocs.pdf第119頁
做用:在某段時間內,只發送該段時間內第1次事件 / 最後1次事件
public static void throttleFirst() { Observable.intervalRange(0, 6, 0, 1, TimeUnit.SECONDS) .throttleFirst(2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Long value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 17:47:52.213 30470 30470 D Operation: onSubscribe
08-09 17:47:52.215 30470 30494 D Operation: onNext: value = 0
08-09 17:47:55.216 30470 30494 D Operation: onNext: value = 3
08-09 17:47:57.215 30470 30494 D Operation: onComplete
3.7 sample()----- 見rxdocs.pdf第118頁
做用:在某段時間內,只發送該段時間內最新(最後)1次事件。與 throttleLast()
操做符相似
3.8 throttleWithTimeout () / debounce()----- 見rxdocs.pdf第95頁
做用:發送數據事件時,若2次發送事件的間隔<指定時間,就會丟棄前一次的數據,直到指定時間內都沒有新數據發射時纔會發送後一次的數據
public static void debounce() { Observable.intervalRange(0, 6, 0, 1, TimeUnit.SECONDS) .debounce(2, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Long value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 18:00:40.068 31014 31014 D Operation: onSubscribe
08-09 18:00:45.073 31014 31038 D Operation: onNext: value = 5
08-09 18:00:45.073 31014 31038 D Operation: onComplete
3.9 firstElement() / lastElement()
做用:僅選取第1個元素 / 最後一個元素
public static void firstElement() { Observable.just(1, 2, 3) .firstElement() .subscribe(new MaybeObserver<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(Integer value) { Log.d(TAG, "onSuccess: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 19:25:41.413 2486 2486 D Operation: onSubscribe
08-09 19:25:41.413 2486 2486 D Operation: onSuccess: value = 1
3.10 elementAt()----- 見rxdocs.pdf第101頁
做用:指定接收某個元素(經過 索引值 肯定)
public static void elementAt() { Observable.just(1, 2, 3, 4, 5) .elementAt(2) .subscribe(new MaybeObserver<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(Integer value) { Log.d(TAG, "onSuccess: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 19:29:54.401 2972 2972 D Operation: onSubscribe
08-09 19:29:54.402 2972 2972 D Operation: onSuccess: value = 3
3.11 elementAtOrError()
做用:在elementAt()
的基礎上,當出現越界狀況(即獲取的位置索引 > 發送事件序列長度)時,即拋出異常
public static void elementAtOrError() { Observable.just(1, 2, 3, 4, 5) .elementAtOrError(10) .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(Integer value) { Log.d(TAG, "onSuccess: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } }); }
輸出:
08-09 19:35:06.800 3849 3849 D Operation: onSubscribe
08-09 19:35:06.800 3849 3849 D Operation: onError: java.util.NoSuchElementException