時序數據庫Influx-IOx源碼學習五(建立數據庫)

InfluxDB是一個由InfluxData開發的開源時序數據庫,專一於海量時序數據的高性能讀、寫、高效存儲與實時分析等,在DB-Engines Ranking時序型數據庫排行榜上常年排名第一。git

InfluxDB能夠說是當之無愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 號在博客中發表一篇名爲:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介紹了一個新項目 InfluxDB IOx,InfluxDB 的下一代時序引擎。github

接下來,我將連載對於InfluxDB IOx的源碼解析過程,歡迎各位批評指正,聯繫方式見文章末尾。數據庫


上篇介紹到:InfluxDB-IOx的Run命令啓動過程,詳情見:http://www.javashuo.com/article/p-ahmvhsoy-vk.html緩存

這章記錄一下Database create命令的執行過程。服務器


在第三章命令行中介紹了,全部的子命令都有一個獨立的參數或配置稱爲subcommand微信

enum Command {
    Convert { // 省略 ...},
    Meta {// 省略 ...},
    Database(commands::database::Config),
    Run(Box<commands::run::Config>),
    Stats(commands::stats::Config),
    Server(commands::server::Config),
    Writer(commands::writer::Config),
    Operation(commands::operations::Config),
}

這章咱們打開看一眼commands::database下的config包含了什麼。框架

pub struct Config {
    #[structopt(subcommand)]
    command: Command,
}
//見名知意,基本猜想一下就好了,慢慢使用到再回來看
enum Command {
    Create(Create),
    List(List),
    Get(Get),
    Write(Write),
    Query(Query),
    Chunk(chunk::Config),
    Partition(partition::Config),
}

先來看一下create命令的執行。async

Command::Create(command) => {
            //建立一個grpc的client
            let mut client = management::Client::new(connection);
            //設置基本的配置項
            let rules = DatabaseRules {
                //數據庫名字
                name: command.name,
                //內存的各類配置,包含緩存大小,時間等等
                lifecycle_rules: Some(LifecycleRules {
                    //省略。。
                }),
                //設置分區的策略
                partition_template: Some(PartitionTemplate {
                    //省略。。
                }),

                 //其它都填充default
                ..Default::default()
            };
            //使用配置信息建立數據庫,這裏是生成了一個CreateDatabaseRequest去調用了遠程服務器的方法
            client.create_database(rules).await?;

            println!("Ok");
        }

在上一章中提到了grpc的啓動,這裏就涉及到了以前提到的grpc的框架tonic,在tonic中使用#[tonic::async_trait]了標記一個服務器端的實現開始。我在ide中搜索,能夠在src/influxdb_ioxd/rpc/management.rs:50行中找到ManagementService相關的實現。ide

有關tonic更多的資料請閱讀:https://github.com/hyperium/tonic性能

#[tonic::async_trait]
impl<M> management_service_server::ManagementService for ManagementService<M>
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    //省略其它方法。。。

 async fn create_database(
        &self,
        //這裏就是接收CreateDatabaseRequest的請求
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {

         //對數據進行一下校驗,而後得到在上面配置的rules規則
        let rules: DatabaseRules = request
            .into_inner()
            .rules
            .ok_or_else(|| FieldViolation::required(""))
            .and_then(TryInto::try_into)
            .map_err(|e| e.scope("rules"))?;

        //這裏就是在第三章中提到的server_id,若是沒配置就會報錯了
        let server_id = match self.server.require_id().ok() {
            Some(id) => id,
            None => return Err(NotFound::default().into()),
        };
        //這裏就是真正的去建立,在下面繼續跟蹤
        match self.server.create_database(rules, server_id).await {
            Ok(_) => Ok(Response::new(CreateDatabaseResponse {})),
            Err(Error::DatabaseAlreadyExists { db_name }) => {
                return Err(AlreadyExists {
                    resource_type: "database".to_string(),
                    resource_name: db_name,
                    ..Default::default()
                }
                .into())
            }
            Err(e) => Err(default_server_error_handler(e)),
        }
    }
}

接下來要繼續查看數據庫真正的被建立出來,我讀到這裏存在一個問題,文件格式是什麼樣子的?

pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> {
        //檢查server_id
        self.require_id()?;
        //把數據庫名字存儲到內存中,最終保存到一個btreemap中
        let db_reservation = self.config.create_db(rules)?;
        //對數據進行持久化保存
        self.persist_database_rules(db_reservation.rules().clone())
            .await?;
        //啓動數據庫後臺線程,在內存中寫入數據庫狀態
        db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec));

        Ok(())
    }

來解答上面的疑問,文件是怎樣持久化、格式是什麼樣子的。

pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> {
        //生成一個新的數據庫路徑
        let location = object_store_path_for_database_config(&self.root_path()?, &rules.name);
        //序列化DatabaseRules這個pb到byte流
        let mut data = BytesMut::new();
        rules.encode(&mut data).context(ErrorSerializing)?;
        let len = data.len();
        let stream_data = std::io::Result::Ok(data.freeze());
        //將pb的內容進行存儲
        self.store
            .put(
                &location,
                futures::stream::once(async move { stream_data }),
                Some(len),
            )
            .await
            .context(StoreError)?;
        Ok(())
    }

這裏調用了rules.encode()轉換到pb的格式,這裏是rust語言的一個方法,實現了From特性的,就獲得了一個into的方法,如:impl From<DatabaseRules> for management::DatabaseRules.

到這裏數據庫的一個描述文件rules.pb就被寫入到磁盤中了,路徑是啓動命令中指定的--data-dir參數路徑 + --writer-id + 數據庫名字。

例如,個人啓動和建立命令爲:

./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/
./influxdb_iox database create test

那麼獲得的路徑就爲:~/influxtest/1/test/rules.pb. 以後能夠運行一個pb的腳原本反查rules.pb中的數據內容,以下:

$ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules \
    < ~/influxtest/1/test/rules.pb

influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused.
name: "test"
partition_template {
  parts {
    time: "%Y-%m-%d %H:00:00"
  }
}
lifecycle_rules {
  mutable_linger_seconds: 300
  mutable_size_threshold: 10485760
  buffer_size_soft: 52428800
  buffer_size_hard: 104857600
  sort_order {
    order: ORDER_ASC
    created_at_time {
    }
  }
}

看到這裏已經知道整個生成過程及文件內容。

祝玩兒的開心。


歡迎關注微信公衆號:

或添加微信好友: liutaohua001

相關文章
相關標籤/搜索