在RxJava
中,若是發生了錯誤,那麼 訂閱者會自動中止對上游的訂閱關係 ,咱們將致使訂閱取消的錯誤分爲兩種:java
onError
事件給訂閱者。onNext
中處理時發生了異常。在RxJava
的設計中,若是發生了錯誤,那麼訂閱關係就取消了。可是在某些時候,咱們但願在錯誤發生的時候不要取消訂閱,由於這樣訂閱者只有從新經過subscribe
方法才能收到消息,相似的場景如監測數據源變化、RxBus
的實現等。git
咱們先用兩個簡單的例子來演示一下上面提到的兩種狀況:github
訂閱者在初始時候訂閱到mPublishObject
,當該PublishObject
發送到第四個事件時,主動拋出一個異常,以模擬上游發生異常的狀況。併發
private void upError() {
mPublishSubject.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
if (integer == 4) {
throw new RuntimeException();
}
return integer;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(getNormalObserver());
}
private Observer<Integer> getNormalObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext=" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError=" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
}
複製代碼
從控制檯的輸出能夠看到,當第四次發送事件後,因爲上游發生了異常,所以訂閱者收到了onError
事件,以後它就再也沒法收到消息了。 app
下面,咱們再來看訂閱處理消息時發生錯誤的場景:ide
private void downError() {
mPublishSubject.observeOn(AndroidSchedulers.mainThread()).subscribe(getErrorObserver());
}
private LambdaObserver<Integer> getErrorObserver() {
return new LambdaObserver<>(new Consumer<Integer>() {
@Override
public void accept(Integer value) throws Exception {
Log.d(TAG, "onNext=" + value);
if (value == 4) {
throw new RuntimeException();
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "onError=" + throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "onComplete");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
}
});
}
複製代碼
咱們在訂閱者收到第四個數據的時候拋出一個異常,此時控制檯的輸出爲以下,與上面相似,以後訂閱者都沒法接收到消息,由於訂閱關係已經被解除了。 post
在上游發生錯誤的時候,通常經過重訂閱的方式來解決。咱們能夠根據錯誤的類型判斷是否須要重訂閱,重訂閱的時候使用retryWhen
操做符,這個咱們在 RxJava2 實戰知識梳理(6) - 基於錯誤類型的重試請求 已經介紹過了。spa
下面,咱們演示一下在上面的錯誤當中如何恢復:設計
private void upErrorIgnore() {
mPublishSubject.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
if (integer == 4) {
throw new RuntimeException("retry");
} else if (integer == 8) {
throw new RuntimeException("don't retry");
}
return integer;
}
}).observeOn(AndroidSchedulers.mainThread()).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
//第一步,經過flatMap對錯誤進行響應。
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
//第二步:根據錯誤的類型判斷是否須要重訂閱。
return "retry".equals(throwable.getMessage()) ? Observable.just(0) : Observable.empty();
}
});
}
}).subscribe(getNormalObserver());
}
複製代碼
在第四次/第八次點擊的是否,咱們分別在上游拋出一個異常,這樣就會觸發retryWhen
的回調,在其中咱們分爲註釋中的兩部分進行處理,第四次的時候發起重訂閱,而第八次則不發起,所以,第九個事件訂閱者就收不到了,控制檯的輸出爲: code
可是retryWhen
只能處理上游發生錯誤的狀況,對於上面說的第二種狀況並不能處理,所以假如是上面介紹的第二種狀況:訂閱者在onNext
處理中發生錯誤的狀況,仍然會解除訂閱關係。
這裏首先要感謝 Johnny Shieh 提供的解決方法,在 RxJava 2 版本的 Rxbus 一文中,他分析了這一問題的緣由,這是由於在LambdaObserver
的源碼中,若是在onNext
中發生了異常,那麼首先會調用onError
方法,而onError
中會執行取消訂閱的操做。