本節全部的代碼基於1.13.4版本。node
Controller-manager的啓動主要包括各類controller
的配置與初始化。初始化方法在NewControllerManagerCommand
下,該方法經過建立一個cobra.Command
對象,完成初始化的配置工做。
進入NewControllerManagerCommand
方法,該方法主要對controller-manager管理的全部controller進行初始化參數的配置,代碼以下bootstrap
// NewKubeControllerManagerOptions creates a new KubeControllerManagerOptions with a default config.
func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
if err != nil {
return nil, err
}
// 包含各類Controller的初始化配置
s := KubeControllerManagerOptions{
Generic: cmoptions.NewGenericControllerManagerConfigurationOptions(componentConfig.Generic),
KubeCloudShared: cmoptions.NewKubeCloudSharedOptions(componentConfig.KubeCloudShared),
AttachDetachController: &AttachDetachControllerOptions{
ReconcilerSyncLoopPeriod: componentConfig.AttachDetachController.ReconcilerSyncLoopPeriod,
},
CSRSigningController: &CSRSigningControllerOptions{
ClusterSigningCertFile: componentConfig.CSRSigningController.ClusterSigningCertFile,
ClusterSigningKeyFile: componentConfig.CSRSigningController.ClusterSigningKeyFile,
ClusterSigningDuration: componentConfig.CSRSigningController.ClusterSigningDuration,
},
DaemonSetController: &DaemonSetControllerOptions{
ConcurrentDaemonSetSyncs: componentConfig.DaemonSetController.ConcurrentDaemonSetSyncs,
},
DeploymentController: &DeploymentControllerOptions{
ConcurrentDeploymentSyncs: componentConfig.DeploymentController.ConcurrentDeploymentSyncs,
DeploymentControllerSyncPeriod: componentConfig.DeploymentController.DeploymentControllerSyncPeriod,
},
DeprecatedFlags: &DeprecatedControllerOptions{
RegisterRetryCount: componentConfig.DeprecatedController.RegisterRetryCount,
},
EndpointController: &EndpointControllerOptions{
ConcurrentEndpointSyncs: componentConfig.EndpointController.ConcurrentEndpointSyncs,
},
GarbageCollectorController: &GarbageCollectorControllerOptions{
ConcurrentGCSyncs: componentConfig.GarbageCollectorController.ConcurrentGCSyncs,
EnableGarbageCollector: componentConfig.GarbageCollectorController.EnableGarbageCollector,
},
HPAController: &HPAControllerOptions{
HorizontalPodAutoscalerSyncPeriod: componentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod,
HorizontalPodAutoscalerUpscaleForbiddenWindow: componentConfig.HPAController.HorizontalPodAutoscalerUpscaleForbiddenWindow,
HorizontalPodAutoscalerDownscaleForbiddenWindow: componentConfig.HPAController.HorizontalPodAutoscalerDownscaleForbiddenWindow,
HorizontalPodAutoscalerDownscaleStabilizationWindow: componentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow,
HorizontalPodAutoscalerCPUInitializationPeriod: componentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod,
HorizontalPodAutoscalerInitialReadinessDelay: componentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay,
HorizontalPodAutoscalerTolerance: componentConfig.HPAController.HorizontalPodAutoscalerTolerance,
HorizontalPodAutoscalerUseRESTClients: componentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients,
},
JobController: &JobControllerOptions{
ConcurrentJobSyncs: componentConfig.JobController.ConcurrentJobSyncs,
},
NamespaceController: &NamespaceControllerOptions{
NamespaceSyncPeriod: componentConfig.NamespaceController.NamespaceSyncPeriod,
ConcurrentNamespaceSyncs: componentConfig.NamespaceController.ConcurrentNamespaceSyncs,
},
NodeIPAMController: &NodeIPAMControllerOptions{
NodeCIDRMaskSize: componentConfig.NodeIPAMController.NodeCIDRMaskSize,
},
NodeLifecycleController: &NodeLifecycleControllerOptions{
EnableTaintManager: componentConfig.NodeLifecycleController.EnableTaintManager,
NodeMonitorGracePeriod: componentConfig.NodeLifecycleController.NodeMonitorGracePeriod,
NodeStartupGracePeriod: componentConfig.NodeLifecycleController.NodeStartupGracePeriod,
PodEvictionTimeout: componentConfig.NodeLifecycleController.PodEvictionTimeout,
},
PersistentVolumeBinderController: &PersistentVolumeBinderControllerOptions{
PVClaimBinderSyncPeriod: componentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod,
VolumeConfiguration: componentConfig.PersistentVolumeBinderController.VolumeConfiguration,
},
PodGCController: &PodGCControllerOptions{
TerminatedPodGCThreshold: componentConfig.PodGCController.TerminatedPodGCThreshold,
},
ReplicaSetController: &ReplicaSetControllerOptions{
ConcurrentRSSyncs: componentConfig.ReplicaSetController.ConcurrentRSSyncs,
},
ReplicationController: &ReplicationControllerOptions{
ConcurrentRCSyncs: componentConfig.ReplicationController.ConcurrentRCSyncs,
},
ResourceQuotaController: &ResourceQuotaControllerOptions{
ResourceQuotaSyncPeriod: componentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod,
ConcurrentResourceQuotaSyncs: componentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs,
},
SAController: &SAControllerOptions{
ConcurrentSATokenSyncs: componentConfig.SAController.ConcurrentSATokenSyncs,
},
ServiceController: &cmoptions.ServiceControllerOptions{
ConcurrentServiceSyncs: componentConfig.ServiceController.ConcurrentServiceSyncs,
},
TTLAfterFinishedController: &TTLAfterFinishedControllerOptions{
ConcurrentTTLSyncs: componentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs,
},
SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(),
InsecureServing: (&apiserveroptions.DeprecatedInsecureServingOptions{
BindAddress: net.ParseIP(componentConfig.Generic.Address),
BindPort: int(componentConfig.Generic.Port),
BindNetwork: "tcp",
}).WithLoopback(),
Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(),
}
s.Authentication.RemoteKubeConfigFileOptional = true
s.Authorization.RemoteKubeConfigFileOptional = true
s.Authorization.AlwaysAllowPaths = []string{"/healthz"}
// Set the PairName but leave certificate directory blank to generate in-memory by default
s.SecureServing.ServerCert.CertDirectory = ""
s.SecureServing.ServerCert.PairName = "kube-controller-manager"
s.SecureServing.BindPort = ports.KubeControllerManagerPort
gcIgnoredResources := make([]kubectrlmgrconfig.GroupResource, 0, len(garbagecollector.DefaultIgnoredResources()))
for r := range garbagecollector.DefaultIgnoredResources() {
gcIgnoredResources = append(gcIgnoredResources, kubectrlmgrconfig.GroupResource{Group: r.Group, Resource: r.Resource})
}
s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources
return &s, nil
}
複製代碼
能夠看到,咱們熟悉的相應的controller都在配置內。
返回到NewControllerManagerCommand
方法,在執行最終的Run
方法以前,須要對初始化參數進行校驗,而且須要對每一個controller配置啓動方法,相應的代碼以下 api
KnownControllers
方法,該方法就是用來配置各類controller的啓動方法,以map類型保存,最終的代碼以下
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
// TODO: volume controller into the IncludeCloudLoops only set.
// TODO: Separate cluster in cloud check from node lifecycle controller.
}
controllers["nodelifecycle"] = startNodeLifecycleController
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
controllers["root-ca-cert-publisher"] = startRootCACertPublisher
return controllers
}
複製代碼
其中各類以start
前綴開頭的均爲相對應的controller的啓動方法,後續分析。
進入最核心的Run
方法,主要過程包括一些server的配置、各類健康檢查以及是否須要配置Leader的選舉(controller-manager的選舉同scheduler一致),最終依次啓動全部的start
開頭的controller,完成啓動過程。
啓動過程比較簡潔,主要就是參數的配置、參數的校驗以及最終的啓動工做,和以前介紹的組件原理基本差很少,對controller-manager的分析主要仍是要對應到每一個具體的controller上。這裏以最經常使用的Deployment爲例。app
熟悉CRD的應該都熟悉,kubernetes controller使用的是informer和workqueue的機制,即經過informer監控通知資源的變化,經過workqueue入隊處理資源。Deployment也不例外。
以前說過,controller全部的入口都是以start
開頭,則Deployment Controller的入口方法爲startDeploymentController
。方法比較簡單,以下tcp
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return nil, false, nil
}
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return nil, true, nil
}
複製代碼
主要分紅三個步驟:
一、判斷資源版本是否可用,即deployment應該爲apps/v1;
二、經過NewDeploymentController
建立一個DeploymentController對象;
三、啓動,循環檢測資源的變化,完成最終處理任務。
進入NewDeploymentController
方法,對informer的操做主要有Deployment、ReplicaSet以及Pod,與咱們對Deployment瞭解的資源基本符合,經過各類處理方法處理資源的CRUD等操做。
。 oop
syncDeployment
處理Deployment的檢測,保證Deployment資源一直可以被處理,
enqueue
方法保證資源的及時入隊。
Run
方法進入到最終的
dc.worker
方法,最終調用的爲
dc.syncHandler
方法,也就是配置中的
syncDeployment
方法,經過不停循環檢測執行
syncDeployment
方法,保證資源的及時處理。方法以下:
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
startTime := time.Now()
klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
defer func() {
klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
}()
// 經過key獲取資源的namespace和name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 經過namespace和name獲取deployment信息
deployment, err := dc.dLister.Deployments(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(2).Infof("Deployment %v has been deleted", key)
return nil
}
if err != nil {
return err
}
// Deep-copy otherwise we are mutating our cache.
// TODO: Deep-copy only when needed.
d := deployment.DeepCopy()
everything := metav1.LabelSelector{}
if reflect.DeepEqual(d.Spec.Selector, &everything) {
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
if d.Status.ObservedGeneration < d.Generation {
d.Status.ObservedGeneration = d.Generation
dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
}
return nil
}
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
// through adoption/orphaning.
// 經過deployment獲取其對應的ReplicaSet
rsList, err := dc.getReplicaSetsForDeployment(d)
if err != nil {
return err
}
// List all Pods owned by this Deployment, grouped by their ReplicaSet.
// Current uses of the podMap are:
//
// * check if a Pod is labeled correctly with the pod-template-hash label.
// * check that no old Pods are running in the middle of Recreate Deployments.
// 經過Deployment和ReplicaSet獲取相應的pod信息
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return err
}
// 判斷須要進行什麼操做
if d.DeletionTimestamp != nil {
return dc.syncStatusOnly(d, rsList)
}
// Update deployment conditions with an Unknown condition when pausing/resuming
// a deployment. In this way, we can be sure that we won't timeout when a user
// resumes a Deployment with a set progressDeadlineSeconds.
if err = dc.checkPausedConditions(d); err != nil {
return err
}
if d.Spec.Paused {
return dc.sync(d, rsList)
}
// rollback is not re-entrant in case the underlying replica sets are updated with a new
// revision so we should ensure that we won't proceed to update replica sets until we
// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
if getRollbackTo(d) != nil {
return dc.rollback(d, rsList)
}
scalingEvent, err := dc.isScalingEvent(d, rsList)
if err != nil {
return err
}
if scalingEvent {
return dc.sync(d, rsList)
}
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
return dc.rolloutRecreate(d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
return dc.rolloutRolling(d, rsList)
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
複製代碼
步驟以下:
一、經過參數key獲取資源的namespace和name;
二、經過namespace和name獲取deployment信息;
三、經過Deployment獲取其對應的ReplicaSet信息;
四、經過Deployment和ReplicaSet獲取相對應的pod信息;
五、根據配置的參數,判斷須要進行什麼操做,包括sync、rollback、rolloutRecreate以及rolloutRolling操做。
至此,DeploymentController的任務基本完成。咱們自定義實現CRD與Controller的步驟基本與此一致,都是經過監聽須要的資源,經過對資源列表和資源操做進行比對,判斷對資源最終的操做,完成controller任務。ui