open_falcon中數據的存儲組件graph(index cache ,rrd)

1、什麼是open_falcon?

open_falcon 是由小米開源的企業級服務器運維監控系統。html

介紹主要節點:mysql

agent:安裝在須要監控的服務器上,負責採集該服務器上的數據指標(cpu空閒率,io負載率,mem使用率等),每格一段時間採集一條數據,天天數據只包含一個被採集的目標,好比mesg1只上報cpu空閒率,msg2只上報io負載率。因此每一個msg只包含一個key-value對和消息標籤。主動推送模式。web

transfer:負責轉發消息,不少個agent上報消息,這些消息聚集到多個transfer上,由transfer對消息進行轉發,轉發到graph和judge節點。sql

judge:負責根據消息的內容進行報警,好比cpu空閒率低於30%等狀況,並且能夠配置報警級別多種報警模式等。數據庫

graph:負責歷史數據的存儲,存儲形式爲rrd文件模式。本地存有緩存。graph還負責數據的查詢工做,當須要查詢某一段的數據時,會從rrd文件和cache中合併結果返回給請求。apache

2、graph的設計

1.rrd文件

graph 既然使用了rrd文件進行存儲,首先就要了解下rrd的特性吧,瞭解rrd文件的特殊性,請參考鏈接:http://www.360doc.com/content/07/0302/20/15540_382048.shtml。感謝該做者詳細的描述。json

rrd文件總結:rrd使用一個環形的存儲空間基於時間序列的數據庫,建立時會指定最大行數,行數事後開始循環覆蓋。每一個rrd文件能夠設定多個環,每一個環的大小單獨設定,每一個環統計的數據間隔不同,好比step爲1min,環1能夠是1*step存儲一次數據,環2能夠是5*step存儲一次數據。rrd的建立、更新能夠由rrdtool來完成,不須要關心內部實現,來一個數據須要保存到哪一個環中,rrdtool本身會判斷。讀取數據時rrdtool會根據讀取的長度判斷從哪一個環獲取數據。api

每個終端(endpoint)中每一個屬性(metric)都會造成一個rrd_file。這樣會統計出超多的rrd文件。若是查詢的話,能夠去rrd文件中去獲取數據。緩存

rrd的文件名字與endpoint、metric、tags排序、dstype,step相關。可是用戶查詢數據的時候不會給出dstype和step的參數。這時rrd文件的名字就不能肯定。怎麼來肯定rrd文件的名字,藉助於mysql保存關鍵信息。看mysql信息前,先看看用戶採集的數據是什麼樣的。安全

2.採集數據的格式:

type GraphItem struct {
	Endpoint  string            `json:"endpoint"`
	Metric    string            `json:"metric"`
	Tags      map[string]string `json:"tags"`
	Value     float64           `json:"value"`
	Timestamp int64             `json:"timestamp"`
	DsType    string            `json:"dstype"`
	Step      int               `json:"step"`
	Heartbeat int               `json:"heartbeat"`
	Min       string            `json:"min"`
	Max       string            `json:"max"`
}

 

數據採集腳本[{\"metric\": \"metric.demo\", \"endpoint\": \"qd-open-falcon-judge01.hd\", \"timestamp\": $ts,\"step\": 60,\"value\": 9,\"counterType\": \"GAUGE\",\"tags\": \"project=falcon,module=judge\"}]

這裏邊成員就不解釋了,都能看的明白。

爲了減小讀寫rrd文件的次數,會在本地緩存這個接收到的數據,並且爲了快速查找文件在本地創建索引index,方便查找。

在統一的api接口api/graph.go文件中定義:

a、接受數據HandleItems()

b、查詢數據Query()

接下來 先分析HandleItems是怎麼來存儲數據和創建index的,而後再說明數據的查詢操做。

 

三、graph的接收數據HandleItems

for i := 0; i < count; i++ {       //循環全部接收到的item,分別處理
		if items[i] == nil {
			continue
		}
		dsType := items[i].DsType    //獲取item的dsType,dstype是什麼意思去看rrd文檔。
		step := items[i].Step       //獲取item的step,就是數據採集的步長。
		checksum := items[i].Checksum()  //根據item計算checksum =md5[endpoint/metric/tag]
		//生成rrd緩存的key  return fmt.Sprintf("%s_%s_%d", md5, dsType, step)
		ckey := g.FormRrdCacheKey(checksum, dsType, step)

		//statistics
		proc.GraphRpcRecvCnt.Incr()

		// To Graph
		first := store.GraphItems.First(ckey)
		if first != nil && items[i].Timestamp <= first.Timestamp {
			continue
		}
        //添加到item的本地緩存中
		store.GraphItems.PushFront(ckey, items[i])

		// To Index 創建本地索引
		index.ReceiveItem(items[i], checksum)

		// To History 暫時不明作什麼用的
		store.AddItem(checksum, items[i])
	}

從上段代碼的註釋中能夠知曉數據的存儲過程。

a.GraphItems.PushFront(ckey,item[i]).放入到本地緩存,按期刷寫到磁盤中。

b.index.ReceiveItem(item[i],checksum)放入本地索引中,建立索引。

 

四、本地緩存item,並按期刷寫磁盤。

在上面的HandleItems中能夠看出,接受到的item都添加到GraphItem的緩存中。

在/graph/rrdtool/syncdisk.go中有syncDisk()的操做,syncDisk中會定時的對緩存GraphItem進行刷寫操做。

當刷寫到磁盤時,刪除本地緩存中的item。syncDisk->flushRrd(),flushRrd函數在rrdtool.go中。這部分代碼簡單,剩下實現的部分由rrdtool提供,涉及到rrd文件更新,這裏很少說。

 

五、創建本地索引ReceiveItem,增量添加到數據庫mysql中。

// index收到一條新上報的監控數據,嘗試用於增量更新索引
func ReceiveItem(item *cmodel.GraphItem, md5 string) {
	if item == nil {
		return
	}
	uuid := item.UUID()
	// 已上報過的數據
	if indexedItemCache.ContainsKey(md5) {
		old := indexedItemCache.Get(md5).(*IndexCacheItem)
		if uuid == old.UUID { // dsType+step沒有發生變化,只更新緩存 TODO 存在線程安全的問題
			old.Item = item
		} else { // dsType+step變化了,當成一個新的增量來處理(甚至,不用rrd文件來過濾)
			//indexedItemCache.Remove(md5)
			unIndexedItemCache.Put(md5, NewIndexCacheItem(uuid, item))
		}
		return
	}

	//省略一些代碼.....

	// 緩存未命中, 放入增量更新隊列
	unIndexedItemCache.Put(md5, NewIndexCacheItem(uuid, item))
}

當索引接受到數據後,經過數據的checksum值來肯定是否是這個來自endpoint的metric是不是第一次採集數據。

若是不是第一次採集數據,則在indexedItemCache中可以找到,而且若是uuid沒變則只更新item。若是uuid變了從新index操做(涉及dstype和step的改變)。

若是是第一次數據採集,在indexeditemCache中找不到,添加到unindexeditemCache中,等待被索引。

創建增量索引操做index_update_incr_task.go/StartIndexUpdateIncrTask 操做,他會定時的啓動updateIndexIncr()操做。

keys := unIndexedItemCache.Keys()
	for _, key := range keys {
		icitem := unIndexedItemCache.Get(key)
		unIndexedItemCache.Remove(key)
		if icitem != nil {
			// 併發更新mysql
			semaUpdateIndexIncr.Acquire()
			go func(key string, icitem *IndexCacheItem, dbConn *sql.DB) {
				defer semaUpdateIndexIncr.Release()
				err := maybeUpdateIndexFromOneItem(icitem.Item, dbConn)
				if err != nil {
					proc.IndexUpdateIncrErrorCnt.Incr()
				} else {
					indexedItemCache.Put(key, icitem)
				}
			}(key, icitem.(*IndexCacheItem), dbConn)
			ret++
		}
	}
這裏的key由(t.Endpoint, t.Metric, t.Tags)計算得出。unIndexedItemCache中根據key來存儲item。每一個key保存一個最新的item。

由這個最新的item去maybeUpdateIndexFromOneItem,由這個函數去更新DB中的表。

這裏有三個表須要更新:

a、endpoint 表。該表記錄了全部上報數據的endpoint,而且爲每個endpoint生成一個id即 endpoint_id。

b、tag_endpoint表。拆解item的每個tag。用tag和endpoint造成一個主鍵的表。記錄每一個endpoint包含的tag。每條記錄生成一個id,爲tagendpoint_id

c、endpoint_counter表。counter是metric和tags組合後的名詞。看做是一個總體。

表結構以下:

mysql> show columns from endpoint
    -> ;
+----------+------------------+------+-----+-------------------+-----------------------------+
| Field    | Type             | Null | Key | Default           | Extra                       |
+----------+------------------+------+-----+-------------------+-----------------------------+
| id       | int(10) unsigned | NO   | PRI | NULL              | auto_increment              |
| endpoint | varchar(255)     | NO   | UNI |                   |                             |
| ts       | int(11)          | YES  |     | NULL              |                             |
| t_create | datetime         | NO   |     | NULL              |                             |
| t_modify | timestamp        | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+----------+------------------+------+-----+-------------------+-----------------------------+
mysql> show columns from tag_endpoint;
+-------------+------------------+------+-----+-------------------+-----------------------------+
| Field       | Type             | Null | Key | Default           | Extra                       |
+-------------+------------------+------+-----+-------------------+-----------------------------+
| id          | int(10) unsigned | NO   | PRI | NULL              | auto_increment              |
| tag         | varchar(255)     | NO   | MUL |                   |                             |
| endpoint_id | int(10) unsigned | NO   |     | NULL              |                             |
| ts          | int(11)          | YES  |     | NULL              |                             |
| t_create    | datetime         | NO   |     | NULL              |                             |
| t_modify    | timestamp        | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------+------------------+------+-----+-------------------+-----------------------------+
6 rows in set (0.00 sec)

mysql> show columns from endpoint_counter;
+-------------+------------------+------+-----+-------------------+-----------------------------+
| Field       | Type             | Null | Key | Default           | Extra                       |
+-------------+------------------+------+-----+-------------------+-----------------------------+
| id          | int(10) unsigned | NO   | PRI | NULL              | auto_increment              |
| endpoint_id | int(10) unsigned | NO   | MUL | NULL              |                             |
| counter     | varchar(255)     | NO   |     |                   |                             |
| step        | int(11)          | NO   |     | 60                |                             |
| type        | varchar(16)      | NO   |     | NULL              |                             |
| ts          | int(11)          | YES  |     | NULL              |                             |
| t_create    | datetime         | NO   |     | NULL              |                             |
| t_modify    | timestamp        | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+-------------+------------------+------+-----+-------------------+-----------------------------+
8 rows in set (0.00 sec)

爲何要創建index'索引呢?

a、.爲了查詢的速度嗎? 其實查詢的時候查的本地緩存item中和rrd文件中的數據,這些與index中保存的數據關係不大,並且index中保存的採集相同項只有一份。因此認爲這裏是數據緩存意義不大。

b、其實這個index最後都轉化成了這三個表。這三個表的意義呢?在於何處? 答案是在與rrd文件的索引。表中並無直接保存rrd文件的名字。若是查詢的時候該怎麼知道去查詢哪個rrd文件呢?不可能全部的rrd文件的頭部都掃描一遍。。。。。這時就體現了這個數據庫表的意義了。

 

首先回顧一下rrd文件的命名:filename := 

func RrdFileName(baseDir string, md5 string, dsType string, step int) string {
	return fmt.Sprintf("%s/%s/%s_%s_%d.rrd", baseDir, md5[0:2], md5, dsType, step)
}

由md五、dstype、step決定。而md5前面也說了 與  Endpoint,Metric,Tags相關。因此 filename=f(Endpoint,Metric,Tags,dstype,step)相關,因此要準確的找到rrd文件,必須湊齊這5個元素。

 

可是問題是查詢的時候根本不給這5個條件,而是給了?

type GraphQueryParam struct {
	Start     int64  `json:"start"`
	End       int64  `json:"end"`
	ConsolFun string `json:"consolFuc"`
	Endpoint  string `json:"endpoint"`
	Counter   string `json:"counter"`
}

有效的是 時間段start和end、endpoint、counter。counter是(Metric,Tags)的組合。爲了湊齊5個條件組成rrdfilename缺乏的是dstype和step。那麼這時候能夠根據endpoint和counter在endpoint_counter表中找到dstype和step。而後組合成rrdfilename。進行讀取數據。

六、query

//--------->查詢的輸入參數GraphQueryParam,上面也介紹了
func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryResponse) error {
	// statistics
	proc.GraphQueryCnt.Incr()

	// form empty response
	resp.Values = []*cmodel.RRDData{}  //------》用於存放獲取的數據
	resp.Endpoint = param.Endpoint          // -------》數據的endpoint
	resp.Counter = param.Counter         // ----------》數據的counter信息
	dsType, step, exists := index.GetTypeAndStep(param.Endpoint, param.Counter) // complete dsType and step //----------->從緩存或者DB中獲取dstype和step。這裏DB一樣使用了一層本身的緩存,能夠本身去看下
	if !exists {
		return nil
	}
	resp.DsType = dsType
	resp.Step = step

	start_ts := param.Start - param.Start%int64(step)      //------》根據step對齊整理start時間
	end_ts := param.End - param.End%int64(step) + int64(step) //-----》根據step對齊整理end時間
	if end_ts-start_ts-int64(step) < 1 {
		return nil
	}

	md5 := cutils.Md5(param.Endpoint + "/" + param.Counter) //------->計算md5值,用於計算key值
	ckey := g.FormRrdCacheKey(md5, dsType, step)              //-----》計算key值,用於緩存索引,這個緩存是數據緩存,不是index緩存
	filename := g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step)  //還原rrd文件名字
	// read data from rrd file
	datas, _ := rrdtool.Fetch(filename, param.ConsolFun, start_ts, end_ts, step) //從rrd中獲取數據,從rrd中獲取數據須要指定獲取數據的時間段。
	datas_size := len(datas)
	// read cached items
	items := store.GraphItems.FetchAll(ckey)  //根據key值,在數據緩存中獲取數據。
	items_size := len(items)

最後根據 從rrd中獲取的數據和從數據緩存中獲取的數據進行合併,輸出。完成查詢。

 

2、數據轉儲

數據一直保存在rrd文件中,能夠進行查詢,可是不適合大量數據的分析預測等過程,因此決定將採集的機器數據進行轉儲到hdfs上。在hdfs上可用於spark、mapreduce的處理分析工做。

轉儲原理:利用hdfs的http api接口進行轉儲。hdfs的http api接口查詢,請參考:http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#HTTP+Response+Codes

主要操做在graph增長一個隊列listitem用於緩存item數據,而後啓動一個轉儲線程,定時批量的轉儲item到hdfs中。

爲何要定時批量的呢? 由於hdfs的接口不能頻繁的提交數據。若是頻繁的append數據。hdfs會報錯,是hdfs的問題,尚未解決。因此採用規避問題的方法。  

相關文章
相關標籤/搜索