In Kubernetes, scheduling means placing a Pod onto a suitable Node so that the kubelet on that Node can run it. The K8s scheduler is the component responsible for scheduling pods.

This article walks through part of the scheduler's workflow via its source code.

Based on Kubernetes v1.19.11.
The main data structure of the K8s scheduler is the `Scheduler` struct shown below.

The related code flow falls into two main parts:

- `cmd/kube-scheduler`: the scheduler's entry point, which reads the configuration, then initializes and starts the scheduler.
- `pkg/scheduler`: the scheduler's core code.

```go
// pkg/scheduler/scheduler.go

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
	// It is expected that changes made via SchedulerCache will be observed
	// by NodeLister and Algorithm.
	SchedulerCache internalcache.Cache

	Algorithm core.ScheduleAlgorithm

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func() *framework.QueuedPodInfo

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*framework.QueuedPodInfo, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// SchedulingQueue holds pods to be scheduled
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map

	scheduledPodsHasSynced func() bool

	client clientset.Interface
}
```
- `SchedulerCache`: holds the podStates and nodeInfos needed for scheduling.
- `Algorithm`: the scheduler runs its scheduling logic through this object's `Schedule` method.
- `SchedulingQueue`: the scheduling queue.
- `Profiles`: the scheduler configurations.

Interface
```go
// pkg/scheduler/internal/queue/scheduling_queue.go

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
	framework.PodNominator
	Add(pod *v1.Pod) error
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of scheduling cycle which is
	// cached by scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. called Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
	NumUnschedulablePods() int
	// Run starts the goroutines managing the queue.
	Run()
}
```
Implementation
```go
// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has three sub queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ. The third queue holds pods that are moved from
// unschedulable queues and will be moved to active queue when backoff are completed.
type PriorityQueue struct {
	// PodNominator abstracts the operations to maintain nominated Pods.
	framework.PodNominator

	stop  chan struct{}
	clock util.Clock

	// pod initial backoff duration.
	podInitialBackoffDuration time.Duration
	// pod maximum backoff duration.
	podMaxBackoffDuration time.Duration

	lock sync.RWMutex
	cond sync.Cond

	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	podBackoffQ *heap.Heap
	// unschedulableQ holds pods that have been tried and determined unschedulable.
	unschedulableQ *UnschedulablePodsMap
	// schedulingCycle represents sequence number of scheduling cycle and is incremented
	// when a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the sequence number of scheduling cycle when we
	// received a move request. Unscheduable pods in and before this scheduling
	// cycle will be put back to activeQueue if we were trying to schedule them
	// when we received move request.
	moveRequestCycle int64

	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool
}
```
`podBackoffQ` is ordered by each pod's `backOffTime`, which is influenced by the two parameters `podInitialBackoffDuration` and `podMaxBackoffDuration`.
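For reference, here is how v1.19 computes the backoff duration in `pkg/scheduler/internal/queue/scheduling_queue.go`: it doubles with each scheduling attempt, starting at `podInitialBackoffDuration` and capped at `podMaxBackoffDuration`.

```go
// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	for i := 1; i < podInfo.Attempts; i++ {
		duration = duration * 2
		if duration > p.podMaxBackoffDuration {
			return p.podMaxBackoffDuration
		}
	}
	return duration
}
```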
Everything starts in `cmd/kube-scheduler/scheduler.go`, where the scheduler uses `NewSchedulerCommand()` to initialize the command and then executes it.
```go
// cmd/kube-scheduler/scheduler.go
func main() {
	...
	command := app.NewSchedulerCommand()
	...
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
```
`NewSchedulerCommand()` reads the configuration file and flags and initializes the scheduling command; its most important function is `runCommand()`.
```go
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	...
	cmd := &cobra.Command{
		Use: "kube-scheduler",
		...
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, opts, registryOptions...); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
		...
	}
	...
	return cmd
}
```
`runCommand` consists of two important steps:

- `Setup`: reads the configuration file and flags and initializes the scheduler. The configuration includes the Profiles settings, among other things.
- `Run`: starts the components the scheduler depends on, such as the health-check server and the informers, and then runs the main scheduling loop with the scheduler produced by `Setup`.

```go
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	...
	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil {
		return err
	}

	return Run(ctx, cc, sched)
}
```
`Setup` builds the scheduler from the configuration file and flags. In my view the most important piece here is Profiles, which defines the scheduler names and the plugin configuration for the scheduling framework. There are also several tuning parameters, such as `PercentageOfNodesToScore`, `PodInitialBackoffSeconds`, and `PodMaxBackoffSeconds`.
In addition, `scheduler.New()` calls `addAllEventHandlers(sched, informerFactory, podInformer)`, which registers event listeners for all the relevant resource objects. The corresponding callbacks fire as events arrive, and they in turn influence how the scheduling queue behaves (see the abridged listing after the `Setup` code below).
```go
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	...
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	if err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}
```
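For context, here is an abridged sketch of the handler registration in `pkg/scheduler/eventhandlers.go`, trimmed to two representative informers (the full function also covers PVs, PVCs, Services, CSINodes, and so on). Node events, for instance, end up moving unschedulable pods back into the active or backoff queues.

```go
// pkg/scheduler/eventhandlers.go (abridged sketch)
func addAllEventHandlers(
	sched *Scheduler,
	informerFactory informers.SharedInformerFactory,
	podInformer coreinformers.PodInformer,
) {
	// Unscheduled pods are fed into the scheduling queue.
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: ..., // selects pods that are not yet assigned to a node
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToSchedulingQueue,
				UpdateFunc: sched.updatePodInSchedulingQueue,
				DeleteFunc: sched.deletePodFromSchedulingQueue,
			},
		},
	)
	// Node events update the scheduler cache and can make pending pods schedulable again.
	informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.addNodeToCache, // triggers MoveAllToActiveOrBackoffQueue
			UpdateFunc: sched.updateNodeInCache,
			DeleteFunc: sched.deleteNodeFromCache,
		},
	)
	...
}
```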
`Run` mainly starts these components and then calls `sched.Run(ctx)` to enter the main scheduling flow.
```go
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	...
	// Prepare the event broadcaster.
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// Setup healthz checks.
	...

	// Start up the healthz server.
	...

	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	// Leader election ...

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
```
`sched.Run` starts the scheduling queue and then repeatedly calls `sched.scheduleOne()` to schedule pods.
```go
// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling
// and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
```
```go
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
```
The scheduling queue's periodic logic:

- Every 1 second, check whether any pod in `podBackoffQ` can be moved into `activeQ`; the check is simply whether the pod's `backOffTime` has expired.
- Every 30 seconds, check whether any pod in `unschedulableQ` can be moved into `activeQ`.
Before walking through `scheduleOne`, it helps to look at the pod scheduling flow diagram, which clarifies the whole process. Its stages are also the plugin extension points of the Scheduling Framework, supported since k8s v1.15.
```go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	...
	pod := podInfo.Pod
	prof, err := sched.profileForPod(pod)
	...
	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	...
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	...
	// Run the Reserve method of reserve plugins.
	if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		...
	}

	// Run "permit" plugins.
	runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	...
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)

		waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			...
			return
		}

		// Run "prebind" plugins.
		preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			...
			return
		}

		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			...
		} else {
			// Run "postbind" plugins.
			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()
}
```
`scheduleOne` is the scheduler's main flow and consists of the following steps:

- Call `sched.NextPod()` to get the next pod that needs scheduling (described in more detail later).
- Call `sched.profileForPod(pod)` to look up, by the pod's schedulerName, the Profiles used to schedule this pod; these Profiles include the scheduling plugin configuration.
- Run the scheduling algorithm, `sched.Algorithm.Schedule()`. This covers several phases: `PreFilter` and `Filter`, together known as Predicates, filter the nodes, taking into account node resources, Pod Affinity, Node Volume handling, and so on; `PreScore`, `Score`, and `Normalize Score`, together known as Priorities, score the remaining nodes, which yields the Node best suited to the current Pod.
- Run the `Reserve` step to cache the scheduling result. If a later step of the flow fails, `Unreserve` is run to roll the data back.
- Run the `Permit` step. These are user-defined plugins, which can allow a Pod (let it pass the Permit phase), reject it (the scheduling fails), or make it wait (with a configurable timeout). For Gang Scheduling (a batch of pods that must all be created together or fail together), pods can be controlled in `Permit`; see the sketch after this list.
- Run `WaitOnPermit`, which blocks until the Pod's Permit status becomes allow or reject, and only then continues.
- Run the `PreBind`, `Bind`, and `PostBind` steps. Here the scheduler calls the k8s apiserver API `b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})` to bind the pending Pod to the selected node. Binding may still fail; in that case `Unreserve` is run to release the Pod's reserved resources on that node, and the Pod is placed back into the failure queue. Once the Pod is bound to the Node, the kubelet on that Node watches the corresponding event and creates the Pod locally, setting up its storage, network, and other resources. When everything is ready, the kubelet updates the Pod status to Running.
To schedule, the scheduler first needs to fetch a pod: `sched.NextPod()`, which calls the SchedulingQueue's `Pop()` method.
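`NextPod` itself is built by `MakeNextPodFunc` in `pkg/scheduler/internal/queue/scheduling_queue.go`, a thin wrapper that pops from the queue and logs:

```go
// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
			return podInfo
		}
		klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
		return nil
	}
}
```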
When `activeQ` is empty, `Pop()` blocks on `p.cond.Wait()` until `podBackoffQ` or `unschedulableQ` adds an element to `activeQ` and wakes it up via `cond.Broadcast()`.
```go
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
		// When Close() is called, the p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from the Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, err
}
```
When a pod is added to `activeQ`, its entries in `unschedulableQ` and `podBackoffQ` are also deleted, and `cond.Broadcast()` wakes up any `Pop()` that is blocked.
```go
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newQueuedPodInfo(pod)
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
		return err
	}
	if p.unschedulableQ.get(pod) != nil {
		klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
		p.unschedulableQ.delete(pod)
	}
	// Delete pod from backoffQ if it is backing off
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
	}
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.PodNominator.AddNominatedPod(pod, "")
	p.cond.Broadcast()

	return nil
}
```
When scheduling a pod fails, `sched.Error()` is called, which in turn calls `p.AddUnschedulableIfNotPresent()`.
Whether a pod that failed scheduling goes into `podBackoffQ` or `unschedulableQ` is decided as follows: if `moveRequestCycle` is greater than or equal to `podSchedulingCycle`, it enters `podBackoffQ`; otherwise it enters `unschedulableQ`.
```go
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	...
	// If a move request has been received, move it to the BackoffQ, otherwise move
	// it to unschedulableQ.
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
	}
	...
}
```
When does `moveRequestCycle >= podSchedulingCycle` hold? There are two cases:

- When cluster resources change, the scheduler moves the pods in `unschedulableQ` that previously failed for lack of resources into `activeQ` or `podBackoffQ`, so that they can be rescheduled promptly.
- The periodic `flushUnschedulableQLeftover` tries to reschedule the pods in `unschedulableQ`.

Both paths call the `movePodsToActiveOrBackoffQueue` function, which sets `moveRequestCycle` to `p.schedulingCycle`.
```go
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
	...
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}
```
Joining podBackoffQ

There are two situations in which a pod joins `podBackoffQ`:

- Scheduling fails while `moveRequestCycle >= podSchedulingCycle`; the pod is then added to `podBackoffQ`.
- `movePodsToActiveOrBackoffQueue` moves pods from `unschedulableQ` into either `podBackoffQ` or `activeQ`. A pod goes to `podBackoffQ` when `p.isPodBackingoff(pInfo)` holds, i.e. the pod is still within its backoff window.
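The backoff check referenced above is straightforward in the v1.19 source: a pod is backing off while its computed expiry time is still in the future.

```go
// isPodBackingoff returns true if a pod is still waiting for its backoffTime.
// If this returns true, the pod should not be re-tried.
func (p *PriorityQueue) isPodBackingoff(podInfo *framework.QueuedPodInfo) bool {
	boTime := p.getBackoffTime(podInfo)
	return boTime.After(p.clock.Now())
}

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}
```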
Leaving podBackoffQ

The scheduler periodically moves pods from `podBackoffQ` to `activeQ`.
The `flushBackoffQCompleted` task started in `sched.SchedulingQueue.Run` runs every second. Following the heap order (the `podBackoffQ` heap is keyed by `backoffTime`), it moves each pod whose `backoffTime` has expired from `podBackoffQ` to `activeQ`, stopping at the first pod whose `backoffTime` has not yet expired.
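Slightly abridged from the v1.19 source (metrics omitted), it peeks at the heap head, whose backoff expires earliest, and returns as soon as that pod is still backing off:

```go
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
		boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
		if boTime.After(p.clock.Now()) {
			// The head expires earliest; if it is still backing off, so is everything else.
			return
		}
		if _, err := p.podBackoffQ.Pop(); err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
			return
		}
		p.activeQ.Add(rawPodInfo)
		defer p.cond.Broadcast()
	}
}
```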
Joining unschedulableQ

There is only one way for a pod to join `unschedulableQ`: scheduling fails while cluster resources have not changed, i.e. `moveRequestCycle < podSchedulingCycle`.
Leaving unschedulableQ

The scheduler likewise periodically moves pods from `unschedulableQ` to `podBackoffQ` or `activeQ`.
The `flushUnschedulableQLeftover` task started in `sched.SchedulingQueue.Run` ultimately calls `movePodsToActiveOrBackoffQueue`, which puts each pod into `podBackoffQ` or `activeQ` as appropriate.
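In v1.19 this flush only moves pods that have stayed in `unschedulableQ` longer than `unschedulableQTimeInterval` (60 seconds):

```go
// flushUnschedulableQLeftover moves pods which stay in unschedulableQ longer than
// the unschedulableQTimeInterval to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.QueuedPodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		lastScheduleTime := pInfo.Timestamp
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}
```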
The Kubernetes scheduler is a crucial component of Kubernetes, and most cloud platforms customize it for their own business models and requirements; Huawei's Volcano batch-computing framework is one example.

Studying the scheduler at this level makes custom scheduler development much easier.
Reference

k8s source code
圖解kubernetes調度器SchedulingQueue核心源碼實現
深刻理解k8s調度器與調度框架核心源碼
Kubernetes資源調度——scheduler