Jaeger源碼分析——窺視分佈式系統實現

image

原文地址:Jaeger源碼分析——窺視分佈式系統實現html

前言

分析Jaeger源碼主要有如下緣由:mysql

  • 公司正在使用Jaeger,經過了解其源碼,能夠更好的把控這套系統。git

  • 瞭解分佈式系統的設計github

  • 提高對golang的理解golang

  • 提高我的英語算法

分析的版本爲最新版本0.10.0,時間2017-11-23sql

Agent ——3部曲

 agent處於jaeger-client和collector之間,屬於代理的做用,主要是把client發送過來的數據從thrift轉爲Batch,並經過RPC批量提交到collector。數據庫

初始化agent

github.com/jaegertracing/jaeger/cmd/agent/app/flags.go #35
var defaultProcessors = []struct {
    model    model
    protocol protocol
    hostPort string
}{
    {model: "zipkin", protocol: "compact", hostPort: ":5775"},
    {model: "jaeger", protocol: "compact", hostPort: ":6831"},
    {model: "jaeger", protocol: "binary", hostPort: ":6832"},
}
  • 在agent開啓時初始化了3個UDP服務編程

  • 每一個服務對應處理不一樣的數據格式緩存

  • 官方推薦使用6831端口接收數據

接收Jaeger-client的數據

github.com/jaegertracing/jaeger/cmd/agent/app/servers/tbuffered_server.go #80
func (s *TBufferedServer) Serve() {
    atomic.StoreUint32(&s.serving, 1)
    for s.IsServing() {
        readBuf := s.readBufPool.Get().(*ReadBuf)
        n, err := s.transport.Read(readBuf.bytes)
        if err == nil {
            readBuf.n = n
            s.metrics.PacketSize.Update(int64(n))
            select {
            case s.dataChan <- readBuf:
                s.metrics.PacketsProcessed.Inc(1)
                s.updateQueueSize(1)
            default:
               //這裏須要注意,若是寫比處理快,agent將會扔掉超出的部分數據
               s.metrics.PacketsDropped.Inc(1)
            }
        } else {
            s.metrics.ReadError.Inc(1)
        }
    }
}

 每個UDP服務端都有本身單獨的隊列和worker,每一個隊列(長度默認1000)都會有50個(協成)worker消費隊列的數據,也能夠根據系統負載調節隊列和worker的大小。

  • 增長隊列長度(default 1000) --processor.jaeger-compact.server-queue-size

  • 增長worker數 (default 50) --processor.jaeger-compact.workers

優雅關閉

 go初始化一個服務很簡單,使用for{}的形式就能實現。可是啓動了就要考慮如何關閉,總不能直接強制關閉吧?請求處理了一半被中斷,致使髒數據出現,顯然不是咱們想要的結果,因此有優雅關閉方式。實現優雅關閉的方式大體是:主服務接收信號,而後通知子服務執行完當前操做就不要再執行。
 下面來看看NSQJaeger通知子服務中止的實現方式:

  • NSQ

github.com/nsqio/nsq/nsqd/topic.go #215
 func (t *Topic) messagePump() {
    ......
    for {
        select {
        case msg = <-memoryMsgChan:
    ......
        case <-t.exitChan:
            goto exit
        }
    ......
    }
    exit:
    t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name)
}
  • Jaeger

func (s *TBufferedServer) Serve() {
    atomic.StoreUint32(&s.serving, 1)
    for s.IsServing() {
        ......
    }
}

 在通知子服務要中止執行的實現上,NSQ和Jaeger的子服務都是留出一個入口,主服務經過這個入口通知子服務。不一樣的是在中止這步上:

  • NSQ使用chan+goto,exitChan接收到信號,執行goto,跳出for循環。

  • Jaeger使用原子操做,經過原子操做把s.serving設爲0,跳出for循環。

臨時對象池

 網上有篇博客對臨時對象池介紹得挺詳細的《GO併發編程實戰》—— 臨時對象池。臨時對象池的做用是:存放可被複用值,減小垃圾回收。
 要想發揮對象池的做用,先要確保池子非空。若是從空池子獲取值,只會從新New一個值,達不到複用的效果。因此通常用法都是先Get,再Put。

readBuf := s.readBufPool.Get().(*ReadBuf)

 從上面代碼中能夠看出Agent是想經過對象池複用「*ReadBuf」,可是並無看到Put這步,由於這步放在worker那邊處理。

github.com/uber/jaeger/cmd/agent/app/servers/tbuffered_server.go #124
func (s *TBufferedServer) DataRecd(buf *ReadBuf) {
    s.updateQueueSize(-1)
    s.readBufPool.Put(buf)
}

 爲何不在數據放入隊列的時候就把「ReadBuf」Put到池子呢?這是由當前的場景決定的。 首先「ReadBuf」是一個指針,第二指針會被放到 chan裏,在這種狀況下,若是chan出現了數據堆積(worker處理不完隊列數據),當Agent接到client數據時,因爲複用「*ReadBuf」,就形成了chan的全部數據和新數據同樣的錯亂問題,例子。因此要想複用值,只能在worker消費了隊列數據後再Put回池子。
看完Agent的對象池使用,再來看看NSQ的對象池使用。

github.com/nsqio/nsq/nsqd/topic.go #197
func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        //b => bp.Get().(*bytes.Buffer)
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        ......
    }
    return nil
}

 這裏的使用相對容易理解點,先Get「bytes.Buffer」,再處理m數據,最後再把「bytes.Buffer」Put到池中。和Agent不一樣,writeMessageToBackend不會堆積數據,出現數據錯亂的狀況。還有一個小細節,當把「*bytes.Buffer」Put到池子再Get出來,b還會保留上次處理的數據,因此NSQ會清空數據,使用一個乾淨的值。

不憐憫數據

 Agent的服務隊列是有長度限制(default 1000),若是堆積超過1000個,Agent就會絕不憐憫的把數據丟掉。固然在這裏並無不妥,Jaeger的定位就是一套日誌系統,不太看重數據的可靠性。若是要想減小數據丟失的問題,可經過配置或增長Agent節點。由於Jaeger和NSQ對於數據的定位不同,因此就不對比這部分功能。NSQ比較注重數據的可靠性。

提交數據

github.com/jaegertracing/jaeger/cmd/agent/app/processors/thrift_processor.go #104
func (s *ThriftProcessor) processBuffer() {
    for readBuf := range s.server.DataChan() {
        protocol := s.protocolPool.Get().(thrift.TProtocol)
        protocol.Transport().Write(readBuf.GetBytes())
        //這步就是把「*ReadBuf」Put到池子
        s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
    
        //將數據從thrift解析成Batch並提交
        if ok, _ := s.handler.Process(protocol, protocol); !ok {
            // TODO log the error
            s.metrics.HandlerProcessError.Inc(1)
        }
        s.protocolPool.Put(protocol)
    }
    s.processing.Done()
}

消耗隊列數據

 這裏是一個worker的實現,在啓動Agent的時候就初始化了150個worker來處理隊列數據。消耗隊列使用for + range的方式,不是使用 select + chan的方式,關於這2種方式的使用介紹能夠看Go中的Channel——range和select。這裏Agent偷懶了,沒有考慮到優雅關閉,若是隊列堆積了數據,而Agent被重啓隊列的數據就會丟失。

數據從thrift轉爲Batch

github.com/jaegertracing/jaeger/thrift-gen/agent/agent.go #187
func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    args := AgentEmitBatchArgs{}
    if err = args.Read(iprot); err != nil {
        iprot.ReadMessageEnd()
        return false, err
    }

    iprot.ReadMessageEnd()
    var err2 error
    if err2 = p.handler.EmitBatch(args.Batch); err2 != nil {
        return true, err2
    }
    return true, nil
}

 解析thrift是一件很麻煩的事,這種格式的數據是給機器看得,須要按照指定的格式一步一步解析出來,不像Json那麼方便,可是thrift又確實能減小佔用的空間

提交數據

github.com/jaegertracing/jaeger/thrift-gen/jaeger/tchan-jaeger.go #39
func (c *tchanCollectorClient) SubmitBatches(ctx thrift.Context, batches []*Batch) ([]*BatchSubmitResponse, error) {
    var resp CollectorSubmitBatchesResult
    args := CollectorSubmitBatchesArgs{
        Batches: batches,
    }
    success, err := c.client.Call(ctx, c.thriftService, "submitBatches", &args, &resp)
    if err == nil && !success {
        switch {
        default:
            err = fmt.Errorf("received no result or unknown exception for submitBatches")
        }
    }

    return resp.GetSuccess(), err
}

 Agent把數據提交到Collector是經過RPC框架TChannel,框架由Uber開發,使用TChannel,Agent能夠把數據批量提交到Collector。這個框架提供了一個頗有用的特性:上下文傳輸。爲何呢?說說咱們遇到的一個問題:RPC開發的接口,業務方按需傳入函數的參數調用便可,這樣的方式在前期業務不會產生問題。可是隨着公司發展,版本的迭代,一個接口須要按照客戶端版本進行兼容是很常見的事情,這樣就存在一個問題,做爲RPC的服務端和業務方的調用是跨進程,在上下文沒有保持一致的時候,RPC服務端不知道客戶端版本,很難對此進行兼容。是增長參數?仍是增長另外一個服務化接口?這些方法都不夠友好,最好是在不須要業務方改動的狀況下處理這個問題,這時上下文傳輸就體現它的做用了。
 不憐憫數據在Jaeger隨處可見,從上面代碼能夠看出,若是提交失敗,數據也同樣丟失,沒有重試,沒有從新放入隊列等操做。

Collectore ——3部曲

 Collector收集數據,把數據保存進數據庫,雖然職責不同,但在程序設計上和Agent是同樣的,能夠從它們的實現上看出屬於不一樣開發人員分工開發完成。下面咱們也是分3步拆解Collector的實現。

初始化Collector

 Collector是使用TChannel實現的RPC服務端,在啓動時就開啓了2個基於TCP的RPC服務,一個用來接收Jaeger格式數據,一個接收Zipkin格式數據。

github.com/jaegertracing/jaeger/cmd/collector/main.go # 100
......
ch, err := tchannel.NewChannel(serviceName, &tchannel.ChannelOptions{})
if err != nil {
    logger.Fatal("Unable to create new TChannel", zap.Error(err))
}
server := thrift.NewServer(ch)
zipkinSpansHandler, jaegerBatchesHandler := handlerBuilder.BuildHandlers()
server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))

portStr := ":" + strconv.Itoa(builderOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)
if err != nil {
    logger.Fatal("Unable to start listening on channel", zap.Error(err))
}
ch.Serve(listener)
......

接收Agent的數據

github.com/jaegertracing/jaeger/cmd/collector/app/span_handler.go #69
func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
    responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
    for _, batch := range batches {
        mSpans := make([]*model.Span, 0, len(batch.Spans))
        for _, span := range batch.Spans {
            mSpan := jConv.ToDomainSpan(span, batch.Process)
            mSpans = append(mSpans, mSpan)
        }
        oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType)
        if err != nil {
            return nil, err
        }
        ......
    }
    return responses, nil
}

 這裏就是RPC服務端接收數據的地方,通過處理後數據會被放入到隊列。

github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #76
func (q *BoundedQueue) Produce(item interface{}) bool {
    if atomic.LoadInt32(&q.stopped) != 0 {
        q.onDroppedItem(item)
        return false
    }
    select {
    case q.items <- item:
        atomic.AddInt32(&q.size, 1)
        return true
    default:
        if q.onDroppedItem != nil {
            q.onDroppedItem(item)
        }
        return false
    }
}

 在這裏Collector對隊列的操做進行了抽象封裝成BoundedQueue,對讀代碼帶來了便利。BoundedQueue的實現基於 select + chan和Agent的隊列有相同的功能,在生產和消費基礎上實現了優雅中止隊列和查看隊列長度。Collector隊列的數據堆積到2000條,也會絕不憐憫的把數據丟掉。固然這些也是能夠調節的:

  • --collector.queue-size (default 2000)

  • --collector.num-workers (default 50)

保存數據

消費隊列數據

github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #53
func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{})) {
    var startWG sync.WaitGroup
    for i := 0; i < num; i++ {
        q.stopWG.Add(1)
        startWG.Add(1)
        go func() {
            startWG.Done()
            defer q.stopWG.Done()
            for {
                select {
                case item := <-q.items:
                    atomic.AddInt32(&q.size, -1)
                    consumer(item)
                case <-q.stopCh:
                    return
                }
            }
        }()
    }
    startWG.Wait()
}

 這裏有一步不是很明白,爲何要使用」startWG「確認worker啓動完成?不用會出現什麼問題?

官方回覆:
    to ensure all consumer goroutines are running by the time we return from this function

優雅關閉隊列方式:close(q.stopCh)

數據保存到數據庫——cassandra

github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/writer.go #122
func (s *SpanWriter) WriteSpan(span *model.Span) error {
    ds := dbmodel.FromDomain(span)
    mainQuery := s.session.Query(
        insertSpan,
        ds.TraceID,
        ds.SpanID,
        ds.SpanHash,
        ds.ParentID,
        ds.OperationName,
        ds.Flags,
        ds.StartTime,
        ds.Duration,
        ds.Tags,
        ds.Logs,
        ds.Refs,
        ds.Process,
    )

    if err := s.writerMetrics.traces.Exec(mainQuery, s.logger); err != nil {
        return s.logError(ds, err, "Failed to insert span", s.logger)
    }
    if err := s.saveServiceNameAndOperationName(ds.ServiceName, ds.OperationName); err != nil {
        // should this be a soft failure?
        return s.logError(ds, err, "Failed to insert service name and operation name", s.logger)
    }

    ......
    return nil
}

 在把ServiceName和OperationName保存到cassandra的時候作了特別的操做,使用LRU算法進行緩存。這一步緩存應該是爲了減小對cassandra查詢,減小查詢壓力。

github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/service_names.go #69
func (s *ServiceNamesStorage) Write(serviceName string) error {
    var err error
    query := s.session.Query(s.InsertStmt)
    if inCache := checkWriteCache(serviceName, s.serviceNames, s.writeCacheTTL); !inCache {
        q := query.Bind(serviceName)
        err2 := s.metrics.Exec(q, s.logger)
        if err2 != nil {
            err = err2
        }
    }
    return err
}

 Collector在創建緩存的順序上先放入緩存再放入數據庫。查詢方式:key/value。
 既然是緩存就會有失效時間(default 12h),而Jaeger默認保存數據2天,因此是否會存在重複保存出錯的狀況?由於serviceName是主鍵索引。

CREATE TABLE IF NOT EXISTS jaeger_v1_dc.service_names (
    service_name text,
    PRIMARY KEY (service_name)
)

這種狀況出如今mysql一定會報錯,但在cassandra就不會有這種狀況。

  • cassandra

cqlsh:jaeger_v1_dc> select * from service_names1;

 service_name
--------------
         test

(1 rows)
S lsh:jaeger_v1_dc> INSERT INTO service_names1 (service_name) VALUE
                ... ('test');
  • mysql

mysql> select * from service_names1;
+--------------+
| service_name |
+--------------+
| test         |
+--------------+
1 row in set (0.00 sec)

mysql> insert into service_names1 (service_name) values ('test');
ERROR 1062 (23000): Duplicate entry 'test' for key 'PRIMARY'

驚奇吧?雖然沒有報錯,但也會保證惟一性。有興趣的同窗可簡單瞭解一下基本用法,語法和mysql很像。關於cassandra咱們也是摸着石頭過河,不作過多描述。

golang使用規範

NSQ Jaeger
目錄名 小寫/下劃線 小寫/中橫線
函數名 小駝峯 小駝峯
文件名 下劃線 下劃線
變量 小駝峯 小駝峯
常量 小駝峯 小駝峯
包名 當前目錄名 當前目錄名
請求地址 下劃線 *小寫
請求參數 *小寫 小駝峯
返回參數 下劃線 小駝峯
命令行參數 中橫線 前綴+點+中橫線

打」*「是因爲沒有找到足夠多的參照例子。

結語

 Jaeger向我展現了不少東西:UDP使用,優雅關閉,臨時對象池,LRU算法實現等。不僅僅是golang方面,還有程序設計、服務設計上,Agent、Collector、Query3個服務的職責都很單一,這應該是來源微服務思想的劃分。有不少東西須要自行消化,也有不少東西我沒有注意到,只看我的好奇的部分,但收穫也挺多。總結就是:Get到知識了!!

相關文章
相關標籤/搜索