Istio Pilot代碼深度解析

本文做者趙化冰,將在明天下午 1 點半在成都螞蟻 C 空間爲你們分享《服務網格技術在5G網絡管理平臺中的落地實踐》歡迎你們,查看活動詳情bootstrap

Istio Pilot 組件介紹

在Istio架構中,Pilot組件屬於最核心的組件,負責了服務網格中的流量管理以及控制面和數據面之間的配置下發。Pilot內部的代碼結構比較複雜,本文中咱們將經過對Pilot的代碼的深刻分析來了解Pilot實現原理。api

首先咱們來看一下Pilot在Istio中的功能定位,Pilot將服務信息和配置數據轉換爲xDS接口的標準數據結構,經過gRPC下發到數據面的Envoy。若是把Pilot當作一個處理數據的黑盒,則其有兩個輸入,一個輸出:網絡

Pilot的輸入與輸出

目前Pilot的輸入包括兩部分數據來源:數據結構

  • 服務數據: 來源於各個服務註冊表(Service Registry),例如Kubernetes中註冊的Service,Consul Catalog中的服務等。
  • 配置規則: 各類配置規則,包括路由規則及流量管理規則等,經過Kubernetes CRD(Custom Resources Definition)形式定義並存儲在Kubernetes中。

Pilot的輸出爲符合xDS接口的數據面配置數據,並經過gRPC Streaming接口將配置數據推送到數據面的Envoy中。架構

備註:Istio代碼庫在不停變化更新中,本文分析所基於的代碼commit爲: d539abe00c2599d80c6d64296f78d3bb8ab4b033併發

Pilot-Discovery 代碼結構

Istio Pilot的代碼分爲Pilot-Discovery和Pilot-Agent,其中Pilot-Agent用於在數據面負責Envoy的生命週期管理,Pilot-Discovery纔是控制面進行流量管理的組件,本文將重點分析控制面部分,即Pilot-Discovery的代碼。負載均衡

下圖是Pilot-Discovery組件代碼的主要結構: Pilot-Discovery代碼結構less

Pilot-Discovery的入口函數爲:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中建立了Discovery Server,Discovery Server中主要包含三部分邏輯:函數

Config Controller

Config Controller用於管理各類配置數據,包括用戶建立的流量管理規則和策略。Istio目前支持三種類型的Config Controller:微服務

  • Kubernetes:使用Kubernetes來做爲配置數據的存儲,該方式直接依附於Kubernetes強大的CRD機制來存儲配置數據,簡單方便,是Istio最開始使用的配置存儲方案。
  • MCP (Mesh Configuration Protocol):使用Kubernetes來存儲配置數據致使了Istio和Kubernetes的耦合,限制了Istio在非Kubernetes環境下的運用。爲了解決該耦合,Istio社區提出了MCP,MCP定義了一個向Istio控制面下發配置數據的標準協議,Istio Pilot做爲MCP Client,任何實現了MCP協議的Server均可以經過MCP協議向Pilot下發配置,從而解除了Istio和Kubernetes的耦合。若是想要了解更多關於MCP的內容,請參考文後的連接。
  • Memory:一個在內存中的Config Controller實現,主要用於測試。

目前Istio的配置包括:

  • Virtual Service: 定義流量路由規則。
  • Destination Rule: 定義和一個服務或者subset相關的流量處理規則,包括負載均衡策略,鏈接池大小,斷路器設置,subset定義等等。
  • Gateway: 定義入口網關上對外暴露的服務。
  • Service Entry: 經過定義一個Service Entry能夠將一個外部服務手動添加到服務網格中。
  • Envoy Filter: 經過Pilot在Envoy的配置中添加一個自定義的Filter。

Service Controller

Service Controller用於管理各類Service Registry,提出服務發現數據,目前Istio支持的Service Registry包括:

  • Kubernetes:對接Kubernetes Registry,能夠將Kubernetes中定義的Service和Instance採集到Istio中。
  • Consul: 對接Consul Catalog,將Consul中定義的Service採集到Istio中。
  • MCP: 和MCP config controller相似,從MCP Server中獲取Service和Service Instance。
  • Memory: 一個內存中的Service Controller實現,主要用於測試。

Discovery Service

Discovery Service中主要包含下述邏輯:

  • 啓動gRPC Server並接收來自Envoy端的鏈接請求。
  • 接收Envoy端的xDS請求,從Config Controller和Service Controller中獲取配置和服務信息,生成響應消息發送給Envoy。
  • 監聽來自Config Controller的配置變化消息和來自Service Controller的服務變化消息,並將配置和服務變化內容經過xDS接口推送到Envoy。(備註:目前Pilot未實現增量變化推送,每次變化推送的是全量配置,在網格中服務較多的狀況下可能會有性能問題)。

Pilot-Discovery 業務流程

Pilot-Disocvery包括如下主要的幾個業務流程:

初始化Pilot-Discovery的各個主要組件

Pilot-Discovery命令的入口爲pilot/cmd/pilot-discovery/main.go中的main方法,在該方法中建立Pilot Server,Server代碼位於文件pilot/pkg/bootstrap/server.go中。Server主要作了下面一些初始化工做:

  • 建立並初始化Config Controller。
  • 建立並初始化Service Controller。
  • 建立並初始化Discovery Server,Pilot中建立了基於Envoy V1 API的HTTP Discovery Server和基於Envoy V2 API的GPRC Discovery Server。因爲V1已經被廢棄,本文將主要分析V2 API的gRPC Discovery Server。
  • 將Discovery Server註冊爲Config Controller和Service Controller的Event Handler,監聽配置和服務變化消息。

img

建立gRPC Server並接收Envoy的鏈接請求

Pilot Server建立了一個gRPC Server,用於監聽和接收來自Envoy的xDS請求。pilot/pkg/proxy/envoy/v2/ads.go 中的 DiscoveryServer.StreamAggregatedResources方法被註冊爲gRPC Server的服務處理方法。

當gRPC Server收到來自Envoy的鏈接時,會調用DiscoveryServer.StreamAggregatedResources方法,在該方法中建立一個XdsConnection對象,並開啓一個goroutine從該connection中接收客戶端的xDS請求並進行處理;若是控制面的配置發生變化,Pilot也會經過該connection把配置變化主動推送到Envoy端。

img

配置變化後向Envoy推送更新

這是Pilot中最複雜的一個業務流程,主要是由於代碼中採用了多個channel和queue對變化消息進行合併和轉發。該業務流程以下:

  1. Config Controller或者Service Controller在配置或服務發生變化時經過回調方法通知Discovery Server,Discovery Server將變化消息放入到Push Channel中。
  2. Discovery Server經過一個goroutine從Push Channel中接收變化消息,將一段時間內連續發生的變化消息進行合併。若是超過指定時間沒有新的變化消息,則將合併後的消息加入到一個隊列Push Queue中。
  3. 另外一個goroutine從Push Queue中取出變化消息,生成XdsEvent,發送到每一個客戶端鏈接的Push Channel中。
  4. 在DiscoveryServer.StreamAggregatedResources方法中從Push Channel中取出XdsEvent,而後根據上下文生成符合xDS接口規範的DiscoveryResponse,經過gRPC推送給Envoy端。(gRPC會爲每一個client鏈接單獨分配一個goroutine來進行處理,所以不一樣客戶端鏈接的StreamAggregatedResources處理方法是在不一樣goroutine中處理的)

img

響應Envoy主動發起的xDS請求

Pilot和Envoy之間創建的是一個雙向的Streaming gRPC服務調用,所以Pilot能夠在配置變化時向Envoy推送,Envoy也能夠主動發起xDS調用請求獲取配置。Envoy主動發起xDS請求的流程以下:

  1. Envoy經過建立好的gRPC鏈接發送一個DiscoveryRequest
  2. Discovery Server經過一個goroutine從XdsConnection中接收來自Envoy的DiscoveryRequest,並將請求發送到ReqChannel中
  3. Discovery Server的另外一個goroutine從ReqChannel中接收DiscoveryRequest,根據上下文生成符合xDS接口規範的DiscoveryResponse,而後返回給Envoy。

img

Discovery Server業務處理關鍵代碼片斷

下面是Discovery Server的關鍵代碼片斷和對應的業務邏輯註解,爲方便閱讀,代碼中只保留了邏輯主幹,去掉了一些不重要的細節。

處理xDS請求和推送的關鍵代碼

該部分關鍵代碼位於 istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go 文件的StreamAggregatedResources 方法中。StreamAggregatedResources方法被註冊爲gRPC Server的handler,對於每個客戶端鏈接,gRPC Server會啓動一個goroutine來進行處理。

代碼中主要包含如下業務邏輯:

  • 從gRPC鏈接中接收來自Envoy的xDS 請求,並放到一個channel reqChannel中。
  • 從reqChannel中接收xDS請求,根據xDS請求的類型構造響應併發送給Envoy。
  • 從connection的pushChannel中接收Service或者Config變化後的通知,構造xDS響應消息,將變化內容推送到Envoy端。
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
        
    ......

    //建立一個goroutine來接收來自Envoy的xDS請求,並將請求放到reqChannel中
    con := newXdsConnection(peerAddr, stream)
    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    go receiveThread(con, reqChannel, &receiveError)

     ......
    
    for {
        select{
        //從reqChannel接收Envoy端主動發起的xDS請求
        case discReq, ok := <-reqChannel:        
            //根據請求的類型構造相應的xDS Response併發送到Envoy端
            switch discReq.TypeUrl {
            case ClusterType:
                err := s.pushCds(con, s.globalPushContext(), versionInfo())
            case ListenerType:
                err := s.pushLds(con, s.globalPushContext(), versionInfo())
            case RouteType:
                err := s.pushRoute(con, s.globalPushContext(), versionInfo())
            case EndpointType:
                err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil)
            }

        //從PushChannel接收Service或者Config變化後的通知
        case pushEv := <-con.pushChannel:
            //將變化內容推送到Envoy端
            err := s.pushConnection(con, pushEv)   
        }            
    }
}複製代碼

處理服務和配置變化的關鍵代碼

該部分關鍵代碼位於 istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go 文件中,用於監聽服務和配置變化消息,並將變化消息合併後經過Channel發送給前面提到的 StreamAggregatedResources 方法進行處理。

ConfigUpdate是處理服務和配置變化的回調函數,service controller和config controller在發生變化時會調用該方法通知Discovery Server。

func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
  inboundConfigUpdates.Increment()

  //服務或配置變化後,將一個PushRequest發送到pushChannel中
  s.pushChannel <- req
}複製代碼

在debounce方法中將連續發生的PushRequest進行合併,若是一段時間內沒有收到新的PushRequest,再發起推送;以免因爲服務和配置頻繁變化給系統帶來較大壓力。

// The debounce helper function is implemented to enable mocking
func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) {

    ......

    pushWorker := func() {
        eventDelay := time.Since(startDebounce)
        quietTime := time.Since(lastConfigUpdateTime)

        // it has been too long or quiet enough
        //一段時間內沒有收到新的PushRequest,再發起推送
        if eventDelay >= DebounceMax || quietTime >= DebounceAfter {
            if req != nil {
                pushCounter++
                adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v",
                pushCounter, debouncedEvents,
                quietTime, eventDelay, req.Full)

                free = false
                go push(req)
                req = nil
                debouncedEvents = 0
            }
        } else {
           timeChan = time.After(DebounceAfter - quietTime)
        }
    }
    for {
        select {
        ......

        case r := <-ch:
            lastConfigUpdateTime = time.Now()
            if debouncedEvents == 0 {
                timeChan = time.After(DebounceAfter)
                startDebounce = lastConfigUpdateTime
            }
            debouncedEvents++
            //合併連續發生的多個PushRequest
            req = req.Merge(r)
        case <-timeChan:
           if free {
               pushWorker()
            }
        case <-stopCh:
            return
    }
  }
}複製代碼

完整的業務流程

img

參考閱讀

關於 ServiceMesher 社區

ServiceMesher 社區是由一羣擁有相同價值觀和理念的志願者們共同發起,於 2018 年 4 月正式成立。

社區關注領域有:容器、微服務、Service Mesh、Serverless,擁抱開源和雲原生,致力於推進 Service Mesh 在中國的蓬勃發展。

社區官網:https://www.servicemesher.com

ServiceMesher 社區

相關文章
相關標籤/搜索