歡迎關注公衆號: 數據庫
上篇介紹到:InfluxDB-IOx的命令行及配置,詳情見:https://my.oschina.net/u/3374539/blog/5017858api
這章記錄一下Run命令的執行過程。緩存
//根據用戶在命令行配置的num_threads參數 //來選擇建立一個多線程的模型,仍是current_thread的模型 //後面有時間深刻研究tokio的時候再來分析有什麼異同 let tokio_runtime = get_runtime(config.num_threads)?; //block_on會讓線程一直等待方法裏的future執行完成 //這是讓閉包中的方法佔有了io driver 和 timer context tokio_runtime.block_on(async move { let host = config.host; match config.command { // 省略其它command ... Command::Run(config) => { //具體去子類型裏執行,而後await一個結果 if let Err(e) = commands::run::command(logging_level, *config).await { eprintln!("Server command failed: {}", e); std::process::exit(ReturnCode::Failure as _) } } } });
在influxdb_ioxd::main
方法中,忽略一些不太須要重點關注的,分別是初始化log
的管理、PanicsTracing
、CancellationToken
等。多線程
//初始化對象存儲 let object_store = ObjectStore::try_from(&config)?; //能夠看到,目前已經支持了 //1.內存(在container環境運行時候使用) //2.Google //3.S3 //4.Azure //5.File 本地文件,方便開發者調試運行在雲上時候的文件變化 fn try_from(config: &Config) -> Result<Self, Self::Error> { match config.object_store { Some(ObjStoreOpt::Memory) | None => { //建立一個btreemap用來緩存或者搜索 Ok(Self::new_in_memory(object_store::memory::InMemory::new())) } Some(ObjStoreOpt::Google) => { // 省略 } Some(ObjStoreOpt::S3) => { // 省略 } Some(ObjStoreOpt::Azure) => { // 省略 } Some(ObjStoreOpt::File) => match config.database_directory.as_ref() { Some(db_dir) => { //去遞歸建立這個配置路徑中的文件夾 //context也是使用的snafu來處理錯誤的 fs::create_dir_all(db_dir) .context(CreatingDatabaseDirectory { path: db_dir })?; //都建立完成,而且沒出錯誤,把路徑保存起來 Ok(Self::new_file(object_store::disk::File::new(&db_dir))) } // 若是database_directory這個參數沒有配置的時候 //使用snafu這個crate來返回一個錯誤 None => MissingObjectStoreConfig { object_store: ObjStoreOpt::File, missing: "data-dir", } .fail(), }, } }
關於錯誤處理的代碼:閉包
#[snafu(display("Unable to create database directory {:?}: {}", path, source))] CreatingDatabaseDirectory { path: PathBuf, source: std::io::Error, }, #[snafu(display( "Specified {} for the object store, required configuration missing for {}", object_store, missing ))] MissingObjectStoreConfig { object_store: ObjStoreOpt, missing: String, },
咱們來測試一下錯誤的場景,來看看是否符合代碼的預期。app
// 不傳入路徑 cargo run run --object-store file Finished dev [unoptimized + debuginfo] target(s) in 0.42s Running `./influxdb_iox run --object-store file` Apr 15 13:38:34.352 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Specified File for the object store, required configuration missing for data-dir //傳入一個建立不了的路徑 cargo run run --object-store file --data-dir /root/1/1 Finished dev [unoptimized + debuginfo] target(s) in 0.47s Running `./influxdb_iox run --object-store file --data-dir /root/1/1` Apr 15 13:45:26.664 INFO influxdb_iox::influxdb_ioxd: Using File for object storage Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30)
能夠看到是符合預期的,bingofrontend
//建立一個空的結構體 let connection_manager = ConnectionManager {}; //建立AppServer結構體用來保存基本的信息 //server_config裏就是保存的對象存儲的信息及線程配置 //若是num_worker_threads沒有填寫,默認就使用cpu數量 let app_server = Arc::new(AppServer::new(connection_manager, server_config)); //不設置這個writer_id能啓動,可是不能作任何操做 if let Some(id) = config.writer_id { //compare and set 一個非0的數值,錯誤就打印一個指定的panic app_server.set_id(id).expect("writer id already set"); //校驗全部的配置 if let Err(e) = app_server.load_database_configs().await { error!( "unable to load database configurations from object storage: {}", e ) } } else { warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data."); }
接下來進入load_database_configs
方法看看,curl
let list_result = self .store //把write_id和配置的文件路徑組合一下,做爲一個目錄 //遍歷文件夾中的全部東西,用一個BTreeSet存全部子文件夾 //用Vec存下全部的文件信息,包括路徑、修改時間、大小等 .list_with_delimiter(&self.root_path()?) .await .context(StoreError)?; //拿到配置的server的write_id let server_id = self.require_id()?; let handles: Vec<_> = list_result //配置的文件夾下的全部文件夾 .common_prefixes .into_iter() //所有進行map轉換 .map(|mut path| { let store = Arc::clone(&self.store); let config = Arc::clone(&self.config); let exec = Arc::clone(&self.exec); //先找database的相關信息文件,名字叫rules.pb path.set_file_name(DB_RULES_FILE_NAME); //感受是須要io來讀取文件內容,因此開一個異步 tokio::task::spawn(async move { let mut res = get_store_bytes(&path, &store).await; //省略錯誤處理。。 let res = res.unwrap().freeze(); //解析文件內容,根據文件名能夠看出是個pb文件。 match DatabaseRules::decode(res) { Err(e) => { //省略錯誤。。 } //根據解析出來的文件內容,在內存中恢復回來db的相關信息 Ok(rules) => match config.create_db(rules) { Err(e) => error!("error adding database to config: {}", e), //提交一個後臺任務,用來不斷的檢測chunks的狀態 //好比達到了某個大小,而後寫入到存儲等 Ok(handle) => handle.commit(server_id, store, exec), }, } }) }) .collect(); //等待全部任務完成 futures::future::join_all(handles).await;
這裏就啓動完成了一個基本的服務,建立了存儲路徑、初始化數據庫的基本配置、啓動了一個用來刷盤、整理chunk的後臺任務。異步
接下來就是啓動鏈接相關的了。socket
//從啓動命令行中讀取grpc的地址 let grpc_bind_addr = config.grpc_bind_address; //綁定這個地址 let socket = tokio::net::TcpListener::bind(grpc_bind_addr) .await .context(StartListeningGrpc { grpc_bind_addr })?; //真正的協議啓動 let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //一樣的啓動http相關的服務,使用的hyper庫 let bind_addr = config.http_bind_address; let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?; let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse(); //省略後面的中止流程。。。
而後看grpc的啓動的服務
//啓動起來健康檢查的服務 let stream = TcpListenerStream::new(socket); let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); //標識相對應的服務已是能夠提供服務的狀態了 let services = [ generated_types::STORAGE_SERVICE, generated_types::IOX_TESTING_SERVICE, generated_types::ARROW_SERVICE, ]; for service in &services { health_reporter .set_service_status(service, tonic_health::ServingStatus::Serving) .await; } //增長一堆使用grpc的服務,並啓動起來 tonic::transport::Server::builder() .add_service(health_service) .add_service(testing::make_server()) .add_service(storage::make_server(Arc::clone(&server))) .add_service(flight::make_server(Arc::clone(&server))) .add_service(write::make_server(Arc::clone(&server))) .add_service(management::make_server(Arc::clone(&server))) .add_service(operations::make_server(server)) .serve_with_incoming_shutdown(stream, shutdown.cancelled()) .await
而後是http相關的啓動
pub async fn serve<M>( addr: AddrIncoming, server: Arc<AppServer<M>>, shutdown: CancellationToken, ) -> Result<(), hyper::Error> where M: ConnectionManager + Send + Sync + Debug + 'static, { //初始化路由相關的信息 let router = router(server); let service = RouterService::new(router).unwrap(); //啓動服務 hyper::Server::builder(addr) .serve(service) .with_graceful_shutdown(shutdown.cancelled()) .await }
順便看一下都提供了哪些地址能夠被訪問的:
Router::builder() .data(server) //寫了一個攔截,打印請求參數和返回結果 .middleware(Middleware::pre(|req| async move { debug!(request = ?req, "Processing request"); Ok(req) })) .middleware(Middleware::post(|res| async move { debug!(response = ?res, "Successfully processed request"); Ok(res) })) // this endpoint is for API backward compatibility with InfluxDB 2.x .post("/api/v2/write", write::<M>) .get("/health", health) .get("/metrics", handle_metrics) .get("/iox/api/v1/databases/:name/query", query::<M>) .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>) .get("/api/v1/partitions", list_partitions::<M>) .post("/api/v1/snapshot", snapshot_partition::<M>) //錯誤的時候調用的處理攔截 .err_handler_with_info(error_handler) .build() .unwrap()
作一個/health
的測試:
curl localhost:8080/health OK%
能夠看到成功返回了值。
到這裏基本啓動就完成了,後面再用到的時候會繼續對啓動裏的細節作研究,好比Panics
,Log
等等吧,歡迎持續關注。
祝玩兒的開心