This article begins the analysis of the Kubernetes source code, based on the latest release at the time of writing, 1.13.4.

The entry points of the core Kubernetes components all live under the cmd directory; the kube-scheduler entry point is cmd/kube-scheduler/scheduler.go. The scheduler is built as a command using the spf13/cobra library: the configuration is wrapped into a command object, and calling Execute runs the Run function defined on it. This leads into the runCommand function, which finishes initializing the configuration and then calls Run to start the scheduler proper.
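For readers unfamiliar with cobra, here is a minimal, self-contained sketch of that wiring; the identifiers are illustrative and not the exact kube-scheduler code:

package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	// Build a *cobra.Command whose Run callback carries the real work,
	// mirroring how cmd/kube-scheduler builds its command from its options.
	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Run: func(cmd *cobra.Command, args []string) {
			// In the real code this is where runCommand validates the options,
			// completes the configuration and finally calls Run.
			fmt.Println("scheduler starting (sketch)")
		},
	}

	// Execute parses the flags and dispatches to the Run callback above.
	if err := cmd.Execute(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}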
The Run function does the following (a condensed sketch follows the list):
1. Checks whether the VolumeScheduling feature gate needs to be added;
2. Initializes the structs that hold the scheduling configuration;
3. Prepares the event broadcaster;
4. Sets up health checking;
5. Sets up metrics;
6. Starts all the informers (Kubernetes watches for changes mainly through the informer and workqueue mechanisms);
7. Checks whether leader election is required, which determines how the scheduler is finally started.
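The following self-contained sketch compresses that flow; every type and helper below is a stand-in so the example compiles on its own, while the real logic lives in cmd/kube-scheduler/app/server.go:

package main

import (
	"context"
	"fmt"
)

type config struct{ leaderElect bool }

func startHealthzAndMetrics()            { fmt.Println("healthz/metrics serving") } // steps 4-5
func startInformers(ctx context.Context) { fmt.Println("informers started") }       // step 6
func runScheduler(ctx context.Context)   { fmt.Println("scheduling loop running") }

// runLeaderElection stands in for client-go's leaderelection.RunOrDie, which
// only invokes onStarted on the instance that wins the election.
func runLeaderElection(ctx context.Context, onStarted func(context.Context)) {
	onStarted(ctx)
}

func run(ctx context.Context, cfg *config) {
	startHealthzAndMetrics()
	startInformers(ctx)
	if cfg.leaderElect { // step 7: only the elected instance schedules
		runLeaderElection(ctx, runScheduler)
		return
	}
	runScheduler(ctx)
}

func main() {
	run(context.Background(), &config{leaderElect: true})
}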
The scheduling work itself is driven from scheduler.go under pkg/scheduler, which runs the scheduling loop in a separate goroutine. scheduleOne, as the name suggests, schedules a single Pod per invocation; the full function is shown below.
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
// 1. Take the next Pod to schedule from the queue
pod := sched.config.NextPod()
// pod could be nil when schedulerQueue is closed
if pod == nil {
return
}
if pod.DeletionTimestamp != nil {
sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
return
}
klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
// Synchronously attempt to find a fit for the pod.
start := time.Now()
// 2. Run the scheduling algorithm to get the name of the host that fits the Pod
suggestedHost, err := sched.schedule(pod)
if err != nil {
// schedule() may have failed because the pod would not fit on any host, so we try to
// preempt, with the expectation that the next time the pod is tried for scheduling it
// will fit due to the preemption. It is also possible that a different pod will schedule
// into the resources that were preempted, but this is harmless.
if fitError, ok := err.(*core.FitError); ok {
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
metrics.PreemptionAttempts.Inc()
metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
// Pod did not fit anywhere, so it is counted as a failure. If preemption
// succeeds, the pod should get counted as a success the next time we try to
// schedule it. (hopefully)
metrics.PodScheduleFailures.Inc()
} else {
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
return
}
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
// 3. Cache the Pod-to-Node assumption so scheduling can keep going without waiting for each binding to finish (binding is slow)
assumedPod := pod.DeepCopy()
// Assume volumes first before assuming the pod.
//
// If all volumes are completely bound, then allBound is true and binding will be skipped.
//
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
//
// This function modifies 'assumedPod' if volume binding is required.
// 4. Check whether the VolumeScheduling feature is needed (assume volumes first)
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil {
klog.Errorf("error assuming volumes: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
// assume modifies `assumedPod` by setting NodeName=suggestedHost
// 5. Set the Pod's NodeName to the suggested host and store it in the cache
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
klog.Errorf("error assuming pod: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
// 6. Call the apiserver asynchronously to handle the final binding, which is persisted to etcd
go func() {
// Bind volumes first before Pod
if !allBound {
err := sched.bindVolumes(assumedPod)
if err != nil {
klog.Errorf("error binding volumes: %v", err)
metrics.PodScheduleErrors.Inc()
return
}
}
err := sched.bind(assumedPod, &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
Target: v1.ObjectReference{
Kind: "Node",
Name: suggestedHost,
},
})
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
if err != nil {
klog.Errorf("error binding pod: %v", err)
metrics.PodScheduleErrors.Inc()
} else {
metrics.PodScheduleSuccesses.Inc()
}
}()
}
It does the following:
1. Takes the next Pod to schedule from the queue;
2. Runs the scheduling algorithm (predicates + priorities) to find a matching host for the Pod; if no suitable host is found, decides whether to preempt, i.e. applies the Pod's preemption policy to free up a node for it;
3. Caches the current Pod, assuming the binding has already succeeded (mainly to decouple the scheduling phase from the binding phase);
4. Checks whether the VolumeScheduling feature needs to add volume information to the Pod;
5. Writes the host name into the Pod's NodeName (scheduling is essentially filling an empty NodeName with the value of the chosen Node);
6. Starts a new binding goroutine that calls the apiserver to handle the final binding asynchronously, with the result written to etcd.
The actual scheduling algorithm is the Schedule method of generic_scheduler.go. Scheduling has two main phases: predicates (filtering) and priorities (scoring).
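The toy example below only illustrates that two-phase split; the node type, the CPU-based predicate and the scoring rule are all made up and bear no relation to the real generic_scheduler code:

package main

import "fmt"

type node struct {
	name    string
	freeCPU int
}

// predicatePhase filters out nodes that cannot fit the request at all,
// which is what the predicate (filtering) phase does conceptually.
func predicatePhase(nodes []node, requestCPU int) []node {
	var feasible []node
	for _, n := range nodes {
		if n.freeCPU >= requestCPU {
			feasible = append(feasible, n)
		}
	}
	return feasible
}

// priorityPhase scores the remaining nodes and picks the best one,
// standing in for the priority (scoring) phase.
func priorityPhase(nodes []node) (best node) {
	bestScore := -1
	for _, n := range nodes {
		if n.freeCPU > bestScore {
			bestScore, best = n.freeCPU, n
		}
	}
	return best
}

func main() {
	nodes := []node{{"node-a", 2}, {"node-b", 8}, {"node-c", 4}}
	feasible := predicatePhase(nodes, 3) // filtering: drop nodes that cannot fit
	chosen := priorityPhase(feasible)    // scoring: pick the best of what is left
	fmt.Println("scheduled to", chosen.name)
}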
The entry point of the predicate phase is findNodesThatFit; the main code is as follows:
// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
// If no predicates are configured, every Node is considered a fit
if len(g.predicates) == 0 {
filtered = nodes
} else {
allNodes := int32(g.cache.NodeTree().NumNodes)
// numFeasibleNodesToFind caps how many Nodes are returned in one pass, keeping the slice from growing too large
numNodesToFind := g.numFeasibleNodesToFind(allNodes)
// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, numNodesToFind)
errs := errors.MessageCountMap{}
var (
predicateResultLock sync.Mutex
filteredLen int32
equivClass *equivalence.Class
)
ctx, cancel := context.WithCancel(context.Background())
// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
if g.equivalenceCache != nil {
// getEquivalenceClassInfo will return immediately if no equivalence pod found
equivClass = equivalence.NewClass(pod)
}
// checkNode runs the predicates against a single Node
checkNode := func(i int) {
var nodeCache *equivalence.NodeCache
// fetch the next Node to evaluate
nodeName := g.cache.NodeTree().Next()
if g.equivalenceCache != nil {
nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
}
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
g.cachedNodeInfoMap[nodeName],
g.predicates,
nodeCache,
g.schedulingQueue,
g.alwaysCheckAllPredicates,
equivClass,
)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
predicateResultLock.Unlock()
return
}
if fits {
// make sure we collect at most numNodesToFind Nodes
length := atomic.AddInt32(&filteredLen, 1)
if length > numNodesToFind {
// tell ParallelizeUntil to stop scheduling more work
cancel()
atomic.AddInt32(&filteredLen, -1)
} else {
filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
}
} else {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = failedPredicates
predicateResultLock.Unlock()
}
}
// Stops searching for more nodes once the configured number of feasible nodes
// are found.
// run checkNode for many Nodes in parallel
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
filtered = filtered[:filteredLen]
if len(errs) > 0 {
return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
}
}
if len(filtered) > 0 && len(g.extenders) != 0 {
for _, extender := range g.extenders {
if !extender.IsInterested(pod) {
continue
}
filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
if err != nil {
if extender.IsIgnorable() {
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
extender, err)
continue
} else {
return []*v1.Node{}, FailedPredicateMap{}, err
}
}
for failedNodeName, failedMsg := range failedMap {
if _, found := failedPredicateMap[failedNodeName]; !found {
failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
}
failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
}
filtered = filteredList
if len(filtered) == 0 {
break
}
}
}
return filtered, failedPredicateMap, nil
}
findNodesThatFit does a few things:
1. If no predicates are configured, it simply returns the full list of Nodes;
2. Otherwise it runs the checkNode method against many Nodes concurrently (16 workers at a time) to decide whether the Pod can be scheduled on each Node;
3. After the predicate filtering, if scheduler extenders are configured, the filtered Nodes are run through the extenders as well, producing the final list of matching Nodes.
One thing to watch out for: when collecting matching Nodes, the numFeasibleNodesToFind function limits how many feasible Nodes are gathered in a single pass; it caps the count at a percentage of all Nodes, with a floor of 100 (sketched below). Once that many matching Nodes have been found, checkNode is no longer called.
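The following is a paraphrase of numFeasibleNodesToFind from pkg/scheduler/core/generic_scheduler.go; the constant and parameter names are reproduced from memory and should be treated as an approximation of the 1.13 code:

const minFeasibleNodesToFind = 100

func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
	// Small clusters, or a percentage outside (0, 100), mean "consider every node".
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore <= 0 ||
		percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	numNodes := numAllNodes * percentageOfNodesToScore / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind // never go below the floor of 100 nodes
	}
	return numNodes
}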
Personally I think there is a problem here: when the cluster has enough Nodes (more than the limit), numFeasibleNodesToFind stops the scheduler from scanning every Node, so the most suitable Node may never be examined. The Node that gets matched is only the best among those scanned, and the Node finally scheduled to is merely relatively good rather than the best possible fit.
The interface that ultimately decides whether a Pod fits a Node is podFitsOnNode.

The hardest part of podFitsOnNode to understand is that it loops twice. Based on the comments, the idea is roughly:
1. In the first pass, all nominatedPods with higher or equal priority are added to the Node, and meta and nodeInfo are updated accordingly. nominatedPods are Pods that have been assigned to the Node but are not actually running yet. This guarantees that higher-priority Pods will not fail to schedule because the current Pod was admitted;
2. The second pass runs without the nominatedPods added. The reason is predicates such as pod affinity: if the current Pod's affinity depends on a nominatedPod, there would be a problem, because a nominatedPod is not guaranteed to actually end up running on that Node.
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
// ---
// podFitsOnNode checks whether the Node described by the given NodeInfo satisfies the given predicate functions.
// For a given Pod, it first checks whether an equivalent Pod exists, so that cached predicate results can be reused.
// The function is called from two places: Schedule and Preempt.
// When called from Schedule (normal scheduling), it checks whether the current Pod is schedulable given all Pods already on the Node plus all higher- and equal-priority Pods nominated to run there.
// When called from Preempt: TBD.
func podFitsOnNode( pod *v1.Pod, meta algorithm.PredicateMetadata, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate, nodeCache *equivalence.NodeCache, queue internalqueue.SchedulingQueue, alwaysCheckAllPredicates bool, equivClass *equivalence.Class, ) (bool, []algorithm.PredicateFailureReason, error) {
var (
eCacheAvailable bool
failedPredicates []algorithm.PredicateFailureReason
)
podsAdded := false
// We run predicates twice in some cases. If the node has greater or equal priority
// nominated pods, we run them when those pods are added to meta and nodeInfo.
// If all predicates succeed in this pass, we run them again when these
// nominated pods are not added. This second pass is necessary because some
// predicates such as inter-pod affinity may not pass without the nominated pods.
// If there are no nominated pods for the node or if the first run of the
// predicates fail, we don't run the second pass.
// We consider only equal or higher priority pods in the first pass, because
// those are the current "pod" must yield to them and not take a space opened
// for running them. It is ok if the current "pod" take resources freed for
// lower priority pods.
// Requiring that the new pod is schedulable in both circumstances ensures that
// we are making a conservative decision: predicates like resources and inter-pod
// anti-affinity are more likely to fail when the nominated pods are treated
// as running, while predicates like pod affinity are more likely to fail when
// the nominated pods are treated as not running. We can't just assume the
// nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node.
// The loop runs twice mainly because nominated Pods are not guaranteed to land on this Node, and because of issues such as pod affinity
for i := 0; i < 2; i++ {
metaToUse := meta
nodeInfoToUse := info
if i == 0 {
// First pass: update meta and nodeInfo with the nominated Pods, and run the predicates against the updated state
// Second pass: leave meta and nodeInfo unchanged, so the Pod does not rely entirely on the nominated Pods (mainly because of pod affinity and the like)
podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
} else if !podsAdded || len(failedPredicates) != 0 {
break
}
// Bypass eCache if node has any nominated pods.
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
for predicateID, predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []algorithm.PredicateFailureReason
err error
)
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable {
fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
} else {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
}
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
if !alwaysCheckAllPredicates {
klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
"evaluation is short circuited and there are chances " +
"of other predicates failing as well.")
break
}
}
}
}
}
return len(failedPredicates) == 0, failedPredicates, nil
}
After that, each configured predicate is evaluated in turn to see whether they all pass. There is a small optimization here: if an equivalent Pod was scheduled earlier, the cached result of that earlier run is returned directly, and on a cache hit the predicates do not have to be evaluated again. I do have a question about this caching, though: the cached result for the previous Pod may have been a success, but the Node's state may have changed since then, so a cached success does not necessarily mean the current attempt would really succeed.
This part covers the default scheduling algorithms. The defaults live in pkg/scheduler/algorithmprovider/defaults/defaults.go, where the defaultPredicates function returns the default set of predicates. The predicate implementations themselves are in pkg/scheduler/algorithm/predicates/predicates.go. Every predicate takes the arguments (pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) and returns (bool, []algorithm.PredicateFailureReason, error).
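As an illustration, a hypothetical custom predicate written against that signature could look like the following; it is not part of the real code, and it reuses the package aliases (v1, algorithm, schedulercache) and the predefined failure reason ErrNodeLabelPresenceViolated already present in the predicates package:

// CheckExampleLabelPredicate passes only if the node carries the
// (made-up) label "example.com/schedulable".
func CheckExampleLabelPredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if _, ok := node.Labels["example.com/schedulable"]; ok {
		return true, nil, nil
	}
	// Reuse one of the predefined failure reasons so the result can be
	// reported the same way as the built-in predicates.
	return false, []algorithm.PredicateFailureReason{ErrNodeLabelPresenceViolated}, nil
}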
The predicate phase produces an array of Nodes. If more than one suitable Node remains, the priority phase is needed to score them and pick the best one.

The entry point of the priority phase is PrioritizeNodes. It uses the same kind of parallel fan-out as the predicate phase and follows a MapReduce style: the Map step computes a score for one Node under a given priority algorithm, and the Reduce step aggregates the final result.
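To make the Map/Reduce shape concrete, here is a hypothetical priority pair written against the same function shapes as the built-ins (package aliases v1, schedulerapi and schedulercache as used in the priorities package; the policy itself is invented for illustration):

// FewerPodsPriorityMap gives each node a raw score equal to its pod count.
func FewerPodsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
	node := nodeInfo.Node()
	if node == nil {
		return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
	}
	return schedulerapi.HostPriority{Host: node.Name, Score: len(nodeInfo.Pods())}, nil
}

// FewerPodsPriorityReduce normalizes the raw scores into 0..MaxPriority and
// inverts them, so the node with the fewest pods ends up with the highest score.
func FewerPodsPriorityReduce(pod *v1.Pod, meta interface{}, nodeNameToInfo map[string]*schedulercache.NodeInfo, result schedulerapi.HostPriorityList) error {
	var max int
	for _, hp := range result {
		if hp.Score > max {
			max = hp.Score
		}
	}
	for i := range result {
		if max == 0 {
			result[i].Score = schedulerapi.MaxPriority
			continue
		}
		result[i].Score = schedulerapi.MaxPriority * (max - result[i].Score) / max
	}
	return nil
}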
The default priority algorithms are also defined in pkg/scheduler/algorithmprovider/defaults/defaults.go, where the defaultPriorities function returns the default set; each priority is registered through a factory. The code is as follows:
func defaultPriorities() sets.String {
return sets.NewString(
// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
factory.RegisterPriorityConfigFactory(
"SelectorSpreadPriority",
factory.PriorityConfigFactory{
MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
},
Weight: 1,
},
),
// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
factory.RegisterPriorityConfigFactory(
"InterPodAffinityPriority",
factory.PriorityConfigFactory{
Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
},
Weight: 1,
},
),
// Prioritize nodes by least requested utilization.
factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
// Prioritizes nodes to help achieve balanced resource usage
factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
// Set this weight large enough to override all other priority functions.
// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
// Prioritizes nodes that have labels matching NodeAffinity
factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
// Prioritizes nodes that marked with taint which pod can tolerate.
factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
// ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
)
}
Which priority algorithms are in use can largely be read off the code structure: each priority policy lives in its own file. The selectHost method then takes the Node with the highest score; if several Nodes share the top score, it rotates among them to pick the final Node.
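The snippet below paraphrases that tie-breaking logic; the hostPriority type is stubbed so the sketch stands alone, while the real code works on schedulerapi.HostPriorityList:

import "errors"

type hostPriority struct {
	Host  string
	Score int
}

var lastNodeIndex uint64

// selectHostSketch collects every index holding the maximum score, then
// rotates through those indexes with a counter, so repeated ties are
// spread across nodes instead of always landing on the first one.
func selectHostSketch(priorityList []hostPriority) (string, error) {
	if len(priorityList) == 0 {
		return "", errors.New("empty priorityList")
	}
	maxScore := priorityList[0].Score
	maxScoreIndexes := []int{0}
	for i, hp := range priorityList[1:] {
		switch {
		case hp.Score > maxScore:
			maxScore = hp.Score
			maxScoreIndexes = []int{i + 1}
		case hp.Score == maxScore:
			maxScoreIndexes = append(maxScoreIndexes, i+1)
		}
	}
	ix := int(lastNodeIndex % uint64(len(maxScoreIndexes)))
	lastNodeIndex++
	return priorityList[maxScoreIndexes[ix]].Host, nil
}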
When the normal scheduling flow fails to find a suitable Node (mostly because no Node passes the predicates), the scheduler decides whether preemption is needed. The code lives in pkg/scheduler/scheduler.go, in the preempt method:
// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
// If it succeeds, it adds the name of the node where preemption has happened to the pod annotations.
// It returns the node name and an error if any.
// ---
// preempt tries to make room for the Pod that failed to schedule by preempting lower-priority Pods where possible.
// If it succeeds, the chosen Node's name is recorded in the Pod's annotations (the nominated node name).
// It returns the Node name and an error, if any.
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
// 1. If Pod priority is not enabled, or the scheduler is configured with DisablePreemption, no preemption is performed
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
return "", nil
}
// 2. Get the up-to-date information of the preemptor Pod
preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
if err != nil {
klog.Errorf("Error getting the updated preemptor pod object: %v", err)
return "", err
}
// 3. Run the configured algorithm to pick the preemption target
// The four return values are:
// 1. the Node chosen for preemption
// 2. the list of lower-priority Pods (victims) that need to be deleted
// 3. the list of nominatedPods whose nomination needs to be cleared
// 4. an error, if any
node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
metrics.PreemptionVictims.Set(float64(len(victims)))
if err != nil {
klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
return "", err
}
var nodeName = ""
if node != nil {
// 1. Record the Node on the Pod (its nominated node name is set) and issue the corresponding apiserver call
// 2. Delete every victim Pod one by one
// Preemption only succeeds if both of these happen
nodeName = node.Name
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.
sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// Make a call to update nominated node name of the pod on the API server.
err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
if err != nil {
klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
for _, victim := range victims {
if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
}
// Clearing nominated pods should happen outside of "if node != nil". Node could
// be nil when a pod with nominated node name is eligible to preempt again,
// but preemption logic does not find any node for it. In that case Preempt()
// function of generic_scheduler.go returns the pod itself for removal of the annotation.
// 4. Clear the nominated node name of these Pods; failure here is tolerated and does not affect the overall result
for _, p := range nominatedPodsToClear {
rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
if rErr != nil {
klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
The overall structure is fairly clear and breaks down into these steps:
1. Decide whether preemption should run at all; there are two checks (PodPriority must be enabled, and the scheduler must not be configured with DisablePreemption), and both must hold;
2. Fetch the up-to-date information of the Pod that wants to preempt;
3. Run the configured preemption algorithm to obtain the preemption result (the core step);
4. Clean up (update the Pod's information, delete the lower-priority Pods, and clear resources such as nominatedPods).
The heart of the whole process is the algorithm interface that produces the preemption result. As with predicates and priorities, the default implementation is in generic_scheduler.go, in the Preempt method.

Preempt returns four values:
1) the Node obtained by preemption;
2) the list of Pods being preempted (to be deleted);
3) the nominatedPods that should be cleared;
4) a possible error.

The Preempt method performs the following steps:
1. From the Nodes that failed the predicates, pick out the ones where preemption might help, excluding the Nodes that cannot be used for preemption via a switch statement over the failure reasons.
The selectNodesForPreemption method then decides which Nodes can be preempted. It evaluates all of those Nodes concurrently via ParallelizeUntil, along the path checkNode --> selectVictimsOnNode --> podFitsOnNode, so in the end it reuses the same podFitsOnNode method as the normal predicate phase. Unlike ordinary predicate checking, preemption first compares Pod priorities and removes the lower-priority Pods before calling podFitsOnNode, which is what produces the preemption effect.
selectNodesForPreemption returns a map whose keys are the candidate Nodes and whose values describe what would have to be cleared if that Node were chosen, namely the victim Pods and the PDB information (see the sketch below).
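For reference, the value type stored in that map (defined in pkg/scheduler/api) looks roughly like this; the field list is reproduced from memory:

// Victims is the per-node value: the Pods that would have to be evicted from
// the node, plus how many PodDisruptionBudgets those evictions would violate.
type Victims struct {
	Pods             []*v1.Pod
	NumPDBViolations int
}

// so selectNodesForPreemption effectively returns a map[*v1.Node]*Victims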
The pickOneNodeForPreemption method then chooses one Node from those candidates, based on five criteria (roughly: fewest PDB violations; then the Node whose highest-priority victim has the lowest priority; then the lowest sum of victim priorities; then the fewest victims; and finally, if there is still a tie, the first such Node). The full Preempt method is shown below:
// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns 1) the node, 2) the list of preempted pods if such a node is found,
// 3) A list of pods whose nominated node name should be cleared, and 4) any
// possible error.
// Preempt does not update its snapshot. It uses the same snapshot used in the
// scheduling cycle. This is to avoid a scenario where preempt finds feasible
// nodes without preempting any pod. When there are many pending pods in the
// scheduling queue a nominated pod will go back to the queue and behind
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil, nil
}
if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
allNodes, err := nodeLister.List()
if err != nil {
return nil, nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
// 1. Get the Nodes that failed the predicates but where preemption might still help (the preemption target is always chosen from these potential Nodes)
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
// 2. List the PDBs (PodDisruptionBudgets)
pdbs, err := g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
// 3. For every Node where preemption is possible, work out which Pods on it would have to be preempted
nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
g.predicateMetaProducer, g.schedulingQueue, pdbs)
if err != nil {
return nil, nil, nil, err
}
// We will only check nodeToVictims with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
// 4. Let preemption-aware extenders filter the candidates
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
if err != nil {
return nil, nil, nil, err
}
// 5. Pick a single Node
candidateNode := pickOneNodeForPreemption(nodeToVictims)
if candidateNode == nil {
return nil, nil, nil, err
}
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
// 6. Find the lower-priority nominated Pods on that Node whose nomination should be cleared
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
}
return nil, nil, nil, fmt.Errorf(
"preemption failed: the target node %s has been deleted from scheduler cache",
candidateNode.Name)
}
In summary, the key point of preemptive scheduling is Pod priority. Unlike normal scheduling, preemption draws an explicit priority distinction between Pods and uses it to take resources from lower-priority ones.
When the scheduler starts, it needs to decide whether to take part in leader election. Configuring election is simple: just add --leader-elect=true to the configuration. If election is configured, the scheduler first campaigns for leadership, and only the instance that wins performs any scheduling work.

The election code is fairly compact. It lives in the client-go package, at client-go/tools/leaderelection/leaderelection.go.
The key calls are le.acquire(ctx), le.renew(ctx) and le.config.Callbacks.OnStartedLeading(ctx). acquire reports whether leadership was won; only after it succeeds are OnStartedLeading and renew executed. OnStartedLeading is a callback that runs the scheduler's run method. renew handles renewing the leadership: once the scheduler on a node has been elected, it must keep updating the record to show that the leader is still functioning.
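The sketch below shows how a scheduler-like component would hand its run function to client-go's leader election; it is written against the client-go API of that era, so exact signatures and package paths may differ slightly between versions, and runWithLeaderElection itself is a hypothetical helper:

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

// runWithLeaderElection blocks, and only invokes run on the instance that
// currently holds the kube-scheduler lease. id should be unique per instance
// (a hostname-based value is typical).
func runWithLeaderElection(ctx context.Context, client kubernetes.Interface, id string, run func(context.Context)) {
	lock := &resourcelock.EndpointsLock{
		EndpointsMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "kube-scheduler"},
		Client:        client.CoreV1(),
		LockConfig:    resourcelock.ResourceLockConfig{Identity: id},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second, // how long a lease stays valid without renewal
		RenewDeadline: 10 * time.Second, // the leader must renew before this elapses
		RetryPeriod:   2 * time.Second,  // how often candidates retry acquire/renew
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run, // the scheduler's run method is plugged in here
			OnStoppedLeading: func() { klog.Fatalf("leader election lost") },
		},
	})
}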
Both acquire and renew end up calling the same method, tryAcquireOrRenew, which is the core of the whole election. As its name says, tryAcquireOrRenew acquires the leader lease if it has not been obtained yet, and otherwise renews it. It has three main parts:
1. A Get call to check whether an ElectionRecord already exists. If it does not, Create a new Endpoints record; the current node becomes the scheduler leader and the election succeeds. Otherwise fall through to the update path;
2. Check the identity and timestamps of the record that was fetched, to decide whether this instance may take over or keep the lease;
3. An Update call that writes the new leader information.
// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
// ---
// tryAcquireOrRenew acquires the leader lease if it has not been obtained yet; otherwise it renews the lease.
func (le *LeaderElector) tryAcquireOrRenew() bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}
// 1. obtain or create the ElectionRecord
// 1. Call Get on the lock (the Endpoints object) to fetch oldLeaderElectionRecord
oldLeaderElectionRecord, err := le.config.Lock.Get()
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
// the record does not exist yet: create a new Endpoints record and become leader
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
return true
}
// 2. Record obtained, check the Identity & Time
// 2. A record was fetched: check its identity and timestamps to decide whether the lease is still valid
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
le.observedRecord = *oldLeaderElectionRecord
le.observedTime = le.clock.Now()
}
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
// update the lock itself
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
return true
}
The scheduler's election is fairly simple: it decides leadership by whether the Endpoints record stored in etcd can be updated, and it relies on etcd's guarantees to keep this distributed operation safe and unique. The corresponding endpoint, kube-scheduler, can be seen in the kube-system namespace.
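As a quick check, the leader record can be inspected directly; the holder identity is stored in an annotation (control-plane.alpha.kubernetes.io/leader) on that Endpoints object:

kubectl -n kube-system get endpoints kube-scheduler -o yaml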