istio源碼分析——pilot-discovery服務發現和配置中心

image

原文:istio源碼分析——poilt-discovery服務發現和配置中心node

聲明

  1. 這篇文章須要瞭解istio,k8s,golang,envoy基礎知識
  2. 分析的環境爲k8s,istio版本爲0.8.0

pilot-discovery的做用

envoy提供一套通用的數據面接口,經過接口能夠動態實現服務發現和配置。在istio中須要集成k8s,consul等服務發現系統,因此須要一箇中介整理在k8s,consul服務註冊和配置信息,並提供給envoy。

envoy v1 API 和 v2 API區別

v1版本API和v2版本API有一段歷史,詳情可看 官網博客。在envoy開源之初,使用HTTP+輪詢的方式實現動態服務發現和配置,可是這種方式存在如下缺點:
  1. 因爲接口數據使用弱類型,致使實現一些通用服務比較困難。
  2. 控制面更喜歡使用推送的方式,來減小數據在更新時傳輸的時間。
隨着和Google合做增強,官方使用GRPC + push開發了v2版本API,實現了v1版本的SDS/CDS/RDS/LDS接口,繼續支持 JSON/YAML數據格式,還增長了ADS(把SDS/CDS/RDS/LDS4個接口合在一下),HDS等接口。

創建基礎緩存數據

其實pilot-discovery已經算是一個小型的非持久性key/value數據庫了,它把istio的配置信息和服務註冊信息都進行了緩存。這樣可使配置更快的生效。

緩存了什麼數據

  • istio配置
istio.io/istio/pilot/pkg/model/config.go
var (
    ......
    // RouteRule describes route rules
    RouteRule = ProtoSchema{
      Type:        "route-rule",
      ......
    }
    // VirtualService describes v1alpha3 route rules
    VirtualService = ProtoSchema{
      Type:        "virtual-service",
      ......
    }
    // Gateway describes a gateway (how a proxy is exposed on the network)
    Gateway = ProtoSchema{
      Type:        "gateway",
      ......
    }
    // IngressRule describes ingress rules
    IngressRule = ProtoSchema{
      Type:        "ingress-rule",
      ......
    }
)
作過新手任務的同窗,應該都很熟悉上面的 Type,就是配置信息裏面的 kind,配置信息保存進k8s後,會被pilot-discovery經過api-server爬過來進行緩存。
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: reviews
  ......
  • 從k8s獲取的服務註冊信息
> istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go #102
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    ......
  out.services = out.createInformer(&v1.Service{}, "Service", options.ResyncPeriod,
    func(opts meta_v1.ListOptions) (runtime.Object, error) {
      return client.CoreV1().Services(options.WatchedNamespace).List(opts)
    },
    func(opts meta_v1.ListOptions) (watch.Interface, error) {
      return client.CoreV1().Services(options.WatchedNamespace).Watch(opts)
    })
    ......
  out.nodes = out.createInformer(&v1.Node{}, "Node", options.ResyncPeriod,
    func(opts meta_v1.ListOptions) (runtime.Object, error) {
      return client.CoreV1().Nodes().List(opts)
    },
    func(opts meta_v1.ListOptions) (watch.Interface, error) {
      return client.CoreV1().Nodes().Watch(opts)
    })
    ......
  return out
}
還有其餘數據不一一列出,從上面能夠看出,創建緩存都是經過List和Watch方式進行(istio的配置數據也同樣),List:第一次初始化數據,Watch:經過輪詢的方式獲取數據並緩存。
  • 轉化成的請求地址
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecs?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/servicerolebindings?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/virtualservices?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecbindings?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/serviceroles?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/serviceentries?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/routerules?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/egressrules?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/authentication.istio.io/v1alpha1/policies?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecbindings?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/destinationrules?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecs?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/gateways?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/destinationpolicies?limit=500&resourceVersion=0

https://{k8s.ip}:443/api/v1/nodes?limit=500&resourceVersion=0
https://{k8s.ip}:443/api/v1/namespaces/istio-system/configmaps/istio-ingress-controller-leader-istio
https://{k8s.ip}:443/api/v1/services?limit=500&resourceVersion=0
https://{k8s.ip}:443/api/v1/endpoints?limit=500&resourceVersion=0
https://{k8s.ip}:443/api/v1/pods?limit=500&resourceVersion=0

key的生成

在pilot-discovery中把緩存數據分了二大類,一類istio配置信息,另外一類服務註冊信息。這二類又進行了細分,分別爲virtualservices,routerules,nodes,pods等,最後再以 k8s空間/應用名做爲下標緩存數據。
k8s.io/client-go/tools/cache/store.go #76
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
  if key, ok := obj.(ExplicitKey); ok {
    return string(key), nil
  }
  meta, err := meta.Accessor(obj)
  if err != nil {
    return "", fmt.Errorf("object has no meta: %v", err)
  }
  if len(meta.GetNamespace()) > 0 {
    return meta.GetNamespace() + "/" + meta.GetName(), nil
  }
  return meta.GetName(), nil
}

default/sleep
kube-system/grafana
istio-system/servicegraph

存儲數據

  • List 和 Watch
上面提到創建緩存都是經過List和Watch方式進行,來看看它的實現。
k8s.io/client-go/tools/cache/reflector.go #239
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  ......
  list, err := r.listerWatcher.List(options)
  ......
  resourceVersion = listMetaInterface.GetResourceVersion()
  items, err := meta.ExtractList(list)
  ......
  //緩存數據
  if err := r.syncWith(items, resourceVersion); err != nil {
    return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
  }
  ......
  for {
    ......
    w, err := r.listerWatcher.Watch(options)
    ......
    if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
      ......
      return nil
    }
  }
}
  • 如何更新緩存
List能夠看作第一次初始化數據,Watch更像是監聽數據的變化狀態:添加,修改和刪除。針對這些狀態對緩存的數據作增、刪、改。
k8s.io/client-go/tools/cache/reflector.go #358
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
......
loop:
  for {
    select {
    case <-stopCh:
      return errorStopRequested
    case err := <-errc:
      return err
    case event, ok := <-w.ResultChan():
      ......
      switch event.Type {
      case watch.Added:
        err := r.store.Add(event.Object)
        ......
      case watch.Modified:
        err := r.store.Update(event.Object)
        ......
      case watch.Deleted:
      ......
        err := r.store.Delete(event.Object)
        ......
      default:
        utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
      }
      ......
    }
  }
  ......
  return nil
}
  • 限速訪問 --令牌桶算法
剛剛看到監聽數據的變化是經過for{} 不斷請求k8s的api-server接口,若是不加限制,那就成了DDOS攻擊了,因此pilot-discovery使用了 流量控制
k8s.io/client-go/rest/config.go
const (
  DefaultQPS   float32 = 5.0
  DefaultBurst int     = 10
)
這樣理解這個配置吧,若是1秒內訪問次數大於10,那麼在接下來的訪問中一秒最多隻能訪問5次。
k8s.io/client-go/rest/request.go #616
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
  ......
  retries := 0
  for {
    ......
    if retries > 0 {
      ......
      //使用令牌桶算法
      r.tryThrottle()
    }
    resp, err := client.Do(req)
    ......
    done := func() bool {
      ......
      retries++
      ......
  }
}
  • 協程安全map
在golang中使用內存key/value緩存很是簡單,定義變量 map[string]interface{},再往裏面放入數據就能夠了。可是map結構爲非協程安全,因此像pilot-discovery這種小型數據庫,同時存在讀和寫,若是不加上鎖,很容易出現爭搶共享資源問題。因此須要加鎖: thread_safe_store.go
type ThreadSafeStore interface {
  Add(key string, obj interface{})
  Update(key string, obj interface{})
  Delete(key string)
  Get(key string) (item interface{}, exists bool)
  List() []interface{}
  ListKeys() []string
  Replace(map[string]interface{}, string)
  Index(indexName string, obj interface{}) ([]interface{}, error)
  IndexKeys(indexName, indexKey string) ([]string, error)
  ListIndexFuncValues(name string) []string
  ByIndex(indexName, indexKey string) ([]interface{}, error)
  GetIndexers() Indexers

  // AddIndexers adds more indexers to this store.  If you call this after you already have data
  // in the store, the results are undefined.
  AddIndexers(newIndexers Indexers) error
  Resync() error
}

提供接口

不論是v1 API仍是v2 API,都是基於基礎緩存的數據,按照envoy的 接口文檔,把數據拼接成envoy想要的數據。

暴露v1 API RESTFUL

  • 暴露的接口
pilot-discovery暴露了 SDS/CDS/RDS/LDS接口,envoy再使用輪詢的方式,經過這些接口獲取配置信息
istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #376
func (ds *DiscoveryService) Register(container *restful.Container) {
  ws := &restful.WebService{}
  ws.Produces(restful.MIME_JSON)
  ......
  ws.Route(ws.
    GET(fmt.Sprintf("/v1/registration/{%s}", ServiceKey)).
    To(ds.ListEndpoints).
    Doc("SDS registration").
    Param(ws.PathParameter(ServiceKey, "tuple of service name and tag name").DataType("string")))
  ......
  ws.Route(ws.
    GET(fmt.Sprintf("/v1/clusters/{%s}/{%s}", ServiceCluster, ServiceNode)).
    To(ds.ListClusters).
    Doc("CDS registration").
    Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
    Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
  ......
  ws.Route(ws.
    GET(fmt.Sprintf("/v1/routes/{%s}/{%s}/{%s}", RouteConfigName, ServiceCluster, ServiceNode)).
    To(ds.ListRoutes).
    Doc("RDS registration").
    Param(ws.PathParameter(RouteConfigName, "route configuration name").DataType("string")).
    Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
    Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
  ......
  ws.Route(ws.
    GET(fmt.Sprintf("/v1/listeners/{%s}/{%s}", ServiceCluster, ServiceNode)).
    To(ds.ListListeners).
    Doc("LDS registration").
    Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
    Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
  ......
  container.Add(ws)
}
  • 創建二級緩存
這裏的緩存能夠這樣理解:咱們日常開發中,從數據庫獲取數據,通過邏輯處理,再把最終結果進行緩存,返回給客戶端,下次進來,就從緩存獲取數據。同理v1 API的接口從基礎緩存獲取了數據後,把這些數據拼接成envoy須要的格式數據,再把這些數據緩存,返回給envoy。
  1. ListEndpoints(EDS)
其餘幾個接口方式同樣,不一一列出
istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #567
> func (ds *DiscoveryService) ListEndpoints(request *restful.Request, response *restful.Response) {
  ......
  key := request.Request.URL.String()
  out, resourceCount, cached := ds.sdsCache.cachedDiscoveryResponse(key)
  //沒有緩存
  if !cached {
    /**
    邏輯處理
    **/
    ......
    resourceCount = uint32(len(endpoints))
    if resourceCount > 0 {
      //緩存數據
      ds.sdsCache.updateCachedDiscoveryResponse(key, resourceCount, out)
    }
  }
  observeResources(methodName, resourceCount)
  writeResponse(response, out)
}

暴露v2 API GRPC

我也是剛剛接觸GRPC的雙向流,我對它的理解是:一個長鏈接,客戶端和服務端能夠相互交互。在這裏的用法是,客戶端envoy打開一個GRPC鏈接,初始時pilot-discovery把數據響應給envoy,接下來,若是有數據變更,pilot-discovery經過GRPC把數據推給envoy。
  • ADS聚合接口
聚合接口就是把 SDS/CDS/RDS/LDS的配置數據都放在一個接口上。實現有點長,縮減只剩一個接口,但方式是同樣的。
istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go #237
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
  ......
  var receiveError error
  reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
  go receiveThread(con, reqChannel, &receiveError)

  for {
    // Block until either a request is received or the ticker ticks
    select {
    case discReq, ok = <-reqChannel:
      ......
      switch discReq.TypeUrl {
      case ClusterType:
      ......
      case ListenerType:
      ......
      case RouteType:
      ......
      case EndpointType:
      ......
        //推送數據
        err := s.pushEds(con)
        if err != nil {
          return err
        }
        ......
      }
    ......
    //經過監聽事件觸發推送數據
    case <-con.pushChannel:
      ......
      if len(con.Clusters) > 0 {
        err := s.pushEds(con)
        if err != nil {
          return err
        }
      }
      ......
    }
  }
}

清二級緩存和觸發推送

  • 主動觸發
清除二級緩存和觸發推送在這裏其實都是同一個觸發點:就是數據變更的時候。數據的變更應該是無序的,可是在更新配置的時候應該井井有理的進行。因此這裏使用了 任務隊列,讓事件一件一件接着作。
  1. 初始化List和Watch,註冊Add,Update,Delete事件。
istio.io/istio/pilot/pkg/config/kube/crd/controller.go #133
func (c *controller) createInformer(
  o runtime.Object,
  otype string,
  resyncPeriod time.Duration,
  lf cache.ListFunc,
  wf cache.WatchFunc) cacheHandler {
  ......
  informer.AddEventHandler(
    cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
        ......
        c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd))
      },
      ......
    })
  return cacheHandler{informer: informer, handler: handler}
}
當事件被觸發都會執行 handler.Apply,再執行註冊的方法。
istio.io/istio/pilot/pkg/serviceregistry/kube/queue.go #142
func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error {
  for _, f := range ch.funcs {
    if err := f(obj, event); err != nil {
      return err
    }
  }
  return nil
}
  1. 註冊方法
istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #328
func NewDiscoveryService(ctl model.Controller, configCache model.ConfigStoreCache,
  environment model.Environment, o DiscoveryServiceOptions) (*DiscoveryService, error) {
  ......
  serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
  if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
    return nil, err
  }
  instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
  if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
    return nil, err
  }

  if configCache != nil {
    ......
    configHandler := func(model.Config, model.Event) { out.clearCache() }
    for _, descriptor := range model.IstioConfigTypes {
      configCache.RegisterEventHandler(descriptor.Type, configHandler)
    }
  }

  return out, nil
}
方法 out.clearCache(),實現了清二級緩存和推送數據
istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #480
func (ds *DiscoveryService) clearCache() {
  ......
  //清二級緩存
  ds.sdsCache.clear()
  ds.cdsCache.clear()
  ds.rdsCache.clear()
  ds.ldsCache.clear()
  if V2ClearCache != nil {
    //把數據推送到envoy
    V2ClearCache()
  }
}
  • 手動觸發
在pilot-discovery開放了一個清二級緩存的接口。
istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #436
func (ds *DiscoveryService) Register(container *restful.Container) {
  ws := &restful.WebService{}
  ws.Produces(restful.MIME_JSON)
  ......
  ws.Route(ws.
    POST("/cache_stats_delete").
    To(ds.ClearCacheStats).
    Doc("Clear discovery service cache stats"))
  container.Add(ws)
}

小知識

萬惡的panic

在開發使用golang的過程當中或多或少都接觸過panic。例如引入了一些喜歡用panic的第三包,斷言錯誤等觸發了panic,致使整個服務都掛掉。爲了不這些問題,咱們通常都是使用 recover來接收panic,但一直以爲本身的處理方式不是很好。因此此次源碼分析特地看了 k8s的go客戶端是如何處理panic問題,畢竟是Google出品。
k8s.io/apimachinery/pkg/util/runtime/runtime.go #47
func HandleCrash(additionalHandlers ...func(interface{})) {
  if r := recover(); r != nil {
    //默認會打印 出現panic問題的文件和行數
    for _, fn := range PanicHandlers {
      fn(r)
    }
    //留給使用方,出現了panic你還想如何處理
    for _, fn := range additionalHandlers {
      fn(r)
    }
    //若是你確認,能夠直接panic
    if ReallyCrash {
      // Actually proceed to panic.
      panic(r)
    }
  }
}
從上面看出k8s客戶端的處理方式和咱們的想法同樣,不過它的封裝更友好。在k8s的go客戶端中 HandleCrash,更喜歡和for{}一塊兒使用。
k8s.io/apimachinery/pkg/watch/streamwatcher.go #88
func (sw *StreamWatcher) receive() {
  ......
  defer utilruntime.HandleCrash()
  for {
    ......
  }
}

k8s.io/client-go/tools/record/event.go #224
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
  ......
  go func() {
    defer utilruntime.HandleCrash()
    for {
      ......
    }
  }()
  return watcher
}

結語

此次的源碼分析中,不僅僅瞭解了pilot-discovery的設計實現,還經過 k8s的go客戶端學習到了延遲隊列,流量控制,協程安全數據庫等相關的實現和應用場景,收穫很多。
相關文章
相關標籤/搜索