Kubernetes 調度器實現初探

Kubernetes 調度器
Kubernetes 是一個基於容器的分佈式調度器,實現了本身的調度模塊。
在Kubernetes集羣中,調度器做爲一個獨立模塊經過pod運行。從幾個方面介紹Kubernetes調度器。node

調度器工做方式
Kubernetes中的調度器,是做爲單獨組件運行,通常運行在Master中,和Master數量保持一致。經過Raft協議選出一個實例做爲Leader工做,其餘實例Backup。 當Master故障,其餘實例之間繼續經過Raft協議選出新的Master工做。
其工做模式以下:git

調度器內部維護一個調度的pods隊列podQueue, 並監聽APIServer。
當咱們建立Pod時,首先經過APIServer 往ETCD寫入pod元數據。
調度器經過Informer監聽pods狀態,當有新增pod時,將pod加入到podQueue中。
調度器中的主進程,會不斷的從podQueue取出的pod,並將pod進入調度分配節點環節
調度環節分爲兩個步奏, Filter過濾知足條件的節點 、 Prioritize根據pod配置,例如資源使用率,親和性等指標,給這些節點打分,最終選出分數最高的節點。
分配節點成功, 調用apiServer的binding pod 接口, 將pod.Spec.NodeName設置爲所分配的那個節點。
節點上的kubelet一樣監聽ApiServer,若是發現有新的pod被調度到所在節點,調用本地的dockerDaemon 運行容器。
假如調度器嘗試調度Pod不成功,若是開啓了優先級和搶佔功能,會嘗試作一次搶佔,將節點中優先級較低的pod刪掉,並將待調度的pod調度到節點上。 若是未開啓,或者搶佔失敗,會記錄日誌,並將pod加入podQueue隊尾。github

clipboard.png

實現細節
kube-scheduling 是一個獨立運行的組件,主要工做內容在 Run 函數 。 算法

這裏面主要作幾件事情:docker

初始化一個Scheduler 實例 sched,傳入各類Informer,爲關心的資源變化創建監聽並註冊handler,例如維護podQuene
註冊events組件,設置日誌
註冊http/https 監聽,提供健康檢查和metrics 請求
運行主要的調度內容入口 sched.run() 。 若是設置 --leader-elect=true ,表明啓動多個實例,經過Raft選主,實例只有當被選爲master後運行主要工做函數sched.run。
調度核心內容在 sched.run() 函數,它會啓動一個go routine不斷運行sched.scheduleOne, 每次運行表明一個調度週期。api

func (sched *Scheduler) Run() {網絡

if !sched.config.WaitForCacheSync() {
    return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)

}
咱們看下 sched.scheduleOne 主要作什麼數據結構

func (sched *Scheduler) scheduleOne() {
pod := sched.config.NextPod()
.... // do some pre check
scheduleResult, err := sched.schedule(pod)併發

if err != nil {
    if fitError, ok := err.(*core.FitError); ok {
        if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
            ..... // do some log
        } else {
            sched.preempt(pod, fitError)
        }
    }
}
... 
// Assume volumes first before assuming the pod.
allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
...     
fo func() {
    // Bind volumes first before Pod
    if !allBound {
        err := sched.bindVolumes(assumedPod)
        if err != nil {
            klog.Errorf("error binding volumes: %v", err)
            metrics.PodScheduleErrors.Inc()
            return
        }
    }
  err := sched.bind(assumedPod, &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
        Target: v1.ObjectReference{
            Kind: "Node",
            Name: scheduleResult.SuggestedHost,
        },
    })
}

}
在sched.scheduleOne 中,主要會作幾件事情app

經過sched.config.NextPod(), 從podQuene中取出pod
運行sched.schedule,嘗試進行一次調度。
假如調度失敗,若是開啓了搶佔功能,會調用sched.preempt 嘗試進行搶佔,驅逐一些pod,爲被調度的pod預留空間,在下一次調度中生效。
若是調度成功,執行bind接口。在執行bind以前會爲pod volume中聲明的的PVC 作provision。
sched.schedule 是主要的pod調度邏輯

func (g genericScheduler) Schedule(pod v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {

// Get node list
nodes, err := nodeLister.List()
// Filter
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
if err != nil {
    return result, err
}
// Priority
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
    return result, err
}

// SelectHost
host, err := g.selectHost(priorityList)
return ScheduleResult{
    SuggestedHost:  host,
    EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
    FeasibleNodes:  len(filteredNodes),
}, err

}
調度主要分爲三個步奏:

Filters: 過濾條件不知足的節點
PrioritizeNodes: 在條件知足的節點中作Scoring,獲取一個最終打分列表priorityList
selectHost: 在priorityList中選取分數最高的一組節點,從中根據round-robin 方式選取一個節點。

clipboard.png

接下來咱們繼續拆解, 分別看下這三個步奏會怎麼作

Filters
Filters 相對比較容易,調度器默認註冊了一系列的predicates方法, 調度過程爲併發調用每一個節點的predicates 方法。最終獲得一個node list,包含符合條件的節點對象。

func (g genericScheduler) findNodesThatFit(pod v1.Pod, nodes []v1.Node) ([]v1.Node, FailedPredicateMap, error) {

if len(g.predicates) == 0 {
    filtered = nodes
} else {
    allNodes := int32(g.cache.NodeTree().NumNodes())
    numNodesToFind := g.numFeasibleNodesToFind(allNodes)

    checkNode := func(i int) {
        nodeName := g.cache.NodeTree().Next()
  // 此處會調用這個節點的全部predicates 方法
        fits, failedPredicates, err := podFitsOnNode(
            pod,
            meta,
            g.cachedNodeInfoMap[nodeName],
            g.predicates,
            g.schedulingQueue,
            g.alwaysCheckAllPredicates,
        )

        if fits {
            length := atomic.AddInt32(&filteredLen, 1)
            if length > numNodesToFind {
        // 若是當前符合條件的節點數已經足夠,會中止計算。
                cancel()
                atomic.AddInt32(&filteredLen, -1)
            } else {
                filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
            }
        }
    }
// 併發調用checkNode 方法
    workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
    filtered = filtered[:filteredLen]
}
return filtered, failedPredicateMap, nil

}
值得注意的是, 1.13中引入了FeasibleNodes 機制,爲了提升大規模集羣的調度性能。容許咱們經過bad-percentage-of-nodes-to-score 參數, 設置filter的計算比例(默認50%), 當節點數大於100個, 在 filters的過程,只要知足條件的節點數超過這個比例,就會中止filter過程,而不是計算所有節點。
舉個例子,當節點數爲1000, 咱們設置的計算比例爲30%,那麼調度器認爲filter過程只須要找到知足條件的300個節點,filter過程當中當知足條件的節點數達到300個,filter過程結束。 這樣filter不用計算所有的節點,一樣也會下降Prioritize 的計算數量。 可是帶來的影響是pod有可能沒有被調度到最合適的節點。

Prioritize
Prioritize 的目的是幫助pod,爲每一個符合條件的節點打分,幫助pod找到最合適的節點。一樣調度器默認註冊了一系列Prioritize方法。這是Prioritize 對象的數據結構

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {

Name   string
Map    PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight   int

}
每一個PriorityConfig 表明一個評分的指標,會考慮服務的均衡性,節點的資源分配等因素。 一個 PriorityConfig 的主要Scoring過程分爲 Map和Reduce,

Map 過程計算每一個節點的分數值
Reduce 過程會將當前PriorityConfig的全部節點的打分結果再作一次處理。
全部PriorityConfig 計算完畢後,將每一個PriorityConfig的數值乘以對應的權重,並按照節點再作一次聚合。

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
        }
    })

    for i := range priorityConfigs {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]);
        }(i)
    }
    wg.Wait()

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

此外Filter和Prioritize 都支持extener scheduler 的調用,本文不作過多闡述。

現狀
目前kubernetes調度器的調度方式是Pod-by-Pod,也是當前調度器不足的地方。主要瓶頸以下

kubernets目前調度的方式,每一個pod會對全部節點都計算一遍,當集羣規模很是大,節點數不少時,pod的調度時間會很是慢。 這也是percentage-of-nodes-to-score 嘗試要解決的問題
pod-by-pod的調度方式不適合一些機器學習場景。 kubernetes早期設計主要爲在線任務服務,在一些離線任務場景,好比分佈式機器學習中,咱們須要一種新的算法gang scheduler,pod也許對調度的即時性要求沒有那麼高,可是提交任務後,只有當一個批量計算任務的全部workers都運行起來時,纔會開始計算任務。 pod-by-pod 方式在這個場景下,當資源不足時很是容易引發資源死鎖。
當前調度器的擴展性不是十分好,特定場景的調度流程都須要經過硬編碼實如今主流程中,好比咱們看到的bindVolume部分, 一樣也致使Gang Scheduler 沒法在當前調度器框架下經過原生方式實現。
Kubernetes調度器的發展
社區調度器的發展,也是爲了解決這些問題

調度器V2框架,加強了擴展性,也爲在原生調度器中實現Gang schedule作準備。
Kube-batch: 一種Gang schedule的實現 https://github.com/kubernetes...
poseidon: Firmament 一種基於網絡圖調度算法的調度器,poseidon 是將Firmament接入Kubernetes調度器的實現 https://github.com/kubernetes...
接下來,咱們會分析一個具體的調度器方法實現,幫助理解拆解調度器的過程。 而且關注分析調度器的社區動態。

本文做者:蕭元

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索