時序數據庫Influx-IOx源碼學習六-1(數據寫入之分區)

InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。正則表達式

InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。算法

接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。數據庫


上一章說到如何建立一個數據庫,而且數據庫的描述信息是如何保存的。詳情見:https://my.oschina.net/u/3374539/blog/5025128服務器

這一章記錄一下,數據是如何寫入並保存的,具體會分爲兩篇來寫:微信

  • 一篇介紹分區是如何完成的
  • 一篇介紹具體的寫入

說到數據寫入,必然是須要可以鏈接到服務器。IOx項目爲提供了多種方式能夠於服務器進行交互,分別是GrpcHttp基於這兩種通訊方式,又擴展支持了influxdb2_client以及influxdb_iox_clientasync

基於influxdb_iox_client我寫了一個數據寫入及查詢的示例來觀測接口是如何組織的,代碼以下:性能

#[tokio::main]
async fn main() {
    {
        let connection = Builder::default()
            .build("http://127.0.0.1:8081")
            .await
            .unwrap();
        write::Client::new(connection)
            .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#)
            .await
            .expect("failed to write data");
    }

    let connection = Builder::default()
        .build("http://127.0.0.1:8081")
        .await
        .unwrap();

    let mut query = flight::Client::new(connection)
        .perform_query("a", "select * from myMeasurement")
        .await
        .expect("query request should work");

    let mut batches = vec![];

    while let Some(data) = query.next().await.expect("valid batches") {
        batches.push(data);
    }

    let format1 = format::QueryOutputFormat::Pretty;
    println!("{}", format1.format(&batches).unwrap());
}


+------------+--------+--------+-------------------------+
| fieldKey   | tag1   | tag2   | time                    |
+------------+--------+--------+-------------------------+
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
+------------+--------+--------+-------------------------+

由於我多運行了幾回,因此能看到數據被重複插入了。ui

這裏還須要說一下的是寫入的語句格式能夠參見:url

[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format.net


write::Client中的write方法生成了一個WriteRequest結構,並使用RPC調用遠程的write方法。打開src/influxdb_ioxd/rpc/write.rs : 22行能夠看到方法的具體實現。

async fn write(
        &self,
        request: tonic::Request<WriteRequest>,
    ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
        let request = request.into_inner();
        //獲得上面在客戶端中寫入的數據庫名字,在上面的例子中傳入的"a"
        let db_name = request.db_name;
        //這裏獲得了寫入的LineProtocol
        let lp_data = request.lp_data;
        let lp_chars = lp_data.len();
        //解析LineProtocol的內容
        //示例中的lp會被解析爲:
        //measurement: "myMeasurement"
        //tag_set: [("tag1", "value1"), ("tag2", "value2")]
        //field_set: [("fieldKey", "123")]
        //timestamp: 1556813561098000000
        let lines = parse_lines(&lp_data)
            .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
            .map_err(|e| FieldViolation {
                field: "lp_data".into(),
                description: format!("Invalid Line Protocol: {}", e),
            })?;

        let lp_line_count = lines.len();
        debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
        //對數據進行保存
        self.server
            .write_lines(&db_name, &lines)
            .await
            .map_err(default_server_error_handler)?;
        //返回成功
        let lines_written = lp_line_count as u64;
        Ok(Response::new(WriteResponse { lines_written }))
    }

繼續看self.server.write_lines的執行:

pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
        self.require_id()?;
        //驗證一下名字,而後拿到以前建立數據庫時候在內存中存儲的相關信息
        let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
        let db = self
            .config
            .db(&db_name)
            .context(DatabaseNotFound { db_name: &*db_name })?;
        //這裏就開始執行分片相關的策略
        let (sharded_entries, shards) = {
            //讀取建立數據庫時候配置的分片策略
            let rules = db.rules.read();
            let shard_config = &rules.shard_config;
            //根據數據和shard策略,把逐個數據對應的分區找到
            //寫入到一個List<分區標識,List<數據>>這樣的結構中
            //具體的結構信息後面看
            let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
                .context(LineConversion)?;
            //再把全部分區的配置返回給調用者
            let shards = shard_config
                .as_ref()
                .map(|cfg| Arc::clone(&cfg.shards))
                .unwrap_or_default();

            (sharded_entries, shards)
        };

        //根據上面返回的集合進行map方法遍歷,寫到每一個分區中
        futures_util::future::try_join_all(
            sharded_entries
                .into_iter()
                .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
        )
        .await?;

        Ok(())
    }

這裏描述了寫入一條數據的主邏輯:數據寫入的時候,先把數據劃分到具體的分區裏(使用List結構存儲下全部的分區對應的數據),而後並行的進行數據寫入

接下來看,數據是如何進行分區的:

pub fn lines_to_sharded_entries(
    lines: &[ParsedLine<'_>],
    sharder: Option<&impl Sharder>,
    partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> {
    let default_time = Utc::now();
    let mut sharded_lines = BTreeMap::new();

    //對全部要插入的數據進行遍歷
    for line in lines {
        //先找到符合哪一個shard
        let shard_id = match &sharder {
            Some(s) => Some(s.shard(line).context(GeneratingShardId)?),
            None => None,
        };
        //再判斷屬於哪一個分區
        let partition_key = partitioner
            .partition_key(line, &default_time)
            .context(GeneratingPartitionKey)?;
        let table = line.series.measurement.as_str();
        //最後存儲到一個map中
        //shard-> partition -> table -> List<data> 的映射關係
        sharded_lines
            .entry(shard_id)
            .or_insert_with(BTreeMap::new)
            .entry(partition_key)
            .or_insert_with(BTreeMap::new)
            .entry(table)
            .or_insert_with(Vec::new)
            .push(line);
    }

    let default_time = Utc::now();
    //最後遍歷這個map 轉換到以前提到的List結構中
    let sharded_entries = sharded_lines
        .into_iter()
        .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
        .collect::<Result<Vec<_>>>()?;
    Ok(sharded_entries)
}

這裏理解shard的概念就是一個或者一組機器,稱爲一個shard,他們負責真正的存儲數據。

partition理解爲一個個文件夾,在shard上具體的存儲路徑。

這裏看一下是怎樣完成shard的劃分的:

impl Sharder for ShardConfig {
    fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
        if let Some(specific_targets) = &self.specific_targets {
            //若是對數據進行匹配,若是符合規則就返回,能夠採用當前的shard
            //官方的代碼中只實現了根據表名進行shard的策略
            //這個配置彷佛只能經過grpc來進行設置,這樣好處多是未來有個什麼管理界面能動態修改
            if specific_targets.matcher.match_line(line) {
                return Ok(specific_targets.shard);
            }
        }
        //若是沒有配置就使用hash的方式
        //對整條數據進行hash,而後比較機器的hash,找到合適的節點
        //若是沒找到,就放在hashring的第一個節點
        //hash算法見後面
        if let Some(hash_ring) = &self.hash_ring {
            return hash_ring
                .shards
                .find(LineHasher { line, hash_ring })
                .context(NoShardsDefined);
        }

        NoShardingRuleMatches {
            line: line.to_string(),
        }
        .fail()
    }
}
//具體的Hash算法,若是全配置的話分的就會特別散,幾乎不一樣測點都放到了不一樣的地方
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        //若是配置了使用table名字就在hash中加入tablename
        if self.hash_ring.table_name {
            self.line.series.measurement.hash(state);
        }
        //而後按照配置的列的值進行hash
        for column in &self.hash_ring.columns {
            if let Some(tag_value) = self.line.tag_value(column) {
                tag_value.hash(state);
            } else if let Some(field_value) = self.line.field_value(column) {
                field_value.to_string().hash(state);t
            }
            state.write_u8(0); // column separator
        }
    }
}

接下來看默認的partition分區方式:

impl Partitioner for PartitionTemplate {
    fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
        let parts: Vec<_> = self
            .parts
            .iter()
             //匹配分區策略,或者是單一的,或者是複合的
             //目前支持基於表、值、時間
             //其他還會支持正則表達式和strftime模式
            .map(|p| match p {
                TemplatePart::Table => line.series.measurement.to_string(),
                TemplatePart::Column(column) => match line.tag_value(&column) {
                    Some(v) => format!("{}_{}", column, v),
                    None => match line.field_value(&column) {
                        Some(v) => format!("{}_{}", column, v),
                        None => "".to_string(),
                    },
                },
                TemplatePart::TimeFormat(format) => match line.timestamp {
                    Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
                    None => default_time.format(&format).to_string(),
                },
                _ => unimplemented!(),
            })
            .collect();
        //最後返回一個組合文件名,或者是 a-b-c 或者是一個單一的值
        Ok(parts.join("-"))
    }
}

到這裏分區的工做就完成了,下一篇繼續分析是怎樣寫入的。

祝玩兒的開心


歡迎關注微信公衆號:

或添加微信好友: liutaohua001

相關文章
相關標籤/搜索