收集器抽象sql
因爲zipkin支持http以及kafka兩種方式上報數據,因此在配置上須要作下抽象。bootstrap
AbstractZipkinCollectorConfigurationapi
主要是針對下面兩種收集方式的一些配置上的定義,最核心的是Sender接口的定義,http與kafka是兩類徹底不一樣的實現。服務器
public abstract Sender getSender();
其次是協助性的構造函數,主要是配合構建收集器所須要的一些參數。架構
若是是http收集,那麼對應的是zipkin api域名,若是是kafka,對應的是kafka集羣的地址併發
僅在收集方式爲kafka是有效,http時傳空值便可。異步
public AbstractZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic){ this.zipkinUrl=zipkinUrl; this.serviceName=serviceName; this.topic=topic; this.tracing=this.tracing(); }
配置上報方式,這裏統一採用異常上傳,而且配置上報的超時時間。分佈式
protected AsyncReporter<Span> spanReporter() { return AsyncReporter .builder(getSender()) .closeTimeout(500, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); }
下面這兩方法,是配合應用構建span使用的。ide
注意那個sampler()方法,默認是什麼也不作的意思,咱們要想看到數據就須要配置成Sampler.ALWAYS_SAMPLE,這樣才能真正將數據上報到zipkin服務器。
protected Tracing tracing() { this.tracing= Tracing .newBuilder() .localServiceName(this.serviceName) .sampler(Sampler.ALWAYS_SAMPLE) .spanReporter(spanReporter()) .build(); return this.tracing; } protected Tracing getTracing(){ return this.tracing; }
HttpZipkinCollectorConfiguration函數
主要是實現getSender方法,能夠借用OkHttpSender這個對象來快速構建,api版本採用v2。
public class HttpZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration { public HttpZipkinCollectorConfiguration(String serviceName,String zipkinUrl) { super(serviceName,zipkinUrl,null); } @Override public Sender getSender() { return OkHttpSender.create(super.getZipkinUrl()+"/api/v2/spans"); } }
OkHttpSender這個類須要引用這個包
<dependency> <groupId>io.zipkin.reporter2</groupId> <artifactId>zipkin-sender-okhttp3</artifactId> <version>${zipkin-reporter2.version}</version> </dependency>
KafkaZipkinCollectorConfiguration
一樣也是實現getSender方法
public class KafkaZipkinCollectorConfiguration extends AbstractZipkinCollectorConfiguration { public KafkaZipkinCollectorConfiguration(String serviceName,String zipkinUrl,String topic) { super(serviceName,zipkinUrl,topic); } @Override public Sender getSender() { return KafkaSender .newBuilder() .bootstrapServers(super.getZipkinUrl()) .topic(super.getTopic()) .encoding(Encoding.JSON) .build(); } }
KafkaSender這個類須要引用這個包:
<dependency> <groupId>io.zipkin.reporter2</groupId> <artifactId>zipkin-sender-kafka11</artifactId> <version>${zipkin-reporter2.version}</version> </dependency>
收集器工廠
因爲上面建立了兩個收集器配置類,使用時只能是其中之一,因此實際運行的實例須要根據配置來動態生成。ZipkinCollectorConfigurationFactory就是負責生成收集器實例的。
private final AbstractZipkinCollectorConfiguration zipkinCollectorConfiguration; @Autowired public ZipkinCollectorConfigurationFactory(TraceConfig traceConfig){ if(Objects.equal("kafka", traceConfig.getZipkinSendType())){ zipkinCollectorConfiguration=new KafkaZipkinCollectorConfiguration( traceConfig.getApplicationName(), traceConfig.getZipkinUrl(), traceConfig.getZipkinKafkaTopic()); } else { zipkinCollectorConfiguration = new HttpZipkinCollectorConfiguration( traceConfig.getApplicationName(), traceConfig.getZipkinUrl()); } }
經過構建函數將咱們的配置類TraceConfig注入進來,而後根據發送方式來構建實例。另外提供一個輔助函數:
public Tracing getTracing(){ return this.zipkinCollectorConfiguration.getTracing(); }
過濾器
在dubbo的過濾器中實現數據上傳的功能邏輯相對簡單,通常都在invoke方法執行前記錄數據,而後方法執行完成後再次記錄數據。這個邏輯不變,有變化的是數據上報的實現,上一個版本是經過發http請求實現須要編碼,如今能夠直接借用brave所提供的span來幫助咱們完成,有兩重要的方法:
方法源碼以下,在完成的時候會填寫上完成的時間並上報數據,這通常應用於同步調用場景。
public void finish(TraceContext context, long finishTimestamp) { MutableSpan span = this.spanMap.remove(context); if(span != null && !this.noop.get()) { synchronized(span) { span.finish(Long.valueOf(finishTimestamp)); this.reporter.report(span.toSpan()); } } }
public void flush(TraceContext context) { MutableSpan span = this.spanMap.remove(context); if(span != null && !this.noop.get()) { synchronized(span) { span.finish((Long)null); this.reporter.report(span.toSpan()); } } }
消費者
作爲消費方,有一個核心功能就是將traceId以及spanId傳遞到服務提供方,這裏仍是經過dubbo提供的附加參數方式實現。
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if(!RpcTraceContext.getTraceConfig().isEnabled()){ return invoker.invoke(invocation); } ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory= SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class); Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer(); if(null==RpcTraceContext.getTraceId()){ RpcTraceContext.start(); RpcTraceContext.setTraceId(IdUtils.get()); RpcTraceContext.setParentId(null); RpcTraceContext.setSpanId(IdUtils.get()); } else { RpcTraceContext.setParentId(RpcTraceContext.getSpanId()); RpcTraceContext.setSpanId(IdUtils.get()); } TraceContext traceContext= TraceContext.newBuilder() .traceId(RpcTraceContext.getTraceId()) .parentId(RpcTraceContext.getParentId()) .spanId(RpcTraceContext.getSpanId()) .sampled(true) .build(); Span span=tracer.toSpan(traceContext).start(); invocation.getAttachments().put(RpcTraceContext.TRACE_ID_KEY, String.valueOf(span.context().traceId())); invocation.getAttachments().put(RpcTraceContext.SPAN_ID_KEY, String.valueOf(span.context().spanId())); Result result = invoker.invoke(invocation); span.finish(); return result; }
提供者
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if(!RpcTraceContext.getTraceConfig().isEnabled()){ return invoker.invoke(invocation); } Map<String, String> attaches = invocation.getAttachments(); if (!attaches.containsKey(RpcTraceContext.TRACE_ID_KEY)){ return invoker.invoke(invocation); } Long traceId = Long.valueOf(attaches.get(RpcTraceContext.TRACE_ID_KEY)); Long spanId = Long.valueOf(attaches.get(RpcTraceContext.SPAN_ID_KEY)); attaches.remove(RpcTraceContext.TRACE_ID_KEY); attaches.remove(RpcTraceContext.SPAN_ID_KEY); RpcTraceContext.start(); RpcTraceContext.setTraceId(traceId); RpcTraceContext.setParentId(spanId); RpcTraceContext.setSpanId(IdUtils.get()); ZipkinCollectorConfigurationFactory zipkinCollectorConfigurationFactory= SpringContextUtils.getApplicationContext().getBean(ZipkinCollectorConfigurationFactory.class); Tracer tracer= zipkinCollectorConfigurationFactory.getTracing().tracer(); TraceContext traceContext= TraceContext.newBuilder() .traceId(RpcTraceContext.getTraceId()) .parentId(RpcTraceContext.getParentId()) .spanId(RpcTraceContext.getSpanId()) .sampled(true) .build(); Span span = tracer.toSpan(traceContext).start(); Result result = invoker.invoke(invocation); span.finish(); return result; }
異常流程
上面不管是消費者的過濾器仍是服務提供者的過濾器,均未考慮服務在調用invoker.invoke時出錯的場景,若是出錯,後面的span.finish方法將不會按預期執行,也就記錄不了信息。因此須要針對此問題作優化:能夠在finally塊中執行finish方法。
try { result = invoker.invoke(invocation); } finally { span.finish(); }
消費者在調用服務時,異步調用問題
上面過濾器中調用span.finish都是基於同步模式,而因爲dubbo除了同步調用外還提供了兩種調用方式
只發起請求並不等待結果的異步調用,無callback一說
針對上面兩類異步再加上同步調用,咱們要想準確記錄服務真正的時間,須要在消費方的過濾器中作以下處理:
建立一個用於回調的處理類,它的主要目的是爲了在回調成功時記錄時間,這裏不管是成功仍是失敗。
private class AsyncSpanCallback implements ResponseCallback{ private Span span; public AsyncSpanCallback(Span span){ this.span=span; } @Override public void done(Object o) { span.finish(); } @Override public void caught(Throwable throwable) { span.finish(); } }
再在調用invoke方法時,若是是oneway方式,則調用flush方法結果,若是是同步則直接調用finish方法,若是是異步則在回調時調用finish方法。
Result result = null; boolean isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation); try { result = invoker.invoke(invocation); } finally { if(isOneway) { span.flush(); } else if(!isAsync) { span.finish(); } }
歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!