RxJava2 錯誤處理詳解

熟悉RxJava的知道,onError跟onComplete是互斥的,出現其中一個,觀察者與被觀察者的關係就被中斷(如下簡稱:管道中斷),觀察者就永遠不會收到來自被觀察者發出的事件。java

而後有些狀況下,出現了錯誤,咱們但願能夠進行一些補救措施,例如:緩存

  • 因爲網絡緣由或者其餘緣由,Http請求失敗了,這個時候咱們但願進行重試,又或者去讀取本地的緩存數據
  • 在使用RxJava的組合操做符進行Http併發請求時,咱們但願接口之間互不影響,即A接口出現錯誤不會影響B接口的正常流程,反之同樣

現實開發中,可能有更多的場景須要對錯誤進行補救,因此RxJava爲咱們提供了兩大類進行錯誤處理,分別是Catch和Retry,前者在出現錯誤時補救,後者在出現錯誤時重試,接下來,分別對它們進行講解網絡

注:Catch和Retry只能捕獲上游事件的異常併發

Catch

Catch操做符共有5個,分別是:app

onErrorReturnItem(final T item)  //內部調用第二個方法
onErrorReturn(Function function)  //遇到錯誤,用默認數據項替代
onErrorResumeNext(final ObservableSource<? extends T> next) //內部調用第四個方法
onErrorResumeNext(Function resumeFunction ) //遇到錯誤,開始發射新的Observable的數據序列
onExceptionResumeNext(final ObservableSource<? extends T> next) //內部原理與第四個相同,僅有一個參數不一樣
複製代碼

雖然有5個操做符,可是實際上就只有3個,再準確點說就只有2個,爲何這麼說呢,由於第1個操做符內部調用的就是第2個,而第3個操做符內部調用是第4個操做符,因此說只有3個,那爲何準確點說只有2個呢,由於第5個操做符,內部原理同第四個,僅僅有一個參數傳的不同,接下來咱們分別講解。ide

onErrorReturnItem

Disposable disposable = Observable
        .fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return null; //返回null,即出現錯誤
            }
        })
        .onErrorReturnItem(100) //出現錯誤時,用一個默認的數據項將其替代
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // 打印 100 隨後會當即回調onComplete
            }
        });

複製代碼

咱們再來看看onErrorReturnItem內部實現函數

public final Observable<T> onErrorReturnItem(final T item) {
    ObjectHelper.requireNonNull(item, "item is null");
    //將item封裝成Function對象,並調用onErrorReturn方法
    return onErrorReturn(Functions.justFunction(item));
}
複製代碼

onErrorReturn內部源碼較爲簡單,這裏不作講解,接下來看看onErrorReturn如何使用ui

onErrorReturn

Disposable disposable = Observable
        .fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return null; //返回null,即出現錯誤
            }
        })
        .onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Exception {
                //出現錯誤時,用一個默認的數據項將其替代,這裏根據不一樣的錯誤返回不一樣的數據項
                return 100; 
            }
        }) 
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // 打印 100 隨後會當即回調onComplete
            }
        });
複製代碼

看到上面代碼能夠明白,onErrorReturn的做用就是在出現錯誤的時候,用一個默認的數據項將錯誤替代,並馬上回調onComplete。spa

onErrorResumeNext(final ObservableSource next)

public final Observable<T> onErrorResumeNext(final ObservableSource<? extends T> next) {
    ObjectHelper.requireNonNull(next, "next is null");
    //能夠看到這裏將Observable對象封裝成Function對象,並調用onErrorResumeNext方法
    return onErrorResumeNext(Functions.justFunction(next));
}
複製代碼

看源碼知道onErrorResumeNext(final ObservableSource next)內部調用了onErrorResumeNext(Function resumeFunction )故這裏再也不講解.net

onErrorResumeNext(Function resumeFunction )

onErrorResumeNext的做用就是在遇到錯誤時開始發射第二個Observable的數據序列,看代碼

Disposable disposable = Observable
        .fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return null; //返回null,即出現錯誤
            }
        })
        .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
                //出現錯誤時開始發射新的Observable的數據序列
                return Observable.just(1, 2, 3);
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                // 打印 一、二、3 隨後會當即回調onComplete
            }
        });
複製代碼

onExceptionResumeNext

onExceptionResumeNextonErrorResumeNext做用相同,都是在遇到錯誤時開始發射第二個Observable的數據序列,不一樣的是,若是onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable,即它只能捕獲Exception異常,這一點,咱們能夠在ObservableOnErrorNext$OnErrorNextObserver類中源碼看到

//使用onExceptionResumeNext操做符時,allowFatal爲true
//使用onErrorResumeNext操做都是,allowFatal爲false
if (allowFatal && !(t instanceof Exception)) {
    //非Exception異常,直接交給觀察者的onError方法
    actual.onError(t);                        
    return;                                   
}                                             
複製代碼

以上就是Catch操做符的介紹,處理原理無非就兩種,第一種用一個默認的數據項替代錯誤,第二種在遇到錯誤時開始發射一個新的Observable的數據序列,Catch操做符就講解到這,如須要知道具體業務場景,能夠看這裏HttpSender 介紹篇之多請求串行與並行(五)

Retry

Retry顧名思義就是在出現錯誤的時候進行重試,共有7個操做符,以下

retry()                                //無條件,重試無數次
retry(long times)                      //無條件,重試times次
retry(Predicate predicate)             //根據條件,重試無數次
retryUntil(final BooleanSupplier stop) //根據條件,重試無數次
retry(long times, Predicate predicate) //根據條件,重試times次
retry(BiPredicate predicate)           //功能與上一個同樣,實現不一樣
retryWhen(final Function handler)      //能夠實現延遲重試n次
複製代碼

前4個操做符內部都調用第五個retry(long times, Predicate predicate)(須要注意的是retryUntil操做符,只有接口裏的方法返回false時,纔會重試),因此咱們直接從第五個開始

retry(long times, Predicate predicate)

Disposable disposable = Observable
        .fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                //這裏會執行n+1次,其中n爲重試次數(若是重試條件爲true的話)
                return null; //返回null,即出現錯誤
            }
        })
        .retry(3, new Predicate<Throwable>() {//重試3次
            @Override
            public boolean test(Throwable throwable) throws Exception {
                //true 表明須要重試,可根據throwable對象返回是否須要重試
                return true;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                //重試成功,走這裏
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                //重試n次後,依然出現錯誤,直接會走到這裏
            }
        });
複製代碼

上面註釋很詳情,這裏再也不講解。

retry(BiPredicate predicate)

Disposable disposable = Observable
        .fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                //這裏會執行n+1次,其中n爲重試次數(若是重試條件爲true的話)
                return null; //返回null,即出現錯誤
            }
        })
        .retry(new BiPredicate<Integer, Throwable>() {
            @Override
            public boolean test(Integer times, Throwable throwable) throws Exception {
                //times 爲嘗試次數,即第幾回嘗試
                return times <= 3; //只容許嘗試3次
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                //重試成功,走這裏
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                //重試n次後,依然出現錯誤,直接會走到這裏
            }
        });
複製代碼

retry(BiPredicate predicate)retry(long times, Predicate predicate)功能上是同樣的,只是實現不同而已。註釋很詳情,也再也不講解。

retryWhen(final Function handler)

先來看看官網的描述:retryWhenonError中的Throwable傳遞給一個函數,這個函數產生另外一個ObservableretryWhen觀察它的結果再決定是否是要從新訂閱原始的Observable。若是這個Observable發射了一項數據,它就從新訂閱,若是這個Observable發射的是onError通知,它就將這個通知傳遞給觀察者而後終止。

這段話的大體意思就是,若是RxJava內部傳過來的Observable(retryWhen方法傳入的接口,經過接口方法傳過來的)發射了一項數據,即發射onNext事件,就會從新訂閱原始的Observable,若是發射的是onError事件,它就將這個事件傳遞給觀察者而後終止。

那麼,retryWhen有什麼做用呢,它的主要做用出現錯誤時,從新訂閱,即重試,它跟以前的retry操做符最大的區別就是,它能夠延遲重試,例如,咱們有這樣一個需求,須要在遇到錯誤是,隔3秒重試一次,最多重試3次,先來看看代碼

Disposable disposable = Observable
        .fromCallable(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Log.d("LJX", "call");
                //這裏會執行n+1次,其中n爲重試次數
                return null; //返回null,即出現錯誤
            }
        })
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> attempts) throws Exception {
                //注:這裏須要根據RxJava傳遞過來的Observable對象發射事件,不能直接返回一個新的Observable,不然無效
                return attempts.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    private final int maxRetries = 3; //最多重試三次
                    private final int retryDelayMillis = 3; //隔3秒重試一次
                    private int retryCount; //當前重試次數

                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        Log.d("LJX", "apply retryCount=" + retryCount);
                        //每次遇到錯誤,這裏都會回調一次
                        if (++retryCount <= maxRetries) {  //最多重試三次
                            return Observable.timer(retryDelayMillis, TimeUnit.SECONDS);
                        }
                        return Observable.error(throwable); //第四次還錯,就直接發射onError事件
                    }
                });
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                //重試成功,走這裏
                Log.d("LJX", "onNext ");
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                //重試n次後,依然出現錯誤,直接會走到這裏
                Log.d("LJX", "onError");
            }
        });
複製代碼

日誌打印

2019-01-29 17:18:19.764 11179-11179/com.example.httpsender D/LJX: call
2019-01-29 17:18:19.764 11179-11179/com.example.httpsender D/LJX: apply retryCount=0
2019-01-29 17:18:22.771 11179-11229/com.example.httpsender D/LJX: call
2019-01-29 17:18:22.772 11179-11229/com.example.httpsender D/LJX: apply retryCount=1
2019-01-29 17:18:25.775 11179-11231/com.example.httpsender D/LJX: call
2019-01-29 17:18:25.775 11179-11231/com.example.httpsender D/LJX: apply retryCount=2
2019-01-29 17:18:28.779 11179-11242/com.example.httpsender D/LJX: call
2019-01-29 17:18:28.779 11179-11242/com.example.httpsender D/LJX: apply retryCount=3
2019-01-29 17:18:28.781 11179-11242/com.example.httpsender D/LJX: onError
複製代碼

到這也許有讀者會問,我能夠不使用Observable.timer操做符嗎?能夠的,這裏可使用Observable.intervalRange操做符替代,能夠根據本身的業務需求返回一個Observable對象,例如使用Observable.just操做符發送多個數據項,內部會進行過濾,只有發射的第一個數據項纔有效。

好了catchretry兩大類錯誤處理操做符已介紹完畢,若有疑問,請留言,我會第一時間做答。

題外話

若是想在Activity/Fragment的生命週期對RxJava作自動管理,防止內存泄漏,可查看個人另外一片文章。Android RxLife 一款輕量級別的RxJava生命週期管理庫,感謝支持。

相關文章
相關標籤/搜索