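The Controller Manager plays the central management role in a Kubernetes cluster. It is responsible for creating and managing resources such as Deployment, StatefulSet, and ReplicaSet, and can fairly be called one of the core modules of Kubernetes. Below we walk through the Kubernetes Controller Manager code at a high level.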
func NewControllerManagerCommand() *cobra.Command {
	s, err := options.NewKubeControllerManagerOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use: "kube-controller-manager",
		Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In Kubernetes, a controller is a control loop that watches the shared
state of the cluster through the apiserver and makes changes attempting to move the
current state towards the desired state. Examples of controllers that ship with
Kubernetes today are the replication controller, endpoints controller, namespace
controller, and serviceaccounts controller.`,
		Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cmd.Flags())

			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			if err := Run(c.Complete(), wait.NeverStop); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}
	// ...
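The Controller Manager is also a command-line program, started with a series of flags. We will not go through the individual flags; if you are interested, look them up in the documentation or in the flags_opinion.go file. We start directly from the Run function.
Kube Controller Manager can be started as a single instance or as multiple instances. If several Controller Managers are started for HA, leader election is needed to guarantee that only one instance acts as the master at any given time. Let's look at the startup flow of the Run function. Unimportant details are skipped and only the key parts are shown.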
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	run := func(ctx context.Context) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}

		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}

		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		controllerContext.InformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)

		select {}
	}

	id, err := os.Hostname()
	if err != nil {
		return err
	}

	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id = id + "_" + string(uuid.NewUUID())
	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		"kube-system",
		"kube-controller-manager",
		c.LeaderElectionClient.CoreV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	})
	panic("unreachable")
}
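The basic flow is as follows: once an instance is elected, the function registered as OnStartedLeading, that is, the run function above, is invoked to do the work. An instance that is not elected blocks and waits.
The Client-go leader election utility works by using kubeClient to create a resource of type ConfigMap or Endpoint. Whichever instance succeeds in creating the resource obtains the lock, and all lock information is stored in that ConfigMap or Endpoint. These two resource types were chosen mainly because they are watched relatively rarely. For now kube Controller Manager still uses Endpoint, and it will gradually migrate to ConfigMap, because Endpoints are watched frequently by kube-proxy, Ingress Controllers, and similar components. Let's look at the content of the Endpoint in a cluster.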
[root@iZ8vb5qgxqbxakfo1cuvpaZ ~]# kubectl get ep -n kube-system kube-controller-manager -o yaml
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"iZ8vbccmhgkyfdi8aii1hnZ_d880fea6-1322-11e9-913f-00163e033b49","leaseDurationSeconds":15,"acquireTime":"2019-01-08T08:53:49Z","renewTime":"2019-01-22T11:16:59Z","leaderTransitions":1}'
  creationTimestamp: 2019-01-08T08:52:56Z
  name: kube-controller-manager
  namespace: kube-system
  resourceVersion: "2978183"
  selfLink: /api/v1/namespaces/kube-system/endpoints/kube-controller-manager
  uid: cade1b65-1322-11e9-9931-00163e033b49
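As you can see, the annotation records the ID of the current master (holderIdentity), the time it acquired leadership (acquireTime), the lease duration (leaseDurationSeconds), and the time of the most recent renewal (renewTime). In the end, the election still relies on etcd. The main code that selects the lock type is as follows.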
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
	switch lockType {
	case EndpointsResourceLock:
		return &EndpointsLock{
			EndpointsMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     client,
			LockConfig: rlc,
		}, nil
	case ConfigMapsResourceLock:
		return &ConfigMapLock{
			ConfigMapMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     client,
			LockConfig: rlc,
		}, nil
	default:
		return nil, fmt.Errorf("Invalid lock-type %s", lockType)
	}
}
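To make the lock record more concrete, here is a minimal standard-library sketch that decodes the annotation value shown in the kubectl output earlier and decides whether another candidate could take over. The struct field names follow the JSON keys in that annotation. The `leaderRecord` type and the `canAcquire` helper are assumptions made for illustration and are not the client-go implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// leaderRecord mirrors the JSON keys visible in the
// control-plane.alpha.kubernetes.io/leader annotation.
type leaderRecord struct {
	HolderIdentity       string    `json:"holderIdentity"`
	LeaseDurationSeconds int       `json:"leaseDurationSeconds"`
	AcquireTime          time.Time `json:"acquireTime"`
	RenewTime            time.Time `json:"renewTime"`
	LeaderTransitions    int       `json:"leaderTransitions"`
}

// parseRecord decodes the annotation value into a leaderRecord.
func parseRecord(annotation string) (leaderRecord, error) {
	var r leaderRecord
	err := json.Unmarshal([]byte(annotation), &r)
	return r, err
}

// canAcquire is a hypothetical helper: a candidate may take the lock if it
// already holds it, or if the holder has not renewed within the lease duration.
func canAcquire(r leaderRecord, candidate string, now time.Time) bool {
	if r.HolderIdentity == candidate {
		return true
	}
	expiry := r.RenewTime.Add(time.Duration(r.LeaseDurationSeconds) * time.Second)
	return now.After(expiry)
}

// sampleAnnotation is the annotation value copied from the kubectl output.
const sampleAnnotation = `{"holderIdentity":"iZ8vbccmhgkyfdi8aii1hnZ_d880fea6-1322-11e9-913f-00163e033b49","leaseDurationSeconds":15,"acquireTime":"2019-01-08T08:53:49Z","renewTime":"2019-01-22T11:16:59Z","leaderTransitions":1}`

func main() {
	r, err := parseRecord(sampleAnnotation)
	if err != nil {
		panic(err)
	}
	tenSecondsLater := r.RenewTime.Add(10 * time.Second)
	// The lease (15s) has not expired after 10s, so a rival must keep waiting.
	fmt.Println(canAcquire(r, "other-instance", tenSecondsLater)) // false
	// After 20s without a renewal the lease has expired.
	fmt.Println(canAcquire(r, "other-instance", tenSecondsLater.Add(10*time.Second))) // true
}
```

The sketch only models the expiry check. In the real flow the update is written back through the API server, so two candidates racing for the same record cannot both succeed.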
Once the election is finished, the controllers have to be started for real. Let's look at the code that starts them.
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
	// If this fails, just return here and fail since other controllers won't be able to get credentials.
	if _, _, err := startSATokenController(ctx); err != nil {
		return err
	}

	// Initialize the cloud provider with a reference to the clientBuilder only after token controller
	// has started in case the cloud provider uses the client builder.
	if ctx.Cloud != nil {
		ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop)
	}

	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}

		time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

		klog.V(1).Infof("Starting %q", controllerName)
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
		if debugHandler != nil && unsecuredMux != nil {
			basePath := "/debug/controllers/" + controllerName
			unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
			unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
		}
		klog.Infof("Started %q", controllerName)
	}

	return nil
}
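The startup loop above boils down to a name-to-function registry plus an enabled check. The following standard-library sketch reproduces that shape. The `initFunc` type is a simplified stand-in for InitFunc (no context, no debug handler), and `demoRegistry` and `demoEnabled` are hypothetical fixtures, so treat it as an illustration rather than the real code path.

```go
package main

import (
	"fmt"
	"sort"
)

// initFunc is a simplified stand-in for InitFunc: it reports whether the
// controller was actually started.
type initFunc func() (started bool, err error)

// startControllers walks the registry, skips disabled controllers, and
// returns at the first error, like the loop in StartControllers.
func startControllers(registry map[string]initFunc, enabled func(name string) bool) ([]string, error) {
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names) // deterministic order, for the example only

	var started []string
	for _, name := range names {
		if !enabled(name) {
			continue // corresponds to the "is disabled" warning
		}
		ok, err := registry[name]()
		if err != nil {
			return started, fmt.Errorf("error starting %q: %w", name, err)
		}
		if ok {
			started = append(started, name)
		}
	}
	return started, nil
}

// demoRegistry is a hypothetical registry with three outcomes: started,
// skipped (started == false), and disabled by configuration.
func demoRegistry() map[string]initFunc {
	return map[string]initFunc{
		"deployment":   func() (bool, error) { return true, nil },
		"replicaset":   func() (bool, error) { return true, nil },
		"cronjob":      func() (bool, error) { return false, nil }, // resource not available
		"tokencleaner": func() (bool, error) { return true, nil },
	}
}

// demoEnabled disables one controller by name.
func demoEnabled(name string) bool { return name != "tokencleaner" }

func main() {
	started, err := startControllers(demoRegistry(), demoEnabled)
	fmt.Println(started, err) // [deployment replicaset] <nil>
}
```

Keeping the registry as a plain map is what makes it cheap to add a controller: one entry and one start function.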
So how many controllers are there in total?
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
		// TODO: volume controller into the IncludeCloudLoops only set.
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher

	return controllers
}
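The answer is right here. The code above lists every controller in the current kube controller manager, including the familiar Deployment and StatefulSet as well as some less familiar ones. Below we take Deployment as the example and see what it actually does.
First, look at the startup function of the Deployment Controller.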
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return nil, false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}
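If you read the previous article on the Client-go Informer, this will look familiar: InformerFactory is used again, and several times. kube Controller Manager uses Informers heavily; a controller relies on Informers to watch resources and be notified of changes. Here the Deployment Controller mainly watches three resources: Deployment, ReplicaSet, and Pod.
Next, look at what the Deployment Controller needs at initialization.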
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
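Does this code look familiar? If you have worked with Client-go Informer code, it follows exactly the same pattern: each watched resource gets its own Add, Update, and Delete handlers, and all resources are read through Listers, so no real query to the APIServer is needed.
First, look at the handlers for Deployment.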
func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*apps.Deployment)
	klog.V(4).Infof("Adding deployment %s", d.Name)
	dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
	oldD := old.(*apps.Deployment)
	curD := cur.(*apps.Deployment)
	klog.V(4).Infof("Updating deployment %s", oldD.Name)
	dc.enqueueDeployment(curD)
}

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*apps.Deployment)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
			return
		}
	}
	klog.V(4).Infof("Deleting deployment %s", d.Name)
	dc.enqueueDeployment(d)
}
Whether the event is Add, Update, or Delete, the handling is identical: everything is pushed straight into the work queue provided by Client-go. Now look at ReplicaSet.
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)

	if rs.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dc.deleteReplicaSet(rs)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
		if d == nil {
			return
		}
		klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
	// them to see if anyone wants to adopt it.
	ds := dc.getDeploymentsForReplicaSet(rs)
	if len(ds) == 0 {
		return
	}
	klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
	for _, d := range ds {
		dc.enqueueDeployment(d)
	}
}
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
	curRS := cur.(*apps.ReplicaSet)
	oldRS := old.(*apps.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		// Periodic resync will send update events for all known replica sets.
		// Two different versions of the same replica set will always have different RVs.
		return
	}

	curControllerRef := metav1.GetControllerOf(curRS)
	oldControllerRef := metav1.GetControllerOf(oldRS)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
			dc.enqueueDeployment(d)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
		if d == nil {
			return
		}
		klog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
	if labelChanged || controllerRefChanged {
		ds := dc.getDeploymentsForReplicaSet(curRS)
		if len(ds) == 0 {
			return
		}
		klog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
		for _, d := range ds {
			dc.enqueueDeployment(d)
		}
	}
}
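To summarize Add and Update: the handlers check whether the ReplicaSet has actually changed, find the Deployment that owns it through its ControllerRef (or, for an orphan, the Deployments that match it), and enqueue that Deployment. The Pod handler only covers deletion: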
func (dc *DeploymentController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the Pod
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
			return
		}
	}
	klog.V(4).Infof("Pod %s deleted.", pod.Name)
	if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
		// Sync if this Deployment now has no more Pods.
		rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
		if err != nil {
			return
		}
		podMap, err := dc.getPodMapForDeployment(d, rsList)
		if err != nil {
			return
		}
		numPods := 0
		for _, podList := range podMap {
			numPods += len(podList.Items)
		}
		if numPods == 0 {
			dc.enqueueDeployment(d)
		}
	}
}
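The basic idea is much the same: when a Pod is deleted and its Deployment uses the Recreate strategy, the handler checks whether all Pods of that Deployment are gone, and if so pushes the Deployment into the work queue.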
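The enqueue pattern shared by all these handlers can be sketched with the standard library alone. The `keyQueue` type below is a hypothetical, much simplified stand-in for the Client-go work queue: it only shows that handlers add "namespace/name" keys, that a key already waiting is not added twice, and that a consumer drains the queue. Rate limiting, retries, and blocking reads are deliberately left out.

```go
package main

import (
	"fmt"
	"sync"
)

// keyQueue is a simplified work queue holding "namespace/name" keys.
type keyQueue struct {
	mu      sync.Mutex
	order   []string        // keys in arrival order
	pending map[string]bool // keys currently waiting in the queue
}

func newKeyQueue() *keyQueue {
	return &keyQueue{pending: map[string]bool{}}
}

// Add enqueues a key unless the same key is already waiting.
func (q *keyQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.pending[key] {
		return
	}
	q.pending[key] = true
	q.order = append(q.order, key)
}

// Get pops the oldest key; ok is false when the queue is empty.
func (q *keyQueue) Get() (key string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.order) == 0 {
		return "", false
	}
	key = q.order[0]
	q.order = q.order[1:]
	delete(q.pending, key)
	return key, true
}

// processAll drains the queue and returns the keys the handler accepted.
// Failed keys are simply dropped here; this sketch does not retry.
func processAll(q *keyQueue, handler func(key string) error) []string {
	var processed []string
	for {
		key, ok := q.Get()
		if !ok {
			return processed
		}
		if err := handler(key); err == nil {
			processed = append(processed, key)
		}
	}
}

func main() {
	q := newKeyQueue()
	// Three events for the same Deployment collapse into one queued key.
	q.Add("default/web")
	q.Add("default/web")
	q.Add("default/web")
	q.Add("default/api")

	done := processAll(q, func(key string) error { return nil })
	fmt.Println(done) // [default/web default/api]
}
```

Queuing keys instead of objects is what lets the sync function always read the latest state from the local cache, no matter how many events arrived in between.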
Once the resources are initialized, the controller starts running for real. Let's look at the Run function.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting deployment controller")
	defer klog.Infof("Shutting down deployment controller")

	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}

func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}
This is the standard Client-go worker consumer: it keeps taking keys from the queue and calls syncHandler to process each one. Let's see how the final handler does its work.
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
	defer func() {
		klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).Infof("Deployment %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
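Depending on whether the strategy is Recreate or RollingUpdate, the corresponding action is chosen. Here we take Recreate as the example and look at what the strategy does.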
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// scale down old replica sets.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus.
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Do not process a deployment when it has old pods running.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// If we need to create a new RS, create it now.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
		return err
	}

	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
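The ordering enforced by rolloutRecreate (scale down first, wait for old Pods to go away, only then create and scale up the new ReplicaSet) can be condensed into a small decision function. The `rolloutState` struct and `nextRecreateStep` below are hypothetical names for this illustration. The sketch also splits "create" and "scale up" into separate steps, whereas the real function does both in a single pass.

```go
package main

import "fmt"

// rolloutState is a hypothetical summary of what the controller observes.
type rolloutState struct {
	OldReplicas    int  // replicas still requested by old ReplicaSets
	OldPodsRunning bool // any Pod of an old ReplicaSet still running
	NewRSExists    bool // whether the new ReplicaSet has been created
	NewReplicas    int  // replicas requested by the new ReplicaSet
	Desired        int  // replicas requested by the Deployment
}

// nextRecreateStep returns the action a Recreate rollout takes next.
// Each sync pass performs one action and then waits for new events.
func nextRecreateStep(s rolloutState) string {
	switch {
	case s.OldReplicas > 0:
		return "scale down old ReplicaSets"
	case s.OldPodsRunning:
		return "wait for old Pods to terminate"
	case !s.NewRSExists:
		return "create new ReplicaSet"
	case s.NewReplicas < s.Desired:
		return "scale up new ReplicaSet"
	default:
		return "rollout complete"
	}
}

func main() {
	states := []rolloutState{
		{OldReplicas: 3, OldPodsRunning: true, Desired: 3},
		{OldReplicas: 0, OldPodsRunning: true, Desired: 3},
		{OldReplicas: 0, OldPodsRunning: false, Desired: 3},
		{NewRSExists: true, NewReplicas: 0, Desired: 3},
		{NewRSExists: true, NewReplicas: 3, Desired: 3},
	}
	for _, s := range states {
		fmt.Println(nextRecreateStep(s))
	}
}
```

Because the function is re-evaluated from observed state on every sync, the rollout can resume from any step after a controller restart.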
Next, look at the code that runs when a Deployment is created for the first time.
func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
	existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

	// Calculate the max revision number among all old RSes
	maxOldRevision := deploymentutil.MaxRevision(oldRSs)
	// Calculate revision number for this new replica set
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)

	// Latest replica set exists. We need to sync its annotations (includes copying all but
	// annotationsToSkip from the parent deployment, and update revision, desiredReplicas,
	// and maxReplicas) and also update the revision annotation in the deployment with the
	// latest revision.
	if existingNewRS != nil {
		rsCopy := existingNewRS.DeepCopy()

		// Set existing new replica set's annotation
		annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true)
		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
		if annotationsUpdated || minReadySecondsNeedsUpdate {
			rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(rsCopy)
		}

		// Should use the revision in existingNewRS's annotation, since it set by before
		needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
		// If no other Progressing condition has been recorded and we need to estimate the progress
		// of this deployment then it is likely that old users started caring about progress. In that
		// case we need to take into account the first time we noticed their new replica set.
		cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
		if deploymentutil.HasProgressDeadline(d) && cond == nil {
			msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
			condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *condition)
			needsUpdate = true
		}

		if needsUpdate {
			var err error
			if d, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d); err != nil {
				return nil, err
			}
		}
		return rsCopy, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// new ReplicaSet does not exist, create one.
	newRSTemplate := *d.Spec.Template.DeepCopy()
	podTemplateSpecHash := controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)
	newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
	// Add podTemplateHash label to selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create new ReplicaSet
	newRS := apps.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			// Make the name deterministic, to ensure idempotence
			Name:            d.Name + "-" + podTemplateSpecHash,
			Namespace:       d.Namespace,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, controllerKind)},
			Labels:          newRSTemplate.Labels,
		},
		Spec: apps.ReplicaSetSpec{
			Replicas:        new(int32),
			MinReadySeconds: d.Spec.MinReadySeconds,
			Selector:        newRSSelector,
			Template:        newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	*(newRS.Spec.Replicas) = newReplicasCount
	// Set new replica set's annotation
	deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false)
	// Create the new ReplicaSet. If it already exists, then we need to check for possible
	// hash collisions. If there is any other error, we need to report it in the status of
	// the Deployment.
	alreadyExists := false
	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(&newRS)
	// ...
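Only the important part of the code is excerpted here.
The remaining code is not pasted. The core idea is to create the appropriate new ReplicaSet based on the state of the existing ReplicaSets. Once the ReplicaSet has been created through Client-go, the Deployment Controller has basically done its job; what remains is to update the Deployment status in response to watch events. The actual creation of Pods is left to the ReplicaSet Controller.
The ReplicaSet Controller looks a lot like the Deployment Controller, so we skip the parts that repeat. First, look at which resources the ReplicaSet Controller watches at initialization.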
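The comment "Make the name deterministic, to ensure idempotence" in the excerpt is worth a small illustration. The sketch below derives a ReplicaSet name from a hash of the Pod template, so repeated syncs of an unchanged Deployment always produce the same name. It assumes a plain FNV-32a hash over a string and hexadecimal formatting; the `templateHash` and `replicaSetName` helpers are hypothetical and do not reproduce the exact hashing or encoding of controller.ComputeHash.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// templateHash hashes a serialized Pod template. The optional collision
// counter is mixed in so that a name clash can be resolved by bumping it.
func templateHash(template string, collisionCount *int32) string {
	h := fnv.New32a()
	h.Write([]byte(template))
	if collisionCount != nil {
		fmt.Fprintf(h, "%d", *collisionCount)
	}
	return fmt.Sprintf("%08x", h.Sum32())
}

// replicaSetName builds "<deployment>-<hash>", the same shape as
// d.Name + "-" + podTemplateSpecHash in the excerpt above.
func replicaSetName(deployment, template string, collisionCount *int32) string {
	return deployment + "-" + templateHash(template, collisionCount)
}

func main() {
	v1 := "image: nginx:1.15"
	v2 := "image: nginx:1.16"

	// Same template, same name: creating it twice hits "already exists"
	// instead of producing a duplicate ReplicaSet.
	fmt.Println(replicaSetName("web", v1, nil) == replicaSetName("web", v1, nil))
	// A changed template yields a different name, hence a new ReplicaSet.
	fmt.Println(replicaSetName("web", v1, nil) == replicaSetName("web", v2, nil))
}
```

The same hash is also added as a label and to the selector in the excerpt, which is how each ReplicaSet ends up selecting only the Pods created from its own template.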
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.enqueueReplicaSet,
		UpdateFunc: rsc.updateRS,
		// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
		// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
		// way of achieving this is by performing a `stop` operation on the replica set.
		DeleteFunc: rsc.enqueueReplicaSet,
	})
	rsc.rsLister = rsInformer.Lister()
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

	rsc.syncHandler = rsc.syncReplicaSet

	return rsc
}
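The ReplicaSet Controller mainly watches ReplicaSets and Pods. The handling logic for both is the same: starting from the triggered handler, find the corresponding ReplicaSet instance and put it into the work queue.
Here we go straight to the function in the ReplicaSet Controller that does the real work.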
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
		return nil
	}

	// list all pods to include the pods that don't match the rs`s selector
	// anymore but has the stale controller ref.
	// TODO: Do the List and Filter in a single pass, or use an index.
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Ignore inactive pods.
	var filteredPods []*v1.Pod
	for _, pod := range allPods {
		if controller.IsPodActive(pod) {
			filteredPods = append(filteredPods, pod)
		}
	}

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	// ...
Let's focus on the function that creates Pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		rsc.expectations.ExpectCreations(rsKey, diff)
		klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			boolPtr := func(b bool) *bool { return &b }
			controllerRef := &metav1.OwnerReference{
				APIVersion:         rsc.GroupVersion().String(),
				Kind:               rsc.Kind,
				Name:               rs.Name,
				UID:                rs.UID,
				BlockOwnerDeletion: boolPtr(true),
				Controller:         boolPtr(true),
			}
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
			if err != nil && errors.IsTimeout(err) {
				// Pod is created but its initialization has timed out.
				// If the initialization is successful eventually, the
				// controller will observe the creation via the informer.
				// If the initialization fails, or if the pod keeps
				// uninitialized for a long time, the informer will not
				// receive any update, and the controller will create a new
				// pod when the expectation expires.
				return nil
			}
			return err
		})

		// Any skipped pods that we never attempted to start shouldn't be expected.
		// The skipped pods will be retried later. The next controller resync will
		// retry the slow start process.
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, diff)

		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rs change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(targetPod)
					klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					errCh <- err
				}
			}(pod)
		}
		wg.Wait()
		// ...
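The logic here is very simple: compare the number of currently active Pods with the declared replicas. If there are too few, call Client-go to create Pods; if there are too many, call Client-go to delete Pods.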
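At this point, the Deployment -> ReplicaSet -> Pod chain has been created. When a Pod is deleted, the ReplicaSet Controller brings it back. When a Deployment is updated, a new ReplicaSet is created. Several controllers, nested layer by layer, work together to complete the final Pod creation. Of course, this only writes the Pod data into etcd. The actual Pod instance has not been created yet; that requires the scheduler and the kubelet, which we will cover in later chapters.
This article is original content from the Yunqi Community and may not be reproduced without permission.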