Kubernetes 調度器實現初探

Kubernetes 調度器

Kubernetes 是一個基於容器的分佈式調度器,實現了本身的調度模塊。
在Kubernetes集羣中,調度器做爲一個獨立模塊經過pod運行。從幾個方面介紹Kubernetes調度器。node

調度器工做方式

Kubernetes中的調度器,是做爲單獨組件運行,通常運行在Master中,和Master數量保持一致。經過Raft協議選出一個實例做爲Leader工做,其餘實例Backup。 當Master故障,其餘實例之間繼續經過Raft協議選出新的Master工做。
其工做模式以下:git

  • 調度器內部維護一個調度的pods隊列podQueue, 並監聽APIServer。
  • 當咱們建立Pod時,首先經過APIServer 往ETCD寫入pod元數據。
  • 調度器經過Informer監聽pods狀態,當有新增pod時,將pod加入到podQueue中。
  • 調度器中的主進程,會不斷的從podQueue取出的pod,並將pod進入調度分配節點環節
  • 調度環節分爲兩個步奏, Filter過濾知足條件的節點 、 Prioritize根據pod配置,例如資源使用率,親和性等指標,給這些節點打分,最終選出分數最高的節點。
  • 分配節點成功, 調用apiServer的binding pod 接口, 將pod.Spec.NodeName設置爲所分配的那個節點。
  • 節點上的kubelet一樣監聽ApiServer,若是發現有新的pod被調度到所在節點,調用本地的dockerDaemon 運行容器。
  • 假如調度器嘗試調度Pod不成功,若是開啓了優先級和搶佔功能,會嘗試作一次搶佔,將節點中優先級較低的pod刪掉,並將待調度的pod調度到節點上。 若是未開啓,或者搶佔失敗,會記錄日誌,並將pod加入podQueue隊尾。

實現細節

kube-scheduling 是一個獨立運行的組件,主要工做內容在 Run 函數 。github

這裏面主要作幾件事情:算法

  • 初始化一個Scheduler 實例 sched,傳入各類Informer,爲關心的資源變化創建監聽並註冊handler,例如維護podQuene
  • 註冊events組件,設置日誌
  • 註冊http/https 監聽,提供健康檢查和metrics 請求
  • 運行主要的調度內容入口 sched.run() 。 若是設置 --leader-elect=true ,表明啓動多個實例,經過Raft選主,實例只有當被選爲master後運行主要工做函數sched.run

調度核心內容在 sched.run() 函數,它會啓動一個go routine不斷運行sched.scheduleOne, 每次運行表明一個調度週期。docker

func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

咱們看下 sched.scheduleOne 主要作什麼api

func (sched *Scheduler) scheduleOne() {
  pod := sched.config.NextPod()
  .... // do some pre check
  scheduleResult, err := sched.schedule(pod)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                ..... // do some log
            } else {
                sched.preempt(pod, fitError)
            }
        }
    }
    ... 
    // Assume volumes first before assuming the pod.
    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    ...     
    fo func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                klog.Errorf("error binding volumes: %v", err)
                metrics.PodScheduleErrors.Inc()
                return
            }
        }
      err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })
    }
}

sched.scheduleOne 中,主要會作幾件事情網絡

  • 經過sched.config.NextPod(), 從podQuene中取出pod
  • 運行sched.schedule,嘗試進行一次調度。
  • 假如調度失敗,若是開啓了搶佔功能,會調用sched.preempt 嘗試進行搶佔,驅逐一些pod,爲被調度的pod預留空間,在下一次調度中生效。
  • 若是調度成功,執行bind接口。在執行bind以前會爲pod volume中聲明的的PVC 作provision。

sched.schedule 是主要的pod調度邏輯數據結構

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
    // Get node list
    nodes, err := nodeLister.List()
    // Filter
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return result, err
    }
    // Priority
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return result, err
    }

    // SelectHost
    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

調度主要分爲三個步奏:併發

  • Filters: 過濾條件不知足的節點
  • PrioritizeNodes: 在條件知足的節點中作Scoring,獲取一個最終打分列表priorityList
  • selectHost: 在priorityList中選取分數最高的一組節點,從中根據round-robin 方式選取一個節點。

接下來咱們繼續拆解, 分別看下這三個步奏會怎麼作app

Filters

Filters 相對比較容易,調度器默認註冊了一系列的predicates方法, 調度過程爲併發調用每一個節點的predicates 方法。最終獲得一個node list,包含符合條件的節點對象。

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()
      // 此處會調用這個節點的全部predicates 方法
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )

            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
            // 若是當前符合條件的節點數已經足夠,會中止計算。
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            }
        }
    // 併發調用checkNode 方法
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
        filtered = filtered[:filteredLen]
    }
    return filtered, failedPredicateMap, nil
}

值得注意的是, 1.13中引入了FeasibleNodes 機制,爲了提升大規模集羣的調度性能。容許咱們經過bad-percentage-of-nodes-to-score 參數, 設置filter的計算比例(默認50%), 當節點數大於100個, 在 filters的過程,只要知足條件的節點數超過這個比例,就會中止filter過程,而不是計算所有節點。
舉個例子,當節點數爲1000, 咱們設置的計算比例爲30%,那麼調度器認爲filter過程只須要找到知足條件的300個節點,filter過程當中當知足條件的節點數達到300個,filter過程結束。 這樣filter不用計算所有的節點,一樣也會下降Prioritize 的計算數量。 可是帶來的影響是pod有可能沒有被調度到最合適的節點。

Prioritize

Prioritize 的目的是幫助pod,爲每一個符合條件的節點打分,幫助pod找到最合適的節點。一樣調度器默認註冊了一系列Prioritize方法。這是Prioritize 對象的數據結構

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
    Name   string
    Map    PriorityMapFunction
    Reduce PriorityReduceFunction
    // TODO: Remove it after migrating all functions to
    // Map-Reduce pattern.
    Function PriorityFunction
    Weight   int
}

每一個PriorityConfig 表明一個評分的指標,會考慮服務的均衡性,節點的資源分配等因素。 一個 PriorityConfig 的主要Scoring過程分爲 Map和Reduce,

  • Map 過程計算每一個節點的分數值
  • Reduce 過程會將當前PriorityConfig的全部節點的打分結果再作一次處理。

全部PriorityConfig 計算完畢後,將每一個PriorityConfig的數值乘以對應的權重,並按照節點再作一次聚合。

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
        }
    })

    for i := range priorityConfigs {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]);
        }(i)
    }
    wg.Wait()

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

此外Filter和Prioritize 都支持extener scheduler 的調用,本文不作過多闡述。

現狀

目前kubernetes調度器的調度方式是Pod-by-Pod,也是當前調度器不足的地方。主要瓶頸以下

  • kubernets目前調度的方式,每一個pod會對全部節點都計算一遍,當集羣規模很是大,節點數不少時,pod的調度時間會很是慢。 這也是percentage-of-nodes-to-score 嘗試要解決的問題
  • pod-by-pod的調度方式不適合一些機器學習場景。 kubernetes早期設計主要爲在線任務服務,在一些離線任務場景,好比分佈式機器學習中,咱們須要一種新的算法gang scheduler,pod也許對調度的即時性要求沒有那麼高,可是提交任務後,只有當一個批量計算任務的全部workers都運行起來時,纔會開始計算任務。 pod-by-pod 方式在這個場景下,當資源不足時很是容易引發資源死鎖。
  • 當前調度器的擴展性不是十分好,特定場景的調度流程都須要經過硬編碼實如今主流程中,好比咱們看到的bindVolume部分, 一樣也致使Gang Scheduler 沒法在當前調度器框架下經過原生方式實現。

Kubernetes調度器的發展

社區調度器的發展,也是爲了解決這些問題

接下來,咱們會分析一個具體的調度器方法實現,幫助理解拆解調度器的過程。 而且關注分析調度器的社區動態。

參考

https://medium.com/jorgeacetozi/kubernetes-master-components-etcd-api-server-controller-manager-and-scheduler-3a0179fc8186
https://jvns.ca/blog/2017/07/27/how-does-the-kubernetes-scheduler-work/

 



本文做者:蕭元

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索