常常有人問:「瀏覽器輸入url後發生了什麼」,這個問題看似簡單,可是卻能全面的考察一我的對系統的瞭解程度。若是把這個問題引伸到k8s領域,就能夠問出「K8sClient提交Yaml後發生了什麼」這樣相似的問題。一樣的,要回答這個問題,須要咱們對k8s的設計有一個比較完整的瞭解。本文就試圖回答這個問題,帶着你們體驗一下一份Yaml的K8S之旅。html
k8s能夠說是鬆耦合設計的一個典型,以下圖所示,各個組件都和ApiServer進行通訊,只有ApiServer能夠寫Etcd,這樣作的的好處有許多:各個組件解耦了,能夠獨立發展;各組件也能夠分佈在不一樣的機器上,避免單機繁忙,甚至對某些關鍵組件能夠多實例部署,加強性能和可用性;因爲數據庫Etcd維護了集羣的核心元數據和狀態,由ApiServer統一驗證鑑權更合理;等等。node
瞭解了k8s的總體設計後,下面咱們以Service這個應用最關心的資源的Yaml文件提交後的效果進行分析,其它資源也是大同小異。git
系統的總體處理流程圖大體以下,首先在k8s啓動後,各個組件包括CoreDNS、各個Controller都會鏈接到ApiServer(list/watch),在client如kubectl提交yaml後,API server會把相關資源存儲到Ectd中並通知各個組件,各個組件而後各自進行本身的相關操做,最後產生了一個能夠對外提供服務的service。github
瞭解了總體流程後,咱們來對流程中涉及的各個組件進行細緻的分析。數據庫
ApiServer 會啓動一個httpsserver,並把相關端點註冊到具體的storage,其中以「api」開頭的屬於legacy,其註冊的部分常見端點有:後端
restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, "services": serviceRest, "endpoints": endpointsStorage, "nodes": nodeStorage.Node, ...... }
首先咱們來看看通用的storageapi
type Store struct { // NewFunc returns a new instance of the type this registry returns for a // GET of a single object, e.g.: // // curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object NewFunc func() runtime.Object // NewListFunc returns a new list of the type this registry; it is the // type returned when the resource is listed, e.g.: // // curl GET /apis/group/version/namespaces/my-ns/myresource NewListFunc func() runtime.Object } // 資源建立方法 func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { // 校驗資源合法性 if createValidation != nil { if err := createValidation(ctx, obj.DeepCopyObject()); err != nil { return nil, err } } name, err := e.ObjectNameFunc(obj) key, err := e.KeyFunc(ctx, name) qualifiedResource := e.qualifiedResourceFromContext(ctx) ttl, err := e.calculateTTL(obj, 0, false) out := e.NewFunc() // 最終調用的要麼是 dryrun,要麼是etcd3 // https://github.com/kubernetes/kubernetes/blob/7f7378eddfe7a817c47fc75c220a729f4b78b913/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L144 if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil { err = storeerr.InterpretCreateError(err, qualifiedResource, name) err = rest.CheckGeneratedNameError(e.CreateStrategy, err, obj) // 資源已存在則能夠原諒 if !apierrors.IsAlreadyExists(err) { return nil, err } // 建立後無法得到則不能原諒 if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil { return nil, err } } // 切面 if e.AfterCreate != nil { if err := e.AfterCreate(out); err != nil { return nil, err } } return out, nil }
從restStorageMap可見處理邏輯是Service和Endpoints對象都要被存入etcd,其中Service還有一些特殊的邏輯(如分配IP,健康檢查等),而Endpoint沒啥額外的邏輯,直接使用通用的storage便可。瀏覽器
// service 建立邏輯 func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { service := obj.(*api.Service) // 切面 if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil { return nil, err } // 是否須要釋放IP,相似於事務,若是分配出錯的話,把ip還給資源池 releaseServiceIP := false defer func() { if releaseServiceIP { if helper.IsServiceIPSet(service) { allocator := rs.getAllocatorByClusterIP(service) allocator.Release(net.ParseIP(service.Spec.ClusterIP)) } } }() var err error if !dryrun.IsDryRun(options.DryRun) { // 對於不是ExternalName類型的service才分配IP if service.Spec.Type != api.ServiceTypeExternalName { // 這個 分配器 實際上基於etcd allocator := rs.getAllocatorBySpec(service) if releaseServiceIP, err = initClusterIP(service, allocator); err != nil { return nil, err } } } // 由 分配器 分配端口 nodePortOp := portallocator.StartOperation(rs.serviceNodePorts, dryrun.IsDryRun(options.DryRun)) // 一樣須要判斷是否須要回收 defer nodePortOp.Finish() // 對於 NodePort和LoadBalance類型的service都要分配端口 if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer { if err := initNodePorts(service, nodePortOp); err != nil { return nil, err } } // 對於須要健康檢查的service分配專門的端口 // 至於loadbalance類型且ExternalTrafficPolicy爲Local的才須要分配 if apiservice.NeedsHealthCheck(service) { if err := allocateHealthCheckNodePort(service, nodePortOp); err != nil { return nil, errors.NewInternalError(err) } } // 實際建立 out, err := rs.services.Create(ctx, service, createValidation, options) if err != nil { err = rest.CheckGeneratedNameError(registry.Strategy, err, service) } } // 基於 etcd 的 ip 分配器 serviceClusterIPAllocator, err := ipallocator.NewAllocatorCIDRRange(&serviceClusterIPRange, func(max int, rangeSpec string) (allocator.Interface, error) { mem := allocator.NewAllocationMap(max, rangeSpec) // TODO etcdallocator package to return a storage interface via the storageFactory etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig) if err != nil { return nil, err } serviceClusterIPRegistry = etcd return etcd, nil }) if err != nil { return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err) } restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
對於Pod對象來講,除了存儲外,還要將pod綁定到特定的機器上去:緩存
func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podID, oldMachine, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey, err := r.store.KeyFunc(ctx, podID) if err != nil { return nil, err } err = r.store.Storage.GuaranteedUpdate(ctx, podKey, &api.Pod{}, false, nil, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) // 分配到機器 pod.Spec.NodeName = machine // 設置註解 if pod.Annotations == nil { pod.Annotations = make(map[string]string) } for k, v := range annotations { pod.Annotations[k] = v } // condition,代表已被調度 podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{ Type: api.PodScheduled, Status: api.ConditionTrue, }) finalPod = pod return pod, nil }), dryRun) return finalPod, err }
EndpointController監聽Service和Pod的變化事件,並註冊回調函數,經過Informer實現。同時利用Informer緩存最新的endpoint到本地,可是並不註冊回調事件,由於endpoint基本上是最底層的概念,不須要額外的處理邏輯。app
// 監聽service serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ // 增刪改 AddFunc: e.onServiceUpdate, UpdateFunc: func(old, cur interface{}) { e.onServiceUpdate(cur) }, DeleteFunc: e.onServiceDelete, }) ... // 監聽pod podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: e.addPod, UpdateFunc: e.updatePod, DeleteFunc: e.deletePod, }) // 利用Informer能夠獲取最新的endpoint情況 e.endpointsLister = endpointsInformer.Lister() e.endpointsSynced = endpointsInformer.Informer().HasSynced
收到相關資源增長事件後,把須要處理的service加入隊列
func (e *EndpointController) onServiceUpdate(obj interface{}) { // 得到service key 多是 name 或則 namespace/name key, err := controller.KeyFunc(obj) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } // 更新service的selector _ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector) // 將service加入待處理隊列 e.queue.Add(key) } func (e *EndpointController) addPod(obj interface{}) { pod := obj.(*v1.Pod) // 得到該pod相關的service,這些service的selector包含這個pod services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod) if err != nil { utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return } // 將該pod相關的service加入待處理隊列 for key := range services { e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod) } }
另外一方面,當 EndpointController Run 起來事後,實際上是循環處理隊列中的service,處理內容包括修改Service自己和其對應的Endpoints
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { ... // 能夠啓動多個 goroutine ,來處理endpoint變化 for i := 0; i < workers; i++ { go wait.Until(e.worker, e.workerLoopPeriod, stopCh) } go func() { defer utilruntime.HandleCrash() // 處理無主(沒有對應service)的endpoint,相似垃圾回收, // 固然這個方法只是遍歷service的key並加入隊列,實際處理由syncService完成 e.checkLeftoverEndpoints() }() } // 具體處理方法 func (e *EndpointController) syncService(key string) error { // 得到service service, err := e.serviceLister.Services(namespace).Get(name) if err != nil { // 不是沒找到,返回錯誤 if !errors.IsNotFound(err) { return err } // 沒有這個service,刪掉相應的endpoint。這二者由key關聯 err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) return nil } if service.Spec.Selector == nil { // 沒有selector的service,其endpoint只能是手動建立的,與本Controller無關,直接返回 // https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors return nil } // 得到相應pod pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated()) // 遍歷這些pod,把合適的pod的ip加入該service的endpoints集合 for _, pod := range pods { // 返回這個pod的端點地址,須要處理v4 v6兩類狀況 ep, err := podToEndpointAddressForService(service, pod) // headless service 能夠不指定端口. if len(service.Spec.Ports) == 0 { if service.Spec.ClusterIP == api.ClusterIPNone { subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints) } } else { // 針對每一個port映射,生成端點地址 for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] portNum, err := podutil.FindPort(pod, servicePort) epp := endpointPortFromServicePort(servicePort, portNum) subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints) } } } // 檢測service是否真的有變化 // 首先得到最新的端點情況 currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name) if err != nil { // 不存在該endpoint就建立 if errors.IsNotFound(err) { currentEndpoints = &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: service.Name, Labels: service.Labels, }, } } else { return err } } createEndpoints := len(currentEndpoints.ResourceVersion) == 0 // 若是不是新建立的endpoint,則比較是否相同,相同則說明不須要修改 if !createEndpoints && apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) && apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) { klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) return nil } newEndpoints := currentEndpoints.DeepCopy() newEndpoints.Subsets = subsets newEndpoints.Labels = service.Labels // 調用go client ,讓APIservier 建立/更新 endpoint對象 if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{}) } else { // Pre-existing _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{}) } return nil }
ServiceController監聽Service和Node的變化事件,原理與EndpointController一致,都是利用Informer,Informer的事件回調方法主要也是把須要處理的service加入隊列,以及處理node。
serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { svc, ok := cur.(*v1.Service) // 將須要分配負載均衡器或者清理的service加入待處理隊列 if ok && (wantsLoadBalancer(svc) || needsCleanup(svc)) { s.enqueueService(cur) } }, UpdateFunc: func(old, cur interface{}) { oldSvc, ok1 := old.(*v1.Service) curSvc, ok2 := cur.(*v1.Service) if ok1 && ok2 && (s.needsUpdate(oldSvc, curSvc) || needsCleanup(curSvc)) { s.enqueueService(cur) } }, }, serviceSyncPeriod, ) nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { s.nodeSyncLoop() }, UpdateFunc: func(old, cur interface{}) { s.nodeSyncLoop() }, DeleteFunc: func(old interface{}) { s.nodeSyncLoop() },
其中處理node的方法nodeSyncLoop,主要工做是對比最新節點和原有節點,如有變化則更新對應的service。
func (s *Controller) nodeSyncLoop() { // 最新且ready的全部節點 // 要全部節點是由於loadbalancer可能須要掛載到全部節點 // 取決於具體策略externalTrafficPolicy,不一樣雲廠商實現大同小異 // https://aws.amazon.com/cn/blogs/opensource/network-load-balancer-support-in-kubernetes-1-9/ // https://help.aliyun.com/document_detail/86531.html#title-cn3-euk-ij6 newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate()) // 節點未變化,原本不須要變更,可是能夠在這裏處理上次處理失敗的service if nodeSlicesEqualForLB(newHosts, s.knownHosts) { s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) return } // 得到全部service s.servicesToUpdate = s.cache.allServices() // 處理service,保留本次處理失敗的service留給下次處理 s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) // 更新本地service s.knownHosts = newHosts } // 處理service,保留本次處理失敗的service func (s *Controller) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) { for _, service := range services { func() { if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil { servicesToRetry = append(servicesToRetry, service) } }() } return servicesToRetry } func (s *Controller) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { // 只處理 loadbalance 類型的service if !wantsLoadBalancer(service) { return nil } // 由雲廠商實現loadBalancer的分配,好比 aws aliyun等 err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts) if err == nil { return nil } if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil { runtime.HandleError(fmt.Errorf("failed to check if load balancer exists for service %s/%s: %v", service.Namespace, service.Name, err)) } else if !exists { return nil } s.eventRecorder.Eventf(service, v1.EventTypeWarning, "UpdateLoadBalancerFailed", "Error updating load balancer with new hosts %v: %v", nodeNames(hosts), err) return err }
另外一方面,當 ServiceController Run 起來事後,實際上是循環處理隊列中的service和node,主要完成的工做是LoadBalancer類型的service與後端node的映射關係的維護。
func (s *Controller) Run(stopCh <-chan struct{}, workers int) { // 啓動多個協程來處理service for i := 0; i < workers; i++ { go wait.Until(s.worker, time.Second, stopCh) } // 處理節點,也就是說不只有事件觸發,也有主動循環來處理節點變化 go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh) } // 具體處理service的方法 func (s *Controller) syncService(key string) error { // 由key得到命名空間和service name namespace, name, err := cache.SplitMetaNamespaceKey(key) // 最新的service service, err := s.serviceLister.Services(namespace).Get(name) switch { case errors.IsNotFound(err): // 沒找到,說明該刪除這個service了 err = s.processServiceDeletion(key) case err != nil: runtime.HandleError(fmt.Errorf("Unable to retrieve service %v from store: %v", key, err)) default: // 建立或者更新service err = s.processServiceCreateOrUpdate(service, key) } return err }
CoreDNS的 kubernetes 插件配置好並啓動後,以 service 的形式(名字就叫 kube-dns 兼容以前的dns插件名稱)運行在k8s集羣中,DNSController 監聽 namespace、service和pod(可選)、endpoint(可選)的變化,並經過 Informer 機制緩存在本地。
func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) { // 獲取配置 config, err := k.getClientConfig() // 依據配置得到client kubeClient, err := kubernetes.NewForConfig(config) if err != nil { return fmt.Errorf("failed to create kubernetes notification controller: %q", err) } k.opts.initPodCache = k.podMode == podModeVerified // controller中監聽各資源 k.APIConn = newdnsController(ctx, kubeClient, k.opts) return err } // 在 controller 中監聽各個資源的變化,並存儲在本地 dns.svcLister, dns.svcController = object.NewIndexerInformer( &cache.ListWatch{ ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &api.Service{}, ) dns.nsLister, dns.nsController = cache.NewInformer( &cache.ListWatch{ ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector), WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector), }, &api.Namespace{}, ) pod和endpoint可選 ...
CoreDNS處理域名查詢經過每個插件的ServeDNS方法完成,在 kubernetes插件 中以下:
// ServeDNS implements the plugin.Handler interface. func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { state := request.Request{W: w, Req: r} // 處理多種請求類型 switch state.QType() { case dns.TypeAXFR, dns.TypeIXFR: k.Transfer(ctx, state) case dns.TypeA: records, err = plugin.A(ctx, &k, zone, state, nil, plugin.Options{}) case dns.TypeAAAA: records, err = plugin.AAAA(ctx, &k, zone, state, nil, plugin.Options{}) case dns.TypeTXT: records, err = plugin.TXT(ctx, &k, zone, state, nil, plugin.Options{}) case dns.TypeCNAME: records, err = plugin.CNAME(ctx, &k, zone, state, plugin.Options{}) case dns.TypePTR: records, err = plugin.PTR(ctx, &k, zone, state, plugin.Options{}) .... default: // Do a fake A lookup, so we can distinguish between NODATA and NXDOMAIN fake := state.NewWithQuestion(state.QName(), dns.TypeA) fake.Zone = state.Zone _, err = plugin.A(ctx, &k, zone, fake, nil, plugin.Options{}) } return dns.RcodeSuccess, nil }
該方法處理具體請求時,是經過informer查找存在本地的service或者pod的endpoints信息完成域名和ip的映射。
func (k *Kubernetes) Services(ctx context.Context, state request.Request, exact bool, opt plugin.Options) (svcs []msg.Service, err error) { // 特殊dns請求類型直接能夠返回 switch state.QType() { case dns.TypeTXT: return []msg.Service{svc}, nil case dns.TypeNS: return svcs, nil } if isDefaultNS(state.Name(), state.Zone) { return svcs, nil } // 其他類型須要查詢k8s返回記錄 s, e := k.Records(ctx, state, false) internal := []msg.Service{} for _, svc := range s { if t, _ := svc.HostType(); t != dns.TypeCNAME { internal = append(internal, svc) } } return internal, e } // records 方法解析請求,作些驗證,而後查找k8s中的相應記錄 func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) { r, e := parseRequest(state.Name(), state.Zone) ... if r.podOrSvc == Pod { // 處理pod請求 pods, err := k.findPods(r, state.Zone) return pods, err } // 處理service請求 services, err := k.findServices(r, state.Zone) return services, err } // 這個方法處理pod的dns請求 func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) { // 特性關閉,返回空 if k.podMode == podModeDisabled { return nil, errNoItems } podname := r.service // pod的name直接能夠解析ip,區分ipv4 ipv6 if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") { ip = strings.Replace(podname, "-", ".", -1) } else { ip = strings.Replace(podname, "-", ":", -1) } if k.podMode == podModeInsecure { // 不需檢查模式,直接返回記錄 return []msg.Service{{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl}}, err } // 須要檢查的模式,只返回存在的pod的記錄 for _, p := range k.APIConn.PodIndex(ip) { // check for matching ip and namespace if ip == p.PodIP && match(namespace, p.Namespace) { s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl} pods = append(pods, s) } } return pods, err } // 這個方法處理各個類型的service請求,從本地cache中讀取相應記錄 func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) { // 肯定是不是模糊查詢 if wildcard(r.service) || wildcard(r.namespace) { // 返回全部狀態正確的service serviceList = k.APIConn.ServiceList() endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EndpointsList() } } else { // 只返回 name + "." + namespace 類型的service idx := object.ServiceKey(r.service, r.namespace) serviceList = k.APIConn.SvcIndex(idx) endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EpIndex(idx) } } zonePath := msg.Path(zone, coredns) for _, svc := range serviceList { // service name和 namespace都要匹配才行 if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) { continue } // 若是是模糊查詢,須要namespace被暴露才行 if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) { continue } // 處理endpoint或者headless service,這兩類請求都需遍歷endpoint if svc.ClusterIP == api.ClusterIPNone || r.endpoint != "" { for _, ep := range endpointsList { if ep.Name != svc.Name || ep.Namespace != svc.Namespace { continue } // 遍歷endpoint的每個ip和port for _, eps := range ep.Subsets { for _, addr := range eps.Addresses { for _, p := range eps.Ports { s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl} s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr, k.endpointNameMode)}, "/") err = nil services = append(services, s) } } } } continue } // 處理 External service if svc.Type == api.ServiceTypeExternalName { // 如 cluster.local/svc/namespace/service s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.ExternalName, TTL: k.ttl} if t, _ := s.HostType(); t == dns.TypeCNAME { // 只有 cname 記錄 s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/") services = append(services, s) err = nil } continue } // 處理 ClusterIP service for _, p := range svc.Ports { if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) { continue } s := msg.Service{Host: svc.ClusterIP, Port: int(p.Port), TTL: k.ttl} s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/") services = append(services, s) } } return services, err }
KubeProxy的主要工做是監聽Service和Endpoints等的變化,並把路由規則(如何根據service的域名或者ip得到後端真實pod ip)刷新到節點上。
這樣,每一個pod在訪問service時,就會向CoreDNS要到對應的service ip 或者直接是每一個backend pod的ip(如 headless service),對於前者由本地路由規則將service ip的流量引導至真正的pod ip。咱們的這份yaml也終於成了一個能夠對外提供服務的service。
KubeProxy這部分因爲通過了多個版本的迭代,目前包括三種類型,限於篇幅本文不展開,且待下回分解。