heapster version: release-1.2node
Heapster是Kubernetes下的一個監控項目,用於進行容器集羣的監控和性能分析。
基本的功能及概念介紹能夠回顧我以前的一篇文章:《Kubernetes監控之Heapster介紹》。
隨着的Heapster的版本迭代,支持的功能越愈來愈多,好比新版本支持更多的後端數據存儲方式:OpenTSDB、Monasca、Kafka、Elasticsearch等等。看太低版本(如v0.18)的源碼,會發現v1.2版本的源碼架構徹底變了樣,架構擴展性愈來愈強,源碼學無止境!
上面不少介紹這篇文章並不會涉及,咱們仍是會用到最流行的模式:Heapster + InfluxDB。git
監控系統架構圖:
github
該圖很好的描述了監控系統的關鍵組件,及數據流向。
在源碼分析以前咱們先介紹Heapster的實現流程,由上圖能夠看出Heapster會從各個Node上kubelet獲取相關的監控信息,而後進行彙總發送給後臺數據庫InfluxDB。
這裏會涉及到幾個關鍵點:golang
k8s集羣會增刪Nodes,Heapster須要獲取這些sources並作相應的操做數據庫
Heapster後端數據庫怎麼存儲?是否支持多後端?後端
Heapster獲取到數據後推送給後端數據庫,那麼其提供了API的數據該從何處獲取?本地cache?api
Heapster從kubelet獲取到的數據是否須要處理?仍是能直接存儲到後端restful
等等..數據結構
一塊兒分析完heapster源碼實現,就能進行解惑了。架構
先列出我解析源碼時所用的命令,及參數使用,便於後面的理解。
# heapster --source=kubernetes:http://<master-ip>:8080?inClusterConfig=false\&useServiceAccount=false --sink=influxdb:http://<influxdb-ip>:8086
從Heapster的啓動流程開始分析其實現,前面作了簡單的分析,能夠帶着問題去看源碼會有更好的收穫。
路徑: heapster/metrics/heapster.go
func main() { ... // 根據--source參數的輸入來建立數據源 // 咱們這裏會使用kubernetes,下面會根據k8s來解析 sourceFactory := sources.NewSourceFactory() // 建立該sourceProvider時,會建立Node的ListWatch,用於監控k8s節點的增刪狀況,由於這些纔是數據的真實來源. // 該sourceProvider會包含nodeLister,還有kubeletClient,用於跟各個節點的kubelet通訊,獲取cadvisor數據 sourceProvider, err := sourceFactory.BuildAll(argSources) if err != nil { glog.Fatalf("Failed to create source provide: %v", err) } // 建立sourceManager,其實就是sourceProvider + ScrapeTimeout,用於超時獲取數據 sourceManager, err := sources.NewSourceManager(sourceProvider, sources.DefaultMetricsScrapeTimeout) if err != nil { glog.Fatalf("Failed to create source manager: %v", err) } // 根據--sink建立數據存儲後端 // 咱們這裏會使用influxDB,來做爲數據的存儲後端 sinksFactory := sinks.NewSinkFactory() // 建立sinks時會返回各種對象: // metricSink: 能夠理解爲本地的metrics數據池,Heapster API獲取到的數據都是從該對象中獲取的,默認必定會建立 // sinkList: Heapster在新版本中支持多後端數據存儲,好比你能夠指定多個不一樣的influxDB,也能夠同時指定influxDB和Elasticsearch。 // historicalSource: 須要配置,咱們暫時沒有用到 metricSink, sinkList, historicalSource := sinksFactory.BuildAll(argSinks, *argHistoricalSource) if metricSink == nil { glog.Fatal("Failed to create metric sink") } if historicalSource == nil && len(*argHistoricalSource) > 0 { glog.Fatal("Failed to use a sink as a historical metrics source") } for _, sink := range sinkList { glog.Infof("Starting with %s", sink.Name()) } // 建立sinkManager,會根據以前的sinkList,建立對應數量的協程,用於從sink的數據管道中獲取數據,而後推送到對應的後端 sinkManager, err := sinks.NewDataSinkManager(sinkList, sinks.DefaultSinkExportDataTimeout, sinks.DefaultSinkStopTimeout) if err != nil { glog.Fatalf("Failed to created sink manager: %v", err) } // 建立對象,用於處理各個kubelet獲取到的metrics數據 // 最終都會加入到dataProcessors,在最終的處理函數中會進行遍歷並調用其process() metricsToAggregate := []string{ core.MetricCpuUsageRate.Name, core.MetricMemoryUsage.Name, core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } metricsToAggregateForNode := []string{ core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } // 速率計算對象 dataProcessors := []core.DataProcessor{ // Convert cumulaties to rate processors.NewRateCalculator(core.RateMetricsMapping), } kubernetesUrl, err := getKubernetesAddress(argSources) if err != nil { glog.Fatalf("Failed to get kubernetes address: %v", err) } kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl) if err != nil { glog.Fatalf("Failed to get client config: %v", err) } kubeClient := kube_client.NewOrDie(kubeConfig) // 會建立podLister、nodeLister、namespaceLister,用於從k8s watch各個資源的增刪狀況 // 防止獲取數據失敗 podLister, err := getPodLister(kubeClient) if err != nil { glog.Fatalf("Failed to create podLister: %v", err) } nodeLister, err := getNodeLister(kubeClient) if err != nil { glog.Fatalf("Failed to create nodeLister: %v", err) } podBasedEnricher, err := processors.NewPodBasedEnricher(podLister) if err != nil { glog.Fatalf("Failed to create PodBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, podBasedEnricher) namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, namespaceBasedEnricher) // 這裏的對象append順序會有必定的要求 // 好比Pod的有些數據須要進行containers數據的累加獲得 dataProcessors = append(dataProcessors, processors.NewPodAggregator(), &processors.NamespaceAggregator{ MetricsToAggregate: metricsToAggregate, }, &processors.NodeAggregator{ MetricsToAggregate: metricsToAggregateForNode, }, &processors.ClusterAggregator{ MetricsToAggregate: metricsToAggregate, }) nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err) } dataProcessors = append(dataProcessors, nodeAutoscalingEnricher) // 這是整個Heapster功能的關鍵處 // 根據sourceManger、sinkManager、dataProcessors來建立manager對象 manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution, manager.DefaultScrapeOffset, manager.DefaultMaxParallelism) if err != nil { glog.Fatalf("Failed to create main manager: %v", err) } // 開始建立協程,從各個sources獲取metrics數據,並通過dataProcessors的處理,而後export到各個用於後端數據存儲的sinks manager.Start() // 如下的就是建立Heapster server,用於提供各種API // 經過http.mux及go-restful進行實現 // 新版的heapster還支持TLS handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource) addr := fmt.Sprintf("%s:%d", *argIp, *argPort) glog.Infof("Starting heapster on port %d", *argPort) mux := http.NewServeMux() promHandler := prometheus.Handler() if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 { if len(*argTLSClientCAFile) > 0 { authPprofHandler, err := newAuthHandler(handler) if err != nil { glog.Fatalf("Failed to create authorized pprof handler: %v", err) } handler = authPprofHandler authPromHandler, err := newAuthHandler(promHandler) if err != nil { glog.Fatalf("Failed to create authorized prometheus handler: %v", err) } promHandler = authPromHandler } mux.Handle("/", handler) mux.Handle("/metrics", promHandler) healthz.InstallHandler(mux, healthzChecker(metricSink)) // If allowed users is set, then we need to enable Client Authentication if len(*argAllowedUsers) > 0 { server := &http.Server{ Addr: addr, Handler: mux, TLSConfig: &tls.Config{ClientAuth: tls.RequestClientCert}, } glog.Fatal(server.ListenAndServeTLS(*argTLSCertFile, *argTLSKeyFile)) } else { glog.Fatal(http.ListenAndServeTLS(addr, *argTLSCertFile, *argTLSKeyFile, mux)) } } else { mux.Handle("/", handler) mux.Handle("/metrics", promHandler) healthz.InstallHandler(mux, healthzChecker(metricSink)) glog.Fatal(http.ListenAndServe(addr, mux)) } }
介紹了Heapster的啓動流程後,大體能明白了該啓動過程分爲幾個關鍵點:
建立數據源對象
建立後端存儲對象list
建立處理metrics數據的processors
建立manager,並開啓數據的獲取及export的協程
開啓Heapster server,並支持各種API
下面進行一一介紹。
先介紹下相關的結構體,由於這纔是做者的核心思想。
建立的sourceProvider是實現了MetricsSourceProvider接口的對象。
先看下MetricsSourceProvider:
type MetricsSourceProvider interface { GetMetricsSources() []MetricsSource }
每一個最終返回的對象,都須要提供GetMetricsSources(),看字面意識就能夠知道就是提供全部的獲取Metrics源頭的接口。
咱們的參數--source=kubernetes,因此其實咱們真實返回的結構是kubeletProvider.
路徑: heapster/metrics/sources/kubelet/kubelet.go
type kubeletProvider struct { // 用於從k8s獲取最新的nodes信息,而後根據kubeletClient,合成各個metricSources nodeLister *cache.StoreToNodeLister // 反射 reflector *cache.Reflector // kubeletClient相關的配置,好比端口:10255 kubeletClient *KubeletClient }
結構介紹完了,看下具體的建立過程,跟kubernetes相關的關鍵接口是NewKubeletProvider():
func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) { // 建立kubernetes master及kubelet client相關的配置 kubeConfig, kubeletConfig, err := GetKubeConfigs(uri) if err != nil { return nil, err } // 建立kubeClient及kubeletClient kubeClient := kube_client.NewOrDie(kubeConfig) kubeletClient, err := NewKubeletClient(kubeletConfig) if err != nil { return nil, err } // 獲取下全部的Nodes,測試下建立的client是否能正常通信 if _, err := kubeClient.Nodes().List(kube_api.ListOptions{ LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}); err != nil { glog.Errorf("Failed to load nodes: %v", err) } // 監控k8s的nodes變動 // 這裏會建立協程進行watch,便於後面調用nodeLister.List()列出全部的nodes。 // 該Watch的實現,須要看下apiServer中的實現,後面會進行講解 lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything()) nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour) reflector.Run() // 結構在前面介紹過 return &kubeletProvider{ nodeLister: nodeLister, reflector: reflector, kubeletClient: kubeletClient, }, nil }
該過程會涉及到較多的技術點,好比apiServer中的watch實現,reflector的使用。這裏不會進行細講,該文章主要是針對heapster的源碼實現,apiServer相關的實現後面會進行單獨輸出。
這裏須要注意的是建立了ListWath,須要關注後面哪裏用到了nodeLister.List()進行nodes的獲取。
前面已經提到後端數據存儲會有兩處,一個是metricSink,另外一個是influxdbSink。因此這裏會涉及到兩個結構:
type MetricSink struct { // 鎖 lock sync.Mutex // 長時間存儲metrics數據,默認時間是15min longStoreMetrics []string longStoreDuration time.Duration // 短期存儲metrics數據,默認時間是140s shortStoreDuration time.Duration // 短時存儲空間 shortStore []*core.DataBatch // 長時存儲空間 longStore []*multimetricStore }
該結構就是用於heapster API調用時獲取的數據源,這裏會分爲兩種數據存儲方式:長時存儲和短時存儲。因此集羣越大時,heapster佔用內存越多,須要考慮該問題如何處理或者優化。
type influxdbSink struct { // 鏈接後端influxDB數據庫的client client influxdb_common.InfluxdbClient // 鎖 sync.RWMutex c influxdb_common.InfluxdbConfig dbExists bool }
這個就是咱們配置的InfluxDB的結構,是咱們真正的數據存儲後端。
開始介紹建立後端服務流程,從sinksFactory.BuildAll()接口直接入手。
路徑: heapster/metrics/sinks/factory.go
func (this *SinkFactory) BuildAll(uris flags.Uris, historicalUri string) (*metricsink.MetricSink, []core.DataSink, core.HistoricalSource) { result := make([]core.DataSink, 0, len(uris)) var metric *metricsink.MetricSink var historical core.HistoricalSource // 根據傳入的"--sink"參數信息,進行build // 支持多後端數據存儲,會進行遍歷並建立 for _, uri := range uris { // 關鍵接口 sink, err := this.Build(uri) if err != nil { glog.Errorf("Failed to create sink: %v", err) continue } if uri.Key == "metric" { metric = sink.(*metricsink.MetricSink) } if uri.String() == historicalUri { if asHistSource, ok := sink.(core.AsHistoricalSource); ok { historical = asHistSource.Historical() } else { glog.Errorf("Sink type %q does not support being used for historical access", uri.Key) } } result = append(result, sink) } // 默認metricSink必定會建立 if metric == nil { uri := flags.Uri{} uri.Set("metric") sink, err := this.Build(uri) if err == nil { result = append(result, sink) metric = sink.(*metricsink.MetricSink) } else { glog.Errorf("Error while creating metric sink: %v", err) } } if len(historicalUri) > 0 && historical == nil { glog.Errorf("Error while initializing historical access: unable to use sink %q as a historical source", historicalUri) } return metric, result, historical }
該接口流程比較簡單,就是對傳入參數進行判斷,而後調用this.Build()進行建立,這裏只須要注意即便沒有配置metric,也會進行metricSink的建立。
func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) { switch uri.Key { 。。。 case "influxdb": return influxdb.CreateInfluxdbSink(&uri.Val) 。。。 case "metric": return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{ core.MetricCpuUsageRate.MetricDescriptor.Name, core.MetricMemoryUsage.MetricDescriptor.Name}), nil 。。。 default: return nil, fmt.Errorf("Sink not recognized: %s", uri.Key) } }
influxdb的建立其實就是根據傳入的參數而後建立一個config結構,用於後面建立鏈接influxDB的client;
metric的建立其實就是初始化了一個MetricSink結構,須要注意的是傳入的第三個參數,由於這是用於指定哪些metrics須要進行長時間存儲,默認就是cpu/usage和memory/usage,由於這兩個參數用戶最爲關心。
具體的建立接口就不在深刻了,較爲簡單。
到這裏BuildAll()就結束了,至於返回值前面已經作過介紹,就不在累贅了。
其實沒那麼簡單,還有一步:sinkManager的建立。
進入sinks.NewDataSinkManager()接口看下:
func NewDataSinkManager(sinks []core.DataSink, exportDataTimeout, stopTimeout time.Duration) (core.DataSink, error) { sinkHolders := []sinkHolder{} // 遍歷前面建立的sinkList for _, sink := range sinks { // 爲每一個sink添加一個dataChannel和stopChannel // 用於獲取數據和stop信號 sh := sinkHolder{ sink: sink, dataBatchChannel: make(chan *core.DataBatch), stopChannel: make(chan bool), } sinkHolders = append(sinkHolders, sh) // 每一個sink都會建立一個協程 // 從dataChannel獲取數據,並調用sink.export()導出到後端數據庫 go func(sh sinkHolder) { for { select { case data := <-sh.dataBatchChannel: export(sh.sink, data) case isStop := <-sh.stopChannel: glog.V(2).Infof("Stop received: %s", sh.sink.Name()) if isStop { sh.sink.Stop() return } } } }(sh) } return &sinkManager{ sinkHolders: sinkHolders, exportDataTimeout: exportDataTimeout, stopTimeout: stopTimeout, }, nil }
這裏會爲每一個sink建立協程,等待數據的到來並最終將數據導入到對應的後端數據庫。
這裏須要帶個問號,既然channel有一端在收,總得有地方會發送,這會在後面纔會揭曉。
go協程 + channel的方式,是golang最多見的方式,確實便用。
由於cAdvisor返回的原始數據就包含了nodes和containers的相關數據,因此heapster須要建立各類processor,用於處理成不一樣類型的數據,好比pod, namespace, cluster,node。
還有些數據須要計算出速率,有些數據須要進行累加,不一樣類型擁有的metrics還不同等等狀況。
看下源碼:
func main() { ... // 計算namespace和cluster的metrics值時,下列數據須要進行累加求值 metricsToAggregate := []string{ core.MetricCpuUsageRate.Name, core.MetricMemoryUsage.Name, core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } // 計算node的metrics值時,下列數據須要進行累加求值 metricsToAggregateForNode := []string{ core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } // RateMetricsMapping中的數據須要計算速率,好比cpu/usage_rate,network/rx_rate dataProcessors := []core.DataProcessor{ // Convert cumulaties to rate processors.NewRateCalculator(core.RateMetricsMapping), } kubernetesUrl, err := getKubernetesAddress(argSources) if err != nil { glog.Fatalf("Failed to get kubernetes address: %v", err) } kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl) if err != nil { glog.Fatalf("Failed to get client config: %v", err) } kubeClient := kube_client.NewOrDie(kubeConfig) // 建立pod的ListWatch,用於從k8s server監聽pod變動 podLister, err := getPodLister(kubeClient) if err != nil { glog.Fatalf("Failed to create podLister: %v", err) } // 建立node的ListWatch,用於從k8s server監聽node變動 nodeLister, err := getNodeLister(kubeClient) if err != nil { glog.Fatalf("Failed to create nodeLister: %v", err) } // 該podBasedEnricher用於解析從sources獲取到的pod和container的metrics數據, // 而後對pod和container進行數據完善,好比添加labels.但這裏還不會處理metricsValue podBasedEnricher, err := processors.NewPodBasedEnricher(podLister) if err != nil { glog.Fatalf("Failed to create PodBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, podBasedEnricher) // 跟上面的podBasedEnricher同理,須要注意的是在append時有前後順序 namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, namespaceBasedEnricher) // 這裏的對象會對metricsValue進行處理,對應的數據進行累加求值 dataProcessors = append(dataProcessors, processors.NewPodAggregator(), &processors.NamespaceAggregator{ MetricsToAggregate: metricsToAggregate, }, &processors.NodeAggregator{ MetricsToAggregate: metricsToAggregateForNode, }, &processors.ClusterAggregator{ MetricsToAggregate: metricsToAggregate, }) dataProcessors = append(dataProcessors, processors.NewRcAggregator()) nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err) } dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)
Processors的功能基本就是這樣了,相對有點複雜,數據處理的樣式和類別較多。
各個對象的Process()方法就不進行一一介紹了,就是按照順序一個一個的填充core.DataBatch數據。有興趣的能夠逐個看下,能夠借鑑下實現的方式。
前面的都是鋪墊,開始介紹heapster的關鍵實現,進行源數據的獲取,並導出到後端存儲。
先介紹相關結構:
type Manager interface { Start() Stop() }
Manager是須要實現Start和stop方法的接口。而真實建立的對象實際上是realManager:
type realManager struct { // 數據源 source core.MetricsSource // 數據處理對象 processors []core.DataProcessor // 後端存儲對象 sink core.DataSink // 每次scrape數據的時間間隔 resolution time.Duration // 建立多個scrape協程時,須要sleep這點時間,防止異常 scrapeOffset time.Duration // scrape 中止的管道 stopChan chan struct{} // housekeepSemaphoreChan chan struct{} // 超時 housekeepTimeout time.Duration }
關鍵的代碼以下:
manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution, manager.DefaultScrapeOffset, manager.DefaultMaxParallelism) if err != nil { glog.Fatalf("Failed to create main manager: %v", err) } manager.Start()
首先會根據前面建立的sourceManager, dataProcessors, sinkManager對象,再建立manager。
路徑: heapster/metrics/manager/manager.go
func NewManager(source core.MetricsSource, processors []core.DataProcessor, sink core.DataSink, resolution time.Duration, scrapeOffset time.Duration, maxParallelism int) (Manager, error) { manager := realManager{ source: source, processors: processors, sink: sink, resolution: resolution, scrapeOffset: scrapeOffset, stopChan: make(chan struct{}), housekeepSemaphoreChan: make(chan struct{}, maxParallelism), housekeepTimeout: resolution / 2, } for i := 0; i < maxParallelism; i++ { manager.housekeepSemaphoreChan <- struct{}{} } return &manager, nil }
前面介紹了該關鍵結構readlManager,繼續進入manager.Start():
func (rm *realManager) Start() { go rm.Housekeep() } func (rm *realManager) Housekeep() { for { // Always try to get the newest metrics now := time.Now() // 獲取數據的時間段,默認是1min start := now.Truncate(rm.resolution) end := start.Add(rm.resolution) // 真正同步一次的時間間隔,默認是1min + 5s timeToNextSync := end.Add(rm.scrapeOffset).Sub(now) select { case <-time.After(timeToNextSync): rm.housekeep(start, end) case <-rm.stopChan: rm.sink.Stop() return } } }
繼續看rm.housekeep(start, end), 該接口就傳入了時間區間,其實cAdvisor就是支持時間區間來獲取metrics值。
func (rm *realManager) housekeep(start, end time.Time) { if !start.Before(end) { glog.Warningf("Wrong time provided to housekeep start:%s end: %s", start, end) return } select { case <-rm.housekeepSemaphoreChan: // ok, good to go case <-time.After(rm.housekeepTimeout): glog.Warningf("Spent too long waiting for housekeeping to start") return } go func(rm *realManager) { defer func() { rm.housekeepSemaphoreChan <- struct{}{} }() // 從sources獲取數據 data := rm.source.ScrapeMetrics(start, end) // 遍歷processors,而後進行數據處理 for _, p := range rm.processors { newData, err := process(p, data) if err == nil { data = newData } else { glog.Errorf("Error in processor: %v", err) return } } // 最終將數據導出到後端存儲 rm.sink.ExportData(data) }(rm) }
邏輯比較簡單,會有三個關鍵:
源數據獲取
數據處理
導出到後端
先看下rm.source.ScrapeMetrics()接口實現.
路徑: heapster/metrics/sources/manager.go
func (this *sourceManager) ScrapeMetrics(start, end time.Time) *DataBatch { // 調用了nodeLister.List()獲取最新的k8s nodes列表,再根據以前配置的kubelet端口等信息,返回sources // 在建立sourceProvider時,會建立node的ListWatch,因此這裏nodeLister可以使用list() sources := this.metricsSourceProvider.GetMetricsSources() responseChannel := make(chan *DataBatch) 。。。 // 遍歷各個source,而後建立協程獲取數據 for _, source := range sources { go func(source MetricsSource, channel chan *DataBatch, start, end, timeoutTime time.Time, delayInMs int) { // scrape()接口其實就是調用了kubeletMetricsSource.ScrapeMetrics() // 每一個node都會組成對應的kubeletMetricsSource // ScrapeMetrics()就是從cAdvisor中獲取監控信息,並進行了decode metrics := scrape(source, start, end) ... select { // 將獲取到的數據丟入responseChannel // 下面會用到 case channel <- metrics: // passed the response correctly. return case <-time.After(timeForResponse): glog.Warningf("Failed to send the response back %s", source) return } }(source, responseChannel, start, end, timeoutTime, delayMs) } response := DataBatch{ Timestamp: end, MetricSets: map[string]*MetricSet{}, } latencies := make([]int, 11) responseloop: for i := range sources { ... select { // 獲取前面建立的協程獲得的數據 case dataBatch := <-responseChannel: if dataBatch != nil { for key, value := range dataBatch.MetricSets { response.MetricSets[key] = value } } 。。。 case <-time.After(timeoutTime.Sub(now)): glog.Warningf("Failed to get all responses in time (got %d/%d)", i, len(sources)) break responseloop } } ... return &response }
該接口的邏輯就是先經過nodeLister獲取k8s全部的nodes,這樣便能知道全部的kubelet信息,而後建立對應數量的協程從各個kubelet中獲取對應的cAdvisor監控信息,進行處理後再返回。
獲取到數據後,就須要調用各個processors的Process()接口進行數據處理,接口太多就不一一介紹了,挑個node_aggregator.go進行介紹:
func (this *NodeAggregator) Process(batch *core.DataBatch) (*core.DataBatch, error) { for key, metricSet := range batch.MetricSets { // 判斷下該metric是不是pod的 // metricSet.Labels都是前面就進行了填充,因此前面說須要注意每一個processor的append順序 if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePod { // Aggregating pods nodeName, found := metricSet.Labels[core.LabelNodename.Key] if nodeName == "" { glog.V(8).Infof("Skipping pod %s: no node info", key) continue } if found { // 獲取nodeKey,好比: node:172.25.5.111 nodeKey := core.NodeKey(nodeName) // 前面都是判斷該pod在哪一個node上,而後該node的數據是須要經過這些pod進行累加獲得 node, found := batch.MetricSets[nodeKey] if !found { glog.V(1).Info("No metric for node %s, cannot perform node level aggregation.") } else if err := aggregate(metricSet, node, this.MetricsToAggregate); err != nil { return nil, err } } else { glog.Errorf("No node info in pod %s: %v", key, metricSet.Labels) } } } return batch, nil }
基本流程就是這樣了,有須要的能夠各個深刻查看。
最後就是數據的後端存儲。
這裏會涉及到兩部分:metricSink和influxdbSink。
從rm.sink.ExportData(data)接口入手:
路徑: heapster/metrics/sinks/manager.go
func (this *sinkManager) ExportData(data *core.DataBatch) { var wg sync.WaitGroup // 遍歷全部的sink,這裏其實就兩個 for _, sh := range this.sinkHolders { wg.Add(1) // 建立協程,而後將以前獲取的data丟入dataBatchChannel go func(sh sinkHolder, wg *sync.WaitGroup) { defer wg.Done() glog.V(2).Infof("Pushing data to: %s", sh.sink.Name()) select { case sh.dataBatchChannel <- data: glog.V(2).Infof("Data push completed: %s", sh.sink.Name()) // everything ok case <-time.After(this.exportDataTimeout): glog.Warningf("Failed to push data to sink: %s", sh.sink.Name()) } }(sh, &wg) } // Wait for all pushes to complete or timeout. wg.Wait() }
千辛萬苦,你把數據丟入sh.dataBatchChannel完事了?
dataBatchChannel有點眼熟,由於以前建立sinkManager的時候,也建立了協程並監聽了該管道,因此真正export數據是在以前就完成了,這裏只須要把數據丟入管道便可。
因此golang中協程與協程之間的通訊,channel纔是王道啊!
ExportData有兩個,一個一個講吧。
先來關鍵的influxDB.
路徑: heapster/metrics/sinks/influxdb/influxdb.go
func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) { ... dataPoints := make([]influxdb.Point, 0, 0) for _, metricSet := range dataBatch.MetricSets { // 遍歷MetricValues for metricName, metricValue := range metricSet.MetricValues { var value interface{} if core.ValueInt64 == metricValue.ValueType { value = metricValue.IntValue } else if core.ValueFloat == metricValue.ValueType { value = float64(metricValue.FloatValue) } else { continue } // Prepare measurement without fields fieldName := "value" measurementName := metricName if sink.c.WithFields { // Prepare measurement and field names serieName := strings.SplitN(metricName, "/", 2) measurementName = serieName[0] if len(serieName) > 1 { fieldName = serieName[1] } } // influxdb單條數據結構 point := influxdb.Point{ // 度量值名稱,好比cpu/usage Measurement: measurementName, // 該tags就是在processors中進行添加,主要是pod_name,node_name,namespace_name等 Tags: metricSet.Labels, // 該字段就是具體的值了 Fields: map[string]interface{}{ fieldName: value, }, // 時間戳 Time: dataBatch.Timestamp.UTC(), } // append到dataPoints,超過maxSendBatchSize數量後直接sendData到influxdb dataPoints = append(dataPoints, point) if len(dataPoints) >= maxSendBatchSize { sink.sendData(dataPoints) dataPoints = make([]influxdb.Point, 0, 0) } } // 遍歷LabeledMetrics,主要就是filesystem的數據 // 不太明白爲什麼要將filesystem的數據進行區分,要放到Labeled中?什麼意圖?望高手指點,謝謝 // 接下來的操做就跟上面MetricValues的操做差很少了 for _, labeledMetric := range metricSet.LabeledMetrics { 。。。 point := influxdb.Point{ Measurement: measurementName, Tags: make(map[string]string), Fields: map[string]interface{}{ fieldName: value, }, Time: dataBatch.Timestamp.UTC(), } for key, value := range metricSet.Labels { point.Tags[key] = value } for key, value := range labeledMetric.Labels { point.Tags[key] = value } dataPoints = append(dataPoints, point) if len(dataPoints) >= maxSendBatchSize { sink.sendData(dataPoints) dataPoints = make([]influxdb.Point, 0, 0) } } } if len(dataPoints) >= 0 { sink.sendData(dataPoints) } }
該接口中有一處不太明白,metricSet中的LabeledMetrics和MetricsValue有何差異,爲什麼要將filesystem的數據進行區分對待,放入LabeldMetrics?
看代碼的過程當中沒有獲得答案,望大神指點迷津,多謝多謝!
有問題,但也不影響繼續往下學習,接着看下MetricSink:
func (this *MetricSink) ExportData(batch *core.DataBatch) { this.lock.Lock() defer this.lock.Unlock() now := time.Now() // 將數據丟入longStore和shortStore // 須要根據保存的時間將老數據丟棄 this.longStore = append(popOldStore(this.longStore, now.Add(-this.longStoreDuration)), buildMultimetricStore(this.longStoreMetrics, batch)) this.shortStore = append(popOld(this.shortStore, now.Add(-this.shortStoreDuration)), batch) }
該邏輯比較簡單,就是將數據丟入兩個Store中,而後把過時數據丟棄。
這裏提醒一點,heapster API調用時先會從longStore中匹配數據,沒匹配上的話再從shortStore獲取,而longStore中存儲的數據類型前面已經作過介紹。
終於結束了。。
前面的主流業務都介紹完了,Heapster自己也提供了API用於開發者進行使用與測試。
繼續分析代碼吧:
// 關鍵接口,後面分析 handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource) 。。。 // 建立http的mux多分器,用於http.Server的路由 mux := http.NewServeMux() // prometheus:最新出現的人氣很高的監控系統,值得了解學習下,後續安排! promHandler := prometheus.Handler() // 支持TLS,咱們用了http if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 { 。。。 } else { // 多分器分了"/"和"/metrics" // 進入"/",還會進行細分,裏面使用到了go-restful mux.Handle("/", handler) mux.Handle("/metrics", promHandler) // 註冊健康檢測接口 healthz.InstallHandler(mux, healthzChecker(metricSink)) // 啓動Server glog.Fatal(http.ListenAndServe(addr, mux)) }
這裏的關鍵是setupHandlers()接口,須要學習下里面如何使用go-restful進行請求路由的。
k8s apiServer中也大量使用了go-restful,在學習該源碼時有進行過度析
路徑: heapster/metrics/handlers.go
func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister, historicalSource core.HistoricalSource) http.Handler { runningInKubernetes := true // 建立container,指定route類型爲CurlyRouter // 這些都跟go-restful基礎有關,有興趣的能夠看下原理 wsContainer := restful.NewContainer() wsContainer.EnableContentEncoding(true) wsContainer.Router(restful.CurlyRouter{}) // 註冊v1版本相關的api,包括官方介紹的"/api/v1/model" a := v1.NewApi(runningInKubernetes, metricSink, historicalSource) a.Register(wsContainer) // 這個metricsApi註冊了"/apis/metrics/v1alpha1"的各種命令 // 暫不關心 m := metricsApi.NewApi(metricSink, podLister, nodeLister) m.Register(wsContainer) handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) { name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath) switch name { case "profile": pprof.Profile(resp, req.Request) case "symbol": pprof.Symbol(resp, req.Request) case "cmdline": pprof.Cmdline(resp, req.Request) default: pprof.Index(resp, req.Request) } } // Setup pporf handlers. ws = new(restful.WebService).Path(pprofBasePath) ws.Route(ws.GET("/{subpath:*}").To(metrics.InstrumentRouteFunc("pprof", handlePprofEndpoint))).Doc("pprof endpoint") wsContainer.Add(ws) return wsContainer }
關鍵在於v1版本的API註冊,繼續深刻a.Register(wsContainer):
func (a *Api) Register(container *restful.Container) { // 註冊"/api/v1/metric-export" API // 用於從shortStore中獲取全部的metrics信息 ws := new(restful.WebService) ws.Path("/api/v1/metric-export"). Doc("Exports the latest point for all Heapster metrics"). Produces(restful.MIME_JSON) ws.Route(ws.GET(""). To(a.exportMetrics). Doc("export the latest data point for all metrics"). Operation("exportMetrics"). Writes([]*types.Timeseries{})) // ws必需要add到container中才能生效 container.Add(ws) // 註冊"/api/v1/metric-export-schema" API // 用於導出全部的metrics name,好比network-rx // 還會導出還有的labels,好比pod-name ws = new(restful.WebService) ws.Path("/api/v1/metric-export-schema"). Doc("Schema for metrics exported by heapster"). Produces(restful.MIME_JSON) ws.Route(ws.GET(""). To(a.exportMetricsSchema). Doc("export the schema for all metrics"). Operation("exportmetricsSchema"). Writes(types.TimeseriesSchema{})) container.Add(ws) // 註冊metircSink相關的API,即"/api/v1/model/" if a.metricSink != nil { glog.Infof("Starting to Register Model.") a.RegisterModel(container) } if a.historicalSource != nil { a.RegisterHistorical(container) } }
官方資料中介紹heapster metric model,咱們使用到這些API也會比較多。
進入a.RegisterModel(container)看下:
func (a *Api) RegisterModel(container *restful.Container) { ws := new(restful.WebService) // 指定全部命令的prefix: "/api/v1/model" ws.Path("/api/v1/model"). Doc("Root endpoint of the stats model"). Consumes("*/*"). Produces(restful.MIME_JSON) // 在這裏增長各種命令,好比"/metrics/,/nodes/"等等 addClusterMetricsRoutes(a, ws) // 列出全部的keys ws.Route(ws.GET("/debug/allkeys"). To(metrics.InstrumentRouteFunc("debugAllKeys", a.allKeys)). Doc("Get keys of all metric sets available"). Operation("debugAllKeys")) container.Add(ws) }
繼續看addClusterMetricsRoutes():
func addClusterMetricsRoutes(a clusterMetricsFetcher, ws *restful.WebService) { 。。。 if a.isRunningInKubernetes() { // 列出全部namespaces的API ws.Route(ws.GET("/namespaces/"). To(metrics.InstrumentRouteFunc("namespaceList", a.namespaceList)). Doc("Get a list of all namespaces that have some current metrics"). Operation("namespaceList")) // 獲取指定namespaces的metrics ws.Route(ws.GET("/namespaces/{namespace-name}/metrics"). To(metrics.InstrumentRouteFunc("availableNamespaceMetrics", a.availableNamespaceMetrics)). Doc("Get a list of all available metrics for a Namespace entity"). Operation("availableNamespaceMetrics"). Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string"))) // 獲取namespace指定的metrics值 ws.Route(ws.GET("/namespaces/{namespace-name}/metrics/{metric-name:*}"). To(metrics.InstrumentRouteFunc("namespaceMetrics", a.namespaceMetrics)). Doc("Export an aggregated namespace-level metric"). Operation("namespaceMetrics"). Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")). Param(ws.PathParameter("metric-name", "The name of the requested metric").DataType("string")). Param(ws.QueryParameter("start", "Start time for requested metrics").DataType("string")). Param(ws.QueryParameter("end", "End time for requested metric").DataType("string")). Param(ws.QueryParameter("labels", "A comma-separated list of key:values pairs to use to search for a labeled metric").DataType("string")). Writes(types.MetricResult{})) 。。。 } 。。。 }
Heapster API的註冊基本就這樣了,在花點時間看下API的實現吧。
咱們挑一個例子作下分析,獲取某個pod的指定的metrics值.
對應的接口:heapster/metrics/api/v1/model_handler.go
func (a *Api) podMetrics(request *restful.Request, response *restful.Response) { a.processMetricRequest( // 根據URI傳入的ns和pod名字,拼裝成key,如:"namespace:default/pod:123" core.PodKey(request.PathParameter("namespace-name"), request.PathParameter("pod-name")), request, response) }
根據URI的輸入參數並調用processMetricRequest()接口,獲取對應的metric value:
func (a *Api) processMetricRequest(key string, request *restful.Request, response *restful.Response) { // 時間區間 start, end, err := getStartEndTime(request) if err != nil { response.WriteError(http.StatusBadRequest, err) return } // 獲取metric Name,好比"/cpu/usage" metricName := request.PathParameter("metric-name") // 根據metricName進行轉換,好比將cpu-usage轉換成cpu/usage_rate // 因此這裏須要注意cpu-usage不等於/cpu/usage,一個表示cpu使用率,一個表示cpu使用量 convertedMetricName := convertMetricName(metricName) // 獲取請求中的labels,根據是否有指定labels來調用不一樣的接口 labels, err := getLabels(request) if err != nil { response.WriteError(http.StatusBadRequest, err) return } var metrics map[string][]core.TimestampedMetricValue if labels != nil { // 該接口從metricSet.LabeledMetrics中獲取對應的value metrics = a.metricSink.GetLabeledMetric(convertedMetricName, labels, []string{key}, start, end) } else { // 該接口先從longStoreMetrics中進行匹配,匹配不到的話再從shortStore中獲取對應的metricValue metrics = a.metricSink.GetMetric(convertedMetricName, []string{key}, start, end) } // 將獲取到的metricValue轉換成MetricPoint格式的值,會有多組"時間戳+value" converted := exportTimestampedMetricValue(metrics[key]) // 將結果進行response response.WriteEntity(converted) }
OK,大功告成!API的實現也講完了,不少API都是相通的,最終都會調用相同的接口,因此不一一介紹了。
這裏須要注意heapster的API的URI還有多種寫法,好比/api/v1/model/cpu-usage,等價於/api/v1/model/cpu/usage_rate/,別誤理解成/cpu/usage了,這兩個概念不同,一個是cpu使用率,一個是cpu使用量。
上面的提醒告訴咱們,沒事多看源碼,不少誤解天然而然就解除了!
筆者能力有限,看源碼也在於學習提高能力,固然也會有較多不理解或者理解不當的地方,但願各位能予以矯正,多謝多謝!
上面的介紹完了Heapster的實現,咱們能夠思考下是否能夠動手修改源碼,好比增長一些對象的metrics信息。
筆者考慮是否能夠直接支持RC/RS/Deployment的metrics信息,讓業務層能夠直接拿到服務的總體信息。
Heapster官方資料:https://github.com/kubernetes...
InfluxDB github: https://github.com/influxdata...