Kubernetes kube-controller-manager: A Deep Dive into the Control-Center Mechanism Source Code — Kubernetes in Production

This blog series focuses on demystifying the core technologies of big data and container clouds, and offers full-stack big data + cloud-native platform consulting. Please keep following the series; feel free to reach out for any academic exchange. For more content, follow the《數據雲技術社區》WeChat public account.

1 The Management Control Center Mechanism (Informer Interplay)

1.1 Package Structure

1.2 How Controllers Are Started

  • As the cluster's management control center, kube-controller-manager hosts all of the built-in controllers and plays a key role in keeping the cluster stable and self-healing, and in providing high availability and replica control.
  • Controller.Run implements the callback handling for resource-change events.
  • The controller-manager's Run function calls InformerFactory.Start, which initializes every informer type and launches one informer.Run goroutine per type; an informer's job is to watch resources and push change events into a work queue (see the sketch after this list).
  • Three core functions: CreateControllerContext, StartControllers, and ctx.InformerFactory.Start.
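To make that event flow concrete, here is a minimal, self-contained sketch, not taken from the kube-controller-manager code itself (the kubeconfig location and queue name are assumptions), of the informer-to-workqueue pattern that every controller below follows: the informer watches the apiserver and its event handler pushes keys into a rate-limited queue, which a worker drains.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Assumes a kubeconfig in the default location; the real controller-manager
	// builds its clients from --kubeconfig or the in-cluster configuration.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	stop := make(chan struct{})
	defer close(stop)

	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)

	// Register a Pod informer and push change events into the queue,
	// the same pattern the built-in controllers use.
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
				queue.Add(key)
			}
		},
	})

	factory.Start(stop) // the informer.Run goroutines start watching here
	cache.WaitForCacheSync(stop, podInformer.HasSynced)

	// A single worker draining the queue; real controllers run several.
	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		fmt.Println("sync", key)
		queue.Done(key)
	}
}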
k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig) error {
  
        // 1: get client handles for kube-apiserver resources and create the controller context
        ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)

        // 2: initialize all controllers (apiserver clients, informer callbacks, and so on)
        if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
            glog.Fatalf("error starting controllers: %v", err)
        }
        
        // 3: start the informers, completing controller startup and the resource-watch machinery
        ctx.InformerFactory.Start(ctx.Stop)
        close(ctx.InformersStarted)
}

2 The Control-Center Mechanism

2.1 Entry Point: NewControllerManagerCommand

  • kube-controller-manager uses the Cobra command-line framework: it builds a ControllerManagerCommand and then calls command.Execute().
k8s.io/kubernetes/cmd/kube-controller-manager/controller-manager.go
func main() {
    rand.Seed(time.Now().UTC().UnixNano())

    command := app.NewControllerManagerCommand()

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

2.2 NewKubeControllerManagerOptions

  • Building the ControllerManagerCommand follows the usual flow: construct it with NewControllerManagerCommand, add the flags, and execute the Run function.
  • NewKubeControllerManagerOptions initializes the controller-manager's options, most of which are per-controller option structs (a sketch of how one of them surfaces as a flag follows below).
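As a concrete illustration, below is a minimal sketch (simplified, not the exact upstream code) of how one of these option structs typically exposes its field as a command-line flag through pflag, which is how s.AddFlags(cmd.Flags(), ...) ends up wiring every controller option into the cobra command:
import "github.com/spf13/pflag"

// DeploymentControllerOptions holds options for the Deployment controller.
type DeploymentControllerOptions struct {
	ConcurrentDeploymentSyncs int32
}

// AddFlags registers the option as a pflag flag on the command's flag set.
func (o *DeploymentControllerOptions) AddFlags(fs *pflag.FlagSet) {
	if o == nil {
		return
	}
	fs.Int32Var(&o.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", o.ConcurrentDeploymentSyncs,
		"The number of deployment objects that are allowed to sync concurrently.")
}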
func NewControllerManagerCommand() *cobra.Command {

    s, err := options.NewKubeControllerManagerOptions()
    if err != nil {
        glog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In Kubernetes, a controller is a control loop that watches the shared
state of the cluster through the apiserver and makes changes attempting to move the
current state towards the desired state. Examples of controllers that ship with
Kubernetes today are the replication controller, endpoints controller, namespace
controller, and serviceaccounts controller.`,
        Run: func(cmd *cobra.Command, args []string) {
            verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cmd.Flags())

            c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
            if err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }

            if err := Run(c.Complete()); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    s.AddFlags(cmd.Flags(), KnownControllers(), ControllersDisabledByDefault.List())

    return cmd
}

//Initializes the controller-manager options; these are mainly the per-controller option structs, e.g. DeploymentControllerOptions.
//NewKubeControllerManagerOptions creates a new KubeControllerManagerOptions with a default config.
func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
    componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
    if err != nil {
        return nil, err
    }

    s := KubeControllerManagerOptions{
        Generic:         cmoptions.NewGenericControllerManagerConfigurationOptions(componentConfig.Generic),
        KubeCloudShared: cmoptions.NewKubeCloudSharedOptions(componentConfig.KubeCloudShared),
        AttachDetachController: &AttachDetachControllerOptions{
            ReconcilerSyncLoopPeriod: componentConfig.AttachDetachController.ReconcilerSyncLoopPeriod,
        },
        CSRSigningController: &CSRSigningControllerOptions{
            ClusterSigningCertFile: componentConfig.CSRSigningController.ClusterSigningCertFile,
            ClusterSigningKeyFile:  componentConfig.CSRSigningController.ClusterSigningKeyFile,
            ClusterSigningDuration: componentConfig.CSRSigningController.ClusterSigningDuration,
        },
        DaemonSetController: &DaemonSetControllerOptions{
            ConcurrentDaemonSetSyncs: componentConfig.DaemonSetController.ConcurrentDaemonSetSyncs,
        },
        DeploymentController: &DeploymentControllerOptions{
            ConcurrentDeploymentSyncs:      componentConfig.DeploymentController.ConcurrentDeploymentSyncs,
            DeploymentControllerSyncPeriod: componentConfig.DeploymentController.DeploymentControllerSyncPeriod,
        },
        DeprecatedFlags: &DeprecatedControllerOptions{
            RegisterRetryCount: componentConfig.DeprecatedController.RegisterRetryCount,
        },
        EndpointController: &EndpointControllerOptions{
            ConcurrentEndpointSyncs: componentConfig.EndpointController.ConcurrentEndpointSyncs,
        },
        GarbageCollectorController: &GarbageCollectorControllerOptions{
            ConcurrentGCSyncs:      componentConfig.GarbageCollectorController.ConcurrentGCSyncs,
            EnableGarbageCollector: componentConfig.GarbageCollectorController.EnableGarbageCollector,
        },
        HPAController: &HPAControllerOptions{
            HorizontalPodAutoscalerSyncPeriod:                   componentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod,
            HorizontalPodAutoscalerUpscaleForbiddenWindow:       componentConfig.HPAController.HorizontalPodAutoscalerUpscaleForbiddenWindow,
            HorizontalPodAutoscalerDownscaleForbiddenWindow:     componentConfig.HPAController.HorizontalPodAutoscalerDownscaleForbiddenWindow,
            HorizontalPodAutoscalerDownscaleStabilizationWindow: componentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow,
            HorizontalPodAutoscalerCPUInitializationPeriod:      componentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod,
            HorizontalPodAutoscalerInitialReadinessDelay:        componentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay,
            HorizontalPodAutoscalerTolerance:                    componentConfig.HPAController.HorizontalPodAutoscalerTolerance,
            HorizontalPodAutoscalerUseRESTClients:               componentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients,
        },
        JobController: &JobControllerOptions{
            ConcurrentJobSyncs: componentConfig.JobController.ConcurrentJobSyncs,
        },
        NamespaceController: &NamespaceControllerOptions{
            NamespaceSyncPeriod:      componentConfig.NamespaceController.NamespaceSyncPeriod,
            ConcurrentNamespaceSyncs: componentConfig.NamespaceController.ConcurrentNamespaceSyncs,
        },
        NodeIPAMController: &NodeIPAMControllerOptions{
            NodeCIDRMaskSize: componentConfig.NodeIPAMController.NodeCIDRMaskSize,
        },
        NodeLifecycleController: &NodeLifecycleControllerOptions{
            EnableTaintManager:     componentConfig.NodeLifecycleController.EnableTaintManager,
            NodeMonitorGracePeriod: componentConfig.NodeLifecycleController.NodeMonitorGracePeriod,
            NodeStartupGracePeriod: componentConfig.NodeLifecycleController.NodeStartupGracePeriod,
            PodEvictionTimeout:     componentConfig.NodeLifecycleController.PodEvictionTimeout,
        },
        PersistentVolumeBinderController: &PersistentVolumeBinderControllerOptions{
            PVClaimBinderSyncPeriod: componentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod,
            VolumeConfiguration:     componentConfig.PersistentVolumeBinderController.VolumeConfiguration,
        },
        PodGCController: &PodGCControllerOptions{
            TerminatedPodGCThreshold: componentConfig.PodGCController.TerminatedPodGCThreshold,
        },
        ReplicaSetController: &ReplicaSetControllerOptions{
            ConcurrentRSSyncs: componentConfig.ReplicaSetController.ConcurrentRSSyncs,
        },
        ReplicationController: &ReplicationControllerOptions{
            ConcurrentRCSyncs: componentConfig.ReplicationController.ConcurrentRCSyncs,
        },
        ResourceQuotaController: &ResourceQuotaControllerOptions{
            ResourceQuotaSyncPeriod:      componentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod,
            ConcurrentResourceQuotaSyncs: componentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs,
        },
        SAController: &SAControllerOptions{
            ConcurrentSATokenSyncs: componentConfig.SAController.ConcurrentSATokenSyncs,
        },
        ServiceController: &cmoptions.ServiceControllerOptions{
            ConcurrentServiceSyncs: componentConfig.ServiceController.ConcurrentServiceSyncs,
        },
        TTLAfterFinishedController: &TTLAfterFinishedControllerOptions{
            ConcurrentTTLSyncs: componentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs,
        },
        SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
        InsecureServing: (&apiserveroptions.DeprecatedInsecureServingOptions{
            BindAddress: net.ParseIP(componentConfig.Generic.Address),
            BindPort:    int(componentConfig.Generic.Port),
            BindNetwork: "tcp",
        }).WithLoopback(),
        Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
        Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
    }

    s.Authentication.RemoteKubeConfigFileOptional = true
    s.Authorization.RemoteKubeConfigFileOptional = true
    s.Authorization.AlwaysAllowPaths = []string{"/healthz"}

    s.SecureServing.ServerCert.CertDirectory = "/var/run/kubernetes"
    s.SecureServing.ServerCert.PairName = "kube-controller-manager"
    s.SecureServing.BindPort = ports.KubeControllerManagerPort

    gcIgnoredResources := make([]kubectrlmgrconfig.GroupResource, 0, len(garbagecollector.DefaultIgnoredResources()))
    for r := range garbagecollector.DefaultIgnoredResources() {
        gcIgnoredResources = append(gcIgnoredResources, kubectrlmgrconfig.GroupResource{Group: r.Group, Resource: r.Resource})
    }

    s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources

    return &s, nil
}


2.3 command.Execute()

  • Load all controllers and inject the corresponding options into each of them.
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())

// KnownControllers returns all known controllers' names.
func KnownControllers() []string {
	ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))

	// add "special" controllers that aren't initialized normally.  These controllers cannot be initialized
	// using a normal function.  The only known special case is the SA token controller which *must* be started
	// first to ensure that the SA tokens for future controllers will exist.  Think very carefully before adding
	// to this list.
	ret.Insert(
		saTokenControllerName,
	)

	return ret.List()
}

//k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
//NewControllerInitializers, reached from KnownControllers() in s.Config(KnownControllers(), ControllersDisabledByDefault.List()), registers the init function for every controller.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    controllers["nodeipam"] = startNodeIpamController
    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        // TODO: volume controller into the IncludeCloudLoops only set.
        // TODO: Separate cluster in cloud check from node lifecycle controller.
    }
    controllers["nodelifecycle"] = startNodeLifecycleController
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    return controllers
}
  • Start the controller-manager's HTTP servers and handlers, both secure and insecure (BuildHandlerChain), and build the run closure that is eventually executed.
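A minimal sketch of the serving idea, using plain net/http with a hypothetical logging filter standing in for the real handler chain (the actual code builds the chain with BuildHandlerChain and delegated authentication/authorization on the secure port):
package main

import (
	"fmt"
	"net/http"
)

// withLogging stands in for the filters that BuildHandlerChain adds
// (authentication, authorization, CORS and so on in the real code).
func withLogging(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Printf("%s %s\n", r.Method, r.URL.Path)
		next.ServeHTTP(w, r)
	})
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})

	// The insecure endpoint; the real controller-manager additionally starts a
	// TLS server on ports.KubeControllerManagerPort with the same handler chain.
	_ = http.ListenAndServe("127.0.0.1:10252", withLogging(mux))
}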
k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
// Run runs the KubeControllerManagerOptions.  This should never exit.
func Run(c *config.CompletedConfig) error {
  
        // 1: get client handles for kube-apiserver resources and create the controller context
        ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)

        // 2: initialize all controllers (apiserver clients, informer callbacks, and so on)
        if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
            glog.Fatalf("error starting controllers: %v", err)
        }
        
        // 3: start the informers, completing controller startup and the resource-watch machinery
        ctx.InformerFactory.Start(ctx.Stop)
        close(ctx.InformersStarted)
}

2.4 Creating the Controller Context

  • Get client handles for operating on kube-apiserver resources.
  • Confirm that kube-apiserver is healthy (waiting at most 10s) before taking the connection (a sketch of such a health wait follows below).
  • Create the controller context.
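A minimal sketch of such a health wait, assuming an already-built clientset (the real implementation is genericcontrollermanager.WaitForAPIServer, called in the code below):
import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForAPIServer polls /healthz until the apiserver answers "ok" or the
// timeout expires, mirroring the 10s wait described above.
func waitForAPIServer(client kubernetes.Interface, timeout time.Duration) error {
	var lastErr error
	err := wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		body, healthErr := client.Discovery().RESTClient().Get().AbsPath("/healthz").Do().Raw()
		if healthErr != nil {
			lastErr = healthErr
			return false, nil // not fatal, keep polling until the timeout
		}
		if string(body) != "ok" {
			lastErr = fmt.Errorf("apiserver /healthz returned %q", string(body))
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		return fmt.Errorf("failed to wait for apiserver being healthy: %v (last error: %v)", err, lastErr)
	}
	return nil
}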
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
    // get the client handle for apiserver resources
    versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
    sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

    // If apiserver is not running we should wait for some time and fail only then. This is particularly
    // important when we start apiserver and controller manager at the same time.
    // note: check within 10s whether the apiserver is reachable
    if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
        return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
    }

    // Use a discovery client capable of being refreshed.
    discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
    //note:  DiscoveryClient = discoveryClient.Discovery()
    cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
    restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
    go wait.Until(func() {
        restMapper.Reset()
    }, 30*time.Second, stop)

    availableResources, err := GetAvailableResources(rootClientBuilder)
    if err != nil {
        return ControllerContext{}, err
    }

    cloud, loopMode, err := createCloudProvider(s.ComponentConfig.CloudProvider.Name, s.ComponentConfig.ExternalCloudVolumePlugin,
        s.ComponentConfig.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
    if err != nil {
        return ControllerContext{}, err
    }

    ctx := ControllerContext{
        ClientBuilder:      clientBuilder,
        InformerFactory:    sharedInformers,
        ComponentConfig:    s.ComponentConfig,
        RESTMapper:         restMapper,
        AvailableResources: availableResources,
        Cloud:              cloud,
        LoopMode:           loopMode,
        Stop:               stop,
        InformersStarted:   make(chan struct{}),
        ResyncPeriod:       ResyncPeriod(s),
    }
    return ctx, nil
}

2.5 Initializing All Controllers (Registering Informers with the Factory and Starting Every Controller)

  • Build the controller-manager options, convert them into a Config object, and execute the Run function.
  • From the Config object create the ControllerContext, which contains the InformerFactory.
  • Run the individual controllers against the ControllerContext; the controllers are defined in NewControllerInitializers.
  • Execute InformerFactory.Start (by now the ControllerContext has all controllers wired up); in practice this is controllerContext.InformerFactory.Start(controllerContext.Stop).
  • Each controller constructs its own struct and executes its corresponding Run function.
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    ···
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            glog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(ctx.ComponentConfig.GenericComponent.ControllerStartInterval.Duration, ControllerStartJitter))

        glog.V(1).Infof("Starting %q", controllerName)
        
        // note: initFn is the init function registered for this controller
        started, err := initFn(ctx)
        ···
    }

    return nil
}
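StartControllers skips any controller that has been turned off through the --controllers flag (e.g. --controllers=*,-tokencleaner). A minimal sketch of that matching logic, simplified and with a hypothetical helper name standing in for ctx.IsControllerEnabled (the real code also consults ControllersDisabledByDefault when "*" is present):
// isControllerEnabled reports whether a controller should run given the
// --controllers list: "foo" force-enables it, "-foo" disables it, and "*"
// enables everything that is on by default.
func isControllerEnabled(name string, controllersFlag []string) bool {
	hasStar := false
	for _, c := range controllersFlag {
		switch {
		case c == name:
			return true
		case c == "-"+name:
			return false
		case c == "*":
			hasStar = true
		}
	}
	return hasStar
}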
  • Each initFn is one function from the all-encompassing map[string]InitFunc, e.g. startEndpointController, which builds the controller together with its informers and then runs Controller.Run.
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc.  This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
		// TODO: volume controller into the IncludeCloudLoops only set.
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher

	return controllers
}
  • Informer registration is done inside startDeploymentController (a sketch follows this list). Taking Deployment as the example, registration means the Deployment informer is registered into the factory through InformerFor; the ListWatcher and listeners are set up at registration time, while Controller.Run implements the event callback handling.
  • The figure below shows the three informers being created.
  • Stepping into ctx.InformerFactory.Apps().V1().Deployments() reveals how the three informers are built and registered into internalinterfaces.SharedInformerFactory.
  • The three informers are then registered into the factory via Informer(), which also wires up the relevant EventHandlers and the ListWatcher.
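A sketch of startDeploymentController, abridged from the upstream source of this era (details such as exact error handling may differ): it pulls the three informers out of ctx.InformerFactory, hands them to NewDeploymentController shown below, and launches dc.Run in its own goroutine. Imports of fmt, k8s.io/apimachinery/pkg/runtime/schema, and pkg/controller/deployment are assumed.
func startDeploymentController(ctx ControllerContext) (bool, error) {
	// Skip if the apps/v1 Deployment resource is not served by this apiserver.
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return false, nil
	}
	// The three informers: Deployments, ReplicaSets and Pods, all taken from
	// the shared InformerFactory so that caches are shared across controllers.
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return true, nil
}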
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
  • Start the informers and begin listening for resource events (ListWatcher).
//SharedInformerFactory is the interface definition of an informer factory.
// SharedInformerFactory a small interface to allow for adding an informer without an import cycle
type SharedInformerFactory interface {
    Start(stopCh <-chan struct{})
    InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}
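For reference, the factory-side registration mentioned above (InformerFor) roughly looks like the following in client-go of this era (abridged): it caches one shared informer per object type and constructs it lazily with the supplied newFunc.
// InformerFor returns the SharedIndexInformer for obj, creating it via newFunc
// on first use; subsequent callers for the same type share the same informer.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	if informer, exists := f.informers[informerType]; exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer := newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}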

2.6 controllerContext, the Caller of InformerFactory.Start

  • InformerFactory is in fact a SharedInformerFactory; the concrete implementation is the informer machinery in client-go.
controllerContext.InformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)

2.7 Overall Architecture at a Glance

  • Source-level call relationships of the management control center.
The diagram above is adapted from https://www.jianshu.com/p/ac9179007fe2; thanks to the original author. This article mainly adds a new understanding and a fresh reading.
  • The relationship between informers and controllers at a glance.

3 Recap

  • The cmd-side call flow of kube-controller-manager is: main --> NewControllerManagerCommand --> Run(c.Complete(), wait.NeverStop) --> StartControllers --> initFn(ctx) --> startDeploymentController / startStatefulSetController --> InformerFactory.Start --> sts.NewStatefulSetController.Run / dc.NewDeploymentController.Run --> pkg/controller.
  • CreateControllerContext creates the context shared by every type of controller, while NewControllerInitializers registers all controller types, including DeploymentController and StatefulSetController.

4 Final Notes

  • The kube-controller-manager control center works hand in hand with InformerFactory.Start.
  • StartControllers starts every controller and its callback handling, and registers each controller's informers and their ListWatcher functions.
  • An informer's main job is to watch apiserver resource changes and enqueue them; informers are started via controllerContext.InformerFactory.Start(controllerContext.Stop), but they must first be registered through InformerFor.

This blog series focuses on demystifying the core technologies of big data and container clouds, and offers full-stack big data + cloud-native platform consulting. Please keep following the series; feel free to reach out for any academic exchange. For more content, follow the《數據雲技術社區》WeChat public account.
