1、Envoy的工做模式bootstrap
Envoy的工做模式如圖所示,橫向是管理平面。後端
Envoy會暴露admin的API,能夠經過API查看Envoy中的路由或者集羣的配置。api
例如經過curl http://127.0.0.1:15000/routes能夠查看路由的配置,結果以下圖,請記住路由的配置層級,後面在代碼中會看到熟悉的數據結構。緩存
routes下面有virtual_hosts,裏面有帶prefix的route,裏面是route entry,裏面是weight cluster,對於不一樣的cluster不一樣的路由權重,再裏面是cluster的列表,有cluster的名稱和權重。性能優化
再如經過curl http://127.0.0.1:15000/clusters能夠獲得集羣也即cluster的配置,這裏面是真正cluster的信息。網絡
在另一面,Envoy會調用envoy API去pilot裏面獲取路由和集羣的配置,envoy API的詳情能夠查看https://www.envoyproxy.io/docs/envoy/v1.8.0/api-v2/http_routes/http_routes中的API文檔,能夠看到route的配置詳情,也是按照上面的層級組織的。數據結構
當Envoy從pilot獲取到路由和集羣信息以後,會保存在內存中,當有個客戶端要鏈接後端的時候,客戶端處於Downstream,後端處於Upstream,當數據從Downstream流向Upstream的時候,會經過Filter,根據路由和集羣的配置,選擇後端的應用創建鏈接,將請求轉發出去。架構
接下來咱們來看Envoy是如何實現這些的。併發
2、Envoy的關鍵數據結構的建立app
Envoy的啓動會在main函數中建立Envoy::MainCommon,在它的構造函數中,會調用父類MainCommonBase的構造函數,建立Server::InstanceImpl,接下來調用InstanceImpl::initialize(...)
接下來就進入關鍵的初始化階段。
加載初始化配置,裏面配置了Listener Discover Service, Router Discover Service, Cluster Discover Service等。
InstanceUtil::loadBootstrapConfig(bootstrap_, options);
建立AdminImpl,從而能夠接受請求接口
admin_.reset(new AdminImpl(initial_config.admin().accessLogPath(), initial_config.admin().profilePath(), *this));
建立ListenerManagerImpl,用於管理監聽,由於Downstream要訪問Upstream的時候,envoy會進行監聽,Downstream會鏈接監聽的端口。
listener_manager_.reset( new ListenerManagerImpl(*this, listener_component_factory_, worker_factory_, time_system_));
在ListenerManagerImpl的構造函數中,建立了不少的worker,Envoy採用libevent監聽socket的事件,當有一個新的鏈接來的時候,會將任務分配給某個worker進行處理,從而實現異步的處理。
ListenerManagerImpl::ListenerManagerImpl(...) { for (uint32_t i = 0; i < server.options().concurrency(); i++) { workers_.emplace_back(worker_factory.createWorker()); } }
createWorker會建立WorkerImpl,初始化WorkerImpl須要兩個重要的參數。
一個是allocateDispatcher建立出來的DispatcherImpl,用來封裝libevent的事件分發的。
一個是ConnectionHandlerImpl,用來管理一個鏈接的。
咱們接着看初始化過程,建立ProdClusterManagerFactory,用於建立Cluster Manager,管理上游的集羣。
cluster_manager_factory_.reset(new Upstream::ProdClusterManagerFactory(...);
在ProdClusterManagerFactory會建立ClusterManagerImpl,在ClusterManagerImpl的構造函數中,若是配置了CDS,就須要訂閱Cluster Discover Service。
if (bootstrap.dynamic_resources().has_cds_config()) { cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this); init_helper_.setCds(cds_api_.get()); }
會調用ProdClusterManagerFactory::createCds,裏面會建立CdsApiImpl。在CdsApiImpl的構造函數中,會建立CdsSubscription,訂閱CDS,當集羣的配置發生變化的時候,會調用CdsApiImpl::onConfigUpdate(...)
接下來在InstanceImpl::initialize的初始化函數中,若是配置了LDS,就須要訂閱Listener Discover Service。
if (bootstrap_.dynamic_resources().has_lds_config()) { listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config()); }
ListenerManagerImpl的createLdsApi會調用ProdListenerComponentFactory的createLdsApi函數,會建立LdsApiImpl。在LdsApiImpl的構造函數中,會建立LdsSubscription,訂閱LDS,當Listener的配置改變的時候,會調用LdsApiImpl::onConfigUpdate(...)
3、Envoy的啓動
MainCommonBase::run() 會調用InstanceImpl::run(),會調用InstanceImpl::startWorkers(),會調用ListenerManagerImpl::startWorkers。
startWorkers會將Listener添加到全部的worker中,而後啓動worker的線程。
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) { workers_started_ = true; for (const auto& worker : workers_) { for (const auto& listener : active_listeners_) { addListenerToWorker(*worker, *listener); } worker->start(guard_dog); } }
對於每個WorkerImpl,會調用ConnectionHandlerImpl的addListener函數,會建立ActiveListener對象,ActiveListener很重要,他的函數會在libevent收到事件的時候被調用。
在ActiveListener的構造函數中,會調用相同Worker的DispatcherImpl的createListener,建立Network::ListenerImpl,注意這個類的namespace,由於對於Envoy來說,LDS裏面有個Listener的概念,可是Socket也有Listener的概念,在Network這個namespace下面的,是對socket和libevent的封裝。
在Network::ListenerImpl的構造函數中,當收到事件的時候,會調用註冊的listenCallback函數。
if (bind_to_port) { listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
在listenCallback函數中,會調用onAccept函數,這個函數是ConnectionHandlerImpl::ActiveListener::onAccept函數。
listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address), listener->hand_off_restored_destination_connections_);
4、Envoy從Pilot中獲取Listener
當Listener的配置有變化的時候,會調用LdsApiImpl::onConfigUpdate(...)。
會調用ListenerManagerImpl的addOrUpdateListener(...)函數,在這裏面,會建立一個ListenerImpl,
這裏的listener是LDS定義的Listener,而非網絡的Listener了。
在ListenerImpl的構造函數中,首先會建立ListenerFilter,能夠對監聽的socket進行定製化的配置,例如爲了實現use_original_dst,就須要加入一個ListenerFilter。
爲了建立ListenerFilter,先要調用ProdListenerComponentFactory::createListenerFilterFactoryList_來建立工廠。
建立完了ListenerFilter以後,爲了對於進來的網絡包進行處理,會建立NetworkFilter,正是這些NetworkFilter實現了對網絡包的解析,並根據網絡包的內容,實現路由和負載均衡策略。
爲了建立NetworkFilter,先要調用ProdListenerComponentFactory::createNetworkFilterFactoryList_來建立工廠,在這個函數裏面,會遍歷全部的NamedNetworkFilterConfigFactory,也即起了名字的filter都過一遍,都有哪些呢?class NetworkFilterNameValues裏面有定義,這裏面最重要的是:
// HTTP connection manager filter const std::string HttpConnectionManager = "envoy.http_connection_manager";
咱們這裏重點看HTTP協議的轉發,咱們重點看這個名字對應的HttpConnectionManagerFilterConfigFactory,並調用他的createFilterFactory函數,返回一個Network::FilterFactoryCb類型的callback函數。
5、Envoy訂閱RDS
HttpConnectionManagerFilterConfigFactory的createFilterFactory函數最終調用createFilterFactoryFromProtoTyped函數中,會建立
Router::RouteConfigProviderManagerImpl。
std::shared_ptr<Router::RouteConfigProviderManager> route_config_provider_manager = context.singletonManager().getTyped <Router::RouteConfigProviderManager>( SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] { return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin()); });
建立完畢Router::RouteConfigProviderManagerImpl以後,會建立HttpConnectionManagerConfig,將Router::RouteConfigProviderManagerImpl做爲成員變量傳入。
std::shared_ptr<HttpConnectionManagerConfig> filter_config(new HttpConnectionManagerConfig(proto_config, context, *date_provider, *route_config_provider_manager));
在HttpConnectionManagerConfig的構造函數中,調用Router::RouteConfigProviderUtil::create。
route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_,route_config_provider_manager_);
Router::RouteConfigProviderUtil::create會調用RouteConfigProviderManagerImpl::createRdsRouteConfigProvider,在這個函數裏面,會建立RdsRouteConfigSubscription訂閱RDS,而後建立RdsRouteConfigProviderImpl。
當Router的配置發生變化的時候,會調用RdsRouteConfigProviderImpl::onConfigUpdate(),在這個函數裏面,會從pilot獲取Route的配置,生成ConfigImpl對象。
當咱們仔細分析ConfigImpl的時候,咱們發現,這個數據結構和第一節Envoy中的路由配置是同樣的。
在ConfigImpl裏面有RouteMatcher用於匹配路由,在RouteMatcher裏面有VirtualHostImpl,在VirtualHostImpl裏面有RouteEntryImplBase,其中一種RouteEntry是WeightedClusterEntry,在WeightedClusterEntry裏面有cluster_name_和cluster_weight_。
到此RouteConfigProviderManagerImpl如何訂閱RDS告一段落,咱們回到HttpConnectionManagerFilterConfigFactory的createFilterFactoryFromProtoTyped中,在這個函數的最後一部分,將返回Network::FilterFactoryCb,是一個callback函數,做爲createFilterFactoryFromProtoTyped的返回值。
在這個callback函數中,會建立Http::ConnectionManagerImpl,將HttpConnectionManagerConfig做爲成員變量傳入,Http::ConnectionManagerImpl是一個Network::ReadFilter。在Envoy裏面,有Network::WriteFilter和Network::ReadFilter,其中從Downstream發送到Upstream的使用Network::ReadFilter對網絡包進行處理,反方向的是使用Network::WriteFilter。
callback函數建立ConnectionManagerImpl以後調用filter_manager.addReadFilter,將ConnectionManagerImpl放到ReadFilter列表中。固然這個callback函數如今是不調用的。
createFilterFactoryFromProtoTyped的返回值callback函數會返回到ProdListenerComponentFactory::createNetworkFilterFactoryList_函數,這個函數返回一個callback函數列表,其中一個就是上面生成的這個函數。
再返回就回到了ListenerImpl的構造函數,他會調用addFilterChain,將ProdListenerComponentFactory::createNetworkFilterFactoryList_返回的callback函數列表加入到鏈裏面。
6、當一個新的鏈接創建的時候
經過前面的分析,咱們制定當一個新的鏈接創建的時候,會觸發libevent的事件,最終調用ConnectionHandlerImpl::ActiveListener::onAccept函數。
在這個函數中,會調用使用上面createListenerFilterFactoryList_建立的ListenerFilter的工廠建立ListenerFilter,而後使用這些Filter進行處理。
// Create and run the filters config_.filterChainFactory().createListenerFilterChain(*active_socket); active_socket->continueFilterChain(true);
ConnectionHandlerImpl::ActiveSocket::continueFilterChain函數調用ConnectionHandlerImpl::ActiveListener::newConnection,建立一個新的鏈接。
在ConnectionHandlerImpl::ActiveListener::newConnection函數中,先是會調用DispatcherImpl::createServerConnection,建立Network::ConnectionImpl,在Network::ConnectionImpl的構造函數裏面:
file_event_ = dispatcher_.createFileEvent( fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
對於一個新的socket鏈接,對於操做系統來說是一個文件,也即有個文件描述符,當一個socket可讀的時候,會觸發一個事件,當一個socket可寫的時候,能夠觸發另外一個事件,發生事件後,調用onFileEvent。
在ConnectionHandlerImpl::ActiveListener::newConnection函數中,接下來就應該爲這個鏈接建立NetworkFilter了。
config_.filterChainFactory().createNetworkFilterChain( *new_connection, filter_chain->networkFilterFactories());
上面這段代碼,會調用ListenerImpl::createNetworkFilterChain。
裏面調用Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager, const std::vector<Network::FilterFactoryCb>& factories) { for (const Network::FilterFactoryCb& factory : factories) { factory(filter_manager); } return filter_manager.initializeReadFilters(); }
能夠看出這裏會調用上面生成的callback函數,將ConnectionManagerImpl放到ReadFilter列表中。
7、當有新的數據到來的時候
當Downstream發送數據到Envoy的時候,socket就處於可讀的狀態,於是ConnectionImpl::onFileEvent函數會被調用,當事件是Event::FileReadyType::Read的時候,調用ConnectionImpl::onReadReady()。
在ConnectionImpl::onReadReady()函數裏面,socket將數據讀入緩存。
IoResult result = transport_socket_->doRead(read_buffer_);
而後調用ConnectionImpl::onRead,裏面調用filter_manager_.onRead()使用NetworkFilter對於數據進行處理。
FilterManagerImpl::onRead() 會調用FilterManagerImpl::onContinueReading有下面的邏輯。
for (; entry != upstream_filters_.end(); entry++) { BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer(); if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) { FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream); if (status == FilterStatus::StopIteration) { return; } } }
對於每個Filter,都調用onData函數,我們上面解析過,其中HTTP對應的ReadFilter是ConnectionManagerImpl,於是調用ConnectionManagerImpl::onData函數。
8、對數據進行解析
ConnectionManagerImpl::onData函數中,首先建立數據解析器。
codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);
調用HttpConnectionManagerConfig::createCodec,對於HTTP1,建立Http::Http1::ServerConnectionImpl。
而後對數據進行解析。
codec_->dispatch(data);
調用ConnectionImpl::dispatch,裏面調用ConnectionImpl::dispatchSlice,在這裏面對HTTP進行解析。
<font size="3">http_parser_execute(&parser_, &settings_, slice, len);</font>
解析的參數是settings_,能夠看出這裏面解析url,而後解析header,結束後調用onHeadersCompleteBase()函數,而後解析正文。
因爲路由是根據HTTP頭裏面的信息來的,於是咱們重點看ConnectionImpl::onHeadersCompleteBase(),裏面會調用ServerConnectionImpl::onHeadersComplete函數。
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);
ServerConnectionImpl::onHeadersComplete裏面調用ConnectionManagerImpl::ActiveStream::decodeHeaders這到了路由策略的重點。
這裏面調用了兩個函數,一個是refreshCachedRoute()刷新路由配置,並查找到匹配的路由項Route Entry。
另外一個是ConnectionManagerImpl::ActiveStream::decodeHeaders的另外一個實現。
decodeHeaders(nullptr, *request_headers_, end_stream);
在這裏會經過Route Entry找到後端的集羣,並創建鏈接。
9、路由匹配
咱們先來解析refreshCachedRoute()
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_); request_info_.route_entry_ = route ? route->routeEntry() : nullptr; cached_route_ = std::move(route); }
snapped_route_config_是什麼呢?snapped_route_config_的初始化以下。
snapped_route_config_(connection_manager.config_.routeConfigProvider().config())
routeConfigProvider()返回的就是RdsRouteConfigProviderImpl,其config函數返回的就是ConfigImpl,也就是上面咱們描述過的層級的數據結構。
ConfigImpl的route函數以下
RouteConstSharedPtr route(const Http::HeaderMap& headers, uint64_t random_value) const override { return route_matcher_->route(headers, random_value); }
RouteMatcher的route函數,根據headers.Host()查找virtualhost,而後調用 VirtualHostImpl::getRouteFromEntries。
RouteConstSharedPtr RouteMatcher::route(const Http::HeaderMap& headers, uint64_t random_value) const { const VirtualHostImpl* virtual_host = findVirtualHost(headers); if (virtual_host) { return virtual_host->getRouteFromEntries(headers, random_value); } else { return nullptr; } }
VirtualHostImpl::getRouteFromEntries函數裏面有下面的循環
for (const RouteEntryImplBaseConstSharedPtr& route : routes_) { RouteConstSharedPtr route_entry = route->matches(headers, random_value); if (nullptr != route_entry) { return route_entry; } }
對於每個RouteEntry,看是否匹配。例如經常使用的有PrefixRouteEntryImpl::matches,看前綴是否一致。
RouteConstSharedPtr PrefixRouteEntryImpl::matches(const Http::HeaderMap& headers,uint64_t random_value) const { if (RouteEntryImplBase::matchRoute(headers, random_value) && StringUtil::startsWith(headers.Path()->value().c_str(), prefix_, case_sensitive_)) { return clusterEntry(headers, random_value); } return nullptr; }
獲得匹配的Route Entry後,調用RouteEntryImplBase::clusterEntry獲取一個Cluster的名字。對於WeightedCluster,會調用下面的函數。
WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,true);
在這個函數裏面,會根據權重選擇一個WeightedClusterEntry返回。
這個時候,咱們獲得了後面Cluster,也即集羣的名稱,接下來須要獲得集羣的具體的IP地址並創建鏈接。
10、查找集羣並創建鏈接
在此我向你們推薦一個架構學習交流羣。交流學習羣號:821169538 裏面會分享一些資深架構師錄製的視頻錄像:有Spring,MyBatis,Netty源碼分析,高併發、高性能、分佈式、微服務架構的原理,JVM性能優化、分佈式架構等這些成爲架構師必備的知識體系。還能領取免費的學習資源,目前受益良多。
ConnectionManagerImpl::ActiveStream::decodeHeaders的另外一個實現會調用Route.cc裏面的Filter::decodeHeaders。
在Filter::decodeHeaders函數中有如下的實現。
// A route entry matches for the request. route_entry_ = route_->routeEntry(); Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName());
經過上一節的查找,WeightedClusterEntry裏面有cluster的名稱,接下來就是從cluster manager裏面根據cluster名稱查找到cluster的信息。
cluster manager就是咱們最初建立的ClusterManagerImpl
ThreadLocalCluster* ClusterManagerImpl::get(const std::string& cluster) { ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>(); auto entry = cluster_manager.thread_local_clusters_.find(cluster); if (entry != cluster_manager.thread_local_clusters_.end()) { return entry->second.get(); } else { return nullptr; } }
返回ClusterInfoImpl,是cluster的信息。
cluster_ = cluster->info();
找到Cluster後,開始創建鏈接。
// Fetch a connection pool for the upstream cluster. Http::ConnectionPool::Instance* conn_pool = getConnPool();
route.cc的Filter::getConnPool()裏面調用如下函數。
config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(), protocol, this);
ClusterManagerImpl::httpConnPoolForCluster調用ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool函數。
在這個函數裏面,HostConstSharedPtr host = lb_->chooseHost(context);經過負載均衡,在一個cluster裏面選擇一個後端的機器創建鏈接。