k8s與審計-- 將clickhouse增長爲 heapster sink

前言

在k8s資源審計和計費這塊,容器和虛機有很大區別。相對虛機來說,容器不容易實現。
資源指標收集能夠採用heapster,也能夠用prometheus。以前文章有介紹過,prometheus的存儲的瓶頸和查詢較大數據量,容易oom這兩個問題。因此選擇了heapster。此外,heapster不只內部實現了不少aggregator和calculator,作了不少聚合層的工做。而採用prometheus,你須要在查詢的時候作聚合。
heapster支持諸多metrics輸出,稱爲sink。目前支持的sink以下圖:git

圖片描述

而我比較傾向於clickhouse數據庫,關於clickhouse,其實前面的文章介紹過不少了。
因此本文主要講如何爲heapster增長clickhouse sink。github

代碼分析和實現

看代碼,增長一種sink仍是很簡單的。典型的工廠設計模式,實現 Name,Stop,ExportData 接口方法便可。最後再提供一個初始化函數,供factory調用便可。golang

初始化方法 NewClickhouseSink

具體代碼:sql

config, err := clickhouse_common.BuildConfig(uri)
    if err != nil {
        return nil, err
    }

    client, err := sql.Open("clickhouse", config.DSN)
    if err != nil {
        glog.Errorf("connecting to clickhouse: %v", err)
        return nil, err
    }

    sink := &clickhouseSink{
        c:       *config,
        client:  client,
        conChan: make(chan struct{}, config.Concurrency),
    }

    glog.Infof("created clickhouse sink with options: host:%s user:%s db:%s", config.Host, config.UserName, config.Database)
    return sink, nil

基本上就是獲取配置文件,初始化clickhouse 的client。docker

在factory.go 中 build方法中,加入剛剛實現的初始化函數數據庫

func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) {
    switch uri.Key {
    case "elasticsearch":
        return elasticsearch.NewElasticSearchSink(&uri.Val)
    case "gcm":
        return gcm.CreateGCMSink(&uri.Val)
    case "stackdriver":
        return stackdriver.CreateStackdriverSink(&uri.Val)
    case "statsd":
        return statsd.NewStatsdSink(&uri.Val)
    case "graphite":
        return graphite.NewGraphiteSink(&uri.Val)
    case "hawkular":
        return hawkular.NewHawkularSink(&uri.Val)
    case "influxdb":
        return influxdb.CreateInfluxdbSink(&uri.Val)
    case "kafka":
        return kafka.NewKafkaSink(&uri.Val)
    case "librato":
        return librato.CreateLibratoSink(&uri.Val)
    case "log":
        return logsink.NewLogSink(), nil
    case "metric":
        return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{
            core.MetricCpuUsageRate.MetricDescriptor.Name,
            core.MetricMemoryUsage.MetricDescriptor.Name}), nil
    case "opentsdb":
        return opentsdb.CreateOpenTSDBSink(&uri.Val)
    case "wavefront":
        return wavefront.NewWavefrontSink(&uri.Val)
    case "riemann":
        return riemann.CreateRiemannSink(&uri.Val)
    case "honeycomb":
        return honeycomb.NewHoneycombSink(&uri.Val)
    case "clickhouse":
        return clickhouse.NewClickhouseSink(&uri.Val)
    default:
        return nil, fmt.Errorf("Sink not recognized: %s", uri.Key)
    }
}

Name 和 Stop

func (sink *clickhouseSink) Name() string {
    return "clickhouse"
}

func (tsdbSink *clickhouseSink) Stop() {
    // Do nothing
}

stop 函數在heapster關閉的時候調用,執行一些非託管資源的關閉。設計模式

ExportData

這是核心的地方。併發

func (sink *clickhouseSink) ExportData(dataBatch *core.DataBatch) {
    sink.Lock()
    defer sink.Unlock()

    if err := sink.client.Ping(); err != nil {
        glog.Warningf("Failed to ping clickhouse: %v", err)
        return
    }

    dataPoints := make([]point, 0, 0)
    for _, metricSet := range dataBatch.MetricSets {
        for metricName, metricValue := range metricSet.MetricValues {
            var value float64
            if core.ValueInt64 == metricValue.ValueType {
                value = float64(metricValue.IntValue)
            } else if core.ValueFloat == metricValue.ValueType {
                value = float64(metricValue.FloatValue)
            } else {
                continue
            }

            pt := point{
                name:    metricName,
                cluster: sink.c.ClusterName,
                val:     value,
                ts:      dataBatch.Timestamp,
            }

            for key, value := range metricSet.Labels {
                if _, exists := clickhouseBlacklistLabels[key]; !exists {
                    if value != "" {
                        if key == "labels" {
                            lbs := strings.Split(value, ",")
                            for _, lb := range lbs {
                                ts := strings.Split(lb, ":")
                                if len(ts) == 2 && ts[0] != "" && ts[1] != "" {
                                    pt.tags = append(pt.tags, fmt.Sprintf("%s=%s", ts[0], ts[1]))
                                }
                            }
                        } else {
                            pt.tags = append(pt.tags, fmt.Sprintf("%s=%s", key, value))
                        }

                    }
                }
            }

            dataPoints = append(dataPoints, pt)
            if len(dataPoints) >= sink.c.BatchSize {
                sink.concurrentSendData(dataPoints)
                dataPoints = make([]point, 0, 0)
            }
        }
    }

    if len(dataPoints) >= 0 {
        sink.concurrentSendData(dataPoints)
    }

    sink.wg.Wait()
}

主要有如下幾個地方須要注意app

  • 數據的格式轉換。須要將heapster 中DataBatch 轉化爲你目的存儲的格式。其實這塊作過pipeline 多output的人,很容易理解。
  • 批量寫入。通常在大數據量的時候,批量寫入是一種有效的手段。
  • 根據設置參數,併發寫入目的存儲。用到了golang的協程。下面這段代碼實現了一個協程的發送數據。
func (sink *clickhouseSink) concurrentSendData(dataPoints []point) {
    sink.wg.Add(1)
    // use the channel to block until there's less than the maximum number of concurrent requests running
    sink.conChan <- struct{}{}
    go func(dataPoints []point) {
        sink.sendData(dataPoints)
    }(dataPoints)
}

獲取配置參數

這塊在clickhouse.go中,主要作了獲取配置參數和參數初始化一些默認值,以及對配置參數校驗的工做。less

dockerfile的更改

原來的基礎鏡像是基於scratch

FROM scratch

COPY heapster eventer /
COPY ca-certificates.crt /etc/ssl/certs/

#   nobody:nobody
USER 65534:65534
ENTRYPOINT ["/heapster"]

因爲須要改timezone的問題,改爲了基於alpine。

FROM alpine

RUN apk add -U tzdata
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai  /etc/localtime

COPY heapster eventer /
COPY ca-certificates.crt /etc/ssl/certs/
RUN chmod +x /heapster
ENTRYPOINT ["/heapster"]

實際上,基於scratch增長timezone而且更改,也能夠作到,只不過須要裝一些包指令,結果就是鏡像變大。與其如此,不如基於我比較熟悉的alpine實現。

總結

fork的項目地址。實際運行日誌截圖:

圖片描述

因爲ck的出色的寫入性能,運行很是穩定。

相關文章
相關標籤/搜索