需求瞭解:java
Rxjava 中當數據處理派發中發生了異常 ,觀察者會接受到一個 Error
的通知,那若是不想發射這個異常的通知,本身處理掉呢?答案固然是能夠的,在 Rxjava 中不少操做符可用於對 Observable 發射的 onError 通知作出響應或者從錯誤中恢復。react
例如:git
Rxjava中常見的錯誤處理操做符有以下幾類:github
從 onError 通知中恢復發射數據。併發
Catch
操做符攔截原始Observable的 onError 通知,將它替換爲其它的數據項或數據序列,讓產生的Observable可以正常終止或者根本不終止。app
onErrorReturn
方法返回一個鏡像原有Observable行爲的新Observable,後者會忽略前者的 onError 調用,不會將錯誤傳遞給觀察者,做爲替代,它會發發射一個特殊的項並調用觀察者的 onCompleted 方法。ide
示例代碼:函數
// 建立一個能夠發射異常的Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(1 / 0); // 產生一個異常 emitter.onNext(3); emitter.onNext(4); } }); /** 1. onErrorReturnItem(T item) * 讓Observable遇到錯誤時發射一個指定的項(item)而且正常終止。 */ observable.onErrorReturnItem(888) // 源Observable發生異常時發射指定的888數據 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(1)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(1): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(1): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(1)"); } }); System.out.println("-----------------------------------------------"); /** * 2. onErrorReturn(Function<Throwable, T> valueSupplier) * 讓Observable遇到錯誤時經過一個函數Function來接受Error參數並進行判斷返回指定的類型數據,而且正常終止。 */ observable.onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) throws Exception { System.out.println("--> apply(1): e = " + throwable); return 888; // 源Observable發生異常時發射指定的888數據 } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(2): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(2): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(2)"); } });
輸出:.net
--> onSubscribe(1) --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 888 --> onCompleted(1) ----------------------------------------------- --> onSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 --> apply(1): e = java.lang.ArithmeticException: / by zero --> onNext(2): 888 --> onCompleted(2)
Javadoc: onErrorReturnItem(T item)
Javadoc: onErrorReturn(Function<Throwable, T> valueSupplier)
onErrorResumeNext
方法返回一個鏡像原有Observable行爲的新Observable,後者會忽略前者的 onError 調用,不會將錯誤傳遞給觀察者,做爲替代,它會開始另外一個指定的備用Observable。
示例代碼:
// 建立一個能夠發射異常的Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(1 / 0); // 產生一個異常 emitter.onNext(3); emitter.onNext(4); } }); /** * 3. onErrorResumeNext(ObservableSource next) * 讓Observable在遇到錯誤時開始發射第二個指定的Observable的數據序列 */ observable.onErrorResumeNext(Observable.just(888)) // 當發生異常的時候繼續發射此項Observable .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(3): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(3)"); } }); System.out.println("-----------------------------------------------"); /** * 4. onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction) * 讓Observable在遇到錯誤時經過一個函數Function來接受Error參數並進行判斷返回指定的第二個Observable的數據序列 */ observable.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception { System.out.println("--> apply(4): " + throwable); return Observable.just(888); // 當發生異常的時候繼續發射此項Observable } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(4)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(4): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(4): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(4)"); } });
輸出:
--> onSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> onNext(3): 888 --> onCompleted(3) ----------------------------------------------- --> onSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 --> apply(4): java.lang.ArithmeticException: / by zero --> onNext(4): 888 --> onCompleted(4)
Javadoc: onErrorResumeNext(ObservableSource next)
Javadoc: onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)
與 onErrorResumeNext 相似, onExceptionResumeNext
方法返回一個鏡像原有Observable行爲的新Observable,也使用一個備用的Observable,不一樣的是,若是 onError 收到的 Throwable 不是一個 Exception ,它會將錯誤傳遞給觀察者的 onError 方法,不會使用備用的Observable。
解析: onExceptionResumeNext
只會對Exception
類型的異常進行處理,若是onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable 。
示例代碼:
// 建立一個能夠發射異常的Observable Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); // emitter.onError(new Throwable("This is Throwable!")); // Throwable類型異常,直接通知觀察者 // emitter.onError(new Error("This is Error!")); // Error類型異常,直接通知觀察者 emitter.onError(new Exception("This is Exception!")); // Exception類型異常,進行處理,發送備用的Observable數據 // emitter.onNext(1 / 0); // 會產生一個ArithmeticException異常,異常會被處理,發送備用的Observable數據 emitter.onNext(3); emitter.onNext(4); } }); /** * 5. onExceptionResumeNext(ObservableSource next) * 若是onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable * 只對Exception類型的異常通知進行備用Observable處理 */ observable1.onExceptionResumeNext(Observable.just(888)) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(5)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(5): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(5): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(5)"); } });
輸出:
--> onSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> onNext(5): 888 --> onCompleted(5)
若是原始Observable遇到錯誤,從新訂閱它指望它能正常終止。
Retry
操做符不會將原始 Observable 的 onError
通知傳遞給觀察者,它會訂閱這個Observable,再給它機會無錯誤地完成它的數據序列。 Retry 老是傳遞 onNext 通知給觀察者,因爲從新訂閱,可能會形成數據項重複狀況。
retry():不管收到多少次 onError
通知,無參數版本的 retry
都會繼續訂閱併發射原始Observable。
注意: 由於若是遇到異常,將會無條件的從新訂閱原始的Observable,知道沒有異常的發射所有的數據序列爲止。因此若是你的異常發生後從新訂閱也不會恢復正常的話,會一直訂閱下去,有內存泄露的風險。
retry(long times):接受單個 count
參數的 retry 會最多從新訂閱指定的次數,若是次數超了,它不會嘗試再次訂閱,它會把最新的一個 onError
通知傳遞給它的觀察者。
retry(long times, Predicate<Throwable> predicate):遇到異常後最多從新訂閱 times
次,每次從新訂閱通過函數predicate
最終判斷是否繼續從新訂閱,若是 times 到達上限或者 predicate 返回 false 中任意一個最早知足條件,都會終止從新訂閱,retry 會將最新的一個 onError 通知傳遞給它的觀察者。
retry(Predicate<Throwable> predicate):接受一個謂詞函數做爲參數,這個函數的兩個參數是:重試次數和致使發射 onError
通知的 Throwable 。這個函數返回一個布爾值,若是返回 true
, retry 應該再次訂閱和鏡像原始的Observable,若是返回 false
, retry 會將最新的一個 onError 通知傳遞給它的觀察者
retry(BiPredicate<Integer, Throwable> predicate):遇到異常時,經過函數 predicate
判斷是否從新訂閱源Observable,而且經過參數 Integer
傳遞給 predicate 從新訂閱的次數,retry 會將最新的一個 onError 通知傳遞給它的觀察者。
retryUntil(BooleanSupplier stop):重試從新訂閱,直到給定的中止函數 stop
返回 true
,retry 會將最新的一個 onError 通知傳遞給它的觀察者。
retryWhen(Function<Observable<Throwable>, ObservableSource> handler):retryWhen
和 retry 相似,區別是, retryWhen 將 onError 中的 Throwable 傳遞給一個函數,這個函數產生另外一個 Observable, retryWhen 觀察它的結果再決定是否是要從新訂閱原始的Observable。若是這個Observable發射了一項數據,它就從新訂閱,若是這個Observable發射的是 onError 通知,它就將這個通知傳遞給觀察者而後終止。
實例代碼:
// flag for emitted onError times public static int temp = 0; // 建立能夠發送Error通知的Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); if (temp <= 2) { emitter.onError(new Exception("Test Error!")); temp++; } emitter.onNext(3); emitter.onNext(4); } }); /** * 1. retry() * 不管收到多少次onError通知, 都會去繼續訂閱併發射原始Observable。 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(1)"); } }).retry().subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 2. retry(long times) * 遇到異常後,最多從新訂閱源Observable times次 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(2)"); } }).retry(1) // 遇到異常後,重複訂閱的1次 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(2): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(2): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(2)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 3. retry(long times, Predicate<Throwable> predicate) * 遇到異常後最多從新訂閱times次,每次從新訂閱通過函數predicate最終判斷是否繼續從新訂閱 * 若是times到達上限或者predicate返回false中任意一個最早知足條件,都會終止從新訂閱 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(3)"); } }).retry(2, new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) throws Exception { System.out.println("--> test(3)"); if(throwable instanceof Exception) { return true; // 遇到異常通知後是否繼續繼續訂閱 } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(3): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(3)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 4. retry(Predicate<Throwable> predicate) * 遇到異常時,經過函數predicate判斷是否從新訂閱源Observable */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(4)"); } }).retry(new Predicate<Throwable>() { @Override public boolean test(Throwable throwable) throws Exception { if (throwable instanceof Exception) { return true; // 遇到異常通知後是否繼續繼續訂閱 } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(4)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(4): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(4): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(4)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 5. retry(BiPredicate<Integer, Throwable> predicate) * 遇到異常時,經過函數predicate判斷是否從新訂閱源Observable,而且經過參數integer傳遞給predicate從新訂閱的次數 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(5)"); } }).retry(new BiPredicate<Integer, Throwable>() { @Override public boolean test(Integer integer, Throwable throwable) throws Exception { System.out.println("--> test(5): " + integer); if (throwable instanceof Exception) { return true; // 遇到異常通知後是否繼續繼續訂閱 } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(5)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(5): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(5): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(5)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 6. retryUntil(BooleanSupplier stop) * 重試從新訂閱,直到給定的中止函數stop返回true */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(6)"); } }).retryUntil(new BooleanSupplier() { @Override public boolean getAsBoolean() throws Exception { System.out.println("--> getAsBoolean(6)"); if(temp == 1){ // 知足條件,中止從新訂閱 return true; } return false; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(6)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(6): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(6): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(6)"); } }); System.out.println("---------------------------------------------"); temp = 0; /** * 7. retryWhen(Function<Observable<Throwable>, ObservableSource> handler) * 將onError中的Throwable傳遞給一個函數handler,這個函數產生另外一個Observable, * retryWhen觀察它的結果再決定是否是要從新訂閱原始的Observable。 * 若是這個Observable發射了一項數據,它就從新訂閱, * 若是這個Observable發射的是onError通知,它就將這個通知傳遞給觀察者而後終止。 */ observable.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("----> doOnSubscribe(7)"); } }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception { System.out.println("--> apply(7)"); // 根據產生的Error的Observable是否正常發射數據來進行從新訂閱,若是發射Error通知,則直接傳遞給觀察者後終止 return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Throwable throwable) throws Exception { if (temp == 1) { return Observable.error(throwable); // 知足條件後,傳遞這個Error,終止從新訂閱 } return Observable.timer(1, TimeUnit.MILLISECONDS); // 正常發射數據,能夠從新訂閱 } }); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(7)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(7): " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError(7): " + e); } @Override public void onComplete() { System.out.println("--> onCompleted(7)"); } }); System.in.read();
輸出:
----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 ----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 ----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 ----> doOnSubscribe(1) --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --------------------------------------------- --> onSubscribe(2) ----> doOnSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 ----> doOnSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 --> onError(2): java.lang.Exception: Test Error! --------------------------------------------- --> onSubscribe(3) ----> doOnSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> test(3) ----> doOnSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> test(3) ----> doOnSubscribe(3) --> onNext(3): 1 --> onNext(3): 2 --> onError(3): java.lang.Exception: Test Error! --------------------------------------------- --> onSubscribe(4) ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 ----> doOnSubscribe(4) --> onNext(4): 1 --> onNext(4): 2 --> onNext(4): 3 --> onNext(4): 4 --------------------------------------------- --> onSubscribe(5) ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> test(5): 1 ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> test(5): 2 ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> test(5): 3 ----> doOnSubscribe(5) --> onNext(5): 1 --> onNext(5): 2 --> onNext(5): 3 --> onNext(5): 4 --------------------------------------------- --> onSubscribe(6) ----> doOnSubscribe(6) --> onNext(6): 1 --> onNext(6): 2 --> getAsBoolean(6) ----> doOnSubscribe(6) --> onNext(6): 1 --> onNext(6): 2 --> getAsBoolean(6) --> onError(6): java.lang.Exception: Test Error! --------------------------------------------- --> apply(7) --> onSubscribe(7) ----> doOnSubscribe(7) --> onNext(7): 1 --> onNext(7): 2 ----> doOnSubscribe(7) --> onNext(7): 1 --> onNext(7): 2 --> onError(7): java.lang.Exception: Test Error!
Javadoc: retry()
Javadoc: retry(long times)
Javadoc: retry(long times, Predicate<Throwable> predicate)
Javadoc: retry(Predicate<Throwable> predicate)
Javadoc: retry(BiPredicate<Integer, Throwable> predicate)
Javadoc: retryUntil(BooleanSupplier stop)
Javadoc: retryWhen(Function<Observable<Throwable>, ObservableSource> handler)
本節主要介紹了 Rxjava 中關於 Error
通知的處理,主要是在遇到異常通知時,無條件或者指定條件的去從新訂閱原始 Observable 直到沒有異常(正常發射全部數據序列)或者知足指定的條件後終止從新訂閱,發射異常通知給觀察者。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: