istio source code analysis: pilot-discovery, the service discovery and configuration center
envoy provides a set of generic data-plane APIs through which service discovery and configuration can be driven dynamically. istio needs to integrate with service discovery systems such as k8s and consul, so an intermediary is needed to consolidate the service registration and configuration information from k8s and consul and serve it to envoy.
There is some history behind the v1 and v2 APIs; see the official blog for details. When envoy was first open-sourced, dynamic service discovery and configuration were implemented with HTTP plus polling, but that approach had several drawbacks.
As the collaboration with Google deepened, the v2 API was built on gRPC plus push. It reimplements the v1 SDS/CDS/RDS/LDS interfaces, continues to support the JSON/YAML data formats, and adds interfaces such as ADS (which aggregates the four SDS/CDS/RDS/LDS interfaces into one) and HDS.
In effect, pilot-discovery is already a small non-persistent key/value database: it caches both istio configuration and service registration information, which lets configuration changes take effect faster.
```go
// istio.io/istio/pilot/pkg/model/config.go
var (
    ......
    // RouteRule describes route rules
    RouteRule = ProtoSchema{
        Type: "route-rule",
        ......
    }

    // VirtualService describes v1alpha3 route rules
    VirtualService = ProtoSchema{
        Type: "virtual-service",
        ......
    }

    // Gateway describes a gateway (how a proxy is exposed on the network)
    Gateway = ProtoSchema{
        Type: "gateway",
        ......
    }

    // IngressRule describes ingress rules
    IngressRule = ProtoSchema{
        Type: "ingress-rule",
        ......
    }
)
```
作過新手任務的同窗,應該都很熟悉上面的Type
,就是配置信息裏面的kind
,配置信息保存進k8s後,會被pilot-discovery經過api-server爬過來進行緩存。
```yaml
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: reviews
  ......
```
```go
// istio.io/istio/pilot/pkg/serviceregistry/kube/controller.go #102
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
    ......
    out.services = out.createInformer(&v1.Service{}, "Service", options.ResyncPeriod,
        func(opts meta_v1.ListOptions) (runtime.Object, error) {
            return client.CoreV1().Services(options.WatchedNamespace).List(opts)
        },
        func(opts meta_v1.ListOptions) (watch.Interface, error) {
            return client.CoreV1().Services(options.WatchedNamespace).Watch(opts)
        })
    ......
    out.nodes = out.createInformer(&v1.Node{}, "Node", options.ResyncPeriod,
        func(opts meta_v1.ListOptions) (runtime.Object, error) {
            return client.CoreV1().Nodes().List(opts)
        },
        func(opts meta_v1.ListOptions) (watch.Interface, error) {
            return client.CoreV1().Nodes().Watch(opts)
        })
    ......
    return out
}
```
Other data types are created the same way and are not listed one by one. From the above you can see that the caches are built with List and Watch (istio's configuration data is handled the same way): List loads the data on the first pass, and Watch keeps polling for changes and updates the cache. The requests issued against the api-server look like this:
```
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecs?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/servicerolebindings?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/virtualservices?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecbindings?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/serviceroles?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/serviceentries?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/routerules?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/egressrules?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/authentication.istio.io/v1alpha1/policies?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/httpapispecbindings?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/destinationrules?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/quotaspecs?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/networking.istio.io/v1alpha3/gateways?limit=500&resourceVersion=0
https://{k8s.ip}:443/apis/config.istio.io/v1alpha2/destinationpolicies?limit=500&resourceVersion=0
https://{k8s.ip}:443/api/v1/nodes?limit=500&resourceVersion=0
https://{k8s.ip}:443/api/v1/namespaces/istio-system/configmaps/istio-ingress-controller-leader-istio
https://{k8s.ip}:443/api/v1/services?limit=500&resourceVersion=0
https://{k8s.ip}:443/api/v1/endpoints?limit=500&resourceVersion=0
https://{k8s.ip}:443/api/v1/pods?limit=500&resourceVersion=0
```
pilot-discovery splits the cached data into two broad categories: istio configuration and service registration. Each category is further subdivided (virtualservices, routerules, nodes, pods, and so on), and the entries are finally cached under a key of the form k8s namespace/name.
```go
// k8s.io/client-go/tools/cache/store.go #76
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if key, ok := obj.(ExplicitKey); ok {
        return string(key), nil
    }
    meta, err := meta.Accessor(obj)
    if err != nil {
        return "", fmt.Errorf("object has no meta: %v", err)
    }
    if len(meta.GetNamespace()) > 0 {
        return meta.GetNamespace() + "/" + meta.GetName(), nil
    }
    return meta.GetName(), nil
}
```

Example cache keys:

```
default/sleep
kube-system/grafana
istio-system/servicegraph
```
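To make the key format concrete, here is a small usage sketch (my own example, assuming the standard client-go and apimachinery packages) that produces one of the keys listed above:

```go
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // a namespaced object is keyed as "namespace/name"
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "sleep"},
    }
    key, err := cache.MetaNamespaceKeyFunc(pod)
    if err != nil {
        panic(err)
    }
    fmt.Println(key) // default/sleep
}
```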
As mentioned above, the caches are built with List and Watch. Let's look at the implementation.
```go
// k8s.io/client-go/tools/cache/reflector.go #239
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    ......
    list, err := r.listerWatcher.List(options)
    ......
    resourceVersion = listMetaInterface.GetResourceVersion()
    items, err := meta.ExtractList(list)
    ......
    // cache the data
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    ......
    for {
        ......
        w, err := r.listerWatcher.Watch(options)
        ......
        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            ......
            return nil
        }
    }
}
```
List can be seen as the initial data load, while Watch is more like listening for data change events: add, modify, and delete. The cached data is then added, updated, or deleted accordingly.
```go
// k8s.io/client-go/tools/cache/reflector.go #358
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    ......
loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            ......
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                ......
            case watch.Modified:
                err := r.store.Update(event.Object)
                ......
            case watch.Deleted:
                ......
                err := r.store.Delete(event.Object)
                ......
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            ......
        }
    }
    ......
    return nil
}
```
As we just saw, changes are watched by calling the k8s api-server repeatedly inside a for{} loop. Without any limit that would amount to a DDoS attack on the api-server, so pilot-discovery relies on rate limiting (flow control).
```go
// k8s.io/client-go/rest/config.go
const (
    DefaultQPS   float32 = 5.0
    DefaultBurst int     = 10
)
```
One way to read this configuration: bursts of up to 10 requests are allowed, and beyond that requests are limited to at most 5 per second.
```go
// k8s.io/client-go/rest/request.go #616
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
    ......
    retries := 0
    for {
        ......
        if retries > 0 {
            ......
            // throttle using the token bucket
            r.tryThrottle()
        }
        resp, err := client.Do(req)
        ......
        done := func() bool {
            ......
            retries++
            ......
        }
        ......
    }
}
```
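The throttle behind tryThrottle is a token bucket. Here is a minimal sketch of the same QPS/Burst semantics; it uses golang.org/x/time/rate purely for illustration and is not the client-go flowcontrol code:

```go
package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // 5 tokens per second with a bucket of 10: mirrors DefaultQPS / DefaultBurst
    limiter := rate.NewLimiter(rate.Limit(5.0), 10)

    start := time.Now()
    for i := 0; i < 20; i++ {
        // Wait blocks until a token is available: the first 10 calls pass
        // immediately, the rest are released at roughly 5 per second
        if err := limiter.Wait(context.Background()); err != nil {
            panic(err)
        }
        fmt.Printf("request %2d at %v\n", i, time.Since(start).Round(10*time.Millisecond))
    }
}
```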
Using an in-memory key/value cache in Go is very simple: declare a map[string]interface{} and put data into it. But a map is not goroutine-safe, and a small database like pilot-discovery has concurrent reads and writes, so without a lock you quickly run into contention on the shared data. Hence the locking store used by client-go, thread_safe_store.go:
```go
// k8s.io/client-go/tools/cache/thread_safe_store.go
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexKey string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store. If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
    Resync() error
}
```
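To see why the lock matters, here is a minimal sketch of a goroutine-safe store (my own illustration of the idea, not the client-go implementation behind this interface): a plain map guarded by a sync.RWMutex.

```go
package sketch

import "sync"

// safeStore is a tiny goroutine-safe key/value store: writes take the write
// lock, reads take the read lock, so concurrent readers do not block each other.
type safeStore struct {
    mu    sync.RWMutex
    items map[string]interface{}
}

func newSafeStore() *safeStore {
    return &safeStore{items: map[string]interface{}{}}
}

func (s *safeStore) Add(key string, obj interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.items[key] = obj
}

func (s *safeStore) Get(key string) (interface{}, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    obj, ok := s.items[key]
    return obj, ok
}

func (s *safeStore) Delete(key string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.items, key)
}
```

client-go builds its indexing features on top of the same basic idea.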
Whether it is the v1 API or the v2 API, both take the data in the base cache and, following envoy's interface documentation, assemble it into the data envoy expects.
pilot-discovery exposes the SDS/CDS/RDS/LDS interfaces, and envoy polls these interfaces to fetch configuration:
```go
// istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #376
func (ds *DiscoveryService) Register(container *restful.Container) {
    ws := &restful.WebService{}
    ws.Produces(restful.MIME_JSON)
    ......
    ws.Route(ws.
        GET(fmt.Sprintf("/v1/registration/{%s}", ServiceKey)).
        To(ds.ListEndpoints).
        Doc("SDS registration").
        Param(ws.PathParameter(ServiceKey, "tuple of service name and tag name").DataType("string")))
    ......
    ws.Route(ws.
        GET(fmt.Sprintf("/v1/clusters/{%s}/{%s}", ServiceCluster, ServiceNode)).
        To(ds.ListClusters).
        Doc("CDS registration").
        Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
        Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
    ......
    ws.Route(ws.
        GET(fmt.Sprintf("/v1/routes/{%s}/{%s}/{%s}", RouteConfigName, ServiceCluster, ServiceNode)).
        To(ds.ListRoutes).
        Doc("RDS registration").
        Param(ws.PathParameter(RouteConfigName, "route configuration name").DataType("string")).
        Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
        Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
    ......
    ws.Route(ws.
        GET(fmt.Sprintf("/v1/listeners/{%s}/{%s}", ServiceCluster, ServiceNode)).
        To(ds.ListListeners).
        Doc("LDS registration").
        Param(ws.PathParameter(ServiceCluster, "client proxy service cluster").DataType("string")).
        Param(ws.PathParameter(ServiceNode, "client proxy service node").DataType("string")))
    ......
    container.Add(ws)
}
```
The caching here can be understood the way we do it in everyday development: fetch data from the database, run the business logic, cache the final result, and return it to the client; the next request is served from the cache. In the same way, the v1 API handlers read from the base cache, assemble the data into the format envoy needs, cache that result, and return it to envoy.
The other endpoints work the same way and are not listed one by one.
```go
// istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #567
func (ds *DiscoveryService) ListEndpoints(request *restful.Request, response *restful.Response) {
    ......
    key := request.Request.URL.String()
    out, resourceCount, cached := ds.sdsCache.cachedDiscoveryResponse(key)
    // cache miss
    if !cached {
        /* processing logic */
        ......
        resourceCount = uint32(len(endpoints))
        if resourceCount > 0 {
            // cache the assembled result
            ds.sdsCache.updateCachedDiscoveryResponse(key, resourceCount, out)
        }
    }
    observeResources(methodName, resourceCount)
    writeResponse(response, out)
}
```
I have only recently started using gRPC bidirectional streams. My understanding is that it is one long-lived connection over which client and server can talk to each other. Here it is used like this: the envoy client opens a gRPC connection, pilot-discovery responds with the initial data, and from then on, whenever the data changes, pilot-discovery pushes the update to envoy over the same stream.
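As a minimal sketch of that pattern (my own illustration with a hypothetical stream interface standing in for the gRPC-generated one, not the istio ADS code): one goroutine receives client requests, while the main loop either answers a request or pushes unprompted when a change event fires.

```go
package sketch

import "log"

// DemoStream stands in for a generated gRPC bidirectional-stream server
// interface (e.g. a hypothetical pb.Demo_TalkServer): Recv blocks until the
// client sends a request, Send pushes a message back to the client.
type DemoStream interface {
    Recv() (*Request, error)
    Send(*Response) error
}

type Request struct{ TypeUrl string }
type Response struct{ Payload string }

// HandleStream reads client requests in a separate goroutine and pushes
// responses either in reply to a request or when pushCh signals a change.
func HandleStream(stream DemoStream, pushCh <-chan string) error {
    reqCh := make(chan *Request, 1)
    errCh := make(chan error, 1)

    go func() {
        for {
            req, err := stream.Recv()
            if err != nil {
                errCh <- err
                return
            }
            reqCh <- req
        }
    }()

    for {
        select {
        case req := <-reqCh:
            // initial (or renewed) request from the client: answer right away
            if err := stream.Send(&Response{Payload: "config for " + req.TypeUrl}); err != nil {
                return err
            }
        case msg := <-pushCh:
            // server-side change event: push without waiting for a request
            if err := stream.Send(&Response{Payload: msg}); err != nil {
                return err
            }
        case err := <-errCh:
            log.Printf("stream closed: %v", err)
            return err
        }
    }
}
```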
The aggregated interface (ADS) simply serves the SDS/CDS/RDS/LDS configuration data through a single interface. The implementation is a bit long, so it is trimmed down to a single resource type here, but the other types are handled the same way.
```go
// istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go #237
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
    ......
    var receiveError error
    reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
    go receiveThread(con, reqChannel, &receiveError)
    for {
        // Block until either a request is received or the ticker ticks
        select {
        case discReq, ok = <-reqChannel:
            ......
            switch discReq.TypeUrl {
            case ClusterType:
                ......
            case ListenerType:
                ......
            case RouteType:
                ......
            case EndpointType:
                ......
                // push data
                err := s.pushEds(con)
                if err != nil {
                    return err
                }
                ......
            }
            ......
        // push triggered by a change event
        case <-con.pushChannel:
            ......
            if len(con.Clusters) > 0 {
                err := s.pushEds(con)
                if err != nil {
                    return err
                }
            }
            ......
        }
    }
}
```
Clearing the second-level cache and triggering a push share the same trigger point: a data change. Changes arrive in no particular order, but configuration updates should be applied in an orderly way, so a task queue is used here to process events one after another.
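Before looking at the istio code below, here is a minimal sketch of the idea (my own illustration, not istio's kube.Queue): a channel-backed FIFO with a single worker, so tasks always run one at a time.

```go
package sketch

// Task is a unit of work pushed onto the queue.
type Task func() error

// Queue runs pushed tasks sequentially in a single worker goroutine.
type Queue struct {
    tasks chan Task
}

func NewQueue(size int) *Queue {
    return &Queue{tasks: make(chan Task, size)}
}

// Push enqueues a task; it blocks if the queue is full.
func (q *Queue) Push(t Task) {
    q.tasks <- t
}

// Run processes tasks one by one until stop is closed.
func (q *Queue) Run(stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        case t := <-q.tasks:
            // a real implementation would log or requeue on error
            _ = t()
        }
    }
}
```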
```go
// istio.io/istio/pilot/pkg/config/kube/crd/controller.go #133
func (c *controller) createInformer(
    o runtime.Object,
    otype string,
    resyncPeriod time.Duration,
    lf cache.ListFunc,
    wf cache.WatchFunc) cacheHandler {
    ......
    informer.AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                ......
                c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd))
            },
            ......
        })

    return cacheHandler{informer: informer, handler: handler}
}
```
Whenever an event fires, handler.Apply is executed, which then runs the registered handler functions.
```go
// istio.io/istio/pilot/pkg/serviceregistry/kube/queue.go #142
func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error {
    for _, f := range ch.funcs {
        if err := f(obj, event); err != nil {
            return err
        }
    }
    return nil
}
```
```go
// istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #328
func NewDiscoveryService(ctl model.Controller, configCache model.ConfigStoreCache,
    environment model.Environment, o DiscoveryServiceOptions) (*DiscoveryService, error) {
    ......
    serviceHandler := func(*model.Service, model.Event) { out.clearCache() }
    if err := ctl.AppendServiceHandler(serviceHandler); err != nil {
        return nil, err
    }
    instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() }
    if err := ctl.AppendInstanceHandler(instanceHandler); err != nil {
        return nil, err
    }

    if configCache != nil {
        ......
        configHandler := func(model.Config, model.Event) { out.clearCache() }
        for _, descriptor := range model.IstioConfigTypes {
            configCache.RegisterEventHandler(descriptor.Type, configHandler)
        }
    }
    return out, nil
}
```
The out.clearCache() method both clears the second-level caches and pushes data:
```go
// istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #480
func (ds *DiscoveryService) clearCache() {
    ......
    // clear the second-level caches
    ds.sdsCache.clear()
    ds.cdsCache.clear()
    ds.rdsCache.clear()
    ds.ldsCache.clear()

    if V2ClearCache != nil {
        // push the data to envoy
        V2ClearCache()
    }
}
```
pilot-discovery also exposes an HTTP endpoint related to the second-level cache; note that it clears the cache statistics rather than the cached data itself:
```go
// istio.io/istio/pilot/pkg/proxy/envoy/v1/discovery.go #436
func (ds *DiscoveryService) Register(container *restful.Container) {
    ws := &restful.WebService{}
    ws.Produces(restful.MIME_JSON)
    ......
    ws.Route(ws.
        POST("/cache_stats_delete").
        To(ds.ClearCacheStats).
        Doc("Clear discovery service cache stats"))

    container.Add(ws)
}
```
Anyone developing in Go has run into panics at some point: pulling in a third-party package that likes to panic, or a failed type assertion, can bring the whole service down. To avoid that we usually use recover to catch the panic, but I always felt my own handling was not great, so during this source reading I looked specifically at how the k8s Go client deals with panics. It is a Google project, after all.
```go
// k8s.io/apimachinery/pkg/util/runtime/runtime.go #47
func HandleCrash(additionalHandlers ...func(interface{})) {
    if r := recover(); r != nil {
        // the default handlers log the file and line where the panic occurred
        for _, fn := range PanicHandlers {
            fn(r)
        }
        // hook for the caller: extra handling when a panic occurs
        for _, fn := range additionalHandlers {
            fn(r)
        }
        // if you really want to crash, re-panic
        if ReallyCrash {
            // Actually proceed to panic.
            panic(r)
        }
    }
}
```
As you can see, the k8s client's approach matches our intuition, it is just wrapped more conveniently. In the k8s Go client, HandleCrash is most often used together with a for{} loop:
```go
// k8s.io/apimachinery/pkg/watch/streamwatcher.go #88
func (sw *StreamWatcher) receive() {
    ......
    defer utilruntime.HandleCrash()
    for {
        ......
    }
}

// k8s.io/client-go/tools/record/event.go #224
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    ......
    go func() {
        defer utilruntime.HandleCrash()
        for {
            ......
        }
    }()
    return watcher
}
```
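A small usage sketch of the same pattern in one's own code (the event channel and handle function here are hypothetical; the variadic additionalHandlers parameter is the one shown in HandleCrash above):

```go
package main

import (
    "log"

    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

// receive mirrors the pattern above: the deferred HandleCrash runs the default
// panic handlers (which log the file and line) plus our extra handler before
// deciding, via the ReallyCrash flag, whether to re-panic.
func receive(events <-chan string) {
    defer utilruntime.HandleCrash(func(r interface{}) {
        log.Printf("receive loop panicked: %v", r)
    })
    for ev := range events {
        handle(ev) // hypothetical business logic that might panic
    }
}

func handle(ev string) {
    _ = ev // ... process one event ...
}

func main() {
    events := make(chan string, 1)
    events <- "hello"
    close(events)
    receive(events)
}
```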
This source code analysis was not only about the design and implementation of pilot-discovery; through the k8s Go client I also learned about delay queues, flow control, goroutine-safe stores, and their use cases. Quite a rewarding read.