本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | 響應式流規範
本文測試源碼java
在響應式編程中,調試是塊難啃的骨頭,這也是從命令式編程到響應式編程的切換過程當中,學習曲線最陡峭的地方。react
在命令式編程中,方法的調用關係擺在面上,咱們一般能夠經過stack trace追蹤的問題出現的位置。可是在異步的響應式編程中,一方面有諸多的調用是在水面如下的,做爲響應式開發庫的使用者是不須要了解的;另外一方面,基於事件的異步響應機制致使stack trace並不是很容易在代碼中按圖索驥的。git
好比下邊的例子:github
@Test public void testBug() { getMonoWithException() .subscribe(); }
single()
方法只能接收一個元素,多了的話就會致使異常。上邊的代碼會報出以下的異常stack trace:編程
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.tryOnNext(FluxFilterFuseable.java:129) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.tryOnNext(FluxMapFuseable.java:284) at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.fastPath(FluxRange.java:273) at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.request(FluxRange.java:251) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:316) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:170) at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94) at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:87) at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:79) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:236) at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:65) at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:60) at reactor.core.publisher.FluxFilterFuseable.subscribe(FluxFilterFuseable.java:51) at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58) at reactor.core.publisher.Mono.subscribe(Mono.java:3077) at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185) at reactor.core.publisher.Mono.subscribe(Mono.java:2962) at com.getset.Test_2_7.testBug(Test_2_7.java:19) ...
比較明顯的信息大概就是那句「Source emitted more than one item」。下邊的內容基本都是在Reactor庫內部的調用,並且上邊的stack trace的問題是出自.subscribe()
那一行的。app
若是對響應式流內部的Publisher、Subscriber和Subscription的機制比較熟悉,大概能夠根據subscribe()
或request()
的順序大概猜想出來getMonoWithException()
方法內大約通過了.map.filter.range
的操做鏈,可是除此以外,確實獲取不到太多信息。異步
另外一方面,命令式編程的方式比較容易使用IDE的調試工具進行單步或斷點調試,而在異步編程方式下,一般也不太好使。ide
以上這些都是在異步的響應式編程中可能會遇到的窘境。解鈴還須繫鈴人,對於響應式編程的調試還須要響應式編程庫自己提供調試工具。異步編程
Reactor提供了開啓調試模式的方法。工具
Hooks.onOperatorDebug();
這個方法可以開啓調試模式,從而在拋出異常時打印出一些有用的信息。把這一行加上:
@Test public void testBug() { Hooks.onOperatorDebug(); getMonoWithException() .subscribe(); }
這時候,除了上邊的那一套stack trace以外,增長了如下內容:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.MonoSingle] : reactor.core.publisher.Flux.single(Flux.java:6473) com.getset.Test_2_7.getMonoWithException(Test_2_7.java:13) com.getset.Test_2_7.testBug(Test_2_7.java:19) Error has been observed by the following operator(s): |_ Flux.single(Test_2_7.java:13)
這裏就能夠明確找出問題根源了。
Hooks.onOperatorDebug()
的實現原理在於在組裝期包裝各個操做符的構造方法,加入一些監測功能,因此這個 hook 應該在早於聲明的時候被激活,最保險的方式就是在你程序的最開始就激活它。以map
操做符爲例:
public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) { if (this instanceof Fuseable) { return onAssembly(new FluxMapFuseable<>(this, mapper)); } return onAssembly(new FluxMap<>(this, mapper)); }
能夠看到,每次在返回新的Flux對象的時候,都會調用onAssembly
方法,這裏就是Reactor能夠在組裝期插手「搞事情」的地方。
Hooks.onOperatorDebug()
是一種全局性的Hook,會影響到應用中全部的操做符,因此其帶來的性能成本也是比較大的。若是咱們大概知道可能的問題在哪,而對整個應用開啓調試模式,也容易被茫茫多的調試信息淹沒。這時候,咱們須要一種更加精準且廉價的定位方式。
若是你知道問題出在哪一個鏈上,可是因爲這個鏈的上游或下游來自其餘的調用,就能夠針對這個鏈使用checkpoint()進行問題定位。
checkpoint()
操做符就像一個Hook,不過它的做用範圍僅限於這個鏈上。
@Test public void checkBugWithCheckPoint() { getMonoWithException() .checkpoint() .subscribe(); }
經過增長checkpoint()
操做符,仍然能夠打印出調試信息:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly trace from producer [reactor.core.publisher.MonoSingle] : reactor.core.publisher.Mono.checkpoint(Mono.java:1367) reactor.core.publisher.Mono.checkpoint(Mono.java:1317) com.getset.Test_2_7.checkBugWithCheckPoint(Test_2_7.java:25) Error has been observed by the following operator(s): |_ Mono.checkpoint(Test_2_7.java:25)
checkpoint()
方法還有變體checkpoint(String description)
,你能夠傳入一個獨特的字符串以方便在 assembly traceback 中進行識別。 這樣會省略掉stack trace,不過你能夠依賴這個字符串來定位到出問題的組裝點。checkpoint(String) 比 checkpoint 有更低的執行成本。以下:
@Test public void checkBugWithCheckPoint2() { getMonoWithException() .checkpoint("checkBugWithCheckPoint2") .subscribe(); }
加入用於標識的字符串(方法名),輸出以下:
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Assembly site of producer [reactor.core.publisher.MonoSingle] is identified by light checkpoint [I_HATE_BUGS]."description" : "checkBugWithCheckPoint2"
能夠看到這裏確實省略了調試的assembly traceback,可是咱們經過上邊的信息也能夠定位到是single
的問題。
上邊的例子比較簡單,當有許多的調試信息打印出來的時候,這個標識字符串可以方便咱們在許多的控制檯輸出中定位到問題。
若是既但願有調試信息assembly traceback,也但願用上標識字符串,還能夠checkpoint(description, true)
來實現,第二個參數true
標識要打印assembly traceback。
最後一個方便調試的工具就是咱們前邊屢次用到的log()
操做符了,它可以記錄其上游的Flux或 Mono的事件(包括onNext
、onError
、onComplete
, 以及onSubscribe
、cancel
、和request
)。
log
操做符能夠經過SLF4J使用相似Log4J和Logback這樣的公共的日誌工具來記錄日誌,若是SLF4J不存在的話,則直接將日誌輸出到控制檯。
控制檯使用 System.err 記錄WARN
和ERROR
級別的日誌,使用 System.out 記錄其餘級別的日誌。