InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。數據庫
InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。緩存
接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。服務器
上一章介紹了數據從客戶端寫入到服務器端的內存中的整個過程。詳情見: http://www.javashuo.com/article/p-dxwpkohf-vk.html微信
這一章記錄一下數據庫中數據管理單元Chunk的生命週期。異步
在開篇,先介紹一下一個Chunk
擁有的生命週期:async
//這裏須要注意,這些變體裏的Chunk結構都是不相同的 //也就是有內存數據拷貝的工做 pub enum ChunkState { //內部移動數據時候用的 Invalid, //能夠寫入 Open(MBChunk), //還能繼續寫入,但很快會被關閉 Closing(MBChunk), //已經不能寫入了,準備移動到readbuffer Moving(Arc<MBChunk>), //已經被移動到了read buffer Moved(Arc<ReadBufferChunk>), //準備寫入持久化存儲 WritingToObjectStore(Arc<ReadBufferChunk>), //寫入持久化存儲完成 WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>), }
在第五章中有提到,在Create Database以後,會啓動一個後臺線程。性能
該後臺線程完成了部分對Chunk
的管理功能,經過理解這個後臺線程,可以基本理解Chunk
的全部生命週期。fetch
//後臺線程的方法入口,在建立完成數據庫後,就會調用到這個方法 pub async fn background_worker( self: &Arc<Self>, shutdown: tokio_util::sync::CancellationToken, ) { //建立一個定時器,週期性的執行 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self)); //沒有收到中止服務器時候的信號就一直執行,1秒一次 while !shutdown.is_cancelled() { //記錄執行的次數,每次加1,Ordering::Relaxed表明的單線程裏的原子操做 self.worker_iterations.fetch_add(1, Ordering::Relaxed); //進入生命週期的管理 lifecycle_manager.check_for_work(); //收到不一樣信號以後的處理方法 tokio::select! { _ = interval.tick() => {}, _ = shutdown.cancelled() => break } } info!("finished background worker"); }
前方高能,請注意:ui
fn check_for_work(&mut self, now: DateTime<Utc>) { //獲取建立數據庫的時候,對於Chunk的相關配置 let rules = self.rules(); //根據配置的排序規則,獲取出內存裏全部的chunk let chunks = self.chunks(&rules.sort_order); let mut buffer_size = 0; //判斷是否是有其餘的任務正在執行,move我理解針對於read buffer,write對於持久化 let mut move_active = self.is_move_active(); let mut write_active = self.is_write_active(); //遍歷全部塊,檢查哪些塊能夠被持久化 for chunk in &chunks { //獲取當前chunk的鎖 let chunk_guard = chunk.upgradable_read(); //獲取chunk佔用的內存大小 buffer_size += Self::chunk_size(&*chunk_guard); //沒有移動任務而且Chunk裏最後的寫入時間比較老 let would_move = !move_active && can_move(&rules, &*chunk_guard, now); //沒有寫出任務,而且開啓了持久化 let would_write = !write_active && rules.persist; //判斷chunk的生命週期 match chunk_guard.state() { //屬於open狀態,而且是須要移動的(上面的邏輯裏有展現什麼是須要移動的) //這裏我理解就是至關於實時寫入時候的一個補充方案 //試想,若是一個chunk一直不寫入數據,可能有一年了,查詢都再也不用這些數據了,內存卻被一直佔用 ChunkState::Open(_) if would_move => { let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard); //切換狀態到closing chunk_guard.set_closing().expect("cannot close open chunk"); let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); move_active = true; //移動到read_buffer,變爲不可寫入狀態(啓動了一個異步的線程,後面看) self.move_to_read_buffer(partition_key, chunk_id); } //這裏有幾種狀況,一樣會在別處觸發爲closing //例如:chunk大小超過了設置的可變內存大小的時候 ChunkState::Closing(_) if would_move => { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); move_active = true; //移動到read_buffer self.move_to_read_buffer(partition_key, chunk_id); } //已經被挪動到readbuffer中的 ChunkState::Moved(_) if would_write => { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); std::mem::drop(chunk_guard); write_active = true; //寫入到對象存儲 self.write_to_object_store(partition_key, chunk_id); } _ => {} } } //這裏是主要檢查內存限制的邏輯,當全部chunk的大小超過限制的時候就要清理Chunk if let Some(soft_limit) = rules.buffer_size_soft { let mut chunks = chunks.iter(); while buffer_size > soft_limit.get() { match chunks.next() { Some(chunk) => { //獲取讀鎖 let chunk_guard = chunk.read(); //若是配置了能夠清理未持久化數據,那麼處在read_buffer裏的數據也會被清理 //必定會清理已經被持久化到對象存儲上的數據 if (rules.drop_non_persisted && matches!(chunk_guard.state(), ChunkState::Moved(_))) || matches!(chunk_guard.state(), ChunkState::WrittenToObjectStore(_, _)) { let partition_key = chunk_guard.key().to_string(); let chunk_id = chunk_guard.id(); buffer_size = buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard)); std::mem::drop(chunk_guard); //真真正正的刪除邏輯後面看 self.drop_chunk(partition_key, chunk_id) } } //沒有什麼能夠釋放的了 None => { warn!(db_name=self.db_name(), soft_limit, buffer_size, "soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules"); break; } } } } }
這裏基本看清楚了Chunk
的週期:.net
- 在寫入時候,若是沒有
Chunk
就會open
一個,並處在open
狀態。 - 若是寫入超過了一些限制,就會被標記爲
closing
;若是數據時間超過了配置的時間,也會被標記爲closing
。標記爲closing
的會添加一個後臺進程,準備將Chunk
移動到read_buffer
中。 - 後臺任務啓動後,會標記爲
moving
狀態,此時禁止Chunk
再寫入任何數據。 - 一旦移動完成,會被標記爲
moved
。 - 程序會對
moved
狀態下的Chunk
開始進行持久化。 - 掃描任務會不斷判斷內存使用是否超過了限制,若是超過限制,會清理已經持久化的
Chunk
。若是配置了drop_non_persisted
,會把read_buffer
中未持久化的也刪除掉。
而後繼續看程序是怎樣將一個chunk
移動到read_buffer
的,由於篇幅的影響,將會在下一篇介紹數據是怎樣真正寫入到持久化存儲當中的。
pub async fn load_chunk_to_read_buffer( &self, partition_key: &str, chunk_id: u32, ) -> Result<Arc<DbChunk>> { //根據partition_key及chunk_id獲取內存中存儲的Chunk let chunk = { let partition = self .catalog .valid_partition(partition_key) .context(LoadingChunk { partition_key, chunk_id, })?; let partition = partition.read(); partition.chunk(chunk_id).context(LoadingChunk { partition_key, chunk_id, })? }; //設置當前的Chunk爲Moving狀態 let mb_chunk = { let mut chunk = chunk.write(); chunk.set_moving().context(LoadingChunk { partition_key, chunk_id, })? }; info!(%partition_key, %chunk_id, "chunk marked MOVING, loading tables into read buffer"); let mut batches = Vec::new(); //這裏是拿到Chunk中每一個Cloumn的統計信息,分別是min,max,count let table_stats = mb_chunk.table_summaries(); //重新建立一個ReadBufferChunk,後面準備把全部數據都拷貝到這裏 //還須要告訴內存管理這裏新申請了多少空間 let rb_chunk = ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer); for stats in table_stats { //把內存中的數據,所有從新拷貝一次,轉換爲arrow格式 mb_chunk .table_to_arrow(&mut batches, &stats.name, Selection::All) //這裏應該是尚未寫完,若是出現錯誤,這個Chunk該怎麼處理? .expect("Loading chunk to mutable buffer"); //循環拷貝 for batch in batches.drain(..) { rb_chunk.upsert_table(&stats.name, batch) } } let mut chunk = chunk.write(); //更新寫入緩存裏的Chunk爲Moved狀態,同時Chunk內容修改成了ReadBuffer的Chunk //對於Chunk的結構後面看 chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk { partition_key, chunk_id, })?; //工做所有都完成了,調用作快照的方法,方法裏什麼都沒作,返回新Chunk的一個Arc指針 Ok(DbChunk::snapshot(&chunk)) }
到這裏基本清楚了整個Chunk
的工做方式,由於Chunk
這個名字被代碼中重複使用到了,因此特地在文章末尾說一下都有什麼Chunk
。
//主要是存儲一個數據塊的描述信息,名字、最後寫入時間等 Server::db::catalog::chunk //數據從客戶端直接寫入的內存塊 mutable_buffer::chunk //在moving時候拷貝的新數據塊,arrow結構 read_buffer::chunk //parquet對應的chunk parquet_file::chunk //query模塊下對PartitionChunk從新命名了一下 //對於相同的partition key的數據抽象的行爲 query -> type Chunk: PartitionChunk; //實現PartitionChunk定義的方法,對不一樣位置下的chunk的操做 //如ParquetFile、MutableBuffer等 server::db::chunk
好了就到這裏,但願你也學到了不少
祝玩兒的開心
歡迎關注微信公衆號:
或添加微信好友: liutaohua001