(17)Reactor的調試——響應式Spring的道法術器

本系列文章索引《響應式Spring的道法術器》
前情提要 Reactor 3快速上手 | 響應式流規範
本文測試源碼java

2.7 調試

在響應式編程中,調試是塊難啃的骨頭,這也是從命令式編程到響應式編程的切換過程當中,學習曲線最陡峭的地方。react

在命令式編程中,方法的調用關係擺在面上,咱們一般能夠經過stack trace追蹤的問題出現的位置。可是在異步的響應式編程中,一方面有諸多的調用是在水面如下的,做爲響應式開發庫的使用者是不須要了解的;另外一方面,基於事件的異步響應機制致使stack trace並不是很容易在代碼中按圖索驥的。git

好比下邊的例子:github

@Test
    public void testBug() {
        getMonoWithException()
                .subscribe();
    }
  1. single()方法只能接收一個元素,多了的話就會致使異常。

上邊的代碼會報出以下的異常stack trace:編程

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item

Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.tryOnNext(FluxFilterFuseable.java:129)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.tryOnNext(FluxMapFuseable.java:284)
    at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.fastPath(FluxRange.java:273)
    at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.request(FluxRange.java:251)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:316)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:170)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
    at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:87)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:79)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:236)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:65)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:60)
    at reactor.core.publisher.FluxFilterFuseable.subscribe(FluxFilterFuseable.java:51)
    at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3077)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3185)
    at reactor.core.publisher.Mono.subscribe(Mono.java:2962)
    at com.getset.Test_2_7.testBug(Test_2_7.java:19)
    ...

比較明顯的信息大概就是那句「Source emitted more than one item」。下邊的內容基本都是在Reactor庫內部的調用,並且上邊的stack trace的問題是出自.subscribe()那一行的。app

若是對響應式流內部的Publisher、Subscriber和Subscription的機制比較熟悉,大概能夠根據subscribe()request()的順序大概猜想出來getMonoWithException()方法內大約通過了.map.filter.range的操做鏈,可是除此以外,確實獲取不到太多信息。異步

另外一方面,命令式編程的方式比較容易使用IDE的調試工具進行單步或斷點調試,而在異步編程方式下,一般也不太好使。ide

以上這些都是在異步的響應式編程中可能會遇到的窘境。解鈴還須繫鈴人,對於響應式編程的調試還須要響應式編程庫自己提供調試工具。異步編程

2.7.1 開啓調試模式

Reactor提供了開啓調試模式的方法。工具

Hooks.onOperatorDebug();

這個方法可以開啓調試模式,從而在拋出異常時打印出一些有用的信息。把這一行加上:

@Test
    public void testBug() {
        Hooks.onOperatorDebug();
        getMonoWithException()
                .subscribe();
    }

這時候,除了上邊的那一套stack trace以外,增長了如下內容:

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
    reactor.core.publisher.Flux.single(Flux.java:6473)
    com.getset.Test_2_7.getMonoWithException(Test_2_7.java:13)
    com.getset.Test_2_7.testBug(Test_2_7.java:19)
Error has been observed by the following operator(s):
    |_  Flux.single(Test_2_7.java:13)

這裏就能夠明確找出問題根源了。

Hooks.onOperatorDebug()的實現原理在於在組裝期包裝各個操做符的構造方法,加入一些監測功能,因此這個 hook 應該在早於聲明的時候被激活,最保險的方式就是在你程序的最開始就激活它。以map操做符爲例:

public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
        if (this instanceof Fuseable) {
            return onAssembly(new FluxMapFuseable<>(this, mapper));
        }
        return onAssembly(new FluxMap<>(this, mapper));
    }

能夠看到,每次在返回新的Flux對象的時候,都會調用onAssembly方法,這裏就是Reactor能夠在組裝期插手「搞事情」的地方。

Hooks.onOperatorDebug()是一種全局性的Hook,會影響到應用中全部的操做符,因此其帶來的性能成本也是比較大的。若是咱們大概知道可能的問題在哪,而對整個應用開啓調試模式,也容易被茫茫多的調試信息淹沒。這時候,咱們須要一種更加精準且廉價的定位方式。

2.7.2 使用 checkpoint() 來定位

若是你知道問題出在哪一個鏈上,可是因爲這個鏈的上游或下游來自其餘的調用,就能夠針對這個鏈使用checkpoint()進行問題定位。

checkpoint()操做符就像一個Hook,不過它的做用範圍僅限於這個鏈上。

@Test
    public void checkBugWithCheckPoint() {
        getMonoWithException()
                .checkpoint()
                .subscribe();
    }

經過增長checkpoint()操做符,仍然能夠打印出調試信息:

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
    reactor.core.publisher.Mono.checkpoint(Mono.java:1367)
    reactor.core.publisher.Mono.checkpoint(Mono.java:1317)
    com.getset.Test_2_7.checkBugWithCheckPoint(Test_2_7.java:25)
Error has been observed by the following operator(s):
    |_  Mono.checkpoint(Test_2_7.java:25)

checkpoint()方法還有變體checkpoint(String description),你能夠傳入一個獨特的字符串以方便在 assembly traceback 中進行識別。 這樣會省略掉stack trace,不過你能夠依賴這個字符串來定位到出問題的組裝點。checkpoint(String) 比 checkpoint 有更低的執行成本。以下:

@Test
    public void checkBugWithCheckPoint2() {
        getMonoWithException()
                .checkpoint("checkBugWithCheckPoint2")
                .subscribe();
    }

加入用於標識的字符串(方法名),輸出以下:

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly site of producer [reactor.core.publisher.MonoSingle] is identified by light checkpoint [I_HATE_BUGS]."description" : "checkBugWithCheckPoint2"

能夠看到這裏確實省略了調試的assembly traceback,可是咱們經過上邊的信息也能夠定位到是single的問題。

上邊的例子比較簡單,當有許多的調試信息打印出來的時候,這個標識字符串可以方便咱們在許多的控制檯輸出中定位到問題。

若是既但願有調試信息assembly traceback,也但願用上標識字符串,還能夠checkpoint(description, true)來實現,第二個參數true標識要打印assembly traceback。

2.7.3 使用log()操做符瞭解執行過程

最後一個方便調試的工具就是咱們前邊屢次用到的log()操做符了,它可以記錄其上游的Flux或 Mono的事件(包括onNextonErroronComplete, 以及onSubscribecancel、和request)。

log操做符能夠經過SLF4J使用相似Log4J和Logback這樣的公共的日誌工具來記錄日誌,若是SLF4J不存在的話,則直接將日誌輸出到控制檯。

控制檯使用 System.err 記錄WARNERROR級別的日誌,使用 System.out 記錄其餘級別的日誌。

相關文章
相關標籤/搜索