InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。sql
InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。數據庫
接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。緩存
上一章介紹了一個SQL是怎樣從字符串轉換到物理執行計劃的,詳情見:微信
http://www.javashuo.com/article/p-rppaqvmf-vm.html框架
這一章主要記錄一下物理計劃是怎樣執行的。async
在上一篇文章的末尾,咱們展現了物理計劃之中存儲的數據,這些數據表明了當前整個數據庫中,可以與用戶輸入的查詢表相關聯的全部數據。數據庫設計
對於通常數據庫來說,在物理計劃中更應該是指向索引相關的信息,舉例來講:select * from table1 ,在物理計劃裏,應該是要拿到table1的表描述、存儲數據的文件路徑、文件大小、等等,而不是拿到真實數據。在文章最末尾中,有一段省略的數據,爲何會出現數據呢?其實這是數據庫設計的緩存,緩存的數據原本就沒有落到磁盤上,因此直接在物理計劃中也會持有RBChunk和MBChunk的數據引用。性能
對於一個過濾而言,會在物理計劃中產生對應的信息,展現以下:ui
select * from myMeasurement where fieldKey like 'value1'; input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } }
接下來看物理計劃的執行代碼:spa
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> { match plan.output_partitioning().partition_count() { 0 => Ok(vec![]), //單一塊的時候直接取出數據 1 => { let it = plan.execute(0).await?; common::collect(it).await } //多個數據塊的時候就須要進行合併數據 _ => { let plan = MergeExec::new(plan.clone()); assert_eq!(1, plan.output_partitioning().partition_count()); //這裏分爲了兩步execute 和 collect common::collect(plan.execute(0).await?).await } } }
接下來看plan.execute方法:
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { 。。。省略 tokio::spawn(async move { //這裏的input就表明了上面展現的filter的input或者是數據的input let mut stream = match input.execute(part_i).await { Err(e) => { let arrow_error = ArrowError::ExternalError(Box::new(e)); sender.send(Err(arrow_error)).await.ok(); return; } Ok(stream) => stream, }; //計劃執行完成以後返回一個stream,這裏就是一直next獲取完 while let Some(item) = stream.next().await { sender.send(item).await.ok(); } }); 。。。省略 }
上面的input表明瞭如下這麼多東西:
上面展現的爲datafusion框架裏的Plan,也就是通用sql都須要實現的功能,下面是iox項目中實現的Plan是完成數據獲取的。
Plan之間的關係是嵌套的,想象一下上一章的大圖,好比coalesceBatchesExec裏可能還會包含filter,主要就是描述整個sql語句中都出現了什麼。全部出現的plan就會對數據進行一次全面的過濾。
姑且不看過濾的細節,只看獲取數據的部分(ExecutionPlan for IOxReadFilterNode)。
async fn execute( &self, partition: usize, ) -> datafusion::error::Result<SendableRecordBatchStream> { //由於在前面物理計劃中獲得了全部列,這裏拿出列的名字 let fields = self.schema.fields(); let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>(); //多個分區的時候能夠根據分區號拿出chunk信息 let ChunkInfo { chunk, chunk_table_schema, } = &self.chunk_and_infos[partition]; //過濾出來列名字對應的arrow的filed,這裏就存在不對應的問題,假如用戶輸入了ABC,可是chunk_table_schema中並不存在,這裏就會是一個空 let selection_cols = restrict_selection(selection_cols, &chunk_table_schema); let selection = Selection::Some(&selection_cols); //使用predicate過濾一次,可是我調試的時候一直是空的,也就是查詢出全部數據。 let stream = chunk .read_filter(&self.table_name, &self.predicate, selection) .map_err(|e| { DataFusionError::Execution(format!( "Error creating scan for table {} chunk {}: {}", self.table_name, chunk.id(), e )) })?; //這裏使用SchemaAdapterStream的結構來填充空值列 let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema)) .map_err(|e| DataFusionError::Internal(e.to_string()))?; Ok(Box::pin(adapter)) }
這個SchemaAdapterStream在代碼中給了一個特別形象的描述:
/// /// ┌────────────────┐ ┌─────────────────────────┐ /// │ ┌─────┐┌─────┐ │ │ ┌─────┐┌──────┐┌─────┐ │ /// │ │ A ││ C │ │ │ │ A ││ B ││ C │ │ /// │ │ - ││ - │ │ │ │ - ││ - ││ - │ │ /// ┌──────────────┐ │ │ 1 ││ 10 │ │ ┌──────────────┐ │ │ 1 ││ NULL ││ 10 │ │ /// │ Input │ │ │ 2 ││ 20 │ │ │ Adapter │ │ │ 2 ││ NULL ││ 20 │ │ /// │ Stream ├────▶ │ │ 3 ││ 30 │ │────▶│ Stream ├───▶│ │ 3 ││ NULL ││ 30 │ │ /// └──────────────┘ │ │ 4 ││ 40 │ │ └──────────────┘ │ │ 4 ││ NULL ││ 40 │ │ /// │ └─────┘└─────┘ │ │ └─────┘└──────┘└─────┘ │ /// │ │ │ │ /// │ Record Batch │ │ Record Batch │ /// └────────────────┘ └─────────────────────────┘ ///
接下來看如何實現數據查找的:
fn read_filter( &self, table_name: &str, predicate: &Predicate, selection: Selection<'_>, ) -> Result<SendableRecordBatchStream, Self::Error> { //chunk存在變體,這裏就是先判斷是什麼chunk,有三種MB,RB,ParquetFile match self { //仍是在寫入階段的buffer,暫時不支持查詢條件 Self::MutableBuffer { chunk, .. } => { if !predicate.is_empty() { return InternalPredicateNotSupported { predicate: predicate.clone(), } .fail(); } let batch = chunk .read_filter(table_name, selection) .context(MutableBufferChunk)?; Ok(Box::pin(MemoryStream::new(vec![batch]))) } //不可寫階段的buffer,對數據進行過濾 Self::ReadBuffer { chunk, .. } => { let rb_predicate = to_read_buffer_predicate(&predicate).context(PredicateConversion)?; //讀取數據並過濾 let read_results = chunk .read_filter(table_name, rb_predicate, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //讀取schema信息並過濾 let schema = chunk .read_filter_table_schema(table_name, selection) .context(ReadBufferChunkError { chunk_id: chunk.id(), })?; //ReadFilterResultsStream是對不一樣的chunk類型實現的讀取接口 Ok(Box::pin(ReadFilterResultsStream::new( read_results, schema.into(), ))) } //Parquet同理 Self::ParquetFile { chunk, .. } => chunk .read_filter(table_name, predicate, selection) .context(ParquetFileChunkError { chunk_id: chunk.id(), }), } }
數據到了這裏就會按照你選擇的表名、列名,將數據所有查詢出來了。在代碼中的predicate,一直是空的,暫時不肯定是如何填充的,後面再看。
數據從這裏所有查詢出來以後,會返回給datafusion框架,繼續按照開頭寫到的過濾器進行過濾,就是遍歷一遍數據判斷大於、小於或者like等等。
好了查詢就先寫到這裏。
祝玩兒的開心!!
歡迎關注微信公衆號:
或添加微信好友: liutaohua001