Author: xidianwangtao@gmail.comapi
摘要:最近咱們在寫本身的Kubernetes服務路由組件對接公司自研的負載均衡器,這其中涉及到很是核心的Endpoints相關的邏輯,所以對Endpoints Controller的深刻分析是很是有必要的,好比Pod Label發生變動、孤立Pod、Pod HostName發生變動等狀況下,Endpoints Controller的處理邏輯是否與咱們想要的一致。app
--concurrent-endpoint-syncs
int32 Default: 5 The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load.負載均衡
--leader-elect-resource-lock
endpoints Default: "endpoints" The type of resource object that is used for locking during leader election. Supported options are endpoints (default) and configmaps
.less
啓動兩類go協程:dom
// Run will not return until stopCh is closed. workers determines how many // endpoints will be handled in parallel. func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer e.queue.ShutDown() glog.Infof("Starting endpoint controller") defer glog.Infof("Shutting down endpoint controller") if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) { return } // workers = --concurrent-endpoint-syncs's value (default 5) for i := 0; i < workers; i++ { // workerLoopPeriod = 1s go wait.Until(e.worker, e.workerLoopPeriod, stopCh) } go func() { defer utilruntime.HandleCrash() e.checkLeftoverEndpoints() }() <-stopCh }
checkLeftoverEndpoints負責List全部當前集羣中的endpoints並將它們對應的services添加到queue中,由workers進行syncService同步。oop
這是爲了防止在controller-manager發生重啓時時,用戶刪除了某些Services或者某些Endpoints還沒刪除乾淨,Endpoints Controller沒有處理的狀況下,在Endpoints Controller再次啓動時能經過checkLeftoverEndpoints檢測到那些孤立的endpionts(沒有對應services),將虛構的Services從新加入到隊列進行syncService操做,從而完成這些孤立endpoint的清理工做。源碼分析
上面提到的虛構Services實際上是把Endpoints的Key(namespace/name)做爲Services的Key,所以這就是爲何要求Endpiont和Service的名字要一致的緣由之一。ui
func (e *EndpointController) checkLeftoverEndpoints() { list, err := e.endpointsLister.List(labels.Everything()) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)) return } for _, ep := range list { if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok { // when there are multiple controller-manager instances, // we observe that it will delete leader-election endpoints after 5min // and cause re-election // so skip the delete here // as leader-election only have endpoints without service continue } key, err := keyFunc(ep) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep)) continue } e.queue.Add(key) } }
另外,還須要注意一點,對於kube-controller-manager多實例HA部署時,各個contorller-manager endpoints是沒有對應service的,這種狀況下,咱們不能把虛構的Service加入到隊列觸發這些「理應孤立」的endpoints被清理,所以咱們給這些「理應孤立」的endpoints加上Annotation "control-plane.alpha.kubernetes.io/leader"以作跳過處理。this
Service的Add/Update/Delete Event Handler都是將Service Key加入到Queue中,等待worker進行syncService處理:spa
根據queue中獲得的service key(namespace/name)去indexer中獲取對應的Service Object,若是沒獲取到,則調api刪除同Key(namespace/name)的Endpoints Object進行清理工做,這對應到checkLeftoverEndpoints中描述到的那些孤立endpoints清理工做。
由於Service是經過LabelSelector進行Pod匹配,將匹配的Pods構建對應的Endpoints Subsets加入到Endpoints中,所以這裏會先過濾掉那些沒有LabelSelector的Services。
而後用Service的LabelSelector獲取同namespace下的全部Pods。
檢查service.Spec.PublishNotReadyAddresses是否爲true,或者Service Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"是否爲true(/t/T/True/TRUE/1),若是爲true,則表示tolerate Unready Endpoints,即Unready的Pods信息也會被加入該Service對應的Endpoints中。
注意,Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"在Kubernetes 1.13中將被棄用,後續只使用.Spec.PublishNotReadyAddresses Field。
接下來就是遍歷前面獲取到的Pods,用各個Pod的IP、ContainerPorts、HostName及Service的Port去構建Endpoints的Subsets,注意以下特殊處理:
跳過沒有pod.Status.PodIP爲空的pod;
當tolerate Unready Endpoints爲false時,跳過那些被標記刪除(DeletionTimestamp != nil)的Pods;
對於Headless Service,由於沒有Service Port,所以構建EndpointSubset時對應的Ports內容爲空;
4)當tolerate Unready Endpoints爲true(即便Pod not Ready)或者Pod isReady時,Pod對應的EndpointAddress也會被加入到(Ready)Addresses中。
5)tolerate Unready Endpoints爲false且Pod isNotReady狀況下:
- 當pod.Spec.RestartPolicy爲Never,Pod Status.Phase爲非結束狀態(非Failed/Successed)時,Pod對應的EndpointAddress也會被加入到NotReadyAddresses中。 - 當pod.Spec.RestartPolicy爲OnFailure, Pod Status.Phase爲非Successed時,Pod對應的EndpointAddress也會被加入到NotReadyAddresses中。 - 其餘狀況下,Pod對應的EndpointAddress也會被加入到NotReadyAddresses中。
從indexer中獲取service對應的Endpoints Object(currentEndpoints),若是從indexer中沒有返回對應的Endpoints Object,則構建一個與該Service同名、同Labels的Endpoints對象(newEndpoints)。
若是currentEndpoints的ResourceVersion不爲空,則對比currentEndpoints.Subsets、Labels與前面構建的Subsets、Service.Labels是否DeepEqual,若是是則說明不須要update,流程結束。
不然,就像currentEndpoints DeepCopy給newEndpoints,並用前面構建的Subsets和Services.Labels替換newEndpoints中對應內容。
若是currentEndpoints的ResourceVersion爲空,則調用Create API去建立上一步的newEndpoints Object。若是currentEndpoints的ResourceVersion不爲空,表示已經存在對應的Endpoints,則調用Update API用newEndpoints去更新該Endpoints。
流程結束。
// When a pod is added, figure out what services it will be a member of and // enqueue them. obj must have *v1.Pod type. func (e *EndpointController) addPod(obj interface{}) { pod := obj.(*v1.Pod) services, err := e.getPodServiceMemberships(pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return } for key := range services { e.queue.Add(key) } }
若是newPod.ResourceVersion等於oldPod.ResourceVersion,則跳過,不進行任何update。
檢查新老Pod的DeletionTimestamp、Ready Condition以及由PodIP,Hostname等建構的EndpointAddress是否發生變動,只要其中之一發生變動,podChangedFlag就爲true。
檢查新老Pod Spec的Labels、HostName、Subdomain是否發生變動,只要其中之一發生變動,labelChangedFlag就爲true。
若是podChangedFlag和labelChangedFlag都爲false,則跳過,不作任何update。
經過Services LabeleSelector與Pod Labels進行匹配的方法,將newPod能匹配上的全部Services都找出來(services記錄),若是labelChangedFlag爲true,則根據LabelSelector匹配找出oldPod對應的oldServices:
互相差值進行union集合的含義:
services.Difference(oldServices).Union(oldServices.Difference(services))
// When a pod is deleted, enqueue the services the pod used to be a member of. // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. func (e *EndpointController) deletePod(obj interface{}) { if _, ok := obj.(*v1.Pod); ok { // Enqueue all the services that the pod used to be a member // of. This happens to be exactly the same thing we do when a // pod is added. e.addPod(obj) return } // If we reached here it means the pod was deleted but its final state is unrecorded. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) return } pod, ok := tombstone.Obj.(*v1.Pod) if !ok { utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj)) return } glog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name) e.addPod(pod) }
裏面有幾個struct,挺容易混淆的,簡單用圖表示下,方便比對:
經過對Endpoints Controller的源碼分析,咱們瞭解了其中不少細節,好比對Service和Pod事件處理邏輯、對孤立Pod的處理方法、Pod Labels變動帶來的影響等等,這對咱們經過Watch Endpoints去寫本身的Ingress組件對接公司內部的路由組件時是有幫助的。