istio源碼分析——mixer遙測報告

image

原文:istio源碼分析——mixer遙測報告html

聲明

  1. 這篇文章須要瞭解istio,k8s,golang,envoy,mixer基礎知識
  2. 分析的環境爲k8s,istio版本爲0.8.0

遙測報告是什麼

    這篇文章主要介紹mixer提供的一個GRPC接口,這個接口負責接收envoy上報的日誌,並將日誌在stdio和prometheus展示出來。 「遙測報告」這個詞是從 istio的中文翻譯文檔借過來,第一次聽到這個詞感受很陌生,很高大上。經過了解源碼,用 「日誌訂閱「 這個詞來理解這個接口的做用會容易點。用一句話來總結這個接口的功能:我有這些日誌,你想用來作什麼?stdio和prometheus只是這些日誌的另外一種展現形式。
istio.io/istio/mixer/pkg/api/grpcServer.go #187
func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {
  ......
  var errors *multierror.Error
  for i := 0; i < len(req.Attributes); i++ {
    ......
    if i > 0 {
      if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil {
        ......
        break
      }
    }
    ......
    if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {
      ......
    }
    ......
    if err := reporter.Report(reportBag); err != nil {
      ......
      continue
    }
    ......
  }
  ......
  if err := reporter.Flush(); err != nil {
    errors = multierror.Append(errors, err)
  }
  reporter.Done()
  ......
  return reportResp, nil
}

接收了什麼數據接收 —— ReportRequest

    Report接口的第二個參數是envoy上報給mixer的數據。下面的數據來源:把日誌打印到終端後再截取出來。

結構

istio.io/api/mixer/v1/report.pb.go #22
type ReportRequest struct {
  ......
  Attributes []CompressedAttributes `protobuf:"bytes,1,rep,name=attributes" json:"attributes"`
  ......
  DefaultWords []string 
  ......
  GlobalWordCount uint32 `protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3" json:"global_word_count,omitempty"`
}

接收的數據

req.Attributes
[{"strings":{"131":92,"152":-1,"154":-2,"17":-7,"18":-4,"19":90,"22":92},"int64s":{"1":33314,"151":8080,"169":292,"170":918,"23":0,"27":780,"30":200},"bools":{"177":false},"timestamps":{"24":"2018-07-05T08:12:20.125365976Z","28":"2018-07-05T08:12:20.125757852Z"},"durations":{"29":426699},"bytes":{"0":"rBQDuw==","150":"AAAAAAAAAAAAAP//rBQDqg=="},"string_maps":{"15":{"entries":{"100":92,"102":-5,"118":113,"119":-3,"31":-4,"32":90,"33":-7,"55":134,"98":-6}},"26":{"entries":{"117":134,"35":136,"55":-9,"58":110,"60":-8,"82":93}}}}]git

req.DefaultWords
["istio-pilot.istio-system.svc.cluster.local","kubernetes://istio-pilot-8696f764dd-fqxtg.istio-system","1000","rds","3a7a649f-4eeb-4d70-972c-ad2d43a680af","172.00.00.000","/v1/routes/8088/index/sidecar~172.20.3.187~index-85df88964c-tzzds.default~default.svc.cluster.local","Thu, 05 Jul 2018 08:12:19 GMT","780","/v1/routes/9411/index/sidecar~172.00.00.000~index-85df88964c-tzzds.default~default.svc.cluster.local","bc1f172f-b8e3-4ec0-a070-f2f6de38a24f","718"]github

req.GlobalWordCount
178golang

    第一次看到這些數據的時候滿腦子問號,和官網介紹的 屬性詞彙一點關聯都看不到。在這些數據裏咱們最主要關注Attributes下的類型: strings, int64s......和那些奇怪的數字。下面會揭開這些謎團。

數據轉換 —— UpdateBagFromProto

image

globalList

> istio.io/istio/mixer/pkg/attribute/list.gen.go #13
    globalList = []string{
      "source.ip",
      "source.port",
      "source.name",
      ......
    }

UpdateBagFromProto

istio.io/istio/mixer/pkg/attribute/mutableBag.go #3018
func (mb *MutableBag) UpdateBagFromProto(attrs *mixerpb.CompressedAttributes, globalWordList []string) error {
  messageWordList := attrs.Words
  ......
  lg("  setting string attributes:")
  for k, v := range attrs.Strings {
    name, e = lookup(k, e, globalWordList, messageWordList)
    value, e = lookup(v, e, globalWordList, messageWordList)
    if err := mb.insertProtoAttr(name, value, seen, lg); err != nil {
      return err
    }
  }
  lg("  setting int64 attributes:")
  ......
  lg("  setting double attributes:")
  ......
  lg("  setting bool attributes:")
  ......
  lg("  setting timestamp attributes:")
  ......
  lg("  setting duration attributes:")
  ......
  lg("  setting bytes attributes:")
  ......
  lg("  setting string map attributes:")

  ......
  return e
}
    Istio屬性是強類型,因此在數據轉換會根據類型一一轉換。從上圖能夠看出由 DefaultWordsglobalList組成一個詞典,而 Attributes 記錄了上報數據的位置,通過 UpdateBagFromProto的處理,最終轉換爲:官方的 屬性詞彙

轉換結果

connection.mtls               : false
context.protocol              : http
destination.port              : 8080
......
request.host                  : rds
request.method                : GET
......

數據加工 —— Preprocess

    這個方法在k8s環境下的結果是追加數據
istio.io/istio/mixer/template/template.gen.go #33425
outBag := newWrapperAttrBag(
  func(name string) (value interface{}, found bool) {
    field := strings.TrimPrefix(name, fullOutName)
    if len(field) != len(name) && out.WasSet(field) {
      switch field {
      case "source_pod_ip":
        return []uint8(out.SourcePodIp), true
      case "source_pod_name":
        return out.SourcePodName, true
        ......
      default:
        return nil, false
      }
    }
    return attrs.Get(name)
  }
  ......
)
return mapper(outBag)

最終追加的數據

destination.labels            : map[istio:pilot pod-template-hash:4252932088]
destination.namespace         : istio-system
......

數據分發 —— Report

     Report會把數據分發到 Variety = istio_adapter_model_v1beta1.TEMPLATE_VARIETY_REPORT Template 裏,固然還有一些過濾條件,在當前環境下會分發到 logentry Metric
istio.io/istio/mixer/pkg/runtime/dispatcher/session.go #105
func (s *session) dispatch() error {
  ......
  for _, destination := range destinations.Entries() {
    var state *dispatchState
    if s.variety == tpb.TEMPLATE_VARIETY_REPORT {
      state = s.reportStates[destination]
      if state == nil {
        state = s.impl.getDispatchState(ctx, destination)
        s.reportStates[destination] = state
      }
    }

    for _, group := range destination.InstanceGroups {
      ......
      for j, input := range group.Builders {
        ......
        var instance interface{}
        //把日誌綁定到 Template裏
        if instance, err = input.Builder(s.bag); err != nil{
          ......
          continue
        }
        ......
        if s.variety == tpb.TEMPLATE_VARIETY_REPORT {
          state.instances = append(state.instances, instance)
          continue
        }
        ......
      }
    }
  }
  ......
  return nil
}

數據展現 —— 異步Flush

    Flush是讓 logentryMetric 調用各自的 adapter 對數據進行處理,因爲各自的 adapter沒有依賴關係因此這裏使用了golang的協程進行異步處理。
istio.io/istio/mixer/pkg/runtime/dispatcher/session.go #200
func (s *session) dispatchBufferedReports() {
    // Ensure that we can run dispatches to all destinations in parallel.
    s.ensureParallelism(len(s.reportStates))

    // dispatch the buffered dispatchStates we've got
    for k, v := range s.reportStates {
        //在這裏會把 v 放入協程進行處理
      s.dispatchToHandler(v)
      delete(s.reportStates, k)
    }
    //等待全部adapter完成
    s.waitForDispatched()
}

協程池

    從上面看到 v 被放入協程進行處理,其實mixer在這裏使用了協程池。使用協程池能夠減小協程的建立和銷燬,還能夠控制服務中協程的多少,從而減小對系統的資源佔用。mixer的協程池屬於提早建立必定數量的協程,提供給業務使用,若是協程池處理不完業務的工做,須要阻塞等待。下面是mixer使用協程池的步驟。
  • 初始化協程池
    創建一個有長度的 channel,咱們能夠叫它隊列。
istio.io/istio/mixer/pkg/pool/goroutine.go 
func NewGoroutinePool(queueDepth int, singleThreaded bool) *GoroutinePool {
  gp := &GoroutinePool{
    queue:          make(chan work, queueDepth),
    singleThreaded: singleThreaded,
  }

  gp.AddWorkers(1)
  return gp
}
  • 把任務放入隊列
    把可執行的函數和參數當成一個任務放入隊列
func (gp *GoroutinePool) ScheduleWork(fn WorkFunc, param interface{}) {
    if gp.singleThreaded {
        fn(param)
    } else {
        gp.queue <- work{fn: fn, param: param}
    }
}
  • 讓工人工做
    想要用多少工人能夠按資源分配,工人不斷從隊列獲取任務執行
func (gp *GoroutinePool) AddWorkers(numWorkers int) {
  if !gp.singleThreaded {
    gp.wg.Add(numWorkers)
    for i := 0; i < numWorkers; i++ {
      go func() {
        for work := range gp.queue {
          work.fn(work.param)
        }
        gp.wg.Done()
      }()
    }
  }
}

logentry 的 adapter 將數據打印到終端(stdio)

  • adapter 交互
    每一個 Template 都有本身的 DispatchReport,它負責和 adapter交互,並對日誌進行展現。
istio.io/istio/mixer/template/template.gen.go #1311
logentry.TemplateName: {
    Name:  logentry.TemplateName,
    Impl:  "logentry",
    CtrCfg:   &logentry.InstanceParam{},
    Variety:  istio_adapter_model_v1beta1.TEMPLATE_VARIETY_REPORT,
    ......
    DispatchReport: func(ctx context.Context, handler adapter.Handler, inst []interface{}) error {
        ......
        instances := make([]*logentry.Instance, len(inst))
        for i, instance := range inst {
          instances[i] = instance.(*logentry.Instance)
        }

        // Invoke the handler.
        if err := handler.(logentry.Handler).HandleLogEntry(ctx, instances); err != nil {
            return fmt.Errorf("failed to report all values: %v", err)
        }
        return nil
    },
}
  • 日誌數據整理
istio.io/istio/mixer/adapter/stdio/stdio.go #53
func (h *handler) HandleLogEntry(_ context.Context, instances []*logentry.Instance) error {
    var errors *multierror.Error

    fields := make([]zapcore.Field, 0, 6)
    for _, instance := range instances {
      ......
      for _, varName := range h.logEntryVars[instance.Name] {
          //過濾adapter不要的數據
        if value, ok := instance.Variables[varName]; ok {
            fields = append(fields, zap.Any(varName, value))
        }
      }
      if err := h.write(entry, fields); err != nil {
          errors = multierror.Append(errors, err)
      }
      fields = fields[:0]
    }
    return errors.ErrorOrNil()
}
    每一個 adapter 都有本身想要的數據,這些數據可在啓動文件 istio-demo.yaml 下配置。
apiVersion: "config.istio.io/v1alpha2"
    kind: logentry
    metadata:
      name: accesslog
      namespace: istio-system
    spec:
      severity: '"Info"'
      timestamp: request.time
      variables:
        originIp: origin.ip | ip("0.0.0.0")
        sourceIp: source.ip | ip("0.0.0.0")
        sourceService: source.service | ""
        ......
  • 展現結果
    下面日誌從mixer終端截取
{"level":"info","time":"2018-07-15T09:27:30.739801Z","instance":"accesslog.logentry.istio-system","apiClaims":"",
"apiKey":"","apiName":"","apiVersion":"","connectionMtls":false,"destinationIp":"10.00.0.00",
"destinationNamespace":"istio-system"......}

問題

經過分析這個接口源碼咱們發現了一些問題:
  1. 接口須要處理完全部 adapter才響應返回
  2. 若是協程池出現阻塞,接口須要一直等待
    基於以上二點咱們聯想到:若是協程池出現阻塞,這個接口響應相應會變慢,是否會影響到業務的請求?從國人翻譯的一篇istio官方博客 Mixer 和 SPOF 的迷思裏知道,envoy數據上報是經過「fire-and-forget「模式異步完成。但因爲沒有C++基礎,因此我不太明白這裏面的「fire-and-forget「是如何實現。

    由於存在上面的疑問,因此咱們進行了一次模擬測試。此次測試的假設條件:接口出現了阻塞,分別延遲了50ms,100ms,150ms,200ms,250ms,300ms【模擬阻塞時間】,在相同壓力下,觀察對業務請求是否有影響。docker

  • 環境: mac Air 下的 docker for k8s
  • 壓測工具:hey
  • 壓力:-c 50 -n 200【電腦配置不高】
  • 電腦配置 i5 4G
  • 壓測命令:hey -c 50 -n 200 http://127.0.0.1:30935/sleep
  • 被壓測的服務代碼
  • mixer接口添加延遲代碼:
func (s *grpcServer) Report(legacyCtx legacyContext.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {
    time.Sleep(50 * time.Microsecond)
    ......
    return reportResp, nil
}

注意

壓測的每一個數據結果都是通過預熱後,壓測10次並從中獲取中位數獲得。

結果:

image

    從上圖咱們能夠看出隨着延遲的增長,業務處理的QPS也在降低。這說明在當前0.8.0版本下,協程池處理任務不夠快【進比出快】,出現了阻塞現象,會影響到業務的請求。固然咱們能夠經過橫向擴展mixer或增長協程池裏的工人數量來解決。 可是我以爲主要的問題出在阻塞這步上。若是沒有阻塞,就不會影響業務

Jaeger相互借鑑,避免阻塞

    這裏日誌數據處理場景和以前瞭解的 Jaeger很像。Jaeger和mixer處理的都是日誌數據,因此它們之間能夠相互借鑑。Jaeger也有它本身的 協程池,並且和mixer的協程池思想是同樣的,雖然實現細節不同。那若是遇到 進比出快的狀況Jaeger是如何處理的呢?具體的場景能夠看 這裏
github.com/jaegertracing/jaeger/pkg/queue/bounded_queue.go #76
func (q *BoundedQueue) Produce(item interface{}) bool {
    if atomic.LoadInt32(&q.stopped) != 0 {
        q.onDroppedItem(item)
        return false
    }
    select {
    case q.items <- item:
        atomic.AddInt32(&q.size, 1)
        return true
    default:
        //丟掉數據
        if q.onDroppedItem != nil {
            q.onDroppedItem(item)
        }
        return false
    }
}
    上面是Jaeger的源碼,這裏和mixer 的 ScheduleWork 相對應,其中一個區別是若是Jaeger的隊列 items滿了,還有數據進來,數據將會被丟掉,從而避免了阻塞。這個思路也能夠用在mixer的日誌處理上,犧牲一些日誌數據,保證業務請求穩定。畢竟業務的位置是最重要的。

相關博客

Mixer 的適配器模型json

相關文章
相關標籤/搜索