k8s與監控--從telegraf改造談golang多協程精確控制

從telegraf改造談golang多協程精確控制


前言

telegraf是infuxdb公司開源出來的一個基於插件機制的收集metrics的項目。整個架構和elastic公司的日誌收集系統極其相似,具有良好的擴展性。與如今流行的各類exporter+promethues監控方案相比:git

  1. 大體具有良好的可擴展性。很容易增長本身的處理邏輯,在input,output,process,filter等環境定製本身專屬的插件。
  2. 統一了各類exporter,減小了部署各類exporter的工做量和維護成本。

目前telegraf改造工做基本上是兩大部分:github

  1. 增長了一些telegraf不支持的插件,好比虛擬化(kvm,vmware等),數據庫(oracle),k8s和openstack等input插件。
  2. telegraf是基於配置文件的,因此會有兩個問題,很難作分佈式和無停機動態調度input任務。因此咱們的工做就是將獲取配置接口化,全部的配置文件來源於統一配置中心。而後就是改造無停機動態調度input。

在改造改造無停機動態調度input就涉及到golang多協程精確控制的問題。golang

一些golang經常使用併發手段

sync包下WaitGroup

具體事例:算法

var wg sync.WaitGroup

    wg.Add(len(a.Config.Outputs))
    for _, o := range a.Config.Outputs {
        go func(output *models.RunningOutput) {
            defer wg.Done()
            err := output.Write()
            if err != nil {
                log.Printf("E! Error writing to output [%s]: %s\n",
                    output.Name, err.Error())
            }
        }(o)
    }

    wg.Wait()

WaitGroup內部維護了一個counter,當counter數值爲0時,代表添加的任務都已經完成。
總共有三個方法:數據庫

func (wg *WaitGroup) Add(delta int)

添加任務,delta參數表示添加任務的數量。架構

func (wg *WaitGroup) Done()

任務執行完成,調用Done方法,通常使用姿式都是defer wg.Done(),此時counter中會減一。併發

func (wg *WaitGroup) Wait()

經過使用sync.WaitGroup,能夠阻塞主線程,直到相應數量的子線程結束。oracle

chan struct{},控制協程退出

啓動協程的時候,傳遞一個shutdown chan struct{},須要關閉該協程的時候,直接close(shutdown)。struct{}在golang中是一個消耗接近0的對象。
具體事例:dom

// gatherer runs the inputs that have been configured with their own
// reporting interval.
func (a *Agent) gatherer(
    shutdown chan struct{},
    kill chan struct{},
    input *models.RunningInput,
    interval time.Duration,
    metricC chan telegraf.Metric,
) {
    defer panicRecover(input)

    GatherTime := selfstat.RegisterTiming("gather",
        "gather_time_ns",
        map[string]string{"input": input.Config.Name},
    )

    acc := NewAccumulator(input, metricC)
    acc.SetPrecision(a.Config.Agent.Precision.Duration,
        a.Config.Agent.Interval.Duration)

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)

        start := time.Now()
        gatherWithTimeout(shutdown, kill, input, acc, interval)
        elapsed := time.Since(start)

        GatherTime.Incr(elapsed.Nanoseconds())

        select {
        case <-shutdown:
            return
        case <-kill:
            return
        case <-ticker.C:
            continue
        }
    }
}

藉助chan 實現指定數量的協程或動態調整協程數量

固然這裏必須是每一個協程是冪等,也就是全部協程作的是一樣的工做。
首先建立 一個 pool:= make(chan chan struct{}, maxWorkers),maxWorkers爲目標協程數量。
而後啓動協程:分佈式

for i := 0; i < s.workers; i++ {
        go func() {
            wQuit := make(chan struct{})
            s.pool <- wQuit
            s.sFlowWorker(wQuit)
        }()
    }

關閉協程:

func (s *SFlow) sFlowWorker(wQuit chan struct{}) {
LOOP:
    for {

        select {
        case <-wQuit:
            break LOOP
        case msg, ok = <-sFlowUDPCh:
            if !ok {
                break LOOP
            }
        }

        // 此處執行任務操做
        
}

動態調整:

for n = 0; n < 10; n++ {
                if len(s.pool) > s.workers {
                    wQuit := <-s.pool
                    close(wQuit)
                }
            }

多協程精確控制

在改造telegraf過程當中,要想動態調整input,每一個input都是惟一的,分屬不一樣類型插件。就必須實現精準控制指定的協程的啓停。
這個時候實現思路就是:實現一個kills map[string]chan struct{},k爲每一個任務的惟一ID。添加任務時候,傳遞一個chan struct{},這個時候關閉指定ID的chan struct{},就能控制指定的協程。

// DelInput add input
func (a *Agent) DelInput(inputs []*models.RunningInput) error {
    a.storeMutex.Lock()
    defer a.storeMutex.Unlock()

    for _, v := range inputs {
        if _, ok := a.kills[v.Config.ID]; !ok {
            return fmt.Errorf("input: %s,未找到,沒法刪除", v.Config.ID)
        }
    }

    for _, input := range inputs {
        if kill, ok := a.kills[input.Config.ID]; ok {
            delete(a.kills, input.Config.ID)
            close(kill)
        }
    }
    return nil
}

添加任務:

// AddInput add input
func (a *Agent) AddInput(shutdown chan struct{}, inputs []*models.RunningInput) error {
    a.storeMutex.Lock()
    defer a.storeMutex.Unlock()
    for _, v := range inputs {
        if _, ok := a.kills[v.Config.ID]; ok {
            return fmt.Errorf("input: %s,已經存在沒法新增", v.Config.ID)
        }
    }

    for _, input := range inputs {
        interval := a.Config.Agent.Interval.Duration
        // overwrite global interval if this plugin has it's own.
        if input.Config.Interval != 0 {
            interval = input.Config.Interval
        }
        if input.Config.ID == "" {
            continue
        }
        
        a.wg.Add(1)

        kill := make(chan struct{})
        a.kills[input.Config.ID] = kill

        go func(in *models.RunningInput, interv time.Duration) {
            defer a.wg.Done()
            a.gatherer(shutdown, kill, in, interv, a.metricC)
        }(input, interval)
    }

    return nil
}

總結

簡單介紹了一下telegraf項目。後續的優化和改造工做還在繼續。主要是分佈式telegraf的調度算法。畢竟集中化全部exporter之後,telegraf的負載能力受單機能力限制,並且也不符合高可用的使用目標。

相關文章
相關標籤/搜索