時序數據庫Influx-IOx源碼學習十(查詢主流程)

InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。sql

InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。數據庫

接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。json


上一篇粗略的總結了寫入的基本流程,詳情見:服務器

https://my.oschina.net/u/3374539/blog/5033469微信

這一篇記錄一下查詢的主要流程。數據結構


在第六章中,寫了一個查詢示例,以下:app

let mut query = flight::Client::new(connection)
        .perform_query("databaseName", "select * from myMeasurement")
        .await
        .expect("query request should work");

其中connection,表明的創建了一個Grpc的鏈接。perform_query表明執行查詢,其中第一個參數是數據庫名字,第二個參數是要執行查詢的sql語句。這個perform_query是封裝了一下調用協議,而後調用了服務器端的do_get方法,do_get方法在服務器的src/influxdb_ioxd/rpc/flight.rs:139行能夠找到,以下:異步

async fn do_get(
        &self,
        //這個Ticket裏就是保存的perform_query方法中封裝的json數據
        request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, tonic::Status> {
        //這裏就是把json還原回來
        let ticket = request.into_inner();
        let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket {
            ticket: ticket.ticket,
        })?;
        //反序列化成了ReadInfo結構
        let read_info: ReadInfo =
            serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?;
        //拿到客戶端設置的數據庫名字
        let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?;
        //從內存中查找是否存在這個database名字,若是不存在就會報DatabaseNotFound錯誤回去
        //這裏就是建立數據庫的時候寫入到內存裏的
        //同時還應該記得iox的數據庫必須一個節點建立一次。。hhhhha
        let db = self.server.db(&database).context(DatabaseNotFound {
            database_name: &read_info.database_name,
        })?;
        //這個是拿到以前建立數據庫時候設置的線程池,能夠回去參考第五章
        let executor = db.executor();
        //這裏是建立出sql語句對應的physical_plan,後面再看
        let physical_plan = Planner::new(Arc::clone(&executor))
            .sql(db, &read_info.sql_query)
            .await
            .context(Planning)?;

        //使用線程異步的執行查詢
        let results = executor
             //複製一下執行時候須要用到的信息
            .new_context()
             //真正的去執行
            .collect(Arc::clone(&physical_plan))
            .await
            .map_err(|e| Box::new(e) as _)
            .context(Query {
                database_name: &read_info.database_name,
            })?;

        //在寫入的章節裏應該知道了在RBChunk裏面存儲的是Arrow格式的。
        //在這個方法中就是調用arrow_flight工具包的方法,先把schema序列化到flight_buffer中
        let options = arrow::ipc::writer::IpcWriteOptions::default();
        let schema = physical_plan.schema();
        let schema_flight_data =
            arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options);

        let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)];
        //上面獲得的結果集,這裏進行遍歷,封裝爲要返回的數據結構
        let mut batches: Vec<Result<FlightData, tonic::Status>> = results
            .iter()
            //這個是爲了給下面flight_data_from_arrow_batch這個方法打補丁用的
            //由於這個方法即使對於切片類型的batch也是盲目的序列化全部數據
            .map(optimize_record_batch)
            .collect::<Result<Vec<_>, Error>>()?
            .iter()
            //這裏就是一條一條的把數據序列化到緩衝區裏
            .flat_map(|batch| {
                let (flight_dictionaries, flight_batch) =
                    arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options);
                //把數據包裝在Result中
                flight_dictionaries
                    .into_iter()
                    .chain(std::iter::once(flight_batch))
                    .map(Ok)
            })
            .collect();

        //前面是schema,後面是數據
        flights.append(&mut batches);
        //返回一個數據的異步stream,有可能調用一次next就會釋放一次cpu?
        let output = futures::stream::iter(flights);
        //數據以flight形式發送到了客戶端,客戶端先讀取schema再讀取數據。
        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
    }

這裏基本上是整個查詢的主邏輯:async

  • 異步的將sql轉換爲plan。
  • 異步的去執行plan並返回結果和結果所對應的schema信息。
  • 將返回的arrow數據封裝到flights格式中。
  • 經過Grpc返回

這一篇就到這裏吧,下幾章準備記錄一下:工具

  1. sql是怎麼被執行的
  2. 查詢中都經歷了什麼
  3. 等等。。。

祝玩兒的開心。


歡迎關注微信公衆號:

或添加微信好友: liutaohua001

相關文章
相關標籤/搜索