k8s與監控--改造telegraf的buffer實現

改造telegraf的buffer實現

前言

最近在使用telegraf的場景中,要求數據在程序意外終止的時候不丟失。按照telegraf最初的原始實現,在running_output內部維護了兩個buffer,分別是metrics和failMetrics。這兩個buffer是基於go中channel實現的。因爲沒有持久化機制,在乎外退出的時候,存在丟失數據的風險。因此這篇文章主要講述以前telegraf保證數據安全的一些措施和咱們對代碼的一些優化。git

telegraf關於數據安全的處理辦法

關於兩個buffer,定義在running_output.go的struct中。github

// RunningOutput contains the output configuration
type RunningOutput struct {
    Name              string
    Output            telegraf.Output
    Config            *OutputConfig
    MetricBufferLimit int
    MetricBatchSize   int

    MetricsFiltered selfstat.Stat
    MetricsWritten  selfstat.Stat
    BufferSize      selfstat.Stat
    BufferLimit     selfstat.Stat
    WriteTime       selfstat.Stat

    metrics     *buffer.Buffer
    failMetrics *buffer.Buffer

    // Guards against concurrent calls to the Output as described in #3009
    sync.Mutex
}

這個兩個buffer的大小提供了配置參數能夠設置。redis

metrics:           buffer.NewBuffer(batchSize),
failMetrics:       buffer.NewBuffer(bufferLimit),

顧名思義。metrics存放要發送到指定output的metric,而failMetrics存放發送失敗的metric。固然失敗的metrics會在telegraf重發機制下再次發送。json

if ro.metrics.Len() == ro.MetricBatchSize {
        batch := ro.metrics.Batch(ro.MetricBatchSize)
        err := ro.write(batch)
        if err != nil {
            ro.failMetrics.Add(batch...)
        }
    }

在向metrics增長metrics的時候,作是否達到批量發送的數量,若是達到就調用發送方法。固然還有定時的解決方案,若是一直沒有達到MetricBatchSize,也會在必定時間後發送數據。具體實現代碼在agent.go中安全

ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
    semaphore := make(chan struct{}, 1)
    for {
        select {
        case <-shutdown:
            log.Println("I! Hang on, flushing any cached metrics before shutdown")
            // wait for outMetricC to get flushed before flushing outputs
            wg.Wait()
            a.flush()
            return nil
        case <-ticker.C:
            go func() {
                select {
                case semaphore <- struct{}{}:
                    internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
                    a.flush()
                    <-semaphore
                default:
                    // skipping this flush because one is already happening
                    log.Println("W! Skipping a scheduled flush because there is" +
                        " already a flush ongoing.")
                }
            }()

在程序接受到中止信號後,程序會首先flush剩下的數據到output中,而後退出進程。這樣能夠保證必定的數據安全。網絡

基於redis實現buffer的持久化

在持久化機制的選型中,優先實現redis。自己redis性能高,並且具有完善的持久化。
具體的實現架構以下:
圖片描述
將原buffer中功能抽象出buffer.go接口。
具體代碼:數據結構

package buffer

import (
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/internal/buffer/memory"
    "github.com/influxdata/telegraf/internal/buffer/redis"
)

const (
    BufferTypeForMemory = "memory"
    BufferTypeForRedis  = "redis"
)

type Buffer interface {
    IsEmpty() bool
    Len() int
    Add(metrics ...telegraf.Metric)
    Batch(batchSize int) []telegraf.Metric
}

func NewBuffer(mod string, size int, key, addr string) Buffer {
    switch mod {
    case BufferTypeForRedis:
        return redis.NewBuffer(size, key, addr)
    default:
        return memory.NewBuffer(size)
    }
}

而後分別內存和redis實現了Buffer接口。
其中NewBuffer至關於一個工廠方法。
固然在後期能夠實現基於file和db等buffer實現,來知足不一樣的場景和要求。架構

redis實現buffer的要點

因爲要知足先進先出的要求,選擇了redis的list數據結構。redis中的list是一個字符串list,因此telegraf中metric數據接口要符合序列化的要求。好比屬性須要可導出,即public。因此這點須要改動telegraf對於metric struct的定義。另外能夠選擇json或是msgpack等序列化方式。咱們這邊是採用的json序列化的方式。app

結語

改造之後,能夠根據本身的需求經過配置文件來決定使用channel或是redis來實現buffer。各有優劣,內存實現的話,性能高,受到的依賴少。而redis這種分佈式存儲,決定了數據安全,可是性能會有必定的損耗,畢竟有大量的序列化和反序列化以及網絡傳輸,固然依賴也增長了,取決於redis的可靠性,建議redis集羣部署。dom

相關文章
相關標籤/搜索