使用 Jaeger 完成服務間的鏈路追蹤

世上本沒有路,走的人多了,便變成了路 -- 魯迅
   
本次討論的話題就是須要在各個服務之間踏出條"路",讓問題有"路"可循。
至於爲何用 jaeger... 這個支持多語言方案算麼?遵循 opentracing 規範算麼?開箱即用算麼?還有更多其餘方面的支持? 至於爲何遵循 opentracing 規範的好... 這個...槓精同窗,文末地址可參考 🙃🙃🙃
老規矩,擼袖開幹...
 
瞭解 Jaeger
Jaeger: open source, end-to-end distributed tracing (Jaeger: 開源的、分佈式系統的端到端追蹤)
Monitor and troubleshoot transactions in complex distributed systems  (在複雜的分佈式系統之間作監控及問題排查的事務處理。)
jaeger 體系和流程以下圖
每層的介紹,以下 (由於編輯器裏的表格真的好難用啊...因此只能代碼塊處理啦)
jaeger-client
(OpenTracing API 各語言的實現,用於在應用中塞入信息採集點)

jaeger-agent
(負責發送的進程,對 spans 進行處理併發送給 collector,監聽 spans 的 UDP 發送。設計這層是爲了做爲基礎組件部署到主機上,從 client 中抽象出了 collector 的發現和路由。注意:1.這層應該是部署在應用本地;2.若是配置報告的 endpoint,則直接將 spans 發送到 collector,不須要 agent。)

jaeger-collector
(收集追蹤 spans,並經過管道對追蹤數據進行處理。當前的管道支持追蹤的驗證、索引、轉換,最後存儲數據)

data store
(追蹤信息的存儲)

jaeger-query
(從存儲中檢索追蹤信息並經過 UI 展現)

jaeger-ui
(UI 展現層,基於 React)
注意:jaeger 的存儲是可插拔組件,目前支持 Cassandra、ElasticSearch 和 Kafka。

基於以上的體系結構,本文關將注點放在 jaeger-client 部分,考慮怎麼實現服務之間和服務內部的 tracing。javascript

 
瞭解追蹤信息
Span:追蹤中的邏輯單元,好比一次請求的過程/一個函數的執行,包含操做名稱、開始時間、持續時間。
SpanContext:表示須要傳播到下游 Spans 和跨應用/進程的 Span 數據,能夠簡單理解爲串在各個系統裏的統一標識對象。
Baggage:字符串組成的鍵值對,和 Span/SpanContext 互相關聯,會在全部的下游 Spans 中進行傳播。(能夠作一些強大的功能,如在整個鏈路夾帶數據,使用成本高,當心使用)
Tracer:項目中的追蹤實例,追蹤項目裏數據變化/函數執行的過程,能夠認爲是一個定向非循環的 spans 的集合圖。
Tracer 和 Span 以下圖:
對於 jaeger-ui 效果以下圖:
jaeger-client 是 opentracing 的實現,因而 jaeger-client api 幾乎等同於 opentracing api。
 
Api 和配置參考
本文以 Nodejs 爲主,golang 爲輔(由於當前恰好涉及到這兩種服務的鏈路追蹤方案 😂😂😂)。這裏大體介紹一下 Configuration/Tracer/Span ,以便實現一個基礎的 tracing 。
配置項
{
  serviceName: "string",
  disable: "boolean",
  sampler: {
    type: "string", // required
    param: "number", // required
    hostPort: "string",
    host: "string",
    port: "number",
    refreshIntervalMs: "number"
  },
  reporter: {
    logSpans: "boolean",
    agentHost: "string",
    agentPort: "number",
    collectorEndpoint: "string",
    username: "string",
    password: "string",
    flushIntervalMs: "number"
  },
  throttler: {
    host: "string",
    port: "number",
    refreshIntervalMs: "number"
  }
}
Configuration
Tracer 對象
{
  objects: {
    _tags: "object", // tags 信息,含 jaeger-version/hostname/ip/client-uuid
    _metrics: "object", // Metrics 度量實例
    _serviceName: "string", // 服務名稱
    _reporter: "object", // 提交實例
    _sampler: "object", // 採樣器實例
    _logger: "object", // 日誌實例,默認 NullLogger
    _baggageSetter: "object", // BaggageSetter 實例
    _debugThrottler: "object", // DefaultThrottler 配置實例
    _injectors: "object", // 注入器列表
    _extractors: "object", // 提取器列表
    _process: "object" // process 信息,含 serviceName/tags
  },
  // 文件位置 ./jaeger-client-node/blob/master/src/tracer.js
  methods: {
    _startInternalSpan: "void", // 建立基礎 span ,供 startSpan 方法調用 / params: spanContext(SpanContext) operationName(string) startTime(number) userTags(any) internalTags(any) parentContext?(SpanContext) rpcServer(boolean) references(Array<Reference>) / retuen Span
    _report: "void", // 發起數據提交,提交到jaeger後端 / params: span(Span)
    registerInjector: "void", // 向 tracer 注入 "注入 SpanContext 內容的方式" / params: format(string) injector(Injector)
    registerExtractor: "void", // 向 tracer 注入 "提取 SpanContext 內容的方式" / params: format(string) extractor(Extractor)
    startSpan: "void", // 建立一個 Span / params: operationName(string) options?:{ operationName(string) childOf(SpanContext) references(Array<Reference>) tags(object) startTime(number) }
    inject: "void", // 將 SpanContext 注入到序列化格式的 carrier 中 / params: SpanContext(SpanContext) format(string) carrier(any)
    extract: "void", // 從序列化格式的 carrier 中提取 SpanContext / params: format(string) carrier(any) / return SpanContext
    close: "void", // 關閉 tracer,更新 spans,或執行回調函數 / params: callback
    now: "void", // 返回當前時間
    _isDebugAllowed: "void" // 返回是否容許 debug
  }
}
Tracer
Span 對象
{
  objects: {
    _tracer: "object", // <Tracer>
    _operationName: "string", // span 名稱
    _spanContext: "object", // span 數據,_traceId/_spanId/_parentId/...
    _startTime: "number", // 時間戳
    _logger: "object", // 日誌實例,默認 NullLogger
    _references: "object", // 引用列表
    _baggageSetter: "object", // BaggageSetter 實例
    _logs: "object", // span 的 logs 列表
    _tags: "object", // span 的 tags 列表
    _duration: "number" // 耗時
  },
  // 文件位置  ./jaeger-client-node/blob/master/src/span.js
  methods: {
    _normalizeBaggageKey: "void", // 返回一個規範化的key / params: key(string) / 返回標準化的 key,字母小寫化、使用破折號替換下劃線 
    setBaggageItem: "void", // 使用關聯的 key 設置 baggage 值 / params: key(string) value(any) / 返回當前Span
    getBaggageItem: "void", // 使用關聯的 key 獲取 baggage 值 / params: key(string) value(any) / 返回 baggage 值
    context: "void", // 獲取當前 Span 的 SpanContext
    tracer: "void", // 獲取當前 Span 的 Tracer
    _isWriteable: "void", // 返回當前 Span 是否可寫
    setOperationName: "void", // 給當前 Span 設置操做名稱 / params: operationName(string) / 返回當前 Span
    finish: "void", // 完成當前 Span / params: finishTime?(number)
    addTags: "void", // 向 Span 添加多個 tag / params: keyValuePairs(object) / 返回當前 Span
    setTag: "void", // 向 Span 添加單個 tag / params: key(string) value(any) / 返回當前 Span
    log: "void", // 向 Span 添加日誌事件或者負載 / params: keyValuePairs(object) timestamp?(number) / 返回當前 Span
    logEvent: "void", // 攜帶負載以記錄事件 / params: keyValuePairs(object) timestamp?(number) / 返回當前 Span
    _setSamplingPriority: "void" // 若是標誌已成功更新,則返回true,不然返回false / params: priority(number) (0 禁用採樣;1 啓用採樣)
  }
}
Span
其中包含一些僅供內部使用的方法,由於是看的人家代碼整理的(沒找到完整文檔...)
span 的話,會有 span 和 errorSpan 之分,在 jaeger-ui 代碼裏的判斷是:
const isErrorTag = ({ key, value }: KeyValuePair) =>
  key === "error" && (value === true || value === "true");
因此,設置 errorSpan 的話代碼以下:
span.setTag("error", true);

span.log({
  message: err.message
});

span.finish();

效果如上面UI效果圖上span樣式所示。java

對於數據方面 jaeger 是比較自由的,能夠拉 jaeger-ui 代碼而後根據本身設置的 KeyValuePair 作個性化設置。
 
實踐/案例
Nodejs 服務之間
好比有服務[a,b,c],發起一個請求到 a,服務 a 調用服務 b 的接口,服務 b 調用服務 c 的接口,依次作追蹤。
request.js
const Request = require("request");
const noop = () => {};

// request
const request = (url, options) => {
  const method = (options && options.method) || "GET";
  const headers = (options && options.headers) || {};
  const tracer = (options && options.tracer) || { inject: noop, setTag: noop };
  const rootSpan = (options && options.rootSpan) || {};
  const _config = rootSpan ? { childOf: rootSpan } : {};
  const span = tracer.startSpan(`${url}`, _config);
  span.setTag(Tags.HTTP_URL, url);
  span.setTag(Tags.HTTP_METHOD, method);
  tracer.inject(span, FORMAT_HTTP_HEADERS, headers);
  const promise = new Promise((resolve, reject) => {
    Request(
      {
        url: url,
        method: method,
        headers: headers
      },
      (err, res, body) => {
        span.finish();
        if (err) {
          console.log("request error : ", err);
          reject(err);
        } else {
          resolve(body);
        }
      }
    );
  });
  return promise;
};

export default request
request.js
aservice.js
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");

// app use trace
const jaegerConfig = {
  serviceName: "a-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};

const jaegerOptions = { baggagePrefix: "x-b3-" };
const tracer = initTracer(jaegerConfig, jaegerOptions);

app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const _config = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, _config);
  span.setTag("route", ctx.path);
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  await next();
  span.finish();
});

// app router
router.get("/abc", async (ctx, next) => {
  const result = await request("http://localhost:7072/bc", {
    tracer: ctx.tracer,
    rootSpan: ctx.tracerRootSpan
  });
  ctx.body = "get :7071/a , hello a" + "\n" + result;
});

app.use(router.routes());
app.listen(7071, () => {
  console.log("\x1B[32m port : 7071 \x1B[39m");
});
aservice.js
bservice.js
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");

// app use trace
const jaegerConfig = {
  serviceName: "b-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};

const jaegerOptions = { baggagePrefix: "x-b3-" };
const tracer = initTracer(jaegerConfig, jaegerOptions);

app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const _config = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, _config);
  span.setTag("route", ctx.path);
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  await next();
  span.finish();
});

// app router
router.get("/bc", async (ctx, next) => {
  const span = ctx.tracer.startSpan(`api:bc`, { childOf: ctx.tracerRootSpan });
  span.setTag("request:c", ":7073/c");
  try {
    throw Error("err");
  } catch (err) {
    span.setTag("error", true);
    span.log({
      level: "error",
      message: err.message
    });
  }
  const result = await request("http://localhost:7073/c", {
    tracer: ctx.tracer,
    rootSpan: ctx.tracerRootSpan
  });
  span.finish();
  ctx.body = "get :7072/b , hello b" + "\n" + result;
});

app.use(router.routes());

app.listen(7072, () => {
  console.log("\x1B[32m port : 7072 \x1B[39m");
});
bservice.js
cservice.js
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS } = require("opentracing");

// app use trace
const jaegerConfig = {
  serviceName: "c-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};

const jaegerOptions = { baggagePrefix: "x-b3-" };

const tracer = initTracer(jaegerConfig, jaegerOptions);

app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const _config = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, _config);
  span.setTag("route", ctx.path);
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  span.log({ event: "test-log_1", kk: "kk_1", vv: "vv_1" });
  span.log({ event: "test-log_2", kk: "kk_2", vv: "vv_2" });
  span.log({ event: "test-log_3", kk: "kk_3", vv: "vv_3" });
  span.logEvent("log-event_1", { a: 1, b: 1 });
  span.logEvent("log-event_2", { a: 2, b: 2 });
  await next();
  span.finish();
});

// app router
router.get("/c", async (ctx, next) => {
  ctx.body = "get :7073/c , hello c";
});

app.use(router.routes());

app.listen(7073, () => {
  console.log("\x1B[32m port : 7073 \x1B[39m");
});
cservice.js
請求地址: http://localhost:7071/abc ,打開 jaeger-query + jaeger-ui 服務的地址: http://localhost:16686/search。效果如圖:
 
go 服務之間(結合當前狀況,測試grpc的,實際也支持http和rpc)
這裏起了一個http服務main.go,從main這邊經過grpc請求服務service.go上的方法。第一次寫go程序,不喜勿噴...若有不正之處,感謝指出。
hello.gen.proto
syntax = "proto3";

option go_package = "hello_package";

package hello;

message HelloReq {
  string name = 1;
}

message HelloRes {
  string result = 1;
}

service HelloService {
  rpc SayHello(HelloReq) returns(HelloRes) {}
}
而後生成 grpc 所需文件
protoc -I helloService/ helloService/hello.gen.proto --go_out=plugins=grpc:helloservice
main.go
package main

import (
    "log"
    "context"
    "strings"
    "net/http"
    "encoding/json"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
    pb "goservice/helloService"
    opentracing "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    openLog "github.com/opentracing/opentracing-go/log"
    "github.com/uber/jaeger-client-go"
    jaegerCfg "github.com/uber/jaeger-client-go/config"
)

// metadata 讀寫
type MDReaderWriter struct {
    metadata.MD
}

// 爲了 opentracing.TextMapReader ,參考 opentracing 代碼
func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {
    for k, vs := range c.MD {
        for _, v := range vs {
            if err := handler(k, v); err != nil {
                return err
            }
        }
    }
    return nil
}

// 爲了 opentracing.TextMapWriter,參考 opentracing 代碼
func (c MDReaderWriter) Set(key, val string) {
    key = strings.ToLower(key)
    c.MD[key] = append(c.MD[key], val)
}

func NewJaegerTracer(serviceName string) (opentracing.Tracer, error) {
    // 配置項 參考代碼 https://github.com/jaegertracing/jaeger-client-go/blob/master/config/config.go
    cfg := jaegerCfg.Configuration{
        Sampler: &jaegerCfg.SamplerConfig{
            Type: "const",
            Param: 1,
        },
        Reporter: &jaegerCfg.ReporterConfig{
            LogSpans: true,
            CollectorEndpoint: "http://localhost:14268/api/traces",
        },
    }

    cfg.ServiceName = serviceName

    tracer, _, err := cfg.NewTracer(
        jaegerCfg.Logger(jaeger.StdLogger),
    )

    if err != nil {
        log.Println("tracer error ", err)
    }

    return tracer, err
}

// 此處參考 grpc文檔 https://godoc.org/google.golang.org/grpc#UnaryClientInterceptor
func interceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor{
    return func (ctx context.Context,
        method string,
        req,
        reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption) error {
        // 建立 rootSpan
        var rootCtx opentracing.SpanContext

        rootSpan := opentracing.SpanFromContext(ctx)
        if rootSpan != nil {
            rootCtx = rootSpan.Context()
        }

        span := tracer.StartSpan(
            method,
            opentracing.ChildOf(rootCtx),
            opentracing.Tag{"test","hahahahaha"},
            ext.SpanKindRPCClient,
        )

        defer span.Finish()

        md, succ := metadata.FromOutgoingContext(ctx)
        if !succ {
            md = metadata.New(nil)
        } else{
            md = md.Copy()
        }

        mdWriter := MDReaderWriter{md}

        // 注入 spanContext
        err := tracer.Inject(span.Context(), opentracing.TextMap, mdWriter)

        if err != nil {
            span.LogFields(openLog.String("inject error", err.Error()))
        }

        // new ctx ,並調用後續操做
        newCtx := metadata.NewOutgoingContext(ctx, md)
        err = invoker(newCtx, method, req, reply, cc, opts...)
        if err != nil {
            span.LogFields(openLog.String("call error", err.Error()))
        }
        return err
    }
}


func hello(w http.ResponseWriter, r *http.Request) {
  r.ParseForm();

    // new tracer
    tracer, err := NewJaegerTracer("mainService")
    if err != nil {
        log.Fatal("new tracer err ", err)
    }

    // dial options
    dialOpts := []grpc.DialOption{grpc.WithInsecure()}

    if tracer != nil {
        dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(interceptor(tracer)))
    }

    conn, err := grpc.Dial("localhost:8082", dialOpts...)
    if err != nil {
        log.Fatal("connect err ", err)
    }

    defer conn.Close()

    sv := pb.NewHelloServiceClient(conn)

    var name = "yeshou"
    if (len(r.Form) > 0 && len(r.Form["name"][0]) > 0) {
        name = r.Form["name"][0]
    }

    res, err := sv.SayHello(context.Background(), &pb.HelloReq{Name: name})
    if err != nil {
            log.Fatal("c.SayHello func error : ", err)
    }

    type HelloRes struct{
        Result    string  `json:"result"`
    }

    data := HelloRes{
        Result:  res.Result,
    }

    jsonData, err := json.Marshal(data)
    if err != nil {
        log.Fatal("server error : ", err)
    }

    w.Write(jsonData)
}

func main() {
    http.HandleFunc("/get_h", hello)

    err := http.ListenAndServe(":8081", nil)

    if err != nil {
        log.Fatal("Listen server err : ", err)
    }

}
main.go
service.go
package main

import (
    "log"
    "net"
    "context"
    "strings"
    "google.golang.org/grpc"
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/metadata"
    pb "goservice/helloService"
    opentracing "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/uber/jaeger-client-go"
    jaegerCfg "github.com/uber/jaeger-client-go/config"
)

// metadata 讀寫
type MDReaderWriter struct {
    metadata.MD
}

// 爲了 opentracing.TextMapReader ,參考 opentracing 代碼
func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error {
    for k, vs := range c.MD {
        for _, v := range vs {
            if err := handler(k, v); err != nil {
                return err
            }
        }
    }
    return nil
}

// 爲了 opentracing.TextMapWriter,參考 opentracing 代碼
func (c MDReaderWriter) Set(key, val string) {
    key = strings.ToLower(key)
    c.MD[key] = append(c.MD[key], val)
}

func NewJaegerTracer(serviceName string) (opentracing.Tracer, error) {
    cfg := jaegerCfg.Configuration{
        Sampler: &jaegerCfg.SamplerConfig{
            Type: "const",
            Param: 1,
        },
        Reporter: &jaegerCfg.ReporterConfig{
            LogSpans: true,
            CollectorEndpoint: "http://localhost:14268/api/traces",
        },
    }

    cfg.ServiceName = serviceName

    tracer, _, err := cfg.NewTracer(
        jaegerCfg.Logger(jaeger.StdLogger),
    )

    if err != nil {
        log.Println("tracer error ", err)
    }

    return tracer, err
}

// 此處參考 grpc文檔 https://godoc.org/google.golang.org/grpc#WithUnaryInterceptor
func interceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor{
    return func (ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler) (res interface{}, err error) {
            md, succ := metadata.FromIncomingContext(ctx)
            if !succ {
                md  = metadata.New(nil)
            }

            // 提取 spanContext
            spanContext, err := tracer.Extract(opentracing.TextMap, MDReaderWriter{md})
            if err != nil && err != opentracing.ErrSpanContextNotFound {
                grpclog.Errorf("extract from metadata err: %v", err)
            } else{
                span := tracer.StartSpan(
                    info.FullMethod,
                    ext.RPCServerOption(spanContext),
                    opentracing.Tag{Key: string(ext.Component), Value: "grpc"},
                    ext.SpanKindRPCServer,
                )
                defer span.Finish()
                ctx = opentracing.ContextWithSpan(ctx, span)
            }
            return handler(ctx, req)
    }
}

type server struct{}

func (s *server) SayHello(ctx context.Context, in *pb.HelloReq) (*pb.HelloRes, error) {
    return &pb.HelloRes{Result: "Hello " + in.Name}, nil
}

func main() {

    var svOpts []grpc.ServerOption
    tracer, err := NewJaegerTracer("serviceService")
    if err != nil {
        log.Fatal("new tracer err ", err)
    }

    if tracer != nil {
        svOpts = append(svOpts, grpc.UnaryInterceptor(interceptor(tracer)))
    }

    sv := grpc.NewServer(svOpts...)

    lis, err := net.Listen("tcp", ":8082")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    pb.RegisterHelloServiceServer(sv, &server{})
    if err := sv.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
service.go

請求地址: http://localhost:8081/get_h,打開地址: http://localhost:16686/search。效果如圖:node

相關連接
本文僅作 jaeger 使用的簡單參考,實際項目考慮到 Microservices 、 Service Mesh 、 Business Logic Logs 等等影響因素,tracing 會更具複雜性且有更多的坑得踩。
相關文章
相關標籤/搜索