轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客:https://www.luozhiyun.comhtml
源碼版本是1.19node
此次講解的是k8s的調度器部分的代碼,相對來講比較複雜,慢慢的梳理清楚邏輯花費了很多的時間,不過在梳理過程當中也對k8s有了一個更深入的理解。git
調度器的主要職責,就是爲一個新建立出來的 Pod,尋找一個最合適的節點(Node)。kube-scheduler 就是 Kubernetes 集羣的默認調度器。github
默認調度器會首先調用一組Filter過濾器,也就是使用相應的Predicates的調度算法來進行過濾。而後,再調用一組叫做 Priority 的調度算法,來給上一步獲得的結果裏的每一個 Node 打分,而後根據打分來對Node進行排序,找出最優節點,若是多個節點都有最高的優先級分數,那麼則循環分配,確保平均分配給pod。算法
調度算法執行完成後,調度器就須要將 Pod 對象的 nodeName 字段的值,修改成上述 Node 的名字。api
Filter過濾器的做用主要是從當前集羣的全部節點中,「過濾」出一系列符合條件的節點,有以下幾種調度策略:數組
GeneralPredicates緩存
這一組過濾規則,負責的是最基礎的調度策略。好比,計算宿主機的 CPU 和內存資源等是否夠用; ,等等。app
Volume過濾規則ide
這一組過濾規則,負責的是跟容器持久化 Volume 相關的調度策略。如:檢查多個 Pod 聲明掛載的持久化 Volume 是否有衝突;檢查一個節點上某種類型的持久化 Volume 是否是已經超過了必定數目;檢查Pod 對應的 PV 的 nodeAffinity 字段,是否跟某個節點的標籤相匹配等等。
檢查調度 Pod 是否知足 Node 自己的某些條件
如PodToleratesNodeTaints負責檢查的就是咱們前面常常用到的 Node 的「污點」機制。NodeMemoryPressurePredicate,檢查的是當前節點的內存是否是已經不夠充足。
檢查親密與反親密關係
檢查待調度 Pod 與 Node 上的已有 Pod 之間的親密(affinity)和反親密(anti-affinity)關係。
在調用Filter過濾器的時候須要關注整個集羣的信息,Kubernetes 調度器會在爲每一個待調度 Pod 執行該調度算法以前,先將算法須要的集羣信息初步計算一遍,而後緩存起來。這樣也能夠加快執行速度。
而Priorities裏的打分規則包含如:空閒資源(CPU 和 Memory)多的宿主機能夠得高權重;CPU和Memory使用都比較均衡則能夠得高權重;爲了不這個算法引起調度堆疊若是大鏡像分佈的節點數目不多,那麼這些節點的權重就會被調低等。
整個的流程圖以下:
整個調度過程如流程圖:
代碼路徑:pkg/scheduler/scheduler.go
Scheduler對象是運行kube-scheduler組件的主對象,因此kube-scheduler會在運行的時候建立一個scheduler對象:
sched, err := scheduler.New(...)
調用的scheduler的New方法,這個方法會實例化scheduler對象並返回。
在建立scheduler實例的時候會根據Schedule rAlgorithm Source來實例化調度算法函數:
代碼路徑:pkg/scheduler/apis/config/types.go
type SchedulerAlgorithmSource struct { Policy *SchedulerPolicySource Provider *string }
Policy是經過參數--policy-config-file參數指定調度策略文件來定義策略。
Providre是通用調度器,是kube-scheduler默認調度方式。
而後會根據設置的策略來建立不一樣的scheduler:
func New(...) (*Scheduler, error) { ... case source.Provider != nil: sc, err := configurator.createFromProvider(*source.Provider) ... case source.Policy != nil: ... sc, err := configurator.createFromConfig(*policy) ... }
createFromProvider方法裏面設置好Filter和Score,也就是過濾策略和打分策略:
代碼路徑:pkg/scheduler/algorithmprovider/registry.go
func getDefaultConfig() *schedulerapi.Plugins { return &schedulerapi.Plugins{ ... Filter: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: nodeunschedulable.Name}, {Name: noderesources.FitName}, {Name: nodename.Name}, {Name: nodeports.Name}, {Name: nodeaffinity.Name}, {Name: volumerestrictions.Name}, {Name: tainttoleration.Name}, {Name: nodevolumelimits.EBSName}, {Name: nodevolumelimits.GCEPDName}, {Name: nodevolumelimits.CSIName}, {Name: nodevolumelimits.AzureDiskName}, {Name: volumebinding.Name}, {Name: volumezone.Name}, {Name: podtopologyspread.Name}, {Name: interpodaffinity.Name}, }, }, ... Score: &schedulerapi.PluginSet{ Enabled: []schedulerapi.Plugin{ {Name: noderesources.BalancedAllocationName, Weight: 1}, {Name: imagelocality.Name, Weight: 1}, {Name: interpodaffinity.Name, Weight: 1}, {Name: noderesources.LeastAllocatedName, Weight: 1}, {Name: nodeaffinity.Name, Weight: 1}, {Name: nodepreferavoidpods.Name, Weight: 10000}, // Weight is doubled because: // - This is a score coming from user preference. // - It makes its signal comparable to NodeResourcesLeastAllocated. {Name: podtopologyspread.Name, Weight: 2}, {Name: tainttoleration.Name, Weight: 1}, }, }, ... } }
最後kube-scheduler處理完一系列的邏輯,最後會調用到Scheduler的run方法:
func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }
sched.scheduleOne會被wait.UntilWithContext定時調用,直到ctx.Done()返回true爲止。sched.scheduleOne是核心實現,主要作了如下幾件事:
下面咱們直接看一下sched.Algorithm.Schedule方法的實現:
代碼路徑:pkg/scheduler/core/generic_scheduler.go
//將pod調度到某一node上,若是成功則返回node的名稱,若是成功則返回失敗信息 func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) defer trace.LogIfLong(100 * time.Millisecond) //檢查pod上聲明的pvc,包括pvc是否存在,是否已被刪除等 if err := podPassesBasicChecks(pod, g.pvcLister); err != nil { return result, err } trace.Step("Basic checks done") if err := g.snapshot(); err != nil { return result, err } trace.Step("Snapshotting scheduler cache and node infos done") if g.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } startPredicateEvalTime := time.Now() //這裏是Predicates部分的邏輯,負責選出一系列符合條件的節點 feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) if err != nil { return result, err } trace.Step("Computing predicates done") //表示沒有 找到合適的節點 if len(feasibleNodes) == 0 { return result, &FitError{ Pod: pod, NumAllNodes: g.nodeInfoSnapshot.NumNodes(), FilteredNodesStatuses: filteredNodesStatuses, } } metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime)) startPriorityEvalTime := time.Now() // When only one node after predicate, just use it. //找到惟一的node節點,並返回 if len(feasibleNodes) == 1 { metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime)) return ScheduleResult{ SuggestedHost: feasibleNodes[0].Name, EvaluatedNodes: 1 + len(filteredNodesStatuses), FeasibleNodes: 1, }, nil } //若是節點不是惟一,那麼須要進行打分排序 priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes) if err != nil { return result, err } metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime)) //選擇最佳的節點 host, err := g.selectHost(priorityList) trace.Step("Prioritizing done") return ScheduleResult{ SuggestedHost: host, EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses), FeasibleNodes: len(feasibleNodes), }, err }
這個方法邏輯仍是比較清晰的,總共分爲以下幾部分:
下面咱們看看findNodesThatFitPod時如何實現篩選過濾的。
代碼位置:pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) { filteredNodesStatuses := make(framework.NodeToStatusMap) //前置過濾插件用於預處理 Pod 的相關信息,或者檢查集羣或 Pod 必須知足的某些條件。 //若是 PreFilter 插件返回錯誤,則調度週期將終止 s := prof.RunPreFilterPlugins(ctx, state, pod) if !s.IsSuccess() { if !s.IsUnschedulable() { return nil, nil, s.AsError() } // All nodes will have the same status. Some non trivial refactoring is // needed to avoid this copy. allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err != nil { return nil, nil, err } for _, n := range allNodes { filteredNodesStatuses[n.Node().Name] = s } return nil, filteredNodesStatuses, nil } //過濾掉不符合條件的node feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses) if err != nil { return nil, nil, err } //SchdulerExtender是kubernets外部擴展方式,用戶能夠根據需求獨立構建調度服務 feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses) if err != nil { return nil, nil, err } return feasibleNodes, filteredNodesStatuses, nil }
這個方法首先會經過前置過濾器來校驗pod是否符合條件,而後調用findNodesThatPassFilters方法過濾掉不符合條件的node。findNodesThatPassExtenders是kubernets留給用戶的外部擴展方式,暫且不表。
下面咱們接着看findNodesThatPassFilters方法:
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err != nil { return nil, err } //根據集羣節點數量選擇參與調度的節點的數量 numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) //初始化一個大小和numNodesToFind同樣的數組,用來存放node節點 feasibleNodes := make([]*v1.Node, numNodesToFind) ... checkNode := func(i int) { //咱們從上一個調度週期中離開的節點開始檢查節點,以確保全部節點在Pod中被檢查的機會相同。 nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] fits, status, err := PodPassesFiltersOnNode(ctx, prof.PreemptHandle(), state, pod, nodeInfo) if err != nil { errCh.SendErrorWithCancel(err, cancel) return } //若是該節點合適,那麼放入到feasibleNodes列表中 if fits { length := atomic.AddInt32(&feasibleNodesLen, 1) if length > numNodesToFind { cancel() atomic.AddInt32(&feasibleNodesLen, -1) } else { feasibleNodes[length-1] = nodeInfo.Node() } } else { statusesLock.Lock() if !status.IsSuccess() { statuses[nodeInfo.Node().Name] = status } statusesLock.Unlock() } } ... //開啓16個線程尋找符合條件的node節點,數量等於feasibleNodes parallelize.Until(ctx, len(allNodes), checkNode) processedNodes := int(feasibleNodesLen) + len(statuses) //設置下次開始尋找node的位置 g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes) feasibleNodes = feasibleNodes[:feasibleNodesLen] if err := errCh.ReceiveError(); err != nil { statusCode = framework.Error return nil, err } return feasibleNodes, nil }
在這個方法中首先會根據numFeasibleNodesToFind方法選擇參與調度的節點的數量,而後調用parallelize.Until方法開啓16個線程來調用checkNode方法尋找合適的節點。
對於numFeasibleNodesToFind的邏輯以下:
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) { //對於一個小於100的節點,所有節點參與調度 //percentageOfNodesToScore參數值是一個集羣中全部節點的百分比,範圍是1和100之間,0表示不啓用 if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 { return numAllNodes } adaptivePercentage := g.percentageOfNodesToScore //當numAllNodes大於100時,若是沒有設置percentageOfNodesToScore,那麼這裏須要計算出一個值 if adaptivePercentage <= 0 { basePercentageOfNodesToScore := int32(50) adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125 if adaptivePercentage < minFeasibleNodesPercentageToFind { adaptivePercentage = minFeasibleNodesPercentageToFind } } numNodes = numAllNodes * adaptivePercentage / 100 if numNodes < minFeasibleNodesToFind { return minFeasibleNodesToFind } return numNodes }
找出可以進行調度的節點,若是節點小於100,那麼所有節點參與調度。
percentageOfNodesToScore參數值是一個集羣中全部節點的百分比,範圍是1和100之間,0表示不啓用。若是集羣節點數大於100,那麼就會根據這個值來計算讓合適的節點數參與調度。
若是一個5000個節點的集羣,percentageOfNodesToScore會默認設置爲10%,也就是500個節點參與調度。
由於若是一個5000節點的集羣來進行調度的話,不進行控制時,每一個pod調度都須要嘗試5000次的節點預選過程時很是消耗資源的。
而後咱們回到findNodesThatPassFilters方法中,咱們看一下PodPassesFiltersOnNode是如何篩選出合適的節點的:
func PodPassesFiltersOnNode( ctx context.Context, ph framework.PreemptHandle, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo, ) (bool, *framework.Status, error) { var status *framework.Status podsAdded := false //待檢查的 Node 是一個即將被搶佔的節點,調度器就會對這個 Node ,將一樣的 Predicates 算法運行兩遍。 for i := 0; i < 2; i++ { stateToUse := state nodeInfoToUse := info //處理優先級pod的邏輯 if i == 0 { var err error //查找是否有優先級大於或等於當前pod的NominatedPods,而後加入到nodeInfoToUse中 podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, ph, pod, state, info) if err != nil { return false, nil, err } } else if !podsAdded || !status.IsSuccess() { break } //運行過濾器檢查該pod是否能運行在該節點上 statusMap := ph.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) status = statusMap.Merge() if !status.IsSuccess() && !status.IsUnschedulable() { return false, status, status.AsError() } } return status.IsSuccess(), status, nil }
這個方法用來檢測node是否能經過過濾器,此方法會在調度Schedule和搶佔Preempt的時被調用,若是在Schedule時被調用,那麼會測試nod,可否可讓全部存在的pod以及更高優先級的pod在該node上運行。若是在搶佔時被調用,那麼咱們首先要移除搶佔失敗的pod,添加將要搶佔的pod。
而後RunFilterPlugins會調用runFilterPlugin方法來運行咱們上面講的getDefaultConfig中設置的過濾器:
func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { if !state.ShouldRecordPluginMetrics() { return pl.Filter(ctx, state, pod, nodeInfo) } startTime := time.Now() status := pl.Filter(ctx, state, pod, nodeInfo) f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime)) return status }
過濾器總共有這些:nodeunschedulable,noderesources,nodename,nodeports,nodeaffinity,volumerestrictions,tainttoleration,nodevolumelimits,nodevolumelimits,nodevolumelimits,nodevolumelimits,volumebinding,volumezone,podtopologyspread,interpodaffinity
過濾器太多就不一一看了,裏面的邏輯仍是很清晰的,感興趣的本身能夠看看具體實現。
下面咱們繼續回到Schedule方法,運行完findNodesThatFitPod後會找到一系列符合條件的node節點,而後會調用prioritizeNodes進行打分排序:
func (g *genericScheduler) prioritizeNodes( ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, ) (framework.NodeScoreList, error) { ... scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes) if !scoreStatus.IsSuccess() { return nil, scoreStatus.AsError() } // Summarize all scores. result := make(framework.NodeScoreList, 0, len(nodes)) //將分數按照node維度進行彙總 for i := range nodes { result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) for j := range scoresMap { result[i].Score += scoresMap[j][i].Score } } ... return result, nil }
prioritizeNodes裏面會調用RunScorePlugins方法,裏面會遍歷一系列的插件的方式爲node打分。而後遍歷scoresMap將結果按照node維度進行聚合。
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) { ... //開啓16個線程爲node進行打分 parallelize.Until(ctx, len(nodes), func(index int) { for _, pl := range f.scorePlugins { nodeName := nodes[index].Name s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel) return } pluginToNodeScores[pl.Name()][index] = framework.NodeScore{ Name: nodeName, Score: int64(s), } } }) if err := errCh.ReceiveError(); err != nil { msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err) klog.Error(msg) return nil, framework.NewStatus(framework.Error, msg) } //用於在調度程序計算節點的最終排名以前修改分數,保證 Score 插件的輸出必須是 [MinNodeScore,MaxNodeScore]([0-100]) 範圍內的整數 parallelize.Until(ctx, len(f.scorePlugins), func(index int) { pl := f.scorePlugins[index] nodeScoreList := pluginToNodeScores[pl.Name()] if pl.ScoreExtensions() == nil { return } status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList) if !status.IsSuccess() { err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message()) errCh.SendErrorWithCancel(err, cancel) return } }) if err := errCh.ReceiveError(); err != nil { msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err) klog.Error(msg) return nil, framework.NewStatus(framework.Error, msg) } // 爲每一個節點的分數乘上一個權重 parallelize.Until(ctx, len(f.scorePlugins), func(index int) { pl := f.scorePlugins[index] // Score plugins' weight has been checked when they are initialized. weight := f.pluginNameToWeightMap[pl.Name()] nodeScoreList := pluginToNodeScores[pl.Name()] for i, nodeScore := range nodeScoreList { // return error if score plugin returns invalid score. if nodeScore.Score > int64(framework.MaxNodeScore) || nodeScore.Score < int64(framework.MinNodeScore) { err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, framework.MinNodeScore, framework.MaxNodeScore) errCh.SendErrorWithCancel(err, cancel) return } nodeScoreList[i].Score = nodeScore.Score * int64(weight) } }) ... return pluginToNodeScores, nil }
RunScorePlugins裏面分別調用parallelize.Until方法跑三次來進行打分:
第一次會調用runScorePlugin方法,裏面會調用getDefaultConfig裏面設置的score的Plugin來進行打分;
第二次會調用runScoreExtension方法,裏面會調用Plugin的NormalizeScore方法,用來保證分數必須是0到100之間,不是每個plugin都會實現NormalizeScore方法。
第三此會調用遍歷全部的scorePlugins,並對對應的算出的來的分數乘以一個權重。
打分的plugin共有:noderesources,imagelocality,interpodaffinity,noderesources,nodeaffinity,nodepreferavoidpods,podtopologyspread,tainttoleration
在爲全部node打完分以後就會調用selectHost方法來挑選一個合適的node:
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) { if len(nodeScoreList) == 0 { return "", fmt.Errorf("empty priorityList") } maxScore := nodeScoreList[0].Score selected := nodeScoreList[0].Name cntOfMaxScore := 1 for _, ns := range nodeScoreList[1:] { if ns.Score > maxScore { maxScore = ns.Score selected = ns.Name cntOfMaxScore = 1 } else if ns.Score == maxScore { cntOfMaxScore++ if rand.Intn(cntOfMaxScore) == 0 { // Replace the candidate with probability of 1/cntOfMaxScore selected = ns.Name } } } return selected, nil }
這個方法十分簡單,就是挑選分數高的,若是分數相同,那麼則隨機挑選一個。
經過這篇文章咱們深刻分析了k8s是如何調度節點的,以及調度節點的時候具體作了什麼事情,熟悉了整個調度流程。經過對調度流程的掌握,能夠直到一個pod被調度到node節點上須要通過Predicates的過濾,而後經過對node的打分,最終選擇一個合適的節點進行調度。不過介於Filter以及Score的plugin太多,沒有一一去介紹,感興趣的能夠本身去逐個看看。
https://kubernetes.io/docs/concepts/scheduling-eviction/kube-scheduler/
https://kubernetes.io/docs/concepts/scheduling-eviction/scheduler-perf-tuning/
https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/
https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/
https://www.huweihuang.com/k8s-source-code-analysis/kube-scheduler/preempt.html
http://www.javashuo.com/article/p-aoxucrnr-ev.html
https://www.servicemesher.com/blog/202003-k8s-scheduling-framework/