brave+kafka+zipkin+cassandra搭建分佈式鏈路跟蹤系統

先來瀏覽一下架構圖(下圖中的 scribe 在本文中以 kafka 替代)。

 

首先我們先完成 brave+kafka+zipkin 這一步,此時採用的存儲是 zipkin 的內存存儲。

----------------------------------------------------------------------------------------------

下載kafka後執行:

tar -xzf kafka_2.11-0.9.0.0.tgz

cd kafka_2.11-0.9.0.0

啓動zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

(windows 啓動 bin\windows\zookeeper-server-start.bat config\zookeeper.properties)

啓動kafka服務:

bin/kafka-server-start.sh config/server.properties

(windows 啓動 bin\windows\kafka-server-start.bat config\server.properties)

 

----------------------------------------------------------------------------------------------

下載zipkin的jar包,

wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'

運行: java -jar zipkin.jar

 

用解壓工具可以看到zipkin.jar裏的BOOT-INF\classes\zipkin-server-shared.yml文件,所有的配置都在這個文件裏。

要改變配置裏的值,可以在java啓動命令中通過-D帶入配置參數。

現在我們通過kafka做數據通道的話就採用如下參數(jar文件名以實際下載的爲準,例如上面下載的zipkin.jar):

java -DKAFKA_ZOOKEEPER=localhost:2181 -jar zipkin-server-1.19.2-exec.jar

打開ui查看  http://localhost:9411/

----------------------------------------------------------------------------------------------

接着附上測試插入數據到kafka的代碼。

import com.github.kristofa.brave.*;
import com.twitter.zipkin.gen.Endpoint;
import zipkin.Span;
import zipkin.reporter.AsyncReporter;
import zipkin.reporter.kafka08.KafkaSender;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BraveKafkaTest {

    private static Brave brave = null;
    private static Brave brave2 = null;

    private static void braveInit(){
//        KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").messageMaxBytes(10000).build();
        KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").build();
        AsyncReporter<Span> report = AsyncReporter.builder(sender).build();

        brave = new Brave.Builder("appserver").reporter(report).build();
        brave2 = new Brave.Builder("datacenter").reporter(report).build();
    }

    static class Task {
        String name;
        SpanId spanId;
        public Task(String name, SpanId spanId) {
            super();
            this.name = name;
            this.spanId = spanId;
        }
    }

    public static void main(String[] args) throws Exception {
        braveInit();

        final BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10);
        Thread thread = new Thread(){
            public void run() {
                while (true) {
                    try {
                        Task task = queue.take();
                        dcHandle(task.name, task.spanId);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();

        for (int i = 0; i < 10; i++) {
            ServerRequestInterceptor serverRequestInterceptor = brave.serverRequestInterceptor();
            ServerResponseInterceptor serverResponseInterceptor = brave.serverResponseInterceptor();
            ClientRequestInterceptor clientRequestInterceptor = brave.clientRequestInterceptor();
            ClientResponseInterceptor clientResponseInterceptor = brave.clientResponseInterceptor();

            serverRequestInterceptor.handle(new ServerRequestAdapterImpl("group_data"));

            ClientRequestAdapterImpl clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_radio_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_radio_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_user_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_user_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_program_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_program_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
            Thread.sleep(10);
        }
    }

    public static void dcHandle(String spanName, SpanId spanId){
        ServerRequestInterceptor serverRequestInterceptor = brave2.serverRequestInterceptor();
        ServerResponseInterceptor serverResponseInterceptor = brave2.serverResponseInterceptor();
        serverRequestInterceptor.handle(new ServerRequestAdapterImpl(spanName, spanId));
        serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
    }


    static class ServerRequestAdapterImpl implements ServerRequestAdapter {

        Random randomGenerator = new Random();
        SpanId spanId;
        String spanName;

        ServerRequestAdapterImpl(String spanName){
            this.spanName = spanName;
            long startId = randomGenerator.nextLong();
            SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
            this.spanId = spanId;
        }

        ServerRequestAdapterImpl(String spanName, SpanId spanId){
            this.spanName = spanName;
            this.spanId = spanId;
        }

        public TraceData getTraceData() {
            if (this.spanId != null) {
                return TraceData.builder().spanId(this.spanId).build();
            }
            long startId = randomGenerator.nextLong();
            SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
            return TraceData.builder().spanId(spanId).build();
        }

        public String getSpanName() {
            return spanName;
        }

        public Collection<KeyValueAnnotation> requestAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }

    }

    static class ServerResponseAdapterImpl implements ServerResponseAdapter {

        public Collection<KeyValueAnnotation> responseAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }

    }

    static class ClientRequestAdapterImpl implements ClientRequestAdapter {

        String spanName;
        SpanId spanId;

        ClientRequestAdapterImpl(String spanName){
            this.spanName = spanName;
        }

        public SpanId getSpanId() {
            return spanId;
        }

        public String getSpanName() {
            return this.spanName;
        }

        public void addSpanIdToRequest(SpanId spanId) {
            //記錄傳輸到遠程服務
            System.out.println(spanId);
            if (spanId != null) {
                this.spanId = spanId;
                System.out.println(String.format("trace_id=%s, parent_id=%s, span_id=%s", spanId.traceId, spanId.parentId, spanId.spanId));
            }
        }

        public Collection<KeyValueAnnotation> requestAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }

        public Endpoint serverAddress() {
            return null;
        }

    }

    static class ClientResponseAdapterImpl implements ClientResponseAdapter {

        public Collection<KeyValueAnnotation> responseAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioname", "火星人1");
            collection.add(kv);
            return collection;
        }

    }
}

maven依賴:

<dependencies>
        <!-- Brave tracing library. Presumably supplies the com.github.kristofa.brave classes
             imported by BraveKafkaTest; verify against the artifact contents. -->
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-core</artifactId>
            <version>3.17.0</version>
        </dependency>
        <!-- NOTE(review): BraveKafkaTest imports nothing that is obviously from this HTTP span
             collector; confirm whether it is still needed. -->
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-spancollector-http</artifactId>
            <version>3.17.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.auto.value</groupId>
            <artifactId>auto-value</artifactId>
            <version>1.3</version>
        </dependency>
        <!-- NOTE(review): BraveKafkaTest sends spans through zipkin.reporter.kafka08.KafkaSender
             rather than a Brave span collector; confirm whether this artifact is still needed. -->
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-spancollector-kafka</artifactId>
            <version>3.17.0</version>
        </dependency>
        <!-- Presumably supplies zipkin.reporter.kafka08.KafkaSender used by BraveKafkaTest. -->
        <dependency>
            <groupId>io.zipkin.reporter</groupId>
            <artifactId>zipkin-sender-kafka08</artifactId>
            <version>0.4.3</version>
        </dependency>
        <!-- Logging: SLF4J API with Logback artifacts. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-access</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.3</version>
        </dependency>

    </dependencies>

運行完成後,就向kafka插入了10條跟蹤信息,打開 http://localhost:9411/ 即可看到效果。

 

 

----------------------------------------------------------------------------------------------

接下來,採用cassandra存儲從kafka採集過來的數據。

下載apache-cassandra-2.2.8-bin.tar.gz,解壓後

啓動cassandra:bin\cassandra.bat

重新運行zipkin(加入-DSTORAGE_TYPE=cassandra參數):

java -DKAFKA_ZOOKEEPER=localhost:2181 -DSTORAGE_TYPE=cassandra -jar zipkin-server-1.19.2-exec.jar

相關文章
相關標籤/搜索