InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。node
InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。數據庫
接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。json
上一章說到數據寫入時的分區機制及分區現有的功能。詳情見: http://www.javashuo.com/article/p-vxbttdfr-vk.html緩存
這一章記錄一下數據是怎樣進行存儲的。微信
上一章沒有細節的介紹數據從Line protocol
被解析成了什麼樣子,在開篇先介紹一下數據被封裝後的展現。數據結構
轉換過程的代碼能夠參見
internal_types/src/entry.rs : 157行
中的build_table_write_batch
方法;異步內部數據結構能夠查看:
generated_types/protos/influxdata/write/v1/entry.fbs
。async
數據是被層層加碼組裝出來的:性能
LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry
ui
sharded_entries:[{ shard_id: None, entry: { fb: { operation_type: write, operation: { partition_writes:[{ key:"2019-05-02 16:00:00", table_batches:[ { name:"myMeasurement", columns:[ { name:"fieldKey", logical_column_type: Field, values_type: StringValues, values: { values:["123"] }, null_mask: None }, { name:"tag1", logical_column_type: Tag, values_type: StringValues, values: { values:["value1"]) }, null_mask: None }, { name:"tag2", logical_column_type: Tag, values_type: StringValues, values: { values:["value2"]) }, null_mask: None }, { name:"time", logical_column_type: Time, values_type: I64Values, values: { values:[1556813561098000000]) }, null_mask: None }] }] }] } } } }]
數據在內存中就會造成如上格式保存,但要注意,內存中使用的 flatbuffer 格式保存,上面只是爲了展現內容。
繼續上節裏的內容,結構被拼湊完成以後,就會調用write_sharded_entry
方法去進行實際寫入工做:
futures_util::future::try_join_all( sharded_entries .into_iter() //對每一個數據進行寫入到shard .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?;
而後看是怎樣寫入到shard的,由於shard的寫入尚未完成,因此只能關注單機的寫入了。具體看代碼:
async fn write_sharded_entry( &self, db_name: &str, db: &Db, shards: Arc<HashMap<u32, NodeGroup>>, sharded_entry: ShardedEntry, ) -> Result<()> { //判斷shard的id是否爲null,若是是null就寫入本地 //不然就寫入到具體的shard去 match sharded_entry.shard_id { Some(shard_id) => { let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?; //尚未真正的實現,能夠看下面的方法 self.write_entry_downstream(db_name, node_group, &sharded_entry.entry) .await? } None => self.write_entry_local(&db, sharded_entry.entry).await?, } Ok(()) } //能夠看到尚未實現遠程的寫入 async fn write_entry_downstream( &self, db_name: &str, node_group: &[WriterId], _entry: &Entry, ) -> Result<()> { todo!( "perform API call of sharded entry {} to one of the nodes {:?}", db_name, node_group ) } //數據對本地寫入 pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> { //繼續往下跟蹤 db.store_entry(entry).map_err(|e| match e { db::Error::HardLimitReached {} => Error::HardLimitReached {}, _ => Error::UnknownDatabaseError { source: Box::new(e), }, })?; Ok(()) } //方法彷佛什麼都沒作,只是增補了clock_value和write_id //註釋上解釋到logical clock是一個用來在數據庫內部把entry變爲有序的字段 pub fn store_entry(&self, entry: Entry) -> Result<()> { //生成一個新的結構SequencedEntry並增補字段 let sequenced_entry = SequencedEntry::new_from_entry_bytes( ClockValue::new(self.next_sequence()), self.server_id.get(), entry.data(), ).context(SequencedEntryError)?; //關於讀緩存相關的配置和實現,先不用管 if self.rules.read().wal_buffer_config.is_some() { todo!("route to the Write Buffer. TODO: carols10cents #1157") } //繼續調用其餘方法 self.store_sequenced_entry(sequenced_entry) }
上面的全部方法完成以後,基本的插入數據格式就準備完成了,接下來就是寫入內存存儲:
pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> { //讀取出數據庫對於寫入相關的配置信息 //包括是否可寫、是否超過內存限制等等驗證 let rules = self.rules.read(); let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold; if rules.lifecycle_rules.immutable { return DatabaseNotWriteable {}.fail(); } if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard { if self.memory_registries.bytes() > hard_limit.get() { return HardLimitReached {}.fail(); } } //rust語言中的釋放變量 std::mem::drop(rules); //由於是批量寫入,因此須要循環 //partition_writes的數據格式能夠參見上面的json數據 if let Some(partitioned_writes) = sequenced_entry.partition_writes() { for write in partitioned_writes { let partition_key = write.key(); //根據以前生成的partition_key來獲得或者建立一個partition描述 let partition = self.catalog.get_or_create_partition(partition_key); //這裏是拿到一個寫鎖 let mut partition = partition.write(); //更新這個partition最後的插入時間 //記錄這個的目的,代碼上並沒寫明白是作什麼用的 partition.update_last_write_at(); //找到一個打開的chunk //不知道爲何每次都要在全部chunk裏搜索一次 //難道是同時可能有不少個chunk均可以寫入? let chunk = partition.open_chunk().unwrap_or_else(|| { //不然就建立一個新的chunk出來 partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref()) }); //獲取一個寫鎖 let mut chunk = chunk.write(); //更新當前chunk的第一條、最後一條寫入記錄 chunk.record_write(); //獲得chunk的內存區域,稱爲mutable_buffer let chunk_id = chunk.id(); let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk"); //真正的寫入到內存中 mb_chunk .write_table_batches( sequenced_entry.clock_value(), sequenced_entry.writer_id(), &write.table_batches(), ) .context(WriteEntry { partition_key, chunk_id, })?; //若是當前chunk寫入數據的大小超過了設置的限制,就關閉 //關閉的意思就是把狀態製爲Closing,並更新關閉時間 let size = mb_chunk.size(); if let Some(threshold) = mutable_size_threshold { if size > threshold.get() { chunk.set_closing().expect("cannot close open chunk") } } } } Ok(()) }
再深刻的就不繼續跟蹤了,可是思路仍是比較清晰了。
1.分區相關
client --> grpc --> 進行分區shard --> 分區partition
2.寫入相關
- 結構封裝
LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry
- 內存寫入空間
catalog -> partition -> table -> column
-
達到指定大小後標記爲關閉
-
異步 - 後臺線程進行內存整理
到這裏基本就完成了全部的寫入,並返回給客戶端成功。
關於後臺線程的內存整理再下一篇中繼續介紹。
祝玩兒的開心
歡迎關注微信公衆號:
或添加微信好友: liutaohua001