Zipkin客戶端鏈路追蹤源碼解析

咱們知道,Zipkin這個工具能夠幫助咱們收集分佈式系統中各個系統之間的調用連關係,並且除了Servlet以外還能收集:MQ、線程池、WebSocket、Feign、Hystrix、RxJava、WebFlux等等組件之間的調用關係。本篇文章就來分析一下Zipkin是如何完成這些功能的web

咱們先以最經常使用的Servlet接受請求爲例來分析spring

在spring-cloud-sleuth的spring.factories文件中注入的不少類中包含了一個類:TraceWebServletAutoConfiguration,一看就知道,這是爲Servlet環境量身定製的一個自動裝配類app

在這個類中,建立了一個Filter,這個Filter就是攔截web請求,完成Servlet請求鏈路的收集的利器分佈式

@Bean
	@ConditionalOnMissingBean
	public TracingFilter tracingFilter(HttpTracing tracing) {
		return (TracingFilter) TracingFilter.create(tracing);
	}

咱們直接來看這個攔截器都是作了一些什麼東西吧ide

public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest)request;
        HttpServletResponse httpResponse = this.servlet.httpResponse(response);
        TraceContext context = (TraceContext)request.getAttribute(TraceContext.class.getName());
        if (context != null) {
            Scope scope = this.currentTraceContext.maybeScope(context);

            try {
                chain.doFilter(request, response);
            } finally {
                scope.close();
            }

        } else {
            Span span = this.handler.handleReceive(this.extractor, httpRequest);
            request.setAttribute(SpanCustomizer.class.getName(), span.customizer());
            request.setAttribute(TraceContext.class.getName(), span.context());
            Throwable error = null;
            Scope scope = this.currentTraceContext.newScope(span.context());

            try {
                chain.doFilter(httpRequest, httpResponse);
            } catch (ServletException | RuntimeException | Error | IOException var19) {
                error = var19;
                throw var19;
            } finally {
                scope.close();
                if (this.servlet.isAsync(httpRequest)) {
                    this.servlet.handleAsync(this.handler, httpRequest, span);
                } else {
                    this.handler.handleSend(ADAPTER.adaptResponse(httpRequest, httpResponse), error, span);
                }

            }

        }
    }
Span的建立

第一步,嘗試從request中獲取TraceContext,TraceContext包含了本次請求的鏈路信息,假如這個請求是從上游系統過來的話,那麼這裏就會存在這個信息。工具

咱們先重點看不存在上游系統時的分支,這個時候,第一步就應該去建立一個span。關於span和trace的概念上篇文章已經提到過了,這裏就再也不展開了。oop

public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    span.kind(Span.Kind.SERVER);
    return handleStart(request, span);
  }
  Span nextSpan(TraceContextOrSamplingFlags extracted, Req request) {
    if (extracted.sampled() == null) { // Otherwise, try to make a new decision
      extracted = extracted.sampled(sampler.trySample(adapter, request));
    }
    return extracted.context() != null
        ? tracer.joinSpan(extracted.context())
        : tracer.nextSpan(extracted);
  }

這個三目表達式的意思是看當前環境中是否存在span,若是存在則加入當前環境的span,不然繼續進入建立span的邏輯ui

public Span nextSpan(TraceContextOrSamplingFlags extracted) {
    TraceContext parent = extracted.context();
    if (extracted.samplingFlags() != null) {
      TraceContext implicitParent = currentTraceContext.get();
      if (implicitParent == null) {
        return toSpan(newContextBuilder(null, extracted.samplingFlags())
            .extra(extracted.extra()).build());
      }
      // fall through, with an implicit parent, not an extracted one
      parent = appendExtra(implicitParent, extracted.extra());
    }
    if (parent != null) {
      TraceContext.Builder builder;
      if (extracted.samplingFlags() != null) {
        builder = newContextBuilder(parent, extracted.samplingFlags());
      } else {
        builder = newContextBuilder(parent, sampler);
      }
      return toSpan(builder.build());
    }
    TraceIdContext traceIdContext = extracted.traceIdContext();
    if (extracted.traceIdContext() != null) {
      Boolean sampled = traceIdContext.sampled();
      if (sampled == null) sampled = sampler.isSampled(traceIdContext.traceId());
      return toSpan(TraceContext.newBuilder()
          .sampled(sampled)
          .debug(traceIdContext.debug())
          .traceIdHigh(traceIdContext.traceIdHigh()).traceId(traceIdContext.traceId())
          .spanId(nextId())
          .extra(extracted.extra()).build());
    }
    // TraceContextOrSamplingFlags is a union of 3 types, we've checked all three
    throw new AssertionError("should not reach here");
  }

首先會嘗試獲取trace,由於是第一次請求,因此這個時候trace也不存在因此會進入到toSpan方法this

public Span toSpan(TraceContext context) {
    if (context == null) throw new NullPointerException("context == null");
    TraceContext decorated = propagationFactory.decorate(context);
    if (!noop.get() && Boolean.TRUE.equals(decorated.sampled())) {
      return RealSpan.create(decorated, recorder, errorParser);
    }
    return NoopSpan.create(decorated);
  }

這裏若是咱們沒有特殊指定的話會使用RealSpan來建立span,這個span的最終實現類是AutoValue_RealSpanspa

接着返回最開始的handleReceive方法

public <C> Span handleReceive(TraceContext.Extractor<C> extractor, C carrier, Req request) {
    Span span = nextSpan(extractor.extract(carrier), request);
    span.kind(Span.Kind.SERVER);
    return handleStart(request, span);
  }

span建立完畢後就會設置kind,這個kand表明了服務類型,這裏就是設置了服務類型爲服務端。

接下來就是去開啓記錄鏈路信息

Span handleStart(Req request, Span span) {
    if (span.isNoop()) return span;
    Scope ws = currentTraceContext.maybeScope(span.context());
    try {
      parser.request(adapter, request, span.customizer());

      Endpoint.Builder remoteEndpoint = Endpoint.newBuilder();
      if (parseRemoteEndpoint(request, remoteEndpoint)) {
        span.remoteEndpoint(remoteEndpoint.build());
      }
    } finally {
      ws.close();
    }

    return span.start();
  }

開啓過程當中記錄了幾個信息

public <Req> void request(HttpAdapter<Req, ?> adapter, Req req, SpanCustomizer customizer) {
    customizer.name(spanName(adapter, req));
    String method = adapter.method(req);
    if (method != null) customizer.tag("http.method", method);
    String path = adapter.path(req);
    if (path != null) customizer.tag("http.path", path);
  }
  
  public Span start() {
      return start(clock.currentTimeMicroseconds());
  }
  synchronized MutableSpan start(long timestamp) {
    span.timestamp(this.timestamp = timestamp);
    return this;
  }

接着在回到文章最開始提到的Filter方法中

在span和trace建立完成後,會把它們添加到request中

Scope的建立

而後是一個scope的建立,這個scope和日誌組件說息息相關的。簡單來講,它會把traceId、parentId、spanId打印到當前系統打印的每一行日誌中

public Scope newScope(@Nullable TraceContext currentSpan) {
		final String previousTraceId = MDC.get("traceId");
		final String previousParentId = MDC.get("parentId");
		final String previousSpanId = MDC.get("spanId");
		final String spanExportable = MDC.get("spanExportable");
		final String legacyPreviousTraceId = MDC.get(LEGACY_TRACE_ID_NAME);
		final String legacyPreviousParentId = MDC.get(LEGACY_PARENT_ID_NAME);
		final String legacyPreviousSpanId = MDC.get(LEGACY_SPAN_ID_NAME);
		final String legacySpanExportable = MDC.get(LEGACY_EXPORTABLE_NAME);

		if (currentSpan != null) {
			String traceIdString = currentSpan.traceIdString();
			MDC.put("traceId", traceIdString);
			MDC.put(LEGACY_TRACE_ID_NAME, traceIdString);
			String parentId = currentSpan.parentId() != null ?
					HexCodec.toLowerHex(currentSpan.parentId()) :
					null;
			replace("parentId", parentId);
			replace(LEGACY_PARENT_ID_NAME, parentId);
			String spanId = HexCodec.toLowerHex(currentSpan.spanId());
			MDC.put("spanId", spanId);
			MDC.put(LEGACY_SPAN_ID_NAME, spanId);
			String sampled = String.valueOf(currentSpan.sampled());
			MDC.put("spanExportable", sampled);
			MDC.put(LEGACY_EXPORTABLE_NAME, sampled);
			log("Starting scope for span: {}", currentSpan);
			if (currentSpan.parentId() != null) {
				if (log.isTraceEnabled()) {
					log.trace("With parent: {}", currentSpan.parentId());
				}
			}
		}
		else {
			MDC.remove("traceId");
			MDC.remove("parentId");
			MDC.remove("spanId");
			MDC.remove("spanExportable");
			MDC.remove(LEGACY_TRACE_ID_NAME);
			MDC.remove(LEGACY_PARENT_ID_NAME);
			MDC.remove(LEGACY_SPAN_ID_NAME);
			MDC.remove(LEGACY_EXPORTABLE_NAME);
		}

		Scope scope = this.delegate.newScope(currentSpan);

		class ThreadContextCurrentTraceContextScope implements Scope {
			@Override public void close() {
				log("Closing scope for span: {}", currentSpan);
				scope.close();
				replace("traceId", previousTraceId);
				replace("parentId", previousParentId);
				replace("spanId", previousSpanId);
				replace("spanExportable", spanExportable);
				replace(LEGACY_TRACE_ID_NAME, legacyPreviousTraceId);
				replace(LEGACY_PARENT_ID_NAME, legacyPreviousParentId);
				replace(LEGACY_SPAN_ID_NAME, legacyPreviousSpanId);
				replace(LEGACY_EXPORTABLE_NAME, legacySpanExportable);
			}
		}
		return new ThreadContextCurrentTraceContextScope();
	}
Span的上送

接下來當剩下的執行鏈執行完畢後,本次請求也就該結束了。在請求結束時,span就會被上送到Zipkin服務端中

public void handleSend(@Nullable Resp response, @Nullable Throwable error, Span span) {
    handleFinish(response, error, span);
  }
  
    void handleFinish(@Nullable Resp response, @Nullable Throwable error, Span span) {
    if (span.isNoop()) return;
    try {
      Scope ws = currentTraceContext.maybeScope(span.context());
      try {
        parser.response(adapter, response, error, span.customizer());
      } finally {
        ws.close(); // close the scope before finishing the span
      }
    } finally {
      finishInNullScope(span);
    }
  }

首先在span中記錄本次調用的相應信息

public <Resp> void response(HttpAdapter<?, Resp> adapter, @Nullable Resp res,
      @Nullable Throwable error, SpanCustomizer customizer) {
    int statusCode = 0;
    if (res != null) {
      statusCode = adapter.statusCodeAsInt(res);
      String nameFromRoute = spanNameFromRoute(adapter, res, statusCode);
      if (nameFromRoute != null) customizer.name(nameFromRoute);
      String maybeStatus = maybeStatusAsString(statusCode, 299);
      if (maybeStatus != null) customizer.tag("http.status_code", maybeStatus);
    }
    error(statusCode, error, customizer);
  }

接着清空Scope

void finishInNullScope(Span span) {
    Scope ws = currentTraceContext.maybeScope(null);
    try {
      span.finish();
    } finally {
      ws.close();
    }
  }

以後說span的上傳

public void finish(TraceContext context) {
    MutableSpan span = spanMap.remove(context);
    if (span == null || noop.get()) return;
    synchronized (span) {
      span.finish(span.clock.currentTimeMicroseconds());
      reporter.report(span.toSpan());
    }
  }

具體上傳的實現是由Sender接口的實現類實現的,它的實現類默認狀況下是這三個 屏幕快照 2019-11-18 下午10.31.01file

而一個span內容則是這樣的 屏幕快照 2019-11-13 下午9.45.27file

RabbitMQ鏈路追蹤

當看完SpringMVC鏈路追蹤的實現方式以後,再去看其餘的方式,我想確定是很是簡單的。這裏咱們以RabbitMQ爲例:

首先查找spring-cloud-sleuth的spring.factories文件,看到關於消息中間件的追蹤配置類是這個TraceMessagingAutoConfiguration

看這個類關於RabbitMQ的東西

@Configuration
	@ConditionalOnProperty(value = "spring.sleuth.messaging.rabbit.enabled", matchIfMissing = true)
	@ConditionalOnClass(RabbitTemplate.class)
	protected static class SleuthRabbitConfiguration {
		@Bean
		@ConditionalOnMissingBean
		SpringRabbitTracing springRabbitTracing(Tracing tracing,
				SleuthMessagingProperties properties) {
			return SpringRabbitTracing.newBuilder(tracing)
					.remoteServiceName(properties.getMessaging().getRabbit().getRemoteServiceName())
					.build();
		}

		@Bean
		@ConditionalOnMissingBean
		static SleuthRabbitBeanPostProcessor sleuthRabbitBeanPostProcessor(BeanFactory beanFactory) {
			return new SleuthRabbitBeanPostProcessor(beanFactory);
		}
	}

這裏其實大體就能夠猜想出來了,確定是使用了SleuthRabbitBeanPostProcessor在構造RabbitTemplate的使用作了一些改造,好比說加個攔截器啥的,而後當使用RabbitTemplate發送消息時自動添加Header等東西就完成了整個流程了

1

相關文章
相關標籤/搜索