Author: xidianwangtao@gmail.com
Before reading this post, I recommend first reading the post "Analysis of Pod-Priority-Based Preemptive Scheduling in Kubernetes 1.8".
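In Kubernetes 1.8, the definition of the ScheduleAlgorithm interface changed: a Preempt(...) method was added. As a result, the one-sentence summary of the scheduling process that I gave in my post "Kubernetes Scheduler Principles Explained" (based on Kubernetes 1.5 at the time) is no longer accurate: "Take the Pods whose PodSpec.NodeName is empty one by one and, through the two steps of Predicates and Priorities, pick the most suitable Node as each Pod's destination."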
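An accurate one-sentence description now reads: "Take the Pods whose PodSpec.NodeName is empty one by one and, through the two steps of Predicates and Priorities, pick the most suitable Node as each Pod's destination. If Predicates and Priorities still find no suitable node and Pod Priority is enabled, the Pod goes through Preempt (preemptive scheduling) to find the most suitable node and the Pods that must be evicted from it."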
```go
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
	Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
	// Preempt receives scheduling errors for a pod and tries to create room for
	// the pod by preempting lower priority pods if possible.
	// It returns the node where preemption happened, a list of preempted pods, and error if any.
	Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
	// Predicates() returns a pointer to a map of predicate functions. This is
	// exposed for testing.
	Predicates() map[string]FitPredicate
	// Prioritizers returns a slice of priority config. This is exposed for
	// testing.
	Prioritizers() []PriorityConfig
}
```
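My post "Kubernetes Scheduler Source Code Analysis" (based on Kubernetes 1.5 at the time) walked through the whole scheduling process in code. Back then I described it like this: Scheduler.scheduleOne starts the actual scheduling logic and handles one Pod at a time, as follows:
In 1.8, when Predicates and Priorities fail to find a suitable node (in practice this always means Predicates found no feasible node, because Priorities only rank the nodes that already passed), scheduleOne additionally calls sched.preempt to attempt preemptive scheduling, provided the error is a *core.FitError.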
plugin/pkg/scheduler/scheduler.go:293

```go
func (sched *Scheduler) scheduleOne() {
	pod := sched.config.NextPod()
	if pod.DeletionTimestamp != nil {
		sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		return
	}

	glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	suggestedHost, err := sched.schedule(pod)
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
	if err != nil {
		// schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		if fitError, ok := err.(*core.FitError); ok {
			sched.preempt(pod, fitError)
		}
		return
	}

	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPod := *pod
	// assume modifies `assumedPod` by setting NodeName=suggestedHost
	err = sched.assume(&assumedPod, suggestedHost)
	if err != nil {
		return
	}

	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		err := sched.bind(&assumedPod, &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
			Target: v1.ObjectReference{
				Kind: "Node",
				Name: suggestedHost,
			},
		})
		metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
		if err != nil {
			glog.Errorf("Internal error binding pod: (%v)", err)
		}
	}()
}
```
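I won't go into Predicates and Priorities in detail here, because the overall source logic is the same as in 1.5; 1.8 simply adds more Predicate and Priority policies and their implementations. Below we look only at the Preempt code.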
plugin/pkg/scheduler/scheduler.go:191

```go
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
		glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
		return "", nil
	}
	preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
	if err != nil {
		glog.Errorf("Error getting the updated preemptor pod object: %v", err)
		return "", err
	}
	node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	if err != nil {
		glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
		return "", err
	}
	if node == nil {
		return "", err
	}
	glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
	annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
	err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
	if err != nil {
		glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
		return "", err
	}
	for _, victim := range victims {
		if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
			glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
			return "", err
		}
		sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
	}
	return node.Name, err
}
```
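The ordering inside sched.preempt (find a node and its victims, write the nominated node onto the preemptor, then delete the victims) can be sketched with in-memory fakes. Everything in this sketch is hypothetical: Pod, fakePreemptor, preemptFunc and the value of nominatedNodeKey are simplified placeholders, not the real PodPreemptor interface or the real value of core.NominatedNodeAnnotationKey.

```go
package main

import "fmt"

// Pod is a hypothetical, heavily simplified pod object.
type Pod struct {
	Namespace, Name string
	Annotations     map[string]string
}

// nominatedNodeKey is a placeholder for core.NominatedNodeAnnotationKey.
const nominatedNodeKey = "NominatedNodeName"

// fakePreemptor records annotation updates and deletions in memory instead
// of calling the API server.
type fakePreemptor struct {
	deleted []string
}

func (f *fakePreemptor) UpdatePodAnnotations(p *Pod, ann map[string]string) error {
	if p.Annotations == nil {
		p.Annotations = map[string]string{}
	}
	for k, v := range ann {
		p.Annotations[k] = v
	}
	return nil
}

func (f *fakePreemptor) DeletePod(p *Pod) error {
	f.deleted = append(f.deleted, p.Namespace+"/"+p.Name)
	return nil
}

// preemptFunc stands in for Algorithm.Preempt: it returns the chosen node
// name ("" if none) and the victims on that node.
type preemptFunc func(*Pod) (string, []*Pod, error)

// preempt follows the order of steps in Scheduler.preempt.
func preempt(preemptor *Pod, algo preemptFunc, pp *fakePreemptor) (string, error) {
	node, victims, err := algo(preemptor)
	if err != nil {
		return "", err
	}
	if node == "" {
		return "", nil // no node where preemption helps
	}
	// Record the nominated node first, so a later scheduling attempt can see it.
	if err := pp.UpdatePodAnnotations(preemptor, map[string]string{nominatedNodeKey: node}); err != nil {
		return "", err
	}
	// Then evict the victims one by one, stopping at the first failure.
	for _, v := range victims {
		if err := pp.DeletePod(v); err != nil {
			return "", fmt.Errorf("preempting %s/%s: %w", v.Namespace, v.Name, err)
		}
	}
	return node, nil
}
```

Note that preempt does not bind the preemptor: it only frees room, and the pod is bound in a later scheduling cycle, which is what opens the window described in the note that follows.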
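Note: when sched.schedule(pod) fails in the Predicates/Priorities phase, the Pod is not bound to any node and is requeued into the unscheduled pod queue. If other pods were added to the pending queue while this pod was being scheduled, they now sit ahead of it, so they are scheduled first and may land on the node whose resources were just freed by Preempt, consuming the resources released for the preemptor. When the preemptor's turn comes again, it may have to trigger yet another preemption on some node.
ScheduleAlgorithm.Preempt is the key piece of preemptive scheduling; its implementation lives in genericScheduler: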
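A toy model of the race described in the note, assuming a single node, a single resource measured in abstract units, and a FIFO that skips pods that do not fit. The pending type and drain function are hypothetical and only illustrate why a pod queued ahead of the requeued preemptor can consume the freed capacity.

```go
package main

// pending is a hypothetical queued pod with a single resource request.
type pending struct {
	name string
	cpu  int
}

// drain places pods from the FIFO, in order, onto a node with `free` units
// left and returns the names of the pods that were placed. A pod that does
// not fit is skipped (in the real scheduler it would be requeued).
func drain(queue []pending, free int) []string {
	var placed []string
	for _, p := range queue {
		if p.cpu <= free {
			free -= p.cpu
			placed = append(placed, p.name)
		}
	}
	return placed
}
```

If preemption freed 4 units for a preemptor needing 4, but a newcomer needing 3 was queued ahead of it, drain places only the newcomer and the preemptor is left pending again; in the opposite order, the preemptor gets the node.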
plugin/pkg/scheduler/core/generic_scheduler.go:181

```go
// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil
	}
	err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
	if err != nil {
		return nil, nil, err
	}
	if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
		glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
		return nil, nil, nil
	}
	allNodes, err := nodeLister.List()
	if err != nil {
		return nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, ErrNoNodesAvailable
	}
	potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
	if len(potentialNodes) == 0 {
		glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
		return nil, nil, nil
	}
	nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
	if err != nil {
		return nil, nil, err
	}
	for len(nodeToPods) > 0 {
		node := pickOneNodeForPreemption(nodeToPods)
		if node == nil {
			return nil, nil, err
		}
		passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
		if passes && pErr == nil {
			return node, nodeToPods[node], err
		}
		if pErr != nil {
			glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
		}
		// Remove the node from the map and try to pick a different node.
		delete(nodeToPods, node)
	}
	return nil, nil, err
}
```
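The final loop of Preempt keeps trying candidates until an extender accepts one. The sketch below is a hypothetical reduction of that loop: nodes and victims are plain strings, pick stands in for pickOneNodeForPreemption, extenderOK stands in for nodePassesExtendersForPreemption, and extender errors are simply folded into rejection (the original also logs them). fewestVictims is an illustrative picker only, not the real ranking.

```go
package main

import "sort"

// pickAndVerify repeatedly picks the best remaining candidate, asks the
// extenders whether they accept it, and drops rejected nodes from the map.
func pickAndVerify(candidates map[string][]string,
	pick func(map[string][]string) string,
	extenderOK func(node string, victims []string) bool) (string, []string) {
	for len(candidates) > 0 {
		node := pick(candidates)
		if node == "" {
			return "", nil
		}
		if extenderOK(node, candidates[node]) {
			return node, candidates[node]
		}
		// Remove the node from the map and try to pick a different node.
		delete(candidates, node)
	}
	return "", nil
}

// fewestVictims picks the node with the fewest victims, breaking ties by
// node name so the result is deterministic.
func fewestVictims(c map[string][]string) string {
	names := make([]string, 0, len(c))
	for n := range c {
		names = append(names, n)
	}
	sort.Strings(names)
	best := ""
	for _, n := range names {
		if best == "" || len(c[n]) < len(c[best]) {
			best = n
		}
	}
	return best
}
```

As in the original, the candidate map is consumed destructively, so each node is offered to the extenders at most once.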
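If the Pod already carries the annotation NominatedNodeName=nodeName (meaning it already triggered a preemption earlier) and, on that nominated node, some pod with a lower priority than this Pod is still Terminating, the Pod is considered ineligible for further preemption and the flow ends.
Otherwise, the flow continues.
The corresponding code is as follows: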
plugin/pkg/scheduler/core/generic_scheduler.go:756

```go
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
	if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
		if nodeInfo, found := nodeNameToInfo[nodeName]; found {
			for _, p := range nodeInfo.Pods() {
				if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
					// There is a terminating pod on the nominated node.
					return false
				}
			}
		}
	}
	return true
}
```
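A self-contained restatement of podEligibleToPreemptOthers with toy types, assuming a Terminating flag in place of DeletionTimestamp != nil and a NominatedNode field in place of the annotation lookup. The type and function names are hypothetical.

```go
package main

// Pod is a hypothetical simplified pod.
type Pod struct {
	Name          string
	Priority      int32
	Terminating   bool   // stands in for DeletionTimestamp != nil
	NominatedNode string // "" when the nominated-node annotation is absent
}

// eligibleToPreempt mirrors podEligibleToPreemptOthers: a pod that already
// nominated a node may not preempt again while a lower-priority pod on that
// node is still terminating (its earlier victims are still on their way out).
func eligibleToPreempt(pod Pod, podsByNode map[string][]Pod) bool {
	if pod.NominatedNode == "" {
		return true
	}
	// An unknown nominated node yields a nil slice, so the pod stays eligible.
	for _, p := range podsByNode[pod.NominatedNode] {
		if p.Terminating && p.Priority < pod.Priority {
			return false
		}
	}
	return true
}
```

The check prevents a preemptor from evicting a second batch of pods while the first batch is still releasing its resources.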
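Iterate over all nodes and, for each one, scan the predicates that failed for it during the predicates phase of sched.schedule() (failedPredicates). If failedPredicates contains any of ErrNodeSelectorNotMatch, ErrPodNotMatchHostName, ErrTaintsTolerationsNotMatch, ErrNodeLabelPresenceViolated, ErrNodeNotReady, ErrNodeNetworkUnavailable, ErrNodeUnschedulable or ErrNodeUnknownCondition, evicting pods cannot fix the failure, so the node is not a preemption candidate.
Every other node, including a node that has no entry in the failed-predicates map at all, is treated as a potential node.
The corresponding code is as follows: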
```go
func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
	potentialNodes := []*v1.Node{}
	for _, node := range nodes {
		unresolvableReasonExist := false
		failedPredicates, found := failedPredicatesMap[node.Name]
		// If we assume that scheduler looks at all nodes and populates the failedPredicateMap
		// (which is the case today), the !found case should never happen, but we'd prefer
		// to rely less on such assumptions in the code when checking does not impose
		// significant overhead.
		for _, failedPredicate := range failedPredicates {
			switch failedPredicate {
			case
				predicates.ErrNodeSelectorNotMatch,
				predicates.ErrPodNotMatchHostName,
				predicates.ErrTaintsTolerationsNotMatch,
				predicates.ErrNodeLabelPresenceViolated,
				predicates.ErrNodeNotReady,
				predicates.ErrNodeNetworkUnavailable,
				predicates.ErrNodeUnschedulable,
				predicates.ErrNodeUnknownCondition:
				unresolvableReasonExist = true
				// This break only leaves the switch, not the enclosing for loop; the
				// remaining predicates are still scanned, which does not change the result.
				break
				// TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
			}
		}
		if !found || !unresolvableReasonExist {
			glog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
			potentialNodes = append(potentialNodes, node)
		}
	}
	return potentialNodes
}
```
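The same filter, sketched with plain strings instead of predicate error values. The reason strings in unresolvable are hypothetical labels mirroring the predicates.Err* values listed in the code, not the scheduler's actual error messages. Because there is no enclosing switch here, the break really does exit the loop.

```go
package main

// unresolvable holds failure reasons that evicting pods cannot fix
// (hypothetical labels for the predicates.Err* values above).
var unresolvable = map[string]bool{
	"NodeSelectorNotMatch":      true,
	"PodNotMatchHostName":       true,
	"TaintsTolerationsNotMatch": true,
	"NodeLabelPresenceViolated": true,
	"NodeNotReady":              true,
	"NodeNetworkUnavailable":    true,
	"NodeUnschedulable":         true,
	"NodeUnknownCondition":      true,
}

// potentialNodes keeps nodes whose recorded failures are all resolvable by
// preemption; a node missing from failed is kept too, as in the `!found` branch.
func potentialNodes(nodes []string, failed map[string][]string) []string {
	var out []string
	for _, n := range nodes {
		reasons, found := failed[n]
		hopeless := false
		for _, r := range reasons {
			if unresolvable[r] {
				hopeless = true
				break // exits the for loop: one unresolvable reason is enough
			}
		}
		if !found || !hopeless {
			out = append(out, n)
		}
	}
	return out
}
```

A node that failed only on resources (for example insufficient CPU) stays a candidate, while a node that also fails on taints is dropped, even if it is short of resources as well.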
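For each potential node, iterate over all pods scheduled on it (including assumed pods), add every pod with a lower priority than the preemptor to the potential victims list, and remove those pods from nodeInfoCopy, so that the next predicate check sees their resources as available on the node.
Sort the potential victims by priority from high to low, so index 0 holds the highest-priority pod.
Check whether the preemptor passes all Predicates configured in the scheduler against nodeInfoCopy (from which all lower-priority pods were removed above, releasing all of their resources). If it does not, return: the node is unsuitable. If all predicates pass, continue.
Walk the potential victims list in that order (highest priority first). Add the first pod back into nodeInfoCopy and check again whether the preemptor passes all predicates; if it does not, remove that pod from nodeInfoCopy again and add it to the victims list. Repeat for the 2nd, 3rd, ... pod. This keeps higher-priority pods wherever possible and tries to evict as few pods as possible (greedily, so the result is not guaranteed to be minimal).
Finally, return every feasible node together with its victims list.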
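The code of selectNodesForPreemption follows; the core logic actually lives in selectVictimsOnNode. selectNodesForPreemption runs that check for every potential node in parallel (16 workers via workqueue.Parallelize) and collects the nodes on which the preemptor fits.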
plugin/pkg/scheduler/core/generic_scheduler.go:583

```go
func selectNodesForPreemption(pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	potentialNodes []*v1.Node,
	predicates map[string]algorithm.FitPredicate,
	metadataProducer algorithm.PredicateMetadataProducer,
) (map[*v1.Node][]*v1.Pod, error) {

	nodeNameToPods := map[*v1.Node][]*v1.Pod{}
	var resultLock sync.Mutex

	// We can use the same metadata producer for all nodes.
	meta := metadataProducer(pod, nodeNameToInfo)
	checkNode := func(i int) {
		nodeName := potentialNodes[i].Name
		var metaCopy algorithm.PredicateMetadata
		if meta != nil {
			metaCopy = meta.ShallowCopy()
		}
		pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
		if fits {
			resultLock.Lock()
			nodeNameToPods[potentialNodes[i]] = pods
			resultLock.Unlock()
		}
	}
	workqueue.Parallelize(16, len(potentialNodes), checkNode)
	return nodeNameToPods, nil
}
```
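selectNodesForPreemption fans the per-node check out over 16 workers and collects results under a mutex. Below is a minimal sketch of that pattern using only the standard library; parallelize is a hypothetical helper written in the spirit of workqueue.Parallelize, not its actual source, and the fits callback stands in for selectVictimsOnNode.

```go
package main

import "sync"

// parallelize runs fn(i) for every i in [0, pieces) on at most `workers`
// goroutines and waits until all pieces are done.
func parallelize(workers, pieces int, fn func(i int)) {
	idx := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		idx <- i
	}
	close(idx)
	if workers > pieces {
		workers = pieces
	}
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for i := range idx {
				fn(i)
			}
		}()
	}
	wg.Wait()
}

// feasibleNodes mirrors the shape of selectNodesForPreemption: check each
// node concurrently and record the ones that fit in a mutex-guarded map.
func feasibleNodes(nodes []string, fits func(string) bool) map[string]bool {
	result := map[string]bool{}
	var mu sync.Mutex
	parallelize(16, len(nodes), func(i int) {
		if fits(nodes[i]) {
			mu.Lock()
			result[nodes[i]] = true
			mu.Unlock()
		}
	})
	return result
}
```

Each worker only reads shared inputs and writes through the lock, which is also why the original gives every node its own ShallowCopy of the predicate metadata.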
plugin/pkg/scheduler/core/generic_scheduler.go:659

```go
func selectVictimsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	nodeInfo *schedulercache.NodeInfo,
	fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
	potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
	nodeInfoCopy := nodeInfo.Clone()

	removePod := func(rp *v1.Pod) {
		nodeInfoCopy.RemovePod(rp)
		if meta != nil {
			meta.RemovePod(rp)
		}
	}
	addPod := func(ap *v1.Pod) {
		nodeInfoCopy.AddPod(ap)
		if meta != nil {
			meta.AddPod(ap, nodeInfoCopy)
		}
	}
	// As the first step, remove all the lower priority pods from the node and
	// check if the given pod can be scheduled.
	podPriority := util.GetPodPriority(pod)
	for _, p := range nodeInfoCopy.Pods() {
		if util.GetPodPriority(p) < podPriority {
			potentialVictims.Items = append(potentialVictims.Items, p)
			removePod(p)
		}
	}
	potentialVictims.Sort()
	// If the new pod does not fit after removing all the lower priority pods,
	// we are almost done and this node is not suitable for preemption. The only condition
	// that we should check is if the "pod" is failing to schedule due to pod affinity
	// failure.
	// TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
	if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
		if err != nil {
			glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
		}
		return nil, false
	}
	victims := []*v1.Pod{}
	// Try to reprieve as many pods as possible starting from the highest priority one.
	for _, p := range potentialVictims.Items {
		lpp := p.(*v1.Pod)
		addPod(lpp)
		if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
			removePod(lpp)
			victims = append(victims, lpp)
			glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
		}
	}
	return victims, true
}
```
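The two phases of selectVictimsOnNode can be reproduced with a single resource. In this hypothetical sketch the capacity check (used + request <= capacity) stands in for running all predicates through podFitsOnNode, and, as in the original, pods whose priority is equal to or higher than the preemptor's are never considered as victims.

```go
package main

import "sort"

// Pod is a hypothetical toy pod with a single CPU request.
type Pod struct {
	Name     string
	Priority int32
	CPU      int
}

// selectVictims returns the pods to evict on a node of the given capacity,
// or false if the preemptor does not fit even after removing every
// lower-priority pod.
func selectVictims(pod Pod, onNode []Pod, capacity int) ([]Pod, bool) {
	used := 0 // CPU held by pods that stay on the node
	var potential []Pod
	for _, p := range onNode {
		if p.Priority < pod.Priority {
			potential = append(potential, p) // removed for now
		} else {
			used += p.CPU
		}
	}
	// Highest priority first, so the most important pods are reprieved first.
	sort.SliceStable(potential, func(i, j int) bool {
		return potential[i].Priority > potential[j].Priority
	})
	if used+pod.CPU > capacity {
		return nil, false // removing all lower-priority pods is not enough
	}
	var victims []Pod
	for _, p := range potential {
		if used+p.CPU+pod.CPU <= capacity {
			used += p.CPU // reprieved: p stays on the node
		} else {
			victims = append(victims, p)
		}
	}
	return victims, true
}
```

With capacity 10, a preemptor (priority 100, 4 CPU) and pods A (50, 3), B (10, 4), C (200, 2), C is never touched, A is reprieved (2 + 3 + 4 = 9) and only B is evicted. Because victims are appended in descending priority, the first victim is always the highest-priority one, which pickOneNodeForPreemption relies on.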
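First, pick the node whose highest-priority victim has the lowest priority (victims lists are ordered from high to low priority, so this is the priority of each list's first pod).
If more than one node satisfies this, pick among them the node whose victims have the smallest sum of priorities.
If there is still more than one, pick the node with the fewest victim pods.
If there is still more than one, the code returns the first remaining candidate; since the candidates come from iterating a Go map, whose order is randomized, this is effectively a random choice.
Each step only considers the nodes left over from the previous step. In addition, a node that needs no victims at all (possible if pods terminated in the meantime) is returned immediately.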
plugin/pkg/scheduler/core/generic_scheduler.go:501

```go
func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
	type nodeScore struct {
		node            *v1.Node
		highestPriority int32
		sumPriorities   int64
		numPods         int
	}
	if len(nodesToPods) == 0 {
		return nil
	}
	minHighestPriority := int32(math.MaxInt32)
	minPriorityScores := []*nodeScore{}
	for node, pods := range nodesToPods {
		if len(pods) == 0 {
			// We found a node that doesn't need any preemption. Return it!
			// This should happen rarely when one or more pods are terminated between
			// the time that scheduler tries to schedule the pod and the time that
			// preemption logic tries to find nodes for preemption.
			return node
		}
		// highestPodPriority is the highest priority among the victims on this node.
		highestPodPriority := util.GetPodPriority(pods[0])
		if highestPodPriority < minHighestPriority {
			minHighestPriority = highestPodPriority
			minPriorityScores = nil
		}
		if highestPodPriority == minHighestPriority {
			minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
		}
	}
	if len(minPriorityScores) == 1 {
		return minPriorityScores[0].node
	}
	// There are a few nodes with minimum highest priority victim. Find the
	// smallest sum of priorities.
	minSumPriorities := int64(math.MaxInt64)
	minSumPriorityScores := []*nodeScore{}
	for _, nodeScore := range minPriorityScores {
		var sumPriorities int64
		for _, pod := range nodesToPods[nodeScore.node] {
			// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
			// needed so that a node with a few pods with negative priority is not
			// picked over a node with a smaller number of pods with the same negative
			// priority (and similar scenarios).
			sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
		}
		if sumPriorities < minSumPriorities {
			minSumPriorities = sumPriorities
			minSumPriorityScores = nil
		}
		nodeScore.sumPriorities = sumPriorities
		if sumPriorities == minSumPriorities {
			minSumPriorityScores = append(minSumPriorityScores, nodeScore)
		}
	}
	if len(minSumPriorityScores) == 1 {
		return minSumPriorityScores[0].node
	}
	// There are a few nodes with minimum highest priority victim and sum of priorities.
	// Find one with the minimum number of pods.
	minNumPods := math.MaxInt32
	minNumPodScores := []*nodeScore{}
	for _, nodeScore := range minSumPriorityScores {
		if nodeScore.numPods < minNumPods {
			minNumPods = nodeScore.numPods
			minNumPodScores = nil
		}
		if nodeScore.numPods == minNumPods {
			minNumPodScores = append(minNumPodScores, nodeScore)
		}
	}
	// At this point, even if there are more than one node with the same score,
	// return the first one.
	if len(minNumPodScores) > 0 {
		return minNumPodScores[0].node
	}
	glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
	return nil
}
```
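The staged tie-break amounts to taking the lexicographic minimum of (highest victim priority, offset priority sum, victim count). The hypothetical pickNode below does exactly that over victim priorities only; to make the final tie-break deterministic for testing it orders nodes by name, whereas in the original the order comes from Go map iteration.

```go
package main

import (
	"math"
	"sort"
)

// pickNode takes node name -> victim priorities (highest first) and returns
// the chosen node, or "" for an empty map.
func pickNode(victims map[string][]int32) string {
	names := make([]string, 0, len(victims))
	for n := range victims {
		names = append(names, n)
	}
	sort.Strings(names)
	// A node that needs no victims wins immediately.
	for _, n := range names {
		if len(victims[n]) == 0 {
			return n
		}
	}
	score := func(n string) (int32, int64, int) {
		var sum int64
		for _, p := range victims[n] {
			// The offset makes every term non-negative, so extra victims never lower the sum.
			sum += int64(p) + int64(math.MaxInt32) + 1
		}
		return victims[n][0], sum, len(victims[n])
	}
	best := ""
	for _, n := range names {
		if best == "" {
			best = n
			continue
		}
		h1, s1, c1 := score(n)
		h2, s2, c2 := score(best)
		// Strict comparisons keep the earlier (by name) node on a full tie.
		if h1 < h2 || (h1 == h2 && (s1 < s2 || (s1 == s2 && c1 < c2))) {
			best = n
		}
	}
	return best
}
```

With victims {-10} on one node and {-10, -10} on another, the offset sums are 2147483638 and 4294967276, so the node with a single victim wins; without the offset, -20 < -10 would have favoured the node that evicts more pods.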
In summary, the overall preemptive scheduling logic is: