Kube Controller Manager 源碼分析

Kube Controller Manager 源碼分析

Controller Manager 在k8s 集羣中扮演着中心管理的角色,它負責Deployment, StatefulSet, ReplicaSet 等資源的建立與管理,能夠說是k8s的核心模塊,下面咱們以概略的形式走讀一下k8s Controller Manager 代碼。node

func NewControllerManagerCommand() *cobra.Command {
    s, err := options.NewKubeControllerManagerOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-controller-manager",
        Long: `The Kubernetes controller manager is a daemon that embeds
the core control loops shipped with Kubernetes. In applications of robotics and
automation, a control loop is a non-terminating loop that regulates the state of
the system. In Kubernetes, a controller is a control loop that watches the shared
state of the cluster through the apiserver and makes changes attempting to move the
current state towards the desired state. Examples of controllers that ship with
Kubernetes today are the replication controller, endpoints controller, namespace
controller, and serviceaccounts controller.`,
        Run: func(cmd *cobra.Command, args []string) {
            verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cmd.Flags())

            c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
            if err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }

            if err := Run(c.Complete(), wait.NeverStop); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }

Controller Manager 也是一個命令行,經過一系列flag啓動,具體的各個flag 咱們就很少看,有興趣的能夠去文檔或者flags_opinion.go 文件裏面去過濾一下,咱們直接從Run 函數入手。bootstrap

Run Function 啓動流程

Kube Controller Manager 既能夠單實例啓動,也能夠多實例啓動。 若是爲了保證 HA 而啓動多個Controller Manager,它就須要選主來保證同一時間只有一個Master 實例。咱們來看一眼Run 函數的啓動流程,這裏會把一些不重要的細節函數略過,只看重點api

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {

    run := func(ctx context.Context) {
        rootClientBuilder := controller.SimpleControllerClientBuilder{
            ClientConfig: c.Kubeconfig,
        }
        
        controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
        if err != nil {
            klog.Fatalf("error building controller context: %v", err)
        }
        
        if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
            klog.Fatalf("error starting controllers: %v", err)
        }
    
        controllerContext.InformerFactory.Start(controllerContext.Stop)
        close(controllerContext.InformersStarted)
    
        select {}
    }

    id, err := os.Hostname()
    if err != nil {
        return err
    }

    // add a uniquifier so that two processes on the same host don't accidentally both become active
    id = id + "_" + string(uuid.NewUUID())
    rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
        "kube-system",
        "kube-controller-manager",
        c.LeaderElectionClient.CoreV1(),
        resourcelock.ResourceLockConfig{
            Identity:      id,
            EventRecorder: c.EventRecorder,
        })
    if err != nil {
        klog.Fatalf("error creating lock: %v", err)
    }

    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        },
        WatchDog: electionChecker,
        Name:     "kube-controller-manager",
    })
    panic("unreachable")
}

這裏的基本流程以下:數據結構

  • 首先定義了run 函數,run 函數負責具體的controller 構建以及最終的controller 操做的執行
  • 使用Client-go 提供的選主函數來進行選主
  • 若是得到主權限,那麼就調用OnStartedLeading 註冊函數,也就是上面的run 函數來執行操做,若是沒選中,就hang住等待

選主流程解析

Client-go 選主工具類主要是經過kubeClient 在Configmap或者Endpoint選擇一個資源建立,而後哪個goroutine 建立成功了資源,哪個goroutine 得到鎖,固然全部的鎖信息都會存在Configmap 或者Endpoint裏面。之因此選擇這兩個資源類型,主要是考慮他們被Watch的少,可是如今kube Controller Manager 仍是適用的Endpoint,後面會逐漸遷移到ConfigMap,由於Endpoint會被kube-proxy Ingress Controller等頻繁Watch,咱們來看一眼集羣內Endpoint內容app

[root@iZ8vb5qgxqbxakfo1cuvpaZ ~]# kubectl get ep -n kube-system kube-controller-manager -o yaml
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"iZ8vbccmhgkyfdi8aii1hnZ_d880fea6-1322-11e9-913f-00163e033b49","leaseDurationSeconds":15,"acquireTime":"2019-01-08T08:53:49Z","renewTime":"2019-01-22T11:16:59Z","leaderTransitions":1}'
  creationTimestamp: 2019-01-08T08:52:56Z
  name: kube-controller-manager
  namespace: kube-system
  resourceVersion: "2978183"
  selfLink: /api/v1/namespaces/kube-system/endpoints/kube-controller-manager
  uid: cade1b65-1322-11e9-9931-00163e033b49

能夠看到,這裏面涵蓋了當前Master ID,獲取Master的時間,更新頻率以及下一次更新時間。這一切最終仍是靠ETCD 完成的選主。主要的選主代碼以下ide

func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    case ConfigMapsResourceLock:
        return &ConfigMapLock{
            ConfigMapMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}

StartController

選主完畢後,就須要真正啓動controller了,咱們來看一下啓動controller 的代碼函數

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
    // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
    // If this fails, just return here and fail since other controllers won't be able to get credentials.
    if _, _, err := startSATokenController(ctx); err != nil {
        return err
    }

    // Initialize the cloud provider with a reference to the clientBuilder only after token controller
    // has started in case the cloud provider uses the client builder.
    if ctx.Cloud != nil {
        ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop)
    }

    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            klog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

        klog.V(1).Infof("Starting %q", controllerName)
        debugHandler, started, err := initFn(ctx)
        if err != nil {
            klog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            klog.Warningf("Skipping %q", controllerName)
            continue
        }
        if debugHandler != nil && unsecuredMux != nil {
            basePath := "/debug/controllers/" + controllerName
            unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
            unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
        }
        klog.Infof("Started %q", controllerName)
    }

    return nil
}
  • 遍歷全部的controller list
  • 執行每一個controller 的Init Function

那麼一共有多少Controller 呢工具

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    controllers["nodeipam"] = startNodeIpamController
    controllers["nodelifecycle"] = startNodeLifecycleController
    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
        // TODO: volume controller into the IncludeCloudLoops only set.
    }
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    controllers["root-ca-cert-publisher"] = startRootCACertPublisher

    return controllers
}

答案就在這裏,上面的代碼列出來了當前kube controller manager 全部的controller,既有你們熟悉的Deployment StatefulSet 也有一些不熟悉的身影。下面咱們以Deployment 爲例看看它到底幹了什麼oop

Deployment Controller

先來看一眼Deployemnt Controller 啓動函數源碼分析

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

看到這裏,若是看過上一篇針對Client-go Informer 文章的確定不陌生,這裏又使用了InformerFactory,並且是好幾個。其實kube Controller Manager 裏面大量使用了Informer,Controller 就是使用 Informer 來通知和觀察全部的資源。能夠看到,這裏Deployment Controller 主要關注Deployment ReplicaSet Pod 這三個資源。

Deployment Controller 資源初始化

下面來看一下Deployemnt Controller 初始化須要的資源

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
        if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
            return nil, err
        }
    }
    dc := &DeploymentController{
        client:        client,
        eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
        DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addReplicaSet,
        UpdateFunc: dc.updateReplicaSet,
        DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: dc.deletePod,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    return dc, nil
}

是否是這裏的代碼似曾相識,若是接觸過Client-go Informer 的代碼,能夠看到這裏一模一樣,基本上就是對建立的資源分別觸發對應的Add Update Delete 函數,同時全部的資源經過Lister得到,不須要真正的Query APIServer。

先來看一下針對Deployment 的Handler

func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*apps.Deployment)
    klog.V(4).Infof("Adding deployment %s", d.Name)
    dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
    oldD := old.(*apps.Deployment)
    curD := cur.(*apps.Deployment)
    klog.V(4).Infof("Updating deployment %s", oldD.Name)
    dc.enqueueDeployment(curD)
}

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
    d, ok := obj.(*apps.Deployment)
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
            return
        }
        d, ok = tombstone.Obj.(*apps.Deployment)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
            return
        }
    }
    klog.V(4).Infof("Deleting deployment %s", d.Name)
    dc.enqueueDeployment(d)
}

不管是Add Update Delete,處理方法一模一樣,都是一股腦的塞到Client-go 提供的worker Queue裏面。 再來看看ReplicaSet

func (dc *DeploymentController) addReplicaSet(obj interface{}) {
    rs := obj.(*apps.ReplicaSet)

    if rs.DeletionTimestamp != nil {
        // On a restart of the controller manager, it's possible for an object to
        // show up in a state that is already pending deletion.
        dc.deleteReplicaSet(rs)
        return
    }

    // If it has a ControllerRef, that's all that matters.
    if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
        d := dc.resolveControllerRef(rs.Namespace, controllerRef)
        if d == nil {
            return
        }
        klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
        dc.enqueueDeployment(d)
        return
    }

    // Otherwise, it's an orphan. Get a list of all matching Deployments and sync
    // them to see if anyone wants to adopt it.
    ds := dc.getDeploymentsForReplicaSet(rs)
    if len(ds) == 0 {
        return
    }
    klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
    for _, d := range ds {
        dc.enqueueDeployment(d)
    }
}
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
    curRS := cur.(*apps.ReplicaSet)
    oldRS := old.(*apps.ReplicaSet)
    if curRS.ResourceVersion == oldRS.ResourceVersion {
        // Periodic resync will send update events for all known replica sets.
        // Two different versions of the same replica set will always have different RVs.
        return
    }

    curControllerRef := metav1.GetControllerOf(curRS)
    oldControllerRef := metav1.GetControllerOf(oldRS)
    controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
    if controllerRefChanged && oldControllerRef != nil {
        // The ControllerRef was changed. Sync the old controller, if any.
        if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
            dc.enqueueDeployment(d)
        }
    }

    // If it has a ControllerRef, that's all that matters.
    if curControllerRef != nil {
        d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
        if d == nil {
            return
        }
        klog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
        dc.enqueueDeployment(d)
        return
    }

    // Otherwise, it's an orphan. If anything changed, sync matching controllers
    // to see if anyone wants to adopt it now.
    labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
    if labelChanged || controllerRefChanged {
        ds := dc.getDeploymentsForReplicaSet(curRS)
        if len(ds) == 0 {
            return
        }
        klog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
        for _, d := range ds {
            dc.enqueueDeployment(d)
        }
    }
}

總結一下Add 和 Update

  • 根據ReplicaSet ownerReferences 尋找到對應的Deployment Name

判斷是否Rs 發生了變化

  • 若是變化就把Deployment 塞到Wokrer Queue裏面去
  • 最後看一下針對Pod 的處理
func (dc *DeploymentController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)

    // When a delete is dropped, the relist will notice a pod in the store not
    // in the list, leading to the insertion of a tombstone object which contains
    // the deleted key/value. Note that this value might be stale. If the Pod
    // changed labels the new deployment will not be woken up till the periodic resync.
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
            return
        }
        pod, ok = tombstone.Obj.(*v1.Pod)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
            return
        }
    }
    klog.V(4).Infof("Pod %s deleted.", pod.Name)
    if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
        // Sync if this Deployment now has no more Pods.
        rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
        if err != nil {
            return
        }
        podMap, err := dc.getPodMapForDeployment(d, rsList)
        if err != nil {
            return
        }
        numPods := 0
        for _, podList := range podMap {
            numPods += len(podList.Items)
        }
        if numPods == 0 {
            dc.enqueueDeployment(d)
        }
    }
}

能夠看到,基本思路差很少,當檢查到Deployment 全部的Pod 都被刪除後,將Deployment name 塞到Worker Queue 裏面去。

Deployment Controller Run 函數

資源初始化完畢後,就開始真正的Run 來看一下Run 函數

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dc.queue.ShutDown()

    klog.Infof("Starting deployment controller")
    defer klog.Infof("Shutting down deployment controller")

    if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }

    <-stopCh
}


func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)

    err := dc.syncHandler(key.(string))
    dc.handleErr(err, key)

    return true
}

能夠看到 這個代碼就是Client-go 裏面標準版的Worker 消費者,不斷的從Queue 裏面拿Obj 而後調用syncHandler 處理,一塊兒來看看最終的Handler如何處理

dc.syncHandler

func (dc *DeploymentController) syncDeployment(key string) error {
    startTime := time.Now()
    klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
    defer func() {
        klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(2).Infof("Deployment %v has been deleted", key)
        return nil
    }
    if err != nil {
        return err
    }

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    d := deployment.DeepCopy()

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(d.Spec.Selector, &everything) {
        dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
        if d.Status.ObservedGeneration < d.Generation {
            d.Status.ObservedGeneration = d.Generation
            dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
        }
        return nil
    }

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }
    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
        return err
    }

    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
    }

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }

    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // rollback is not re-entrant in case the underlying replica sets are updated with a new
    // revision so we should ensure that we won't proceed to update replica sets until we
    // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }

    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
  • 根據Worker Queue 取出來的Namespace & Name 從Lister 內Query到真正的Deployment 對象
  • 根據Deployment label 查詢對應的ReplicaSet 列表
  • 根據ReplicaSet label 查詢對應的 Pod 列表,並生成一個key 爲ReplicaSet ID Value 爲PodList的Map 數據結構
  • 判斷當前Deployment 是否處於暫停狀態
  • 判斷當前Deployment 是否處於回滾狀態
  • 根據更新策略Recreate 仍是 RollingUpdate 決定對應的動做

這裏咱們以Recreate爲例來看一下策略動做

func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
    // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }
    allRSs := append(oldRSs, newRS)
    activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

    // scale down old replica sets.
    scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
    if err != nil {
        return err
    }
    if scaledDown {
        // Update DeploymentStatus.
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // Do not process a deployment when it has old pods running.
    if oldPodsRunning(newRS, oldRSs, podMap) {
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // If we need to create a new RS, create it now.
    if newRS == nil {
        newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
        if err != nil {
            return err
        }
        allRSs = append(oldRSs, newRS)
    }

    // scale up new replica set.
    if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
        return err
    }

    if util.DeploymentComplete(d, &d.Status) {
        if err := dc.cleanupDeployment(oldRSs, d); err != nil {
            return err
        }
    }

    // Sync deployment status.
    return dc.syncRolloutStatus(allRSs, newRS, d)
}
  • 根據ReplicaSet 獲取當前全部的新老ReplicaSet
  • 若是有老的ReplicaSet 那麼先把老的ReplicaSet replicas 縮容設置爲0,固然第一次建立的時候是沒有老ReplicaSet的
  • 若是第一次建立,那麼須要去建立對應的ReplicaSet
  • 建立完畢對應的ReplicaSet後 擴容ReplicaSet 到對應的值
  • 等待新建的建立完畢,清理老的ReplcaiSet
  • 更新Deployment Status

下面咱們看看第一次建立Deployment 的代碼

func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
    existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

    // Calculate the max revision number among all old RSes
    maxOldRevision := deploymentutil.MaxRevision(oldRSs)
    // Calculate revision number for this new replica set
    newRevision := strconv.FormatInt(maxOldRevision+1, 10)

    // Latest replica set exists. We need to sync its annotations (includes copying all but
    // annotationsToSkip from the parent deployment, and update revision, desiredReplicas,
    // and maxReplicas) and also update the revision annotation in the deployment with the
    // latest revision.
    if existingNewRS != nil {
        rsCopy := existingNewRS.DeepCopy()

        // Set existing new replica set's annotation
        annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true)
        minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
        if annotationsUpdated || minReadySecondsNeedsUpdate {
            rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
            return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(rsCopy)
        }

        // Should use the revision in existingNewRS's annotation, since it set by before
        needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
        // If no other Progressing condition has been recorded and we need to estimate the progress
        // of this deployment then it is likely that old users started caring about progress. In that
        // case we need to take into account the first time we noticed their new replica set.
        cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
        if deploymentutil.HasProgressDeadline(d) && cond == nil {
            msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
            condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
            deploymentutil.SetDeploymentCondition(&d.Status, *condition)
            needsUpdate = true
        }

        if needsUpdate {
            var err error
            if d, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d); err != nil {
                return nil, err
            }
        }
        return rsCopy, nil
    }

    if !createIfNotExisted {
        return nil, nil
    }

    // new ReplicaSet does not exist, create one.
    newRSTemplate := *d.Spec.Template.DeepCopy()
    podTemplateSpecHash := controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)
    newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
    // Add podTemplateHash label to selector.
    newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

    // Create new ReplicaSet
    newRS := apps.ReplicaSet{
        ObjectMeta: metav1.ObjectMeta{
            // Make the name deterministic, to ensure idempotence
            Name:            d.Name + "-" + podTemplateSpecHash,
            Namespace:       d.Namespace,
            OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, controllerKind)},
            Labels:          newRSTemplate.Labels,
        },
        Spec: apps.ReplicaSetSpec{
            Replicas:        new(int32),
            MinReadySeconds: d.Spec.MinReadySeconds,
            Selector:        newRSSelector,
            Template:        newRSTemplate,
        },
    }
    allRSs := append(oldRSs, &newRS)
    newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
    if err != nil {
        return nil, err
    }

    *(newRS.Spec.Replicas) = newReplicasCount
    // Set new replica set's annotation
    deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false)
    // Create the new ReplicaSet. If it already exists, then we need to check for possible
    // hash collisions. If there is any other error, we need to report it in the status of
    // the Deployment.
    alreadyExists := false
    createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(&newRS)

這裏截取了部分重要代碼

  • 首先查詢一下當前是否有對應的新的ReplicaSet
  • 若是有那麼僅僅須要更新Deployment Status 便可
  • 若是沒有 那麼建立對應的ReplicaSet 結構體
  • 最後調用Client-go 建立對應的ReplicaSet 實例

後面還有一些代碼 這裏就不貼了,核心思想就是,根據ReplicaSet的狀況建立對應的新的ReplicaSet,其實看到使用Client-go 建立ReplicaSet Deployment 這裏基本完成了使命,剩下的就是根據watch 改變一下Deployment 的狀態了,至於真正的Pod 的建立,那麼就得ReplicaSet Controller 來完成了。

ReplicaSet Controller

ReplicaSet Controller 和Deployment Controller 長得差很少,重複的部分咱們就很少說,先看一下初始化的時候,ReplicaSet 主要關注哪些資源

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    rsc := &ReplicaSetController{
        GroupVersionKind: gvk,
        kubeClient:       kubeClient,
        podControl:       podControl,
        burstReplicas:    burstReplicas,
        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    }

    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.enqueueReplicaSet,
        UpdateFunc: rsc.updateRS,
        // This will enter the sync loop and no-op, because the replica set has been deleted from the store.
        // Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
        // way of achieving this is by performing a `stop` operation on the replica set.
        DeleteFunc: rsc.enqueueReplicaSet,
    })
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
        // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
        // local storage, so it should be ok.
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced

    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

能夠看到ReplicaSet Controller 主要關注全部的ReplicaSet Pod的建立,他們的處理邏輯是同樣的,都是根據觸發函數,找到對應的ReplicaSet實例後,將對應的ReplicaSet 實例放到Worker Queue裏面去。

syncReplicaSet

這裏咱們直接來看ReplicaSet Controller 的真正處理函數

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {

    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
        rsc.expectations.DeleteExpectations(key)
        return nil
    }
    if err != nil {
        return err
    }

    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
        return nil
    }

    // list all pods to include the pods that don't match the rs`s selector
    // anymore but has the stale controller ref.
    // TODO: Do the List and Filter in a single pass, or use an index.
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    if err != nil {
        return err
    }
    // Ignore inactive pods.
    var filteredPods []*v1.Pod
    for _, pod := range allPods {
        if controller.IsPodActive(pod) {
            filteredPods = append(filteredPods, pod)
        }
    }

    // NOTE: filteredPods are pointing to objects from cache - if you need to
    // modify them, you need to copy it first.
    filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
    if err != nil {
        return err
    }

    var manageReplicasErr error
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
    }
    rs = rs.DeepCopy()
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
  • 根據從Worker Queue 獲得的Name 獲取真正的ReplicaSet 實例
  • 根據ReplicaSet Label 獲取對應的全部的Pod List
  • 將全部的Running Pod 遍歷出來
  • 根據Pod 狀況判斷是否須要建立 Pod
  • 將新的狀態更新到ReplicaSet Status 字段中

manageReplicas

咱們主要來看一眼建立Pod 的函數

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
        return nil
    }
    if diff < 0 {
        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // TODO: Track UIDs of creates just like deletes. The problem currently
        // is we'd need to wait on the result of a create to record the pod's
        // UID, which would require locking *across* the create, which will turn
        // into a performance bottleneck. We should generate a UID for the pod
        // beforehand and store it via ExpectCreations.
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
        // and double with each successful iteration in a kind of "slow start".
        // This handles attempts to start large numbers of pods that would
        // likely all fail with the same error. For example a project with a
        // low quota that attempts to create a large number of pods will be
        // prevented from spamming the API service with the pod create requests
        // after one of its pods fails.  Conveniently, this also prevents the
        // event spam that those failures would generate.
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            boolPtr := func(b bool) *bool { return &b }
            controllerRef := &metav1.OwnerReference{
                APIVersion:         rsc.GroupVersion().String(),
                Kind:               rsc.Kind,
                Name:               rs.Name,
                UID:                rs.UID,
                BlockOwnerDeletion: boolPtr(true),
                Controller:         boolPtr(true),
            }
            err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
            if err != nil && errors.IsTimeout(err) {
                // Pod is created but its initialization has timed out.
                // If the initialization is successful eventually, the
                // controller will observe the creation via the informer.
                // If the initialization fails, or if the pod keeps
                // uninitialized for a long time, the informer will not
                // receive any update, and the controller will create a new
                // pod when the expectation expires.
                return nil
            }
            return err
        })

        // Any skipped pods that we never attempted to start shouldn't be expected.
        // The skipped pods will be retried later. The next controller resync will
        // retry the slow start process.
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
            for i := 0; i < skippedPods; i++ {
                // Decrement the expected number of creates because the informer won't observe this pod
                rsc.expectations.CreationObserved(rsKey)
            }
        }
        return err
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

        // Choose which Pods to delete, preferring those in earlier phases of startup.
        podsToDelete := getPodsToDelete(filteredPods, diff)

        // Snapshot the UIDs (ns/name) of the pods we're expecting to see
        // deleted, so we know to record their expectations exactly once either
        // when we see it as an update of the deletion timestamp, or as a delete.
        // Note that if the labels on a pod/rs change in a way that the pod gets
        // orphaned, the rs will only wake up after the expectations have
        // expired even if other pods are deleted.
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

        errCh := make(chan error, diff)
        var wg sync.WaitGroup
        wg.Add(diff)
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                defer wg.Done()
                if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    podKey := controller.PodKey(targetPod)
                    klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                    rsc.expectations.DeletionObserved(rsKey, podKey)
                    errCh <- err
                }
            }(pod)
        }
        wg.Wait()

這裏的邏輯就很是簡單的,基本上就是根據當前Running Pod 數量和真正的replicas 聲明比對,若是少了那麼就調用Client-go 建立Pod ,若是多了就調用CLient-go 去刪除 Pod。

總結

至此,一個Deployment -> ReplicaSet -> Pod 就真正的建立完畢。當Pod 被刪除時候,ReplicaSet Controller 就會把 Pod 拉起來。若是更新Deployment 就會建立新的ReplicaSet 一層層嵌套多個Controller 結合完成最終的 Pod 建立。 固然,這裏其實僅僅完成了Pod 數據寫入到ETCD,其實真正的 Pod 實例並無建立,還須要scheduler & kubelet 配合完成,咱們會在後面的章節繼續介紹。



本文做者:xianlubird

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索