最近在使用telegraf的場景中,要求數據在程序意外終止的時候不丟失。按照telegraf最初的原始實現,在running_output內部維護了兩個buffer,分別是metrics和failMetrics。這兩個buffer是基於go中channel實現的。因爲沒有持久化機制,在乎外退出的時候,存在丟失數據的風險。因此這篇文章主要講述以前telegraf保證數據安全的一些措施和咱們對代碼的一些優化。git
關於兩個buffer,定義在running_output.go的struct中。github
// RunningOutput contains the output configuration type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig MetricBufferLimit int MetricBatchSize int MetricsFiltered selfstat.Stat MetricsWritten selfstat.Stat BufferSize selfstat.Stat BufferLimit selfstat.Stat WriteTime selfstat.Stat metrics *buffer.Buffer failMetrics *buffer.Buffer // Guards against concurrent calls to the Output as described in #3009 sync.Mutex }
這個兩個buffer的大小提供了配置參數能夠設置。redis
metrics: buffer.NewBuffer(batchSize), failMetrics: buffer.NewBuffer(bufferLimit),
顧名思義。metrics存放要發送到指定output的metric,而failMetrics存放發送失敗的metric。固然失敗的metrics會在telegraf重發機制下再次發送。json
if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) err := ro.write(batch) if err != nil { ro.failMetrics.Add(batch...) } }
在向metrics增長metrics的時候,作是否達到批量發送的數量,若是達到就調用發送方法。固然還有定時的解決方案,若是一直沒有達到MetricBatchSize,也會在必定時間後發送數據。具體實現代碼在agent.go中安全
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) semaphore := make(chan struct{}, 1) for { select { case <-shutdown: log.Println("I! Hang on, flushing any cached metrics before shutdown") // wait for outMetricC to get flushed before flushing outputs wg.Wait() a.flush() return nil case <-ticker.C: go func() { select { case semaphore <- struct{}{}: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() <-semaphore default: // skipping this flush because one is already happening log.Println("W! Skipping a scheduled flush because there is" + " already a flush ongoing.") } }()
在程序接受到中止信號後,程序會首先flush剩下的數據到output中,而後退出進程。這樣能夠保證必定的數據安全。網絡
在持久化機制的選型中,優先實現redis。自己redis性能高,並且具有完善的持久化。
具體的實現架構以下:
將原buffer中功能抽象出buffer.go接口。
具體代碼:數據結構
package buffer import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer/memory" "github.com/influxdata/telegraf/internal/buffer/redis" ) const ( BufferTypeForMemory = "memory" BufferTypeForRedis = "redis" ) type Buffer interface { IsEmpty() bool Len() int Add(metrics ...telegraf.Metric) Batch(batchSize int) []telegraf.Metric } func NewBuffer(mod string, size int, key, addr string) Buffer { switch mod { case BufferTypeForRedis: return redis.NewBuffer(size, key, addr) default: return memory.NewBuffer(size) } }
而後分別內存和redis實現了Buffer接口。
其中NewBuffer至關於一個工廠方法。
固然在後期能夠實現基於file和db等buffer實現,來知足不一樣的場景和要求。架構
因爲要知足先進先出的要求,選擇了redis的list數據結構。redis中的list是一個字符串list,因此telegraf中metric數據接口要符合序列化的要求。好比屬性須要可導出,即public。因此這點須要改動telegraf對於metric struct的定義。另外能夠選擇json或是msgpack等序列化方式。咱們這邊是採用的json序列化的方式。app
改造之後,能夠根據本身的需求經過配置文件來決定使用channel或是redis來實現buffer。各有優劣,內存實現的話,性能高,受到的依賴少。而redis這種分佈式存儲,決定了數據安全,可是性能會有必定的損耗,畢竟有大量的序列化和反序列化以及網絡傳輸,固然依賴也增長了,取決於redis的可靠性,建議redis集羣部署。dom