本文做者趙化冰,將在明天下午 1 點半在成都螞蟻 C 空間爲你們分享《服務網格技術在5G網絡管理平臺中的落地實踐》歡迎你們,查看活動詳情。bootstrap
在Istio架構中,Pilot組件屬於最核心的組件,負責了服務網格中的流量管理以及控制面和數據面之間的配置下發。Pilot內部的代碼結構比較複雜,本文中咱們將經過對Pilot的代碼的深刻分析來了解Pilot實現原理。api
首先咱們來看一下Pilot在Istio中的功能定位,Pilot將服務信息和配置數據轉換爲xDS接口的標準數據結構,經過gRPC下發到數據面的Envoy。若是把Pilot當作一個處理數據的黑盒,則其有兩個輸入,一個輸出:網絡
目前Pilot的輸入包括兩部分數據來源:數據結構
Pilot的輸出爲符合xDS接口的數據面配置數據,並經過gRPC Streaming接口將配置數據推送到數據面的Envoy中。架構
備註:Istio代碼庫在不停變化更新中,本文分析所基於的代碼commit爲: d539abe00c2599d80c6d64296f78d3bb8ab4b033併發
Istio Pilot的代碼分爲Pilot-Discovery和Pilot-Agent,其中Pilot-Agent用於在數據面負責Envoy的生命週期管理,Pilot-Discovery纔是控制面進行流量管理的組件,本文將重點分析控制面部分,即Pilot-Discovery的代碼。負載均衡
下圖是Pilot-Discovery組件代碼的主要結構: less
Pilot-Discovery的入口函數爲:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中建立了Discovery Server,Discovery Server中主要包含三部分邏輯:svg
Config Controller用於管理各類配置數據,包括用戶建立的流量管理規則和策略。Istio目前支持三種類型的Config Controller:函數
目前Istio的配置包括:
Service Controller用於管理各類Service Registry,提出服務發現數據,目前Istio支持的Service Registry包括:
Discovery Service中主要包含下述邏輯:
Pilot-Disocvery包括如下主要的幾個業務流程:
Pilot-Discovery命令的入口爲pilot/cmd/pilot-discovery/main.go中的main方法,在該方法中建立Pilot Server,Server代碼位於文件pilot/pkg/bootstrap/server.go中。Server主要作了下面一些初始化工做:
Pilot Server建立了一個gRPC Server,用於監聽和接收來自Envoy的xDS請求。pilot/pkg/proxy/envoy/v2/ads.go 中的 DiscoveryServer.StreamAggregatedResources方法被註冊爲gRPC Server的服務處理方法。
當gRPC Server收到來自Envoy的鏈接時,會調用DiscoveryServer.StreamAggregatedResources方法,在該方法中建立一個XdsConnection對象,並開啓一個goroutine從該connection中接收客戶端的xDS請求並進行處理;若是控制面的配置發生變化,Pilot也會經過該connection把配置變化主動推送到Envoy端。
這是Pilot中最複雜的一個業務流程,主要是由於代碼中採用了多個channel和queue對變化消息進行合併和轉發。該業務流程以下:
Pilot和Envoy之間創建的是一個雙向的Streaming gRPC服務調用,所以Pilot能夠在配置變化時向Envoy推送,Envoy也能夠主動發起xDS調用請求獲取配置。Envoy主動發起xDS請求的流程以下:
下面是Discovery Server的關鍵代碼片斷和對應的業務邏輯註解,爲方便閱讀,代碼中只保留了邏輯主幹,去掉了一些不重要的細節。
該部分關鍵代碼位於 istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go
文件的StreamAggregatedResources 方法中。StreamAggregatedResources方法被註冊爲gRPC Server的handler,對於每個客戶端鏈接,gRPC Server會啓動一個goroutine來進行處理。
代碼中主要包含如下業務邏輯:
// StreamAggregatedResources implements the ADS interface. func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { ...... //建立一個goroutine來接收來自Envoy的xDS請求,並將請求放到reqChannel中 con := newXdsConnection(peerAddr, stream) reqChannel := make(chan *xdsapi.DiscoveryRequest, 1) go receiveThread(con, reqChannel, &receiveError) ...... for { select{ //從reqChannel接收Envoy端主動發起的xDS請求 case discReq, ok := <-reqChannel: //根據請求的類型構造相應的xDS Response併發送到Envoy端 switch discReq.TypeUrl { case ClusterType: err := s.pushCds(con, s.globalPushContext(), versionInfo()) case ListenerType: err := s.pushLds(con, s.globalPushContext(), versionInfo()) case RouteType: err := s.pushRoute(con, s.globalPushContext(), versionInfo()) case EndpointType: err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil) } //從PushChannel接收Service或者Config變化後的通知 case pushEv := <-con.pushChannel: //將變化內容推送到Envoy端 err := s.pushConnection(con, pushEv) } } }
該部分關鍵代碼位於 istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go
文件中,用於監聽服務和配置變化消息,並將變化消息合併後經過Channel發送給前面提到的 StreamAggregatedResources 方法進行處理。
ConfigUpdate是處理服務和配置變化的回調函數,service controller和config controller在發生變化時會調用該方法通知Discovery Server。
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) { inboundConfigUpdates.Increment() //服務或配置變化後,將一個PushRequest發送到pushChannel中 s.pushChannel <- req }
在debounce方法中將連續發生的PushRequest進行合併,若是一段時間內沒有收到新的PushRequest,再發起推送;以免因爲服務和配置頻繁變化給系統帶來較大壓力。
// The debounce helper function is implemented to enable mocking func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) { ...... pushWorker := func() { eventDelay := time.Since(startDebounce) quietTime := time.Since(lastConfigUpdateTime) // it has been too long or quiet enough //一段時間內沒有收到新的PushRequest,再發起推送 if eventDelay >= DebounceMax || quietTime >= DebounceAfter { if req != nil { pushCounter++ adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v", pushCounter, debouncedEvents, quietTime, eventDelay, req.Full) free = false go push(req) req = nil debouncedEvents = 0 } } else { timeChan = time.After(DebounceAfter - quietTime) } } for { select { ...... case r := <-ch: lastConfigUpdateTime = time.Now() if debouncedEvents == 0 { timeChan = time.After(DebounceAfter) startDebounce = lastConfigUpdateTime } debouncedEvents++ //合併連續發生的多個PushRequest req = req.Merge(r) case <-timeChan: if free { pushWorker() } case <-stopCh: return } } }
ServiceMesher 社區是由一羣擁有相同價值觀和理念的志願者們共同發起,於 2018 年 4 月正式成立。
社區關注領域有:容器、微服務、Service Mesh、Serverless,擁抱開源和雲原生,致力於推進 Service Mesh 在中國的蓬勃發展。