telegraf是infuxdb公司開源出來的一個基於插件機制的收集metrics的項目。整個架構和elastic公司的日誌收集系統極其相似,具有良好的擴展性。與如今流行的各類exporter+promethues監控方案相比:git
目前telegraf改造工做基本上是兩大部分:github
在改造改造無停機動態調度input就涉及到golang多協程精確控制的問題。golang
具體事例:算法
var wg sync.WaitGroup wg.Add(len(a.Config.Outputs)) for _, o := range a.Config.Outputs { go func(output *models.RunningOutput) { defer wg.Done() err := output.Write() if err != nil { log.Printf("E! Error writing to output [%s]: %s\n", output.Name, err.Error()) } }(o) } wg.Wait()
WaitGroup內部維護了一個counter,當counter數值爲0時,代表添加的任務都已經完成。
總共有三個方法:數據庫
func (wg *WaitGroup) Add(delta int)
添加任務,delta參數表示添加任務的數量。架構
func (wg *WaitGroup) Done()
任務執行完成,調用Done方法,通常使用姿式都是defer wg.Done(),此時counter中會減一。併發
func (wg *WaitGroup) Wait()
經過使用sync.WaitGroup,能夠阻塞主線程,直到相應數量的子線程結束。oracle
啓動協程的時候,傳遞一個shutdown chan struct{},須要關閉該協程的時候,直接close(shutdown)。struct{}在golang中是一個消耗接近0的對象。
具體事例:dom
// gatherer runs the inputs that have been configured with their own // reporting interval. func (a *Agent) gatherer( shutdown chan struct{}, kill chan struct{}, input *models.RunningInput, interval time.Duration, metricC chan telegraf.Metric, ) { defer panicRecover(input) GatherTime := selfstat.RegisterTiming("gather", "gather_time_ns", map[string]string{"input": input.Config.Name}, ) acc := NewAccumulator(input, metricC) acc.SetPrecision(a.Config.Agent.Precision.Duration, a.Config.Agent.Interval.Duration) ticker := time.NewTicker(interval) defer ticker.Stop() for { internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown) start := time.Now() gatherWithTimeout(shutdown, kill, input, acc, interval) elapsed := time.Since(start) GatherTime.Incr(elapsed.Nanoseconds()) select { case <-shutdown: return case <-kill: return case <-ticker.C: continue } } }
固然這裏必須是每一個協程是冪等,也就是全部協程作的是一樣的工做。
首先建立 一個 pool:= make(chan chan struct{}, maxWorkers),maxWorkers爲目標協程數量。
而後啓動協程:分佈式
for i := 0; i < s.workers; i++ { go func() { wQuit := make(chan struct{}) s.pool <- wQuit s.sFlowWorker(wQuit) }() }
關閉協程:
func (s *SFlow) sFlowWorker(wQuit chan struct{}) { LOOP: for { select { case <-wQuit: break LOOP case msg, ok = <-sFlowUDPCh: if !ok { break LOOP } } // 此處執行任務操做 }
動態調整:
for n = 0; n < 10; n++ { if len(s.pool) > s.workers { wQuit := <-s.pool close(wQuit) } }
在改造telegraf過程當中,要想動態調整input,每一個input都是惟一的,分屬不一樣類型插件。就必須實現精準控制指定的協程的啓停。
這個時候實現思路就是:實現一個kills map[string]chan struct{},k爲每一個任務的惟一ID。添加任務時候,傳遞一個chan struct{},這個時候關閉指定ID的chan struct{},就能控制指定的協程。
// DelInput add input func (a *Agent) DelInput(inputs []*models.RunningInput) error { a.storeMutex.Lock() defer a.storeMutex.Unlock() for _, v := range inputs { if _, ok := a.kills[v.Config.ID]; !ok { return fmt.Errorf("input: %s,未找到,沒法刪除", v.Config.ID) } } for _, input := range inputs { if kill, ok := a.kills[input.Config.ID]; ok { delete(a.kills, input.Config.ID) close(kill) } } return nil }
添加任務:
// AddInput add input func (a *Agent) AddInput(shutdown chan struct{}, inputs []*models.RunningInput) error { a.storeMutex.Lock() defer a.storeMutex.Unlock() for _, v := range inputs { if _, ok := a.kills[v.Config.ID]; ok { return fmt.Errorf("input: %s,已經存在沒法新增", v.Config.ID) } } for _, input := range inputs { interval := a.Config.Agent.Interval.Duration // overwrite global interval if this plugin has it's own. if input.Config.Interval != 0 { interval = input.Config.Interval } if input.Config.ID == "" { continue } a.wg.Add(1) kill := make(chan struct{}) a.kills[input.Config.ID] = kill go func(in *models.RunningInput, interv time.Duration) { defer a.wg.Done() a.gatherer(shutdown, kill, in, interv, a.metricC) }(input, interval) } return nil }
簡單介紹了一下telegraf項目。後續的優化和改造工做還在繼續。主要是分佈式telegraf的調度算法。畢竟集中化全部exporter之後,telegraf的負載能力受單機能力限制,並且也不符合高可用的使用目標。