InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。正則表達式
InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。算法
接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。數據庫
上一章說到如何建立一個數據庫,而且數據庫的描述信息是如何保存的。詳情見:https://my.oschina.net/u/3374539/blog/5025128服務器
這一章記錄一下,數據是如何寫入並保存的,具體會分爲兩篇來寫:微信
- 一篇介紹分區是如何完成的
- 一篇介紹具體的寫入
說到數據寫入,必然是須要可以鏈接到服務器。IOx
項目爲提供了多種方式能夠於服務器進行交互,分別是Grpc
和Http
基於這兩種通訊方式,又擴展支持了influxdb2_client
以及influxdb_iox_client
。async
基於influxdb_iox_client
我寫了一個數據寫入及查詢的示例來觀測接口是如何組織的,代碼以下:性能
#[tokio::main] async fn main() { { let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); write::Client::new(connection) .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#) .await .expect("failed to write data"); } let connection = Builder::default() .build("http://127.0.0.1:8081") .await .unwrap(); let mut query = flight::Client::new(connection) .perform_query("a", "select * from myMeasurement") .await .expect("query request should work"); let mut batches = vec![]; while let Some(data) = query.next().await.expect("valid batches") { batches.push(data); } let format1 = format::QueryOutputFormat::Pretty; println!("{}", format1.format(&batches).unwrap()); } +------------+--------+--------+-------------------------+ | fieldKey | tag1 | tag2 | time | +------------+--------+--------+-------------------------+ | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 | | 123 | value1 | value2 | 2019-05-02 16:12:41.098 | +------------+--------+--------+-------------------------+
由於我多運行了幾回,因此能看到數據被重複插入了。ui
這裏還須要說一下的是寫入的語句格式能夠參見:url
[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format.net
write::Client
中的write
方法生成了一個WriteRequest
結構,並使用RPC
調用遠程的write
方法。打開src/influxdb_ioxd/rpc/write.rs : 22行
能夠看到方法的具體實現。
async fn write( &self, request: tonic::Request<WriteRequest>, ) -> Result<tonic::Response<WriteResponse>, tonic::Status> { let request = request.into_inner(); //獲得上面在客戶端中寫入的數據庫名字,在上面的例子中傳入的"a" let db_name = request.db_name; //這裏獲得了寫入的LineProtocol let lp_data = request.lp_data; let lp_chars = lp_data.len(); //解析LineProtocol的內容 //示例中的lp會被解析爲: //measurement: "myMeasurement" //tag_set: [("tag1", "value1"), ("tag2", "value2")] //field_set: [("fieldKey", "123")] //timestamp: 1556813561098000000 let lines = parse_lines(&lp_data) .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>() .map_err(|e| FieldViolation { field: "lp_data".into(), description: format!("Invalid Line Protocol: {}", e), })?; let lp_line_count = lines.len(); debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database"); //對數據進行保存 self.server .write_lines(&db_name, &lines) .await .map_err(default_server_error_handler)?; //返回成功 let lines_written = lp_line_count as u64; Ok(Response::new(WriteResponse { lines_written })) }
繼續看self.server.write_lines
的執行:
pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> { self.require_id()?; //驗證一下名字,而後拿到以前建立數據庫時候在內存中存儲的相關信息 let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self .config .db(&db_name) .context(DatabaseNotFound { db_name: &*db_name })?; //這裏就開始執行分片相關的策略 let (sharded_entries, shards) = { //讀取建立數據庫時候配置的分片策略 let rules = db.rules.read(); let shard_config = &rules.shard_config; //根據數據和shard策略,把逐個數據對應的分區找到 //寫入到一個List<分區標識,List<數據>>這樣的結構中 //具體的結構信息後面看 let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules) .context(LineConversion)?; //再把全部分區的配置返回給調用者 let shards = shard_config .as_ref() .map(|cfg| Arc::clone(&cfg.shards)) .unwrap_or_default(); (sharded_entries, shards) }; //根據上面返回的集合進行map方法遍歷,寫到每一個分區中 futures_util::future::try_join_all( sharded_entries .into_iter() .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)), ) .await?; Ok(()) }
這裏描述了寫入一條數據的主邏輯:數據寫入的時候,先把數據劃分到具體的分區裏(使用List結構存儲下全部的分區對應的數據),而後並行的進行數據寫入
接下來看,數據是如何進行分區的:
pub fn lines_to_sharded_entries( lines: &[ParsedLine<'_>], sharder: Option<&impl Sharder>, partitioner: &impl Partitioner, ) -> Result<Vec<ShardedEntry>> { let default_time = Utc::now(); let mut sharded_lines = BTreeMap::new(); //對全部要插入的數據進行遍歷 for line in lines { //先找到符合哪一個shard let shard_id = match &sharder { Some(s) => Some(s.shard(line).context(GeneratingShardId)?), None => None, }; //再判斷屬於哪一個分區 let partition_key = partitioner .partition_key(line, &default_time) .context(GeneratingPartitionKey)?; let table = line.series.measurement.as_str(); //最後存儲到一個map中 //shard-> partition -> table -> List<data> 的映射關係 sharded_lines .entry(shard_id) .or_insert_with(BTreeMap::new) .entry(partition_key) .or_insert_with(BTreeMap::new) .entry(table) .or_insert_with(Vec::new) .push(line); } let default_time = Utc::now(); //最後遍歷這個map 轉換到以前提到的List結構中 let sharded_entries = sharded_lines .into_iter() .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time)) .collect::<Result<Vec<_>>>()?; Ok(sharded_entries) }
這裏理解shard
的概念就是一個或者一組機器,稱爲一個shard
,他們負責真正的存儲數據。
partition
理解爲一個個文件夾,在shard
上具體的存儲路徑。
這裏看一下是怎樣完成shard
的劃分的:
impl Sharder for ShardConfig { fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> { if let Some(specific_targets) = &self.specific_targets { //若是對數據進行匹配,若是符合規則就返回,能夠採用當前的shard //官方的代碼中只實現了根據表名進行shard的策略 //這個配置彷佛只能經過grpc來進行設置,這樣好處多是未來有個什麼管理界面能動態修改 if specific_targets.matcher.match_line(line) { return Ok(specific_targets.shard); } } //若是沒有配置就使用hash的方式 //對整條數據進行hash,而後比較機器的hash,找到合適的節點 //若是沒找到,就放在hashring的第一個節點 //hash算法見後面 if let Some(hash_ring) = &self.hash_ring { return hash_ring .shards .find(LineHasher { line, hash_ring }) .context(NoShardsDefined); } NoShardingRuleMatches { line: line.to_string(), } .fail() } } //具體的Hash算法,若是全配置的話分的就會特別散,幾乎不一樣測點都放到了不一樣的地方 impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> { fn hash<H: Hasher>(&self, state: &mut H) { //若是配置了使用table名字就在hash中加入tablename if self.hash_ring.table_name { self.line.series.measurement.hash(state); } //而後按照配置的列的值進行hash for column in &self.hash_ring.columns { if let Some(tag_value) = self.line.tag_value(column) { tag_value.hash(state); } else if let Some(field_value) = self.line.field_value(column) { field_value.to_string().hash(state);t } state.write_u8(0); // column separator } } }
接下來看默認的partition
分區方式:
impl Partitioner for PartitionTemplate { fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> { let parts: Vec<_> = self .parts .iter() //匹配分區策略,或者是單一的,或者是複合的 //目前支持基於表、值、時間 //其他還會支持正則表達式和strftime模式 .map(|p| match p { TemplatePart::Table => line.series.measurement.to_string(), TemplatePart::Column(column) => match line.tag_value(&column) { Some(v) => format!("{}_{}", column, v), None => match line.field_value(&column) { Some(v) => format!("{}_{}", column, v), None => "".to_string(), }, }, TemplatePart::TimeFormat(format) => match line.timestamp { Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(), None => default_time.format(&format).to_string(), }, _ => unimplemented!(), }) .collect(); //最後返回一個組合文件名,或者是 a-b-c 或者是一個單一的值 Ok(parts.join("-")) } }
到這裏分區的工做就完成了,下一篇繼續分析是怎樣寫入的。
祝玩兒的開心
歡迎關注微信公衆號:
或添加微信好友: liutaohua001