時序數據庫Influx-IOx源碼學習六-2(數據寫入)

InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。node

InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。數據庫

接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。json


上一章說到數據寫入時的分區機制及分區現有的功能。詳情見: http://www.javashuo.com/article/p-vxbttdfr-vk.html緩存

這一章記錄一下數據是怎樣進行存儲的。微信


上一章沒有細節的介紹數據從Line protocol被解析成了什麼樣子,在開篇先介紹一下數據被封裝後的展現。數據結構

轉換過程的代碼能夠參見internal_types/src/entry.rs : 157行中的build_table_write_batch方法;異步

內部數據結構能夠查看: generated_types/protos/influxdata/write/v1/entry.fbsasync

數據是被層層加碼組裝出來的:性能

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntryui

sharded_entries:[{
  shard_id: None,
  entry: {
      fb: {
        operation_type: write,
        operation: {
          partition_writes:[{
            key:"2019-05-02 16:00:00",
            table_batches:[
            { 
              name:"myMeasurement",
              columns:[
              {
                name:"fieldKey",
                logical_column_type: Field,
                values_type: StringValues,
                values: { values:["123"] },
                null_mask: None 
              },
              {
                name:"tag1",
                logical_column_type: Tag,
                values_type: StringValues,
                values: { values:["value1"]) },
                null_mask: None 
              },
              {
                name:"tag2",
                logical_column_type: Tag,
                values_type: StringValues,
                values: { values:["value2"]) },
                null_mask: None
              }, 
              { 
                name:"time",
                logical_column_type: Time,
                values_type: I64Values,
                values: { values:[1556813561098000000]) },
                null_mask: None
              }]
            }]
          }]
      } 
    }
  }
}]

數據在內存中就會造成如上格式保存,但要注意,內存中使用的 flatbuffer 格式保存,上面只是爲了展現內容

繼續上節裏的內容,結構被拼湊完成以後,就會調用write_sharded_entry方法去進行實際寫入工做:

futures_util::future::try_join_all(
            sharded_entries
                .into_iter()
                 //對每一個數據進行寫入到shard
                .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
        )
        .await?;

而後看是怎樣寫入到shard的,由於shard的寫入尚未完成,因此只能關注單機的寫入了。具體看代碼:

async fn write_sharded_entry(
        &self,
        db_name: &str,
        db: &Db,
        shards: Arc<HashMap<u32, NodeGroup>>,
        sharded_entry: ShardedEntry,
    ) -> Result<()> {
        //判斷shard的id是否爲null,若是是null就寫入本地
        //不然就寫入到具體的shard去
        match sharded_entry.shard_id {
            Some(shard_id) => {
                let node_group = shards.get(&shard_id).context(ShardNotFound { shard_id })?;
                //尚未真正的實現,能夠看下面的方法
                self.write_entry_downstream(db_name, node_group, &sharded_entry.entry)
                    .await?
            }
            None => self.write_entry_local(&db, sharded_entry.entry).await?,
        }
        Ok(())
    }


    //能夠看到尚未實現遠程的寫入
    async fn write_entry_downstream(
        &self,
        db_name: &str,
        node_group: &[WriterId],
        _entry: &Entry,
    ) -> Result<()> {
        todo!(
            "perform API call of sharded entry {} to one of the nodes {:?}",
            db_name,
            node_group
        )
    }

  //數據對本地寫入
   pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> {
        //繼續往下跟蹤
        db.store_entry(entry).map_err(|e| match e {
            db::Error::HardLimitReached {} => Error::HardLimitReached {},
            _ => Error::UnknownDatabaseError {
                source: Box::new(e),
            },
        })?;

        Ok(())
    }

//方法彷佛什麼都沒作,只是增補了clock_value和write_id
//註釋上解釋到logical clock是一個用來在數據庫內部把entry變爲有序的字段
pub fn store_entry(&self, entry: Entry) -> Result<()> {
        //生成一個新的結構SequencedEntry並增補字段
        let sequenced_entry = SequencedEntry::new_from_entry_bytes(
            ClockValue::new(self.next_sequence()),
            self.server_id.get(),
            entry.data(),
        ).context(SequencedEntryError)?;
        //關於讀緩存相關的配置和實現,先不用管
        if self.rules.read().wal_buffer_config.is_some() {
            todo!("route to the Write Buffer. TODO: carols10cents #1157")
        }
        //繼續調用其餘方法
        self.store_sequenced_entry(sequenced_entry)
    }

上面的全部方法完成以後,基本的插入數據格式就準備完成了,接下來就是寫入內存存儲:

pub fn store_sequenced_entry(&self, sequenced_entry: SequencedEntry) -> Result<()> {
        //讀取出數據庫對於寫入相關的配置信息
        //包括是否可寫、是否超過內存限制等等驗證
        let rules = self.rules.read();
        let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
        if rules.lifecycle_rules.immutable {
            return DatabaseNotWriteable {}.fail();
        }
        if let Some(hard_limit) = rules.lifecycle_rules.buffer_size_hard {
            if self.memory_registries.bytes() > hard_limit.get() {
                return HardLimitReached {}.fail();
            }
        }
        //rust語言中的釋放變量
        std::mem::drop(rules);

        //由於是批量寫入,因此須要循環
        //partition_writes的數據格式能夠參見上面的json數據
        if let Some(partitioned_writes) = sequenced_entry.partition_writes() {
            for write in partitioned_writes {
                let partition_key = write.key();
                //根據以前生成的partition_key來獲得或者建立一個partition描述
                let partition = self.catalog.get_or_create_partition(partition_key);
                //這裏是拿到一個寫鎖
                let mut partition = partition.write();
                //更新這個partition最後的插入時間
                //記錄這個的目的,代碼上並沒寫明白是作什麼用的
                partition.update_last_write_at();
                //找到一個打開的chunk
                //不知道爲何每次都要在全部chunk裏搜索一次
                //難道是同時可能有不少個chunk均可以寫入?
                let chunk = partition.open_chunk().unwrap_or_else(|| {
                   //不然就建立一個新的chunk出來
                   partition.create_open_chunk(self.memory_registries.mutable_buffer.as_ref())
                });
                //獲取一個寫鎖
                let mut chunk = chunk.write();
                //更新當前chunk的第一條、最後一條寫入記錄
                chunk.record_write();
                //獲得chunk的內存區域,稱爲mutable_buffer
                let chunk_id = chunk.id();
                let mb_chunk = chunk.mutable_buffer().expect("cannot mutate open chunk");
                //真正的寫入到內存中
                mb_chunk
                    .write_table_batches(
                        sequenced_entry.clock_value(),
                        sequenced_entry.writer_id(),
                        &write.table_batches(),
                    )
                    .context(WriteEntry {
                        partition_key,
                        chunk_id,
                    })?;

                //若是當前chunk寫入數據的大小超過了設置的限制,就關閉
                //關閉的意思就是把狀態製爲Closing,並更新關閉時間
                let size = mb_chunk.size();
                if let Some(threshold) = mutable_size_threshold {
                    if size > threshold.get() {
                        chunk.set_closing().expect("cannot close open chunk")
                    }
                }
            }
        }

        Ok(())
    }

再深刻的就不繼續跟蹤了,可是思路仍是比較清晰了。

1.分區相關

client --> grpc --> 進行分區shard --> 分區partition

2.寫入相關

  • 結構封裝

LP --> TableWriteBatch --> PartitionWrite --> WriteOperations --> Entry --> ShardedEntry --> SequencedEntry

  • 內存寫入空間

catalog -> partition -> table -> column

  • 達到指定大小後標記爲關閉

  • 異步 - 後臺線程進行內存整理


到這裏基本就完成了全部的寫入,並返回給客戶端成功。

關於後臺線程的內存整理再下一篇中繼續介紹。

祝玩兒的開心


歡迎關注微信公衆號:

或添加微信好友: liutaohua001

相關文章
相關標籤/搜索