Rxjava2 Observable的錯誤處理操做詳解及實例

簡要:

需求瞭解:java

Rxjava 中當數據處理派發中發生了異常 ,觀察者會接受到一個 Error 的通知,那若是不想發射這個異常的通知,本身處理掉呢?答案固然是能夠的,在 Rxjava 中不少操做符可用於對 Observable 發射的 onError 通知作出響應或者從錯誤中恢復。react

例如:git

  1. 吞掉這個錯誤,切換到一個備用的Observable繼續發射數據
  2. 吞掉這個錯誤而後發射默認值
  3. 吞掉這個錯誤並當即嘗試重啓這個Observable
  4. 吞掉這個錯誤,在一些回退間隔後重啓這個Observable

Rxjava中常見的錯誤處理操做符有以下幾類:github

  • onErrorReturn():指示Observable在遇到錯誤時發射一個特定的數據
  • onErrorResumeNext():指示Observable在遇到錯誤時發射一個數據序列
  • onExceptionResumeNext():指示Observable遇到錯誤時繼續發射數據
  • retry():指示Observable遇到錯誤時重試
  • retryWhen():指示Observable遇到錯誤時,將錯誤傳遞給另外一個Observable來決定是否要從新給訂閱這個Observable

1. Catch

從 onError 通知中恢復發射數據。併發

img-Catch

Catch 操做符攔截原始Observable的 onError 通知,將它替換爲其它的數據項或數據序列,讓產生的Observable可以正常終止或者根本不終止。app

1.1 onErrorReturn

onErrorReturn 方法返回一個鏡像原有Observable行爲的新Observable,後者會忽略前者的 onError 調用,不會將錯誤傳遞給觀察者,做爲替代,它會發發射一個特殊的項並調用觀察者的 onCompleted 方法。ide

  • onErrorReturnItem(T item): 讓Observable遇到錯誤時發射一個指定的項(item)而且正常終止。

img-onErrorReturnItem

  • onErrorReturn(Function<Throwable, T> valueSupplier):讓Observable遇到錯誤時經過一個函數Function來進行判斷返回指定的類型數據,而且正常終止。

img-onErrorReturn

示例代碼:函數

// 建立一個能夠發射異常的Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(1 / 0);  // 產生一個異常
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });

    /** 1. onErrorReturnItem(T item)
     * 讓Observable遇到錯誤時發射一個指定的項(item)而且正常終止。
     */
    observable.onErrorReturnItem(888)   // 源Observable發生異常時發射指定的888數據
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(1): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onCompleted(1)");
                }
            });

    System.out.println("-----------------------------------------------");
    /**
     * 2. onErrorReturn(Function<Throwable, T> valueSupplier)
     * 讓Observable遇到錯誤時經過一個函數Function來接受Error參數並進行判斷返回指定的類型數據,而且正常終止。
     */
    observable.onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            System.out.println("--> apply(1): e = " + throwable);
            return 888; // 源Observable發生異常時發射指定的888數據
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(2)");
        }
    });

輸出:.net

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 888
--> onCompleted(1)
-----------------------------------------------
--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> apply(1): e = java.lang.ArithmeticException: / by zero
--> onNext(2): 888
--> onCompleted(2)

Javadoc: onErrorReturnItem(T item)
Javadoc: onErrorReturn(Function<Throwable, T> valueSupplier)

1.2 onErrorResumeNext

onErrorResumeNext 方法返回一個鏡像原有Observable行爲的新Observable,後者會忽略前者的 onError 調用,不會將錯誤傳遞給觀察者,做爲替代,它會開始另外一個指定的備用Observable。

img-onErrorResumeNext-Observable

  • onErrorResumeNext(ObservableSource next): 讓Observable在遇到錯誤時開始發射第二個指定的Observable的數據序列。
  • onErrorResumeNext(Function<Throwable, ObservableSource > resumeFunction):讓Observable在遇到錯誤時經過一個函數Function來接受Error參數並進行判斷返回指定的第二個Observable的數據序列。

示例代碼:

// 建立一個能夠發射異常的Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(1 / 0);  // 產生一個異常
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });
    
    /**
     * 3. onErrorResumeNext(ObservableSource next)
     * 讓Observable在遇到錯誤時開始發射第二個指定的Observable的數據序列
     */
    observable.onErrorResumeNext(Observable.just(888))  // 當發生異常的時候繼續發射此項Observable
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(3)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(3): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(3): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onCompleted(3)");
                }
            });

    System.out.println("-----------------------------------------------");
    /**
     * 4. onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)
     * 讓Observable在遇到錯誤時經過一個函數Function來接受Error參數並進行判斷返回指定的第二個Observable的數據序列
     */
    observable.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
            System.out.println("--> apply(4): " + throwable);
            return Observable.just(888);    // 當發生異常的時候繼續發射此項Observable
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(4)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(4): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(4): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(4)");
        }
    });

輸出:

--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 888
--> onCompleted(3)
-----------------------------------------------
--> onSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
--> apply(4): java.lang.ArithmeticException: / by zero
--> onNext(4): 888
--> onCompleted(4)

Javadoc: onErrorResumeNext(ObservableSource next)
Javadoc: onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)

1.3 onExceptionResumeNext

與 onErrorResumeNext 相似, onExceptionResumeNext 方法返回一個鏡像原有Observable行爲的新Observable,也使用一個備用的Observable,不一樣的是,若是 onError 收到的 Throwable 不是一個 Exception ,它會將錯誤傳遞給觀察者的 onError 方法,不會使用備用的Observable。

img-onExceptionResumeNext

解析: onExceptionResumeNext 只會對Exception類型的異常進行處理,若是onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable 。

示例代碼:

// 建立一個能夠發射異常的Observable
    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
        //  emitter.onError(new Throwable("This is Throwable!"));  // Throwable類型異常,直接通知觀察者
        //  emitter.onError(new Error("This is Error!"));          // Error類型異常,直接通知觀察者
            emitter.onError(new Exception("This is Exception!"));  // Exception類型異常,進行處理,發送備用的Observable數據
        //    emitter.onNext(1 / 0);  // 會產生一個ArithmeticException異常,異常會被處理,發送備用的Observable數據
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });
    /**
     * 5. onExceptionResumeNext(ObservableSource next)
     *  若是onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable
     *  只對Exception類型的異常通知進行備用Observable處理
     */
    observable1.onExceptionResumeNext(Observable.just(888))
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(5)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(5): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(5): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onCompleted(5)");
                }
            });

輸出:

--> onSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> onNext(5): 888
--> onCompleted(5)

Javadoc: onExceptionResumeNext(ObservableSource next)

2. Retry

若是原始Observable遇到錯誤,從新訂閱它指望它能正常終止。

Retry 操做符不會將原始 Observable 的 onError 通知傳遞給觀察者,它會訂閱這個Observable,再給它機會無錯誤地完成它的數據序列。 Retry 老是傳遞 onNext 通知給觀察者,因爲從新訂閱,可能會形成數據項重複狀況。

2.1 retry()

retry():不管收到多少次 onError 通知,無參數版本的 retry 都會繼續訂閱併發射原始Observable。

img-retry
注意: 由於若是遇到異常,將會無條件的從新訂閱原始的Observable,知道沒有異常的發射所有的數據序列爲止。因此若是你的異常發生後從新訂閱也不會恢復正常的話,會一直訂閱下去,有內存泄露的風險。

2.2 retry(long times)

retry(long times):接受單個 count 參數的 retry 會最多從新訂閱指定的次數,若是次數超了,它不會嘗試再次訂閱,它會把最新的一個 onError 通知傳遞給它的觀察者。

img-retry-times

2.3 retry(long times, Predicate predicate)

retry(long times, Predicate<Throwable> predicate):遇到異常後最多從新訂閱 times 次,每次從新訂閱通過函數predicate 最終判斷是否繼續從新訂閱,若是 times 到達上限或者 predicate 返回 false 中任意一個最早知足條件,都會終止從新訂閱,retry 會將最新的一個 onError 通知傳遞給它的觀察者。

img-retry-times-predicate

2.4 retry(Predicate predicate)

retry(Predicate<Throwable> predicate):接受一個謂詞函數做爲參數,這個函數的兩個參數是:重試次數和致使發射 onError 通知的 Throwable 。這個函數返回一個布爾值,若是返回 true , retry 應該再次訂閱和鏡像原始的Observable,若是返回 false , retry 會將最新的一個 onError 通知傳遞給它的觀察者

img-retry-predicate

2.5 retry(BiPredicate predicate)

retry(BiPredicate<Integer, Throwable> predicate):遇到異常時,經過函數 predicate 判斷是否從新訂閱源Observable,而且經過參數 Integer 傳遞給 predicate 從新訂閱的次數,retry 會將最新的一個 onError 通知傳遞給它的觀察者。

img-retry-BiPrediccate

2.6 retryUntil(BooleanSupplier stop)

retryUntil(BooleanSupplier stop):重試從新訂閱,直到給定的中止函數 stop 返回 true,retry 會將最新的一個 onError 通知傳遞給它的觀察者。

img-retryUntil

2.7 retryWhen(Function handler)

retryWhen(Function<Observable<Throwable>, ObservableSource> handler):retryWhen 和 retry 相似,區別是, retryWhen 將 onError 中的 Throwable 傳遞給一個函數,這個函數產生另外一個 Observable, retryWhen 觀察它的結果再決定是否是要從新訂閱原始的Observable。若是這個Observable發射了一項數據,它就從新訂閱,若是這個Observable發射的是 onError 通知,它就將這個通知傳遞給觀察者而後終止。

img-retryWhen

實例代碼:

// flag for emitted onError times
    public static int temp = 0;

    // 建立能夠發送Error通知的Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            if (temp <= 2) {
                emitter.onError(new Exception("Test Error!"));
                temp++;
            }
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });

    /**
     * 1. retry()
     *  不管收到多少次onError通知, 都會去繼續訂閱併發射原始Observable。
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(1)");
        }
    }).retry().subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> accept(1): " + integer);
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 2. retry(long times)
     *  遇到異常後,最多從新訂閱源Observable times次
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(2)");
        }
    }).retry(1) // 遇到異常後,重複訂閱的1次
      .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext(2): " + integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onCompleted(2)");
            }
      });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 3. retry(long times, Predicate<Throwable> predicate)
     *  遇到異常後最多從新訂閱times次,每次從新訂閱通過函數predicate最終判斷是否繼續從新訂閱
     *  若是times到達上限或者predicate返回false中任意一個最早知足條件,都會終止從新訂閱
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(3)");
        }
    }).retry(2, new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable throwable) throws Exception {
            System.out.println("--> test(3)");
            if(throwable instanceof Exception) {
                return true;    // 遇到異常通知後是否繼續繼續訂閱
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(3)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(3): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(3): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(3)");
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 4. retry(Predicate<Throwable> predicate)
     *  遇到異常時,經過函數predicate判斷是否從新訂閱源Observable
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(4)");
        }
    }).retry(new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable throwable) throws Exception {
            if (throwable instanceof Exception) {
                return true;    // 遇到異常通知後是否繼續繼續訂閱
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(4)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(4): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(4): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(4)");
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 5. retry(BiPredicate<Integer, Throwable> predicate)
     *   遇到異常時,經過函數predicate判斷是否從新訂閱源Observable,而且經過參數integer傳遞給predicate從新訂閱的次數
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(5)");
        }
    }).retry(new BiPredicate<Integer, Throwable>() {
        @Override
        public boolean test(Integer integer, Throwable throwable) throws Exception {
            System.out.println("--> test(5): " + integer);
            if (throwable instanceof Exception) {
                return true;    // 遇到異常通知後是否繼續繼續訂閱
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(5)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(5): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(5): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(5)");
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 6. retryUntil(BooleanSupplier stop)
     * 重試從新訂閱,直到給定的中止函數stop返回true
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(6)");
        }
    }).retryUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            System.out.println("--> getAsBoolean(6)");
            if(temp == 1){  // 知足條件,中止從新訂閱
                return true;
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(6)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(6): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(6): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(6)");
        }
    });


    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 7. retryWhen(Function<Observable<Throwable>, ObservableSource> handler)
     *  將onError中的Throwable傳遞給一個函數handler,這個函數產生另外一個Observable,
     *  retryWhen觀察它的結果再決定是否是要從新訂閱原始的Observable。
     *  若是這個Observable發射了一項數據,它就從新訂閱,
     *  若是這個Observable發射的是onError通知,它就將這個通知傳遞給觀察者而後終止。
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(7)");
        }
    }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
            System.out.println("--> apply(7)");
            // 根據產生的Error的Observable是否正常發射數據來進行從新訂閱,若是發射Error通知,則直接傳遞給觀察者後終止
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    if (temp == 1) {
                        return Observable.error(throwable); // 知足條件後,傳遞這個Error,終止從新訂閱
                    }
                    return Observable.timer(1, TimeUnit.MILLISECONDS);  // 正常發射數據,能夠從新訂閱
                }
            });
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(7)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(7): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(7): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(7)");
        }
    });

    System.in.read();

輸出:

----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
---------------------------------------------
--> onSubscribe(2)
----> doOnSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
----> doOnSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> onError(2): java.lang.Exception: Test Error!
---------------------------------------------
--> onSubscribe(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> test(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> test(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onError(3): java.lang.Exception: Test Error!
---------------------------------------------
--> onSubscribe(4)
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
--> onNext(4): 3
--> onNext(4): 4
---------------------------------------------
--> onSubscribe(5)
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 1
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 2
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 3
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> onNext(5): 3
--> onNext(5): 4
---------------------------------------------
--> onSubscribe(6)
----> doOnSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> getAsBoolean(6)
----> doOnSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> getAsBoolean(6)
--> onError(6): java.lang.Exception: Test Error!
---------------------------------------------
--> apply(7)
--> onSubscribe(7)
----> doOnSubscribe(7)
--> onNext(7): 1
--> onNext(7): 2
----> doOnSubscribe(7)
--> onNext(7): 1
--> onNext(7): 2
--> onError(7): java.lang.Exception: Test Error!

Javadoc: retry()
Javadoc: retry(long times)
Javadoc: retry(long times, Predicate<Throwable> predicate)
Javadoc: retry(Predicate<Throwable> predicate)
Javadoc: retry(BiPredicate<Integer, Throwable> predicate)
Javadoc: retryUntil(BooleanSupplier stop)
Javadoc: retryWhen(Function<Observable<Throwable>, ObservableSource> handler)

小結

本節主要介紹了 Rxjava 中關於 Error 通知的處理,主要是在遇到異常通知時,無條件或者指定條件的去從新訂閱原始 Observable 直到沒有異常(正常發射全部數據序列)或者知足指定的條件後終止從新訂閱,發射異常通知給觀察者。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

相關文章
相關標籤/搜索