Envoy 源碼分析--程序啓動過程

Envoy 源碼分析--程序啓動過程

申明:本文的 Envoy 源碼分析基於 Envoy1.10.0。node

前面幾章分析了 event事件底層網絡, 但對建立服務的過程並無串起來,只是分析了底層的網絡公共庫。此次咱們分析下整個服務的建立過程。git

初始化

main 入口

服務啓動的總入口 main 函數,會先建立 MainCommongithub

int main(int argc, char** argv) {
  ... ...
  std::unique_ptr<Envoy::MainCommon> main_common;

  try {
    main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
  ... ...
}

MainCommon 初始化

MainCommon 在構造函數時,會先解析程序的參數,而後再調用 MainCommonBasejson

MainCommon::MainCommon(int argc, const char* const* argv)
    : options_(argc, argv, &MainCommon::hotRestartVersion, spdlog::level::info),
      base_(options_, real_time_system_, default_test_hooks_,prod_component_factory_,
      std::make_unique<Runtime::RandomGeneratorImpl>(),platform_impl_.threadFactory(),
      platform_impl_.fileSystem()) {}

OptionsImpl 使用開源的 tclap 解析庫。OptionsImpl 支持不少參數配置,具體的參數配置參考 operation/clibootstrap

MainCommonBase 會初始化全局的參數,接着調用服務進行初始化。api

MainCommonBase::MainCommonBase(... ...)
    : options_(options), component_factory_(component_factory), thread_factory_(thread_factory),file_system_(file_system) {
  //全局的或第三方庫預先初始化
  Thread::ThreadFactorySingleton::set(&thread_factory_);
  ares_library_init(ARES_LIB_INIT_ALL);
  Event::Libevent::Global::initialize();
  RELEASE_ASSERT(Envoy::Server::validateProtoDescriptors(), "");
  Http::Http2::initializeNghttp2Logging();

  ... ... 

  //初始化服務
  server_ = std::make_unique<Server::InstanceImpl>(
    options_, time_system, local_address, test_hooks, *restarter_, *stats_store_,
    access_log_lock, component_factory, std::move(random_generator), *tls_, thread_factory_,
    file_system_);

服務 InstanceImpl 初始化

InstanceImpl 主要是初始化 admin 管理服務,各個靜態 XDS 的加載以及初始化服務。服務器

InstanceImpl 在覈心函數中先加載配置文件,經過參數 -c 配置。網絡

void InstanceImpl::initialize(const Options& options,
                              Network::Address::InstanceConstSharedPtr local_address,
                              ComponentFactory& component_factory, TestHooks& hooks) {
  ... ...
  //加載Bootstrap
  InstanceUtil::loadBootstrapConfig(bootstrap_, options, *api_);
  bootstrap_config_update_time_ = time_source_.systemTime();
  ... ...                              
}

配置文件是一個 json 格式,包括如下幾項:dom

  • node:節點標識,會在管理服務器中呈現,用於標識目的(例如,頭域中生成相應的字段)。
  • static_resources:指定靜態資源配置。
  • dynamic_resources:動態發現服務源配置。
  • cluster_manager:該服務全部的上游羣集的羣集管理配置。
  • hds_config:服務健康檢查配置。
  • flags_path:用於啓動文件標誌的路徑。
  • stats_sinks:統計彙總設置
  • stats_config:配置內部處理統計信息。
  • stats_flush_interval:刷新到統計信息服務的週期時間。出於性能方面的考慮,Envoy鎖定計數器,而且只是週期性地刷新計數器和計量器。若是未指定,則默認值爲5000毫秒(5秒)。
  • watchdog:看門狗配置。
  • tracing:配置外置的追蹤服務程序。若是沒有指定,則不會執行追蹤。
  • runtime:配置運行時配置分發服務程序。
  • admin: 配置本地管理的HTTP服務。
  • overload_manager:過載管理配置(資源限制)。

各個項的具體做用和配置參考Bootstrap

加載完配置項,接着會啓動 admin 本地的HTTP服務。

初始化 admin 服務

admin 先建立一個 AdminImpl,在構造函數裏初始化 URI。

AdminImpl::AdminImpl(const std::string& profile_path, Server::Instance& server)
    : server_(server), profile_path_(profile_path),
      ... ...
      handlers_{
          {"/", "Admin home page", MAKE_ADMIN_HANDLER(handlerAdminHome), false, false},
          {"/certs", "print certs on machine", MAKE_ADMIN_HANDLER(handlerCerts), false, false},
          // 導出 cluster  統計信息
          {"/clusters", "upstream cluster status", MAKE_ADMIN_HANDLER(handlerClusters), false,
           false},
          // 導出配置文件
          {"/config_dump", "dump current Envoy configs (experimental)",
           MAKE_ADMIN_HANDLER(handlerConfigDump), false, false},
          // 導出鏈接統計信息
          {"/contention", "dump current Envoy mutex contention stats (if enabled)",
           MAKE_ADMIN_HANDLER(handlerContention), false, false},
          ... ...
      admin_filter_chain_(std::make_shared<AdminFilterChain>()) {}

接着啓動一個服務。

admin_->startHttpListener(initial_config.admin().accessLogPath(), options.adminAddressPath(),
                              initial_config.admin().address(),
                              stats_store_.createScope("listener.admin."));

在啓動服務函數內部會建立 TcpListenSocket 和 AdminListener。

void AdminImpl::startHttpListener(const std::string& access_log_path,
                                  const std::string& address_out_path,
                                  Network::Address::InstanceConstSharedPtr address,
                                  Stats::ScopePtr&& listener_scope) {
  ... ...
  socket_ = std::make_unique<Network::TcpListenSocket>(address, nullptr, true);
  listener_ = std::make_unique<AdminListener>(*this, std::move(listener_scope));
  ... ...
}

初始化 TcpListenSocket 時會在內部建立一個 socket 後再進行 bind。

using TcpListenSocket = NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Stream>>;

template <typename T> class NetworkListenSocket : public ListenSocketImpl {
public:
  NetworkListenSocket(const Address::InstanceConstSharedPtr& address,
                      const Network::Socket::OptionsSharedPtr& options, bool bind_to_port)
      // socket 建立
      : ListenSocketImpl(address->socket(T::type), address) {
    RELEASE_ASSERT(io_handle_->fd() != -1, "");

    setPrebindSocketOptions();

    setupSocket(options, bind_to_port);
  }

void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) {
  setListenSocketOptions(options);
  //準備進行綁定
  if (bind_to_port) {
    doBind();
  }
}

void ListenSocketImpl::doBind() {
  //綁定
  const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
  ... ...
}

AdminListener 構造函數內只是參數的初始化。

AdminListener(AdminImpl& parent, Stats::ScopePtr&& listener_scope)
  : parent_(parent), name_("admin"), scope_(std::move(listener_scope)),
    stats_(Http::ConnectionManagerImpl::generateListenerStats("http.admin.", *scope_)) {}

作完 socket ,bind 後面就是進行 listen 處理。將 AdminListener 經過 handler 加入監聽隊列。handler 是在 InstanceImpl 的構造函數內初始化的。

InstanceImpl::InstanceImpl(... ...)
  : handler_(new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher_)),
  ... ...{
  }

void InstanceImpl::initialize(... ...)
{
  ... ...
  //將 AdminListener 加入 ConnectionHandler
  if (initial_config.admin().address()) {
    admin_->addListenerToHandler(handler_.get());
  }
  ... ...
}

void AdminImpl::addListenerToHandler(Network::ConnectionHandler* handler) {
  // 這裏的 listener_ 就是上面生成的 AdminListener
  if (listener_) {
    handler->addListener(*listener_);
  }
}

在 addListener 內會新建一個 ActiveListener 內部類,先置爲 disable 狀態。

void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) {
  ActiveListenerPtr l(new ActiveListener(*this, config));
  if (disable_listeners_) {
    l->listener_->disable();
  }
  listeners_.emplace_back(config.socket().localAddress(), std::move(l));
}

在 ActiveListener 構造函數內建立 listen,裏面dispatcher 會建立回調。等有新鏈接到來時,會回調 onAccept.

ConnectionHandlerImpl::ActiveListener::ActiveListener(ConnectionHandlerImpl& parent,Network::ListenerConfig& config)
    : ActiveListener(
          parent,
          // 建立listen
          parent.dispatcher_.createListener(config.socket(), *this, config.bindToPort(),
                                            config.handOffRestoredDestinationConnections()),
          config) {}

Network::ListenerPtr
DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) {
  ASSERT(isThreadSafe());
  return Network::ListenerPtr{new Network::ListenerImpl(*this, socket, cb, bind_to_port,
                                                        hand_off_restored_destination_connections)};
}

void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
  listener_.reset(
      //建立 evconnlistener_new ,有鏈接回調listenCallback
      evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
  ... ...
}

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,int remote_addr_len, void* arg) {
  ... ...

  //回調ActiveListener的onAccept
  listener->cb_.onAccept(
    std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address),
    listener->hand_off_restored_destination_connections_);
}

onAccept 對 Listern 過濾後,建立新鏈接。

void ConnectionHandlerImpl::ActiveListener::onAccept()
{
  ... ...
  active_socket->continueFilterChain(true);
  ... ...
}

void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) {
  ... ...
   listener_.newConnection(std::move(socket_));
  ... ...
}

void ConnectionHandlerImpl::ActiveListener::onNewConnection() {
  ... ...
  if (new_connection->state() != Network::Connection::State::Closed) {
    ActiveConnectionPtr active_connection(
        new ActiveConnection(*this, std::move(new_connection), parent_.dispatcher_.timeSource()));
    active_connection->moveIntoList(std::move(active_connection), connections_);
    parent_.num_connections_++;
  }
  ... ...
}

這樣,新鏈接就創建起來。

配置文件 XDS 初始化

初始化 Bootstrap 的 XDS 時,先初始化 static sercret,先初始化 cluster,接着初始化 listeners。

void MainImpl::initialize(... ...) {
  //初始化secrets
  const auto& secrets = bootstrap.static_resources().secrets();
  for (ssize_t i = 0; i < secrets.size(); i++) {
    ENVOY_LOG(debug, "static secret #{}: {}", i, secrets[i].name());
    server.secretManager().addStaticSecret(secrets[i]);
  }

  //初始化 cluster
  bootstrap.static_resources().clusters().size());
  cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);

  // 初始化listeners
  const auto& listeners = bootstrap.static_resources().listeners();
  for (ssize_t i = 0; i < listeners.size(); i++) {
    ENVOY_LOG(debug, "listener #{}:", i);
    server.listenerManager().addOrUpdateListener(listeners[i], "", false);
  }

初始化 cluster 會分兩階段初始化。先初始化非 EDS 部分,再初始化 EDS 部分。分兩個階段的初始化是由於在 v2 配置中每一個 EDS 集羣單獨設置訂閱。此訂閱是 API 源時  羣集將依賴於非 EDS 羣集,所以必須首先初始化非 EDS 羣集。cluster 的類型有 5個類型:

enum DiscoveryType {
    // Refer to the :ref:`static discovery 
    STATIC = 0;

    // Refer to the :ref:`strict DNS discovery
    STRICT_DNS = 1;

    // Refer to the :ref:`logical DNS discovery
    LOGICAL_DNS = 2;

    // Refer to the :ref:`service discovery 
    EDS = 3;

    // Refer to the :ref:`original destination discovery
    ORIGINAL_DST = 4;
  }

cluster 初始化順序:
非 EDS 部分 -> ADS -> EDS -> CDS

ClusterManagerImpl::ClusterManagerImpl(... ...) {
  ... ...
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    // 第一次初始化非 EDS 部分
    if (cluster.type() != envoy::api::v2::Cluster::EDS) {
      loadCluster(cluster, "", false, active_clusters_);
    }
  }

  // 初始化 ADS
  if (bootstrap.dynamic_resources().has_ads_config()) {
    ads_mux_ = std::make_unique<Config::GrpcMuxImpl>(
        local_info,
        Config::Utility::factoryForGrpcApiConfigSource(
            *async_client_manager_, bootstrap.dynamic_resources().ads_config(), stats)
            ->create(),
        main_thread_dispatcher,
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
            "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"),
        random_, stats_,
        Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config()));
  } else {
    ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
  }

  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    // 初始化 EDS
    if (cluster.type() == envoy::api::v2::Cluster::EDS) {
      loadCluster(cluster, "", false, active_clusters_);
    }
  }

  ... ...
  //初始化 CDS
  if (bootstrap.dynamic_resources().has_cds_config()) {
    cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), *this);
    init_helper_.setCds(cds_api_.get());
  } else {
    init_helper_.setCds(nullptr);
  }

}

上面都初始化完成後,再初始化 lds,最後再初始化 hds。

// 初始化lds
if (bootstrap_.dynamic_resources().has_lds_config()) {
  listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
}

//初始化hds
if (bootstrap_.has_hds_config()) {
  const auto& hds_config = bootstrap_.hds_config();
  async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
      *config_.clusterManager(), thread_local_, time_source_, *api_);
  ... ...
}

初始化 ListenerManager

ListenerManager 的初始化只是事先建立 worker。

ListenerManagerImpl::ListenerManagerImpl(... ...) {
  ... ...
  // 建立worker子線程
  for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    workers_.emplace_back(worker_factory.createWorker(server.overloadManager()));
  }
}

WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager) {
  // 新建子線程,每一個線種一個dispatchr
  Event::DispatcherPtr dispatcher(api_.allocateDispatcher());
  return WorkerPtr{new WorkerImpl(
      tls_, hooks_, std::move(dispatcher),
      Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher)},
      overload_manager, api_)};
}

WorkerImpl::WorkerImpl(... ...)
    : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)), api_(api) {
  tls_.registerThread(*dispatcher_, false);
  overload_manager.registerForAction(
      OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
      [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
}

啓動

main 啓動入口

main 函數調用 main_common

int main(int argc, char** argv) {
  ... ...
  return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}

main_common 進一步調用 InstanceImpl

bool MainCommonBase::run() {
  switch (options_.mode()) {
  case Server::Mode::Serve:
    server_->run();
    return true;
  ... ...
}

InstanceImpl 啓用 loop 循環。

void InstanceImpl::run() {
  auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(), access_log_manager_,
                              init_manager_, overloadManager(), [this] { startWorkers(); });

  auto watchdog = guard_dog_->createWatchDog(api_->threadFactory().currentThreadId());
  watchdog->startWatchdog(*dispatcher_);
  dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(info, "main dispatch loop exited");
  guard_dog_->stopWatching(watchdog);
  watchdog.reset();

  terminate();
}

服務啓動流程

本地 HTTP 管理服務的啓動流程上面已經分析過,如今討論本地服務的啓動流程(XDS 下發的暫不討論)。

在 cluster 初始化的時候,加入 listener。

void MainImpl::initialize(... ...) {
  ... ...
  // 初始化listeners
  const auto& listeners = bootstrap.static_resources().listeners();
  for (ssize_t i = 0; i < listeners.size(); i++) {
    ENVOY_LOG(debug, "listener #{}:", i);
    server.listenerManager().addOrUpdateListener(listeners[i], "", false);
  }
}

addOrUpdateListener 建立 ListenerImpl,ListenerImpl 作 bind 操做。

// 建立ListenerImpl
  ListenerImplPtr new_listener(
      new ListenerImpl(config, version_info, *this, name, modifiable, workers_started_, hash));
  ListenerImpl& new_listener_ref = *new_listener;
  ... ...

  //bind 地址將 socket 關聯ListenerImpl
  new_listener->setSocket(draining_listener_socket
                                ? draining_listener_socket
                                : factory_.createListenSocket(new_listener->address(),
                                                              new_listener->socketType(),
                                                              new_listener->listenSocketOptions(),
                                                              new_listener->bindToPort()));

Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket(... ...) {
  ... ...
  // 調用 UdsListenSocket 作 bind() 操做。
  if (io_handle->isOpen()) {
      return std::make_shared<Network::UdsListenSocket>(std::move(io_handle), address);
  }
  return std::make_shared<Network::UdsListenSocket>(address);
}

// 最終調用系統bind()操做
void ListenSocketImpl::doBind() {
  const Api::SysCallIntResult result = local_address_->bind(io_handle_->fd());
  ... ...
}

在 InstanceImpl 啓動時,調用 RunHelper。RunHelper 則啓動 startWorkers。startWorker 將初始化獲得的 listeners 加入到 work 中。

void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
  workers_started_ = true;
  for (const auto& worker : workers_) {
    ASSERT(warming_listeners_.empty());
    for (const auto& listener : active_listeners_) {
      addListenerToWorker(*worker, *listener);
    }
    worker->start(guard_dog);
  }
}

work 將 linsteners 關聯到 connectioHandler。

void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) {
  worker.addListener(listener, [this, &listener](bool success) -> void {
  ... ...
}

void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) {
  dispatcher_->post([this, &listener, completion]() -> void {
    try {
      // 關聯到connectioHandler。
      handler_->addListener(listener);
      hooks_.onWorkerListenerAdded();
      completion(true);
    } catch (const Network::CreateListenerException& e) {
      completion(false);
    }
  });
}

connectioHandler 在 work 初始化時建立。

ListenerManagerImpl::ListenerManagerImpl(Instance& server,
                                         ListenerComponentFactory& listener_factory,
                                         WorkerFactory& worker_factory)
    : server_(server), factory_(listener_factory), stats_(generateStats(server.stats())),
      config_tracker_entry_(server.admin().getConfigTracker().add(
          "listeners", [this] { return dumpListenerConfigs(); })) {
  for (uint32_t i = 0; i < server.options().concurrency(); i++) {
    // 初始化worker
    workers_.emplace_back(worker_factory.createWorker(server.overloadManager()));
  }
}

WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager) {
  Event::DispatcherPtr dispatcher(api_.allocateDispatcher());
  return WorkerPtr{new WorkerImpl(
      tls_, hooks_, std::move(dispatcher),
      //建立connectioHandler
      Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher)},
      overload_manager, api_)};
}

將 linsteners 關聯到 connectioHandler 後,後面的 listen(),accept() 和建立鏈接過程和 admin 的 HTTP 啓動流程是同樣的。

LDS 服務啓動流程

整個服務的啓動流程基本就完成了,後面有新加服務的啓動流程和上面的服務啓動流程同樣,調用 addOrUpdateListener。在 addOrUpdateListener 內判斷服務是否已啓動,若是已啓動調用 ManagerImpl 等待初始化。

void ListenerImpl::initialize() {
  last_updated_ = timeSource().systemTime();
  if (workers_started_) {
    //ManagerImpl
    dynamic_init_manager_.initialize(*init_watcher_);
  }
}

void ManagerImpl::initialize(const Watcher& watcher) {
  ... ...
    for (const auto& target_handle : target_handles_) {
      // 等待 target_handle 初始化完成。
      if (!target_handle->initialize(watcher_)) {
        onTargetReady();
      }
    }
}

初始化完成後,調用函數指針。函數指針在初始化WatcherImpl傳入。

void ManagerImpl::onTargetReady() {
  if (--count_ == 0) {
    // 初始化完成
    ready();
  }
}

void ManagerImpl::ready() {
  state_ = State::Initialized;
  watcher_handle_->ready();
}

bool WatcherHandleImpl::ready() const {
    //調用函數指針
    (*locked_fn)();
}

ListenerImpl::ListenerImpl(... ...)
  : ... ...
   // 初始化watch
   init_watcher_(std::make_unique<Init::WatcherImpl>(
          "ListenerImpl", [this] { parent_.onListenerWarmed(*this); })){}

在 onListenerWarmed 內將 listener 加入 work。後面流程和 服務啓動流程同樣,再也不分析。

void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
  for (const auto& worker : workers_) {
    addListenerToWorker(*worker, listener);
  }

最後

整個服務的初始化和啓動流程就完成了。服務的啓動有3個類型 : 本地 HTTP 服務管理服務、本地配置文件的服務和xDS下發的服務。本章節只分析了服務的啓動流程,鏈接成功的後繼處理,之後分析。

相關文章
相關標籤/搜索