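An In-Depth Analysis of the Kubernetes DaemonSet Controller

Author: xidianwangtao@gmail.com | Version: Kubernetes 1.13

Abstract: DaemonSet is one of the Kubernetes objects users rely on most. We use it to deploy daemon applications on Nodes, such as logging agents and node-monitoring agents. From a user's point of view DaemonSet looks simple, but it involves many details: what conditions a DaemonSet Pod must meet to run on a Node, whether it can still run when the Node reports MemoryPressure or another abnormal Condition, how scheduling works, how rolling updates work, and so on. This article starts from the DaemonSet Controller source code and analyzes its key logic.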

DaemonSet Controller

DaemonSet Controller Struct

The core fields of the DaemonSet Controller struct are:

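  • burstReplicas int: the maximum number of Pods created, and the maximum number deleted, in one sync; hard-coded to 250.
  • queue workqueue.RateLimitingInterface: the rate-limited delaying queue holding the keys (namespace/name) of DaemonSets waiting to be synced.
  • syncHandler func(dsKey string) error: syncs the DaemonSets taken from the queue, covering replica management, UpdateStrategy upgrades and DaemonSet Status updates; it is the core logic of the DaemonSet Controller.
  • expectations controller.ControllerExpectationsInterface: a TTLCache recording, for each DaemonSet, how many Pod creations/deletions its last sync expects to see.
  • suspendedDaemonPods map[string]sets.String: keyed by NodeName; the value is the set of DaemonSets that have a 'wantToRun && !shouldSchedule' Pod on that Node.
    • wantToRun: when the DaemonSet Controller simulates scheduling by running predicates (mainly GeneralPredicates and PodToleratesNodeTaints), wantToRun is true if every PredicateFailureError returned is one of the following (resource-related) errors, and false if any other PredicateFailureError occurs. If the DaemonSet's Pod template specifies NodeName, wantToRun is decided by whether it matches node.Name.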
      • ErrDiskConflict;
      • ErrVolumeZoneConflict;
      • ErrMaxVolumeCountExceeded;
      • ErrNodeUnderMemoryPressure;
      • ErrNodeUnderDiskPressure;
      • InsufficientResourceError;
    • shouldSchedule:
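      • If the DaemonSet's Pod template specifies NodeName, shouldSchedule is decided by whether it matches node.Name.
      • If the predicates return any PredicateFailureError at all, shouldSchedule is false.
      • If an InsufficientResourceError occurs, shouldSchedule is also false.
  • failedPodsBackoff *flowcontrol.Backoff: throttles the deletion of failed DaemonSet Pods. Each time syncDaemonSet finds a failed Pod that should be deleted, it applies a per-DaemonSet/Node backoff delay of 1s, 2s, 4s, 8s, ..., up to 15min. Without it, a DaemonSet Pod rejected by kubelet would be deleted, recreated and immediately rejected again, producing a loop of useless work. In addition, Run starts a goroutine that periodically garbage-collects backoff entries that have not been updated for 2*MaxDuration (2*15min).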
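The failedPodsBackoff behavior can be modeled in a few lines. The sketch below is a hypothetical reimplementation, not the flowcontrol.Backoff source: the backoff type, its next/inBackoff/gc methods, the helper delaysForAttempts and the key "ds-uid/node-1" are made up for illustration, and only the 1s/15min parameters come from the text above. It shows the delay doubling from 1s up to a 15min cap, and entries being dropped once they have been idle for more than 2*15min.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed parameters matching the 1s -> 15min range described above.
const (
	initialDelay = time.Second
	maxDelay     = 15 * time.Minute
)

// epoch is an arbitrary fixed start time that keeps the example deterministic.
var epoch = time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)

// backoffEntry holds the current delay for one key and when it was last bumped.
type backoffEntry struct {
	delay      time.Duration
	lastUpdate time.Time
}

// backoff is a hypothetical per-key exponential backoff that models the
// behavior described above; it is not the flowcontrol.Backoff source.
type backoff struct {
	initial, limit time.Duration
	entries        map[string]*backoffEntry
}

func newBackoff(initial, limit time.Duration) *backoff {
	return &backoff{initial: initial, limit: limit, entries: map[string]*backoffEntry{}}
}

// next starts (or, for a stale entry, restarts) the delay at initial,
// and otherwise doubles it without exceeding limit.
func (b *backoff) next(key string, now time.Time) {
	e, ok := b.entries[key]
	if !ok || now.Sub(e.lastUpdate) > 2*b.limit {
		b.entries[key] = &backoffEntry{delay: b.initial, lastUpdate: now}
		return
	}
	e.delay *= 2
	if e.delay > b.limit {
		e.delay = b.limit
	}
	e.lastUpdate = now
}

// inBackoff reports whether now still falls inside the current delay window.
func (b *backoff) inBackoff(key string, now time.Time) bool {
	e, ok := b.entries[key]
	return ok && now.Sub(e.lastUpdate) < e.delay
}

// gc drops entries that have not been bumped for more than 2*limit,
// like the periodic GC goroutine started in Run.
func (b *backoff) gc(now time.Time) {
	for k, e := range b.entries {
		if now.Sub(e.lastUpdate) > 2*b.limit {
			delete(b.entries, k)
		}
	}
}

// delaysForAttempts bumps one hypothetical DaemonSet/Node key n times,
// waiting out each delay before the next failure, and returns the delays.
func delaysForAttempts(n int) []time.Duration {
	const key = "ds-uid/node-1" // hypothetical key
	b := newBackoff(initialDelay, maxDelay)
	now := epoch
	var out []time.Duration
	for i := 0; i < n; i++ {
		b.next(key, now)
		d := b.entries[key].delay
		out = append(out, d)
		now = now.Add(d)
	}
	return out
}

func main() {
	fmt.Println(delaysForAttempts(12))
}
```

Restarting a stale entry at the initial delay means a Pod that fails again after a long healthy period is retried quickly instead of inheriting a 15min penalty.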

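Creating and Starting the DaemonSet Controller

NewDaemonSetsController creates the controller. One of its most important jobs is registering EventHandlers on the following Informers:

  • daemonSetInformer: AddFunc/DeleteFunc/UpdateFunc all essentially end up enqueueing the DaemonSet;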
  • historyInformer:
    • AddFunc: addHistory;
    • UpdateFunc: updateHistory;
    • DeleteFunc: deleteHistory;
  • podInformer:
    • AddFunc: addPod;
    • UpdateFunc: updatePod;
    • DeleteFunc: deletePod;
  • nodeInformer:
    • AddFunc: addNode;
    • UpdateFunc: updateNode;

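When the DaemonSet Controller Runs, it does two main things:

  • Starts 2 worker goroutines (by default); each worker takes DaemonSet keys from the queue and syncs them.

  • Starts 1 failedPodsBackoff GC goroutine that runs once a minute, cleaning up the failed-Pod backoff records of every DaemonSet/Node pair in the cluster.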
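To make the worker model concrete, here is a hedged sketch of N workers draining a shared queue and calling a sync handler. It swaps in a plain buffered Go channel for workqueue.RateLimitingInterface, so it omits the real queue's de-duplication, requeueing and rate limiting; runWorkers and syncError are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// syncError is a trivial error type used only for this example.
type syncError string

func (e syncError) Error() string { return string(e) }

// runWorkers starts `workers` goroutines that drain keys from a shared queue
// and call syncHandler on each one. A closed, buffered channel stands in for
// workqueue.RateLimitingInterface, so there is no de-duplication, requeue or
// rate limiting; keys whose sync failed are simply returned (sorted).
func runWorkers(keys []string, workers int, syncHandler func(key string) error) []string {
	queue := make(chan string, len(keys))
	for _, k := range keys {
		queue <- k
	}
	close(queue)

	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		failed []string
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range queue {
				if err := syncHandler(key); err != nil {
					// The real controller would re-add the key with rate limiting here.
					mu.Lock()
					failed = append(failed, key)
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	sort.Strings(failed)
	return failed
}

func main() {
	keys := []string{"kube-system/fluentd", "kube-system/node-exporter", "default/broken"}
	failed := runWorkers(keys, 2, func(key string) error {
		if key == "default/broken" {
			return syncError("sync failed for " + key)
		}
		return nil
	})
	fmt.Println("keys that would be requeued:", failed)
}
```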

requeueSuspendedDaemonPods is only called from deletePod. Why?

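DaemonSet Synchronization

A worker takes the key of a DaemonSet waiting to be synced from the queue and calls syncDaemonSet to reconcile it; syncDaemonSet is the core entry point of DaemonSet management.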

pkg/controller/daemon/daemon_controller.go:1208

func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
	...
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(3).Infof("daemon set has been deleted %v", key)
		dsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
	}

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(ds.Spec.Selector, &everything) {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
		return nil
	}

	// Don't process a daemon set until all its creations and deletions have been processed.
	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
	// then we do not want to call manage on foo until the daemon pods have been created.
	...
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// Construct histories of the DaemonSet, and get the hash of current history
	cur, old, err := dsc.constructHistory(ds)
	if err != nil {
		return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
	}
	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

	if !dsc.expectations.SatisfiedExpectations(dsKey) {
		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
		return dsc.updateDaemonSetStatus(ds, hash, false)
	}

	err = dsc.manage(ds, hash)
	if err != nil {
		return err
	}

	// Process rolling updates if we're ready.
	if dsc.expectations.SatisfiedExpectations(dsKey) {
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ds, hash)
		}
		if err != nil {
			return err
		}
	}

	err = dsc.cleanupHistory(ds, old)
	if err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
	}

	return dsc.updateDaemonSetStatus(ds, hash, true)
}

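The core flow is as follows:

  • First, check whether the DaemonSet has been deleted from the local store; if so, remove its entry from expectations.
  • Check whether the DaemonSet's LabelSelector is empty (selecting everything); if so, syncDaemonSet emits a warning event and returns without syncing, so no Pods will be created for the DaemonSet.
  • If its DeletionTimestamp is set, the user has triggered deletion, so syncDaemonSet returns without syncing; deleting the DaemonSet's Pods is left to the GC Controller.
  • Next, constructHistory fetches the DaemonSet's current ControllerRevision and all old ControllerRevisions, makes sure every ControllerRevision carries the label "controller-revision-hash: <ControllerRevision.Name>", and updates the current ControllerRevision's Revision to maxRevision(old) + 1.
  • Check whether the current expectations are satisfied; if they are not, only update the DaemonSet Status and end this sync.
    • If neither add nor del in the expectations is greater than 0, everything the controller expected has been observed, so the expectations are satisfied.
    • If the expectations have timed out (timeout of 5min, not configurable), a sync is needed anyway.
    • If the expectations hold no entry for this DaemonSet yet, they also count as satisfied and the DaemonSet is synced.
    • The updateDaemonSetStatus call here updates the following fields of DaemonSet.Status; note that it does not update ObservedGeneration (which has not changed either).
      • DesiredNumberScheduled: the number of DaemonSet Pods that should be scheduled, i.e. the number of Nodes for which wantToRun (described above) is true.
      • CurrentNumberScheduled: of those, the number of Pods already running on their Node.
      • NumberMisscheduled: the number of Pods running on Nodes where they should not run (wantToRun is false), i.e. wrongly scheduled Pods.
      • NumberReady: of CurrentNumberScheduled, the number of Pods whose Ready condition is true.
      • UpdatedNumberScheduled: of CurrentNumberScheduled, the number of Pods whose controller-revision-hash label equals the hash of the current ControllerRevision, i.e. Pods already running the updated Pod template.
      • NumberAvailable: of CurrentNumberScheduled, the number of Pods that are Ready and Available (Ready for at least minReadySeconds).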
      • NumberUnavailable:desiredNumberScheduled - numberAvailable。
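  • Call manage to manage the DaemonSet Pods: compute the Pods to delete and the Nodes that need a Pod, then call syncNodes to create Pods in batches (1, 2, 4, 8, ...) and to delete Pods. If Failed DaemonSet Pods were found on some Nodes, manage returns an error after syncNodes. syncNodes sets the expectations (add/del) before issuing any request; they only drop to zero or below as the informer observes the created/deleted Pods (or as requests fail), and only then will a later syncDaemonSet call manage again.
  • If manage returns an error, syncDaemonSet ends here; otherwise it continues with the steps below.
  • Check again whether the expectations are satisfied; if so, update the DaemonSet according to its UpdateStrategy:
    • OnDelete: wait for the user to delete Pods. Each deletion enqueues the DaemonSet, and syncNodes then creates the replacement Pod from the latest Pod template.
    • RollingUpdate: call rollingUpdate to perform a rolling update, analyzed in detail later.
  • If the update succeeds, clean up the oldest ControllerRevisions when the number of old ControllerRevisions exceeds Spec.RevisionHistoryLimit (default 10).
  • Finally, updateDaemonSetStatus updates DaemonSet.Status; unlike the earlier call, this one also updates Status.ObservedGeneration.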
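The three expectation rules above can be captured in a small model. This is an assumption-laden sketch, not the ControllerExpectations API: the expectations map type and its set/creationObserved/deletionObserved/satisfied methods are hypothetical, and only the 5-minute timeout comes from the text.

```go
package main

import (
	"fmt"
	"time"
)

// expectationsTimeout mirrors the 5-minute timeout described above.
const expectationsTimeout = 5 * time.Minute

// start is an arbitrary fixed time that keeps the example deterministic.
var start = time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)

// expectation is a hypothetical, simplified stand-in for one
// ControllerExpectations record: pending creates/deletes and when they were set.
type expectation struct {
	add, del  int
	timestamp time.Time
}

// expectations maps a DaemonSet key (namespace/name) to its record.
type expectations map[string]*expectation

// set is what syncNodes does before issuing creates/deletes.
func (e expectations) set(key string, add, del int, now time.Time) {
	e[key] = &expectation{add: add, del: del, timestamp: now}
}

// creationObserved/deletionObserved are driven by Pod add/delete events,
// or called directly when a create/delete request fails.
func (e expectations) creationObserved(key string) {
	if x, ok := e[key]; ok {
		x.add--
	}
}

func (e expectations) deletionObserved(key string) {
	if x, ok := e[key]; ok {
		x.del--
	}
}

// satisfied applies the three rules listed above.
func (e expectations) satisfied(key string, now time.Time) bool {
	x, ok := e[key]
	if !ok {
		return true // no record yet: sync
	}
	if x.add <= 0 && x.del <= 0 {
		return true // everything expected has been observed
	}
	return now.Sub(x.timestamp) > expectationsTimeout // expired: sync anyway
}

func main() {
	exp := expectations{}
	key := "kube-system/fluentd"
	fmt.Println("no record:", exp.satisfied(key, start))
	exp.set(key, 2, 0, start)
	exp.creationObserved(key)
	fmt.Println("1 of 2 creates seen:", exp.satisfied(key, start))
	fmt.Println("after 6 minutes:", exp.satisfied(key, start.Add(6*time.Minute)))
	exp.creationObserved(key)
	fmt.Println("2 of 2 creates seen:", exp.satisfied(key, start))
}
```

The timeout is a safety valve: if a creation is never observed (for example the Pod's initialization times out), the DaemonSet is not blocked forever.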

DaemonSet Pod Scheduling

Before Kubernetes 1.12, the DaemonSet Controller scheduled Daemon Pods itself by default: it set spec.nodeName on the Pod, and the kubelet on that Node, watching for the event, then created the DaemonSet Pod locally. In Kubernetes 1.12+, the ScheduleDaemonSetPods FeatureGate is enabled by default and DaemonSet scheduling is handled by the default scheduler.

DaemonSet Pods Should Be On Node

In manage, podsShouldBeOnNode is called for each Node to compute the Nodes that need a DaemonSet Pod (nodesNeedingDaemonPods), the DaemonSet Pods to delete from that Node (podsToDelete), and the number of Failed DaemonSet Pods on that Node; syncNodes then creates and deletes Pods based on these three results.

func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, hash string) error {
	// Find out the pods which are created for the nodes by DaemonSet.
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	...
	for _, node := range nodeList {
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, failedPodsObservedOnNode, err := dsc.podsShouldBeOnNode(
			node, nodeToDaemonPods, ds)

		if err != nil {
			continue
		}

		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
		failedPodsObserved += failedPodsObservedOnNode
	}

	// Label new pods using the hash label value of the current history when creating them
	if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}

	...

	return nil
}

How does podsShouldBeOnNode compute nodesNeedingDaemonPods, podsToDelete and failedPodsObserved? It calls nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) to compute the following three state values:

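  • wantToRun: when the DaemonSet Controller simulates scheduling by running predicates (mainly GeneralPredicates and PodToleratesNodeTaints), wantToRun is true if every PredicateFailureError returned is one of the following (resource-related) errors, and false if any other PredicateFailureError occurs. If the DaemonSet's Pod template specifies NodeName, wantToRun is decided by whether it matches node.Name.
    • ErrDiskConflict;
    • ErrVolumeZoneConflict;
    • ErrMaxVolumeCountExceeded;
    • ErrNodeUnderMemoryPressure;
    • ErrNodeUnderDiskPressure;
    • InsufficientResourceError;
  • shouldSchedule:
    • If the DaemonSet's Pod template specifies NodeName, shouldSchedule is decided by whether it matches node.Name.
    • If the predicates return any PredicateFailureError at all, shouldSchedule is false.
    • If an InsufficientResourceError occurs, shouldSchedule is also false.
  • failedPodsBackoff *flowcontrol.Backoff (not a state value returned by nodeShouldRunDaemonPod, but used by podsShouldBeOnNode): failed DaemonSet Pods are handled (deleted and then recreated) with a backoff of 1s, 2s, 4s, 8s, ..., which throttles the work. When the DaemonSet Controller Runs, it starts a goroutine that periodically garbage-collects backoff entries not updated for 2*MaxDuration (2*15min).
  • shouldContinueRunning: false if any of the following occurs, true otherwise.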
    • ErrNodeSelectorNotMatch,
    • ErrPodNotMatchHostName,
    • ErrNodeLabelPresenceViolated,
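    • ErrPodNotFitsHostPorts,
    • ErrTaintsTolerationsNotMatch, but only when the Pod fails to tolerate some NoExecute taint on the Node; if all NoExecute taints are tolerated, shouldContinueRunning stays true. In other words, an untolerated NoSchedule taint alone does not remove a Pod that is already running.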
    • ErrPodAffinityNotMatch,
    • ErrServiceAffinityViolated,
    • unknown predicate failure reason
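The mapping from predicate failures to the three flags can be summarized as a pure function. The sketch below follows the rules listed above (and the switch in nodeShouldRunDaemonPod shown later), but uses hypothetical string reason names and a simplified reason type instead of the scheduler's predicate error values; classify and decision are illustrative names.

```go
package main

import "fmt"

// reason is a hypothetical, simplified predicate failure. The real code
// switches on *predicates.PredicateFailureError values, not strings.
type reason struct {
	name string
	// toleratesNoExecute is only used for "TaintsTolerationsNotMatch": whether
	// the Pod still tolerates every NoExecute taint on the Node.
	toleratesNoExecute bool
}

type decision struct {
	wantToRun, shouldSchedule, shouldContinueRunning bool
}

// classify folds failure reasons into the three flags, following the rules above.
func classify(reasons []reason) (decision, error) {
	d := decision{true, true, true}
	for _, r := range reasons {
		switch r.name {
		case "NodeSelectorNotMatch", "PodNotMatchHostName",
			"NodeLabelPresenceViolated", "PodNotFitsHostPorts":
			return decision{}, nil // intentional: never run on this Node
		case "TaintsTolerationsNotMatch":
			if !r.toleratesNoExecute {
				return decision{}, nil // untolerated NoExecute taint: evict
			}
			d.wantToRun, d.shouldSchedule = false, false // NoSchedule only: keep running Pods
		case "DiskConflict", "VolumeZoneConflict", "MaxVolumeCountExceeded",
			"NodeUnderMemoryPressure", "NodeUnderDiskPressure", "InsufficientResource":
			d.shouldSchedule = false // wants to run, but cannot be placed right now
		case "PodAffinityNotMatch", "ServiceAffinityViolated":
			return decision{}, fmt.Errorf("unexpected predicate failure reason: %s", r.name)
		default:
			d = decision{} // unknown reason: all three become false
		}
	}
	return d, nil
}

func main() {
	cases := []struct {
		label   string
		reasons []reason
	}{
		{"fits", nil},
		{"memory pressure", []reason{{name: "NodeUnderMemoryPressure"}}},
		{"NoSchedule taint", []reason{{name: "TaintsTolerationsNotMatch", toleratesNoExecute: true}}},
		{"NoExecute taint", []reason{{name: "TaintsTolerationsNotMatch"}}},
		{"selector mismatch", []reason{{name: "NodeSelectorNotMatch"}}},
	}
	for _, c := range cases {
		d, err := classify(c.reasons)
		fmt.Printf("%-18s %+v err=%v\n", c.label, d, err)
	}
}
```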

Based on these three state values, podsShouldBeOnNode then produces nodesNeedingDaemonPods []string, podsToDelete []string and failedPodsObserved int.

// podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
func (dsc *DaemonSetsController) podsShouldBeOnNode(
	node *v1.Node,
	nodeToDaemonPods map[string][]*v1.Pod,
	ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, failedPodsObserved int, err error) {

	wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
	if err != nil {
		return
	}

	daemonPods, exists := nodeToDaemonPods[node.Name]
	dsKey, _ := cache.MetaNamespaceKeyFunc(ds)

	dsc.removeSuspendedDaemonPods(node.Name, dsKey)

	switch {
	case wantToRun && !shouldSchedule:
		// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
		dsc.addSuspendedDaemonPods(node.Name, dsKey)
	case shouldSchedule && !exists:
		// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
	case shouldContinueRunning:
		// If a daemon pod failed, delete it
		// If there's non-daemon pods left on this node, we will create it in the next sync loop
		var daemonPodsRunning []*v1.Pod
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			if pod.Status.Phase == v1.PodFailed {
				failedPodsObserved++

				// This is a critical place where DS is often fighting with kubelet that rejects pods.
				// We need to avoid hot looping and backoff.
				backoffKey := failedPodsBackoffKey(ds, node.Name)

				now := dsc.failedPodsBackoff.Clock.Now()
				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
				if inBackoff {
					delay := dsc.failedPodsBackoff.Get(backoffKey)
					klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
						pod.Namespace, pod.Name, node.Name, delay)
					dsc.enqueueDaemonSetAfter(ds, delay)
					continue
				}

				dsc.failedPodsBackoff.Next(backoffKey, now)

				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
				klog.V(2).Infof(msg)
				// Emit an event so that it's discoverable to users.
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
				podsToDelete = append(podsToDelete, pod.Name)
			} else {
				daemonPodsRunning = append(daemonPodsRunning, pod)
			}
		}
		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
		// Sort the daemon pods by creation time, so the oldest is preserved.
		if len(daemonPodsRunning) > 1 {
			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
			for i := 1; i < len(daemonPodsRunning); i++ {
				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
			}
		}
	case !shouldContinueRunning && exists:
		// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
		for _, pod := range daemonPods {
			podsToDelete = append(podsToDelete, pod.Name)
		}
	}

	return nodesNeedingDaemonPods, podsToDelete, failedPodsObserved, nil
}

// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a summary. 
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
	newPod := NewPod(ds, node.Name)

	// Because these bools require an && of all their required conditions, we start
	// with all bools set to true and set a bool to false if a condition is not met.
	// A bool should probably not be set to true after this line.
	wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
	// If the daemon set specifies a node name, check that it matches with node.Name.
	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
		return false, false, false, nil
	}

	reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
	if err != nil {
		klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
		return false, false, false, err
	}

	// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
	//              e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
	//              into one result, e.g. selectedNode.
	var insufficientResourceErr error
	for _, r := range reasons {
		klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
		switch reason := r.(type) {
		case *predicates.InsufficientResourceError:
			insufficientResourceErr = reason
		case *predicates.PredicateFailureError:
			var emitEvent bool
			// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
			switch reason {
			// intentional
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrNodeLabelPresenceViolated,
				// this one is probably intentional since it's a workaround for not having
				// pod hard anti affinity.
				predicates.ErrPodNotFitsHostPorts:
				return false, false, false, nil
			case predicates.ErrTaintsTolerationsNotMatch:
				// DaemonSet is expected to respect taints and tolerations
				fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
				if err != nil {
					return false, false, false, err
				}
				if !fitsNoExecute {
					return false, false, false, nil
				}
				wantToRun, shouldSchedule = false, false
			// unintentional
			case
				predicates.ErrDiskConflict,
				predicates.ErrVolumeZoneConflict,
				predicates.ErrMaxVolumeCountExceeded,
				predicates.ErrNodeUnderMemoryPressure,
				predicates.ErrNodeUnderDiskPressure:
				// wantToRun and shouldContinueRunning are likely true here. They are
				// absolutely true at the time of writing the comment. See first comment
				// of this method.
				shouldSchedule = false
				emitEvent = true
			// unexpected
			case
				predicates.ErrPodAffinityNotMatch,
				predicates.ErrServiceAffinityViolated:
				klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
				return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
			default:
				klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
				wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
				emitEvent = true
			}
			if emitEvent {
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
			}
		}
	}
	// only emit this event if insufficient resource is the only thing
	// preventing the daemon pod from scheduling
	if shouldSchedule && insufficientResourceErr != nil {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
		shouldSchedule = false
	}
	return
}
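
The switch in podsShouldBeOnNode then works as follows:

  • If wantToRun && !shouldSchedule, the DaemonSet is recorded in suspendedDaemonPods for this Node.

  • If shouldSchedule && !exists, the Node's name is added to nodesNeedingDaemonPods.

  • If shouldContinueRunning, then for each Pod with pod.DeletionTimestamp == nil && pod.Status.Phase == v1.PodFailed, check whether the failedPodsBackoff key for this DaemonSet/Node is still in its backoff window (1s, 2s, 4s, ..., capped at a hard-coded 15min). If it is, the DaemonSet is re-enqueued after the current backoff delay; otherwise the backoff is advanced and the Pod is added to podsToDelete.

  • Still under shouldContinueRunning, every Pod with pod.DeletionTimestamp == nil && pod.Status.Phase != v1.PodFailed goes into daemonPodsRunning, which records this DaemonSet's non-Failed Pods on the Node. If there is more than one, they are sorted by creation time and every DaemonSet Pod except the oldest is added to podsToDelete.

  • If !shouldContinueRunning && exists, every DaemonSet Pod on the Node is added to podsToDelete.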
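The branching above can be expressed as a small pure function. This is a simplified sketch under stated assumptions: pod, nodeResult and podsForNode are hypothetical; the failed-Pod backoff is reduced to a single inBackoff flag (in the real code a Pod still in backoff re-enqueues the DaemonSet instead of being deleted); and duplicates are ordered by creation time only, whereas upstream sorts with podByCreationTimestampAndPhase.

```go
package main

import (
	"fmt"
	"sort"
)

// pod is a hypothetical minimal DaemonSet Pod with only the fields the decision needs.
type pod struct {
	name        string
	created     int64 // creation time; smaller means older
	failed      bool  // Status.Phase == PodFailed
	terminating bool  // DeletionTimestamp != nil
}

// nodeResult is what the switch produces for one Node.
type nodeResult struct {
	suspended bool     // recorded in suspendedDaemonPods
	needsPod  bool     // Node name goes into nodesNeedingDaemonPods
	toDelete  []string // Pod names for podsToDelete
	failed    int      // contribution to failedPodsObserved
}

// podsForNode mirrors the switch in podsShouldBeOnNode in simplified form.
func podsForNode(wantToRun, shouldSchedule, shouldContinueRunning bool, pods []pod, inBackoff bool) nodeResult {
	var r nodeResult
	exists := len(pods) > 0
	switch {
	case wantToRun && !shouldSchedule:
		r.suspended = true
	case shouldSchedule && !exists:
		r.needsPod = true
	case shouldContinueRunning:
		var running []pod
		for _, p := range pods {
			if p.terminating {
				continue // already being deleted
			}
			if p.failed {
				r.failed++ // counted whether or not it is in backoff
				if !inBackoff {
					r.toDelete = append(r.toDelete, p.name)
				}
				continue
			}
			running = append(running, p)
		}
		// Keep only the oldest running Pod; delete the duplicates.
		sort.SliceStable(running, func(i, j int) bool { return running[i].created < running[j].created })
		for i := 1; i < len(running); i++ {
			r.toDelete = append(r.toDelete, running[i].name)
		}
	case !shouldContinueRunning && exists:
		for _, p := range pods {
			r.toDelete = append(r.toDelete, p.name)
		}
	}
	return r
}

func main() {
	pods := []pod{
		{name: "fluentd-b", created: 20},
		{name: "fluentd-a", created: 10},
		{name: "fluentd-c", created: 30, failed: true},
	}
	fmt.Printf("%+v\n", podsForNode(true, true, true, pods, false))
}
```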

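nodeShouldRunDaemonPod calls simulate to run a simulated scheduling pass that reports how the Pod matches the Node; from the resulting algorithm.PredicateFailureReason list it derives wantToRun, shouldSchedule and shouldContinueRunning. Let's look at the scheduling logic inside simulate.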

// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
// and PodToleratesNodeTaints predicate
func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	var predicateFails []algorithm.PredicateFailureReason

	// If ScheduleDaemonSetPods is enabled, only check nodeSelector, nodeAffinity and toleration/taint match.
	if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
		fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
		if err != nil {
			return false, predicateFails, err
		}
		if !fit {
			predicateFails = append(predicateFails, reasons...)
		}

		return len(predicateFails) == 0, predicateFails, nil
	}

	critical := kubelettypes.IsCriticalPod(pod)

	fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}
	if critical {
		// If the pod is marked as critical and support for critical pod annotations is enabled,
		// check predicates for critical pods only.
		fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
	} else {
		fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
	}
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	return len(predicateFails) == 0, predicateFails, nil
}
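
  • With the ScheduleDaemonSetPods FeatureGate enabled, the predicate logic is as follows. No real scheduling happens here: only three predicate checks are run, and the actual scheduling is left to the default scheduler. How the default scheduler binds a DaemonSet Pod to its Node is explained in the Sync Nodes section.
    • PodFitsHost: if Pod.spec.nodeName is set, checks that it matches the Node name;
    • PodMatchNodeSelector: checks that the Pod's NodeSelector and NodeAffinity match the Node;
    • PodToleratesNodeTaints: checks that the Pod's NoExecute and NoSchedule tolerations match the Node's taints.
  • With the ScheduleDaemonSetPods FeatureGate disabled, the predicate logic is as follows. Again no real scheduling happens here, only a few predicate checks; the actual placement is done by the DaemonSet Controller itself.
    • PodToleratesNodeTaints: checks that the Pod's NoExecute and NoSchedule tolerations match the Node's taints.
    • For a Critical DaemonSet Pod, EssentialPredicates is then run, which includes:
      • PodFitsHost: if Pod.spec.nodeName is set, checks that it matches the Node name;
      • PodFitsHostPorts: checks whether the protocol/host ports requested by the DaemonSet Pod are already in use;
      • PodMatchNodeSelector: checks that the Pod's NodeSelector and NodeAffinity match the Node;
    • For a non-Critical DaemonSet Pod, GeneralPredicates is run instead:
      • PodFitsResources: checks whether the Node's remaining allocatable resources can satisfy the Pod's requests;
      • PodFitsHost: if Pod.spec.nodeName is set, checks that it matches the Node name;
      • PodFitsHostPorts: checks whether the protocol/host ports requested by the DaemonSet Pod are already in use;
      • PodMatchNodeSelector: checks that the Pod's NodeSelector and NodeAffinity match the Node;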
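The three predicate paths can be summarized as a lookup. daemonPredicates below is a hypothetical helper that only returns predicate names as strings for the Kubernetes 1.13 behavior described above; it does not run any predicate.

```go
package main

import "fmt"

// daemonPredicates is a hypothetical helper that names the checks the
// Predicates function runs for a DaemonSet Pod in Kubernetes 1.13,
// following the cases listed above.
func daemonPredicates(scheduleDaemonSetPods, critical bool) []string {
	if scheduleDaemonSetPods {
		// checkNodeFitness: placement constraints only; resources are the default scheduler's job.
		return []string{"PodFitsHost", "PodMatchNodeSelector", "PodToleratesNodeTaints"}
	}
	checks := []string{"PodToleratesNodeTaints"}
	essential := []string{"PodFitsHost", "PodFitsHostPorts", "PodMatchNodeSelector"}
	if critical {
		return append(checks, essential...) // EssentialPredicates
	}
	// GeneralPredicates = PodFitsResources + EssentialPredicates
	return append(append(checks, "PodFitsResources"), essential...)
}

func main() {
	for _, c := range []struct{ gate, critical bool }{{true, false}, {false, true}, {false, false}} {
		fmt.Printf("ScheduleDaemonSetPods=%v critical=%v: %v\n",
			c.gate, c.critical, daemonPredicates(c.gate, c.critical))
	}
}
```

The key difference is PodFitsResources: with the gate on, resource fit is no longer checked here, which is why resource shortages then surface as Pending Pods in the default scheduler rather than as shouldSchedule=false.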

Sync Nodes

podsShouldBeOnNode has given us nodesNeedingDaemonPods []string, podsToDelete []string and failedPodsObserved int; the next step is to create and delete the corresponding Pods.

// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with erros if any
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// We need to set expectations before creating/deleting pods to avoid race conditions.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	createDiff := len(nodesNeedingDaemonPods)
	deleteDiff := len(podsToDelete)

	if createDiff > dsc.burstReplicas {
		createDiff = dsc.burstReplicas
	}
	if deleteDiff > dsc.burstReplicas {
		deleteDiff = dsc.burstReplicas
	}

	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

	// error channel to communicate back failures.  make the buffer big enough to avoid any blocking
	errCh := make(chan error, createDiff+deleteDiff)

	klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
	createWait := sync.WaitGroup{}
	// If the returned error is not nil we have a parse error.
	// The controller handles this via the hash.
	generation, err := util.GetTemplateGeneration(ds)
	if err != nil {
		generation = nil
	}
	template := util.CreatePodTemplate(ds.Namespace, ds.Spec.Template, generation, hash)
	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
	// and double with each successful iteration in a kind of "slow start".
	// This handles attempts to start large numbers of pods that would
	// likely all fail with the same error. For example a project with a
	// low quota that attempts to create a large number of pods will be
	// prevented from spamming the API service with the pod create requests
	// after one of its pods fails.  Conveniently, this also prevents the
	// event spam that those failures would generate.
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()
				var err error

				podTemplate := &template

				if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
					podTemplate = template.DeepCopy()
					// The pod's NodeAffinity will be updated to make sure the Pod is bound
					// to the target node by default scheduler. It is safe to do so because there
					// should be no conflicting node affinity with the target node.
					podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
						podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

					err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				} else {
					err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				}

				if err != nil && errors.IsTimeout(err) {
					// Pod is created but its initialization has timed out.
					// If the initialization is successful eventually, the
					// controller will observe the creation via the informer.
					// If the initialization fails, or if the pod keeps
					// uninitialized for a long time, the informer will not
					// receive any update, and the controller will create a new
					// pod when the expectation expires.
					return
				}
				if err != nil {
					klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
					dsc.expectations.CreationObserved(dsKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}(i)
		}
		createWait.Wait()
		// any skipped pods that we never attempted to start shouldn't be expected.
		skippedPods := createDiff - batchSize
		if errorCount < len(errCh) && skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
			for i := 0; i < skippedPods; i++ {
				dsc.expectations.CreationObserved(dsKey)
			}
			// The skipped pods will be retried later. The next controller resync will
			// retry the slow start process.
			break
		}
	}

	klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
				klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
				dsc.expectations.DeletionObserved(dsKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	deleteWait.Wait()

	// collect errors if any for proper reporting/retry logic in the controller
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}
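
  • At most burstReplicas (250) Pods are created, and at most 250 deleted, per call.
  • Build the Pod template from the DaemonSet object, adding or updating the following tolerations (key | operator | effect):
    • node.kubernetes.io/not-ready | Exists | NoExecute
    • node.kubernetes.io/unreachable | Exists | NoExecute
    • node.kubernetes.io/disk-pressure | Exists | NoSchedule
    • node.kubernetes.io/memory-pressure | Exists | NoSchedule
    • node.kubernetes.io/unschedulable | Exists | NoSchedule
    • node.kubernetes.io/network-unavailable | Exists | NoSchedule (only for Pods using hostNetwork)
    • For a Critical Pod, the following tolerations are also added:
      • node.kubernetes.io/out-of-disk | Exists | NoExecute
      • node.kubernetes.io/out-of-disk | Exists | NoSchedule
  • Add the label controller-revision-hash=<hash of the current ControllerRevision> to the Pod.
  • Create DaemonSet Pods in batches of 1, 2, 4, 8, ..., so that a large number of creations that would all fail with the same error is not fired at once. If a batch has failures, the remaining Pods are skipped until a later sync. For every Pod whose creation fails (or is skipped), expectations.adds is decremented by 1.
    • If the ScheduleDaemonSetPods FeatureGate is enabled, a NodeAffinity term matching metadata.name=<NodeName> is added to (or replaced in) the Pod template. This is how DaemonSet Pods are pinned to their Node while being scheduled by the default scheduler.
  • Delete all Pods in podsToDelete at once (concurrently).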
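The slow-start batching can be sketched as follows. This is a sequential simplification (upstream creates the Pods of each batch concurrently with goroutines and a WaitGroup); slowStartCreate, quotaError and minInt are hypothetical names, the initial batch size is a parameter (upstream uses controller.SlowStartInitialBatchSize), and burstReplicas = 250 comes from the text.

```go
package main

import "fmt"

// burstReplicas mirrors the hard-coded per-sync cap described above.
const burstReplicas = 250

// quotaError is a stand-in failure used only for this example.
type quotaError struct{}

func (quotaError) Error() string { return "exceeded quota" }

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// slowStartCreate is a simplified, sequential sketch of the batching in
// syncNodes: cap the work at burstReplicas, issue batches of
// initialBatch, 2*initialBatch, ..., and stop after the first batch that
// reports any failure. It returns how many creates were attempted and how
// many were skipped (the skipped ones would have their expectations
// decremented and be retried on a later sync).
func slowStartCreate(total, initialBatch int, create func(i int) error) (attempted, skipped int) {
	if initialBatch < 1 {
		initialBatch = 1 // avoid an endless loop on a zero batch size
	}
	total = minInt(total, burstReplicas)
	batch := minInt(total, initialBatch)
	for pos := 0; pos < total; {
		failed := false
		for i := pos; i < pos+batch; i++ {
			if err := create(i); err != nil {
				failed = true
			}
		}
		attempted += batch
		pos += batch
		if failed {
			return attempted, total - pos
		}
		batch = minInt(2*batch, total-pos)
	}
	return attempted, 0
}

func main() {
	// Hypothetical: a namespace quota lets only the first 5 Pods through.
	attempted, skipped := slowStartCreate(20, 1, func(i int) error {
		if i >= 5 {
			return quotaError{}
		}
		return nil
	})
	fmt.Println("attempted:", attempted, "skipped:", skipped)
}
```

Stopping after the first failing batch means a quota error costs a handful of wasted API calls and events instead of hundreds.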

DaemonSet Rolling Update

DaemonSet rolling updates differ slightly from Deployment rolling updates: in this version, DaemonSet RollingUpdate has only one setting, MaxUnavailable, and no MaxSurge.

// rollingUpdate deletes old daemon set pods making sure that no more than
// ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, hash string) error {
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	_, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
	maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods)
	if err != nil {
		return fmt.Errorf("Couldn't get unavailable numbers: %v", err)
	}
	oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)

	// for oldPods delete all not running pods
	var oldPodsToDelete []string
	klog.V(4).Infof("Marking all unavailable old pods for deletion")
	for _, pod := range oldUnavailablePods {
		// Skip terminating pods. We won't delete them again
		if pod.DeletionTimestamp != nil {
			continue
		}
		klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
	}

	klog.V(4).Infof("Marking old pods for deletion")
	for _, pod := range oldAvailablePods {
		if numUnavailable >= maxUnavailable {
			klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
			break
		}
		klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
		numUnavailable++
	}
	return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}
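
  • Select all OldPods using the latest hash value.
  • Compute numUnavailable as the number of Pods that are not available plus the number of Pods that should be scheduled but are not running yet.
  • Split OldPods into oldAvailablePods and oldUnavailablePods; every oldUnavailablePod whose DeletionTimestamp is empty is added to the deletion list (oldPodsToDelete).
  • Walk oldAvailablePods, adding them one by one to oldPodsToDelete and incrementing numUnavailable, until numUnavailable reaches maxUnavailable. At most (maxUnavailable - numUnavailable) Pods are taken from oldAvailablePods this way, where numUnavailable is its value before the loop.
  • Therefore oldPodsToDelete contains every oldUnavailablePod with an empty DeletionTimestamp plus at most (maxUnavailable - numUnavailable) oldAvailablePods.
  • Finally, syncNodes is called to delete the DaemonSet Pods in oldPodsToDelete.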
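The selection of old Pods can be checked with a small model. pickOldPodsToDelete and oldPod are hypothetical, and numUnavailable is passed in rather than computed by getUnavailableNumbers. The second call in main shows that when nothing is unavailable yet, up to maxUnavailable available old Pods are selected in one pass.

```go
package main

import "fmt"

// oldPod is a hypothetical minimal view of a DaemonSet Pod from an old revision.
type oldPod struct {
	name        string
	available   bool
	terminating bool // DeletionTimestamp != nil
}

// pickOldPodsToDelete mirrors the selection in rollingUpdate: every
// non-terminating unavailable old Pod, plus available old Pods only while
// numUnavailable stays below maxUnavailable.
func pickOldPodsToDelete(old []oldPod, maxUnavailable, numUnavailable int) []string {
	var toDelete []string
	for _, p := range old {
		if !p.available && !p.terminating {
			toDelete = append(toDelete, p.name)
		}
	}
	for _, p := range old {
		if !p.available {
			continue
		}
		if numUnavailable >= maxUnavailable {
			break
		}
		toDelete = append(toDelete, p.name)
		numUnavailable++ // deleting an available Pod makes it unavailable
	}
	return toDelete
}

func main() {
	old := []oldPod{
		{name: "fluentd-a", available: true},
		{name: "fluentd-b", available: true},
		{name: "fluentd-c", available: true},
		{name: "fluentd-d"}, // already unavailable
	}
	// Assumed input: maxUnavailable=2 and numUnavailable=1 (fluentd-d).
	fmt.Println(pickOldPodsToDelete(old, 2, 1))
	// With nothing unavailable, two available Pods (= maxUnavailable) are picked.
	fmt.Println(pickOldPodsToDelete(old[:3], 2, 0))
}
```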

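Node Updates

The Node Add event is simple: iterate over all DaemonSet objects and call nodeShouldRunDaemonPod to work out whether each DaemonSet should run a Pod on the new Node. If so, the DaemonSet is added to the queue for syncDaemonSet.

For a Node Update event, the controller has to check which fields changed and then decide whether to add DaemonSets to the queue for syncDaemonSet.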

func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
	oldNode := old.(*v1.Node)
	curNode := cur.(*v1.Node)
	if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
		return
	}

	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		klog.V(4).Infof("Error listing daemon sets: %v", err)
		return
	}
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
	for _, ds := range dsList {
		_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
		if err != nil {
			continue
		}
		_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
		if err != nil {
			continue
		}
		if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
			dsc.enqueueDaemonSet(ds)
		}
	}
}
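
  • If the Node Conditions changed (the set of condition types whose status is True differs), the Node update event cannot be ignored.
  • Apart from the Node Conditions and ResourceVersion, if the old and new Node objects differ in any other way, the event cannot be ignored either.
  • For events that cannot be ignored, nodeShouldRunDaemonPod is called on both oldNode and currentNode for each DaemonSet; if ShouldSchedule or ShouldContinueRunning differs between the two, the DaemonSet object is enqueued and handled by syncDaemonSet.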
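The ignore check can be sketched as below. The node and condition types are hypothetical, heavily reduced stand-ins for v1.Node and v1.NodeCondition, and reflect.DeepEqual stands in for the semantic equality check the controller performs. Only condition types whose status is True are compared, so a heartbeat that merely bumps ResourceVersion or reorders conditions is ignored, while a new MemoryPressure=True is not.

```go
package main

import (
	"fmt"
	"reflect"
)

// condition is a hypothetical minimal NodeCondition: a type and whether its status is True.
type condition struct {
	Type string
	True bool
}

// node is a hypothetical minimal Node with a few comparable fields.
type node struct {
	ResourceVersion string
	Labels          map[string]string
	Unschedulable   bool
	Conditions      []condition
}

// sameTrueConditions treats two condition lists as equal when exactly the same
// condition types have status True; order and non-True entries are ignored.
func sameTrueConditions(old, cur []condition) bool {
	trueInOld := map[string]bool{}
	for _, c := range old {
		if c.True {
			trueInOld[c.Type] = true
		}
	}
	for _, c := range cur {
		if !c.True {
			continue
		}
		if !trueInOld[c.Type] {
			return false
		}
		delete(trueInOld, c.Type)
	}
	return len(trueInOld) == 0
}

// shouldIgnoreNodeUpdate: not ignorable if the True conditions changed;
// otherwise ignorable only if nothing but ResourceVersion/Conditions differs.
func shouldIgnoreNodeUpdate(old, cur node) bool {
	if !sameTrueConditions(old.Conditions, cur.Conditions) {
		return false
	}
	old.ResourceVersion = cur.ResourceVersion // old is a copy, so this is local
	old.Conditions = cur.Conditions
	return reflect.DeepEqual(old, cur)
}

func main() {
	old := node{ResourceVersion: "1", Labels: map[string]string{"role": "logging"},
		Conditions: []condition{{"Ready", true}, {"MemoryPressure", false}}}

	heartbeat := old // only ResourceVersion and condition order change
	heartbeat.ResourceVersion = "2"
	heartbeat.Conditions = []condition{{"MemoryPressure", false}, {"Ready", true}}

	pressure := heartbeat // MemoryPressure becomes True
	pressure.Conditions = []condition{{"Ready", true}, {"MemoryPressure", true}}

	fmt.Println("heartbeat ignored:", shouldIgnoreNodeUpdate(old, heartbeat))
	fmt.Println("memory pressure ignored:", shouldIgnoreNodeUpdate(old, pressure))
}
```

Filtering this way keeps the frequent status heartbeats from kubelet from re-running nodeShouldRunDaemonPod for every DaemonSet.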

[Figure: DaemonSet Controller main logic]

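Summary

This article analyzed the DaemonSet Controller source code from several angles: its structure, creation, synchronization, scheduling and rolling updates. Understanding these details helps before deploying DaemonSets at scale in production. In the next post I will start from practical problems and analyze several DaemonSet behaviors from the user's point of view, such as how DaemonSets behave after a Node's taints change, why deleting a DaemonSet can hang and how to fix it, and what happens to DaemonSet Pods when a Node becomes NotReady.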
