世上本沒有路,走的人多了,便變成了路 -- 魯迅
jaeger-client (OpenTracing API 各語言的實現,用於在應用中塞入信息採集點) jaeger-agent (負責發送的進程,對 spans 進行處理並發送給 collector,監聽 spans 的 UDP 發送。設計這層是爲了作爲基礎組件部署到主機上,從 client 中抽象出了 collector 的發現和路由。注意:1.這層應該部署在應用本地;2.如果配置了報告的 endpoint,則直接將 spans 發送到 collector,不需要 agent。) jaeger-collector (收集追蹤 spans,並通過管道對追蹤數據進行處理。當前的管道支持追蹤的驗證、索引、轉換,最後存儲數據) data store (追蹤信息的存儲) jaeger-query (從存儲中檢索追蹤信息並通過 UI 展現) jaeger-ui (UI 展現層,基於 React)
基於以上的體系結構,本文將關注點放在 jaeger-client 部分,考慮怎麼實現服務之間和服務內部的 tracing。
{ serviceName: "string", disable: "boolean", sampler: { type: "string", // required param: "number", // required hostPort: "string", host: "string", port: "number", refreshIntervalMs: "number" }, reporter: { logSpans: "boolean", agentHost: "string", agentPort: "number", collectorEndpoint: "string", username: "string", password: "string", flushIntervalMs: "number" }, throttler: { host: "string", port: "number", refreshIntervalMs: "number" } }
{ objects: { _tags: "object", // tags 信息,含 jaeger-version/hostname/ip/client-uuid _metrics: "object", // Metrics 度量實例 _serviceName: "string", // 服務名稱 _reporter: "object", // 提交實例 _sampler: "object", // 採樣器實例 _logger: "object", // 日誌實例,默認 NullLogger _baggageSetter: "object", // BaggageSetter 實例 _debugThrottler: "object", // DefaultThrottler 配置實例 _injectors: "object", // 注入器列表 _extractors: "object", // 提取器列表 _process: "object" // process 信息,含 serviceName/tags }, // 文件位置 ./jaeger-client-node/blob/master/src/tracer.js methods: { _startInternalSpan: "void", // 建立基礎 span ,供 startSpan 方法調用 / params: spanContext(SpanContext) operationName(string) startTime(number) userTags(any) internalTags(any) parentContext?(SpanContext) rpcServer(boolean) references(Array<Reference>) / retuen Span _report: "void", // 發起數據提交,提交到jaeger後端 / params: span(Span) registerInjector: "void", // 向 tracer 注入 "注入 SpanContext 內容的方式" / params: format(string) injector(Injector) registerExtractor: "void", // 向 tracer 注入 "提取 SpanContext 內容的方式" / params: format(string) extractor(Extractor) startSpan: "void", // 建立一個 Span / params: operationName(string) options?:{ operationName(string) childOf(SpanContext) references(Array<Reference>) tags(object) startTime(number) } inject: "void", // 將 SpanContext 注入到序列化格式的 carrier 中 / params: SpanContext(SpanContext) format(string) carrier(any) extract: "void", // 從序列化格式的 carrier 中提取 SpanContext / params: format(string) carrier(any) / return SpanContext close: "void", // 關閉 tracer,更新 spans,或執行回調函數 / params: callback now: "void", // 返回當前時間 _isDebugAllowed: "void" // 返回是否容許 debug } }
{ objects: { _tracer: "object", // <Tracer> _operationName: "string", // span 名稱 _spanContext: "object", // span 數據,_traceId/_spanId/_parentId/... _startTime: "number", // 時間戳 _logger: "object", // 日誌實例,默認 NullLogger _references: "object", // 引用列表 _baggageSetter: "object", // BaggageSetter 實例 _logs: "object", // span 的 logs 列表 _tags: "object", // span 的 tags 列表 _duration: "number" // 耗時 }, // 文件位置 ./jaeger-client-node/blob/master/src/span.js methods: { _normalizeBaggageKey: "void", // 返回一個規範化的key / params: key(string) / 返回標準化的 key,字母小寫化、使用破折號替換下劃線 setBaggageItem: "void", // 使用關聯的 key 設置 baggage 值 / params: key(string) value(any) / 返回當前Span getBaggageItem: "void", // 使用關聯的 key 獲取 baggage 值 / params: key(string) value(any) / 返回 baggage 值 context: "void", // 獲取當前 Span 的 SpanContext tracer: "void", // 獲取當前 Span 的 Tracer _isWriteable: "void", // 返回當前 Span 是否可寫 setOperationName: "void", // 給當前 Span 設置操做名稱 / params: operationName(string) / 返回當前 Span finish: "void", // 完成當前 Span / params: finishTime?(number) addTags: "void", // 向 Span 添加多個 tag / params: keyValuePairs(object) / 返回當前 Span setTag: "void", // 向 Span 添加單個 tag / params: key(string) value(any) / 返回當前 Span log: "void", // 向 Span 添加日誌事件或者負載 / params: keyValuePairs(object) timestamp?(number) / 返回當前 Span logEvent: "void", // 攜帶負載以記錄事件 / params: keyValuePairs(object) timestamp?(number) / 返回當前 Span _setSamplingPriority: "void" // 若是標誌已成功更新,則返回true,不然返回false / params: priority(number) (0 禁用採樣;1 啓用採樣) } }
/**
 * Tells whether a tag marks its span as failed.
 *
 * A tag counts as an error tag only when its key is exactly "error" and its
 * value is either the boolean `true` or the string "true".
 */
const isErrorTag = ({ key, value }: KeyValuePair) => {
  if (key !== "error") {
    return false;
  }
  return value === true || value === "true";
};
// Flag the span as failed by setting its "error" tag to true.
span.setTag("error", true);
// Record the error message as a log entry on the span.
span.log({ message: err.message });
// Finish the span once the error has been recorded.
span.finish();
效果如上面 UI 效果圖上 span 樣式所示。
const Request = require("request");
// FORMAT_HTTP_HEADERS and Tags are used below, so they must be in scope here.
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");

const noop = () => {};

// Stand-ins used when the caller passes no tracer: every tracing call becomes
// a no-op so the HTTP request itself still works.
const noopSpan = { setTag: noop, log: noop, finish: noop };
const noopTracer = { startSpan: () => noopSpan, inject: noop };

/**
 * Performs an HTTP request wrapped in a tracing span.
 *
 * @param url      target URL; also used as the span's operation name
 * @param options  optional settings:
 *                 - method:   HTTP method, "GET" by default
 *                 - headers:  extra request headers (copied, never mutated)
 *                 - tracer:   tracer used to start the span and inject headers
 *                 - rootSpan: parent span; when absent the span has no parent
 * @returns a Promise that resolves with the response body, or rejects with
 *          the transport error reported by the underlying request library
 */
const request = (url, options) => {
  const opts = options || {};
  const method = opts.method || "GET";
  // Work on a copy so injecting trace headers never alters the caller's object.
  const headers = Object.assign({}, opts.headers);
  const tracer = opts.tracer || noopTracer;
  // Only reference a parent when one was actually supplied.
  const spanOptions = opts.rootSpan ? { childOf: opts.rootSpan } : {};

  const span = tracer.startSpan(`${url}`, spanOptions);
  span.setTag(Tags.HTTP_URL, url);
  span.setTag(Tags.HTTP_METHOD, method);
  // Propagate the span context to the downstream service through the headers.
  tracer.inject(span, FORMAT_HTTP_HEADERS, headers);

  return new Promise((resolve, reject) => {
    Request({ url: url, method: method, headers: headers }, (err, res, body) => {
      if (err) {
        // Make the failure visible in the trace before closing the span.
        span.setTag(Tags.ERROR, true);
        span.log({ event: "error", message: err.message });
        span.finish();
        console.log("request error : ", err);
        reject(err);
        return;
      }
      if (res && res.statusCode) {
        span.setTag(Tags.HTTP_STATUS_CODE, res.statusCode);
      }
      span.finish();
      resolve(body);
    });
  });
};

export default request
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");

// NOTE(review): `app`, `router` and `request` are used below but are not
// defined in this snippet; they are presumably created by setup code that is
// not shown here - confirm before running.

// Tracer configuration: sample every trace and report directly to the collector.
const jaegerConfig = {
  serviceName: "a-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};
const jaegerOptions = { baggagePrefix: "x-b3-" };
const tracer = initTracer(jaegerConfig, jaegerOptions);

// Tracing middleware: opens one span per incoming request, continuing the
// caller's trace when the request headers carry a span context.
app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const spanOptions = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, spanOptions);
  span.setTag("route", ctx.path);
  // Expose the span and tracer so route handlers can create child spans.
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  try {
    await next();
  } catch (err) {
    // Record the failure on the span, then let the error propagate unchanged.
    span.setTag(Tags.ERROR, true);
    span.log({
      event: "error",
      message: err instanceof Error ? err.message : String(err)
    });
    throw err;
  } finally {
    // Always finish the span, even when a downstream handler throws.
    span.finish();
  }
});

// Route: calls the b-service endpoint and returns its answer appended to ours.
router.get("/abc", async (ctx, next) => {
  const result = await request("http://localhost:7072/bc", {
    tracer: ctx.tracer,
    rootSpan: ctx.tracerRootSpan
  });
  ctx.body = "get :7071/a , hello a" + "\n" + result;
});

app.use(router.routes());
app.listen(7071, () => {
  console.log("\x1B[32m port : 7071 \x1B[39m");
});
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");

// NOTE(review): `app`, `router` and `request` are used below but are not
// defined in this snippet; they are presumably created by setup code that is
// not shown here - confirm before running.

// Tracer configuration: sample every trace and report directly to the collector.
const jaegerConfig = {
  serviceName: "b-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};
const jaegerOptions = { baggagePrefix: "x-b3-" };
const tracer = initTracer(jaegerConfig, jaegerOptions);

// Tracing middleware: opens one span per incoming request, continuing the
// caller's trace when the request headers carry a span context.
app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const spanOptions = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, spanOptions);
  span.setTag("route", ctx.path);
  // Expose the span and tracer so route handlers can create child spans.
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;
  try {
    await next();
  } catch (err) {
    // Record the failure on the span, then let the error propagate unchanged.
    span.setTag(Tags.ERROR, true);
    span.log({
      event: "error",
      message: err instanceof Error ? err.message : String(err)
    });
    throw err;
  } finally {
    // Always finish the span, even when a downstream handler throws.
    span.finish();
  }
});

// Route: opens a child span, records a simulated error on it, then calls the
// c-service endpoint and returns its answer appended to ours.
router.get("/bc", async (ctx, next) => {
  const span = ctx.tracer.startSpan(`api:bc`, { childOf: ctx.tracerRootSpan });
  span.setTag("request:c", ":7073/c");
  try {
    // Deliberately thrown error: demonstrates how a failure is tagged and
    // logged on a span.
    try {
      throw Error("err");
    } catch (err) {
      span.setTag("error", true);
      span.log({ level: "error", message: err.message });
    }
    const result = await request("http://localhost:7073/c", {
      tracer: ctx.tracer,
      rootSpan: ctx.tracerRootSpan
    });
    ctx.body = "get :7072/b , hello b" + "\n" + result;
  } finally {
    // Finish the child span even when the downstream request rejects.
    span.finish();
  }
});

app.use(router.routes());
app.listen(7072, () => {
  console.log("\x1B[32m port : 7072 \x1B[39m");
});
const { initTracer } = require("jaeger-client");
const { FORMAT_HTTP_HEADERS, Tags } = require("opentracing");

// NOTE(review): `app` and `router` are used below but are not defined in this
// snippet; they are presumably created by setup code that is not shown here -
// confirm before running.

// Tracer configuration: sample every trace and report directly to the collector.
const jaegerConfig = {
  serviceName: "c-service",
  sampler: { type: "const", param: 1 },
  reporter: {
    logSpans: true,
    collectorEndpoint: "http://localhost:14268/api/traces"
  }
};
const jaegerOptions = { baggagePrefix: "x-b3-" };
const tracer = initTracer(jaegerConfig, jaegerOptions);

// Tracing middleware: opens one span per incoming request, continuing the
// caller's trace when the request headers carry a span context, and attaches
// a few sample log entries to the span.
app.use(async (ctx, next) => {
  const parent = tracer.extract(FORMAT_HTTP_HEADERS, ctx.headers);
  const spanOptions = parent ? { childOf: parent } : {};
  const span = tracer.startSpan(`${ctx.host}`, spanOptions);
  span.setTag("route", ctx.path);
  // Expose the span and tracer so route handlers can create child spans.
  ctx.tracerRootSpan = span;
  ctx.tracer = tracer;

  // Sample log entries written with span.log.
  span.log({ event: "test-log_1", kk: "kk_1", vv: "vv_1" });
  span.log({ event: "test-log_2", kk: "kk_2", vv: "vv_2" });
  span.log({ event: "test-log_3", kk: "kk_3", vv: "vv_3" });

  // Sample events written with span.logEvent (event name plus payload).
  span.logEvent("log-event_1", { a: 1, b: 1 });
  span.logEvent("log-event_2", { a: 2, b: 2 });

  try {
    await next();
  } catch (err) {
    // Record the failure on the span, then let the error propagate unchanged.
    span.setTag(Tags.ERROR, true);
    span.log({
      event: "error",
      message: err instanceof Error ? err.message : String(err)
    });
    throw err;
  } finally {
    // Always finish the span, even when a downstream handler throws.
    span.finish();
  }
});

// Route: last service in the chain; answers with a fixed greeting.
router.get("/c", async (ctx, next) => {
  ctx.body = "get :7073/c , hello c";
});

app.use(router.routes());
app.listen(7073, () => {
  console.log("\x1B[32m port : 7073 \x1B[39m");
});
// gRPC contract for the hello service, written in proto3 syntax.
syntax = "proto3";

// Go package name given to the generated code.
option go_package = "hello_package";

package hello;

// Request of SayHello: carries the name to greet.
message HelloReq {
  string name = 1;
}

// Response of SayHello: carries the resulting text.
message HelloRes {
  string result = 1;
}

// Service with a single unary RPC.
service HelloService {
  // Takes a HelloReq and returns a HelloRes.
  rpc SayHello(HelloReq) returns(HelloRes) {}
}
# Generate the Go message and gRPC code for hello.gen.proto.
# The output directory is spelled helloService, matching both the -I include
# path and the Go import path "goservice/helloService"; the lower-case
# "helloservice" names a different directory on case-sensitive file systems.
protoc -I helloService/ helloService/hello.gen.proto --go_out=plugins=grpc:helloService
package main import ( "log" "context" "strings" "net/http" "encoding/json" "google.golang.org/grpc" "google.golang.org/grpc/metadata" pb "goservice/helloService" opentracing "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" openLog "github.com/opentracing/opentracing-go/log" "github.com/uber/jaeger-client-go" jaegerCfg "github.com/uber/jaeger-client-go/config" ) // metadata 讀寫 type MDReaderWriter struct { metadata.MD } // 爲了 opentracing.TextMapReader ,參考 opentracing 代碼 func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error { for k, vs := range c.MD { for _, v := range vs { if err := handler(k, v); err != nil { return err } } } return nil } // 爲了 opentracing.TextMapWriter,參考 opentracing 代碼 func (c MDReaderWriter) Set(key, val string) { key = strings.ToLower(key) c.MD[key] = append(c.MD[key], val) } func NewJaegerTracer(serviceName string) (opentracing.Tracer, error) { // 配置項 參考代碼 https://github.com/jaegertracing/jaeger-client-go/blob/master/config/config.go cfg := jaegerCfg.Configuration{ Sampler: &jaegerCfg.SamplerConfig{ Type: "const", Param: 1, }, Reporter: &jaegerCfg.ReporterConfig{ LogSpans: true, CollectorEndpoint: "http://localhost:14268/api/traces", }, } cfg.ServiceName = serviceName tracer, _, err := cfg.NewTracer( jaegerCfg.Logger(jaeger.StdLogger), ) if err != nil { log.Println("tracer error ", err) } return tracer, err } // 此處參考 grpc文檔 https://godoc.org/google.golang.org/grpc#UnaryClientInterceptor func interceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor{ return func (ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // 建立 rootSpan var rootCtx opentracing.SpanContext rootSpan := opentracing.SpanFromContext(ctx) if rootSpan != nil { rootCtx = rootSpan.Context() } span := tracer.StartSpan( method, opentracing.ChildOf(rootCtx), opentracing.Tag{"test","hahahahaha"}, ext.SpanKindRPCClient, ) defer 
span.Finish() md, succ := metadata.FromOutgoingContext(ctx) if !succ { md = metadata.New(nil) } else{ md = md.Copy() } mdWriter := MDReaderWriter{md} // 注入 spanContext err := tracer.Inject(span.Context(), opentracing.TextMap, mdWriter) if err != nil { span.LogFields(openLog.String("inject error", err.Error())) } // new ctx ,並調用後續操做 newCtx := metadata.NewOutgoingContext(ctx, md) err = invoker(newCtx, method, req, reply, cc, opts...) if err != nil { span.LogFields(openLog.String("call error", err.Error())) } return err } } func hello(w http.ResponseWriter, r *http.Request) { r.ParseForm(); // new tracer tracer, err := NewJaegerTracer("mainService") if err != nil { log.Fatal("new tracer err ", err) } // dial options dialOpts := []grpc.DialOption{grpc.WithInsecure()} if tracer != nil { dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(interceptor(tracer))) } conn, err := grpc.Dial("localhost:8082", dialOpts...) if err != nil { log.Fatal("connect err ", err) } defer conn.Close() sv := pb.NewHelloServiceClient(conn) var name = "yeshou" if (len(r.Form) > 0 && len(r.Form["name"][0]) > 0) { name = r.Form["name"][0] } res, err := sv.SayHello(context.Background(), &pb.HelloReq{Name: name}) if err != nil { log.Fatal("c.SayHello func error : ", err) } type HelloRes struct{ Result string `json:"result"` } data := HelloRes{ Result: res.Result, } jsonData, err := json.Marshal(data) if err != nil { log.Fatal("server error : ", err) } w.Write(jsonData) } func main() { http.HandleFunc("/get_h", hello) err := http.ListenAndServe(":8081", nil) if err != nil { log.Fatal("Listen server err : ", err) } }
package main import ( "log" "net" "context" "strings" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" pb "goservice/helloService" opentracing "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/uber/jaeger-client-go" jaegerCfg "github.com/uber/jaeger-client-go/config" ) // metadata 讀寫 type MDReaderWriter struct { metadata.MD } // 爲了 opentracing.TextMapReader ,參考 opentracing 代碼 func (c MDReaderWriter) ForeachKey(handler func(key, val string) error) error { for k, vs := range c.MD { for _, v := range vs { if err := handler(k, v); err != nil { return err } } } return nil } // 爲了 opentracing.TextMapWriter,參考 opentracing 代碼 func (c MDReaderWriter) Set(key, val string) { key = strings.ToLower(key) c.MD[key] = append(c.MD[key], val) } func NewJaegerTracer(serviceName string) (opentracing.Tracer, error) { cfg := jaegerCfg.Configuration{ Sampler: &jaegerCfg.SamplerConfig{ Type: "const", Param: 1, }, Reporter: &jaegerCfg.ReporterConfig{ LogSpans: true, CollectorEndpoint: "http://localhost:14268/api/traces", }, } cfg.ServiceName = serviceName tracer, _, err := cfg.NewTracer( jaegerCfg.Logger(jaeger.StdLogger), ) if err != nil { log.Println("tracer error ", err) } return tracer, err } // 此處參考 grpc文檔 https://godoc.org/google.golang.org/grpc#WithUnaryInterceptor func interceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor{ return func (ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (res interface{}, err error) { md, succ := metadata.FromIncomingContext(ctx) if !succ { md = metadata.New(nil) } // 提取 spanContext spanContext, err := tracer.Extract(opentracing.TextMap, MDReaderWriter{md}) if err != nil && err != opentracing.ErrSpanContextNotFound { grpclog.Errorf("extract from metadata err: %v", err) } else{ span := tracer.StartSpan( info.FullMethod, ext.RPCServerOption(spanContext), opentracing.Tag{Key: string(ext.Component), Value: 
"grpc"}, ext.SpanKindRPCServer, ) defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) } return handler(ctx, req) } } type server struct{} func (s *server) SayHello(ctx context.Context, in *pb.HelloReq) (*pb.HelloRes, error) { return &pb.HelloRes{Result: "Hello " + in.Name}, nil } func main() { var svOpts []grpc.ServerOption tracer, err := NewJaegerTracer("serviceService") if err != nil { log.Fatal("new tracer err ", err) } if tracer != nil { svOpts = append(svOpts, grpc.UnaryInterceptor(interceptor(tracer))) } sv := grpc.NewServer(svOpts...) lis, err := net.Listen("tcp", ":8082") if err != nil { log.Fatalf("failed to listen: %v", err) } pb.RegisterHelloServiceServer(sv, &server{}) if err := sv.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
請求地址: http://localhost:8081/get_h,打開地址: http://localhost:16686/search。效果如圖: