公司的監控系統主要採用小米開源的 falcon。上萬臺物理機、虛擬機以及容器,對監控系統的存儲和性能都是很大的挑戰,因此底層存儲選用了 opentsdb,同時內部開發了 tsdb-proxy 模塊,將數據同時寫入 kafka 和 opentsdb。前期上線後,發現服務器內存佔用率一直很高,增加機器後也沒有得到明顯的改善,於是考慮再次優化代碼。在看具體代碼前,可以先看一下相關監控。
falcon tsdb-proxy 集羣機器的內存使用狀況:後面明顯降低的地方是做了版本升級,4C8G 機器的內存使用率從 50% 多下降到 20% 多,效果非常明顯。
第一段代碼是生產環境原先運行的版本:convert2TsdbItem 和 convert2KafkaItem 這兩個函數會對同一個監控值各計算一次(共 2 次),分別生成給 TSDB 和 Kafka 的結構體。由於計算都在內存中進行,因此內存佔用較大。
type Tsdb int func (this *Tsdb) Send(items []*cmodel.MetaData, resp *cmodel.SimpleRpcResponse) error { go handleItems(items) return nil } // 供外部調用、處理接收到的數據 的接口 func HandleItems(items []*cmodel.MetaData) error { handleItems(items) return nil } func handleItems(items []*cmodel.MetaData) { if items == nil { return } count := len(items) if count == 0 { return } cfg := g.Config() for i := 0; i < count; i++ { if items[i] == nil { continue } endpoint := items[i].Endpoint if !g.IsValidString(endpoint) { if cfg.Debug { log.Printf("invalid endpoint: %s", endpoint) } pfc.Meter("invalidEnpoint", 1) continue } counter := cutils.Counter(items[i].Metric, items[i].Tags) if !g.IsValidString(counter) { if cfg.Debug { log.Printf("invalid counter: %s/%s", endpoint, counter) } pfc.Meter("invalidCounter", 1) continue } dsType := items[i].CounterType step := items[i].Step checksum := items[i].Checksum() key := g.FormRrdCacheKey(checksum, dsType, step) //statistics proc.TsdbRpcRecvCnt.Incr() // To tsdb //first := store.TsdbItems.First(key) first := store.GetLastItem(key) if first == nil && items[i].Timestamp <= first.Timestamp { continue } tsdbItem,ok := convert2TsdbItem(first,items[i]) if ok { isSuccess := sender.TsdbQueue.PushFront(tsdbItem) if !isSuccess { proc.SendToTsdbDropCnt.Incr() } } kafkaItem,ok := convert2KafkaItem(first,items[i]) if ok { isSuccess := sender.KafkaQueue.PushFront(kafkaItem) if !isSuccess { proc.SendToKafkaDropCnt.Incr() } } //store.TsdbItems.PushFront(key, items[i], checksum, cfg) // To Index index.ReceiveItem(items[i], checksum) // To History store.AddItem(key, items[i]) } } //從內存索引、MySQL中刪除counter,並從磁盤上刪除對應rrd文件 func (this *Tsdb) Delete(params []*cmodel.GraphDeleteParam, resp *cmodel.GraphDeleteResp) error { resp = &cmodel.GraphDeleteResp{} for _, param := range params { err, tags := cutils.SplitTagsString(param.Tags) if err != nil { log.Error("invalid tags:", param.Tags, "error:", err) continue } var item *cmodel.MetaData = &cmodel.MetaData{ Endpoint: 
param.Endpoint, Metric: param.Metric, Tags: tags, CounterType: param.DsType, Step: int64(param.Step), } index.RemoveItem(item) } return nil } func convert2TsdbItem(f *cmodel.MetaData,d *cmodel.MetaData) (*cmodel.TsdbItem,bool) { t := cmodel.TsdbItem{Tags: make(map[string]string)} for k, v := range d.Tags { t.Tags[k] = v } host,exists := store.Hosts.Get(d.Endpoint) if exists { if host.AppName != "" { t.Tags["app_name"] = host.AppName } if host.GrpName != "" { t.Tags["grp_name"] = host.GrpName } if host.Room != "" { t.Tags["room"] = host.Room } if host.Env != "" { t.Tags["env"] = host.Env } } t.Tags["endpoint"] = d.Endpoint t.Metric = d.Metric t.Timestamp = d.Timestamp if d.CounterType == g.COUNTER { if f == nil { return &t,false } if f.Endpoint == "" { return &t,false } value := d.Value - f.Value if value < 0 { return &t,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return &t,false } fv := float64(value)/float64(d.Step) t.Value = math.Floor(fv*1e3 + 0.5)*1e-3 return &t,true } if d.CounterType == g.DERIVE { if f == nil { return &t,false } if f.Endpoint == "" { return &t,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return &t,false } value := d.Value - f.Value if value < 0 { return &t,false } fv := float64(value)/float64(d.Step) t.Value = math.Floor(fv*1e3 + 0.5)*1e-3 return &t,true } t.Value = d.Value return &t,true } func convert2KafkaItem(f *cmodel.MetaData,d *cmodel.MetaData) (*cmodel.KafkaItem,bool) { t := cmodel.KafkaItem{Tags: make(map[string]string)} for k, v := range d.Tags { t.Tags[k] = v } host,exists := store.Hosts.Get(d.Endpoint) if exists { t.AppName = host.AppName t.GrpName = host.GrpName t.Room = host.Room t.Env = host.Env } t.Endpoint = d.Endpoint t.Metric = d.Metric t.Timestamp = d.Timestamp t.Step = d.Step if d.CounterType == g.COUNTER { if f == nil { return &t,false } if f.Endpoint == "" { return &t,false } value := d.Value - f.Value if value < 0 { return &t,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return 
&t,false } fv := float64(value)/float64(d.Step) t.Value = math.Floor(fv*1e3 + 0.5)*1e-3 return &t,true } if d.CounterType == g.DERIVE { if f == nil { return &t,false } if f.Endpoint == "" { return &t,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return &t,false } value := d.Value - f.Value if value < 0 { return &t,false } fv := float64(value)/float64(d.Step) t.Value = math.Floor(fv*1e3 + 0.5)*1e-3 return &t,true } t.Value = d.Value return &t,true }
第二段代碼是我優化後的版本。優化思路是:把 convert2TsdbItem 和 convert2KafkaItem 這兩個函數都要用到的 value 提前計算好,在需要的時候直接賦值,因為它們用的是同一個值。
type Tsdb int func (this *Tsdb) Send(items []*cmodel.MetaData, resp *cmodel.SimpleRpcResponse) error { go handleItems(items) return nil } // 供外部調用、處理接收到的數據 的接口 func HandleItems(items []*cmodel.MetaData) error { handleItems(items) return nil } func handleItems(items []*cmodel.MetaData) { if items == nil { return } count := len(items) if count == 0 { return } cfg := g.Config() for i := 0; i < count; i++ { if items[i] == nil { continue } endpoint := items[i].Endpoint if !g.IsValidString(endpoint) { if cfg.Debug { log.Printf("invalid endpoint: %s", endpoint) } pfc.Meter("invalidEnpoint", 1) continue } counter := cutils.Counter(items[i].Metric, items[i].Tags) if !g.IsValidString(counter) { if cfg.Debug { log.Printf("invalid counter: %s/%s", endpoint, counter) } pfc.Meter("invalidCounter", 1) continue } dsType := items[i].CounterType step := items[i].Step checksum := items[i].Checksum() key := g.FormRrdCacheKey(checksum, dsType, step) //statistics proc.TsdbRpcRecvCnt.Incr() // To tsdb //first := store.TsdbItems.First(key) first := store.GetLastItem(key) if first == nil && items[i].Timestamp <= first.Timestamp { continue } value := 0.0 if items[i].CounterType == g.GAUGE { value = items[i].Value } else { ok :=true value,ok = computerValue(first,items[i]) if !ok { return } } //tsdb結構體 tsdbItem,ok := convert2TsdbItem(value,items[i]) if ok { isSuccess := sender.TsdbQueue.PushFront(tsdbItem) if !isSuccess { proc.SendToTsdbDropCnt.Incr() } } //kafka結構體 kafkaItem,ok := convert2KafkaItem(value,items[i]) if ok { isSuccess := sender.KafkaQueue.PushFront(kafkaItem) if !isSuccess { proc.SendToKafkaDropCnt.Incr() } } //store.TsdbItems.PushFront(key, items[i], checksum, cfg) // To Index index.ReceiveItem(items[i], checksum) // To History store.AddItem(key, items[i]) } } //從內存索引、MySQL中刪除counter,並從磁盤上刪除對應rrd文件 func (this *Tsdb) Delete(params []*cmodel.GraphDeleteParam, resp *cmodel.GraphDeleteResp) error { resp = &cmodel.GraphDeleteResp{} for _, param := range params { err, tags := 
cutils.SplitTagsString(param.Tags) if err != nil { log.Error("invalid tags:", param.Tags, "error:", err) continue } var item *cmodel.MetaData = &cmodel.MetaData{ Endpoint: param.Endpoint, Metric: param.Metric, Tags: tags, CounterType: param.DsType, Step: int64(param.Step), } index.RemoveItem(item) } return nil } func computerValue(f *cmodel.MetaData,d *cmodel.MetaData) (float64,bool){ //計算函數 計算好value結果返回後組裝tsdb和kafka將結構體 resultvalue := 0.0 if d.CounterType == g.COUNTER { if f == nil { return resultvalue,false } if f.Endpoint == "" { return resultvalue,false } value := d.Value - f.Value if value < 0 { return resultvalue,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return resultvalue,false } fv := float64(value)/float64(d.Step) resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3 return resultvalue,true } if d.CounterType == g.DERIVE { if f == nil { return resultvalue,false } if f.Endpoint == "" { return resultvalue,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return resultvalue,false } value := d.Value - f.Value if value < 0 { return resultvalue,false } fv := float64(value)/float64(d.Step) resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3 return resultvalue,true } resultvalue = d.Value return resultvalue,true } func convert2TsdbItem(resultvalue float64,d *cmodel.MetaData) (*cmodel.TsdbItem,bool) { t := cmodel.TsdbItem{Tags: make(map[string]string)} for k, v := range d.Tags { t.Tags[k] = v } host,exists := store.Hosts.Get(d.Endpoint) if exists { if host.AppName != "" { t.Tags["app_name"] = host.AppName } if host.GrpName != "" { t.Tags["grp_name"] = host.GrpName } if host.Room != "" { t.Tags["room"] = host.Room } if host.Env != "" { t.Tags["env"] = host.Env } } t.Tags["endpoint"] = d.Endpoint t.Metric = d.Metric t.Timestamp = d.Timestamp t.Value = resultvalue return &t,true } func convert2KafkaItem(resultvalue float64,d *cmodel.MetaData) (*cmodel.KafkaItem,bool) { t := cmodel.KafkaItem{Tags: make(map[string]string)} for k, v := range d.Tags { t.Tags[k] = 
v } host,exists := store.Hosts.Get(d.Endpoint) if exists { t.AppName = host.AppName t.GrpName = host.GrpName t.Room = host.Room t.Env = host.Env } t.Endpoint = d.Endpoint t.Metric = d.Metric t.Timestamp = d.Timestamp t.Step = d.Step t.Value = resultvalue return &t,true } 第三段代碼如今線上 type Tsdb int func (this *Tsdb) Send(items []*cmodel.MetaData, resp *cmodel.SimpleRpcResponse) error { go handleItems(items) return nil } // 供外部調用、處理接收到的數據 的接口 func HandleItems(items []*cmodel.MetaData) error { handleItems(items) return nil } func handleItems(items []*cmodel.MetaData) { if items == nil { return } count := len(items) if count == 0 { return } cfg := g.Config() for i := 0; i < count; i++ { if items[i] == nil { continue } endpoint := items[i].Endpoint if !g.IsValidString(endpoint) { if cfg.Debug { log.Printf("invalid endpoint: %s", endpoint) } pfc.Meter("invalidEnpoint", 1) continue } counter := cutils.Counter(items[i].Metric, items[i].Tags) if !g.IsValidString(counter) { if cfg.Debug { log.Printf("invalid counter: %s/%s", endpoint, counter) } pfc.Meter("invalidCounter", 1) continue } dsType := items[i].CounterType step := items[i].Step checksum := items[i].Checksum() key := g.FormRrdCacheKey(checksum, dsType, step) //statistics proc.TsdbRpcRecvCnt.Incr() // To tsdb //first := store.TsdbItems.First(key) first := store.GetLastItem(key) if first == nil && items[i].Timestamp <= first.Timestamp { continue } value,ok := computerValue(first,items[i]) if !ok { continue } tsdbItem := &cmodel.TsdbItem{ Metric: items[i].Metric, Value: value, Timestamp: items[i].Timestamp, Tags: make(map[string]string), } tsdbItem.Tags["endpoint"] = items[i].Endpoint for k, v := range items[i].Tags { tsdbItem.Tags[k] = v } kafkaItem := &cmodel.KafkaItem { Metric: items[i].Metric, Value: value, Timestamp: items[i].Timestamp, Step: items[i].Step, Endpoint: items[i].Endpoint, } host,exists := store.Hosts.Get(items[i].Endpoint) if exists { kafkaItem.AppName = host.AppName kafkaItem.GrpName = host.GrpName 
kafkaItem.Room = host.Room kafkaItem.Env = host.Env if host.AppName != "" { tsdbItem.Tags["app_name"] = host.AppName } if host.GrpName != "" { tsdbItem.Tags["grp_name"] = host.GrpName } if host.Room != "" { tsdbItem.Tags["room"] = host.Room } if host.Env != "" { tsdbItem.Tags["env"] = host.Env } } isSuccess := sender.TsdbQueue.PushFront(tsdbItem) if !isSuccess { proc.SendToTsdbDropCnt.Incr() } isSuccess = sender.KafkaQueue.PushFront(kafkaItem) if !isSuccess { proc.SendToKafkaDropCnt.Incr() } //store.TsdbItems.PushFront(key, items[i], checksum, cfg) // To Index index.ReceiveItem(items[i], checksum) // To History store.AddItem(key, items[i]) } } //從內存索引、MySQL中刪除counter,並從磁盤上刪除對應rrd文件 func (this *Tsdb) Delete(params []*cmodel.GraphDeleteParam, resp *cmodel.GraphDeleteResp) error { resp = &cmodel.GraphDeleteResp{} for _, param := range params { err, tags := cutils.SplitTagsString(param.Tags) if err != nil { log.Error("invalid tags:", param.Tags, "error:", err) continue } var item *cmodel.MetaData = &cmodel.MetaData{ Endpoint: param.Endpoint, Metric: param.Metric, Tags: tags, CounterType: param.DsType, Step: int64(param.Step), } index.RemoveItem(item) } return nil } func computerValue(f *cmodel.MetaData,d *cmodel.MetaData) (float64,bool){ //計算函數 計算好value結果返回後組裝tsdb和kafka將結構體 resultvalue := 0.0 if d.CounterType == g.COUNTER { if f == nil { return resultvalue,false } if f.Endpoint == "" { return resultvalue,false } value := d.Value - f.Value if value < 0 { return resultvalue,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return resultvalue,false } fv := float64(value)/float64(d.Step) resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3 return resultvalue,true } if d.CounterType == g.DERIVE { if f == nil { return resultvalue,false } if f.Endpoint == "" { return resultvalue,false } if d.Timestamp - f.Timestamp > d.Step + d.Step/2 { return resultvalue,false } value := d.Value - f.Value if value < 0 { return resultvalue,false } fv := float64(value)/float64(d.Step) 
resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3 return resultvalue,true } resultvalue = d.Value return resultvalue,true }