k8s調度器及調度隊列源碼分析

簡介

在 Kubernetes 中,調度是指將 Pod 放置到合適的 Node 上,而後對應 Node 上的 kubelet 纔可以運行這些 Pod。K8s scheduler 就是用來調度 pod 的一個組件。node

本文主要是經過源碼瞭解調度器的部分工做流程。算法

調度器流程圖

源碼分析

Based on Kubernetes v1.19.11.api

K8s scheduler 主要的數據結構是:緩存

  1. Scheduler。
  2. SchedulingQueue。

相關的代碼流程主要分爲兩個部分:數據結構

  1. cmd/kube-scheduler,這裏是咱們調度器的起始處,主要是讀取配置,初始化並啓動調度器。
  2. pkg/scheduler,這裏是調度器的核心代碼。

數據結構

Scheduler

// pkg/scheduler/scheduler.go

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
    // It is expected that changes made via SchedulerCache will be observed
    // by NodeLister and Algorithm.
    SchedulerCache internalcache.Cache
    Algorithm core.ScheduleAlgorithm

    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    NextPod func() *framework.QueuedPodInfo

    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*framework.QueuedPodInfo, error)

    // Close this to shut down the scheduler.
    StopEverything <-chan struct{}

    // SchedulingQueue holds pods to be scheduled
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles are the scheduling profiles.
    Profiles profile.Map

    scheduledPodsHasSynced func() bool
    client clientset.Interface
}
  1. SchedulerCache ,保存了調度所需的 podStates 和 nodeInfos。
  2. Algorithm ,會使用該對象的 Schedule 方法來運行調度邏輯。
  3. SchedulingQueue ,調度隊列。
  4. Profiles ,調度器配置。

SchedulingQueue

Interfaceapp

// pkg/scheduler/internal/queue/scheduling_queue.go

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    // AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
    // The podSchedulingCycle represents the current scheduling cycle number which can be
    // returned by calling SchedulingCycle().
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // SchedulingCycle returns the current number of scheduling cycle which is
    // cached by scheduling queue. Normally, incrementing this number whenever
    // a pod is popped (e.g. called Pop()) is enough.
    SchedulingCycle() int64
    // Pop removes the head of the queue and returns it. It blocks if the
    // queue is empty and waits until a new item is added to the queue.
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event string)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
    NumUnschedulablePods() int
    // Run starts the goroutines managing the queue.
    Run()
}

Implementation框架

// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has three sub queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ. The third queue holds pods that are moved from
// unschedulable queues and will be moved to active queue when backoff are completed.
type PriorityQueue struct {
    // PodNominator abstracts the operations to maintain nominated Pods.
    framework.PodNominator
    stop  chan struct{}
    clock util.Clock
    // pod initial backoff duration.
    podInitialBackoffDuration time.Duration
    // pod maximum backoff duration.
    podMaxBackoffDuration time.Duration

    lock sync.RWMutex
    cond sync.Cond
    // activeQ is heap structure that scheduler actively looks at to find pods to
    // schedule. Head of heap is the highest priority pod.
    activeQ *heap.Heap

    // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
    // are popped from this heap before the scheduler looks at activeQ
    podBackoffQ *heap.Heap

    // unschedulableQ holds pods that have been tried and determined unschedulable.
    unschedulableQ *UnschedulablePodsMap

    // schedulingCycle represents sequence number of scheduling cycle and is incremented
    // when a pod is popped.
    schedulingCycle int64
    // moveRequestCycle caches the sequence number of scheduling cycle when we
    // received a move request. Unscheduable pods in and before this scheduling
    // cycle will be put back to activeQueue if we were trying to schedule them
    // when we received move request.
    moveRequestCycle int64

    // closed indicates that the queue is closed.
    // It is mainly used to let Pop() exit its control loop while waiting for an item.
    closed bool
}
  1. PodNominator:調度算法調度的結果,保存了 Pod 和 Node 的關係。
  2. cond:用來控制調度隊列的 Pop 操做。
  3. activeQ:用堆維護的優先隊列,保存着待調度的 pod,其中優先級默認是根據 Pod 的優先級和建立時間來排序。
  4. podBackoffQ:一樣是用堆維護的優先隊列,保存着運行失敗的 Pod,優先級是根據 backOffTime 來排序,backOffTimepodInitialBackoffDuration 以及 podMaxBackoffDuration 兩個參數影響。
  5. unschedulableQ:是一個 Map 結構,保存着暫時沒法調度(多是資源不知足等狀況)的 Pod。

cmd/kube-scheduler

調度器的入口 main

最開始,scheduler 在 cmd/kube-scheduler/scheduler.go 使用 NewSchedulerCommand() 初始化命令並執行命令。less

// cmd/kube-scheduler/scheduler.go

func main() {
    ...
    command := app.NewSchedulerCommand()
    ...
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

初始化調度器命令 NewSchedulerCommand

NewSchedulerCommand() 會讀取配置文件和參數,初始化調度命令,其中最主要的函數是 runCommand()async

func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    ...
    cmd := &cobra.Command{
        Use: "kube-scheduler",
        ...
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, opts, registryOptions...); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
        ...
    }

    ...
    return cmd
}

執行調度器命令 runCommand

runCommand 主要分爲兩個重要步驟:ide

  1. Setup :讀取配置文件以及參數,初始化調度器。這裏的配置文件包括 Profiles 配置等。
  2. Run:運行調度器所需的組件,例如健康檢查服務,Informer 等。而後使用 Setup 獲得的調度器運行調度的主流程。
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    ...

    cc, sched, err := Setup(ctx, opts, registryOptions...)
    if err != nil {
        return err
    }

    return Run(ctx, cc, sched)
}

建立調度器 Setup

Setup 會根據配置文件和參數建立 scheduler。這裏我的以爲最主要的是 Profiles,裏面定義了調度器的名字,以及 scheduling framework 的插件配置。還有一些能夠用來調優的參數,例如 PercentageOfNodesToScore, PodInitialBackoffSeconds , PodMaxBackoffSeconds 等。

而且 scheduler.New() 中會有一個 addAllEventHandlers(sched, informerFactory, podInformer) 函數,啓動全部資源對象的事件監聽,來根據狀況調用對應的回調函數,這些回調函數同時也會影響調度隊列的運行過程。

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    ...
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory,
        cc.PodInformer,
        recorderFactory,
        ctx.Done(),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
    )
    if err != nil {
        return nil, nil, err
    }

    return &cc, sched, nil
}

運行調度器 Run

Run 主要是啓動一些組件,而後調用 sched.Run(ctx) 進行調度的主流程。

func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    ...
    // Prepare the event broadcaster.
    cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
    // Setup healthz checks.
    ...
    // Start up the healthz server.
    ...
    // Start all informers.
    go cc.PodInformer.Informer().Run(ctx.Done())
    cc.InformerFactory.Start(ctx.Done())
    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    // Leader election
    ...
    // Leader election is disabled, so runCommand inline until done.
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}

pkg/scheduler

運行調度器主流程

Run 會啓動 scheduling queue,並不斷調用 sched.scheduleOne() 進行調度。

// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
    if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
        return
    }
    sched.SchedulingQueue.Run()
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    sched.SchedulingQueue.Close()
}

運行調度隊列

// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

調度隊列的運行邏輯:

  1. 每隔 1s 檢查 podBackoffQ 是否有 pod 能夠放入 activeQ 中。檢查的邏輯是判斷 backOffTime 是否已經到期。
  2. 每隔 30s 檢查 unschedulableQ 是否有 pod 能夠放入 activeQ 中。

單個 Pod 的調度 scheduleOne

在介紹 scheduleOne 以前,看這張 pod 調度流程圖能有助於咱們理清整個過程。同時這也是 k8s v1.15 開始支持的 Scheduling Framework 的 Plugin 擴展點。

pod scheduling context

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    ...
    pod := podInfo.Pod
    prof, err := sched.profileForPod(pod)
    ...
    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    state := framework.NewCycleState()
    ...
    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
    ...

    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPodInfo := podInfo.DeepCopy()
    assumedPod := assumedPodInfo.Pod
    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    ...

    // Run the Reserve method of reserve plugins.
    if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        ...
    }

    // Run "permit" plugins.
    runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    ...

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
        if !waitOnPermitStatus.IsSuccess() {
            ...
            return
        }

        // Run "prebind" plugins.
        preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        if !preBindStatus.IsSuccess() {
            ...
            return
        }

        err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
        if err != nil {
            ...
        } else {
            // Run "postbind" plugins.
            prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        }
    }()
}

ScheduleOne 是調度器的主流程,主要包括如下幾步:

  1. 調用 sched.NextPod() 拿到下一個須要調度的 pod。後面會對這個過程進行更詳細的介紹。
  2. 調用 sched.profileForPod(pod) ,根據 pod 中的 schedulerName 拿到針對該 pod 調度的 Profiles。這些 Profiles 就包括了調度插件的配置等。
  3. 進行上圖中的 Scheduling Cycle 部分,這部分是單線程運行的。
    • 調用 sched.Algorithm.Schedule()。此處包括好幾個步驟,其中 PreFilter, Filter 被稱爲 Predicate,是對節點進行過濾,這裏面考慮了節點資源,Pod Affinity,以及 Node Volumn 等狀況。而 PreScore , Score , Nomalize Score 又被稱爲 Priorities,是對節點進行優選打分,這裏會獲得一個適合當前 Pod 分配上去的 Node。
    • 進行 Reserve 操做,將調度結果緩存。當後面的調度流程執行失敗,會進行 Unreserve 進行數據回滾。
    • 進行 Permit 操做,這裏是用戶自定義的插件,可使 Pod 進行 allow(容許 Pod 經過 Permit 階段)、reject(Pod 調度失敗)和 wait(可設置超時時間)這三種操做。對於 Gang Scheduling (一批 pod 同時建立成功或同時建立失敗),能夠在 Permit 對 Pod 進行控制。
  4. 進行圖中的 Binding Cycle 部分,這部分是起了一個 Goroutine 去完成工做的,不會阻塞調度主流程。
    • 最開始會進行 WaitOnPermit 操做,這裏會阻塞判斷 Pod 是否 Permit,直到 Pod Permit 狀態爲 allow 或者 reject 再往下繼續運行。
    • 進行 PreBind , Bind , PostBind 操做。這裏會調用 k8s apiserver 提供的接口 b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}),將待調度的 Pod 與選中的節點進行綁定,可是可能會綁定失敗,此時會作 Unreserve 操做,將節點上面 Pod 的資源解除預留,而後從新放置到失敗隊列中。

當 Pod 與 Node 綁定成功後,Node 上面的 kubelet 會 watch 到對應的 event,而後會在節點上建立 Pod,包括建立容器 storage、network 等。等全部的資源都準備完成,kubelet 會把 Pod 狀態更新爲Running。

SchedulingQueue 細節

獲取下一個運行的 Pod

調度的時候,須要獲取一個調度的 pod,即 sched.NextPod() ,其中調用了 SchedulingQueue 的 Pop() 方法。

activeQ 中沒有元素,會經過 p.cond.Wait() 阻塞,直到 podBackoffQ 或者 unschedulableQ 將元素加入 activeQ 並經過 cond.Broadcast() 來喚醒。

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
    p.lock.Lock()
    defer p.lock.Unlock()
    for p.activeQ.Len() == 0 {
        // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
        // When Close() is called, the p.closed is set and the condition is broadcast,
        // which causes this loop to continue and return from the Pop().
        if p.closed {
            return nil, fmt.Errorf(queueClosed)
        }
        p.cond.Wait()
    }
    obj, err := p.activeQ.Pop()
    if err != nil {
        return nil, err
    }
    pInfo := obj.(*framework.QueuedPodInfo)
    pInfo.Attempts++
    p.schedulingCycle++
    return pInfo, err
}

將 Pod 加入 activeQ

當 pod 加入 activeQ 後,還會從 unschedulableQ 以及 podBackoffQ 中刪除對應 pod 的信息,並使用 cond.Broadcast() 來喚醒阻塞的 Pop。

// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pInfo := p.newQueuedPodInfo(pod)
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
        return err
    }
    if p.unschedulableQ.get(pod) != nil {
        klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
        p.unschedulableQ.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
    }
    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
    p.PodNominator.AddNominatedPod(pod, "")
    p.cond.Broadcast()

    return nil
}

當 Pod 調度失敗時進入失敗隊列

當 pod 調度失敗時,會調用 sched.Error() ,其中調用了 p.AddUnschedulableIfNotPresent() .

決定 pod 調度失敗時進入 podBackoffQ 仍是 unschedulableQ :若是 moveRequestCycle 大於 podSchedulingCycle ,則進入 podBackoffQ ,不然進入 unschedulableQ .

// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    ...
    // If a move request has been received, move it to the BackoffQ, otherwise move
    // it to unschedulableQ.
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
    } else {
        p.unschedulableQ.addOrUpdate(pInfo)
    }
    ...
}

什麼時候 moveRequestCycle >= podSchedulingCycle

  1. 咱們在集羣資源變動的時候(例如添加 Node 或者刪除 Pod),會有回調函數嘗試將 unschedulableQ 中以前由於資源不知足需求的 pod 放入 activeQ 或者 podBackoffQ ,及時進行調度。
  2. 調度隊列會每隔 30s 定時運行 flushUnschedulableQLeftover ,嘗試調度 unschedulableQ 中的 pod。

這二者都會調用 movePodsToActiveOrBackoffQueue 函數,並將 moveRequestCycle 設爲 p.schedulingCycle.

func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
    ...
    p.moveRequestCycle = p.schedulingCycle
    p.cond.Broadcast()
}

podBackoffQ 中 pod 的生命週期

加入 podBackoffQ

有兩種狀況會讓 pod 加入 podBackoffQ:

  1. 調度失敗。若是調度失敗,而且集羣資源發生變動,即 moveRequestCycle >= podSchedulingCycle ,pod 就會加入到 podBackoffQ 中。
  2. 從 unschedulableQ 中轉移。當集羣資源發生變化的時候,最終會調用 movePodsToActiveOrBackoffQueue 將 unschedulableQ 的 pod 轉移到 podBackoffQ 或者 activeQ 中。轉移到 podBackoffQ 的條件是 p.isPodBackingoff(pInfo) ,即 pod 仍然處於 backoff 狀態。

退出 podBackoffQ

調度器會定時讓 pod 從 podBackoffQ 轉移到 activeQ 中。

sched.SchedulingQueue.Run 中運行的 flushBackoffQCompleted cronjob 會每隔 1s 按照優先級(優先級是按照 backoffTime 排序)依次將知足 backoffTime 條件的 pod 從 podBackoffQ 轉移到 activeQ 中,直到遇到一個不知足 backoffTime 條件的 pod。

unschedulableQ 中 pod 的生命週期

加入 unschedulableQ

只有一種狀況會讓 pod 加入 unschedulableQ,那就是調度失敗。若是調度失敗,而且集羣資源沒有發生變動,即 moveRequestCycle < podSchedulingCycle ,那麼 pod 就會加入到 unschedulableQ 中。

退出 unschedulableQ

調度器會一樣定時讓 pod 從 unschedulableQ 轉移到 podBackoffQ 或者 activeQ 中。

sched.SchedulingQueue.Run 中運行的 flushUnschedulableQLeftover 最終會調用 movePodsToActiveOrBackoffQueue 將 pod 分別加入到 podBackoffQ 或者 activeQ 中。

總結

Kubernetes scheduler 是 kubernetes 中至關重要的組件,基本上各個雲平臺都會根據本身的業務模型和需求自定義調度器,例如 華爲的 Volcano 計算框架。

經過這方面的學習,能在自定義調度器的開發中更加駕輕就熟。

Reference

k8s source code

圖解kubernetes調度器SchedulingQueue核心源碼實現

深刻理解k8s調度器與調度框架核心源碼

Kubernetes資源調度——scheduler

相關文章
相關標籤/搜索