Author: xidianwangtao@gmail.com | Version: Kubernetes 1.13
Abstract: The DaemonSet is one of the most frequently used Kubernetes objects; we use it to deploy per-Node daemon applications such as logging components and node monitoring agents. From the user's point of view a DaemonSet looks simple, but it actually involves many details: what conditions a DaemonSet Pod must satisfy before it can run on a Node, whether it can run when a Node reports MemoryPressure or some other abnormal Condition, how its scheduling logic works, how its rolling update logic works, and so on. Starting from the DaemonSet Controller source code, this article analyzes the key logic.
The core structure of the DaemonSet Controller includes:
- `burstReplicas int`: the upper limit on the number of Pods created and deleted in each sync; hardcoded to 250.
- `queue workqueue.RateLimitingInterface`: a delaying queue holding the keys (namespace/name) of the DaemonSets waiting to be synced.
- `syncHandler func(dsKey string) error`: syncs the DaemonSet objects taken from the queue, covering replica management, UpdateStrategy-driven upgrades, DaemonSet Status updates, and so on. This is the core logic of the DaemonSet Controller.
- `expectations controller.ControllerExpectationsInterface`: a TTLCache that tracks, per DaemonSet object, the number of Pod Creates/Deletes expected for each sync.
- `suspendedDaemonPods map[string]sets.String`: keyed by NodeName; each value is the set of DaemonSets that have a 'wantToRun & !shouldSchedule' Pod on that Node.
- `failedPodsBackoff *flowcontrol.Backoff`: whenever syncDaemonSet handles Pods that should be deleted, it delays them on a 1s, 2s, 4s, 8s, ..., 15min backoff schedule, which acts as flow control. Without it, a DaemonSet Pod rejected by the kubelet would be recreated and immediately rejected again, producing many useless hot loops; the backoff mechanism prevents this. When the DaemonSet Controller runs, it also starts a goroutine that garbage-collects backoff records that have been stale for more than 2*MaxDuration (2*15min).

NewDaemonSetsController creates the Controller; one important part of its job is registering EventHandlers for the following Informers:
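As a reference for the handlers discussed below (deletePod, updateNode, etc.), here is an abridged reconstruction of those registrations as I read the 1.13 source; only the handler wiring is kept, everything else is elided:

```go
// Abridged reconstruction of NewDaemonSetsController (1.13), keeping only the
// EventHandler wiring; consult the upstream source for the full version.
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    func(obj interface{}) { dsc.enqueueDaemonSet(obj.(*apps.DaemonSet)) },
	UpdateFunc: dsc.updateDaemonSet,
	DeleteFunc: dsc.deleteDaemonSet,
})
// ControllerRevision history objects owned by DaemonSets.
historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dsc.addHistory,
	UpdateFunc: dsc.updateHistory,
	DeleteFunc: dsc.deleteHistory,
})
// Pod events drive expectations and the suspended-pod bookkeeping.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dsc.addPod,
	UpdateFunc: dsc.updatePod,
	DeleteFunc: dsc.deletePod,
})
// Node Add/Update decide whether DaemonSets need to be re-synced.
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dsc.addNode,
	UpdateFunc: dsc.updateNode,
})
```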
When the DaemonSet Controller's Run starts, it mainly does two things:

- Start 2 worker goroutines; each worker takes DaemonSet keys from the queue and syncs them (a minimal sketch of this pattern follows the list).
- Start 1 failedPodsBackoff GC goroutine that runs once per minute, cleaning up stale failed-Pod backoff records for all DaemonSet/Node pairs in the cluster.
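To make the worker/queue shape concrete, here is a minimal standalone sketch of that pattern built on client-go's workqueue; the queue name, key, and handler are illustrative, not the controller's own:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset-demo")
	stopCh := make(chan struct{})

	// runWorker mirrors the controller's worker loop: pop a key, sync it, Done it.
	runWorker := func() {
		for {
			key, quit := queue.Get()
			if quit {
				return
			}
			// In the real controller this call is dsc.syncDaemonSet(key).
			fmt.Println("syncing", key)
			queue.Forget(key) // clear rate-limit history on success
			queue.Done(key)
		}
	}

	// Two workers, restarted every second if they return, until stopCh closes.
	for i := 0; i < 2; i++ {
		go wait.Until(runWorker, time.Second, stopCh)
	}

	queue.Add("default/node-exporter") // namespace/name key
	time.Sleep(time.Second)
	close(stopCh)
	queue.ShutDown()
}
```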
Note, from the EventHandler registrations above, that only deletePod calls requeueSuspendedDaemonPods. -- Why?
A worker takes a pending DaemonSet key from the queue and calls syncDaemonSet to do the actual management; syncDaemonSet is the core entry point of DaemonSet management.
pkg/controller/daemon/daemon_controller.go:1208

```go
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
	...
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(3).Infof("daemon set has been deleted %v", key)
		dsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
	}

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(ds.Spec.Selector, &everything) {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
		return nil
	}

	// Don't process a daemon set until all its creations and deletions have been processed.
	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
	// then we do not want to call manage on foo until the daemon pods have been created.
	...
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// Construct histories of the DaemonSet, and get the hash of current history
	cur, old, err := dsc.constructHistory(ds)
	if err != nil {
		return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
	}
	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

	if !dsc.expectations.SatisfiedExpectations(dsKey) {
		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
		return dsc.updateDaemonSetStatus(ds, hash, false)
	}

	err = dsc.manage(ds, hash)
	if err != nil {
		return err
	}

	// Process rolling updates if we're ready.
	if dsc.expectations.SatisfiedExpectations(dsKey) {
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ds, hash)
		}
		if err != nil {
			return err
		}
	}

	err = dsc.cleanupHistory(ds, old)
	if err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
	}

	return dsc.updateDaemonSetStatus(ds, hash, true)
}
```
The core flow, visible in the code above, is:

1. If the DaemonSet has been deleted, clean up its expectations entry and return.
2. Skip DaemonSets whose selector selects everything, and DaemonSets that are being deleted (DeletionTimestamp set).
3. Build the ControllerRevision history via constructHistory and read the hash of the Current ControllerRevision from its controller-revision-hash label.
4. If the expectations recorded by the previous sync are not yet satisfied, only update the DaemonSet Status.
5. Otherwise run manage, then rollingUpdate (for the RollingUpdate strategy), then cleanupHistory, and finally updateDaemonSetStatus.

When the status is updated, updatedNumberScheduled is the number of Pods whose controller-revision-hash label equals that hash of the Current ControllerRevision, i.e. the number of Pods whose Pod Template has already been updated (see the sketch below).

In versions before Kubernetes 1.12, the DaemonSet Controller by default performed the scheduling of Daemon Pods itself: it set spec.nodeName on the Pod to be scheduled, the kubelet on that Node watched the event, and then created the DaemonSet Pod locally. In Kubernetes 1.12+, the ScheduleDaemonSetPods FeatureGate is enabled by default, and DaemonSet scheduling is handed over to the default scheduler.
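A hedged, simplified sketch of that per-Pod hash comparison (countUpdated is my name for a hypothetical helper, not the controller's):

```go
import (
	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
)

// countUpdated mirrors how updateDaemonSetStatus derives updatedNumberScheduled:
// a pod counts as "updated" when its controller-revision-hash label equals the
// Current ControllerRevision's hash.
func countUpdated(pods []*v1.Pod, currentHash string) int {
	updated := 0
	for _, pod := range pods {
		// apps.DefaultDaemonSetUniqueLabelKey == "controller-revision-hash"
		if pod.Labels[apps.DefaultDaemonSetUniqueLabelKey] == currentHash {
			updated++
		}
	}
	return updated
}
```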
When managing a DaemonSet, manage calls podsShouldBeOnNode for each Node to compute the DaemonSet Pods that should be started on that Node (nodesNeedingDaemonPods), the DaemonSet Pods that should be deleted from that Node (podsToDelete), and the number of already-Failed DaemonSet Pods on that Node. syncNodes then uses these three pieces of information to create and delete the corresponding Pods.
```go
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, hash string) error {
	// Find out the pods which are created for the nodes by DaemonSet.
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	...
	for _, node := range nodeList {
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, failedPodsObservedOnNode, err := dsc.podsShouldBeOnNode(
			node, nodeToDaemonPods, ds)
		if err != nil {
			continue
		}

		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
		failedPodsObserved += failedPodsObservedOnNode
	}

	// Label new pods using the hash label value of the current history when creating them
	if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}
	...
	return nil
}
```
How does podsShouldBeOnNode compute nodesNeedingDaemonPods, podsToDelete, and failedPodsObserved? By calling nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet), which computes the following three state values:

- wantToRun: true when the Pod is expected on this Node from the user's perspective; conditions that merely prevent scheduling (e.g. resource pressure) are ignored. Primarily used to populate the DaemonSet status.
- shouldSchedule: true when a Daemon Pod should be scheduled onto this Node if one is not already running there.
- shouldContinueRunning: true when a Daemon Pod already running on this Node should be allowed to keep running.

From these three state values it then derives nodesNeedingDaemonPods []string, podsToDelete []string, and failedPodsObserved int.
```go
// podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
func (dsc *DaemonSetsController) podsShouldBeOnNode(
	node *v1.Node,
	nodeToDaemonPods map[string][]*v1.Pod,
	ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, failedPodsObserved int, err error) {

	wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
	if err != nil {
		return
	}

	daemonPods, exists := nodeToDaemonPods[node.Name]
	dsKey, _ := cache.MetaNamespaceKeyFunc(ds)
	dsc.removeSuspendedDaemonPods(node.Name, dsKey)

	switch {
	case wantToRun && !shouldSchedule:
		// If daemon pod is supposed to run, but can not be scheduled, add to suspended list.
		dsc.addSuspendedDaemonPods(node.Name, dsKey)
	case shouldSchedule && !exists:
		// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
	case shouldContinueRunning:
		// If a daemon pod failed, delete it
		// If there's non-daemon pods left on this node, we will create it in the next sync loop
		var daemonPodsRunning []*v1.Pod
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			if pod.Status.Phase == v1.PodFailed {
				failedPodsObserved++

				// This is a critical place where DS is often fighting with kubelet that rejects pods.
				// We need to avoid hot looping and backoff.
				backoffKey := failedPodsBackoffKey(ds, node.Name)

				now := dsc.failedPodsBackoff.Clock.Now()
				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
				if inBackoff {
					delay := dsc.failedPodsBackoff.Get(backoffKey)
					klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
						pod.Namespace, pod.Name, node.Name, delay)
					dsc.enqueueDaemonSetAfter(ds, delay)
					continue
				}

				dsc.failedPodsBackoff.Next(backoffKey, now)

				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
				klog.V(2).Infof(msg)
				// Emit an event so that it's discoverable to users.
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
				podsToDelete = append(podsToDelete, pod.Name)
			} else {
				daemonPodsRunning = append(daemonPodsRunning, pod)
			}
		}
		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
		// Sort the daemon pods by creation time, so the oldest is preserved.
		if len(daemonPodsRunning) > 1 {
			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
			for i := 1; i < len(daemonPodsRunning); i++ {
				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
			}
		}
	case !shouldContinueRunning && exists:
		// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
		for _, pod := range daemonPods {
			podsToDelete = append(podsToDelete, pod.Name)
		}
	}

	return nodesNeedingDaemonPods, podsToDelete, failedPodsObserved, nil
}

// nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a summary.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
	newPod := NewPod(ds, node.Name)

	// Because these bools require an && of all their required conditions, we start
	// with all bools set to true and set a bool to false if a condition is not met.
	// A bool should probably not be set to true after this line.
	wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
	// If the daemon set specifies a node name, check that it matches with node.Name.
	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
		return false, false, false, nil
	}

	reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
	if err != nil {
		klog.Warningf("DaemonSet Predicates failed on node %s for ds '%s/%s' due to unexpected error: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, err)
		return false, false, false, err
	}

	// TODO(k82cn): When 'ScheduleDaemonSetPods' upgrade to beta or GA, remove unnecessary check on failure reason,
	// e.g. InsufficientResourceError; and simplify "wantToRun, shouldSchedule, shouldContinueRunning"
	// into one result, e.g. selectedNode.
	var insufficientResourceErr error
	for _, r := range reasons {
		klog.V(4).Infof("DaemonSet Predicates failed on node %s for ds '%s/%s' for reason: %v", node.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Name, r.GetReason())
		switch reason := r.(type) {
		case *predicates.InsufficientResourceError:
			insufficientResourceErr = reason
		case *predicates.PredicateFailureError:
			var emitEvent bool
			// we try to partition predicates into two partitions here: intentional on the part of the operator and not.
			switch reason {
			// intentional
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrNodeLabelPresenceViolated,
				// this one is probably intentional since it's a workaround for not having
				// pod hard anti affinity.
				predicates.ErrPodNotFitsHostPorts:
				return false, false, false, nil
			case predicates.ErrTaintsTolerationsNotMatch:
				// DaemonSet is expected to respect taints and tolerations
				fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
				if err != nil {
					return false, false, false, err
				}
				if !fitsNoExecute {
					return false, false, false, nil
				}
				wantToRun, shouldSchedule = false, false
			// unintentional
			case
				predicates.ErrDiskConflict,
				predicates.ErrVolumeZoneConflict,
				predicates.ErrMaxVolumeCountExceeded,
				predicates.ErrNodeUnderMemoryPressure,
				predicates.ErrNodeUnderDiskPressure:
				// wantToRun and shouldContinueRunning are likely true here. They are
				// absolutely true at the time of writing the comment. See first comment
				// of this method.
				shouldSchedule = false
				emitEvent = true
			// unexpected
			case
				predicates.ErrPodAffinityNotMatch,
				predicates.ErrServiceAffinityViolated:
				klog.Warningf("unexpected predicate failure reason: %s", reason.GetReason())
				return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
			default:
				klog.V(4).Infof("unknown predicate failure reason: %s", reason.GetReason())
				wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
				emitEvent = true
			}
			if emitEvent {
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, reason.GetReason())
			}
		}
	}
	// only emit this event if insufficient resource is the only thing
	// preventing the daemon pod from scheduling
	if shouldSchedule && insufficientResourceErr != nil {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
		shouldSchedule = false
	}
	return
}
```
- If wantToRun && !shouldSchedule, the DaemonSet is added to the Node's suspended list via addSuspendedDaemonPods.
- If shouldSchedule && !exists, the Node is added to nodesNeedingDaemonPods.
- If shouldContinueRunning && pod.DeletionTimestamp == nil && pod.Status.Phase == v1.PodFailed, the controller checks whether the Pod is still inside its backoff window (doubling from 1s up to a hardcoded maximum of 15min). If the backoff has expired, the Pod is added to podsToDelete; otherwise the DaemonSet is re-enqueued after the remaining delay (see the backoff demo after this list).
- If shouldContinueRunning && pod.DeletionTimestamp == nil && pod.Status.Phase != v1.PodFailed, the Pod is added to daemonPodsRunning, which records the non-Failed Pods of this DaemonSet currently running on the Node. If daemonPodsRunning holds more than one Pod, the Pods are sorted by creation time and every DaemonSet Pod except the oldest one is added to podsToDelete.
- If !shouldContinueRunning && exists, all Daemon Pods on that Node are added to podsToDelete.
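As promised above, a minimal standalone demo of the client-go flowcontrol.Backoff behavior the controller relies on (the backoff key here is illustrative; the real key comes from failedPodsBackoffKey):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Same parameters the DaemonSet controller uses: 1s initial delay, 15min cap.
	backoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute)
	key := "kube-system/node-exporter/node-1" // illustrative backoff key

	for i := 1; i <= 5; i++ {
		now := time.Now()
		if backoff.IsInBackOffSinceUpdate(key, now) {
			// Inside the window the controller would requeue the DaemonSet after
			// the remaining delay instead of deleting the failed pod right away.
			fmt.Printf("still in backoff, current delay %v\n", backoff.Get(key))
		}
		backoff.Next(key, now) // record another failure: delay goes 1s, 2s, 4s, 8s, 16s, ...
		fmt.Printf("after failure %d, delay is %v\n", i, backoff.Get(key))
	}
}
```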
Inside nodeShouldRunDaemonPod, simulate performs a simulated scheduling pass and returns the match result between the Pod and the Node; from the returned algorithm.PredicateFailureReason values, the values of wantToRun, shouldSchedule, and shouldContinueRunning are derived. Let's look at the scheduling logic inside simulate.
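From memory of the 1.13 source, simulate looks roughly like this (an abridged sketch; it feeds the Node's existing non-DaemonSet Pods into a NodeInfo and then runs Predicates on it):

```go
// Abridged sketch of simulate (1.13): build a NodeInfo from the pods already on
// the node (ignoring pods owned by this DaemonSet), then run Predicates on it.
func (dsc *DaemonSetsController) simulate(newPod *v1.Pod, node *v1.Node, ds *apps.DaemonSet) ([]algorithm.PredicateFailureReason, *schedulercache.NodeInfo, error) {
	objects, err := dsc.podNodeIndex.ByIndex("nodeName", node.Name)
	if err != nil {
		return nil, nil, err
	}

	nodeInfo := schedulercache.NewNodeInfo()
	for _, obj := range objects {
		pod, ok := obj.(*v1.Pod)
		if !ok {
			continue
		}
		// Ignore pods that belong to this daemonset when deciding whether it fits.
		if metav1.IsControlledBy(pod, ds) {
			continue
		}
		nodeInfo.AddPod(pod)
	}
	nodeInfo.SetNode(node)

	_, reasons, err := Predicates(newPod, nodeInfo)
	return reasons, nodeInfo, err
}
```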
```go
// Predicates checks if a DaemonSet's pod can be scheduled on a node using GeneralPredicates
// and PodToleratesNodeTaints predicate
func Predicates(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	var predicateFails []algorithm.PredicateFailureReason

	// If ScheduleDaemonSetPods is enabled, only check nodeSelector, nodeAffinity and toleration/taint match.
	if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
		fit, reasons, err := checkNodeFitness(pod, nil, nodeInfo)
		if err != nil {
			return false, predicateFails, err
		}
		if !fit {
			predicateFails = append(predicateFails, reasons...)
		}

		return len(predicateFails) == 0, predicateFails, nil
	}

	critical := kubelettypes.IsCriticalPod(pod)

	fit, reasons, err := predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}
	if critical {
		// If the pod is marked as critical and support for critical pod annotations is enabled,
		// check predicates for critical pods only.
		fit, reasons, err = predicates.EssentialPredicates(pod, nil, nodeInfo)
	} else {
		fit, reasons, err = predicates.GeneralPredicates(pod, nil, nodeInfo)
	}
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	return len(predicateFails) == 0, predicateFails, nil
}
```
If the ScheduleDaemonSetPods FeatureGate is enabled, the Predicate logic is the first branch above: no real scheduling happens here, only three predicate checks (host name, nodeSelector/nodeAffinity, and taint/toleration matching via checkNodeFitness); the final scheduling is still left to the default scheduler. How, then, does the default scheduler control which Node a DaemonSet Pod is bound to? We'll keep that in suspense for a moment.

If the ScheduleDaemonSetPods FeatureGate is disabled, the Predicate logic is the second branch: again no real scheduling happens, only a few predicate checks (PodToleratesNodeTaints, plus EssentialPredicates for critical Pods or GeneralPredicates otherwise); the final scheduling remains with the DaemonSet Controller itself.
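For reference, checkNodeFitness (as I recall the 1.13 source, abridged) is just three predicates chained together: PodFitsHost, PodMatchNodeSelector (nodeSelector + nodeAffinity), and PodToleratesNodeTaints:

```go
// Abridged sketch of checkNodeFitness (1.13): host name, node selector/affinity,
// and taint toleration checks only; resource requests are deliberately ignored.
func checkNodeFitness(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	var predicateFails []algorithm.PredicateFailureReason
	fit, reasons, err := predicates.PodFitsHost(pod, meta, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	fit, reasons, err = predicates.PodMatchNodeSelector(pod, meta, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	fit, reasons, err = predicates.PodToleratesNodeTaints(pod, nil, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}
	return len(predicateFails) == 0, predicateFails, nil
}
```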
podsShouldBeOnNode has now produced nodesNeedingDaemonPods []string, podsToDelete []string, and failedPodsObserved int; the next step is to create and delete the corresponding Pods.
```go
// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with erros if any
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// We need to set expectations before creating/deleting pods to avoid race conditions.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	createDiff := len(nodesNeedingDaemonPods)
	deleteDiff := len(podsToDelete)

	if createDiff > dsc.burstReplicas {
		createDiff = dsc.burstReplicas
	}
	if deleteDiff > dsc.burstReplicas {
		deleteDiff = dsc.burstReplicas
	}

	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

	// error channel to communicate back failures. make the buffer big enough to avoid any blocking
	errCh := make(chan error, createDiff+deleteDiff)

	klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
	createWait := sync.WaitGroup{}
	// If the returned error is not nil we have a parse error.
	// The controller handles this via the hash.
	generation, err := util.GetTemplateGeneration(ds)
	if err != nil {
		generation = nil
	}
	template := util.CreatePodTemplate(ds.Namespace, ds.Spec.Template, generation, hash)
	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
	// and double with each successful iteration in a kind of "slow start".
	// This handles attempts to start large numbers of pods that would
	// likely all fail with the same error. For example a project with a
	// low quota that attempts to create a large number of pods will be
	// prevented from spamming the API service with the pod create requests
	// after one of its pods fails. Conveniently, this also prevents the
	// event spam that those failures would generate.
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()
				var err error

				podTemplate := &template

				if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
					podTemplate = template.DeepCopy()
					// The pod's NodeAffinity will be updated to make sure the Pod is bound
					// to the target node by default scheduler. It is safe to do so because there
					// should be no conflicting node affinity with the target node.
					podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
						podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

					err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				} else {
					err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
						ds, metav1.NewControllerRef(ds, controllerKind))
				}

				if err != nil && errors.IsTimeout(err) {
					// Pod is created but its initialization has timed out.
					// If the initialization is successful eventually, the
					// controller will observe the creation via the informer.
					// If the initialization fails, or if the pod keeps
					// uninitialized for a long time, the informer will not
					// receive any update, and the controller will create a new
					// pod when the expectation expires.
					return
				}
				if err != nil {
					klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
					dsc.expectations.CreationObserved(dsKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}(i)
		}
		createWait.Wait()
		// any skipped pods that we never attempted to start shouldn't be expected.
		skippedPods := createDiff - batchSize
		if errorCount < len(errCh) && skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
			for i := 0; i < skippedPods; i++ {
				dsc.expectations.CreationObserved(dsKey)
			}
			// The skipped pods will be retried later. The next controller resync will
			// retry the slow start process.
			break
		}
	}

	klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
				klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
				dsc.expectations.DeletionObserved(dsKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	deleteWait.Wait()

	// collect errors if any for proper reporting/retry logic in the controller
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}
```
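The "slow start" batching above is worth seeing with concrete numbers. Here is a standalone sketch of the same arithmetic (SlowStartInitialBatchSize is 1 in the controller); with createDiff capped at burstReplicas=250 it yields batches of 1, 2, 4, ..., 64, 123:

```go
package main

import "fmt"

func intMin(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	const slowStartInitialBatchSize = 1
	createDiff := 250 // capped at burstReplicas in the controller

	// Same loop header as syncNodes: the batch doubles after each fully
	// successful batch and never overshoots the remaining number of pods.
	batchSize := intMin(createDiff, slowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = intMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		fmt.Printf("creating pods [%d, %d)\n", pos, pos+batchSize)
	}
}
```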
When building the Pod template in syncNodes, util.CreatePodTemplate automatically adds the following Tolerations to every DaemonSet Pod:

| Key | Operator | Effect |
| --- | --- | --- |
| node.kubernetes.io/not-ready | Exists | NoExecute |
| node.kubernetes.io/unreachable | Exists | NoExecute |
| node.kubernetes.io/disk-pressure | Exists | NoSchedule |
| node.kubernetes.io/memory-pressure | Exists | NoSchedule |
| node.kubernetes.io/unschedulable | Exists | NoSchedule |
| node.kubernetes.io/network-unavailable | Exists | NoSchedule |
| node.kubernetes.io/out-of-disk | Exists | NoExecute |
| node.kubernetes.io/out-of-disk | Exists | NoSchedule |

(The network-unavailable toleration is only added for host-network Pods, and the two out-of-disk tolerations only for critical Pods.)

In addition, when ScheduleDaemonSetPods is enabled, spec.nodeName is no longer set; instead each Pod gets a NodeAffinity requiring metadata.name=$NodeName (util.ReplaceDaemonSetPodNodeNameNodeAffinity in the syncNodes code above). This is how DaemonSet Pods end up being scheduled onto their target Node by the default scheduler; a sketch of that rewrite follows.
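A hedged sketch of that rewrite (the real function is util.ReplaceDaemonSetPodNodeNameNodeAffinity; the handling of pre-existing affinity fields is simplified here):

```go
import v1 "k8s.io/api/core/v1"

// replaceNodeNameNodeAffinity sketches what syncNodes does when
// ScheduleDaemonSetPods is enabled: the pod is pinned to exactly one node with
// a required MatchFields term on metadata.name, so the default scheduler has
// only one feasible node to bind it to.
func replaceNodeNameNodeAffinity(affinity *v1.Affinity, nodeName string) *v1.Affinity {
	required := &v1.NodeSelector{
		NodeSelectorTerms: []v1.NodeSelectorTerm{{
			MatchFields: []v1.NodeSelectorRequirement{{
				Key:      "metadata.name", // api.ObjectNameField in the real code
				Operator: v1.NodeSelectorOpIn,
				Values:   []string{nodeName},
			}},
		}},
	}
	if affinity == nil {
		affinity = &v1.Affinity{}
	}
	// Any existing node affinity is overridden; there should be no conflicting
	// affinity because the controller chose the target node itself.
	affinity.NodeAffinity = &v1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution: required,
	}
	return affinity
}
```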
The DaemonSet rolling update differs slightly from the Deployment rolling update: DaemonSet RollingUpdate has only the single MaxUnavailable setting; there is no MinAvailable.
```go
// rollingUpdate deletes old daemon set pods making sure that no more than
// ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, hash string) error {
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	_, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
	maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods)
	if err != nil {
		return fmt.Errorf("Couldn't get unavailable numbers: %v", err)
	}
	oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)

	// for oldPods delete all not running pods
	var oldPodsToDelete []string
	klog.V(4).Infof("Marking all unavailable old pods for deletion")
	for _, pod := range oldUnavailablePods {
		// Skip terminating pods. We won't delete them again
		if pod.DeletionTimestamp != nil {
			continue
		}
		klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
	}

	klog.V(4).Infof("Marking old pods for deletion")
	for _, pod := range oldAvailablePods {
		if numUnavailable >= maxUnavailable {
			klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
			break
		}
		klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
		numUnavailable++
	}
	return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}
```
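Before reading on, note how getUnavailableNumbers resolves the MaxUnavailable setting. A hedged sketch of the idea (maxUnavailableCount is my name for a hypothetical helper; the real resolution lives inside getUnavailableNumbers and the daemon util package):

```go
import intstrutil "k8s.io/apimachinery/pkg/util/intstr"

// maxUnavailableCount sketches the MaxUnavailable resolution: the field is an
// IntOrString, so both a plain integer and a percentage such as "10%" of
// desiredNumberScheduled are accepted.
func maxUnavailableCount(maxUnavailable *intstrutil.IntOrString, desiredNumberScheduled int) (int, error) {
	// true => percentages are rounded up when converted to an absolute count.
	return intstrutil.GetValueFromIntOrPercent(maxUnavailable, desiredNumberScheduled, true)
}
```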
getUnavailableNumbers counts the Pods that are !available plus the Pods that are expected to be scheduled but not yet running; their sum is numUnavailable.

The Node Add event is simple: iterate over all DaemonSet objects and call nodeShouldRunDaemonPod to compute, for each DaemonSet, whether it should start on this Node. If it should, the DaemonSet is added to the Queue and handled by syncDaemonSet.
For the Node Update event, the controller has to examine which fields actually changed and then decide whether the DaemonSet needs to be enqueued for syncDaemonSet.
```go
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
	oldNode := old.(*v1.Node)
	curNode := cur.(*v1.Node)
	if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
		return
	}

	dsList, err := dsc.dsLister.List(labels.Everything())
	if err != nil {
		klog.V(4).Infof("Error listing daemon sets: %v", err)
		return
	}
	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
	for _, ds := range dsList {
		_, oldShouldSchedule, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
		if err != nil {
			continue
		}
		_, currentShouldSchedule, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
		if err != nil {
			continue
		}
		if (oldShouldSchedule != currentShouldSchedule) || (oldShouldContinueRunning != currentShouldContinueRunning) {
			dsc.enqueueDaemonSet(ds)
		}
	}
}
```
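shouldIgnoreNodeUpdate, as I recall the 1.13 source (abridged sketch), filters out the noisy updates: if the Node conditions are unchanged and the rest of the object differs at most in ResourceVersion, the update is ignored:

```go
// Abridged sketch of shouldIgnoreNodeUpdate (1.13). Node heartbeats bump
// ResourceVersion constantly; re-running nodeShouldRunDaemonPod for every
// DaemonSet on each heartbeat would be wasted work.
func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
	if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
		return false
	}
	oldNode.ResourceVersion = curNode.ResourceVersion
	oldNode.Status.Conditions = curNode.Status.Conditions
	return apiequality.Semantic.DeepEqual(oldNode, curNode)
}
```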
This article has analyzed the DaemonSet source code from several angles: the controller's structure, creation, syncing, scheduling, and rolling update. Deepening this understanding is worthwhile before deploying DaemonSets at scale in production. In the next post I will start from real-world problems and analyze DaemonSet behavior from the user's perspective: for example, how a DaemonSet behaves after a Node's Taints change, why deleting a DaemonSet can hang and how to fix it, and what happens to DaemonSet Pods when a Node goes NotReady.