# Zipkin server configuration.
# Values written as ${NAME:default} are property placeholders: the value of
# NAME is used when it is set, otherwise the text after the colon.
zipkin:
  self-tracing:
    # Set to true to enable self-tracing.
    enabled: ${SELF_TRACING_ENABLED:false}
    # percentage of self-traces to retain
    sample-rate: ${SELF_TRACING_SAMPLE_RATE:1.0}
    # Interval in seconds to flush self-tracing data to storage.
    flush-interval: ${SELF_TRACING_FLUSH_INTERVAL:1}
  collector:
    # percentage of traces to retain
    sample-rate: ${COLLECTOR_SAMPLE_RATE:1.0}
    kafka:
      # ZooKeeper host string, comma-separated host:port value.
      zookeeper: ${KAFKA_ZOOKEEPER:}
      # Name of topic to poll for spans
      topic: ${KAFKA_TOPIC:zipkin}
      # Consumer group this process is consuming on behalf of.
      group-id: ${KAFKA_GROUP_ID:zipkin}
      # Count of consumer threads consuming the topic
      streams: ${KAFKA_STREAMS:1}
      # Maximum size of a message containing spans in bytes
      max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}
    scribe:
      enabled: ${SCRIBE_ENABLED:false}
      category: zipkin
      port: ${COLLECTOR_PORT:9410}
  query:
    # 1 day in millis
    lookback: ${QUERY_LOOKBACK:86400000}
    # The Cache-Control max-age (seconds) for /api/v1/services and /api/v1/spans
    names-max-age: 300
    # CORS allowed-origins.
    allowed-origins: "*"

  storage:
    strict-trace-id: ${STRICT_TRACE_ID:true}
    type: ${STORAGE_TYPE:mem}
    cassandra:
      # Comma separated list of hosts / ip addresses part of Cassandra cluster.
      contact-points: ${CASSANDRA_CONTACT_POINTS:localhost}
      # Name of the datacenter that will be considered "local" for latency load balancing.
      # When unset, load-balancing is round-robin.
      local-dc: ${CASSANDRA_LOCAL_DC:}
      # Will throw an exception on startup if authentication fails.
      username: ${CASSANDRA_USERNAME:}
      password: ${CASSANDRA_PASSWORD:}
      keyspace: ${CASSANDRA_KEYSPACE:zipkin}
      # Max pooled connections per datacenter-local host.
      max-connections: ${CASSANDRA_MAX_CONNECTIONS:8}
      # Ensuring that schema exists, if enabled tries to execute script
      # /zipkin-cassandra-core/resources/cassandra-schema-cql3.txt.
      ensure-schema: ${CASSANDRA_ENSURE_SCHEMA:true}
      # 7 days in seconds
      span-ttl: ${CASSANDRA_SPAN_TTL:604800}
      # 3 days in seconds
      index-ttl: ${CASSANDRA_INDEX_TTL:259200}
      # the maximum trace index metadata entries to cache
      index-cache-max: ${CASSANDRA_INDEX_CACHE_MAX:100000}
      # how long to cache index metadata about a trace. 1 minute in seconds
      index-cache-ttl: ${CASSANDRA_INDEX_CACHE_TTL:60}
      # how many more index rows to fetch than the user-supplied query limit
      index-fetch-multiplier: ${CASSANDRA_INDEX_FETCH_MULTIPLIER:3}
      # Using ssl for connection, rely on Keystore
      use-ssl: ${CASSANDRA_USE_SSL:false}
    cassandra3:
      # Comma separated list of hosts / ip addresses part of Cassandra cluster.
      contact-points: ${CASSANDRA3_CONTACT_POINTS:localhost}
      # Name of the datacenter that will be considered "local" for latency load balancing.
      # When unset, load-balancing is round-robin.
      local-dc: ${CASSANDRA3_LOCAL_DC:}
      # Will throw an exception on startup if authentication fails.
      username: ${CASSANDRA3_USERNAME:}
      password: ${CASSANDRA3_PASSWORD:}
      keyspace: ${CASSANDRA3_KEYSPACE:zipkin3}
      # Max pooled connections per datacenter-local host.
      max-connections: ${CASSANDRA3_MAX_CONNECTIONS:8}
      # Ensuring that schema exists, if enabled tries to execute script /cassandra3-schema.cql
      ensure-schema: ${CASSANDRA3_ENSURE_SCHEMA:true}
      # how many more index rows to fetch than the user-supplied query limit
      index-fetch-multiplier: ${CASSANDRA3_INDEX_FETCH_MULTIPLIER:3}
      # Using ssl for connection, rely on Keystore
      use-ssl: ${CASSANDRA3_USE_SSL:false}
    elasticsearch:
      # host is left unset intentionally, to defer the decision
      hosts: ${ES_HOSTS:}
      pipeline: ${ES_PIPELINE:}
      max-requests: ${ES_MAX_REQUESTS:64}
      aws:
        domain: ${ES_AWS_DOMAIN:}
        region: ${ES_AWS_REGION:}
      index: ${ES_INDEX:zipkin}
      date-separator: ${ES_DATE_SEPARATOR:-}
      index-shards: ${ES_INDEX_SHARDS:5}
      index-replicas: ${ES_INDEX_REPLICAS:1}
      username: ${ES_USERNAME:}
      password: ${ES_PASSWORD:}
    mysql:
      host: ${MYSQL_HOST:localhost}
      port: ${MYSQL_TCP_PORT:3306}
      username: ${MYSQL_USER:}
      password: ${MYSQL_PASS:}
      db: ${MYSQL_DB:zipkin}
      max-active: ${MYSQL_MAX_CONNECTIONS:10}
      use-ssl: ${MYSQL_USE_SSL:false}
  ui:
    ## Values below here are mapped to ZipkinUiProperties, served as /config.json
    # Default limit for Find Traces
    query-limit: 10
    # The value here becomes a label in the top-right corner
    environment:
    # Default duration to look back when finding traces.
    # Affects the "Start time" element in the UI. 1 hour in millis
    default-lookback: 3600000
    # Which sites this Zipkin UI covers. Regex syntax. (e.g. http:\/\/example.com\/.*)
    # Multiple sites can be specified, e.g.
    # - .*example1.com
    # - .*example2.com
    # Default is "match all websites"
    instrumented: .*

server:
  port: ${QUERY_PORT:9411}
  compression:
    enabled: true
    # compresses any response over min-response-size (default is 2KiB)
    # Includes dynamic json content and large static assets from zipkin-ui
    mime-types: application/json,application/javascript,text/css,image/svg

spring:
  mvc:
    favicon:
      # zipkin has its own favicon
      enabled: false
  autoconfigure:
    exclude:
      # otherwise we might initialize even when not needed (ex when storage type is cassandra)
      - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration

info:
  zipkin:
    version: "@project.version@"

logging:
  level:
    # Silence Invalid method name: '__can__finagle__trace__v3__'
    com.facebook.swift.service.ThriftServiceProcessor: 'OFF'
#     # investigate /api/v1/dependencies
#     zipkin.internal.DependencyLinker: 'DEBUG'
#     # log cassandra queries (DEBUG is without values)
#     com.datastax.driver.core.QueryLogger: 'TRACE'
#     # log cassandra trace propagation
#     com.datastax.driver.core.Message: 'TRACE'
zipkin基於springboot
zipkin-server
/** Entry point of the Zipkin server application. */
@SpringBootApplication
@EnableZipkinServer
public class ZipkinServer {

  /**
   * Builds and runs the Spring application.
   *
   * @param args command-line arguments, passed through to the application unchanged
   */
  public static void main(String[] args) {
    SpringApplicationBuilder launcher = new SpringApplicationBuilder(ZipkinServer.class);
    launcher = launcher.listeners(new RegisterZipkinHealthIndicators());
    // Configuration is read from files named "zipkin-server" rather than the default name.
    launcher = launcher.properties("spring.config.name=zipkin-server");
    launcher.run(args);
  }
}
導入
/**
 * Type-level annotation that imports the server configuration, the Brave configuration, the
 * v1 query API and the HTTP span collector into the annotated application.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({
    ZipkinServerConfiguration.class,
    BraveConfiguration.class,
    ZipkinQueryApiV1.class,
    ZipkinHttpCollector.class
})
public @interface EnableZipkinServer {
}
step1.構建存儲
storage: strict-trace-id: ${STRICT_TRACE_ID:true} type: ${STORAGE_TYPE:mem} 配置文件默認使用爲mem內存存儲 能夠修改 -XX爲springboot對應配置 -XXstorage.type=對應存儲結構 @Configuration public class ZipkinServerConfiguration { ... //對應默認存儲配置,只有當zipkin.storage.type=mem纔會執行 @Configuration // "matchIfMissing = true" ensures this is used when there's no configured storage type @ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "mem", matchIfMissing = true) @ConditionalOnMissingBean(StorageComponent.class) static class InMemoryConfiguration { @Bean StorageComponent storage(@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId) { return InMemoryStorage.builder().strictTraceId(strictTraceId).build(); } } }
rest入口
@RestController @CrossOrigin("${zipkin.query.allowed-origins:*}") public class ZipkinHttpCollector { static final ResponseEntity<?> SUCCESS = ResponseEntity.accepted().build(); static final String APPLICATION_THRIFT = "application/x-thrift"; final CollectorMetrics metrics; final Collector collector; @Autowired ZipkinHttpCollector(StorageComponent storage, CollectorSampler sampler, CollectorMetrics metrics) { this.metrics = metrics.forTransport("http"); this.collector = Collector.builder(getClass()) .storage(storage).sampler(sampler).metrics(this.metrics).build(); } //入口 @RequestMapping(value = "/api/v1/spans", method = POST) public ListenableFuture<ResponseEntity<?>> uploadSpansJson( @RequestHeader(value = "Content-Encoding", required = false) String encoding, @RequestBody byte[] body ) { return validateAndStoreSpans(encoding, Codec.JSON, body); } @RequestMapping(value = "/api/v1/spans", method = POST, consumes = APPLICATION_THRIFT) public ListenableFuture<ResponseEntity<?>> uploadSpansThrift( @RequestHeader(value = "Content-Encoding", required = false) String encoding, @RequestBody byte[] body ) { return validateAndStoreSpans(encoding, Codec.THRIFT, body); } ListenableFuture<ResponseEntity<?>> validateAndStoreSpans(String encoding, Codec codec, byte[] body) { SettableListenableFuture<ResponseEntity<?>> result = new SettableListenableFuture<>(); metrics.incrementMessages(); if (encoding != null && encoding.contains("gzip")) { try { body = gunzip(body); } catch (IOException e) { metrics.incrementMessagesDropped(); result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n")); } } //接收span collector.acceptSpans(body, codec, new Callback<Void>() { @Override public void onSuccess(@Nullable Void value) { result.set(SUCCESS); } @Override public void onError(Throwable t) { String message = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage(); result.set(t.getMessage() == null || message.startsWith("Cannot store") ? 
ResponseEntity.status(500).body(message + "\n") : ResponseEntity.status(400).body(message + "\n")); } }); return result; } //略 }
collector處理器
public final class Collector { /** Needed to scope this to the correct logging category */ public static Builder builder(Class<?> loggingClass) { return new Builder(Logger.getLogger(checkNotNull(loggingClass, "loggingClass").getName())); } public static final class Builder { final Logger logger; StorageComponent storage = null; CollectorSampler sampler = CollectorSampler.ALWAYS_SAMPLE; CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS; ... public Collector build() { return new Collector(this); } } final Logger logger; final StorageComponent storage; final CollectorSampler sampler; final CollectorMetrics metrics; Collector(Builder builder) { this.logger = checkNotNull(builder.logger, "logger"); this.storage = checkNotNull(builder.storage, "storage"); this.sampler = builder.sampler == null ? CollectorSampler.ALWAYS_SAMPLE : builder.sampler; this.metrics = builder.metrics == null ? CollectorMetrics.NOOP_METRICS : builder.metrics; } public void acceptSpans(byte[] serializedSpans, Codec codec, Callback<Void> callback) { metrics.incrementBytes(serializedSpans.length);//記錄指標 List<Span> spans; try { spans = codec.readSpans(serializedSpans);//字節數組轉換成對象 } catch (RuntimeException e) { callback.onError(errorReading(e)); return; } accept(spans, callback);//處理span } ... 
public void accept(List<Span> spans, Callback<Void> callback) { if (spans.isEmpty()) { callback.onSuccess(null); return; } metrics.incrementSpans(spans.size()); List<Span> sampled = sample(spans); if (sampled.isEmpty()) { callback.onSuccess(null); return; } try { storage.asyncSpanConsumer().accept(sampled, acceptSpansCallback(sampled));//處理 callback.onSuccess(null); } catch (RuntimeException e) { callback.onError(errorStoringSpans(sampled, e)); return; } } //取樣 List<Span> sample(List<Span> input) { List<Span> sampled = new ArrayList<>(input.size()); for (Span s : input) { if (sampler.isSampled(s)) sampled.add(s); } int dropped = input.size() - sampled.size(); if (dropped > 0) metrics.incrementSpansDropped(dropped); return sampled; } ... }
InMemorySpanStore最終處理
/** Internally, spans are indexed on 64-bit trace ID */ public final class InMemorySpanStore implements SpanStore { private final Multimap<Long, Span> traceIdToSpans = new LinkedListMultimap<>();//traceId+span private final Set<Pair<Long>> traceIdTimeStamps = new TreeSet<>(VALUE_2_DESCENDING);//traceId+timestap private final Multimap<String, Pair<Long>> serviceToTraceIdTimeStamp = new SortedByValue2Descending<>(); private final Multimap<String, String> serviceToSpanNames = new LinkedHashSetMultimap<>();//serviceName+spanName private final boolean strictTraceId; volatile int acceptedSpanCount; // Historical constructor public InMemorySpanStore() { this(new InMemoryStorage.Builder()); } InMemorySpanStore(InMemoryStorage.Builder builder) { this.strictTraceId = builder.strictTraceId; } final StorageAdapters.SpanConsumer spanConsumer = new StorageAdapters.SpanConsumer() { @Override public void accept(List<Span> spans) { for (Span span : spans) { Long timestamp = guessTimestamp(span); Pair<Long> traceIdTimeStamp = Pair.create(span.traceId, timestamp == null ? Long.MIN_VALUE : timestamp); String spanName = span.name; synchronized (InMemorySpanStore.this) { traceIdTimeStamps.add(traceIdTimeStamp); traceIdToSpans.put(span.traceId, span); acceptedSpanCount++; for (String serviceName : span.serviceNames()) { serviceToTraceIdTimeStamp.put(serviceName, traceIdTimeStamp); serviceToSpanNames.put(serviceName, spanName); } } } } @Override public String toString() { return "InMemorySpanConsumer"; } }; ... }
提供api對應查詢爲配置的storage
@RestController @RequestMapping("/api/v1") @CrossOrigin("${zipkin.query.allowed-origins:*}") public class ZipkinQueryApiV1 { @Autowired @Value("${zipkin.query.lookback:86400000}") int defaultLookback = 86400000; // 1 day in millis /** The Cache-Control max-age (seconds) for /api/v1/services and /api/v1/spans */ @Value("${zipkin.query.names-max-age:300}") int namesMaxAge = 300; // 5 minutes volatile int serviceCount; // used as a threshold to start returning cache-control headers private final StorageComponent storage; @Autowired public ZipkinQueryApiV1(StorageComponent storage) { this.storage = storage; // don't cache spanStore here as it can cause the app to crash! } @RequestMapping(value = "/dependencies", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) public byte[] getDependencies(@RequestParam(value = "endTs", required = true) long endTs, @RequestParam(value = "lookback", required = false) Long lookback) { return Codec.JSON.writeDependencyLinks(storage.spanStore().getDependencies(endTs, lookback != null ? 
lookback : defaultLookback)); } @RequestMapping(value = "/services", method = RequestMethod.GET) public ResponseEntity<List<String>> getServiceNames() { List<String> serviceNames = storage.spanStore().getServiceNames(); serviceCount = serviceNames.size(); return maybeCacheNames(serviceNames); } @RequestMapping(value = "/spans", method = RequestMethod.GET) public ResponseEntity<List<String>> getSpanNames( @RequestParam(value = "serviceName", required = true) String serviceName) { return maybeCacheNames(storage.spanStore().getSpanNames(serviceName)); } @RequestMapping(value = "/traces", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) public String getTraces( @RequestParam(value = "serviceName", required = false) String serviceName, @RequestParam(value = "spanName", defaultValue = "all") String spanName, @RequestParam(value = "annotationQuery", required = false) String annotationQuery, @RequestParam(value = "minDuration", required = false) Long minDuration, @RequestParam(value = "maxDuration", required = false) Long maxDuration, @RequestParam(value = "endTs", required = false) Long endTs, @RequestParam(value = "lookback", required = false) Long lookback, @RequestParam(value = "limit", required = false) Integer limit) { QueryRequest queryRequest = QueryRequest.builder() .serviceName(serviceName) .spanName(spanName) .parseAnnotationQuery(annotationQuery) .minDuration(minDuration) .maxDuration(maxDuration) .endTs(endTs) .lookback(lookback != null ? lookback : defaultLookback) .limit(limit).build(); return new String(Codec.JSON.writeTraces(storage.spanStore().getTraces(queryRequest)), UTF_8); } @RequestMapping(value = "/trace/{traceIdHex}", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE) public String getTrace(@PathVariable String traceIdHex, WebRequest request) { long traceIdHigh = traceIdHex.length() == 32 ? 
lowerHexToUnsignedLong(traceIdHex, 0) : 0L; long traceIdLow = lowerHexToUnsignedLong(traceIdHex); String[] raw = request.getParameterValues("raw"); // RequestParam doesn't work for param w/o value List<Span> trace = raw != null ? storage.spanStore().getRawTrace(traceIdHigh, traceIdLow) : storage.spanStore().getTrace(traceIdHigh, traceIdLow); if (trace == null) { throw new TraceNotFoundException(traceIdHex, traceIdHigh, traceIdLow); } return new String(Codec.JSON.writeSpans(trace), UTF_8); } @ExceptionHandler(TraceNotFoundException.class) @ResponseStatus(HttpStatus.NOT_FOUND) public void notFound() { } static class TraceNotFoundException extends RuntimeException { public TraceNotFoundException(String traceIdHex, Long traceIdHigh, long traceId) { super(String.format("Cannot find trace for id=%s, parsed value=%s", traceIdHex, traceIdHigh != null ? traceIdHigh + "," + traceId : traceId)); } } /** * We cache names if there are more than 3 services. This helps people getting started: if we * cache empty results, users have more questions. We assume caching becomes a concern when zipkin * is in active use, and active use usually implies more than 3 services. */ ResponseEntity<List<String>> maybeCacheNames(List<String> names) { ResponseEntity.BodyBuilder response = ResponseEntity.ok(); if (serviceCount > 3) { response.cacheControl(CacheControl.maxAge(namesMaxAge, TimeUnit.SECONDS).mustRevalidate()); } return response.body(names); } }
zipkin-server接收插入請求-inMemory
zipkin-server接收查詢請求-inMemory
項目源碼