時序數據庫Influx-IOx源碼學習七(Chunk的生命週期)

InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。數據庫

InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。緩存

接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。服務器


上一章介紹了數據從客戶端寫入到服務器端的內存中的整個過程。詳情見: http://www.javashuo.com/article/p-dxwpkohf-vk.html微信

這一章記錄一下數據庫中數據管理單元Chunk的生命週期。異步


在開篇,先介紹一下一個Chunk擁有的生命週期:async

//這裏須要注意,這些變體裏的Chunk結構都是不相同的
//也就是有內存數據拷貝的工做
pub enum ChunkState {
    //內部移動數據時候用的
    Invalid,
    //能夠寫入
    Open(MBChunk),
    //還能繼續寫入,但很快會被關閉
    Closing(MBChunk),
    //已經不能寫入了,準備移動到readbuffer
    Moving(Arc<MBChunk>),
    //已經被移動到了read buffer
    Moved(Arc<ReadBufferChunk>),
    //準備寫入持久化存儲
    WritingToObjectStore(Arc<ReadBufferChunk>),
    //寫入持久化存儲完成
    WrittenToObjectStore(Arc<ReadBufferChunk>, Arc<ParquetChunk>),
}

在第五章中有提到,在Create Database以後,會啓動一個後臺線程。性能

該後臺線程完成了部分對Chunk的管理功能,經過理解這個後臺線程,可以基本理解Chunk的全部生命週期。fetch

//後臺線程的方法入口,在建立完成數據庫後,就會調用到這個方法
    pub async fn background_worker(
        self: &Arc<Self>,
        shutdown: tokio_util::sync::CancellationToken,
    ) {
        //建立一個定時器,週期性的執行
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
        let mut lifecycle_manager = LifecycleManager::new(Arc::clone(&self));
        //沒有收到中止服務器時候的信號就一直執行,1秒一次
        while !shutdown.is_cancelled() {
            //記錄執行的次數,每次加1,Ordering::Relaxed表明的單線程裏的原子操做
            self.worker_iterations.fetch_add(1, Ordering::Relaxed);
            //進入生命週期的管理
            lifecycle_manager.check_for_work();
            //收到不一樣信號以後的處理方法
            tokio::select! {
                _ = interval.tick() => {},
                _ = shutdown.cancelled() => break
            }
        }

        info!("finished background worker");
    }

前方高能,請注意:ui

fn check_for_work(&mut self, now: DateTime<Utc>) {
        //獲取建立數據庫的時候,對於Chunk的相關配置
        let rules = self.rules();
        //根據配置的排序規則,獲取出內存裏全部的chunk
        let chunks = self.chunks(&rules.sort_order);

        let mut buffer_size = 0;

        //判斷是否是有其餘的任務正在執行,move我理解針對於read buffer,write對於持久化
        let mut move_active = self.is_move_active();
        let mut write_active = self.is_write_active();

         //遍歷全部塊,檢查哪些塊能夠被持久化
        for chunk in &chunks {
            //獲取當前chunk的鎖
            let chunk_guard = chunk.upgradable_read();
            //獲取chunk佔用的內存大小
            buffer_size += Self::chunk_size(&*chunk_guard);
            //沒有移動任務而且Chunk裏最後的寫入時間比較老
            let would_move = !move_active && can_move(&rules, &*chunk_guard, now);
            //沒有寫出任務,而且開啓了持久化
            let would_write = !write_active && rules.persist;

            //判斷chunk的生命週期
            match chunk_guard.state() {
                 //屬於open狀態,而且是須要移動的(上面的邏輯裏有展現什麼是須要移動的)
                 //這裏我理解就是至關於實時寫入時候的一個補充方案
                 //試想,若是一個chunk一直不寫入數據,可能有一年了,查詢都再也不用這些數據了,內存卻被一直佔用
                ChunkState::Open(_) if would_move => {
                    let mut chunk_guard = RwLockUpgradableReadGuard::upgrade(chunk_guard);
                    //切換狀態到closing
                    chunk_guard.set_closing().expect("cannot close open chunk");
                    let partition_key = chunk_guard.key().to_string();
                    let chunk_id = chunk_guard.id();
                    std::mem::drop(chunk_guard);
                    move_active = true;
                    //移動到read_buffer,變爲不可寫入狀態(啓動了一個異步的線程,後面看)
                    self.move_to_read_buffer(partition_key, chunk_id);
                }
                //這裏有幾種狀況,一樣會在別處觸發爲closing
                //例如:chunk大小超過了設置的可變內存大小的時候
                ChunkState::Closing(_) if would_move => {
                    let partition_key = chunk_guard.key().to_string();
                    let chunk_id = chunk_guard.id();
                    std::mem::drop(chunk_guard);
                    move_active = true;
                    //移動到read_buffer
                    self.move_to_read_buffer(partition_key, chunk_id);
                }
                //已經被挪動到readbuffer中的
                ChunkState::Moved(_) if would_write => {
                    let partition_key = chunk_guard.key().to_string();
                    let chunk_id = chunk_guard.id();
                    std::mem::drop(chunk_guard);
                    write_active = true;
                    //寫入到對象存儲
                    self.write_to_object_store(partition_key, chunk_id);
                }
                _ => {}
            }
        }
        //這裏是主要檢查內存限制的邏輯,當全部chunk的大小超過限制的時候就要清理Chunk
        if let Some(soft_limit) = rules.buffer_size_soft {
            let mut chunks = chunks.iter();

            while buffer_size > soft_limit.get() {
                match chunks.next() {
                    Some(chunk) => {
                        //獲取讀鎖
                        let chunk_guard = chunk.read();
                        //若是配置了能夠清理未持久化數據,那麼處在read_buffer裏的數據也會被清理
                        //必定會清理已經被持久化到對象存儲上的數據
                        if (rules.drop_non_persisted
                            && matches!(chunk_guard.state(), ChunkState::Moved(_)))
                            || matches!(chunk_guard.state(), ChunkState::WrittenToObjectStore(_, _))
                        {
                            let partition_key = chunk_guard.key().to_string();
                            let chunk_id = chunk_guard.id();
                            buffer_size =
                                buffer_size.saturating_sub(Self::chunk_size(&*chunk_guard));
                            std::mem::drop(chunk_guard);
                            //真真正正的刪除邏輯後面看
                            self.drop_chunk(partition_key, chunk_id)
                        }
                    }
                    //沒有什麼能夠釋放的了
                    None => {
                        warn!(db_name=self.db_name(), soft_limit, buffer_size,
                              "soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules");
                        break;
                    }
                }
            }
        }
    }

這裏基本看清楚了Chunk的週期:.net

  1. 在寫入時候,若是沒有Chunk就會open一個,並處在open狀態。
  2. 若是寫入超過了一些限制,就會被標記爲closing;若是數據時間超過了配置的時間,也會被標記爲closing。標記爲closing的會添加一個後臺進程,準備將Chunk移動到read_buffer中。
  3. 後臺任務啓動後,會標記爲moving狀態,此時禁止Chunk再寫入任何數據。
  4. 一旦移動完成,會被標記爲moved
  5. 程序會對moved狀態下的Chunk開始進行持久化。
  6. 掃描任務會不斷判斷內存使用是否超過了限制,若是超過限制,會清理已經持久化的Chunk。若是配置了drop_non_persisted,會把read_buffer中未持久化的也刪除掉。

而後繼續看程序是怎樣將一個chunk移動到read_buffer的,由於篇幅的影響,將會在下一篇介紹數據是怎樣真正寫入到持久化存儲當中的。

pub async fn load_chunk_to_read_buffer(
        &self,
        partition_key: &str,
        chunk_id: u32,
    ) -> Result<Arc<DbChunk>> {
        //根據partition_key及chunk_id獲取內存中存儲的Chunk
        let chunk = {
            let partition = self
                .catalog
                .valid_partition(partition_key)
                .context(LoadingChunk {
                    partition_key,
                    chunk_id,
                })?;
            let partition = partition.read();

            partition.chunk(chunk_id).context(LoadingChunk {
                partition_key,
                chunk_id,
            })?
        };

        //設置當前的Chunk爲Moving狀態
        let mb_chunk = {
            let mut chunk = chunk.write();
            chunk.set_moving().context(LoadingChunk {
                partition_key,
                chunk_id,
            })?
        };
        info!(%partition_key, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
        let mut batches = Vec::new();
        //這裏是拿到Chunk中每一個Cloumn的統計信息,分別是min,max,count
        let table_stats = mb_chunk.table_summaries();

        //重新建立一個ReadBufferChunk,後面準備把全部數據都拷貝到這裏
        //還須要告訴內存管理這裏新申請了多少空間
        let rb_chunk =
            ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer);

        for stats in table_stats {
            //把內存中的數據,所有從新拷貝一次,轉換爲arrow格式
            mb_chunk
                .table_to_arrow(&mut batches, &stats.name, Selection::All)
                //這裏應該是尚未寫完,若是出現錯誤,這個Chunk該怎麼處理?
                .expect("Loading chunk to mutable buffer");
            //循環拷貝
            for batch in batches.drain(..) {
                rb_chunk.upsert_table(&stats.name, batch)
            }
        }

        let mut chunk = chunk.write();
        //更新寫入緩存裏的Chunk爲Moved狀態,同時Chunk內容修改成了ReadBuffer的Chunk
        //對於Chunk的結構後面看
        chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk {
            partition_key,
            chunk_id,
        })?;

        //工做所有都完成了,調用作快照的方法,方法裏什麼都沒作,返回新Chunk的一個Arc指針
        Ok(DbChunk::snapshot(&chunk))
    }

到這裏基本清楚了整個Chunk的工做方式,由於Chunk這個名字被代碼中重複使用到了,因此特地在文章末尾說一下都有什麼Chunk

//主要是存儲一個數據塊的描述信息,名字、最後寫入時間等
Server::db::catalog::chunk
//數據從客戶端直接寫入的內存塊
mutable_buffer::chunk
//在moving時候拷貝的新數據塊,arrow結構
read_buffer::chunk
//parquet對應的chunk
parquet_file::chunk
//query模塊下對PartitionChunk從新命名了一下
//對於相同的partition key的數據抽象的行爲
query -> type Chunk: PartitionChunk;
//實現PartitionChunk定義的方法,對不一樣位置下的chunk的操做
//如ParquetFile、MutableBuffer等
server::db::chunk

好了就到這裏,但願你也學到了不少

祝玩兒的開心


歡迎關注微信公衆號:

或添加微信好友: liutaohua001

相關文章
相關標籤/搜索