Observable
) 在發送事件時實現一些功能性需求3.1 subscribe() / subscribeOn() / observeOn()java
做用:訂閱 / 設置被觀察者線程 / 設置觀察者線程app
3.2 delay() ----- 見rxdocs.pdf第157頁ide
做用:使得被觀察者延遲一段時間再發送事件spa
public static void delay() { Observable.just(1, 2, 3) .delay(3, TimeUnit.SECONDS) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:線程
08-06 19:38:09.177 15886 15886 D Operation: onSubscribe
08-06 19:38:12.178 15886 15904 D Operation: onNext: value = 1
08-06 19:38:12.179 15886 15904 D Operation: onNext: value = 2
08-06 19:38:12.179 15886 15904 D Operation: onNext: value = 3
08-06 19:38:12.179 15886 15904 D Operation: onComplete
3.3 do() ----- 見rxdocs.pdf第161頁3d
做用:在事件發送 & 接收的整個生命週期過程當中進行操做code
public static void doOperation() { Observable.just(1, 2) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "doOnNext: value = " + integer); } }) .doOnLifecycle(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.d(TAG, "doOnLifecycle Consumer: disposable = " + disposable.isDisposed()); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "doOnLifecycle Action"); } }) .doFinally(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doFinally"); } }) .doAfterTerminate(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doAfterTerminate"); } }) .doOnTerminate(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doOnTerminate"); } }) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.d(TAG, "doOnSubscribe: disposable = " + disposable.isDisposed()); } }) .doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "doOnError"); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doOnComplete"); } }) .doAfterNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "doAfterNext: value = " + integer); } }) .doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { Log.d(TAG, "doOnEach: notification = " + integerNotification.toString()); } }) .doOnEach(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "doOnEach: onSubscribe"); } @Override public void onNext(Integer integer) { Log.d(TAG, "doOnEach: onNext = " + integer); } @Override public void onError(Throwable e) { Log.d(TAG, "doOnEach: onError"); } @Override public void onComplete() { Log.d(TAG, "doOnEach: onComplete"); } }) .doOnDispose(new Action() { @Override public void run() throws Exception { Log.d(TAG, "doOnDispose"); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "real onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "real onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "real onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "real onComplete"); } }); }
輸出:orm
08-06 20:10:46.205 18692 18692 D Operation: doOnLifecycle Consumer: disposable = false 08-06 20:10:46.205 18692 18692 D Operation: doOnSubscribe: disposable = false 08-06 20:10:46.205 18692 18692 D Operation: real onSubscribe 08-06 20:10:46.205 18692 18692 D Operation: doOnNext: value = 1 08-06 20:10:46.206 18692 18692 D Operation: doOnEach: notification = OnNextNotification[1] 08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onNext = 1 08-06 20:10:46.207 18692 18692 D Operation: real onNext: value = 1 08-06 20:10:46.207 18692 18692 D Operation: doAfterNext: value = 1 08-06 20:10:46.207 18692 18692 D Operation: doOnNext: value = 2 08-06 20:10:46.207 18692 18692 D Operation: doOnEach: notification = OnNextNotification[2] 08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onNext = 2 08-06 20:10:46.207 18692 18692 D Operation: real onNext: value = 2 08-06 20:10:46.207 18692 18692 D Operation: doAfterNext: value = 2 08-06 20:10:46.207 18692 18692 D Operation: doOnTerminate 08-06 20:10:46.207 18692 18692 D Operation: doOnComplete 08-06 20:10:46.207 18692 18692 D Operation: doOnEach: notification = OnCompleteNotification 08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onComplete 08-06 20:10:46.208 18692 18692 D Operation: real onComplete 08-06 20:10:46.208 18692 18692 D Operation: doAfterTerminate 08-06 20:10:46.208 18692 18692 D Operation: doFinally
3.4 onErrorReturn() ----- 見rxdocs.pdf第151頁server
做用:遇到錯誤時,發送1個特殊事件 & 正常終止。捕獲在它以前發生的異常blog
public static void onErrorReturn() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new NullPointerException("null point exception")); emitter.onNext(3); } }).onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) throws Exception { Log.d(TAG, "onErrorReturn"); return 666; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-07 09:05:49.377 23696 23696 D Operation: onSubscribe 08-07 09:05:49.377 23696 23696 D Operation: onNext: value = 1 08-07 09:05:49.377 23696 23696 D Operation: onNext: value = 2 08-07 09:05:49.377 23696 23696 D Operation: onErrorReturn 08-07 09:05:49.377 23696 23696 D Operation: onNext: value = 666 08-07 09:05:49.378 23696 23696 D Operation: onComplete
3.5 onErrorResumeNext() ----- 見rxdocs.pdf第151頁
做用:遇到錯誤時,發送1個新的Observable
注意:onErrorResumeNext()
攔截的錯誤 = Throwable,即Error和Exception均可以攔截。
public static void onErrorResumeNext() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new ClassFormatError("format error")); } }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception { return Observable.just(6, 8); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 15:14:01.506 20890 20890 D Operation: onSubscribe
08-09 15:14:01.506 20890 20890 D Operation: onNext: value = 1
08-09 15:14:01.506 20890 20890 D Operation: onNext: value = 2
08-09 15:14:01.507 20890 20890 D Operation: onNext: value = 6
08-09 15:14:01.507 20890 20890 D Operation: onNext: value = 8
08-09 15:14:01.507 20890 20890 D Operation: onComplete
3.6 onExceptionResumeNext() ----- 見rxdocs.pdf第152頁
做用:遇到異常時,發送1個新的Observable
注意:與onErrorResumeNext()不一樣的是,若是
onError
收到的Throwable
不是一個Exception
,它會將錯誤傳遞給觀察者的onError
方法,不會使用備用的Observable。
public static void onExceptionResumeNext() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new NullPointerException("null exception")); } }).onExceptionResumeNext(new Observable<Integer>() { @Override protected void subscribeActual(Observer<? super Integer> observer) { observer.onNext(3); observer.onNext(4); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 15:29:38.250 22073 22073 D Operation: onSubscribe
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 1
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 2
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 3
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 4
3.7 retry() ----- 見rxdocs.pdf第153頁
做用:重試,即當出現錯誤時,讓被觀察者(Observable
)從新發射數據。Retry
操做符不會將原始Observable的onError
通知傳遞給觀察者,它會訂閱這個Observable,再給它一次機會無錯誤地完成它的數據序列。Retry
老是傳遞onNext
通知 給觀察者,因爲從新訂閱,可能會形成數據項重複。
類型:
<-- 1. retry() --> // 做用:出現錯誤時,讓被觀察者從新發送數據 // 注:若一直錯誤,則一直從新發送 <-- 2. retry(long time) --> // 做用:出現錯誤時,讓被觀察者從新發送數據(具有重試次數限制 // 參數 = 重試次數 <-- 3. retry(Predicate predicate) --> // 做用:出現錯誤後,判斷是否須要從新發送數據(若須要從新發送& 持續遇到錯誤,則持續重試) // 參數 = 判斷邏輯 <-- 4. retry(new BiPredicate<Integer, Throwable>) --> // 做用:出現錯誤後,判斷是否須要從新發送數據(若須要從新發送 & 持續遇到錯誤,則持續重試 // 參數 = 判斷邏輯(傳入當前重試次數 & 異常錯誤信息) <-- 5. retry(long time,Predicate predicate) --> // 做用:出現錯誤後,判斷是否須要從新發送數據(具有重試次數限制 // 參數 = 設置重試次數 & 判斷邏輯
public static void retry() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new NullPointerException("null pointer")); emitter.onNext(3); } }).retry(5).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 15:44:01.585 22956 22956 D Operation: onSubscribe 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1 08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2 08-09 15:44:01.586 22956 22956 D Operation: onError: java.lang.NullPointerException: null pointer
3.8 retryUntil()
做用:出現錯誤後,判斷是否須要從新發送數據。做用相似於retry(Predicate predicate)
public static void retryUtil() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new NullPointerException("null pointer")); emitter.onNext(3); } }).retryUntil(new BooleanSupplier() { @Override public boolean getAsBoolean() throws Exception { return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
....
3.9 retryWhen() ----- 見rxdocs.pdf第154頁
做用:遇到錯誤時,將發生的錯誤傳遞給一個新的被觀察者(Observable
),並決定是否須要從新訂閱原始被觀察者(Observable
)& 發送事件。
若是這個Observable發射了一項數據,它就從新訂閱,若是這個Observable發射的是onError
通知,它就將這個通知傳遞給觀察者而後終止。
public static void retryWhen() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new NullPointerException("null pointer")); emitter.onNext(3); } }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception { return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Throwable throwable) throws Exception { //不從新發送訂閱事件,發送error後終止 //return Observable.error(throwable); //則原始的Observable從新發送事件(若持續遇到錯誤,則持續重試) return Observable.just(6); } }); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
以上return Observable.error(throwable)的結果: 08-09 16:14:06.579 25485 25485 D Operation: onSubscribe 08-09 16:14:06.580 25485 25485 D Operation: onNext: value = 1 08-09 16:14:06.580 25485 25485 D Operation: onNext: value = 2 08-09 16:14:06.580 25485 25485 D Operation: onError: java.lang.NullPointerException: null pointer 以上return Observable.just(6)的結果: 08-09 16:19:23.689 25958 25958 D Operation: onSubscribe 08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 1 08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 2 08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 1 08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 2 08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 1 08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 2 08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 1 08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 2 ......
3.10 repeat() ----- 見rxdocs.pdf第52頁
做用:無條件地、重複發送 被觀察者事件
public static void repeat() { Observable.just(6, 7) .repeat(3L) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e.toString()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }
輸出:
08-09 16:33:48.097 27105 27105 D Operation: onSubscribe
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 6
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 7
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 6
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 7
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 6
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 7
08-09 16:33:48.097 27105 27105 D Operation: onComplete
3.11 repeatWhen() ----- 見rxdocs.pdf第53頁
做用:有條件地、重複發送 被觀察者事件。與retryWhen相似
原理:若新被觀察者(Observable
)返回1個Complete
/ Error
事件,則不從新訂閱 & 發送原來的 Observable;
若新被觀察者(Observable
)返回其他事件時,則從新訂閱 & 發送原來的 Observable