brave是zipkin的java客戶端,負責數據收集以及上傳。 首先看下怎麼構造一個brave對象:java
public Brave brave(Reporter<Span> reporter){ //建立Brave builder,並設置server name Brave.Builder builder = new Brave.Builder("server name"); //建立reporter,本例使用http方式上傳到collector,也能夠使用kafka等方式上傳數據 Reporter<Span> reporter = AsyncReporter.builder(URLConnectionSender.create("collector url")).build(); builder.reporter(reporter); //設置採樣率,最大爲1,最小0.01 builder.traceSampler(Sampler.ALWAYS_SAMPLE); //下面看builder.build()建立brave -》 Brave brave = builder.build(); return brave; } private Brave(Builder builder) { //做爲服務端接受外部系統請求的tracer serverTracer = ServerTracer.builder() .randomGenerator(builder.random) .reporter(builder.reporter) .state(builder.state) .traceSampler(builder.sampler) .clock(builder.clock) .traceId128Bit(builder.traceId128Bit) .build(); //做爲客戶端請求外部系統的tracer clientTracer = ClientTracer.builder() .randomGenerator(builder.random) .reporter(builder.reporter) .state(builder.state) .traceSampler(builder.sampler) .clock(builder.clock) .traceId128Bit(builder.traceId128Bit) .build(); //本地tracer,在進程內操做,好比一個file io操做或者一個代碼塊的執行 localTracer = LocalTracer.builder() .randomGenerator(builder.random) .reporter(builder.reporter) .allowNestedLocalSpans(builder.allowNestedLocalSpans) .spanAndEndpoint(SpanAndEndpoint.LocalSpanAndEndpoint.create(builder.state)) .traceSampler(builder.sampler) .clock(builder.clock) .traceId128Bit(builder.traceId128Bit) .build(); //建立ss攔截器 serverRequestInterceptor = new ServerRequestInterceptor(serverTracer); //建立sr攔截器 serverResponseInterceptor = new ServerResponseInterceptor(serverTracer); //建立cs攔截器 clientRequestInterceptor = new ClientRequestInterceptor(clientTracer); //建立cr攔截器 clientResponseInterceptor = new ClientResponseInterceptor(clientTracer); //專門用來提交應用定義的annotation,好比一些自定義的指標 serverSpanAnnotationSubmitter = AnnotationSubmitter.create(SpanAndEndpoint.ServerSpanAndEndpoint.create(builder.state)); //下面的三種binder支持將span綁定到新的線程上 serverSpanThreadBinder = new ServerSpanThreadBinder(builder.state); clientSpanThreadBinder = new ClientSpanThreadBinder(builder.state); localSpanThreadBinder = new LocalSpanThreadBinder(builder.state); }
下面分析brave怎麼集成springmvc的:spring
//BraveApiConfig將Brave的部分屬性轉換成bean暴露出去 @Configuration public class BraveApiConfig { @Autowired Brave brave; @Bean @Scope(value = "singleton") public ClientTracer clientTracer() { return brave.clientTracer(); } @Bean @Scope(value = "singleton") public ServerTracer serverTracer() { return brave.serverTracer(); } @Bean @Scope(value = "singleton") public ClientRequestInterceptor clientRequestInterceptor() { return brave.clientRequestInterceptor(); } @Bean @Scope(value = "singleton") public ClientResponseInterceptor clientResponseInterceptor() { return brave.clientResponseInterceptor(); } @Bean @Scope(value = "singleton") public ServerRequestInterceptor serverRequestInterceptor() { return brave.serverRequestInterceptor(); } @Bean @Scope(value = "singleton") public ServerResponseInterceptor serverResponseInterceptor() { return brave.serverResponseInterceptor(); } @Bean(name = "serverSpanAnnotationSubmitter") @Scope(value = "singleton") public AnnotationSubmitter serverSpanAnnotationSubmitter() { return brave.serverSpanAnnotationSubmitter(); } @Bean @Scope(value = "singleton") public ServerSpanThreadBinder serverSpanThreadBinder() { return brave.serverSpanThreadBinder(); } } @Configuration @Import(BraveApiConfig.class) @EnableWebMvc public class BraveConfig extends WebMvcConfigurerAdapter { //ServerRequestInterceptor等都是Brave內部屬性 @Autowired private ServerRequestInterceptor requestInterceptor; @Autowired private ServerResponseInterceptor responseInterceptor; @Autowired private ServerSpanThreadBinder serverThreadBinder; //添加ServletHandlerInterceptor攔截器,下面看怎麼構建ServletHandlerInterceptor的 -》 @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new ServletHandlerInterceptor(requestInterceptor, responseInterceptor, new DefaultSpanNameProvider(), serverThreadBinder)); } } public class ServletHandlerInterceptor extends HandlerInterceptorAdapter { static final String HTTP_SERVER_SPAN_ATTRIBUTE = ServletHandlerInterceptor.class.getName() + ".server-span"; private final ServerRequestInterceptor requestInterceptor; private final SpanNameProvider spanNameProvider; private final ServerResponseInterceptor responseInterceptor; private final ServerSpanThreadBinder serverThreadBinder; @Autowired public ServletHandlerInterceptor(ServerRequestInterceptor requestInterceptor, ServerResponseInterceptor responseInterceptor, SpanNameProvider spanNameProvider, final ServerSpanThreadBinder serverThreadBinder) { this.requestInterceptor = requestInterceptor; this.spanNameProvider = spanNameProvider; this.responseInterceptor = responseInterceptor; this.serverThreadBinder = serverThreadBinder; } //前置處理 @Override public boolean preHandle(final HttpServletRequest request, final HttpServletResponse response, final Object handler) { //ServerRequestInterceptor前置處理 -》 requestInterceptor.handle(new HttpServerRequestAdapter(new HttpServerRequest() { @Override public String getHttpHeaderValue(String headerName) { return request.getHeader(headerName); } @Override public URI getUri() { try { return new URI(request.getRequestURI()); } catch (URISyntaxException e) { throw new RuntimeException(e); } } @Override public String getHttpMethod() { return request.getMethod(); } }, spanNameProvider)); return true; } //若是是異步請求則在開始前調用 @Override public void afterConcurrentHandlingStarted(final HttpServletRequest request, final HttpServletResponse response, final Object handler) { //在reqeust中保存span信息,由於異步請求的話會在新的線程中處理業務邏輯,致使在新的線程中獲取以前線程中的span request.setAttribute(HTTP_SERVER_SPAN_ATTRIBUTE, serverThreadBinder.getCurrentServerSpan()); //清空當前線程中span信息 serverThreadBinder.setCurrentSpan(null); } //後置處理 @Override public void afterCompletion(final HttpServletRequest request, final HttpServletResponse response, final Object handler, final Exception ex) { //若是是異步請求的話,則須要到request中獲取span final ServerSpan span = (ServerSpan) request.getAttribute(HTTP_SERVER_SPAN_ATTRIBUTE); if (span != null) { serverThreadBinder.setCurrentSpan(span); } //ServerResponseInterceptor後置處理 responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() { @Override public int getHttpStatusCode() { return response.getStatus(); } })); }
ServerRequestInterceptor處理邏輯:mvc
public void handle(ServerRequestAdapter adapter) { //清空當前線程中的span serverTracer.clearCurrentSpan(); //從request中獲取trace相應信息,若是請求植入了zipkin //trace信息,則沿用request中的span信息,不然會返回一個空的traceData final TraceData traceData = adapter.getTraceData(); Boolean sample = traceData.getSample(); if (sample != null && Boolean.FALSE.equals(sample)) { //不須要採樣,建立一個不採樣的span serverTracer.setStateNoTracing(); LOGGER.fine("Received indication that we should NOT trace."); } else { boolean clientOriginatedTrace = traceData.getSpanId() != null; if (clientOriginatedTrace) { //該traceData是從請求中傳遞過來生成的 LOGGER.fine("Received span information as part of request."); //根據traceData建立span,並保存到當前線程中 serverTracer.setStateCurrentTrace(traceData.getSpanId(), adapter.getSpanName()); } else { //請求中沒有完整的或者沒有trace信息,則會根據採樣率判斷是否生成一個徹底新的traceId、span,並把span存入當前線程中 LOGGER.fine("Received no span state."); serverTracer.setStateUnknown(adapter.getSpanName()); } //在span中添加sr類型的annotation serverTracer.setServerReceived(); //若是是從請求中生成的span,則須要清空timestamp以及startTick //由於原始span是在client生成的 if (clientOriginatedTrace) { Span span = serverTracer.spanAndEndpoint().span(); synchronized (span) { span.setTimestamp(null); span.startTick = null; } } //根據url生成BinaryAnnotation,並存入span for(KeyValueAnnotation annotation : adapter.requestAnnotations()) { serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue()); } } }
ServerResponseInterceptor處理邏輯:app
public void handle(ServerResponseAdapter adapter) { // We can submit this in any case. When server state is not set or // we should not trace this request nothing will happen. LOGGER.fine("Sending server send."); try { //設置響應碼到span的BinaryAnnotation for(KeyValueAnnotation annotation : adapter.responseAnnotations()) { serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue()); } //設置ss annotation,並上報數據 具體邏輯看AnnotationSubmitter的submitEndAnnotation方法-》 serverTracer.setServerSend(); } finally { serverTracer.clearCurrentSpan(); } }
下面分析AnnotationSubmitter的submitEndAnnotation方法:dom
boolean submitEndAnnotation(String annotationName, Reporter<zipkin.Span> reporter) { Span span = spanAndEndpoint().span(); if (span == null) { return false; } Long startTimestamp; Long startTick; synchronized (span) { startTimestamp = span.getTimestamp(); startTick = span.startTick; } long endTimestamp = currentTimeMicroseconds(startTimestamp, startTick); Annotation annotation = Annotation.create( endTimestamp, annotationName, spanAndEndpoint().endpoint() ); synchronized (span) { span.addToAnnotations(annotation); if (startTimestamp != null) { //計算rr->rs耗費的時間 span.setDuration(Math.max(1L, endTimestamp - startTimestamp)); } } //上報消息 reporter.report(span.toZipkin()); return true; }
springmvc相關代碼分析結束。異步