最近寫了一個鏈路追蹤Demo分享下,實現了鏈路追蹤過程當中數據的記錄,還有能擴展的地方,後期再繼續補充。java
原理參考上面文章 《Dubbo鏈路追蹤——生成全局ID(traceId)》git
源碼地址github
實現鏈路追蹤的目的json
- 服務調用的流程信息,定位服務調用鏈
- 記錄調用入參及返回值信息,方便問題重現
- 記錄調用時間線,代碼重構及調優處理
- 調用信息統計
分佈式跟蹤系統還有其餘比較成熟的實現,例如:Naver的Pinpoint、Apache的HTrace、阿里的鷹眼Tracing、京東的Hydra、新浪的Watchman,美團點評的CAT,skywalking等。 本次主要利用Dubbo數據傳播特性擴展Filter接口來實現鏈路追蹤的目的api
重點主要是zipkin及brave使用及特性,當前brave版本爲 5.2.0 爲 2018年8月份發佈的release版本 , zipkin版本爲2.2.1 所需JDK爲1.8服務器
下載最新的zipkin並啓動併發
wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec' java -jar zipkin.jar
輸入 http://localhost:9411/zipkin/ 進入WebUI界面以下 app
代碼的初步版本:方便描述異步
import brave.Span; import brave.Tracer; import brave.Tracing; import brave.propagation.*; import brave.sampler.Sampler; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.extension.Activate; import com.alibaba.dubbo.common.json.JSON; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.remoting.exchange.ResponseCallback; import com.alibaba.dubbo.rpc.*; import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter; import com.alibaba.dubbo.rpc.support.RpcUtils; import zipkin2.codec.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.Sender; import zipkin2.reporter.okhttp3.OkHttpSender; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * Created with IntelliJ IDEA. * * @author: bakerZhu * @description: * @modifytime: */ @Activate(group = {Constants.PROVIDER, Constants.CONSUMER}) public class TracingFilter implements Filter { private static final Logger log = LoggerFactory.getLogger(TracingFilter.class); private static Tracing tracing; private static Tracer tracer; private static TraceContext.Extractor<Map<String, String>> extractor; private static TraceContext.Injector<Map<String, String>> injector; static final Propagation.Getter<Map<String, String>, String> GETTER = new Propagation.Getter<Map<String, String>, String>() { @Override public String get(Map<String, String> carrier, String key) { return carrier.get(key); } @Override public String toString() { return "Map::get"; } }; static final Propagation.Setter<Map<String, String>, String> SETTER = new Propagation.Setter<Map<String, String>, String>() { @Override public void put(Map<String, String> carrier, String key, String value) { carrier.put(key, value); } @Override public String toString() { return "Map::set"; } }; static { // 1 Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans"); // 2 AsyncReporter asyncReporter = AsyncReporter.builder(sender) .closeTimeout(500, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); // 3 tracing = Tracing.newBuilder() .localServiceName("tracer-client") .spanReporter(asyncReporter) .sampler(Sampler.ALWAYS_SAMPLE) .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name")) .build(); tracer = tracing.tracer(); // 4 // 4.1 extractor = tracing.propagation().extractor(GETTER); // 4.2 injector = tracing.propagation().injector(SETTER); } public TracingFilter() { } @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { RpcContext rpcContext = RpcContext.getContext(); // 5 Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT; final Span span; if (kind.equals(Span.Kind.CLIENT)) { //6 span = tracer.nextSpan(); //7 injector.inject(span.context(), invocation.getAttachments()); } else { //8 TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments()); //9 span = extracted.context() != null ? tracer.joinSpan(extracted.context()) : tracer.nextSpan(extracted); } if (!span.isNoop()) { span.kind(kind).start(); //10 String service = invoker.getInterface().getSimpleName(); String method = RpcUtils.getMethodName(invocation); span.kind(kind); span.name(service + "/" + method); InetSocketAddress remoteAddress = rpcContext.getRemoteAddress(); span.remoteIpAndPort( remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName(),remoteAddress.getPort()); } boolean isOneway = false, deferFinish = false; try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)){ //11 collectArguments(invocation, span, kind); Result result = invoker.invoke(invocation); if (result.hasException()) { onError(result.getException(), span); } // 12 isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation); // 13 Future<Object> future = rpcContext.getFuture(); if (future instanceof FutureAdapter) { deferFinish = true; ((FutureAdapter) future).getFuture().setCallback(new FinishSpanCallback(span));// 14 } return result; } catch (Error | RuntimeException e) { onError(e, span); throw e; } finally { if (isOneway) { // 15 span.flush(); } else if (!deferFinish) { // 16 span.finish(); } } } static void onError(Throwable error, Span span) { span.error(error); if (error instanceof RpcException) { span.tag("dubbo.error_msg", RpcExceptionEnum.getMsgByCode(((RpcException) error).getCode())); } } static void collectArguments(Invocation invocation, Span span, Span.Kind kind) { if (kind == Span.Kind.CLIENT) { StringBuilder fqcn = new StringBuilder(); Object[] args = invocation.getArguments(); if (args != null && args.length > 0) { try { fqcn.append(JSON.json(args)); } catch (IOException e) { log.warn(e.getMessage(), e); } } span.tag("args", fqcn.toString()); } } static final class FinishSpanCallback implements ResponseCallback { final Span span; FinishSpanCallback(Span span) { this.span = span; } @Override public void done(Object response) { span.finish(); } @Override public void caught(Throwable exception) { onError(exception, span); span.finish(); } } // 17 private enum RpcExceptionEnum { UNKNOWN_EXCEPTION(0, "unknown exception"), NETWORK_EXCEPTION(1, "network exception"), TIMEOUT_EXCEPTION(2, "timeout exception"), BIZ_EXCEPTION(3, "biz exception"), FORBIDDEN_EXCEPTION(4, "forbidden exception"), SERIALIZATION_EXCEPTION(5, "serialization exception"),; private int code; private String msg; RpcExceptionEnum(int code, String msg) { this.code = code; this.msg = msg; } public static String getMsgByCode(int code) { for (RpcExceptionEnum error : RpcExceptionEnum.values()) { if (code == error.code) { return error.msg; } } return null; } } }
測試項async
- Dubbo sync async oneway 測試
- RPC異常測試
- 普通業務異常測試
- 併發測試
POM依賴添加
<dependency> <groupId>com.github.baker</groupId> <artifactId>Tracing</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
資源目錄根路徑下添加tracing.properties文件 一次調用信息
調用鏈
調用成功失敗彙總
zipkinHost 指定zipkin服務器IP:PORT 默認爲localhost:9411 serviceName 指定應用名稱 默認爲trace-default
調用鏈:
待擴展項
- 抽象數據傳輸(擴展Kafka數據傳輸)
- 調用返回值數據打印
- 更靈活的配置方式