時序數據庫Influx-IOx源碼學習十二(物理計劃的執行)

InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。sql

InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。數據庫

接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。緩存


上一章介紹了一個SQL是怎樣從字符串轉換到物理執行計劃的,詳情見:微信

http://www.javashuo.com/article/p-rppaqvmf-vm.html框架

這一章主要記錄一下物理計劃是怎樣執行的。async


在上一篇文章的末尾,咱們展現了物理計劃之中存儲的數據,這些數據表明了當前整個數據庫中,可以與用戶輸入的查詢表相關聯的全部數據。數據庫設計

對於通常數據庫來說,在物理計劃中更應該是指向索引相關的信息,舉例來講:select * from table1 ,在物理計劃裏,應該是要拿到table1的表描述、存儲數據的文件路徑、文件大小、等等,而不是拿到真實數據。在文章最末尾中,有一段省略的數據,爲何會出現數據呢?其實這是數據庫設計的緩存,緩存的數據原本就沒有落到磁盤上,因此直接在物理計劃中也會持有RBChunk和MBChunk的數據引用。性能

對於一個過濾而言,會在物理計劃中產生對應的信息,展現以下:ui

select * from myMeasurement where fieldKey like 'value1';

input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } }

接下來看物理計劃的執行代碼:spa

pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
    match plan.output_partitioning().partition_count() {
        0 => Ok(vec![]),
        //單一塊的時候直接取出數據
        1 => {
            let it = plan.execute(0).await?;
            common::collect(it).await
        }
        //多個數據塊的時候就須要進行合併數據
        _ => {
            let plan = MergeExec::new(plan.clone());
            assert_eq!(1, plan.output_partitioning().partition_count());
            //這裏分爲了兩步execute 和 collect
            common::collect(plan.execute(0).await?).await
        }
    }
}

接下來看plan.execute方法:

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
       。。。省略
      tokio::spawn(async move {
           //這裏的input就表明了上面展現的filter的input或者是數據的input
          let mut stream = match input.execute(part_i).await {
              Err(e) => {
                  let arrow_error = ArrowError::ExternalError(Box::new(e));
                  sender.send(Err(arrow_error)).await.ok();
                  return;
              }
              Ok(stream) => stream,
          };
          //計劃執行完成以後返回一個stream,這裏就是一直next獲取完
          while let Some(item) = stream.next().await {
              sender.send(item).await.ok();
          }
      });
      。。。省略
}

上面的input表明瞭如下這麼多東西:

上面展現的爲datafusion框架裏的Plan,也就是通用sql都須要實現的功能,下面是iox項目中實現的Plan是完成數據獲取的。

Plan之間的關係是嵌套的,想象一下上一章的大圖,好比coalesceBatchesExec裏可能還會包含filter,主要就是描述整個sql語句中都出現了什麼。全部出現的plan就會對數據進行一次全面的過濾。

姑且不看過濾的細節,只看獲取數據的部分(ExecutionPlan for IOxReadFilterNode)。

async fn execute(
        &self,
        partition: usize,
    ) -> datafusion::error::Result<SendableRecordBatchStream> {
        //由於在前面物理計劃中獲得了全部列,這裏拿出列的名字
        let fields = self.schema.fields();
        let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
        //多個分區的時候能夠根據分區號拿出chunk信息
        let ChunkInfo {
            chunk,
            chunk_table_schema,
        } = &self.chunk_and_infos[partition];

        //過濾出來列名字對應的arrow的filed,這裏就存在不對應的問題,假如用戶輸入了ABC,可是chunk_table_schema中並不存在,這裏就會是一個空
        let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
        let selection = Selection::Some(&selection_cols);
        //使用predicate過濾一次,可是我調試的時候一直是空的,也就是查詢出全部數據。
        let stream = chunk
            .read_filter(&self.table_name, &self.predicate, selection)
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Error creating scan for table {} chunk {}: {}",
                    self.table_name,
                    chunk.id(),
                    e
                ))
            })?;
        //這裏使用SchemaAdapterStream的結構來填充空值列
        let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema))
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;

        Ok(Box::pin(adapter))
    }

這個SchemaAdapterStream在代碼中給了一個特別形象的描述:

///
///                       ┌────────────────┐                         ┌─────────────────────────┐
///                       │ ┌─────┐┌─────┐ │                         │ ┌─────┐┌──────┐┌─────┐  │
///                       │ │  A  ││  C  │ │                         │ │  A  ││  B   ││  C  │  │
///                       │ │  -  ││  -  │ │                         │ │  -  ││  -   ││  -  │  │
/// ┌──────────────┐      │ │  1  ││ 10  │ │     ┌──────────────┐    │ │  1  ││ NULL ││ 10  │  │
/// │    Input     │      │ │  2  ││ 20  │ │     │   Adapter    │    │ │  2  ││ NULL ││ 20  │  │
/// │    Stream    ├────▶ │ │  3  ││ 30  │ │────▶│    Stream    ├───▶│ │  3  ││ NULL ││ 30  │  │
/// └──────────────┘      │ │  4  ││ 40  │ │     └──────────────┘    │ │  4  ││ NULL ││ 40  │  │
///                       │ └─────┘└─────┘ │                         │ └─────┘└──────┘└─────┘  │
///                       │                │                         │                         │
///                       │  Record Batch  │                         │      Record Batch       │
///                       └────────────────┘                         └─────────────────────────┘
///

接下來看如何實現數據查找的:

fn read_filter(
        &self,
        table_name: &str,
        predicate: &Predicate,
        selection: Selection<'_>,
    ) -> Result<SendableRecordBatchStream, Self::Error> {
         //chunk存在變體,這裏就是先判斷是什麼chunk,有三種MB,RB,ParquetFile
        match self {
            //仍是在寫入階段的buffer,暫時不支持查詢條件
            Self::MutableBuffer { chunk, .. } => {
                if !predicate.is_empty() {
                    return InternalPredicateNotSupported {
                        predicate: predicate.clone(),
                    }
                    .fail();
                }
                let batch = chunk
                    .read_filter(table_name, selection)
                    .context(MutableBufferChunk)?;

                Ok(Box::pin(MemoryStream::new(vec![batch])))
            }
            //不可寫階段的buffer,對數據進行過濾
            Self::ReadBuffer { chunk, .. } => {
                let rb_predicate =
                    to_read_buffer_predicate(&predicate).context(PredicateConversion)?;
                //讀取數據並過濾
                let read_results = chunk
                    .read_filter(table_name, rb_predicate, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //讀取schema信息並過濾
                let schema = chunk
                    .read_filter_table_schema(table_name, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //ReadFilterResultsStream是對不一樣的chunk類型實現的讀取接口
                Ok(Box::pin(ReadFilterResultsStream::new(
                    read_results,
                    schema.into(),
                )))
            }
            //Parquet同理
            Self::ParquetFile { chunk, .. } => chunk
                .read_filter(table_name, predicate, selection)
                .context(ParquetFileChunkError {
                    chunk_id: chunk.id(),
                }),
        }
    }

數據到了這裏就會按照你選擇的表名、列名,將數據所有查詢出來了。在代碼中的predicate,一直是空的,暫時不肯定是如何填充的,後面再看。

數據從這裏所有查詢出來以後,會返回給datafusion框架,繼續按照開頭寫到的過濾器進行過濾,就是遍歷一遍數據判斷大於、小於或者like等等。

好了查詢就先寫到這裏。

祝玩兒的開心!!


歡迎關注微信公衆號:

或添加微信好友: liutaohua001

相關文章
相關標籤/搜索