先來瀏覽一下架構圖(下圖的scribe採用kafka替代)java
首先咱們完成brave+kafka+zipkin這一步先,這時候採用的存儲是zipkin的內存。git
----------------------------------------------------------------------------------------------github
下載kafka後apache
tar -xzf kafka_2.11-0.9.0.0.tgzbootstrap
cd kafka_2.11-0.9.0.0windows
啓動zookeeper:api
bin/zookeeper-server-start.sh config/zookeeper.properties架構
(windows 啓動 bin\windows\zookeeper-server-start.bat config\zookeeper.properties)app
啓動kafka服務:dom
bin/kafka-server-start.sh config/server.properties
(windows 啓動 bin\windows\kafka-server-start.bat config\server.properties)
----------------------------------------------------------------------------------------------
下載zipkin的jar包,
wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
運行: java -jar zipkin.jar
用解壓工具能夠看到zipkin.jar裏的BOOT-INF\classes\zipkin-server-shared.yml文件,全部的配置都在這個文件裏。
改變配置裏的值,能夠經過java啓動腳本里-D帶入配置參數。
如今咱們經過kafka作數據通道的話就採用以下參數:
java -DKAFKA_ZOOKEEPER=localhost:2181 -jar zipkin-server-1.19.2-exec.jar
打開ui查看 http://localhost:9411/
----------------------------------------------------------------------------------------------
接着附上測試插入數據到kafka的代碼。
import com.github.kristofa.brave.*; import com.twitter.zipkin.gen.Endpoint; import zipkin.Span; import zipkin.reporter.AsyncReporter; import zipkin.reporter.kafka08.KafkaSender; import java.util.ArrayList; import java.util.Collection; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BraveKafkaTest { private static Brave brave = null; private static Brave brave2 = null; private static void braveInit(){ // KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").messageMaxBytes(10000).build(); KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").build(); AsyncReporter<Span> report = AsyncReporter.builder(sender).build(); brave = new Brave.Builder("appserver").reporter(report).build(); brave2 = new Brave.Builder("datacenter").reporter(report).build(); } static class Task { String name; SpanId spanId; public Task(String name, SpanId spanId) { super(); this.name = name; this.spanId = spanId; } } public static void main(String[] args) throws Exception { braveInit(); final BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10); Thread thread = new Thread(){ public void run() { while (true) { try { Task task = queue.take(); dcHandle(task.name, task.spanId); } catch (Exception e) { e.printStackTrace(); } } } }; thread.start(); for (int i = 0; i < 10; i++) { ServerRequestInterceptor serverRequestInterceptor = brave.serverRequestInterceptor(); ServerResponseInterceptor serverResponseInterceptor = brave.serverResponseInterceptor(); ClientRequestInterceptor clientRequestInterceptor = brave.clientRequestInterceptor(); ClientResponseInterceptor clientResponseInterceptor = brave.clientResponseInterceptor(); serverRequestInterceptor.handle(new ServerRequestAdapterImpl("group_data")); ClientRequestAdapterImpl clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_radio_list"); clientRequestInterceptor.handle(clientRequestAdapterImpl); queue.offer(new Task("get_radio_list", clientRequestAdapterImpl.getSpanId())); Thread.sleep(10); clientResponseInterceptor.handle(new ClientResponseAdapterImpl()); clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_user_list"); clientRequestInterceptor.handle(clientRequestAdapterImpl); queue.offer(new Task("get_user_list", clientRequestAdapterImpl.getSpanId())); Thread.sleep(10); clientResponseInterceptor.handle(new ClientResponseAdapterImpl()); clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_program_list"); clientRequestInterceptor.handle(clientRequestAdapterImpl); queue.offer(new Task("get_program_list", clientRequestAdapterImpl.getSpanId())); Thread.sleep(10); clientResponseInterceptor.handle(new ClientResponseAdapterImpl()); serverResponseInterceptor.handle(new ServerResponseAdapterImpl()); Thread.sleep(10); } } public static void dcHandle(String spanName, SpanId spanId){ ServerRequestInterceptor serverRequestInterceptor = brave2.serverRequestInterceptor(); ServerResponseInterceptor serverResponseInterceptor = brave2.serverResponseInterceptor(); serverRequestInterceptor.handle(new ServerRequestAdapterImpl(spanName, spanId)); serverResponseInterceptor.handle(new ServerResponseAdapterImpl()); } static class ServerRequestAdapterImpl implements ServerRequestAdapter { Random randomGenerator = new Random(); SpanId spanId; String spanName; ServerRequestAdapterImpl(String spanName){ this.spanName = spanName; long startId = randomGenerator.nextLong(); SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build(); this.spanId = spanId; } ServerRequestAdapterImpl(String spanName, SpanId spanId){ this.spanName = spanName; this.spanId = spanId; } public TraceData getTraceData() { if (this.spanId != null) { return TraceData.builder().spanId(this.spanId).build(); } long startId = randomGenerator.nextLong(); SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build(); return TraceData.builder().spanId(spanId).build(); } public String getSpanName() { return spanName; } public Collection<KeyValueAnnotation> requestAnnotations() { Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>(); KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364"); collection.add(kv); return collection; } } static class ServerResponseAdapterImpl implements ServerResponseAdapter { public Collection<KeyValueAnnotation> responseAnnotations() { Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>(); KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364"); collection.add(kv); return collection; } } static class ClientRequestAdapterImpl implements ClientRequestAdapter { String spanName; SpanId spanId; ClientRequestAdapterImpl(String spanName){ this.spanName = spanName; } public SpanId getSpanId() { return spanId; } public String getSpanName() { return this.spanName; } public void addSpanIdToRequest(SpanId spanId) { //記錄傳輸到遠程服務 System.out.println(spanId); if (spanId != null) { this.spanId = spanId; System.out.println(String.format("trace_id=%s, parent_id=%s, span_id=%s", spanId.traceId, spanId.parentId, spanId.spanId)); } } public Collection<KeyValueAnnotation> requestAnnotations() { Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>(); KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364"); collection.add(kv); return collection; } public Endpoint serverAddress() { return null; } } static class ClientResponseAdapterImpl implements ClientResponseAdapter { public Collection<KeyValueAnnotation> responseAnnotations() { Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>(); KeyValueAnnotation kv = KeyValueAnnotation.create("radioname", "火星人1"); collection.add(kv); return collection; } } }
maven依賴:
<dependencies> <dependency> <groupId>io.zipkin.brave</groupId> <artifactId>brave-core</artifactId> <version>3.17.0</version> </dependency> <dependency> <groupId>io.zipkin.brave</groupId> <artifactId>brave-spancollector-http</artifactId> <version>3.17.0</version> </dependency> <dependency> <groupId>com.google.auto.value</groupId> <artifactId>auto-value</artifactId> <version>1.3</version> </dependency> <dependency> <groupId>io.zipkin.brave</groupId> <artifactId>brave-spancollector-kafka</artifactId> <version>3.17.0</version> </dependency> <dependency> <groupId>io.zipkin.reporter</groupId> <artifactId>zipkin-sender-kafka08</artifactId> <version>0.4.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.1.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.3</version> </dependency> </dependencies>
運行完成後,插入了10條跟蹤信息導kafka,打開 http://localhost:9411/ 便可看到效果。
----------------------------------------------------------------------------------------------
接下來,採用cassandra存儲從kafka採集過來的數據。
下載apache-cassandra-2.2.8-bin.tar.gz,解壓後
啓動cassandra:bin\cassandra.bat
從新運行zipkin(加入-DSTORAGE_TYPE=cassandra參數):
java -DKAFKA_ZOOKEEPER=localhost:2181 -DSTORAGE_TYPE=cassandra -jar zipkin-server-1.19.2-exec.jar