本文主要研究一下rocketmq-client-go的TraceInterceptorgit
rocketmq-client-go-v2.0.0/producer/interceptor.gogithub
// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace. func WithTrace(traceCfg *primitive.TraceConfig) Option { return func(options *producerOptions) { ori := options.Interceptors options.Interceptors = make([]primitive.Interceptor, 0) options.Interceptors = append(options.Interceptors, newTraceInterceptor(traceCfg)) options.Interceptors = append(options.Interceptors, ori...) } }
rocketmq-client-go-v2.0.0/producer/interceptor.goapache
func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor { dispatcher := internal.NewTraceDispatcher(traceCfg) dispatcher.Start() return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error { beginT := time.Now() err := next(ctx, req, reply) producerCtx := primitive.GetProducerCtx(ctx) if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() { return next(ctx, req, reply) } // SendOneway && SendAsync has no reply. if reply == nil { return err } result := reply.(*primitive.SendResult) if result.RegionID == "" || !result.TraceOn { return err } sendSuccess := result.Status == primitive.SendOK costT := time.Since(beginT).Nanoseconds() / int64(time.Millisecond) storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2 traceBean := internal.TraceBean{ Topic: producerCtx.Message.Topic, Tags: producerCtx.Message.GetTags(), Keys: producerCtx.Message.GetKeys(), StoreHost: producerCtx.BrokerAddr, ClientHost: utils.LocalIP, BodyLength: len(producerCtx.Message.Body), MsgType: producerCtx.MsgType, MsgId: result.MsgID, OffsetMsgId: result.OffsetMsgID, StoreTime: storeT, } traceCtx := internal.TraceContext{ RequestId: primitive.CreateUniqID(), // set id TimeStamp: time.Now().UnixNano() / int64(time.Millisecond), TraceType: internal.Pub, GroupName: producerCtx.ProducerGroup, RegionId: result.RegionID, TraceBeans: []internal.TraceBean{traceBean}, CostTime: costT, IsSuccess: sendSuccess, } dispatcher.Append(traceCtx) return err } }
WithTrace方法在options.Interceptors後追加TraceInterceptor;而newTraceInterceptor方法則建立TraceInterceptorapp