上萬臺服務器設備監控系統--falcon二次開發模塊優化

   公司的監控系統主要採用了小米開源的falcon,上萬臺的物理機、虛擬機、以及容器,對監控系統的存儲和性能有很大的挑戰,在底層存儲層選用了opentsdb,同時內部作了開發了tsdb-proxy模塊將數據寫入kafka和opentsdb,前期上線後,發現服務器內存佔用率一直很高,增長機器後也沒有獲得很明顯的改善,這時候考慮再次優化代碼,在看具體代碼前能夠看一下相關監控。bash


 falcon tsdb-prxoy集羣機器內存使用狀況,後面明顯降低的地方是作了版本升級,發現4C8G的機器內存從50%多下降到20%多,效果很是明顯。服務器

image.png

第一部分代碼是生產環境跑的代碼convert2TsdbItem和convert2KafkaItem這兩個函數會對其監控值計算計算2次分別計算出來給TSDB和Kafka的結構體,由於計算都是在內存中實現,因此對內存使用較大。app

type Tsdb int

func (this *Tsdb) Send(items []*cmodel.MetaData, resp *cmodel.SimpleRpcResponse) error {
	go handleItems(items)
	return nil
}

// 供外部調用、處理接收到的數據 的接口
func HandleItems(items []*cmodel.MetaData) error {
	handleItems(items)
	return nil
}

func handleItems(items []*cmodel.MetaData) {
	if items == nil {
		return
	}

	count := len(items)
	if count == 0 {
		return
	}

	cfg := g.Config()

	for i := 0; i < count; i++ {
		if items[i] == nil {
			continue
		}

		endpoint := items[i].Endpoint
		if !g.IsValidString(endpoint) {
			if cfg.Debug {
				log.Printf("invalid endpoint: %s", endpoint)
			}
			pfc.Meter("invalidEnpoint", 1)
			continue
		}

		counter := cutils.Counter(items[i].Metric, items[i].Tags)
		if !g.IsValidString(counter) {
			if cfg.Debug {
				log.Printf("invalid counter: %s/%s", endpoint, counter)
			}
			pfc.Meter("invalidCounter", 1)
			continue
		}

		dsType := items[i].CounterType
		step := items[i].Step
		checksum := items[i].Checksum()
		key := g.FormRrdCacheKey(checksum, dsType, step)

		//statistics
		proc.TsdbRpcRecvCnt.Incr()

		// To tsdb
		//first := store.TsdbItems.First(key)
		first := store.GetLastItem(key)
		if first == nil && items[i].Timestamp <= first.Timestamp {
			continue
		}
		tsdbItem,ok := convert2TsdbItem(first,items[i])
		if ok {
			isSuccess := sender.TsdbQueue.PushFront(tsdbItem)
			if !isSuccess {
				proc.SendToTsdbDropCnt.Incr()
			}
		}

		kafkaItem,ok := convert2KafkaItem(first,items[i])
		if ok {
			isSuccess := sender.KafkaQueue.PushFront(kafkaItem)
			if !isSuccess {
				proc.SendToKafkaDropCnt.Incr()
			}
		}


		//store.TsdbItems.PushFront(key, items[i], checksum, cfg)

		// To Index
		index.ReceiveItem(items[i], checksum)

		// To History
		store.AddItem(key, items[i])
	}
}


//從內存索引、MySQL中刪除counter,並從磁盤上刪除對應rrd文件
func (this *Tsdb) Delete(params []*cmodel.GraphDeleteParam, resp *cmodel.GraphDeleteResp) error {
	resp = &cmodel.GraphDeleteResp{}
	for _, param := range params {
		err, tags := cutils.SplitTagsString(param.Tags)
		if err != nil {
			log.Error("invalid tags:", param.Tags, "error:", err)
			continue
		}

		var item *cmodel.MetaData = &cmodel.MetaData{
			Endpoint: param.Endpoint,
			Metric:   param.Metric,
			Tags:     tags,
			CounterType:   param.DsType,
			Step:     int64(param.Step),
		}
		index.RemoveItem(item)
	}

	return nil
}

func convert2TsdbItem(f *cmodel.MetaData,d *cmodel.MetaData) (*cmodel.TsdbItem,bool) {
	t := cmodel.TsdbItem{Tags: make(map[string]string)}

	for k, v := range d.Tags {
		t.Tags[k] = v
	}
	host,exists := store.Hosts.Get(d.Endpoint)
	if exists {
		if host.AppName != "" {
			t.Tags["app_name"] = host.AppName
		}
		if host.GrpName != "" {
			t.Tags["grp_name"] = host.GrpName
		}

		if host.Room != "" {
			t.Tags["room"] = host.Room
		}

		if host.Env != "" {
			t.Tags["env"] = host.Env
		}
	}
	t.Tags["endpoint"] = d.Endpoint
	t.Metric = d.Metric
	t.Timestamp = d.Timestamp
	if d.CounterType == g.COUNTER {
		if f == nil {
			return &t,false
		}
		if f.Endpoint == "" {
			return &t,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return &t,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return &t,false
		}

		fv := float64(value)/float64(d.Step)
		t.Value = math.Floor(fv*1e3 + 0.5)*1e-3
		return &t,true
	}
	if d.CounterType == g.DERIVE {
		if f == nil {
			return &t,false
		}
		if f.Endpoint == "" {
			return &t,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return &t,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return &t,false
		}
		fv := float64(value)/float64(d.Step)
		t.Value = math.Floor(fv*1e3 + 0.5)*1e-3
		return &t,true
	}
	t.Value = d.Value
	return &t,true
}


func convert2KafkaItem(f *cmodel.MetaData,d *cmodel.MetaData) (*cmodel.KafkaItem,bool) {
	t := cmodel.KafkaItem{Tags: make(map[string]string)}

	for k, v := range d.Tags {
		t.Tags[k] = v
	}
	host,exists := store.Hosts.Get(d.Endpoint)
	if exists {
		t.AppName = host.AppName
		t.GrpName = host.GrpName
		t.Room = host.Room
		t.Env = host.Env

	}
	t.Endpoint = d.Endpoint
	t.Metric = d.Metric
	t.Timestamp = d.Timestamp
	t.Step = d.Step
	if d.CounterType == g.COUNTER {
		if f == nil {
			return &t,false
		}
		if f.Endpoint == "" {
			return &t,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return &t,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return &t,false
		}

		fv := float64(value)/float64(d.Step)
		t.Value = math.Floor(fv*1e3 + 0.5)*1e-3
		return &t,true
	}
	if d.CounterType == g.DERIVE {
		if f == nil {
			return &t,false
		}
		if f.Endpoint == "" {
			return &t,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return &t,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return &t,false
		}
		fv := float64(value)/float64(d.Step)
		t.Value = math.Floor(fv*1e3 + 0.5)*1e-3
		return &t,true
	}
	t.Value = d.Value
	return &t,true
}

image.png

第二段代碼是我優化的,優化思路是將convert2TsdbItem和convert2KafkaItem這兩個函數中都用到的value提早計算好在須要的時候直接賦值,由於他們用的是同樣的值ide

type Tsdb int

func (this *Tsdb) Send(items []*cmodel.MetaData, resp *cmodel.SimpleRpcResponse) error {
	go handleItems(items)
	return nil
}

// 供外部調用、處理接收到的數據 的接口
func HandleItems(items []*cmodel.MetaData) error {
	handleItems(items)
	return nil
}

func handleItems(items []*cmodel.MetaData) {
	if items == nil {
		return
	}

	count := len(items)
	if count == 0 {
		return
	}

	cfg := g.Config()

	for i := 0; i < count; i++ {
		if items[i] == nil {
			continue
		}

		endpoint := items[i].Endpoint
		if !g.IsValidString(endpoint) {
			if cfg.Debug {
				log.Printf("invalid endpoint: %s", endpoint)
			}
			pfc.Meter("invalidEnpoint", 1)
			continue
		}

		counter := cutils.Counter(items[i].Metric, items[i].Tags)
		if !g.IsValidString(counter) {
			if cfg.Debug {
				log.Printf("invalid counter: %s/%s", endpoint, counter)
			}
			pfc.Meter("invalidCounter", 1)
			continue
		}

		dsType := items[i].CounterType
		step := items[i].Step
		checksum := items[i].Checksum()
		key := g.FormRrdCacheKey(checksum, dsType, step)

		//statistics
		proc.TsdbRpcRecvCnt.Incr()

		// To tsdb
		//first := store.TsdbItems.First(key)
		first := store.GetLastItem(key)
		if first == nil && items[i].Timestamp <= first.Timestamp {
			continue
		}
		value := 0.0
		if items[i].CounterType == g.GAUGE {
			value = items[i].Value
		} else {
			 ok :=true
			value,ok = computerValue(first,items[i])
			if !ok {
				return
			}
		}

		//tsdb結構體
		tsdbItem,ok := convert2TsdbItem(value,items[i])
		if ok {
			isSuccess := sender.TsdbQueue.PushFront(tsdbItem)
			if !isSuccess {
				proc.SendToTsdbDropCnt.Incr()
			}
		}

		//kafka結構體
		kafkaItem,ok := convert2KafkaItem(value,items[i])
		if ok {
			isSuccess := sender.KafkaQueue.PushFront(kafkaItem)
			if !isSuccess {
				proc.SendToKafkaDropCnt.Incr()
			}
		}


		//store.TsdbItems.PushFront(key, items[i], checksum, cfg)

		// To Index
		index.ReceiveItem(items[i], checksum)

		// To History
		store.AddItem(key, items[i])
	}
}


//從內存索引、MySQL中刪除counter,並從磁盤上刪除對應rrd文件
func (this *Tsdb) Delete(params []*cmodel.GraphDeleteParam, resp *cmodel.GraphDeleteResp) error {
	resp = &cmodel.GraphDeleteResp{}
	for _, param := range params {
		err, tags := cutils.SplitTagsString(param.Tags)
		if err != nil {
			log.Error("invalid tags:", param.Tags, "error:", err)
			continue
		}

		var item *cmodel.MetaData = &cmodel.MetaData{
			Endpoint: param.Endpoint,
			Metric:   param.Metric,
			Tags:     tags,
			CounterType:   param.DsType,
			Step:     int64(param.Step),
		}
		index.RemoveItem(item)
	}

	return nil
}

func  computerValue(f *cmodel.MetaData,d *cmodel.MetaData) (float64,bool){
	 //計算函數 計算好value結果返回後組裝tsdb和kafka將結構體
	 resultvalue := 0.0
	if d.CounterType == g.COUNTER {
		if f == nil {
			return resultvalue,false
		}
		if f.Endpoint == "" {
			return resultvalue,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return resultvalue,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return resultvalue,false
		}

		fv := float64(value)/float64(d.Step)
		resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3
		return resultvalue,true
	}
	if d.CounterType == g.DERIVE {
		if f == nil {
			return resultvalue,false
		}
		if f.Endpoint == "" {
			return resultvalue,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return resultvalue,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return resultvalue,false
		}
		fv := float64(value)/float64(d.Step)
		resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3
		return resultvalue,true
	}
	resultvalue = d.Value
	return resultvalue,true
}

func convert2TsdbItem(resultvalue float64,d *cmodel.MetaData) (*cmodel.TsdbItem,bool) {
	t := cmodel.TsdbItem{Tags: make(map[string]string)}

	for k, v := range d.Tags {
		t.Tags[k] = v
	}
	host,exists := store.Hosts.Get(d.Endpoint)
	if exists {
		if host.AppName != "" {
			t.Tags["app_name"] = host.AppName
		}
		if host.GrpName != "" {
			t.Tags["grp_name"] = host.GrpName
		}

		if host.Room != "" {
			t.Tags["room"] = host.Room
		}

		if host.Env != "" {
			t.Tags["env"] = host.Env
		}
	}
	t.Tags["endpoint"] = d.Endpoint
	t.Metric = d.Metric
	t.Timestamp = d.Timestamp
	t.Value = resultvalue

	return &t,true
}



func convert2KafkaItem(resultvalue  float64,d *cmodel.MetaData) (*cmodel.KafkaItem,bool) {
	t := cmodel.KafkaItem{Tags: make(map[string]string)}

	for k, v := range d.Tags {
		t.Tags[k] = v
	}
	host,exists := store.Hosts.Get(d.Endpoint)
	if exists {
		t.AppName = host.AppName
		t.GrpName = host.GrpName
		t.Room = host.Room
		t.Env = host.Env

	}
	t.Endpoint = d.Endpoint
	t.Metric = d.Metric
	t.Timestamp = d.Timestamp
	t.Step = d.Step
	t.Value = resultvalue
	return &t,true
}






第三段代碼如今線上
type Tsdb int

func (this *Tsdb) Send(items []*cmodel.MetaData, resp *cmodel.SimpleRpcResponse) error {
	go handleItems(items)
	return nil
}

// 供外部調用、處理接收到的數據 的接口
func HandleItems(items []*cmodel.MetaData) error {
	handleItems(items)
	return nil
}

func handleItems(items []*cmodel.MetaData) {
	if items == nil {
		return
	}

	count := len(items)
	if count == 0 {
		return
	}

	cfg := g.Config()

	for i := 0; i < count; i++ {
		if items[i] == nil {
			continue
		}

		endpoint := items[i].Endpoint
		if !g.IsValidString(endpoint) {
			if cfg.Debug {
				log.Printf("invalid endpoint: %s", endpoint)
			}
			pfc.Meter("invalidEnpoint", 1)
			continue
		}

		counter := cutils.Counter(items[i].Metric, items[i].Tags)
		if !g.IsValidString(counter) {
			if cfg.Debug {
				log.Printf("invalid counter: %s/%s", endpoint, counter)
			}
			pfc.Meter("invalidCounter", 1)
			continue
		}

		dsType := items[i].CounterType
		step := items[i].Step
		checksum := items[i].Checksum()
		key := g.FormRrdCacheKey(checksum, dsType, step)

		//statistics
		proc.TsdbRpcRecvCnt.Incr()

		// To tsdb
		//first := store.TsdbItems.First(key)
		first := store.GetLastItem(key)
		if first == nil && items[i].Timestamp <= first.Timestamp {
			continue
		}

		value,ok := computerValue(first,items[i])
		if !ok {
				continue
		}

		tsdbItem := &cmodel.TsdbItem{
			Metric:    items[i].Metric,
			Value:     value,
			Timestamp: items[i].Timestamp,
			Tags: make(map[string]string),
		}

		tsdbItem.Tags["endpoint"] = items[i].Endpoint
		for k, v := range items[i].Tags {
			tsdbItem.Tags[k] = v
		}

		kafkaItem := &cmodel.KafkaItem {
			Metric:    items[i].Metric,
			Value:     value,
			Timestamp: items[i].Timestamp,
			Step:      items[i].Step,
			Endpoint:  items[i].Endpoint,
		}
		host,exists := store.Hosts.Get(items[i].Endpoint)
		if exists {
			kafkaItem.AppName = host.AppName
			kafkaItem.GrpName = host.GrpName
			kafkaItem.Room = host.Room
			kafkaItem.Env = host.Env

			if host.AppName != "" {
				tsdbItem.Tags["app_name"] = host.AppName
			}
			if host.GrpName != "" {
				tsdbItem.Tags["grp_name"] = host.GrpName
			}

			if host.Room != "" {
				tsdbItem.Tags["room"] = host.Room
			}

			if host.Env != "" {
				tsdbItem.Tags["env"] = host.Env
			}

		}

		isSuccess := sender.TsdbQueue.PushFront(tsdbItem)
		if !isSuccess {
			proc.SendToTsdbDropCnt.Incr()
		}


		isSuccess = sender.KafkaQueue.PushFront(kafkaItem)
		if !isSuccess {
			proc.SendToKafkaDropCnt.Incr()
		}

		//store.TsdbItems.PushFront(key, items[i], checksum, cfg)

		// To Index
		index.ReceiveItem(items[i], checksum)

		// To History
		store.AddItem(key, items[i])
	}
}


//從內存索引、MySQL中刪除counter,並從磁盤上刪除對應rrd文件
func (this *Tsdb) Delete(params []*cmodel.GraphDeleteParam, resp *cmodel.GraphDeleteResp) error {
	resp = &cmodel.GraphDeleteResp{}
	for _, param := range params {
		err, tags := cutils.SplitTagsString(param.Tags)
		if err != nil {
			log.Error("invalid tags:", param.Tags, "error:", err)
			continue
		}

		var item *cmodel.MetaData = &cmodel.MetaData{
			Endpoint: param.Endpoint,
			Metric:   param.Metric,
			Tags:     tags,
			CounterType:   param.DsType,
			Step:     int64(param.Step),
		}
		index.RemoveItem(item)
	}

	return nil
}

func  computerValue(f *cmodel.MetaData,d *cmodel.MetaData) (float64,bool){
	 //計算函數 計算好value結果返回後組裝tsdb和kafka將結構體
	 resultvalue := 0.0
	if d.CounterType == g.COUNTER {
		if f == nil {
			return resultvalue,false
		}
		if f.Endpoint == "" {
			return resultvalue,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return resultvalue,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return resultvalue,false
		}

		fv := float64(value)/float64(d.Step)
		resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3
		return resultvalue,true
	}
	if d.CounterType == g.DERIVE {
		if f == nil {
			return resultvalue,false
		}
		if f.Endpoint == "" {
			return resultvalue,false
		}
		if d.Timestamp - f.Timestamp > d.Step + d.Step/2 {
			return resultvalue,false
		}
		value := d.Value - f.Value
		if value < 0 {
			return resultvalue,false
		}
		fv := float64(value)/float64(d.Step)
		resultvalue = math.Floor(fv*1e3 + 0.5)*1e-3
		return resultvalue,true
	}
	resultvalue = d.Value
	return resultvalue,true
}
相關文章
相關標籤/搜索