RxJava2 實戰知識梳理(13) 如何使得錯誤發生時不自動中止訂閱關係

1、發生錯誤時中止訂閱的狀況

RxJava中,若是發生了錯誤,那麼 訂閱者會自動中止對上游的訂閱關係 ,咱們將致使訂閱取消的錯誤分爲兩種:java

  • 上游:上游發生錯誤,併發送onError事件給訂閱者。
  • 下游:訂閱者在onNext中處理時發生了異常。

RxJava的設計中,若是發生了錯誤,那麼訂閱關係就取消了。可是在某些時候,咱們但願在錯誤發生的時候不要取消訂閱,由於這樣訂閱者只有從新經過subscribe方法才能收到消息,相似的場景如監測數據源變化、RxBus的實現等。git

咱們先用兩個簡單的例子來演示一下上面提到的兩種狀況:github

1.1 上游傳遞消息時發生錯誤

訂閱者在初始時候訂閱到mPublishObject,當該PublishObject發送到第四個事件時,主動拋出一個異常,以模擬上游發生異常的狀況。併發

private void upError() {
        mPublishSubject.map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                if (integer == 4) {
                    throw new RuntimeException();
                }
                return integer;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(getNormalObserver());
    }

    private Observer<Integer> getNormalObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext=" + value);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError=" + e);
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
    }
複製代碼

從控制檯的輸出能夠看到,當第四次發送事件後,因爲上游發生了異常,所以訂閱者收到了onError事件,以後它就再也沒法收到消息了。 app

1.2 訂閱者處理消息時發生錯誤

下面,咱們再來看訂閱處理消息時發生錯誤的場景:ide

private void downError() {
        mPublishSubject.observeOn(AndroidSchedulers.mainThread()).subscribe(getErrorObserver());
    }

    private LambdaObserver<Integer> getErrorObserver() {
        return new LambdaObserver<>(new Consumer<Integer>() {
            @Override
            public void accept(Integer value) throws Exception {
                Log.d(TAG, "onNext=" + value);
                if (value == 4) {
                    throw new RuntimeException();
                }
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "onError=" + throwable);
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "onComplete");
            }
        }, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {

            }
        });
    }
複製代碼

咱們在訂閱者收到第四個數據的時候拋出一個異常,此時控制檯的輸出爲以下,與上面相似,以後訂閱者都沒法接收到消息,由於訂閱關係已經被解除了。 post

2、發生異常時的處理辦法

2.1 上游發生錯誤

在上游發生錯誤的時候,通常經過重訂閱的方式來解決。咱們能夠根據錯誤的類型判斷是否須要重訂閱,重訂閱的時候使用retryWhen操做符,這個咱們在 RxJava2 實戰知識梳理(6) - 基於錯誤類型的重試請求 已經介紹過了。spa

下面,咱們演示一下在上面的錯誤當中如何恢復:設計

private void upErrorIgnore() {
        mPublishSubject.map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer integer) throws Exception {
                if (integer == 4) {
                    throw new RuntimeException("retry");
                } else if (integer == 8) {
                    throw new RuntimeException("don't retry");
                }
                return integer;
            }
        }).observeOn(AndroidSchedulers.mainThread()).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                //第一步,經過flatMap對錯誤進行響應。
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Throwable throwable) throws Exception {
                        //第二步:根據錯誤的類型判斷是否須要重訂閱。
                        return "retry".equals(throwable.getMessage()) ? Observable.just(0) : Observable.empty();
                    }
                });
            }
        }).subscribe(getNormalObserver());
    }
複製代碼

在第四次/第八次點擊的是否,咱們分別在上游拋出一個異常,這樣就會觸發retryWhen的回調,在其中咱們分爲註釋中的兩部分進行處理,第四次的時候發起重訂閱,而第八次則不發起,所以,第九個事件訂閱者就收不到了,控制檯的輸出爲: code

2.2 訂閱者發生錯誤

可是retryWhen只能處理上游發生錯誤的狀況,對於上面說的第二種狀況並不能處理,所以假如是上面介紹的第二種狀況:訂閱者在onNext處理中發生錯誤的狀況,仍然會解除訂閱關係。

這裏首先要感謝 Johnny Shieh 提供的解決方法,在 RxJava 2 版本的 Rxbus 一文中,他分析了這一問題的緣由,這是由於在LambdaObserver的源碼中,若是在onNext中發生了異常,那麼首先會調用onError方法,而onError中會執行取消訂閱的操做。

解決辦法就是, 把 LambdaObserver 代碼拷貝出來,註釋掉那句,而後繼承於它去實現 Observer,代碼在 RxSample 的十三章例子中。
從控制檯能夠看出,並無解除訂閱關係,在發生錯誤以後,仍然能夠繼續收到數據。


更多文章,歡迎訪問個人 Android 知識梳理系列:

相關文章
相關標籤/搜索