dubbo+zipkin調用鏈監控(二)

去年的時候寫過dubbo+zipkin調用鏈監控,最近看到zipkin2配合brave實現起來會比我以前的實現要簡單不少,由於brave將不少交互的內容都封裝起來了,不須要本身去寫具體的實現,好比如何去構建span,如何去上報數據。css

收集器抽象

因爲zipkin支持http以及kafka兩種方式上報數據,因此在配置上須要作下抽象。html

AbstractZipkinCollectorConfiguration

主要是針對下面兩種收集方式的一些配置上的定義,最核心的是Sender接口的定義,http與kafka是兩類徹底不一樣的實現。java

public abstract Sender getSender();

其次是協助性的構造函數,主要是配合構建收集器所須要的一些參數。git

  • zipkinUrl

若是是http收集,那麼對應的是zipkin api域名,若是是kafka,對應的是kafka集羣的地址github

  • topic

僅在收集方式爲kafka是有效,http時傳空值便可。web

public AbstractZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic){
    this.zipkinUrl=zipkinUrl;
    this.serviceName=serviceName;
    this.topic=topic;
    this.tracing=this.tracing();
}

配置上報方式,這裏統一採用異常上傳,而且配置上報的超時時間。bootstrap

protected AsyncReporter<Span> spanReporter() {
    return AsyncReporter
            .builder(getSender())
            .closeTimeout(500, TimeUnit.MILLISECONDS)
            .build(SpanBytesEncoder.JSON_V2);
}

下面這兩方法,是配合應用構建span使用的。api

注意那個sampler()方法,默認是什麼也不作的意思,咱們要想看到數據就須要配置成Sampler.ALWAYS_SAMPLE,這樣才能真正將數據上報到zipkin服務器。服務器

protected Tracing tracing() {
    this.tracing= Tracing
            .newBuilder()
            .localServiceName(this.serviceName)
            .sampler(Sampler.ALWAYS_SAMPLE)
            .spanReporter(spanReporter())
            .build();
    return this.tracing;
}

protected Tracing getTracing(){
    return this.tracing;
}

HttpZipkinCollectorConfiguration

主要是實現getSender方法,能夠借用OkHttpSender這個對象來快速構建,api版本採用v2。異步

public class HttpZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public HttpZipkinCollectorConfiguration(String serviceName,String zipkinUrl) {
        super(serviceName,zipkinUrl,null);
    }

    @Override
    public Sender getSender() {
        return OkHttpSender.create(super.getZipkinUrl()+"/api/v2/spans");
    }
}

OkHttpSender這個類須要引用這個包

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-okhttp3</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

KafkaZipkinCollectorConfiguration

一樣也是實現getSender方法

public class KafkaZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration {
    public KafkaZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic) {
        super(serviceName,zipkinUrl,topic);
    }

    @Override
    public Sender getSender() {

        return KafkaSender
                .newBuilder()
                .bootstrapServers(super.getZipkinUrl())
                .topic(super.getTopic())
                .encoding(Encoding.JSON)
                .build();
    }
}

KafkaSender這個類須要引用這個包:

<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-kafka11</artifactId>
    <version>${zipkin-reporter2.version}</version>
</dependency>

收集器工廠

因爲上面建立了兩個收集器配置類,使用時只能是其中之一,因此實際運行的實例須要根據配置來動態生成。ZipkinCollectorConfigurationFactory就是負責生成收集器實例的。

private final AbstractZipkinCollectorConfiguration zipkinCollectorConfiguration;

@Autowired
public ZipkinCollectorConfigurationFactory(TraceConfig traceConfig){
    if(Objects.equal("kafka", traceConfig.getZipkinSendType())){
        zipkinCollectorConfiguration=new KafkaZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl(),
                traceConfig.getZipkinKafkaTopic());
    }
    else {
        zipkinCollectorConfiguration = new HttpZipkinCollectorConfiguration(
                traceConfig.getApplicationName(),
                traceConfig.getZipkinUrl());
    }
}

經過構建函數將咱們的配置類TraceConfig注入進來,而後根據發送方式來構建實例。另外提供一個輔助函數:

public Tracing getTracing(){
    return this.zipkinCollectorConfiguration.getTracing();
}

過濾器

在dubbo的過濾器中實現數據上傳的功能邏輯相對簡單,通常都在invoke方法執行前記錄數據,而後方法執行完成後再次記錄數據。這個邏輯不變,有變化的是數據上報的實現,上一個版本是經過發http請求實現須要編碼,如今能夠直接借用brave所提供的span來幫助咱們完成,有兩重要的方法:

  • finish

方法源碼以下,在完成的時候會填寫上完成的時間並上報數據,這通常應用於同步調用場景。

public void finish(TraceContext context, long finishTimestamp) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish(Long.valueOf(finishTimestamp));
            this.reporter.report(span.toSpan());
        }
    }
}
  • flush 與上面finish方法的不一樣點在於,在報數據時沒有完成時間,這應該是適用於一些異步調用但不關心結果的場景,好比dubbo所提供的oneway方式調用。
public void flush(TraceContext context) {
    MutableSpan span = this.spanMap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish((Long)null);
            this.reporter.report(span.toSpan());
        }
    }
}

消費者

作爲消費方,有一個核心功能就是將traceId以及spanId傳遞到服務提供方,這裏仍是經過dubbo提供的附加參數方式實現。

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if(!RpcTraceContext.getTraceConfig().isEnabled()){
        return invoker.invoke(invocation);
    }

    ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
            SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
    Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();

    if(null==RpcTraceContext.getTraceId()){
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(IdUtils.get());
        RpcTraceContext.setParentId(null);
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    else {
        RpcTraceContext.setParentId(RpcTraceContext.getSpanId());
        RpcTraceContext.setSpanId(IdUtils.get());
    }
    TraceContext traceContext= TraceContext.newBuilder()
            .traceId(RpcTraceContext.getTraceId())
            .parentId(RpcTraceContext.getParentId())
            .spanId(RpcTraceContext.getSpanId())
            .sampled(true)
            .build();

    Span span=tracer.toSpan(traceContext).start();

    invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId()));
    invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId()));

    Result result = invoker.invoke(invocation);

    span.finish();

    return result;
}

提供者

@Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if(!RpcTraceContext.getTraceConfig().isEnabled()){
            return invoker.invoke(invocation);
        }

        Map<String, String> attaches = invocation.getAttachments();
        if (!attaches.containsKey(RpcTraceContext.TRACE_ID_KEY)){
            return invoker.invoke(invocation);
        }

        Long traceId = Long.valueOf(attaches.get(RpcTraceContext.TRACE_ID_KEY));
        Long spanId = Long.valueOf(attaches.get(RpcTraceContext.SPAN_ID_KEY));

        attaches.remove(RpcTraceContext.TRACE_ID_KEY);
        attaches.remove(RpcTraceContext.SPAN_ID_KEY);
        RpcTraceContext.start();
        RpcTraceContext.setTraceId(traceId);
        RpcTraceContext.setParentId(spanId);
        RpcTraceContext.setSpanId(IdUtils.get());

        ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory=
                SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class);
        Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer();

        TraceContext traceContext= TraceContext.newBuilder()
                .traceId(RpcTraceContext.getTraceId())
                .parentId(RpcTraceContext.getParentId())
                .spanId(RpcTraceContext.getSpanId())
                .sampled(true)
                .build();
        Span span = tracer.toSpan(traceContext).start();

        Result result = invoker.invoke(invocation);

        span.finish();

        return result;

    }

異常流程

上面不管是消費者的過濾器仍是服務提供者的過濾器,均未考慮服務在調用invoker.invoke時出錯的場景,若是出錯,後面的span.finish方法將不會按預期執行,也就記錄不了信息。因此須要針對此問題作優化:能夠在finally塊中執行finish方法。

try {
    result = invoker.invoke(invocation);
}
finally {
    span.finish();
}

消費者在調用服務時,異步調用問題

上面過濾器中調用span.finish都是基於同步模式,而因爲dubbo除了同步調用外還提供了兩種調用方式

  • 異步調用 經過callback機制的異步

  • oneway

只發起請求並不等待結果的異步調用,無callback一說

針對上面兩類異步再加上同步調用,咱們要想準確記錄服務真正的時間,須要在消費方的過濾器中作以下處理:

建立一個用於回調的處理類,它的主要目的是爲了在回調成功時記錄時間,這裏不管是成功仍是失敗。

private class AsyncSpanCallback implements ResponseCallback{

    private Span span;

    public AsyncSpanCallback(Span span){
        this.span=span;
    }

    @Override
    public void done(Object o) {
        span.finish();
    }

    @Override
    public void caught(Throwable throwable) {
        span.finish();
    }
}

再在調用invoke方法時,若是是oneway方式,則調用flush方法結果,若是是同步則直接調用finish方法,若是是異步則在回調時調用finish方法。

Result result = null;
boolean isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
try {
    result = invoker.invoke(invocation);
}
finally {
    if(isOneway) {
        span.flush();
    }
    else if(!isAsync) {
        span.finish();
    }
}

待完善問題

過濾器中生成span的方式應該有更好的方法,尚未對brave作過多研究,後續想辦法再優化下。另外我測試的場景是consumer調用provider,provider內部再調用provider2,我測試時發現第三步調用傳遞的parentId好像有點小問題,後續須要再確認下。

代碼下載

https://github.com/jiangmin168168/jim-framework

相關文章
相關標籤/搜索