最完整的時間序列的邏輯數據模型以下:python
[timestamp],[d1],[d2]...[dn],[v1],[v2]...[vn]
d1 ~ dn 是維度,好比 ip, idc, country 之類的值
v1 ~ vn 是值列,好比 cpu_usage, free_memeory_bytes 之類的值mysql
一些時間序列數據庫在實現的時候爲了簡化實現,提升性能約束了一個更簡化的數據模型:sql
[timestamp],[metric],[value]
這種數據模型對於數據庫來講很是友好,能夠很好的作一些優化。可是要求開發者去選擇什麼的維度信息編碼到 metric名裏,同時對於多值列的數據須要存成多個metric,而不是一個metric多個值。本文探討的數據模型採用最完整的數據模型。mongodb
監控使用的時間序列數據具備如下的特色:數據庫
本測試採用的測試數據有838萬行,其大部分維度字段是重複的,可是有少許的髒數據使得維度組合仍是比較多的。總的維度組合數(不包括timestamp維度)爲285522。排除掉大部分髒數據的維度組合數爲5927。數據的週期是60秒,跨度爲1428777120 ~ 1428793320。平均每一個週期有 30922 行記錄。一個週期有這麼行記錄很重要的緣由是一個週期,對於同一個維度組合有多條統計數據(來源於不一樣的partition)。也就是數據是部分聚合的,並無聚合到一個週期,一個維度組合,一條記錄的程度。選擇這樣的測試數據是由於這種數據是約束最小的形式。它沒有對採集頻率,每週期記錄數,維度組合密度有任何預先假設。好比 opentsdb 假設同週期內指定維度組合只有惟一的一個值,插入了兩個不一樣值會怎麼樣?well,它報錯……json
測試數據的結構爲:數組
[timestamp],[iResult],[vCmid],[vAppid],[totalCount],[dProcesssTime]
其中最後兩列爲值列,其他的都是維度。緩存
因此按照測試數據來講,就會插入8380000個文檔到mongodb裏。服務器
{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse", "count" : 8.38534e+06, "size" : 2012533392.0000000000000000, "avgObjSize" : 240, "storageSize" : 2897301504.0000000000000000, "numExtents" : 21, "nindexes" : 1, "lastExtentSize" : 7.56662e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 2.72065e+08, "indexSizes" : { "_id_" : 2.72065e+08 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429290120, 22), "electionId" : ObjectId("54c9f324adaa0bd054140fda") } }
值得關注的地方是平均的文檔大小是240字節,也就是0.24k,很是很是的小。總的存儲空間佔用有2.9G之多。緣由3顯然是由於重複存儲維度字段的值形成的。數據結構
這種表結構大概以下:
{ "_id" : "1428710400.wxa8fdfb5a9e7f64f6.10000.-502", "iResult" : "-502", "vCmdid" : "10000", "values" : [ {"t":1, "1": x, "2": y}, {"t":1, "1": x, "2": y}, .... {"t":1449, "1": x, "2": y}, ], "date" : 1.42871e+09, "timestamp" : 1.42878e+09, "vAppid" : "appid1" }
其中date是時間戳truncate到了天。而後dProcessTime裏的每一個key都是對應在一天內的第N個分鐘。
這種按天打包的結構很是適合一次查詢就要查一天的數據的需求。可是它的壓縮效果很大程度上取決於維度組合的cardinality。若是維度裏面有一些值cardinality很高,那麼壓縮以後仍然會有很是多的文檔數。這對於須要查詢的時候再去聚合的數據就很是不利了。可是對於查詢的維度和存儲的維度一一對應的狀況,那麼拉取一天的數據就只要讀取一個文檔,那麼就會很是快了。
存儲的效果並非很好,由於文檔數量仍然不少
{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse_measurement", "count" : 285653, "size" : 4.69834e+08, "avgObjSize" : 1644, "storageSize" : 7.82356e+08, "numExtents" : 17, "nindexes" : 1, "lastExtentSize" : 2.07794e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 1.76029e+07, "indexSizes" : { "_id_" : 1.76029e+07 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429230049, 5), "electionId" : ObjectId("54c9f324adaa0bd054140fda") } }
總的文檔數是285653,平均大小隻有1.6k,尺寸有470M之多。
轉換代碼以下:
pythonfor offset, batch in read_test_data_in_batches(): updates = collections.defaultdict(list) for doc in batch: seconds_in_day = doc['timestamp'] % 86400 date = doc['timestamp'] - seconds_in_day minute_index = seconds_in_day / 60 _id = '%s.%s.%s.%s' % (date, doc['vAppid'], doc['vCmdid'], doc['iResult']) if len(_id) > 256: continue updates[_id].append({'t':doc['timestamp'],'0':doc['dProcessTime'],'1':doc['totalCount']}) bulk = measurement_coll.initialize_unordered_bulk_op() new_ids = [] for doc in batch: seconds_in_day = doc['timestamp'] % 86400 date = doc['timestamp'] - seconds_in_day _id = '%s.%s.%s.%s' % (date, doc['vAppid'], doc['vCmdid'], doc['iResult']) if len(_id) > 256: continue if _id not in known_ids: known_ids.add(_id) new_ids.append(_id) bulk.find({'_id': _id}).upsert().update({'$set': { 'vAppid': doc['vAppid'], 'vCmdid': doc['vCmdid'], 'iResult': doc['iResult'], 'date': date, 'timestamp': doc['timestamp'] }}) for _id, set_fields in updates.iteritems(): bulk.find({'_id': _id}).update({"$push": {'values': {'$each': set_fields}}}) LOGGER.info(offset) try: res = bulk.execute() except pymongo.errors.BulkWriteError as bwe: import pprint pprint.pprint(bwe.details)
在這篇mongodb的官方博客(http://blog.mongodb.org/post/65517193370/schema-design-for-time-series...)裏提到了一種更精簡的存儲表結構:
{ timestamp_minute: ISODate(「2013-10-10T23:06:00.000Z」), num_samples: 58, total_samples: 108000000, type: 「memory_used」, values: { 0: 999999, … 37: 1000000, 38: 1500000, … 59: 1800000 } }
這種格式對於無維度的時間序列是合適的。可是若是分維度存儲,那麼必然就牽涉到一個讀取的時候按不一樣維度聚合的問題。而這種格式設計基本上沒法在服務器端聚合的。而把數據拉出來在應用層做聚合就牽涉到大量IO,更加不可能快了。問題是,若是數據庫只是可以把一天的數據存進去,而後能夠原樣取出來是否是意義不大呢?要求時間序列數據庫做一些聚合是很是合理的要求吧。好比opentsdb就支持tag的功能,實際上就是分維度。
這種表結構大概以下:
{ "_id" : ObjectId("5531b34c9469047155b3423b"), "count" : 41, "max_timestamp" : 1.42879e+09, "vAppid" : "appid1", "min_timestamp" : 1.42879e+09, "sum_totalCount" : 42, "sum_dProcessTime" : 468, "_" : [ { "c" : 4, "d" : 1.42879e+09, "1" : 69, "0" : 4, "_" : [ { "_" : [ { "d" : "16", "v" : [ { "1" : 41, "0" : 1 } ] }, { "d" : "10000", "v" : [ { "1" : 1, "0" : 1 }, { "1" : 1, "0" : 1 } ] }, { "d" : "18", "v" : [ { "1" : 26, "0" : 1 } ] } ], "d" : "0" } ] }, // .. many more rows ] }
這個表結構第一個利用的是時間序列的連續性。因此一段時間的數據打包存放在了一個mongodb的文檔裏。而後在文檔上留下max_timestamp,min_timestamp兩個字段用於快速過濾掉無須讀取的文檔。
第二個利用的特性是某些維度常常用於下鑽查詢,好比vAppid。若是appid1和appid2的數據放在同一個文檔裏,那麼在查詢appid1的時候,appid2的數據也會被讀取到,從而拖慢了查詢效率。這個clustering_fields的選擇能夠是空,也就是一段時間內的全部維度的數據都打包到了一塊兒,反正查詢的時候也沒有特別突出的維度須要優化。
這兩個設計基本上是模仿 mysql 的 clustering index,讓索引值相同的數據彼此靠近的存放在同一個物理位置。由於mongodb沒有clustering index的支持,可是其同一個文檔內的數據是確定物理存放在一塊兒的。因此利用這個特性模仿了相似 clustering index的效果。同時由於按時間段打包了,文檔的數據會很是的少,使用b tree索引能夠很快的定位到所需的文檔(特別是選擇好了經常使用的下鑽維度的狀況)。若是文檔數量多,b tree索引以後仍然會對應大量的文檔id,用id去doc heap裏查找對應的doc也是很是耗時的。對於 postgresql 之類的數據庫,通常能夠用按時間partition加上暴力的全partition掃描來避免這種b tree索引反而更慢了的尷尬,可是mongodb並無partition的支持,除非咱們在應用層做一天一個collection的分表操做。
第一個優化解決的是按時間,按維度索引的效率問題。接下來要解決的問題是存儲過大的問題。咱們前面看到區區800萬行就用掉快3個G的磁盤。主要的磁盤是浪費在重複的維度字段的值的存儲上了。次要的緣由是維度名稱自己也要佔用存儲空間。咱們這裏採用的是map嵌套的方式。對於某個維度,一樣的值的記錄會存在一個map的entry下。這樣這個維度的這個值就不用反覆重複了。爲了最大話這種優化的效果,維度字段應該按照cardinality排序,也就是惟一值數量少的放在外層,惟一值數量多的嵌套在最內層。上面的 _.d
對應的 1.42879e+09 就是第一個維度(timestamp)的值。_._.d
對應的0就是第二個維度(iResult)的值。_._._.d
對應的10000對應的就是第三個維度(vCmdid)的值。嵌套的維度字段排序是 timestmap => iResult => vCmdid
。注意到除了d字段,還有c字段表明的是全部內嵌的記錄的總count,0表明的是第一個值列的sum,1表明的是第二個值列的sum。注意到維度名並無被存儲到文檔裏,維度的信息是隱含在嵌套的層次裏的。查詢的時候須要根據額外存儲的元數據知道不一樣的嵌套層次對應的是什麼維度。一樣值列的名稱也是沒有存儲的,葉子節點的v就是最後的原始值列。
前面已經看到了,文檔內還存儲了一些統計信息。好比timestamp下存儲了這個timestamp的count和全部值列的sum。這些統計值有助減小查詢時候的計算量。同時嵌套存儲還有助於在按條件過濾的狀況下砍掉不須要遞歸查詢的子文檔數量。
分vAppid存放的結果以下:
{ "sharded" : true, "systemFlags" : 1, "userFlags" : 1, "ns" : "wentao_test.sparse_precomputed", "count" : 1278, "numExtents" : 15, "size" : 3.30918e+08, "storageSize" : 4.11038e+08, "totalIndexSize" : 130816, "indexSizes" : { "_id_" : 65408, "vAppid_hashed" : 65408 }, "avgObjSize" : 258934.0594679186178837, "nindexes" : 2, "nchunks" : 6, "ok" : 1.0000000000000000 }
文檔數只有1278個了,平均的尺寸是258k。總存儲佔用是411Mb。若是不按vAppid分,壓縮效果會更好:
json{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse_precomputed_no_appid", "count" : 39, "size" : 2.68435e+08, "avgObjSize" : 6.88294e+06, "storageSize" : 2.75997e+08, "numExtents" : 3, "nindexes" : 1, "lastExtentSize" : 1.58548e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 8176, "indexSizes" : { "_id_" : 8176 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429319735, 3), "electionId" : ObjectId("54c9f324adaa0bd054140fda") } }
文檔個數39個,平均文檔大小6.9M,總存儲佔用275M。相比最初的3G磁盤佔用,壓縮效果很是明顯。
若是容許丟棄掉原始的值,對於一個維度組合一個週期只保留一個聚合記錄(這個實際上是大部分的需求)。那麼最後一層維度內就不須要內嵌v這個數組了,尺寸能夠進一步下降。固然這種壓縮是有損的,因此並非公平的比較。由於一天一個文檔的方式通常都會有一樣的限制,因此在這裏能夠用於和另一種表結構進行對比。
{ "sharded" : false, "primary" : "shard2_RS", "ns" : "wentao_test.sparse_precomputed_no_appid_no_val", "count" : 5, "size" : 5.45259e+07, "avgObjSize" : 1.09052e+07, "storageSize" : 2.01335e+08, "numExtents" : 2, "nindexes" : 1, "lastExtentSize" : 2.01327e+08, "paddingFactor" : 1.0000000000000000, "systemFlags" : 1, "userFlags" : 1, "totalIndexSize" : 8176, "indexSizes" : { "_id_" : 8176 }, "ok" : 1.0000000000000000, "$gleStats" : { "lastOpTime" : Timestamp(1429198926, 2), "electionId" : ObjectId("54c9f324adaa0bd054140fda") } }
壓縮的結果是文檔只有5個,平均大小是10M,總磁盤佔用是55M左右。
轉換的代碼以下:
pythonjumbo_docs = {} # doc with same cluster field will be packed in one document so continous on physical layout # timestamp is always the first clustering field, as the nature of time series data is clustering_fields = [] # sorted from low cardinality to high to save sapce dimension_fields = ['timestamp', 'vAppid', 'iResult', 'vCmdid'] # precompute sum/count at those dimension levels precomputed_fields = set(['timestamp']) value_fields = ['totalCount', 'dProcessTime'] store_raw_values = True for offset, batch in read_test_data_in_batches(): print(offset) for record in batch: clustering_key = tuple(record[f] for f in clustering_fields) jumbo_doc = jumbo_docs.get(clustering_key) if not jumbo_doc: jumbo_doc = { 'min_timestamp': record['timestamp'], 'max_timestamp': record['timestamp'], '_fast_lookup': {} } jumbo_doc['_all_levels'] = [jumbo_doc] for f in clustering_fields: jumbo_doc[f] = record[f] jumbo_docs[clustering_key] = jumbo_doc jumbo_doc['min_timestamp'] = min(jumbo_doc['min_timestamp'], record['timestamp']) jumbo_doc['max_timestamp'] = max(jumbo_doc['max_timestamp'], record['timestamp']) jumbo_doc['count'] = jumbo_doc.get('count', 0) + 1 for value_field in value_fields: jumbo_doc['sum_%s' % value_field] = jumbo_doc.get('sum_%s' % value_field, 0) + record[value_field] current_level = jumbo_doc for field in dimension_fields: next_levels = current_level.get('_') if next_levels is None: next_levels = [] current_level['_'] = next_levels dimension = record[field] next_level = current_level['_fast_lookup'].get(dimension) if not next_level: next_level = { 'd': dimension, '_fast_lookup': {} } if field in precomputed_fields: for i in range(len(value_fields)): next_level['%s' % i] = 0 jumbo_doc['_all_levels'].append(next_level) next_levels.append(next_level) current_level['_fast_lookup'][dimension] = next_level if field in precomputed_fields: next_level['c'] = next_level.get('c', 0) + 1 for i, value_field in enumerate(value_fields): next_level['%s' % i] += record[value_field] current_level = next_level if store_raw_values: # current_level is the last dimension now current_level['v'] = current_level.get('v', []) current_level['v'].append({str(i): record[f] for i, f in enumerate(value_fields)}) inserted_clustering_keys = [] for clustering_key, jumbo_doc in jumbo_docs.iteritems(): if len(jumbo_doc['_all_levels']) >= 10000 * 5: for level in jumbo_doc['_all_levels']: del level['_fast_lookup'] del jumbo_doc['_all_levels'] print('insert jumbo doc') sparse_precomputed_coll.insert(jumbo_doc) inserted_clustering_keys.append(clustering_key) for clustering_key in inserted_clustering_keys: del jumbo_docs[clustering_key] for jumbo_doc in jumbo_docs.values(): for level in jumbo_doc['_all_levels']: del level['_fast_lookup'] del jumbo_doc['_all_levels'] print('insert jumbo doc') sparse_precomputed_coll.insert(jumbo_doc)
db.sparse.aggregate([ {$group: {_id: '$timestamp', 'count': {$sum: 1}}} ])
得出的結果是這個格式的,每一個週期一個count值。
[ { "_id" : 1.42879e+09, "count" : 2266.0000000000000000 }, ... { "_id" : 1.42878e+09, "count" : 6935.0000000000000000 } ]
結果數據爲272行,耗時大概是9.6秒。注意這個絕對值並無意義,由於不一樣的硬件配置,不一樣的緩存設置,不一樣的sharding都會對這個結果產生影響。咱們這裏關注的是在一樣配置的狀況下,不一樣表結構對於查詢時間的相對關係。
打包存儲的數據,做一樣的查詢,須要些更復雜的聚合邏輯:
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$group: {_id: '$_.d', count: {$sum: '$_.c'}}} ])
這個查詢耗時大概是7.1秒。能夠看到打包存儲以後數據量變少了,查詢並無變得特別快。上面的查詢還使用了預先計算的字段。若是統計原始的數據,查詢更復雜
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$unwind: '$_._'}, // timestamp.vAppid {$unwind: '$_._._'}, // timestamp.vAppid.iResult {$unwind: '$_._._._'}, // timestamp.vAppid.iResult.vCmdid {$group: {_id: '$_.d', count: {$sum: {$size: '$_._._._.v'}}}} // size of the values array ])
這個查詢時間大概是9.1秒。
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$unwind: '$_._'}, // timestamp.vAppid {$unwind: '$_._._'}, // timestamp.vAppid.iResult {$unwind: '$_._._._'}, // timestamp.vAppid.iResult.vCmdid {$group: {_id: '$_.d', count: {$sum: '$_._._._.c'}}} // size of the values array ])
少作一個$size的操做要稍微快一些。大概是9秒。
結論是880萬數據聚合,用這個格式並無變得快不少,基本是一個數量級的。
db.sparse_measurement.aggregate([ {$unwind: '$values'}, {$group: {_id: '$values.t', count:{$sum:'$values.1'}}} ])
這個查詢要10.2秒,比原始格式還要慢。說明一天一個doc的存放方式並不適合聚合查詢。
db.sparse.aggregate([ {$match: {vAppid: {$ne: ''}}}, {$group: {_id: { timestamp: '$timestamp', vCmdid: '$vCmdid' }, 'totalCount': {$sum: '$totalCount'}}} ])
結果數據爲13115行,這個查詢須要21.4秒.
db.sparse_precomputed_no_appid.aggregate([ {$unwind: '$_'}, // timestamp {$unwind: '$_._'}, // timestamp.vAppid {$match: {'_._.d': {$ne: ''}}}, // vAppid != '' {$unwind: '$_._._'}, // timestamp.vAppid.iResult {$unwind: '$_._._._'}, // timestamp.vAppid.iResult.vCmdid {$unwind: '$_._._._.v'}, // timestamp.vAppid.iResult.vCmdid.values {$group: {_id: '$_.d', count: {$sum: '$_._._._.v.0'}}} // size of the values array ])
這個查詢須要18.4秒。稍微比一個數據點一行的原始表結構要快一些。
測試作到這裏,基本有一個結論了: