本文做者趙化冰,將在明天下午 1 點半在成都螞蟻 C 空間爲你們分享《服務網格技術在5G網絡管理平臺中的落地實踐》歡迎你們,查看活動詳情。bootstrap
在Istio架構中,Pilot組件屬於最核心的組件,負責了服務網格中的流量管理以及控制面和數據面之間的配置下發。Pilot內部的代碼結構比較複雜,本文中咱們將經過對Pilot的代碼的深刻分析來了解Pilot實現原理。api
首先咱們來看一下Pilot在Istio中的功能定位,Pilot將服務信息和配置數據轉換爲xDS接口的標準數據結構,經過gRPC下發到數據面的Envoy。若是把Pilot當作一個處理數據的黑盒,則其有兩個輸入,一個輸出:網絡
目前Pilot的輸入包括兩部分數據來源:數據結構
Pilot的輸出爲符合xDS接口的數據面配置數據,並經過gRPC Streaming接口將配置數據推送到數據面的Envoy中。架構
備註:Istio代碼庫在不停變化更新中,本文分析所基於的代碼commit爲: d539abe00c2599d80c6d64296f78d3bb8ab4b033併發
Istio Pilot的代碼分爲Pilot-Discovery和Pilot-Agent,其中Pilot-Agent用於在數據面負責Envoy的生命週期管理,Pilot-Discovery纔是控制面進行流量管理的組件,本文將重點分析控制面部分,即Pilot-Discovery的代碼。負載均衡
下圖是Pilot-Discovery組件代碼的主要結構: less
Pilot-Discovery的入口函數爲:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中建立了Discovery Server,Discovery Server中主要包含三部分邏輯:函數
Config Controller用於管理各類配置數據,包括用戶建立的流量管理規則和策略。Istio目前支持三種類型的Config Controller:微服務
目前Istio的配置包括:
Service Controller用於管理各類Service Registry,提出服務發現數據,目前Istio支持的Service Registry包括:
Discovery Service中主要包含下述邏輯:
Pilot-Disocvery包括如下主要的幾個業務流程:
Pilot-Discovery命令的入口爲pilot/cmd/pilot-discovery/main.go中的main方法,在該方法中建立Pilot Server,Server代碼位於文件pilot/pkg/bootstrap/server.go中。Server主要作了下面一些初始化工做:
Pilot Server建立了一個gRPC Server,用於監聽和接收來自Envoy的xDS請求。pilot/pkg/proxy/envoy/v2/ads.go 中的 DiscoveryServer.StreamAggregatedResources方法被註冊爲gRPC Server的服務處理方法。
當gRPC Server收到來自Envoy的鏈接時,會調用DiscoveryServer.StreamAggregatedResources方法,在該方法中建立一個XdsConnection對象,並開啓一個goroutine從該connection中接收客戶端的xDS請求並進行處理;若是控制面的配置發生變化,Pilot也會經過該connection把配置變化主動推送到Envoy端。
這是Pilot中最複雜的一個業務流程,主要是由於代碼中採用了多個channel和queue對變化消息進行合併和轉發。該業務流程以下:
Pilot和Envoy之間創建的是一個雙向的Streaming gRPC服務調用,所以Pilot能夠在配置變化時向Envoy推送,Envoy也能夠主動發起xDS調用請求獲取配置。Envoy主動發起xDS請求的流程以下:
下面是Discovery Server的關鍵代碼片斷和對應的業務邏輯註解,爲方便閱讀,代碼中只保留了邏輯主幹,去掉了一些不重要的細節。
該部分關鍵代碼位於 istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go
文件的StreamAggregatedResources 方法中。StreamAggregatedResources方法被註冊爲gRPC Server的handler,對於每個客戶端鏈接,gRPC Server會啓動一個goroutine來進行處理。
代碼中主要包含如下業務邏輯:
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
......
//建立一個goroutine來接收來自Envoy的xDS請求,並將請求放到reqChannel中
con := newXdsConnection(peerAddr, stream)
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
go receiveThread(con, reqChannel, &receiveError)
......
for {
select{
//從reqChannel接收Envoy端主動發起的xDS請求
case discReq, ok := <-reqChannel:
//根據請求的類型構造相應的xDS Response併發送到Envoy端
switch discReq.TypeUrl {
case ClusterType:
err := s.pushCds(con, s.globalPushContext(), versionInfo())
case ListenerType:
err := s.pushLds(con, s.globalPushContext(), versionInfo())
case RouteType:
err := s.pushRoute(con, s.globalPushContext(), versionInfo())
case EndpointType:
err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
}
//從PushChannel接收Service或者Config變化後的通知
case pushEv := <-con.pushChannel:
//將變化內容推送到Envoy端
err := s.pushConnection(con, pushEv)
}
}
}複製代碼
該部分關鍵代碼位於 istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go
文件中,用於監聽服務和配置變化消息,並將變化消息合併後經過Channel發送給前面提到的 StreamAggregatedResources 方法進行處理。
ConfigUpdate是處理服務和配置變化的回調函數,service controller和config controller在發生變化時會調用該方法通知Discovery Server。
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
inboundConfigUpdates.Increment()
//服務或配置變化後,將一個PushRequest發送到pushChannel中
s.pushChannel <- req
}複製代碼
在debounce方法中將連續發生的PushRequest進行合併,若是一段時間內沒有收到新的PushRequest,再發起推送;以免因爲服務和配置頻繁變化給系統帶來較大壓力。
// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {
......
pushWorker := func() {
eventDelay := time.Since(startDebounce)
quietTime := time.Since(lastConfigUpdateTime)
// it has been too long or quiet enough
//一段時間內沒有收到新的PushRequest,再發起推送
if eventDelay >= DebounceMax || quietTime >= DebounceAfter {
if req != nil {
pushCounter++
adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
pushCounter, debouncedEvents,
quietTime, eventDelay, req.Full)
free = false
go push(req)
req = nil
debouncedEvents = 0
}
} else {
timeChan = time.After(DebounceAfter - quietTime)
}
}
for {
select {
......
case r := <-ch:
lastConfigUpdateTime = time.Now()
if debouncedEvents == 0 {
timeChan = time.After(DebounceAfter)
startDebounce = lastConfigUpdateTime
}
debouncedEvents++
//合併連續發生的多個PushRequest
req = req.Merge(r)
case <-timeChan:
if free {
pushWorker()
}
case <-stopCh:
return
}
}
}複製代碼
ServiceMesher 社區是由一羣擁有相同價值觀和理念的志願者們共同發起,於 2018 年 4 月正式成立。
社區關注領域有:容器、微服務、Service Mesh、Serverless,擁抱開源和雲原生,致力於推進 Service Mesh 在中國的蓬勃發展。
社區官網:https://www.servicemesher.com