原文地址:Jaeger源碼分析——窺視分佈式系統實現html
分析Jaeger源碼主要有如下緣由:mysql
公司正在使用Jaeger,經過了解其源碼,能夠更好的把控這套系統。git
瞭解分佈式系統的設計github
提高對golang的理解golang
提高我的英語算法
分析的版本爲最新版本0.10.0,時間2017-11-23sql
agent處於jaeger-client和collector之間,屬於代理的做用,主要是把client發送過來的數據從thrift轉爲Batch,並經過RPC批量提交到collector。數據庫
github.com/jaegertracing/jaeger/cmd/agent/app/flags.go #35 var defaultProcessors = []struct { model model protocol protocol hostPort string }{ {model: "zipkin", protocol: "compact", hostPort: ":5775"}, {model: "jaeger", protocol: "compact", hostPort: ":6831"}, {model: "jaeger", protocol: "binary", hostPort: ":6832"}, }
在agent開啓時初始化了3個UDP服務編程
每一個服務對應處理不一樣的數據格式緩存
官方推薦使用6831端口接收數據
github.com/jaegertracing/jaeger/cmd/agent/app/servers/tbuffered_server.go #80 func (s *TBufferedServer) Serve() { atomic.StoreUint32(&s.serving, 1) for s.IsServing() { readBuf := s.readBufPool.Get().(*ReadBuf) n, err := s.transport.Read(readBuf.bytes) if err == nil { readBuf.n = n s.metrics.PacketSize.Update(int64(n)) select { case s.dataChan <- readBuf: s.metrics.PacketsProcessed.Inc(1) s.updateQueueSize(1) default: //這裏須要注意,若是寫比處理快,agent將會扔掉超出的部分數據 s.metrics.PacketsDropped.Inc(1) } } else { s.metrics.ReadError.Inc(1) } } }
每個UDP服務端都有本身單獨的隊列和worker,每一個隊列(長度默認1000)都會有50個(協成)worker消費隊列的數據,也能夠根據系統負載調節隊列和worker的大小。
增長隊列長度(default 1000) --processor.jaeger-compact.server-queue-size
增長worker數 (default 50) --processor.jaeger-compact.workers
go初始化一個服務很簡單,使用for{}的形式就能實現。可是啓動了就要考慮如何關閉,總不能直接強制關閉吧?請求處理了一半被中斷,致使髒數據出現,顯然不是咱們想要的結果,因此有優雅關閉方式。實現優雅關閉的方式大體是:主服務接收信號,而後通知子服務執行完當前操做就不要再執行。
下面來看看NSQ和Jaeger通知子服務中止的實現方式:
NSQ
github.com/nsqio/nsq/nsqd/topic.go #215 func (t *Topic) messagePump() { ...... for { select { case msg = <-memoryMsgChan: ...... case <-t.exitChan: goto exit } ...... } exit: t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name) }
Jaeger
func (s *TBufferedServer) Serve() { atomic.StoreUint32(&s.serving, 1) for s.IsServing() { ...... } }
在通知子服務要中止執行的實現上,NSQ和Jaeger的子服務都是留出一個入口,主服務經過這個入口通知子服務。不一樣的是在中止這步上:
NSQ使用chan+goto,exitChan接收到信號,執行goto,跳出for循環。
Jaeger使用原子操做,經過原子操做把s.serving設爲0,跳出for循環。
網上有篇博客對臨時對象池介紹得挺詳細的《GO併發編程實戰》—— 臨時對象池。臨時對象池的做用是:存放可被複用值,減小垃圾回收。
要想發揮對象池的做用,先要確保池子非空。若是從空池子獲取值,只會從新New一個值,達不到複用的效果。因此通常用法都是先Get,再Put。
readBuf := s.readBufPool.Get().(*ReadBuf)
從上面代碼中能夠看出Agent是想經過對象池複用「*ReadBuf」,可是並無看到Put這步,由於這步放在worker那邊處理。
github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go #124 func (s *TBufferedServer) DataRecd(buf *ReadBuf) { s.updateQueueSize(-1) s.readBufPool.Put(buf) }
爲何不在數據放入隊列的時候就把「ReadBuf」Put到池子呢?這是由當前的場景決定的。 首先「ReadBuf」是一個指針,第二指針會被放到 chan裏,在這種狀況下,若是chan出現了數據堆積(worker處理不完隊列數據),當Agent接到client數據時,因爲複用「*ReadBuf」,就形成了chan的全部數據和新數據同樣的錯亂問題,例子。因此要想複用值,只能在worker消費了隊列數據後再Put回池子。
看完Agent的對象池使用,再來看看NSQ的對象池使用。
github.com/nsqio/nsq/nsqd/topic.go #197 func (t *Topic) put(m *Message) error { select { case t.memoryMsgChan <- m: default: b := bufferPoolGet() //b => bp.Get().(*bytes.Buffer) err := writeMessageToBackend(b, m, t.backend) bufferPoolPut(b) ...... } return nil }
這裏的使用相對容易理解點,先Get「bytes.Buffer」,再處理m數據,最後再把「bytes.Buffer」Put到池中。和Agent不一樣,writeMessageToBackend不會堆積數據,出現數據錯亂的狀況。還有一個小細節,當把「*bytes.Buffer」Put到池子再Get出來,b還會保留上次處理的數據,因此NSQ會清空數據,使用一個乾淨的值。
Agent的服務隊列是有長度限制(default 1000),若是堆積超過1000個,Agent就會絕不憐憫的把數據丟掉。固然在這裏並無不妥,Jaeger的定位就是一套日誌系統,不太看重數據的可靠性。若是要想減小數據丟失的問題,可經過配置或增長Agent節點。由於Jaeger和NSQ對於數據的定位不同,因此就不對比這部分功能。NSQ比較注重數據的可靠性。
github.com/jaegertracing/jaeger/cmd/agent/app/processors/thrift_processor.go #104 func (s *ThriftProcessor) processBuffer() { for readBuf := range s.server.DataChan() { protocol := s.protocolPool.Get().(thrift.TProtocol) protocol.Transport().Write(readBuf.GetBytes()) //這步就是把「*ReadBuf」Put到池子 s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer //將數據從thrift解析成Batch並提交 if ok, _ := s.handler.Process(protocol, protocol); !ok { // TODO log the error s.metrics.HandlerProcessError.Inc(1) } s.protocolPool.Put(protocol) } s.processing.Done() }
這裏是一個worker的實現,在啓動Agent的時候就初始化了150個worker來處理隊列數據。消耗隊列使用for + range的方式,不是使用 select + chan的方式,關於這2種方式的使用介紹能夠看Go中的Channel——range和select。這裏Agent偷懶了,沒有考慮到優雅關閉,若是隊列堆積了數據,而Agent被重啓隊列的數據就會丟失。
github.com/jaegertracing/jaeger/thrift-gen/agent/agent.go #187 func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { args := AgentEmitBatchArgs{} if err = args.Read(iprot); err != nil { iprot.ReadMessageEnd() return false, err } iprot.ReadMessageEnd() var err2 error if err2 = p.handler.EmitBatch(args.Batch); err2 != nil { return true, err2 } return true, nil }
解析thrift是一件很麻煩的事,這種格式的數據是給機器看得,須要按照指定的格式一步一步解析出來,不像Json那麼方便,可是thrift又確實能減小佔用的空間。
github.com/jaegertracing/jaeger/thrift-gen/jaeger/tchan-jaeger.go #39 func (c *tchanCollectorClient) SubmitBatches(ctx thrift.Context, batches []*Batch) ([]*BatchSubmitResponse, error) { var resp CollectorSubmitBatchesResult args := CollectorSubmitBatchesArgs{ Batches: batches, } success, err := c.client.Call(ctx, c.thriftService, "submitBatches", &args, &resp) if err == nil && !success { switch { default: err = fmt.Errorf("received no result or unknown exception for submitBatches") } } return resp.GetSuccess(), err }
Agent把數據提交到Collector是經過RPC框架TChannel,框架由Uber開發,使用TChannel,Agent能夠把數據批量提交到Collector。這個框架提供了一個頗有用的特性:上下文傳輸。爲何呢?說說咱們遇到的一個問題:RPC開發的接口,業務方按需傳入函數的參數調用便可,這樣的方式在前期業務不會產生問題。可是隨着公司發展,版本的迭代,一個接口須要按照客戶端版本進行兼容是很常見的事情,這樣就存在一個問題,做爲RPC的服務端和業務方的調用是跨進程,在上下文沒有保持一致的時候,RPC服務端不知道客戶端版本,很難對此進行兼容。是增長參數?仍是增長另外一個服務化接口?這些方法都不夠友好,最好是在不須要業務方改動的狀況下處理這個問題,這時上下文傳輸就體現它的做用了。
不憐憫數據在Jaeger隨處可見,從上面代碼能夠看出,若是提交失敗,數據也同樣丟失,沒有重試,沒有從新放入隊列等操做。
Collector收集數據,把數據保存進數據庫,雖然職責不同,但在程序設計上和Agent是同樣的,能夠從它們的實現上看出屬於不一樣開發人員分工開發完成。下面咱們也是分3步拆解Collector的實現。
Collector是使用TChannel實現的RPC服務端,在啓動時就開啓了2個基於TCP的RPC服務,一個用來接收Jaeger格式數據,一個接收Zipkin格式數據。
github.com/jaegertracing/jaeger/cmd/collector/main.go # 100 ...... ch, err := tchannel.NewChannel(serviceName, &tchannel.ChannelOptions{}) if err != nil { logger.Fatal("Unable to create new TChannel", zap.Error(err)) } server := thrift.NewServer(ch) zipkinSpansHandler, jaegerBatchesHandler := handlerBuilder.BuildHandlers() server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler)) server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler)) portStr := ":" + strconv.Itoa(builderOpts.CollectorPort) listener, err := net.Listen("tcp", portStr) if err != nil { logger.Fatal("Unable to start listening on channel", zap.Error(err)) } ch.Serve(listener) ......
github.com/jaegertracing/jaeger/cmd/collector/app/span_handler.go #69 func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) { responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches)) for _, batch := range batches { mSpans := make([]*model.Span, 0, len(batch.Spans)) for _, span := range batch.Spans { mSpan := jConv.ToDomainSpan(span, batch.Process) mSpans = append(mSpans, mSpan) } oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType) if err != nil { return nil, err } ...... } return responses, nil }
這裏就是RPC服務端接收數據的地方,通過處理後數據會被放入到隊列。
github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #76 func (q *BoundedQueue) Produce(item interface{}) bool { if atomic.LoadInt32(&q.stopped) != 0 { q.onDroppedItem(item) return false } select { case q.items <- item: atomic.AddInt32(&q.size, 1) return true default: if q.onDroppedItem != nil { q.onDroppedItem(item) } return false } }
在這裏Collector對隊列的操做進行了抽象封裝成BoundedQueue,對讀代碼帶來了便利。BoundedQueue的實現基於 select + chan和Agent的隊列有相同的功能,在生產和消費基礎上實現了優雅中止隊列和查看隊列長度。Collector隊列的數據堆積到2000條,也會絕不憐憫的把數據丟掉。固然這些也是能夠調節的:
--collector.queue-size (default 2000)
--collector.num-workers (default 50)
github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #53 func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) { var startWG sync.WaitGroup for i := 0; i < num; i++ { q.stopWG.Add(1) startWG.Add(1) go func() { startWG.Done() defer q.stopWG.Done() for { select { case item := <-q.items: atomic.AddInt32(&q.size, -1) consumer(item) case <-q.stopCh: return } } }() } startWG.Wait() }
這裏有一步不是很明白,爲何要使用」startWG「確認worker啓動完成?不用會出現什麼問題?
官方回覆: to ensure all consumer goroutines are running by the time we return from this function
優雅關閉隊列方式:close(q.stopCh)
github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/writer.go #122 func (s *SpanWriter) WriteSpan(span *model.Span) error { ds := dbmodel.FromDomain(span) mainQuery := s.session.Query( insertSpan, ds.TraceID, ds.SpanID, ds.SpanHash, ds.ParentID, ds.OperationName, ds.Flags, ds.StartTime, ds.Duration, ds.Tags, ds.Logs, ds.Refs, ds.Process, ) if err := s.writerMetrics.traces.Exec(mainQuery, s.logger); err != nil { return s.logError(ds, err, "Failed to insert span", s.logger) } if err := s.saveServiceNameAndOperationName(ds.ServiceName, ds.OperationName); err != nil { // should this be a soft failure? return s.logError(ds, err, "Failed to insert service name and operation name", s.logger) } ...... return nil }
在把ServiceName和OperationName保存到cassandra的時候作了特別的操做,使用LRU算法進行緩存。這一步緩存應該是爲了減小對cassandra查詢,減小查詢壓力。
github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/service_names.go #69 func (s *ServiceNamesStorage) Write(serviceName string) error { var err error query := s.session.Query(s.InsertStmt) if inCache := checkWriteCache(serviceName, s.serviceNames, s.writeCacheTTL); !inCache { q := query.Bind(serviceName) err2 := s.metrics.Exec(q, s.logger) if err2 != nil { err = err2 } } return err }
Collector在創建緩存的順序上先放入緩存再放入數據庫。查詢方式:key/value。
既然是緩存就會有失效時間(default 12h),而Jaeger默認保存數據2天,因此是否會存在重複保存出錯的狀況?由於serviceName是主鍵索引。
CREATE TABLE IF NOT EXISTS jaeger_v1_dc.service_names ( service_name text, PRIMARY KEY (service_name) )
這種狀況出如今mysql一定會報錯,但在cassandra就不會有這種狀況。
cassandra
cqlsh:jaeger_v1_dc> select * from service_names1; service_name -------------- test (1 rows) S lsh:jaeger_v1_dc> INSERT INTO service_names1 (service_name) VALUE ... ('test');
mysql
mysql> select * from service_names1; +--------------+ | service_name | +--------------+ | test | +--------------+ 1 row in set (0.00 sec) mysql> insert into service_names1 (service_name) values ('test'); ERROR 1062 (23000): Duplicate entry 'test' for key 'PRIMARY'
驚奇吧?雖然沒有報錯,但也會保證惟一性。有興趣的同窗可簡單瞭解一下基本用法,語法和mysql很像。關於cassandra咱們也是摸着石頭過河,不作過多描述。
NSQ | Jaeger | |
---|---|---|
目錄名 | 小寫/下劃線 | 小寫/中橫線 |
函數名 | 小駝峯 | 小駝峯 |
文件名 | 下劃線 | 下劃線 |
變量 | 小駝峯 | 小駝峯 |
常量 | 小駝峯 | 小駝峯 |
包名 | 當前目錄名 | 當前目錄名 |
請求地址 | 下劃線 | *小寫 |
請求參數 | *小寫 | 小駝峯 |
返回參數 | 下劃線 | 小駝峯 |
命令行參數 | 中橫線 | 前綴+點+中橫線 |
打」*「是因爲沒有找到足夠多的參照例子。
Jaeger向我展現了不少東西:UDP使用,優雅關閉,臨時對象池,LRU算法實現等。不僅僅是golang方面,還有程序設計、服務設計上,Agent、Collector、Query3個服務的職責都很單一,這應該是來源微服務思想的劃分。有不少東西須要自行消化,也有不少東西我沒有注意到,只看我的好奇的部分,但收穫也挺多。總結就是:Get到知識了!!