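InfluxDB is an open-source time-series database developed by InfluxData. It focuses on high-performance reads and writes, efficient storage, and real-time analysis of massive time-series data, and it has long held first place among time-series databases in the DB-Engines Ranking.
InfluxDB can fairly be called a leader in its field. Even so, on 2020/12/10 InfluxDB CTO Paul published a blog post titled "Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow", introducing a new project, InfluxDB IOx, the next-generation time-series engine for InfluxDB.
In this series I walk through the InfluxDB IOx source code. Comments and corrections are welcome; contact details are at the end of the article.
The previous chapter described how a Chunk is managed and what happens at each stage. For details, see: https://my.oschina.net/u/3374539/blog/5029926
This chapter records how a Chunk is persisted.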
    ChunkState::Moved(_) if would_write => {
        let partition_key = chunk_guard.key().to_string();
        let table_name = chunk_guard.table_name().to_string();
        let chunk_id = chunk_guard.id();
        std::mem::drop(chunk_guard);
        write_active = true;
        // A Chunk in the Moved state is persisted by calling write_to_object_store
        self.write_to_object_store(partition_key, table_name, chunk_id);
    }

    // write_to_object_store in turn calls write_chunk_to_object_store_in_background to do the persisting
    pub fn write_chunk_to_object_store_in_background(
        self: &Arc<Self>,
        partition_key: String,
        table_name: String,
        chunk_id: u32,
    ) -> TaskTracker<Job> {
        // Get the database name
        let name = self.rules.read().name.clone();
        // Register a new background job with the tracker, which records which jobs the db is running and their status
        let (tracker, registration) = self.jobs.register(Job::WriteChunk {
            db_name: name.to_string(),
            partition_key: partition_key.clone(),
            table_name: table_name.clone(),
            chunk_id,
        });
        let captured = Arc::clone(&self);
        // Asynchronous write
        let task = async move {
            let result = captured
                // The method that actually performs the write
                .write_chunk_to_object_store(&partition_key, &table_name, chunk_id)
                .await;
            if let Err(e) = result {
                info!(?e, %name, %partition_key, %chunk_id, "background task error loading object store chunk");
                return Err(e);
            }
            Ok(())
        };
        tokio::spawn(task.track(registration));
        tracker
    }
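The register-then-spawn pattern in the excerpt above can be tried out in isolation. What follows is only a rough sketch under stated assumptions: it swaps tokio for `std::thread` so that it runs with the standard library alone, and `JobRegistry`, `Tracker`, `JobStatus`, and `spawn_tracked` are hypothetical names made up for this illustration, not IOx APIs.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

#[derive(Debug, Clone, Copy, PartialEq)]
enum JobStatus {
    Running,
    Succeeded,
    Failed,
}

/// Hypothetical handle, loosely playing the role of `TaskTracker<Job>`.
#[derive(Clone)]
struct Tracker {
    description: String,
    status: Arc<Mutex<JobStatus>>,
}

impl Tracker {
    fn status(&self) -> JobStatus {
        *self.status.lock().unwrap()
    }
}

/// Hypothetical registry that remembers every job it has handed out.
#[derive(Default)]
struct JobRegistry {
    jobs: Mutex<Vec<Tracker>>,
}

impl JobRegistry {
    /// Records a new job as Running and returns a handle to it.
    fn register(&self, description: String) -> Tracker {
        let tracker = Tracker {
            description,
            status: Arc::new(Mutex::new(JobStatus::Running)),
        };
        self.jobs.lock().unwrap().push(tracker.clone());
        tracker
    }

    /// Counts the jobs that have not finished yet.
    fn running(&self) -> usize {
        let jobs = self.jobs.lock().unwrap();
        jobs.iter().filter(|t| t.status() == JobStatus::Running).count()
    }
}

/// Registers the job, runs `work` on a background thread, and records the outcome.
fn spawn_tracked<F>(registry: &JobRegistry, description: String, work: F) -> (Tracker, JoinHandle<()>)
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    let tracker = registry.register(description);
    let for_task = tracker.clone();
    let handle = thread::spawn(move || {
        let outcome = match work() {
            Ok(()) => JobStatus::Succeeded,
            Err(e) => {
                eprintln!("background task error: {}", e);
                JobStatus::Failed
            }
        };
        *for_task.status.lock().unwrap() = outcome;
    });
    (tracker, handle)
}

fn main() {
    let registry = JobRegistry::default();
    let (tracker, handle) = spawn_tracked(&registry, "write chunk 0".to_string(), || Ok(()));
    handle.join().expect("background thread panicked");
    println!(
        "{}: {:?}, still running: {}",
        tracker.description,
        tracker.status(),
        registry.running()
    );
}
```

As in the excerpt, the caller gets the tracker back immediately and the work finishes later; the difference is that the real code tracks a future on the tokio runtime rather than an OS thread.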
The next method is a bit long; please bear with it.
    pub async fn write_chunk_to_object_store(
        &self,
        partition_key: &str,
        table_name: &str,
        chunk_id: u32,
    ) -> Result<Arc<DbChunk>> {
        // Fetch the chunk from the catalog
        let chunk = {
            // Find the partition first
            let partition =
                self.catalog
                    .valid_partition(partition_key)
                    .context(LoadingChunkToParquet {
                        partition_key,
                        table_name,
                        chunk_id,
                    })?;
            let partition = partition.read();
            // Look up the chunk in the partition by table name and chunk_id
            partition
                .chunk(table_name, chunk_id)
                .context(LoadingChunkToParquet {
                    partition_key,
                    table_name,
                    chunk_id,
                })?
        };

        let rb_chunk = {
            // Take the write lock first
            let mut chunk = chunk.write();
            // Change the Chunk's state to WritingToObjectStore
            chunk
                .set_writing_to_object_store()
                .context(LoadingChunkToParquet {
                    partition_key,
                    table_name,
                    chunk_id,
                })?
        };

        // Get the Statistics of every table in the Chunk
        let table_stats = rb_chunk.table_summaries();

        // Create a parquet Chunk; the various Chunk types were mentioned in the previous chapter
        let mut parquet_chunk = Chunk::new(
            partition_key.to_string(),
            chunk_id,
            // Used to account for the memory occupied by parquet
            self.memory_registries.parquet.as_ref(),
        );
        // Create a Storage struct backed by the storage type chosen when the database was started (see chapter 3)
        let storage = Storage::new(
            Arc::clone(&self.store),
            self.server_id,
            self.rules.read().name.to_string(),
        );

        // Iterate over the statistics of every table
        for stats in table_stats {
            // Build an empty query, i.e. select * from table with no where clause
            let predicate = read_buffer::Predicate::default();

            // Filter data from rb_chunk: Selection::All means all columns, predicate means no where condition.
            // In other words, all the data in the single table that `stats` refers to
            let read_results = rb_chunk
                .read_filter(stats.name.as_str(), predicate, Selection::All)
                .context(ReadBufferChunkError {
                    table_name,
                    chunk_id,
                })?;

            // Then fetch the schema; arrow stores it separately, so two calls are needed
            let arrow_schema: ArrowSchemaRef = rb_chunk
                .read_filter_table_schema(stats.name.as_str(), Selection::All)
                .context(ReadBufferChunkSchemaError {
                    table_name,
                    chunk_id,
                })?
                .into();

            // Then fetch the minimum and maximum time in this table.
            // These statistics are computed in readBuffer::Column::from,
            // i.e. when data moves from the mutbuffer to the readbuffer
            let time_range = rb_chunk.table_time_range(stats.name.as_str()).context(
                ReadBufferChunkTimestampError {
                    table_name,
                    chunk_id,
                },
            )?;

            // Create a ReadFilterResultsStream.
            // The official docs describe it as an adapter that turns ReadFilterResults into an async stream
            let stream: SendableRecordBatchStream = Box::pin(
                streams::ReadFilterResultsStream::new(read_results, Arc::clone(&arrow_schema)),
            );

            // Write to persistent storage
            let path = storage
                .write_to_object_store(
                    partition_key.to_string(),
                    chunk_id,
                    stats.name.to_string(),
                    stream,
                )
                .await
                .context(WritingToObjectStore)?;

            // Keep the summary of what was written to parquet in memory
            let schema = Arc::clone(&arrow_schema)
                .try_into()
                .context(SchemaConversion)?;
            let table_time_range = time_range.map(|(start, end)| TimestampRange::new(start, end));
            parquet_chunk.add_table(stats, path, schema, table_time_range);
        }

        // Take the write lock on the `catalog::chunk`, then update its state to WrittenToObjectStore
        let mut chunk = chunk.write();
        let parquet_chunk = Arc::clone(&Arc::new(parquet_chunk));
        chunk
            .set_written_to_object_store(parquet_chunk)
            .context(LoadingChunkToParquet {
                partition_key,
                table_name,
                chunk_id,
            })?;

        // Wrap the `catalog::chunk` as a `ParquetChunk`
        Ok(DbChunk::snapshot(&chunk))
    }
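The part that looks a bit convoluted and is hard to follow here is the chunk.set_written_to_object_store method.
Rust enums have variants, and each variant can carry its own data. Thanks to this feature, the value is a Chunk at every stage, but what it stores changes with the state.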
    pub enum ChunkState {
        // ... omitted
        // This holds the chunk from the mutbuffer
        Moving(Arc<MBChunk>),
        // Here it changes to storing the readbuffer chunk structure
        Moved(Arc<ReadBufferChunk>),
        // Here it starts storing the ParquetChunk structure as well
        WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
    }
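To make the "same Chunk, different payload" idea concrete, here is a minimal, self-contained sketch of such a state machine. It is an illustration under assumptions, not the IOx implementation: `ReadBufferChunk` and `ParquetChunk` are empty stand-ins, the `InvalidState` error type and the `name` helper are hypothetical, and the payload of the `WritingToObjectStore` variant is a guess, since the original enum elides it.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the real buffer types; only their shape matters here.
#[derive(Debug)]
struct ReadBufferChunk {
    rows: usize,
}

#[derive(Debug)]
struct ParquetChunk {
    path: String,
}

#[derive(Debug)]
enum ChunkState {
    Moved(Arc<ReadBufferChunk>),
    // Assumed payload: the original enum elides this variant.
    WritingToObjectStore(Arc<ReadBufferChunk>),
    WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
}

impl ChunkState {
    fn name(&self) -> &'static str {
        match self {
            ChunkState::Moved(_) => "Moved",
            ChunkState::WritingToObjectStore(_) => "WritingToObjectStore",
            ChunkState::WrittenToObjectStore(_, _) => "WrittenToObjectStore",
        }
    }
}

/// Hypothetical error returned when a transition is attempted from the wrong state.
#[derive(Debug, PartialEq)]
struct InvalidState {
    expected: &'static str,
    actual: &'static str,
}

struct Chunk {
    state: ChunkState,
}

impl Chunk {
    /// Moved -> WritingToObjectStore; returns the read buffer data that is about to be written.
    fn set_writing_to_object_store(&mut self) -> Result<Arc<ReadBufferChunk>, InvalidState> {
        let rb = match &self.state {
            ChunkState::Moved(rb) => Arc::clone(rb),
            other => {
                return Err(InvalidState { expected: "Moved", actual: other.name() });
            }
        };
        self.state = ChunkState::WritingToObjectStore(Arc::clone(&rb));
        Ok(rb)
    }

    /// WritingToObjectStore -> WrittenToObjectStore; keeps the read buffer and adds the parquet chunk.
    fn set_written_to_object_store(&mut self, parquet: Arc<ParquetChunk>) -> Result<(), InvalidState> {
        let rb = match &self.state {
            ChunkState::WritingToObjectStore(rb) => Arc::clone(rb),
            other => {
                return Err(InvalidState { expected: "WritingToObjectStore", actual: other.name() });
            }
        };
        self.state = ChunkState::WrittenToObjectStore(rb, parquet);
        Ok(())
    }
}

fn main() {
    let mut chunk = Chunk {
        state: ChunkState::Moved(Arc::new(ReadBufferChunk { rows: 3 })),
    };
    let rb = chunk.set_writing_to_object_store().expect("chunk starts in Moved");
    println!("writing {} rows", rb.rows);
    let parquet = Arc::new(ParquetChunk { path: "demo.parquet".to_string() });
    chunk.set_written_to_object_store(parquet).expect("chunk is being written");
    if let ChunkState::WrittenToObjectStore(rb, pq) = &chunk.state {
        println!("{} rows persisted at {}", rb.rows, pq.path);
    }
}
```

Each setter checks the current variant before replacing it, so an out-of-order call fails with an error instead of silently corrupting the state. That is also why the real method returns a `Result` which the caller wraps with `.context(...)`.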
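We still need to look at the logic of storage.write_to_object_store. It involves converting the in-memory arrow structure into the Parquet structure. That part is not shown in this article; the conversion is done directly with arrow's ArrowWriter.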
    // Jump straight to the put method of ObjectStore to see how the write is organized
    async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
    where
        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    {
        use ObjectStoreIntegration::*;
        // Match on the storage backend configured at startup and dispatch to the real implementation; only the file case is shown here
        match (&self.0, location) {
            // ... omitted
            // File storage
            (File(file), path::Path::File(location)) => file
                .put(location, bytes, length)
                .await
                .context(FileObjectStoreError)?,
            _ => unreachable!(),
        }

        Ok(())
    }

    // The ObjectStoreApi trait is implemented for File; this is the actual implementation used for file storage
    async fn put<S>(&self, location: &Self::Path, bytes: S, length: Option<usize>) -> Result<()>
    where
        S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
    {
        // Read all the data from the earlier ReadFilterResultsStream into content
        let content = bytes
            .map_ok(|b| bytes::BytesMut::from(&b[..]))
            .try_concat()
            .await
            .context(UnableToStreamDataIntoMemory)?;

        // This validates the length and otherwise fails with DataDoesNotMatchLength. It is a macro; no need to dwell on it
        if let Some(length) = length {
            ensure!(
                content.len() == length,
                DataDoesNotMatchLength {
                    actual: content.len(),
                    expected: length,
                }
            );
        }

        // Get the file path: the root path configured at startup plus the data path
        let path = self.path(location);

        // Create the file
        let mut file = match fs::File::create(&path).await {
            Ok(f) => f,
            // If the parent path was not found, create it and try again
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
                let parent = path
                    .parent()
                    .context(UnableToCreateFile { path: &path, err })?;
                fs::create_dir_all(&parent)
                    .await
                    .context(UnableToCreateDir { path: parent })?;

                match fs::File::create(&path).await {
                    Ok(f) => f,
                    Err(err) => return UnableToCreateFile { path, err }.fail(),
                }
            }
            // Otherwise the write fails
            Err(err) => return UnableToCreateFile { path, err }.fail(),
        };

        // Copy all the data into the file
        tokio::io::copy(&mut &content[..], &mut file)
            .await
            .context(UnableToCopyDataToFile)?;

        // Done
        Ok(())
    }
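The "create the file, and only create the parent directories if that fails with NotFound" step can be run on its own. Below is a simplified, synchronous sketch that uses `std::fs` in place of `tokio::fs` and plain `io::Error` in place of the snafu context types; `put_file` and the paths in `main` are hypothetical names chosen for this example.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Writes `content` to `path`, creating missing parent directories on demand.
/// `expected_len` mirrors the optional length check in the excerpt.
fn put_file(path: &Path, content: &[u8], expected_len: Option<usize>) -> io::Result<()> {
    // Reject the write up front if the caller's declared length does not match.
    if let Some(expected) = expected_len {
        if content.len() != expected {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("data length {} does not match expected {}", content.len(), expected),
            ));
        }
    }

    let mut file = match fs::File::create(path) {
        Ok(f) => f,
        // A missing parent directory shows up as NotFound: create it, then retry once.
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            let parent = path.parent().ok_or(err)?;
            fs::create_dir_all(parent)?;
            fs::File::create(path)?
        }
        // Any other error is final.
        Err(err) => return Err(err),
    };

    file.write_all(content)
}

fn main() -> io::Result<()> {
    // Hypothetical location under the system temp directory.
    let root = std::env::temp_dir().join("iox_put_sketch");
    let target = root.join("db").join("data").join("chunk_0.parquet");
    put_file(&target, b"not really parquet", None)?;
    println!("wrote {} bytes to {}", fs::metadata(&target)?.len(), target.display());
    fs::remove_dir_all(&root)
}
```

Trying `File::create` first keeps the common case (directory already exists) to a single system call, at the cost of one retry the first time a new directory is needed.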
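The write path is fairly large, but its outline can still be sorted out:
- Data is first written to the mutBuffer, which is closed once it reaches a certain size
- An asynchronous thread monitors whether the mutBuffer should be closed
- The lifecycle state changes, and the data starts being written to the readBuffer
- After that, the asynchronous write to persistent storage begins
- Memory is checked to decide whether the readbuffer needs to be cleaned up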
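The steps above can be read as a small decision table that the background monitor evaluates for each chunk. The sketch below is purely illustrative: `Stage`, `Action`, `Limits`, `next_action`, and the threshold values are all hypothetical and are not taken from the IOx source.

```rust
/// Hypothetical, simplified view of where a chunk is in its lifecycle.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Stage {
    Open,    // still accepting writes in the mutable buffer
    Closed,  // mutable buffer closed, waiting to be moved
    Moved,   // data lives in the read buffer
    Written, // data is also persisted to the object store
}

#[derive(Debug, PartialEq)]
enum Action {
    KeepWriting,
    Close,
    MoveToReadBuffer,
    WriteToObjectStore,
    DropReadBuffer,
    Nothing,
}

/// Hypothetical thresholds; real values would come from the database rules.
struct Limits {
    max_open_bytes: usize,
    max_memory_bytes: usize,
}

/// Decides the next step for one chunk, following the five bullets above.
fn next_action(stage: Stage, chunk_bytes: usize, memory_in_use: usize, limits: &Limits) -> Action {
    match stage {
        Stage::Open if chunk_bytes >= limits.max_open_bytes => Action::Close,
        Stage::Open => Action::KeepWriting,
        Stage::Closed => Action::MoveToReadBuffer,
        Stage::Moved => Action::WriteToObjectStore,
        Stage::Written if memory_in_use > limits.max_memory_bytes => Action::DropReadBuffer,
        Stage::Written => Action::Nothing,
    }
}

fn main() {
    let limits = Limits { max_open_bytes: 1024, max_memory_bytes: 4096 };
    let cases = [
        (Stage::Open, 10, 100),
        (Stage::Open, 2048, 100),
        (Stage::Closed, 2048, 100),
        (Stage::Moved, 2048, 100),
        (Stage::Written, 2048, 8192),
    ];
    for &(stage, bytes, memory) in cases.iter() {
        println!("{:?} -> {:?}", stage, next_action(stage, bytes, memory, &limits));
    }
}
```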
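That is roughly it. A lot of logic in the source code is still unfinished, such as the WAL. Let's read through the overall flow first and come back to the skipped parts later, which also gives Influx time to write more complete logic.
Have fun.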
Feel free to follow the WeChat official account, or add me as a WeChat contact: liutaohua001