Kubernetes源碼閱讀筆記——Controller Manager(之三)

前一篇文章中,咱們探索了Informer工做的大體邏輯,提到了添加回調函數部分包含了三塊,即:Informer的建立、函數調用的邏輯、以及回調函數自己。前兩塊已在前文談到過,下面咱們來看看第三塊,即回調函數自身的處理邏輯:緩存

1、回調函數app

這裏仍然以deployment爲例。首先仍是進入NewDeploymentController方法:函數

pkg/controller/deployment/deployment_controller.go

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	...
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}

咱們看到,方法爲Deployment和ReplicaSet的informer分別添加了增、改、刪的回調函數,而爲Pod的informer僅僅添加了刪除的回調函數。這主要是針對策略爲Recreate的Deployment(即全部舊Pod都刪除後再建立新Pod),且僅當全部舊Pod都刪除後纔會進行下一步操做(立刻提到)。對於RollingUpdate策略的Deployment,Pod數量的維持由dc的rsControl字段經過建立一個RSController來負責,不涉及進一步的操做。ui

這裏說的進一步操做,指調用dc的enqueueDeployment方法,將Deployment加入隊列中。好比,看addDeployment方法:this

pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) addDeployment(obj interface{}) { d := obj.(*apps.Deployment) klog.V(4).Infof("Adding deployment %s", d.Name) dc.enqueueDeployment(d) }

再好比,看deleteDeployment方法:spa

pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) deleteDeployment(obj interface{}) { d, ok := obj.(*apps.Deployment) if !ok { ... } klog.V(4).Infof("Deleting deployment %s", d.Name) dc.enqueueDeployment(d) }

7個回調函數都是這樣,在最後調用了enqueueDeployment方法,將Deployment存入隊列中。code

enqueueDeployment已被設置爲enqueue方法,因此看一下enqueue方法:orm

pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) { key, err := controller.KeyFunc(deployment) if err != nil { utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err)) return } dc.queue.Add(key) }

這裏,KeyFunc的做用是將Deployment的namespace和name字段提取出來,並以namespace/name的格式返回字符串。而這個字符串則被存入隊列中,以待下一步的處理。協程

因此,7個回調函數異曲同工,最終都是將相關的Deployment以namespace/name的格式存入了隊列中。對象

2、Run

存入隊列中的Deployment如何被處理呢?答案是在startDeploymentController方法中經過Go協程調用Run方法。如今咱們來看一下Run方法:

pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer dc.queue.ShutDown() klog.Infof("Starting deployment controller") defer klog.Infof("Shutting down deployment controller") if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) { return } for i := 0; i < workers; i++ { go wait.Until(dc.worker, time.Second, stopCh) } <-stopCh }

其中,WaitForCacheSync方法是在對controller中Deployment、ReplicaSet和Pod的緩存進行同步,若是失敗,則直接返回。

後面經過goroutine調用worker方法。worker方法則是調用了processNextWorkItem方法,在隊列不爲空的狀況下持續取出隊列中的下一個元素,並調用syncHandler方法執行操做。而前面的NewDeploymentController方法中,有一句dc.syncHandler = dc.syncDeployment,所以調用syncHandler方法即爲調用syncDeployment方法。

3、syncDeployment

這個方法是對隊列中Deployment元素的處理函數。方法中包含了對Deployment的一些操做,如獲取ReplicaSet列表、進行更新和回滾等等。

pkg/controller/deployment/deployment_controller.go

func (dc *DeploymentController) syncDeployment(key string) error { startTime := time.Now() klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime) defer func() { klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } deployment, err := dc.dLister.Deployments(namespace).Get(name) if errors.IsNotFound(err) { klog.V(2).Infof("Deployment %v has been deleted", key) return nil } if err != nil { return err } // Deep-copy otherwise we are mutating our cache. // TODO: Deep-copy only when needed. d := deployment.DeepCopy() everything := metav1.LabelSelector{} if reflect.DeepEqual(d.Spec.Selector, &everything) { dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d) } return nil } // List ReplicaSets owned by this Deployment, while reconciling ControllerRef // through adoption/orphaning. rsList, err := dc.getReplicaSetsForDeployment(d) if err != nil { return err } // List all Pods owned by this Deployment, grouped by their ReplicaSet. // Current uses of the podMap are: // // * check if a Pod is labeled correctly with the pod-template-hash label. // * check that no old Pods are running in the middle of Recreate Deployments. podMap, err := dc.getPodMapForDeployment(d, rsList) if err != nil { return err } if d.DeletionTimestamp != nil { return dc.syncStatusOnly(d, rsList) } // Update deployment conditions with an Unknown condition when pausing/resuming // a deployment. In this way, we can be sure that we won't timeout when a user // resumes a Deployment with a set progressDeadlineSeconds. if err = dc.checkPausedConditions(d); err != nil { return err } if d.Spec.Paused { return dc.sync(d, rsList) } // rollback is not re-entrant in case the underlying replica sets are updated with a new // revision so we should ensure that we won't proceed to update replica sets until we // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues. if getRollbackTo(d) != nil { return dc.rollback(d, rsList) } scalingEvent, err := dc.isScalingEvent(d, rsList) if err != nil { return err } if scalingEvent { return dc.sync(d, rsList) } switch d.Spec.Strategy.Type { case apps.RecreateDeploymentStrategyType: return dc.rolloutRecreate(d, rsList, podMap) case apps.RollingUpdateDeploymentStrategyType: return dc.rolloutRolling(d, rsList) } return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type) }

方法大約包含如下幾部分:

(1)取出的元素爲namespace/name格式,因此首先根據這兩個字段獲取deployment對象。

(2)檢查Deployment的DeletionTimestamp、PauseCondition、ScalingEvent等字段的內容,並調用相應的處理函數,包括sync、syncStatusOnly等。

(3)判斷Deployment的策略是RollingUpdate仍是Recreate,並調用相應的處理函數,包括rolloutRolling等。

下面咱們看幾個處理函數。

4、處理函數

先來看一下syncStatusOnly方法:

pkg/controller/deployment/sync.go // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }

    allRSs := append(oldRSs, newRS)
    return dc.syncDeploymentStatus(allRSs, newRS, d)
}

正如註釋所說,syncStatusOnly方法只作了一件事,就是調用syncDeploymentStatus方法,來同步Deployment的狀態。此方法在後面分析。

再來看sync方法:

pkg/controller/deployment/sync.go // sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }
    if err := dc.scale(d, newRS, oldRSs); err != nil {
        // If we get an error while trying to scale, the deployment will be requeued
        // so we can abort this resync
        return err
    }

    // Clean up the deployment when it's paused and no rollback is in flight.
    if d.Spec.Paused && getRollbackTo(d) == nil {
        if err := dc.cleanupDeployment(oldRSs, d); err != nil {
            return err
        }
    }

    allRSs := append(oldRSs, newRS)
    return dc.syncDeploymentStatus(allRSs, newRS, d)
}

如註釋所說,此方法比syncStatusOnly方法多了對Deployment的scale和pause事件的處理,但最後仍然是調用syncDeploymentStatus方法,同步Deployment的狀態。

再來看看syncDeploymentStatus方法:

pkg/controller/deployment/sync.go

func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { newStatus := calculateStatus(allRSs, newRS, d) if reflect.DeepEqual(d.Status, newStatus) { return nil } newDeployment := d newDeployment.Status = newStatus _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment) return err }

此方法首先檢查新舊Deployment的狀態是否同樣,若是同樣則直接返回。不然,就經過client鏈接API Server,修改Deployment的狀態。這一步,就是controller處理deployment的最終目的。至於後面對Pod的具體操做,就是kubelet的職責了,而controller所負責的就是與API Server交互,並在API Server中更新deployment的狀態。

rolloutRolling方法用於處理Deployment的rollingupdat事件,最後的本質還是經過與API Server交互來更新Deployment的狀態。

5、總結

至此,Controller Manager的大體運行流程就整理完了。

總結一下,Controller Manager本質上是kubernetes中衆多controller的管理組件。每一個controller在啓動時都會運行本身的informer。這些informer經過list-watch機制,經過與API Server交互,獲取監聽資源的實時狀態變化,並在FIFO隊列中進行更新。

informer不斷從FIFO隊列中取出元素,一方面更新Thread Safe Store,另外一方面調用具體controller的回調函數對元素進行處理。此外,informer會使用listener機制,從Thread Safe Store中取出內容,調用相應的回調函數進行處理。

以deployment controller爲例,deployment的informer最終會將deployment存入隊列中。隊列中的元素取出後通過一系列deployment的scale、rollingupdate等處理,最終再次經過與API Server的交互,在API Server中更新處理後資源的狀態。這就是一個controller的循環。其他的Controller大體也是依照這一流程在運行的。

相關文章
相關標籤/搜索