不論是在響應式編程仍是普通的程序設計中,異常處理都是一個很是重要的方面。今天將會給你們介紹Reactor中異常的處理流程。java
先舉一個例子,咱們建立一個Flux,在這個Flux中,咱們產生一個異常,看看是什麼狀況:react
Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println);
咱們會獲得一個異常ErrorCallbackNotImplemented:git
100 / 1 = 100 100 / 2 = 50 reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
那怎麼處理這個異常呢?github
有兩種方式,第一種方式就是咱們以前文章講過的,在subscribe的時候指定onError方法:編程
Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println, error -> System.err.println("Error: " + error));
仍是剛纔的代碼,可是此次咱們在subscribe的時候,添加了onError處理器,看下運行結果:app
Divided by zero :( 100 / 1 = 100 100 / 2 = 50 Error: java.lang.ArithmeticException: / by zero
能夠看到異常已經被咱們捕獲了,而且進行了合適的處理。ide
除了在subscribe中進行處理,咱們還能夠在publish的時候,就指定異常的處理模式,這就是咱們要介紹的第二種方法:oop
Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorReturn("Divided by zero :("); flux.subscribe(System.out::println);
上面的例子中,在建立Flux的時候,手動指定了其onErrorReturn方法,咱們看下輸出結果:設計
100 / 1 = 100 100 / 2 = 50 Divided by zero :(
注意,對於Flux或者Mono來講,全部的異常都是一個終止的操做,即便你使用了異常處理,原生成序列也不會繼續。可是若是你對異常進行了處理,那麼它會將oneError信號轉換成爲新的序列的開始,並將替換掉以前上游產生的序列。code
在通常的程序中,咱們的異常應該怎麼處理呢?你們很容易想到的是try catch。而Reactor中subscribe的onError方法,就是try catch的一個具體應用:
Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println, error -> System.err.println("Error: " + error));
仍是上的例子,咱們在onError方法中,對異常進行了處理。
若是轉換成爲常規代碼,應該是下面的樣子:
public void normalErrorHandle(){ try{ Arrays.asList(1,2,0).stream().map(i -> "100 / " + i + " = " + (100 / i)).forEach(System.out::println); }catch (Exception e){ System.err.println("Error: " + e); } }
除了這種最基本的異常處理方法以外,Reactor還提供了不少種不一樣的異常處理方法,下面咱們來一一介紹一下。
Static Fallback Value的意思是,在遇到異常的時候會fallback到一個靜態的默認值。好比咱們以前講到的onErrorReturn。
Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorReturn("Divided by zero :(");
固然onErrorReturn還支持一個Predicate參數,用來判斷要falback的異常是否知足條件。
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
除了fallback Value以外,還支持Fallback Method。也就是說若是你想在捕獲異常以後調用其餘的方法,就可使用Fallback Method。
這裏Fallback Method是用onErrorResume來表示的。
public void useFallbackMethod(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(e -> System.out::println); flux.subscribe(System.out::println); }
所謂的動態Fallback Value就是根據你拋出的異常進行判斷,經過定位不一樣的Error從而fallback到不一樣的值:
public void useDynamicFallback(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(error -> Mono.just( MyWrapper.fromError(error))); } public static class MyWrapper{ public static String fromError(Throwable error){ return "That is a new Error"; } }
一樣的,咱們能夠在捕獲異常以後進行rethrow:
Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(error -> Flux.error( new RuntimeException("oops, ArithmeticException!", error))); Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorMap(error -> new RuntimeException("oops, ArithmeticException!", error));
有兩種方式,第一種就是在onErrorResume中使用Flux.error構建一個新的Flux,另一種就是直接在onErrorMap中進行處理。
有時候你只是想記錄一下異常信息,並不想破壞原來的React結構,那麼能夠試着使用doOnError。
public void useDoOnError(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .doOnError(error -> System.out.println("we got the error: "+ error)); }
若是咱們在代碼中使用了某些資源,通常狀況下咱們須要在finally中對其進行關閉,或者使用JDK7中引入的 try-with-resource 。
舉個例子,下面的是使用finally的方式:
Stats stats = new Stats(); stats.startTimer(); try { doSomethingDangerous(); } finally { stats.stopTimerAndRecordTiming(); }
下面是使用try-with-resource的方式:
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) { return disposableInstance.toString(); }
那麼在Reactor中,咱們也有兩種方式和其對應。
第一種就是doFinally方法:
Stats stats = new Stats(); LongAdder statsCancel = new LongAdder(); Flux<String> flux = Flux.just("foo", "bar") .doOnSubscribe(s -> stats.startTimer()) .doFinally(type -> { stats.stopTimerAndRecordTiming(); if (type == SignalType.CANCEL) statsCancel.increment(); }) .take(1);
上面的例子中,doFinally實際上作的就是finally block作的事情。
第二種是使用using,咱們先看一個using的定義:
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
能夠看到using支持三個參數,resourceSupplier是一個生成器,用來在subscribe的時候生成要發送的resource對象。
sourceSupplier是一個生成Publisher的工廠,接收resourceSupplier傳過來的resource,而後生成Publisher對象。
resourceCleanup用來對resource進行收尾操做。
那麼咱們怎麼用呢?
舉個例子:
public void useUsing(){ AtomicBoolean isDisposed = new AtomicBoolean(); Disposable disposableInstance = new Disposable() { @Override public void dispose() { isDisposed.set(true); } @Override public String toString() { return "DISPOSABLE"; } }; Flux<String> flux = Flux.using( () -> disposableInstance, disposable -> Flux.just(disposable.toString()), Disposable::dispose); }
上面的例子中,咱們建立了一個Disposable對象,做爲resource,而後對這個resource進行加工,返回一個Flux<String>對象,最後經過調用Disposable::dispose方法,對resource進行銷燬。
有時候咱們遇到了異常,可能須要重試幾回,Reactor爲咱們提供了retry方法,先看一個例子:
public void testRetry(){ Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3){ return "tick " + input; } throw new RuntimeException("boom"); }) .retry(1) .elapsed() .subscribe(System.out::println, System.err::println); try { Thread.sleep(2100); } catch (InterruptedException e) { e.printStackTrace(); } }
看下輸出結果:
[264,tick 0] [255,tick 1] [241,tick 2] [506,tick 0] [252,tick 1] [253,tick 2] java.lang.RuntimeException: boom
retry的做用就是當遇到異常的時候,重啓一個新的序列。
elapsed是用來展現產生的value時間之間的duration。
從結果咱們能夠看到,retry以前是不會產生異常信息的。
本文的例子learn-reactive
本文做者:flydean程序那些事本文連接:http://www.flydean.com/reactor-handle-errors/
本文來源:flydean的博客
歡迎關注個人公衆號:「程序那些事」最通俗的解讀,最深入的乾貨,最簡潔的教程,衆多你不知道的小技巧等你來發現!