InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。sql
InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。數據庫
接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。json
上一篇粗略的總結了寫入的基本流程,詳情見:服務器
https://my.oschina.net/u/3374539/blog/5033469微信
這一篇記錄一下查詢的主要流程。數據結構
在第六章中,寫了一個查詢示例,以下:app
let mut query = flight::Client::new(connection) .perform_query("databaseName", "select * from myMeasurement") .await .expect("query request should work");
其中connection,表明的創建了一個Grpc的鏈接。perform_query表明執行查詢,其中第一個參數是數據庫名字,第二個參數是要執行查詢的sql語句。這個perform_query是封裝了一下調用協議,而後調用了服務器端的do_get方法,do_get方法在服務器的src/influxdb_ioxd/rpc/flight.rs:139行
能夠找到,以下:異步
async fn do_get( &self, //這個Ticket裏就是保存的perform_query方法中封裝的json數據 request: Request<Ticket>, ) -> Result<Response<Self::DoGetStream>, tonic::Status> { //這裏就是把json還原回來 let ticket = request.into_inner(); let json_str = String::from_utf8(ticket.ticket.to_vec()).context(InvalidTicket { ticket: ticket.ticket, })?; //反序列化成了ReadInfo結構 let read_info: ReadInfo = serde_json::from_str(&json_str).context(InvalidQuery { query: &json_str })?; //拿到客戶端設置的數據庫名字 let database = DatabaseName::new(&read_info.database_name).context(InvalidDatabaseName)?; //從內存中查找是否存在這個database名字,若是不存在就會報DatabaseNotFound錯誤回去 //這裏就是建立數據庫的時候寫入到內存裏的 //同時還應該記得iox的數據庫必須一個節點建立一次。。hhhhha let db = self.server.db(&database).context(DatabaseNotFound { database_name: &read_info.database_name, })?; //這個是拿到以前建立數據庫時候設置的線程池,能夠回去參考第五章 let executor = db.executor(); //這裏是建立出sql語句對應的physical_plan,後面再看 let physical_plan = Planner::new(Arc::clone(&executor)) .sql(db, &read_info.sql_query) .await .context(Planning)?; //使用線程異步的執行查詢 let results = executor //複製一下執行時候須要用到的信息 .new_context() //真正的去執行 .collect(Arc::clone(&physical_plan)) .await .map_err(|e| Box::new(e) as _) .context(Query { database_name: &read_info.database_name, })?; //在寫入的章節裏應該知道了在RBChunk裏面存儲的是Arrow格式的。 //在這個方法中就是調用arrow_flight工具包的方法,先把schema序列化到flight_buffer中 let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema = physical_plan.schema(); let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema(schema.as_ref(), &options); let mut flights: Vec<Result<FlightData, tonic::Status>> = vec![Ok(schema_flight_data)]; //上面獲得的結果集,這裏進行遍歷,封裝爲要返回的數據結構 let mut batches: Vec<Result<FlightData, tonic::Status>> = results .iter() //這個是爲了給下面flight_data_from_arrow_batch這個方法打補丁用的 //由於這個方法即使對於切片類型的batch也是盲目的序列化全部數據 .map(optimize_record_batch) .collect::<Result<Vec<_>, Error>>()? .iter() //這裏就是一條一條的把數據序列化到緩衝區裏 .flat_map(|batch| { let (flight_dictionaries, flight_batch) = arrow_flight::utils::flight_data_from_arrow_batch(&batch, &options); //把數據包裝在Result中 flight_dictionaries .into_iter() .chain(std::iter::once(flight_batch)) .map(Ok) }) .collect(); //前面是schema,後面是數據 flights.append(&mut batches); //返回一個數據的異步stream,有可能調用一次next就會釋放一次cpu? let output = futures::stream::iter(flights); //數據以flight形式發送到了客戶端,客戶端先讀取schema再讀取數據。 Ok(Response::new(Box::pin(output) as Self::DoGetStream)) }
這裏基本上是整個查詢的主邏輯:async
- 異步的將sql轉換爲plan。
- 異步的去執行plan並返回結果和結果所對應的schema信息。
- 將返回的arrow數據封裝到flights格式中。
- 經過Grpc返回
這一篇就到這裏吧,下幾章準備記錄一下:工具
- sql是怎麼被執行的
- 查詢中都經歷了什麼
- 等等。。。
祝玩兒的開心。
歡迎關注微信公衆號:
或添加微信好友: liutaohua001