InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。git
InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。github
接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。數據庫
上篇介紹到:InfluxDB-IOx的Run命令啓動過程,詳情見:http://www.javashuo.com/article/p-ahmvhsoy-vk.html緩存
這章記錄一下Database create
命令的執行過程。服務器
在第三章命令行
中介紹了,全部的子命令都有一個獨立的參數或配置稱爲subcommand
。微信
enum Command { Convert { // 省略 ...}, Meta {// 省略 ...}, Database(commands::database::Config), Run(Box<commands::run::Config>), Stats(commands::stats::Config), Server(commands::server::Config), Writer(commands::writer::Config), Operation(commands::operations::Config), }
這章咱們打開看一眼commands::database
下的config
包含了什麼。框架
pub struct Config { #[structopt(subcommand)] command: Command, } //見名知意,基本猜想一下就好了,慢慢使用到再回來看 enum Command { Create(Create), List(List), Get(Get), Write(Write), Query(Query), Chunk(chunk::Config), Partition(partition::Config), }
先來看一下create
命令的執行。async
Command::Create(command) => { //建立一個grpc的client let mut client = management::Client::new(connection); //設置基本的配置項 let rules = DatabaseRules { //數據庫名字 name: command.name, //內存的各類配置,包含緩存大小,時間等等 lifecycle_rules: Some(LifecycleRules { //省略。。 }), //設置分區的策略 partition_template: Some(PartitionTemplate { //省略。。 }), //其它都填充default ..Default::default() }; //使用配置信息建立數據庫,這裏是生成了一個CreateDatabaseRequest去調用了遠程服務器的方法 client.create_database(rules).await?; println!("Ok"); }
在上一章中提到了grpc
的啓動,這裏就涉及到了以前提到的grpc
的框架tonic
,在tonic
中使用#[tonic::async_trait]
了標記一個服務器端的實現開始。我在ide中搜索,能夠在src/influxdb_ioxd/rpc/management.rs:50行
中找到ManagementService
相關的實現。ide
有關tonic更多的資料請閱讀:https://github.com/hyperium/tonic性能
#[tonic::async_trait] impl<M> management_service_server::ManagementService for ManagementService<M> where M: ConnectionManager + Send + Sync + Debug + 'static, { //省略其它方法。。。 async fn create_database( &self, //這裏就是接收CreateDatabaseRequest的請求 request: Request<CreateDatabaseRequest>, ) -> Result<Response<CreateDatabaseResponse>, Status> { //對數據進行一下校驗,而後得到在上面配置的rules規則 let rules: DatabaseRules = request .into_inner() .rules .ok_or_else(|| FieldViolation::required("")) .and_then(TryInto::try_into) .map_err(|e| e.scope("rules"))?; //這裏就是在第三章中提到的server_id,若是沒配置就會報錯了 let server_id = match self.server.require_id().ok() { Some(id) => id, None => return Err(NotFound::default().into()), }; //這裏就是真正的去建立,在下面繼續跟蹤 match self.server.create_database(rules, server_id).await { Ok(_) => Ok(Response::new(CreateDatabaseResponse {})), Err(Error::DatabaseAlreadyExists { db_name }) => { return Err(AlreadyExists { resource_type: "database".to_string(), resource_name: db_name, ..Default::default() } .into()) } Err(e) => Err(default_server_error_handler(e)), } } }
接下來要繼續查看數據庫真正的被建立出來,我讀到這裏存在一個問題,文件格式是什麼樣子的?
pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> { //檢查server_id self.require_id()?; //把數據庫名字存儲到內存中,最終保存到一個btreemap中 let db_reservation = self.config.create_db(rules)?; //對數據進行持久化保存 self.persist_database_rules(db_reservation.rules().clone()) .await?; //啓動數據庫後臺線程,在內存中寫入數據庫狀態 db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec)); Ok(()) }
來解答上面的疑問,文件是怎樣持久化、格式是什麼樣子的。
pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> { //生成一個新的數據庫路徑 let location = object_store_path_for_database_config(&self.root_path()?, &rules.name); //序列化DatabaseRules這個pb到byte流 let mut data = BytesMut::new(); rules.encode(&mut data).context(ErrorSerializing)?; let len = data.len(); let stream_data = std::io::Result::Ok(data.freeze()); //將pb的內容進行存儲 self.store .put( &location, futures::stream::once(async move { stream_data }), Some(len), ) .await .context(StoreError)?; Ok(()) }
這裏調用了rules.encode()
轉換到pb
的格式,這裏是rust
語言的一個方法,實現了From
特性的,就獲得了一個into
的方法,如:impl From<DatabaseRules> for management::DatabaseRules
.
到這裏數據庫的一個描述文件rules.pb
就被寫入到磁盤中了,路徑是啓動命令中指定的--data-dir
參數路徑 + --writer-id
+ 數據庫名字。
例如,個人啓動和建立命令爲:
./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/ ./influxdb_iox database create test
那麼獲得的路徑就爲:~/influxtest/1/test/rules.pb
. 以後能夠運行一個pb
的腳原本反查rules.pb
中的數據內容,以下:
$ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules \ < ~/influxtest/1/test/rules.pb influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused. name: "test" partition_template { parts { time: "%Y-%m-%d %H:00:00" } } lifecycle_rules { mutable_linger_seconds: 300 mutable_size_threshold: 10485760 buffer_size_soft: 52428800 buffer_size_hard: 104857600 sort_order { order: ORDER_ASC created_at_time { } } }
看到這裏已經知道整個生成過程及文件內容。
祝玩兒的開心。
歡迎關注微信公衆號:
或添加微信好友: liutaohua001