Note: the Envoy source code analysis in this article is based on Envoy 1.10.0.
The previous chapters analyzed the event system and the low-level networking code, but never tied together how a service actually gets created; they only covered the shared low-level network libraries. This time we walk through the entire service startup process.
The overall entry point for server startup is the `main` function, which first creates a `MainCommon`.
```cpp
int main(int argc, char** argv) {
  // ...
  std::unique_ptr<Envoy::MainCommon> main_common;
  try {
    main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
    // ...
  }
```
The `MainCommon` constructor first parses the program's command-line arguments and then constructs `MainCommonBase`.
```cpp
MainCommon::MainCommon(int argc, const char* const* argv)
    : options_(argc, argv, &MainCommon::hotRestartVersion, spdlog::level::info),
      base_(options_, real_time_system_, default_test_hooks_, prod_component_factory_,
            std::make_unique<Runtime::RandomGeneratorImpl>(),
            platform_impl_.threadFactory(), platform_impl_.fileSystem()) {}
```
`OptionsImpl` uses the open-source TCLAP parsing library. `OptionsImpl` supports many options; see the operation/cli section of the Envoy documentation for the full list.
`MainCommonBase` initializes global state and third-party libraries, then creates the server and kicks off its initialization.
```cpp
MainCommonBase::MainCommonBase(/* ... */)
    : options_(options), component_factory_(component_factory),
      thread_factory_(thread_factory), file_system_(file_system) {
  // Pre-initialize globals and third-party libraries.
  Thread::ThreadFactorySingleton::set(&thread_factory_);
  ares_library_init(ARES_LIB_INIT_ALL);
  Event::Libevent::Global::initialize();
  RELEASE_ASSERT(Envoy::Server::validateProtoDescriptors(), "");
  Http::Http2::initializeNghttp2Logging();
  // ...
  // Create the server instance.
  server_ = std::make_unique<Server::InstanceImpl>(
      options_, time_system, local_address, test_hooks, *restarter_, *stats_store_,
      access_log_lock, component_factory, std::move(random_generator), *tls_,
      thread_factory_, file_system_);
```
`InstanceImpl` is mainly responsible for initializing the `admin` management service, loading the static xDS resources, and initializing the server itself.
In its core initialization function, `InstanceImpl` first loads the configuration file, which is specified via the `-c` command-line flag.
```cpp
void InstanceImpl::initialize(const Options& options,
                              Network::Address::InstanceConstSharedPtr local_address,
                              ComponentFactory& component_factory, TestHooks& hooks) {
  // ...
  // Load the Bootstrap config.
  InstanceUtil::loadBootstrapConfig(bootstrap_, options, *api_);
  bootstrap_config_update_time_ = time_source_.systemTime();
  // ...
}
```
The configuration file is in JSON format and contains top-level sections such as `node`, `admin`, `static_resources`, and `dynamic_resources`. For the purpose and configuration of each section, see the Bootstrap reference documentation.
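As a rough illustration, a minimal bootstrap file has the following top-level shape (a skeleton only — the field names come from the v2 Bootstrap proto, and the resource lists are left empty here):

```json
{
  "admin": {
    "access_log_path": "/dev/null",
    "address": {
      "socket_address": { "address": "127.0.0.1", "port_value": 9901 }
    }
  },
  "static_resources": {
    "listeners": [],
    "clusters": []
  },
  "dynamic_resources": {}
}
```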
Once the configuration is loaded, the local `admin` HTTP service is started.
First an `AdminImpl` is created; its constructor registers the admin URI handlers.
```cpp
AdminImpl::AdminImpl(const std::string& profile_path, Server::Instance& server)
    : server_(server), profile_path_(profile_path),
      // ...
      handlers_{
          {"/", "Admin home page", MAKE_ADMIN_HANDLER(handlerAdminHome), false, false},
          {"/certs", "print certs on machine", MAKE_ADMIN_HANDLER(handlerCerts), false, false},
          // Dump cluster statistics.
          {"/clusters", "upstream cluster status", MAKE_ADMIN_HANDLER(handlerClusters), false,
           false},
          // Dump the current configuration.
          {"/config_dump", "dump current Envoy configs (experimental)",
           MAKE_ADMIN_HANDLER(handlerConfigDump), false, false},
          // Dump mutex contention statistics.
          {"/contention", "dump current Envoy mutex contention stats (if enabled)",
           MAKE_ADMIN_HANDLER(handlerContention), false, false},
          // ...
      },
      admin_filter_chain_(std::make_shared<AdminFilterChain>()) {}
```
Then the admin listener is started.
```cpp
admin_->startHttpListener(initial_config.admin().accessLogPath(), options.adminAddressPath(),
                          initial_config.admin().address(),
                          stats_store_.createScope("listener.admin."));
```
Inside `startHttpListener`, a `TcpListenSocket` and an `AdminListener` are created.
```cpp
void AdminImpl::startHttpListener(const std::string& access_log_path,
                                  const std::string& address_out_path,
                                  Network::Address::InstanceConstSharedPtr address,
                                  Stats::ScopePtr&& listener_scope) {
  // ...
  socket_ = std::make_unique<Network::TcpListenSocket>(address, nullptr, true);
  listener_ = std::make_unique<AdminListener>(*this, std::move(listener_scope));
  // ...
}
```
When the `TcpListenSocket` is constructed, it creates a socket internally and then binds it.
```cpp
using TcpListenSocket = NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Stream>>;

template <typename T> class NetworkListenSocket : public ListenSocketImpl {
public:
  NetworkListenSocket(const Address::InstanceConstSharedPtr& address,
                      const Network::Socket::OptionsSharedPtr& options, bool bind_to_port)
      // Socket creation.
      : ListenSocketImpl(address->socket(T::type), address) {
    RELEASE_ASSERT(io_handle_->fd() != -1, "");
    setPrebindSocketOptions();
    setupSocket(options, bind_to_port);
  }
};

void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options,
                                   bool bind_to_port) {
  setListenSocketOptions(options);
  // Prepare to bind.
  if (bind_to_port) {
    doBind();
  }
}

void ListenSocketImpl::doBind() {
  // The actual bind.
  const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
  // ...
}
```
The `AdminListener` constructor only initializes its members.
```cpp
AdminListener(AdminImpl& parent, Stats::ScopePtr&& listener_scope)
    : parent_(parent), name_("admin"), scope_(std::move(listener_scope)),
      stats_(Http::ConnectionManagerImpl::generateListenerStats("http.admin.", *scope_)) {}
```
After socket creation and bind comes the listen step: the `AdminListener` is added to the handler's listening set. The handler is initialized in `InstanceImpl`'s constructor.
```cpp
InstanceImpl::InstanceImpl(/* ... */)
    : handler_(new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher_)) /* , ... */ {}

void InstanceImpl::initialize(/* ... */) {
  // ...
  // Add the AdminListener to the ConnectionHandler.
  if (initial_config.admin().address()) {
    admin_->addListenerToHandler(handler_.get());
  }
  // ...
}

void AdminImpl::addListenerToHandler(Network::ConnectionHandler* handler) {
  // listener_ here is the AdminListener created above.
  if (listener_) {
    handler->addListener(*listener_);
  }
}
```
`addListener` creates a new `ActiveListener` (an internal class); if listeners are globally disabled, it is put into the disabled state first.
```cpp
void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
  ActiveListenerPtr l(new ActiveListener(*this, config));
  if (disable_listeners_) {
    l->listener_->disable();
  }
  listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}
```
The `ActiveListener` constructor creates the actual listener through the dispatcher, which registers a libevent callback. When a new connection arrives, `onAccept` is invoked.
```cpp
ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,
                                                      Network::ListenerConfig& config)
    : ActiveListener(
          parent,
          // Create the listener.
          parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
                                            config.handOffRestoredDestinationConnections()),
          config) {}

Network::ListenerPtr
DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbacks& cb,
                               bool bind_to_port,
                               bool hand_off_restored_destination_connections) {
  ASSERT(isThreadSafe());
  return Network::ListenerPtr{new Network::ListenerImpl(
      *this, socket, cb, bind_to_port, hand_off_restored_destination_connections)};
}

void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
  listener_.reset(
      // evconnlistener_new registers listenCallback as the new-connection callback.
      evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1,
                         socket.ioHandle().fd()));
  // ...
}

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
                                  int remote_addr_len, void* arg) {
  // ...
  // Invoke the ActiveListener's onAccept.
  listener->cb_.onAccept(
      std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address),
      listener->hand_off_restored_destination_connections_);
}
```
`onAccept` runs the listener filter chain and then creates the new connection.
```cpp
void ConnectionHandlerImpl::ActiveListener::onAccept(/* ... */) {
  // ...
  active_socket->continueFilterChain(true);
  // ...
}

void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
  // ...
  listener_.newConnection(std::move(socket_));
  // ...
}

void ConnectionHandlerImpl::ActiveListener::onNewConnection(/* ... */) {
  // ...
  if (new_connection->state() != Network::Connection::State::Closed) {
    ActiveConnectionPtr active_connection(
        new ActiveConnection(*this, std::move(new_connection), parent_.dispatcher_.timeSource()));
    active_connection->moveIntoList(std::move(active_connection), connections_);
    parent_.num_connections_++;
  }
  // ...
}
```
At this point, the new connection is established.
When the xDS resources from the Bootstrap are initialized, the static secrets go first, then the clusters, and then the listeners.
```cpp
void MainImpl::initialize(/* ... */) {
  // Initialize the static secrets.
  const auto& secrets = bootstrap.static_resources().secrets();
  for (ssize_t i = 0; i < secrets.size(); i++) {
    ENVOY_LOG(debug, "static secret #{}: {}", i, secrets[i].name());
    server.secretManager().addStaticSecret(secrets[i]);
  }

  // Initialize the clusters.
  ENVOY_LOG(info, "loading {} cluster(s)", bootstrap.static_resources().clusters().size());
  cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);

  // Initialize the listeners.
  const auto& listeners = bootstrap.static_resources().listeners();
  for (ssize_t i = 0; i < listeners.size(); i++) {
    ENVOY_LOG(debug, "listener #{}:", i);
    server.listenerManager().addOrUpdateListener(listeners[i], "", false);
  }
```
Cluster initialization happens in two phases: first the non-EDS clusters, then the EDS clusters. The split exists because in the v2 config each EDS cluster sets up its own subscription; when that subscription is an API config source, the EDS cluster depends on a non-EDS cluster, so the non-EDS clusters must be initialized first. There are five cluster discovery types:
```proto
enum DiscoveryType {
  // Static discovery.
  STATIC = 0;
  // Strict DNS discovery.
  STRICT_DNS = 1;
  // Logical DNS discovery.
  LOGICAL_DNS = 2;
  // Endpoint discovery service (EDS).
  EDS = 3;
  // Original destination discovery.
  ORIGINAL_DST = 4;
}
```
Cluster initialization order:

non-EDS clusters -> ADS -> EDS clusters -> CDS
```cpp
ClusterManagerImpl::ClusterManagerImpl(/* ... */) {
  // ...
  // Phase 1: load the non-EDS clusters.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    if (cluster.type() != envoy::api::v2::Cluster::EDS) {
      loadCluster(cluster, "", false, active_clusters_);
    }
  }

  // Initialize ADS.
  if (bootstrap.dynamic_resources().has_ads_config()) {
    ads_mux_ = std::make_unique<Config::GrpcMuxImpl>(
        local_info,
        Config::Utility::factoryForGrpcApiConfigSource(
            *async_client_manager_, bootstrap.dynamic_resources().ads_config(), stats)
            ->create(),
        main_thread_dispatcher,
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
            "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
        random_, stats_,
        Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config()));
  } else {
    ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
  }

  // Phase 2: load the EDS clusters.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    if (cluster.type() == envoy::api::v2::Cluster::EDS) {
      loadCluster(cluster, "", false, active_clusters_);
    }
  }
  // ...

  // Initialize CDS.
  if (bootstrap.dynamic_resources().has_cds_config()) {
    cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), *this);
    init_helper_.setCds(cds_api_.get());
  } else {
    init_helper_.setCds(nullptr);
  }
}
```
After all of the above is initialized, LDS is initialized, and finally HDS.
```cpp
// Initialize LDS.
if (bootstrap_.dynamic_resources().has_lds_config()) {
  listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
}

// Initialize HDS.
if (bootstrap_.has_hds_config()) {
  const auto& hds_config = bootstrap_.hds_config();
  async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
      *config_.clusterManager(), thread_local_, time_source_, *api_);
  // ...
}
```
The `ListenerManager` constructor simply creates the workers up front.
```cpp
ListenerManagerImpl::ListenerManagerImpl(/* ... */) {
  // ...
  // Create the worker threads.
  for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    workers_.emplace_back(worker_factory.createWorker(server.overloadManager()));
  }
}

WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager) {
  // Each worker thread gets its own dispatcher.
  Event::DispatcherPtr dispatcher(api_.allocateDispatcher());
  return WorkerPtr{new WorkerImpl(
      tls_, hooks_, std::move(dispatcher),
      Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher)},
      overload_manager, api_)};
}

WorkerImpl::WorkerImpl(/* ... */)
    : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)),
      handler_(std::move(handler)), api_(api) {
  tls_.registerThread(*dispatcher_, false);
  overload_manager.registerForAction(
      OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
      [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
}
```
The `main` function then calls into `main_common`:
```cpp
int main(int argc, char** argv) {
  // ...
  return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}
```
`main_common` in turn calls into `InstanceImpl`:
```cpp
bool MainCommonBase::run() {
  switch (options_.mode()) {
  case Server::Mode::Serve:
    server_->run();
    return true;
  // ...
  }
}
```
`InstanceImpl::run` then starts the main dispatch loop.
```cpp
void InstanceImpl::run() {
  auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
                              access_log_manager_, init_manager_, overloadManager(),
                              [this] { startWorkers(); });

  auto watchdog = guard_dog_->createWatchDog(api_->threadFactory().currentThreadId());
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(info, "main dispatch loop exited");
  guard_dog_->stopWatching(watchdog);
  watchdog.reset();
  terminate();
}
```
The startup flow of the local admin HTTP service was analyzed above; now we look at how the locally configured services start (services delivered via xDS are not covered here).
During static resource initialization, after the clusters, the listeners are added.
```cpp
void MainImpl::initialize(/* ... */) {
  // ...
  // Initialize the listeners.
  const auto& listeners = bootstrap.static_resources().listeners();
  for (ssize_t i = 0; i < listeners.size(); i++) {
    ENVOY_LOG(debug, "listener #{}:", i);
    server.listenerManager().addOrUpdateListener(listeners[i], "", false);
  }
}
```
`addOrUpdateListener` creates a `ListenerImpl`, which performs the bind.
```cpp
// Create the ListenerImpl.
ListenerImplPtr new_listener(
    new ListenerImpl(config, version_info, *this, name, modifiable, workers_started_, hash));
ListenerImpl& new_listener_ref = *new_listener;
// ...
// Bind the address and associate the socket with the ListenerImpl.
new_listener->setSocket(draining_listener_socket
                            ? draining_listener_socket
                            : factory_.createListenSocket(
                                  new_listener->address(), new_listener->socketType(),
                                  new_listener->listenSocketOptions(),
                                  new_listener->bindToPort()));

Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket(/* ... */) {
  // ...
  // UdsListenSocket performs the bind() in its constructor.
  if (io_handle->isOpen()) {
    return std::make_shared<Network::UdsListenSocket>(std::move(io_handle), address);
  }
  return std::make_shared<Network::UdsListenSocket>(address);
}

// Which ultimately reaches the bind() system call:
void ListenSocketImpl::doBind() {
  const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
  // ...
}
```
When `InstanceImpl` starts, it invokes `RunHelper`, which in turn calls `startWorkers`. `startWorkers` adds the listeners obtained during initialization to each worker.
```cpp
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
  workers_started_ = true;
  for (const auto& worker : workers_) {
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      addListenerToWorker(*worker, *listener);
    }
    worker->start(guard_dog);
  }
}
```
Each worker associates the listeners with its `ConnectionHandler`.
```cpp
void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) {
  worker.addListener(listener, [this, &listener](bool success) -> void {
    // ...
  });
}

void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
  dispatcher_->post([this, &listener, completion]() -> void {
    try {
      // Associate the listener with the ConnectionHandler.
      handler_->addListener(listener);
      hooks_.onWorkerListenerAdded();
      completion(true);
    } catch (const Network::CreateListenerException& e) {
      completion(false);
    }
  });
}
```
The `ConnectionHandler` is created when the worker is initialized.
```cpp
ListenerManagerImpl::ListenerManagerImpl(Instance& server,
                                         ListenerComponentFactory& listener_factory,
                                         WorkerFactory& worker_factory)
    : server_(server), factory_(listener_factory), stats_(generateStats(server.stats())),
      config_tracker_entry_(server.admin().getConfigTracker().add(
          "listeners", [this] { return dumpListenerConfigs(); })) {
  for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    // Initialize the workers.
    workers_.emplace_back(worker_factory.createWorker(server.overloadManager()));
  }
}

WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager) {
  Event::DispatcherPtr dispatcher(api_.allocateDispatcher());
  return WorkerPtr{new WorkerImpl(
      tls_, hooks_, std::move(dispatcher),
      // Create the ConnectionHandler.
      Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher)},
      overload_manager, api_)};
}
```
Once the listeners are associated with the `ConnectionHandler`, the subsequent listen(), accept(), and connection-creation steps are identical to the admin HTTP startup flow described above.
That essentially completes the service startup flow. Services added later follow the same path, calling `addOrUpdateListener`. Inside `addOrUpdateListener`, it checks whether the workers have already started; if so, the listener goes through `ManagerImpl` and waits for initialization to complete.
```cpp
void ListenerImpl::initialize() {
  last_updated_ = timeSource().systemTime();
  if (workers_started_) {
    // Hand off to the Init::ManagerImpl.
    dynamic_init_manager_.initialize(*init_watcher_);
  }
}

void ManagerImpl::initialize(const Watcher& watcher) {
  // ...
  for (const auto& target_handle : target_handles_) {
    // Wait for each target to finish initializing.
    if (!target_handle->initialize(watcher_)) {
      onTargetReady();
    }
  }
}
```
Once initialization completes, the stored callback is invoked. The callback is supplied when the `WatcherImpl` is created.
```cpp
void ManagerImpl::onTargetReady() {
  if (--count_ == 0) {
    // All targets are initialized.
    ready();
  }
}

void ManagerImpl::ready() {
  state_ = State::Initialized;
  watcher_handle_->ready();
}

bool WatcherHandleImpl::ready() const {
  // Invoke the stored callback.
  (*locked_fn)();
}

ListenerImpl::ListenerImpl(/* ... */)
    : /* ... */
      // Create the watcher with its callback.
      init_watcher_(std::make_unique<Init::WatcherImpl>(
          "ListenerImpl", [this] { parent_.onListenerWarmed(*this); })) {}
```
`onListenerWarmed` then adds the listener to the workers. The rest of the flow is the same as the normal service startup flow, so it is not analyzed again.
```cpp
void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
  for (const auto& worker : workers_) {
    addListenerToWorker(*worker, listener);
  }
}
```
This completes the initialization and startup flow of the whole server. Services come in three kinds: the local admin HTTP management service, services from the local configuration file, and services delivered via xDS. This chapter only analyzed the startup flow; what happens after a connection is established will be analyzed later.