Operators: Utility Operators

     1. Purpose

  • Help the Observable meet auxiliary (utility) requirements while it emits events

     2. Types

     3. Details

          3.1   subscribe() /  subscribeOn() /  observeOn()

       Purpose: subscribe  /  set the thread the Observable runs on  /  set the thread the Observer runs on

 

          3.2   delay() ----- see rxdocs.pdf, page 157

       Purpose: make the Observable wait for a period of time before emitting its events

 

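    // Assumes RxJava 2 (io.reactivex.*) on Android; TAG is a class constant ("Operation" in the logs below).
    public static void delay() {
        Observable.just(1, 2, 3)
                // Shift every emission by 3 seconds. delay() runs on the computation
                // scheduler by default, so onNext/onComplete arrive on a different thread.
                .delay(3, TimeUnit.SECONDS)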
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

        Output:

08-06 19:38:09.177 15886 15886 D Operation: onSubscribe
08-06 19:38:12.178 15886 15904 D Operation: onNext: value = 1
08-06 19:38:12.179 15886 15904 D Operation: onNext: value = 2
08-06 19:38:12.179 15886 15904 D Operation: onNext: value = 3
08-06 19:38:12.179 15886 15904 D Operation: onComplete

 

          3.3   do() ----- see rxdocs.pdf, page 161

       Purpose: perform actions at various points throughout the lifecycle of emitting and receiving events

 

    public static void doOperation() {
        Observable.just(1, 2)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doOnNext: value = " + integer);
                    }
                })
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(TAG, "doOnLifecycle Consumer: disposable  = " + disposable.isDisposed());
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnLifecycle Action");
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doFinally");
                    }
                })
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doAfterTerminate");
                    }
                })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnTerminate");
                    }
                })
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        Log.d(TAG, "doOnSubscribe: disposable = " + disposable.isDisposed());
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "doOnError");
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnComplete");
                    }
                })
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doAfterNext: value = " + integer);
                    }
                })
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.d(TAG, "doOnEach: notification = " + integerNotification.toString());
                    }
                })
                .doOnEach(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "doOnEach: onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "doOnEach: onNext = " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "doOnEach: onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "doOnEach: onComplete");
                    }
                })
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.d(TAG, "doOnDispose");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "real onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "real onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "real onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "real onComplete");
                    }
                });
    }

          Output:

08-06 20:10:46.205 18692 18692 D Operation: doOnLifecycle Consumer: disposable  = false
08-06 20:10:46.205 18692 18692 D Operation: doOnSubscribe: disposable = false
08-06 20:10:46.205 18692 18692 D Operation: real onSubscribe
08-06 20:10:46.205 18692 18692 D Operation: doOnNext: value = 1
08-06 20:10:46.206 18692 18692 D Operation: doOnEach: notification = OnNextNotification[1]
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onNext = 1
08-06 20:10:46.207 18692 18692 D Operation: real onNext: value = 1
08-06 20:10:46.207 18692 18692 D Operation: doAfterNext: value = 1
08-06 20:10:46.207 18692 18692 D Operation: doOnNext: value = 2
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: notification = OnNextNotification[2]
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onNext = 2
08-06 20:10:46.207 18692 18692 D Operation: real onNext: value = 2
08-06 20:10:46.207 18692 18692 D Operation: doAfterNext: value = 2
08-06 20:10:46.207 18692 18692 D Operation: doOnTerminate
08-06 20:10:46.207 18692 18692 D Operation: doOnComplete
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: notification = OnCompleteNotification
08-06 20:10:46.207 18692 18692 D Operation: doOnEach: onComplete
08-06 20:10:46.208 18692 18692 D Operation: real onComplete
08-06 20:10:46.208 18692 18692 D Operation: doAfterTerminate
08-06 20:10:46.208 18692 18692 D Operation: doFinally

 

          3.4   onErrorReturn() ----- see rxdocs.pdf, page 151

       Purpose: on error, emit one special item and then terminate normally. It catches errors raised before it in the chain (upstream).

 

    public static void onErrorReturn() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new NullPointerException("null point exception"));
                emitter.onNext(3); // never delivered: onError has already terminated the stream
            }
        }).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Exception {
                Log.d(TAG, "onErrorReturn");
                return 666;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          Output:

08-07 09:05:49.377 23696 23696 D Operation: onSubscribe
08-07 09:05:49.377 23696 23696 D Operation: onNext: value = 1
08-07 09:05:49.377 23696 23696 D Operation: onNext: value = 2
08-07 09:05:49.377 23696 23696 D Operation: onErrorReturn
08-07 09:05:49.377 23696 23696 D Operation: onNext: value = 666
08-07 09:05:49.378 23696 23696 D Operation: onComplete

 

          3.5   onErrorResumeNext() ----- see rxdocs.pdf, page 151

       Purpose: on error, switch to a new Observable and emit its items

                     Note: onErrorResumeNext() intercepts any Throwable, so both Error and Exception are intercepted.

 

 

 

    public static void onErrorResumeNext() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new ClassFormatError("format error"));
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                return Observable.just(6, 8);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          Output:

08-09 15:14:01.506 20890 20890 D Operation: onSubscribe
08-09 15:14:01.506 20890 20890 D Operation: onNext: value = 1
08-09 15:14:01.506 20890 20890 D Operation: onNext: value = 2
08-09 15:14:01.507 20890 20890 D Operation: onNext: value = 6
08-09 15:14:01.507 20890 20890 D Operation: onNext: value = 8
08-09 15:14:01.507 20890 20890 D Operation: onComplete

 

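          3.6   onExceptionResumeNext() ----- see rxdocs.pdf, page 152

     Purpose: on an Exception, switch to a new Observable and emit its items

                 Note: unlike onErrorResumeNext(), if the Throwable received by onError is not an Exception, the error is passed to the observer's onError method and the fallback Observable is not used.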
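The difference between the two operators comes down to a type check on the Throwable. The sketch below is a plain-JDK model of that rule, not RxJava's implementation; the class FallbackRule and its method names are hypothetical and exist only for illustration.

```java
public class FallbackRule {
    // Models onErrorResumeNext(): any Throwable triggers the fallback Observable.
    public static boolean errorResumeUsesFallback(Throwable t) {
        return true;
    }

    // Models onExceptionResumeNext(): only Exception subclasses trigger the fallback;
    // anything else (for example an Error) goes straight to the observer's onError.
    public static boolean exceptionResumeUsesFallback(Throwable t) {
        return t instanceof Exception;
    }

    public static void main(String[] args) {
        Throwable npe = new NullPointerException("null exception");
        Throwable cfe = new ClassFormatError("format error");
        System.out.println(exceptionResumeUsesFallback(npe)); // true
        System.out.println(exceptionResumeUsesFallback(cfe)); // false
        System.out.println(errorResumeUsesFallback(cfe));     // true
    }
}
```

Under this model, the ClassFormatError used in the 3.5 example would bypass the fallback if it were emitted into onExceptionResumeNext(), while the NullPointerException used in 3.6 is handled by it.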

 

    public static void onExceptionResumeNext() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new NullPointerException("null exception"));
            }
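        }).onExceptionResumeNext(new Observable<Integer>() {
            // Fallback source. It never calls onComplete(), which is why the
            // output below ends after value 4 without an onComplete line.
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {
                observer.onNext(3);
                observer.onNext(4);
            }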
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

      Output:

08-09 15:29:38.250 22073 22073 D Operation: onSubscribe
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 1
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 2
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 3
08-09 15:29:38.250 22073 22073 D Operation: onNext: value = 4

 

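          3.7   retry() ----- see rxdocs.pdf, page 153

     Purpose: retry, i.e. when an error occurs, have the Observable emit its data again. Instead of passing the source Observable's onError notification straight to the observer, retry resubscribes to the Observable, giving it another chance to complete its sequence without error. retry always passes onNext notifications to the observer, so resubscribing can produce duplicate items.

                   Variants:

 

<-- 1. retry() -->
// Purpose: on error, have the Observable resend its data
// Note: if the error keeps occurring, it keeps retrying

<-- 2. retry(long times) -->
// Purpose: on error, have the Observable resend its data (with a limit on the number of retries)
// Parameter = number of retries
 
<-- 3. retry(Predicate predicate) -->
// Purpose: after an error, decide whether to resend the data (if so and errors keep occurring, it keeps retrying)
// Parameter = decision logic

<-- 4. retry(BiPredicate<Integer, Throwable> predicate) -->
// Purpose: after an error, decide whether to resend the data (if so and errors keep occurring, it keeps retrying)
// Parameter = decision logic (receives the current retry count and the error)

<-- 5. retry(long times, Predicate predicate) -->
// Purpose: after an error, decide whether to resend the data (with a limit on the number of retries)
// Parameter = number of retries and decision logic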
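To make the retry count concrete, here is a plain-JDK model of the retry(times, predicate) rule. It is a sketch of the semantics only, not RxJava's implementation: the class RetryModel and its methods are hypothetical, and it catches Exception rather than Throwable because it is built on Callable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class RetryModel {
    // Runs `source`; on failure, re-runs it while retries remain and
    // `shouldRetry` accepts the error. Otherwise the error is rethrown.
    public static <T> T retry(Callable<T> source, long times, Predicate<Throwable> shouldRetry)
            throws Exception {
        long retriesLeft = times;
        while (true) {
            try {
                return source.call();
            } catch (Exception e) {
                if (retriesLeft == 0 || !shouldRetry.test(e)) {
                    throw e;
                }
                retriesLeft--;
            }
        }
    }

    // Counts how often an always-failing source is invoked under retry(times).
    public static int attemptsFor(long times) {
        AtomicInteger attempts = new AtomicInteger();
        try {
            RetryModel.<Integer>retry(() -> {
                attempts.incrementAndGet();
                throw new NullPointerException("null pointer");
            }, times, e -> true);
        } catch (Exception expected) {
            // The final failure propagates, like onError reaching the observer.
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsFor(5)); // 6: the first attempt plus 5 retries
    }
}
```

The count is the number of retries, not the number of attempts, which is why the retry(5) example in this section logs the 1, 2 pair six times before onError.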

 

 

 

    public static void retry() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new NullPointerException("null pointer"));
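                emitter.onNext(3); // never delivered: onError has already terminated this subscription
            }
        // Resubscribe up to 5 times; the 6th failure is passed to onError.
        }).retry(5).subscribe(new Observer<Integer>() {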
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          Output:

08-09 15:44:01.585 22956 22956 D Operation: onSubscribe
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 1
08-09 15:44:01.586 22956 22956 D Operation: onNext: value = 2
08-09 15:44:01.586 22956 22956 D Operation: onError: java.lang.NullPointerException: null pointer

 

          3.8   retryUntil()

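     Purpose: after an error, decide whether to resend the data. Similar to retry(Predicate predicate), but with the condition inverted: retrying stops once the supplier returns true.

    public static void retryUntil() {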
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new NullPointerException("null pointer"));
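                emitter.onNext(3); // never delivered: onError has already terminated this subscription
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                // Returning true stops retrying; false means "keep retrying", so this never stops.
                return false;
            }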
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          Output:

08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 1
08-09 15:58:29.424 24651 24651 D Operation: onNext: value = 2
....
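The stop condition is easy to get backwards, so here is a plain-JDK model of it. This is a sketch of the semantics only, not RxJava's implementation: RetryUntilModel and runsUntilStopped are hypothetical names, and it uses java.util.function.BooleanSupplier rather than RxJava's own BooleanSupplier interface.

```java
import java.util.function.BooleanSupplier;

public class RetryUntilModel {
    // Re-runs a task until it succeeds or `stop` returns true; returns the number of runs.
    // `stop` is consulted only after a failure.
    public static int runsUntilStopped(Runnable task, BooleanSupplier stop) {
        int runs = 0;
        while (true) {
            runs++;
            try {
                task.run();
                return runs; // success: nothing to retry
            } catch (RuntimeException e) {
                if (stop.getAsBoolean()) {
                    return runs; // stop requested: the error would go to onError
                }
                // stop returned false: loop around and run the task again
            }
        }
    }

    public static void main(String[] args) {
        int[] failures = {0};
        int runs = runsUntilStopped(
                () -> { throw new NullPointerException("null pointer"); },
                () -> ++failures[0] >= 3); // stop after the third failure
        System.out.println(runs); // 3
    }
}
```

A supplier that always returns false, as in the example above, never stops, which matches the unbounded output.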

 

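          3.9   retryWhen() ----- see rxdocs.pdf, page 154

     Purpose: on error, pass the error to a new Observable, which decides whether to resubscribe to the source Observable and emit its events again.

                                If that Observable emits an item, retryWhen resubscribes to the source; if it emits an onError notification, that notification is passed to the observer and the sequence terminates.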

    public static void retryWhen() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onError(new NullPointerException("null pointer"));
                emitter.onNext(3);
            }
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
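                        // Option 1: do not resubscribe; forward the error and terminate
                        //return Observable.error(throwable);

                        // Option 2: emit an item so the source Observable is resubscribed (keeps retrying as long as errors keep occurring)
                        return Observable.just(6);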
                    }
                });
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          Output:

Result with return Observable.error(throwable):
08-09 16:14:06.579 25485 25485 D Operation: onSubscribe
08-09 16:14:06.580 25485 25485 D Operation: onNext: value = 1
08-09 16:14:06.580 25485 25485 D Operation: onNext: value = 2
08-09 16:14:06.580 25485 25485 D Operation: onError: java.lang.NullPointerException: null pointer


Result with return Observable.just(6):
08-09 16:19:23.689 25958 25958 D Operation: onSubscribe
08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 1
08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 2
08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 1
08-09 16:19:23.689 25958 25958 D Operation: onNext: value = 2
08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 1
08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 2
08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 1
08-09 16:19:23.690 25958 25958 D Operation: onNext: value = 2
......

 

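          3.10   repeat() ----- see rxdocs.pdf, page 52

        Purpose: unconditionally re-emit the Observable's events (repeat(3L) in the example below replays the sequence three times in total)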

 

    public static void repeat() {
        Observable.just(6, 7)
                .repeat(3L)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          Output:

08-09 16:33:48.097 27105 27105 D Operation: onSubscribe
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 6
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 7
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 6
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 7
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 6
08-09 16:33:48.097 27105 27105 D Operation: onNext: value = 7
08-09 16:33:48.097 27105 27105 D Operation: onComplete

 

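          3.11   repeatWhen() ----- see rxdocs.pdf, page 53

        Purpose: conditionally re-emit the Observable's events. Similar to retryWhen.

        How it works: if the new Observable emits a Complete / Error event, the source Observable is not resubscribed and its events are not sent again;

                                    if the new Observable emits any other event, the source Observable is resubscribed and its events are sent again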

 

     4. Summary
