搶佔調度是分佈式調度中一種常見的設計,其核心目標是當不能爲高優先級的任務分配資源的時候,會經過搶佔低優先級的任務來進行高優先級的調度,本文主要學習k8s的搶佔調度以及裏面的一些有趣的算法
搶佔調度原理其實很簡單,就是經過高優先級的pod搶佔低優先級的pod資源,從而滿足高優先級pod的調度
在kubernetes中爲了保證服務儘可能的高可用,設計了PDB(PodDisruptionBudget),其核心目標就是保證對應pod維持在指定的數量,主要是爲了保證服務的可用性。在進行搶佔的過程當中,應儘可能遵照該設計,儘可能不去搶佔有PDB的資源,避免由於搶佔致使服務的不可用
優先級反轉是信號量機制裏面的一種現象,即由於低優先級任務的運行阻塞了高優先級任務的運行
在k8s中搶佔調度是經過高優先級搶佔低優先級pod,若是高優先級pod依賴低優先級pod,則會由於依賴問題,致使優先級失效,因此應該儘可能減小高優先級pod對低優先級pod的依賴,後面進行篩選源碼分析時能夠看到
搶佔選擇算法指的是經過搶佔部分節點後,如何從被搶佔的node數組中篩選出一個node節點,目前k8s中主要實現了5個算法
即最少違反PDB規則
比較全部node的最高優先級的pod,找到最高優先級最低的那個node
計算每一個node上面被搶佔的pod優先級之和,選擇優先級和最低的節點
計算每一個node上須要搶佔的pod數量,數量最少的節點優先
比較每一個node中被驅逐的pod中最先啓動的pod的啓動時間,最近啓動的pod的節點,會被選擇
搶佔的流程主要是經過Preempt來實現,其針對預選失敗的節點來進行驅逐某些低優先級的pod來知足高優先級pod
// Preempt tries to find a node where, by evicting ("preempting") some
// lower-priority pods, the given pod could be scheduled. It returns the chosen
// node, the victim pods to evict on it, and any lower-priority nominated pods
// whose nomination to that node should be cleared. All-nil returns mean
// preemption cannot help.
func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Only pods that failed the predicate (fit) phase are retried via preemption.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	// Check whether this pod is allowed to preempt other pods at all.
	if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	// All nodes currently known to the scheduler cache.
	allNodes := g.cache.ListNodes()
	if len(allNodes) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	// First pass: keep only nodes whose predicate failures might be resolved
	// by evicting pods (see nodesWherePreemptionMightHelp).
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	// Fetch all PodDisruptionBudgets; the later selection tries to minimize
	// PDB violations among the victims.
	pdbs, err := g.pdbLister.List(labels.Everything())
	if err != nil {
		return nil, nil, nil, err
	}
	// For each potential node, simulate evicting lower-priority pods and re-run
	// the predicates; the result maps every feasible node to its victim pods.
	nodeToVictims, err := g.selectNodesForPreemption(pluginContext, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}
	// Let scheduler extenders filter the candidate set further.
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}
	// Pick the single best node to preempt on.
	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, nil
	}
	// A candidate was found: also collect lower-priority pods nominated to this
	// node (their nomination should be cleared), then return the node, its
	// victims and those nominated pods.
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
	}
	// The node vanished from the cache between selection and lookup.
	return nil, nil, nil, fmt.Errorf("preemption failed: the target node %s has been deleted from scheduler cache", candidateNode.Name)
}
若是發現須要執行搶佔的pod有提名的node而且對應node上面存在比本身優先級低的pod正在進行刪除, 則不容許進行搶佔
func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool { if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever) return false } nomNodeName := pod.Status.NominatedNodeName if len(nomNodeName) > 0 { if nodeInfo, found := nodeNameToInfo[nomNodeName]; found { podPriority := util.GetPodPriority(pod) for _, p := range nodeInfo.Pods() { if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority { // 正在終止的優先級低於當前pod的pod就不會進行搶佔 return false } } } } return true }
每一個node在預選階段都會進行一個標記,標記當前node執行預選失敗的緣由,篩選潛在節點主要是根據對應的錯誤來進行篩選,若是不是不可解決的預選錯誤,則該node節點就能夠參與接下來的搶佔階段
func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node { potentialNodes := []*v1.Node{} // 根據預選階段的錯誤緣由,若是不存在沒法解決的錯誤,則這些node可能在接下來的搶佔流程中被使用 for _, node := range nodes { if fitErr.FilteredNodesStatuses[node.Name].Code() == framework.UnschedulableAndUnresolvable { continue } failedPredicates, _ := fitErr.FailedPredicates[node.Name] if !unresolvablePredicateExists(failedPredicates) { // 若是咱們發現並非不可解決的調度錯誤的時候,就講這個節點加入到這裏 // 可能經過後續的調整會讓這些node從新知足 klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name) potentialNodes = append(potentialNodes, node) } } return potentialNodes }
不可經過調整的預選失敗緣由
// unresolvablePredicateFailureErrors lists predicate failure reasons that
// preemption can never fix: evicting pods from a node does not change its
// labels, taints, hostname, or the pod's own affinity/selector requirements.
var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{
	predicates.ErrNodeSelectorNotMatch:      {},
	predicates.ErrPodAffinityRulesNotMatch:  {},
	predicates.ErrPodNotMatchHostName:       {},
	predicates.ErrTaintsTolerationsNotMatch: {},
	predicates.ErrNodeLabelPresenceViolated: {},
	// Most entries are omitted here; see the full list in the scheduler source.
}
篩選搶佔節點主要是並行對以前篩選潛在node進行嘗試,經過驅逐低優先級pod知足高優先級pod調度,最終會篩選一批能夠經過搶佔來知足pod調度須要的節點, 其核心實現時經過selectVictimsOnNode來進行檢測,繼續往下看
// selectNodesForPreemption checks every potential node in parallel (16 workers)
// and, via selectVictimsOnNode, determines whether evicting some lower-priority
// pods would let pod fit there. It returns a map from each feasible node to the
// victims (pods to evict plus the count of PDB violations among them).
func (g *genericScheduler) selectNodesForPreemption(
	pluginContext *framework.PluginContext,
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
	potentialNodes []*v1.Node,
	fitPredicates map[string]predicates.FitPredicate,
	metadataProducer predicates.PredicateMetadataProducer,
	queue internalqueue.SchedulingQueue,
	pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*schedulerapi.Victims, error) {
	nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
	// resultLock guards nodeToVictims, which the parallel workers write into.
	var resultLock sync.Mutex

	// We can use the same metadata producer for all nodes.
	meta := metadataProducer(pod, nodeNameToInfo)
	// checkNode runs the simulated eviction + predicate check for one node;
	// it is invoked concurrently, once per index into potentialNodes.
	checkNode := func(i int) {
		nodeName := potentialNodes[i].Name
		// Each worker gets its own shallow copy, since selectVictimsOnNode
		// mutates the metadata while simulating pod removal/addition.
		var metaCopy predicates.PredicateMetadata
		if meta != nil {
			metaCopy = meta.ShallowCopy()
		}
		pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
		if fits {
			resultLock.Lock()
			victims := schedulerapi.Victims{
				Pods:             pods,
				NumPDBViolations: numPDBViolations,
			}
			nodeToVictims[potentialNodes[i]] = &victims
			resultLock.Unlock()
		}
	}
	workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
	return nodeToVictims, nil
}
selectVictimsOnNode即單點篩選流程是針對單個node來指向具體的驅逐搶佔決策的流程, 其核心流程以下
優先級篩選首先會對當前node上面的全部pod進行優先級比較,移除全部優先級比當前pod低的pod
potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod} nodeInfoCopy := nodeInfo.Clone() removePod := func(rp *v1.Pod) { nodeInfoCopy.RemovePod(rp) if meta != nil { meta.RemovePod(rp, nodeInfoCopy.Node()) } } addPod := func(ap *v1.Pod) { nodeInfoCopy.AddPod(ap) if meta != nil { meta.AddPod(ap, nodeInfoCopy) } } podPriority := util.GetPodPriority(pod) for _, p := range nodeInfoCopy.Pods() { if util.GetPodPriority(p) < podPriority { // 移除全部優先級比本身低的pod potentialVictims.Items = append(potentialVictims.Items, p) removePod(p) } }
對移除全部優先級比本身的pod以後,會嘗試進行預選流程,若是發現預選流程失敗,則當前node即便經過移除全部比本身優先級低的pod也不能知足調度需求,則就進行下一個node判斷
if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits { if err != nil { klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } return nil, 0, false }
PDB分組就是對當前節點上篩選出來的低優先級pod按照是否有PDB匹配來進行分組,分爲違反PDB和未違反PDB的兩組
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
分組算法其實也不難,只須要遍歷全部的pdb和pod就能夠獲得最終的分組
func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) { for _, obj := range pods { pod := obj.(*v1.Pod) pdbForPodIsViolated := false // A pod with no labels will not match any PDB. So, no need to check. if len(pod.Labels) != 0 { for _, pdb := range pdbs { if pdb.Namespace != pod.Namespace { continue } selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector) if err != nil { continue } // A PDB with a nil or empty selector matches nothing. if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { continue } // We have found a matching PDB. if pdb.Status.PodDisruptionsAllowed <= 0 { pdbForPodIsViolated = true break } } } if pdbForPodIsViolated { violatingPods = append(violatingPods, pod) } else { nonViolatingPods = append(nonViolatingPods, pod) } } return violatingPods, nonViolatingPods }
會分別對違反PDB和不違反的pod集合來進行reprievePod檢測,若是加入當前pod後,不能知足預選篩選流程,則該pod則必須被進行移除加入到victims中, 同時若是是違反PDB的pod則須要進行違反pdb計數numViolatingVictim
reprievePod := func(p *v1.Pod) bool { // 咱們首先將pod加入到meta中 addPod(p) fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false) // if !fits { // 若是咱們加入了pod而後致使了預選不成功,則這個pod必須給移除 removePod(p) victims = append(victims, p) // 添加到咱們須要移除的列表裏面 klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name) } return fits } for _, p := range violatingVictims { if !reprievePod(p) { numViolatingVictim++ } } // Now we try to reprieve non-violating victims. for _, p := range nonViolatingVictims { // 嘗試移除未違反pdb的pod reprievePod(p) } return victims, numViolatingVictim, true
最優篩選主要是經過pickOneNodeForPreemption實現,其中篩選數據存儲結構主要是經過重用minNodes1和minNodes2兩段內存來進行實現,這兩個node數組分別配有兩個計數器lenNodes1和lenNodes2, 針對具備相同優先級、相同數量的node,每增長一個會進行一次計數器累加, 核心算法流程以下
最少違反PDB是根據前面統計的違反PDB的計數統計,找到最少違反的node,若是是單個node則直接返回篩選結束
minNumPDBViolatingPods := math.MaxInt32 var minNodes1 []*v1.Node lenNodes1 := 0 for node, victims := range nodesToVictims { if len(victims.Pods) == 0 { // 若是發現一個noed不須要任何搶佔,則返回它 return node } numPDBViolatingPods := victims.NumPDBViolations if numPDBViolatingPods < minNumPDBViolatingPods { // 若是小於最小pdb數量, 若是數量發生變化,就重置 minNumPDBViolatingPods = numPDBViolatingPods minNodes1 = nil lenNodes1 = 0 } if numPDBViolatingPods == minNumPDBViolatingPods { // 多個相同的node會進行追加,並累加計數器 minNodes1 = append(minNodes1, node) lenNodes1++ } } if lenNodes1 == 1 { return minNodes1[0] }
最高優先級最小優先是指經過對比多個node的最高優先級的pod,優先級最低的那個node被選中,若是多個則進行下一個算法
minHighestPriority := int32(math.MaxInt32) var minNodes2 = make([]*v1.Node, lenNodes1) lenNodes2 := 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] victims := nodesToVictims[node] // highestPodPriority is the highest priority among the victims on this node. // 返回優先級最高的pod highestPodPriority := util.GetPodPriority(victims.Pods[0]) if highestPodPriority < minHighestPriority { // 重置狀態 minHighestPriority = highestPodPriority lenNodes2 = 0 } if highestPodPriority == minHighestPriority { // 若是優先級相等則加入進去 minNodes2[lenNodes2] = node lenNodes2++ } } if lenNodes2 == 1 { return minNodes2[0] }
統計每一個node上的全部被搶佔的pod的優先級的總和,而後在多個node之間進行比較,優先級總和最低的節點被選中
minSumPriorities := int64(math.MaxInt64) lenNodes1 = 0 for i := 0; i < lenNodes2; i++ { var sumPriorities int64 node := minNodes2[i] // 統計全部優先級 for _, pod := range nodesToVictims[node].Pods { sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1) } if sumPriorities < minSumPriorities { minSumPriorities = sumPriorities lenNodes1 = 0 } if sumPriorities == minSumPriorities { minNodes1[lenNodes1] = node lenNodes1++ } } // 最少優先級的node if lenNodes1 == 1 { return minNodes1[0] }
最少搶佔數量優先即統計每一個node上被搶佔的pod數量,數量最少的會被選中
minNumPods := math.MaxInt32 lenNodes2 = 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] numPods := len(nodesToVictims[node].Pods) if numPods < minNumPods { minNumPods = numPods lenNodes2 = 0 } if numPods == minNumPods { minNodes2[lenNodes2] = node lenNodes2++ } } // 最少節點數量 if lenNodes2 == 1 { return minNodes2[0] }
該算法會比較每一個node被驅逐的pod中優先級最高的pod的最早啓動時間(其實就是說這個pod早就被建立了),而後在多個node之間進行比較,若是誰上面的時間越新(即這個node上的pod多是最近被調度上去的),則就選中這個節點
latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]]) if latestStartTime == nil { // If the earliest start time of all pods on the 1st node is nil, just return it, // which is not expected to happen. // 若是第一個節點上全部pod的最先開始時間爲零,那麼返回它 klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0]) return minNodes2[0] } nodeToReturn := minNodes2[0] for i := 1; i < lenNodes2; i++ { node := minNodes2[i] // Get earliest start time of all pods on the current node. // 獲取當前node最先啓動時間 earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node]) if earliestStartTimeOnNode == nil { klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node) continue } if earliestStartTimeOnNode.After(latestStartTime.Time) { latestStartTime = earliestStartTimeOnNode nodeToReturn = node } } return nodeToReturn
由於是純的算法流程,並無複雜的數據結構,你們看看就好,調度器的設計可能就看到這了,後面把以前的都串起來,算是一個總結,若是有興趣我就再看看 SchedulerExtender和framework的設計, 其實學習scheduler調度器部分只是由於本身對分佈式調度這塊比較好奇,並且本身有運維開發的經驗,這對pod調度相似場景並不陌生,看起來總的來說相對容易一點,並且我只分析了核心的數據結構和算法,還有幾個階段,爲了不陷入對kubernetes一些複雜邏輯的處理,我都儘可能簡化邏輯,就是但願即使不去看k8s scheduler的代碼,也能有所收穫 > 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈