衆所周知,RxJava2 中當鏈式調用中拋出異常時,若是沒有對應的 Consumer 去處理異常,則這個異常會被拋出到虛擬機中去,Android 上的直接表現就是 crash,程序崩潰。java
說異常處理前我們先來看一下 RxJava2 中 Observable
訂閱方法 subscribe()
咱們經常使用的幾種訂閱方式:api
// 1
subscribe()
// 2
Disposable subscribe(Consumer<? super T> onNext) // 3 Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) // 4 Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete) // 5 Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe) // 6 void subscribe(Observer<? super T> observer) 複製代碼
無參和以 Consumer
爲參數的幾種方法內部都是以默認參數補齊的方式最終調用第 5
個方法,而方法 5
內部經過 LambdaObserver 將參數包裝成 Observer 再調用第 6
個方法sass
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
複製代碼
因此使用 Consumer
參數方式和 Observer
參數方式進行訂閱除了觀察回調來源不同其餘沒有任何差異。但就是由於這種差異,在異常狀況發生時的處理結果上也會產生差異網絡
咱們分別進行一下幾種方式模擬異常:併發
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync()) // 封裝的線程切換
.subscribe(object : Observer<List<ZooData>> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: List<ZooData>) {
throw RuntimeException("runtime exception")
}
override fun onError(e: Throwable) {
Log.d("error", e.message)
}
})
複製代碼
結果:不會觸發 onError,App 崩潰
app
Observable.create<String> {
it.onNext("ssss")
}
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.d("result::", t)
throw RuntimeException("run llllll")
}
override fun onError(e: Throwable) {
Log.e("sss", "sss", e)
}
})
複製代碼
結果:會觸發 onError,App 未崩潰
ide
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.map {
throw RuntimeException("runtime exception")
}
.compose(RxScheduler.sync())
.subscribe(object : Observer<List<ZooData>> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: List<ZooData>) {
}
override fun onError(e: Throwable) {
Log.d("error", e.message)
}
})
複製代碼
結果:會觸發 Observer 的 onError,App 未崩潰
ui
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync())
.subscribe({
throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
}, {
Log.d("Error", it.message)
})
複製代碼
結果 A:有 errorConsumer 觸發 errorConsumer,App 未崩潰
this
apiService.newJsonKeyData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync())
.subscribe {
throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
}
複製代碼
結果 B:無 errorConsumer,App 崩潰
spa
那麼爲何會出現這些不一樣狀況呢?咱們從源碼中去一探究竟。
subscribe()
傳入 consumer 類型參數最終在 Observable
中會將傳入的參數轉換爲 LambdaObserver
再調用 subscribe(lambdaObserver)
進行訂閱。展開 LambdaObserver
:(主要看 onNext 和 onError 方法中的處理)
.
.
.
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}
.
.
.
複製代碼
onNext
中調用了對應 consumer 的 apply()
方法,而且進行了 try catch。所以咱們在 consumer 中進行的工做拋出異常會被捕獲觸發 LambdaObserver 的 onError
。再看 onError
中,若是訂閱未取消且 errorConsumer 的 apply()
執行無異常則能正常走完事件流,不然會調用 RxJavaPlugins.onError(t)
。看到這裏應該就能明白了,當訂閱時未傳入 errorConsumer時 Observable
會指定 OnErrorMissingConsumer
爲默認的 errorConsumer,發生異常時拋出 OnErrorNotImplementedException
。
上面分析,發現異常最終會流向 RxJavaPlugins.onError(t)。這個方法爲 RxJava2 提供的一個全局的靜態方法。
public static void onError(@NonNull Throwable error) {
Consumer<? super Throwable> f = errorHandler;
if (error == null) {
error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
} else {
if (!isBug(error)) {
error = new UndeliverableException(error);
}
}
if (f != null) {
try {
f.accept(error);
return;
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); TODO decide
e.printStackTrace(); // NOPMD
uncaught(e);
}
}
error.printStackTrace(); // NOPMD
uncaught(error);
}
複製代碼
查看其源碼發現,當 errorHandler
不爲空時異常將由其消耗掉,爲空或者消耗過程產生新的異常則 RxJava 會將異常拋給虛擬機(可能致使程序崩潰)。 errorHandler
自己是一個 Consumer 對象,咱們能夠經過以下方式配置他:
RxJavaPlugins.setErrorHandler(object : Consumer1<Throwable> {
override fun accept(t: Throwable?) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}
})
複製代碼
以 map 操做符爲例,map 操做符實際上 RxJava 是將事件流 hook 了另外一個新的 Observable ObservableMap
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
進入 ObservableMap 類,發現內部訂閱了一個內部靜態類 MapObserver
,重點看 MapObserver
的 onNext
方法
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
複製代碼
onNext
中 try catch 了 mapper.apply(),這個 apply 執行的就是咱們在操做符中實現的 function
方法。所以在 map 之類數據變換操做符中產生異常可以自身捕獲併發送給最終的 Observer。若是此時的訂閱對象中能消耗掉異常則事件流正常走 onError()
結束,若是訂閱方式爲上以節中的 consumer,則崩潰狀況爲上一節中的分析結果。
上述的方式 1
爲一次網絡請求,裏面涉及到線程的切換。方式 2
爲直接 create 一個 Observable
對象,不涉及線程切換,其結果爲線程切換後,觀察者 Observer 的 onNext() 方法中拋出異常沒法觸發 onError(),程序崩潰。
查看 create()
方法源碼,發現內部建立了一個 ObservableCreate
對象,在調用訂閱時會觸發 subscribeActual()
方法。在 subscribeActual()
中再調用咱們 create 時傳入的 ObservableOnSubscribe
對象的 subscribe()
方法來觸發事件流。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 對咱們的觀察者使用 CreateEmitter 進行包裝,內部的觸發方法是相對應的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// source 爲 create 時建立的 ObservableOnSubscribe 匿名內部接口實現類
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
上述代碼中的訂閱過程是使用 try catch 今夕包裹的。訂閱及訂閱觸發後發送的事件流都在一個線程,因此可以捕獲整個事件流中的異常。(PS : 你們能夠嘗試下使用 observeOn() 切換事件發送線程。會發現異常不能再捕獲,程序崩潰)
Retrofit 進行網絡請求返回的 Observable 對象實質上是 RxJava2CallAdapter
中生成的 BodyObservable
,期內部的 onNext
是沒有進行異常捕獲的。其實這裏是否捕獲並非程序崩潰的根本緣由,由於進行網絡請求,必然是涉及到線程切換的。就算此處 try catch 處理了,也並不能捕獲到事件流下游的異常。
@Override public void onNext(Response<R> response) {
if (response.isSuccessful()) {
observer.onNext(response.body());
} else {
terminated = true;
Throwable t = new HttpException(response);
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
複製代碼
以咱們在最終的 Observer 的 onNext 拋出異常爲例,要捕獲此次異常那麼必須在最終的調用線程中去進行捕獲。即 .observeOn(AndroidSchedulers.mainThread())
切換過來的 Android 主線程。與其餘操做符同樣,線程切換時產生了一組新的訂閱關係,RxJava 內部會建立一個新的觀察對象 ObservableObserveOn
。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
.
.
.
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); // 執行 ObservableObserveOn 的 run 方法
}
}
.
.
.
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
複製代碼
而執行任務的 worker 即爲對應線程 Scheduler 的對應實現子類所建立的 Worker,以 AndroidSchedulers.mainThread()
爲例,Scheduler 實現類爲 HandlerScheduler
,其對應 Worker 爲 HandlerWorker
,最終任務交給 ScheduledRunnable
來執行。
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed; // Tracked solely for isDisposed().
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
handler.removeCallbacks(this);
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
複製代碼
會發現,run 中 進行了 try catch。但 catch 內消化異常使用的是全局異常處理 RxJavaPlugins.onError(t);
,而不是某一個觀察者的 onError
。因此在通過切換線程操做符後,觀察者 onNext 中拋出的異常,onError 沒法捕獲。
既然知道了問題所在,那麼處理問題的方案也就十分清晰了。 一、註冊全局的異常處理
RxJavaPlugins.setErrorHandler(object : Consumer<Throwable> {
override fun accept(t: Throwable?) {
// do something
}
})
複製代碼
二、Consumer 做爲觀察者時,不徹底肯定沒有異常必定要添加異常處理 Consumer
apiService.stringData()
.doOnSubscribe { t -> compositeDisposable.add(t) }
.compose(RxScheduler.sync())
.subscribe(Consumer<Boolean>{ }, Consumer<Throwable> { })
複製代碼
三、Observer 能夠建立一個 BaseObaerver 將 onNext 內部進行 try catch 人爲的流轉到 onError 中,項目中的觀察這都使用這個 BaseObserver 的子類。
@Override
public void onNext(T t) {
try {
onSuccess(t);
} catch (Exception e) {
onError(e);
}
data = t;
success = true;
}
複製代碼