時序數據庫永遠的難關 — 時間線膨脹(高基數 Cardinality)問題的解決方案

做者 | 徐建偉 (竹影)node

前序

隨着移動端發展走向飽和,如今整個 IT 行業都期待着「萬物互聯」的物聯網時代。在物聯網場景中,每每有許多各種不一樣的終端設備,佈署在不一樣的位置,去採集各類數據,好比某一區域有 10萬個 loT 設備,每一個 loT 設備每 5 秒發送一次數據。那麼每一年會產生 6307億 個數據點。而這些數據都是順序產生的,而且 loT 設備產生數據的格式所有是一致的,而且沒有刪除和修改的需求。針對這樣按時海量寫入無更新場景,時序數據庫應運而生。git

時序數據庫在假定沒有數據插入和更新需求,數據結構穩定的前提下,極限追求快速寫入,高壓縮,快速檢索數據。時序數據的 Label(tag)會創建索引,以提升查詢性能,以便你能夠快速找到與全部指定標籤匹配的值。若是 Label(tag)值的數量過多時(高基數 Cardinality 問題),索引會出現各類各樣的問題, 本文主要討論 influxdb 在遇到寫入的數據出現高基數 Cardinality 問題時,一些可行的解決方案。

github

高基數Cardinality問題(時間線膨脹)

時序數據庫主要存儲的是 metric 數據,每一條數據稱爲一個樣本(sample),樣本由如下三部分組成:golang

  • 指標(時間線 time-series):metric name 和描述當前樣本特徵的 labelsets;
  • 時間戳(timestamp):一個精確到毫秒的時間戳;
  • 樣本值(value):表示當前樣本的值。

<-------------- time-series="" --------=""><-timestamp -----=""> <-value->數據庫

node_cpu{cpu=「cpu0」,mode=「idle」} @1627339366586 70
node_cpu{cpu=「cpu0」,mode=「sys」} @1627339366586 5
node_cpu{cpu=「cpu0」,mode=「user」} @1627339366586 25

編程

一般狀況下, time-series 中的 lablelsets 是有限的,可枚舉的,好比上面的例子 model 可選值爲 idle,sys,user。

數組

prometheus 官方文檔中對於 Label 的建議:數據結構

CAUTION: Remember that every unique combination of key-value label pairs represents a new time series, which can dramatically increase the amount of data stored. Do not use labels to store dimensions with high cardinality (many different label values), such as user IDs, email addresses, or other unbounded sets of values.

app

時序數據庫的設計時,也是假定在時間線低基數的前提下。可是隨着 metric 的普遍使用,在不少場景下沒法避免出現時間線膨脹。

less

好比,在雲原生場景下 tag 出現 pod/container ID之類,也有些 tag 出現 userId,甚至有些 tag 是 url,而這些 tag 組合時,時間線膨脹得很是厲害。

這個矛盾出現是必然的,怎麼解決呢?是寫入數據方調整寫入數據時,控制寫入的 time-series的數量,仍是時序數據庫去更改設計來適用這種場景?這個問題沒有完美的解決方案,咱們須要作出平衡。

從實際狀況出發,若是時間線膨脹後,時序數據庫不會出現不可用,性能也不會出現指數級別降低。也就是說時間線不膨脹時,性能優秀。時間線膨脹後,性能能達到良好或者及格就好。

那怎麼讓時序數據庫在時間線膨脹的狀況下性能還能良好呢?接下來咱們經過influxdb的源碼來討論這個問題。

時間線的處理邏輯


influxdb 的 tsm 結構,主要的邏輯處理過程相似 lsm。數據上報後,會添加到 cache 和日誌文件(wal)。爲了加快檢索速度或者壓縮比例,會對上報的數據進行 compaction(數據文件合併,從新構建索引)。

索引涉及到三個方面:

  • TSI(Time Series Index)檢索Measurement,tag,tagval,time

  • TSM(Time-Structured Merge Tree)用來檢索time-series -> value

  • Series Segment Index 用來檢索 time-series key <–> time-series Id

具體influxdb的索引實現能夠參照官方文章。
(https://github.com/influxdata/influxdb/blob/master/tsdb/index/tsi1/doc.go)

在這裏插入圖片描述

當時間線膨脹後,TSI 和 TSM 的檢索性能降低並不嚴重,問題主要是出如今 Series Segment Index 裏。

這節咱們會討論influxdb的時間線文件的正排索引(time-series key ->id, id->time-series key):

  • SeriesFile 是 Database(bucket)級別的。
  • SeriesIndex 主要處理 key->Id, key->id 的索引映射。
  • SeriesSegment 主要存放的是 Series 的 Id 和 key。
  • SeriesIndex 裏面是存放 Series 的 Id 和 key 等索引。(能夠理解是兩個 hashmap)
  • keyIDMap 經過 key 來查找對應的 Id。
  • idOffsetMap 經過 Id 查到到 offset,經過這個 offset(對應 SeriesSegment 的位置)來查找 SeriesSegment 文件獲取 key。

![在這裏插入圖片描述](https://img-blog.csdnimg.cn/723084a949334e57954bdd241de3c603.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zOTg2MDkxNQ==,size_16,color_FFFFFF,t_70#pic_center)

具體的代碼(influxdb 2.0.7)以下:

tsdb/series_partition.go:30 // SeriesPartition represents a subset of series file data. type SeriesPartition struct { ... segments []*SeriesSegment index *SeriesIndex seq uint64 // series id sequence .... } tsdb/series_index.go:36 // SeriesIndex represents an index of key-to-id & id-to-offset mappings. type SeriesIndex struct { path string ... data []byte // mmap data keyIDData []byte // key/id mmap data idOffsetData []byte // id/offset mmap data // In-memory data since rebuild. keyIDMap *rhh.HashMap idOffsetMap map[uint64]int64 tombstones map[uint64]struct{} }

對 series key 進行檢索時,會先在內存 map 中查找,而後在磁盤的 map 上查找,具體的實現代碼以下:

tsdb/series_index.go:185 func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 { // 內存map查找 if v := idx.keyIDMap.Get(key); v != nil { if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) { return id } } if len(idx.data) == 0 { return 0 } hash := rhh.HashKey(key) for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask { // 磁盤map查找offset elem := idx.keyIDData[(pos * SeriesIndexElemSize):] elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) if elemOffset == 0 { return 0 } // 經過offset獲取對於的id elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize) elemHash := rhh.HashKey(elemKey) if d > rhh.Dist(elemHash, pos, idx.capacity) { return 0 } else if elemHash == hash && bytes.Equal(elemKey, key) { id := binary.BigEndian.Uint64(elem[8:]) if idx.IsDeleted(id) { return 0 } return id } } }

這裏補充一個知識點,將內存 hashmap 轉成磁盤 hashmap 的實現。咱們都知道 hashmap 的存儲是數組,influfxdb 中的實現是經過 mmap 方式映射磁盤空間(見 SeriesIndex 的 keyIDData),而後經過 hash 訪問數組地址,採用的 Robin Hood Hashing,符合內存局部性原理(查找邏輯的代碼如上 series_index.go 中)。將 Robin Hood Hashtable 純手動移植磁盤 hashtable, 開發人員仍是花了很多心思。


那內存 map 和磁盤 map 是如何生成的,爲何須要兩個 map?

influxdb 的作法是將新增的 series key 先放到內存 hashmap 裏面,當內存 hashmap 增加大於閾值時,將內存 hashmap 和磁盤 hashmap 進行 merge(遍歷全部 SeriesSegment,過濾已經刪除的 series key)生成一個新的磁盤 hashmap,這個過程叫作 compaction。compation 結束後內存 hashmap 被清空,而後繼續存放新增的 series key。


在這裏插入圖片描述

tsdb/series_partition.go:200 // Check if we've crossed the compaction threshold. if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) && p.compactionLimiter.TryTake() { p.compacting = true log, logEnd := logger.NewOperation(context.TODO(), p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path)) p.wg.Add(1) go func() { defer p.wg.Done() defer p.compactionLimiter.Release() compactor := NewSeriesPartitionCompactor() compactor.cancel = p.closing if err := compactor.Compact(p); err != nil { log.Error("series partition compaction failed", zap.Error(err)) } logEnd() // Clear compaction flag. p.mu.Lock() p.compacting = false p.mu.Unlock() }() }


tsdb/series_partition.go:569 func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error { hdr := NewSeriesIndexHeader() hdr.Count = seriesN hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor) // Allocate space for maps. keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) // Reindex all partitions. var entryN int for _, segment := range segments { errDone := errors.New("done") if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { ... // Save max series identifier processed. hdr.MaxSeriesID, hdr.MaxOffset = id, offset // Ignore entry if tombstoned. if index.IsDeleted(id) { return nil } // Insert into maps. c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) }); err == errDone { break } else if err != nil { return err } }


這樣設計有兩個缺陷:

  1. 作 compaction 時,當 io 訪問 SeriesSegments 文件, 內存加載全部的 series key,會構建一個新的 hashtable,而後將這個 hashtable mmap 存儲到磁盤,當 series key 超過幾千萬或者更多時,會出現內存不夠,oom 問題。

  2. 作 compaction 時, 對於已經刪除的 series key(tombstone 標記)作了過濾,不生成 series index,可是 SeriesSegment 中已經刪除 series key 只有作了 tombstone 標記,不會作物理刪除,這樣會致使 SeriesSegment 一直膨脹,在實際生產環境一個 partition 下的全部 segmeng 文件超過幾十 G,作 compaction 時,會產生大量 io 訪問。

可行的解決方案


一、增長partition或者database


influxdb 的正排索引是 database 級別的,有兩個方式能夠減小 compaction 時的內存,一個是增長 partition 數量或者將多個 Measurement 劃到不一樣的 database 裏面。

但這樣作的問題是,已經存在數據的 influxdb 很差調整兩個數據。

二、修改時間線存儲策略


咱們知道 hash 索引是 O1 的查詢,效率很是高,可是對於增加性的數據,存在擴容問題。那咱們作個折中的選擇。當 partition 大於某個閾值時,將 hash 索引變成 b+tree 索引。b+tree 對於數據膨脹性能降低有限,更適合高基數問題,並且再也不須要全局的 compaction。

三、將series key的正排索引下沉到shard級別


influxdb 裏面每一個 shard 都是有時間區間的,某個時間區間內的時間線數據並不大。好比 database 裏面保存的是 180天 的 series key,而 shard 通常只有一天甚至 1 個小時的跨度,二者存放的 series key 存在 1~ 2 個數量級的差距。另外將 series key 正排索引下沉到 shard 級別對刪除操做更友好,當 shard 過時刪除時,會將當前 shard 的全部 series key 和其餘 shard 作 diff,當 series key 不存在時再去刪除 series key。

四、根據Measurement修改時間線存儲策略


在實際生產環境中,時間線膨脹和 Measurement 有很大關係,通常是少數的 Measurement 存在時間線膨脹問題,可是絕大部分的 Measurement 不存在時間線爆炸的問題。

咱們能夠對作 series key 的正排索引的 compaction 時,能夠添加 Measurement 時間線統計,若是某個 Measurement 的時間線膨脹時,能夠將這個 Measurement 的全部 series key 切換到 B+ tree。而不膨脹的 series key 繼續保留走 hash 索引。這樣方案性能比第二個方案更好,開發成本會更高一些。

目前高基數問題主要體如今 series key 正排索引。我的以爲短時間先作第二個方案過分到第四個方案的方式。這樣能夠比較好的解決時間線增加的問題,性能降低很少,成本不高。第三個方案改動比較大,設計更合理,能夠做爲一個長期修復方案。


總結


本文主要經過 influxdb 來說解時序數據庫的高基數 Cardinality 問題,以及可行的方案。metric 的維度爆炸致使數據線膨脹問題,不少同窗都認爲這是對時序數據庫的誤用或者是濫用。可是信息數據爆炸的今天,讓數據維度收斂,不發散成本很是高,甚至遠高於數據存儲成本。

我的以爲須要對這個問題進行分而治之的方式,提高時序數據庫對維度爆炸的容忍度。換句話說,出現時間線膨脹後,時序數據庫不會出現崩潰狀況,對時間線未膨脹的 metric 繼續高效運行,而出現時間線膨脹的 metic 能夠出現性能降低,單不會線性降低。提高對時間線膨脹的容忍度,控制時間線膨脹的爆炸半徑,將會成爲時序數據庫的核心能力。

新學的 golang,用 influxdb 的源碼來練手,特別感謝博樹,李子,仁劼幫忙講解 influxdb 以及討論這個問題。

參考
https://github.com/influxdata/influxdb/blob/master/tsdb/index/tsi1/doc.go
https://blog.csdn.net/wjandy0211/article/details/102955268
http://hbasefly.com/2018/02/09/timeseries-database-5/

更多探討與分享,歡迎加羣。
釘釘搜索羣號:31704055



第二屆雲原生編程挑戰賽開始啦!


本屆大賽將繼續深度探索 RocketMQ、Dubbo三、Serverless 三大熱門技術領域,爲熱愛技術的年輕人提供一個挑戰世界級技術問題的舞臺,但願你們用技術爲全社會創造更大價值。

瓜分 603000 獎金池,你準備好了嗎?

戳👉https://tianchi.aliyun.com/specials/promotion/cloudnative2021,馬上報名參賽~

相關文章
相關標籤/搜索