Kubernetes 是一個基於容器的分佈式調度器,實現了本身的調度模塊。
在Kubernetes集羣中,調度器做爲一個獨立模塊經過pod運行。從幾個方面介紹Kubernetes調度器。node
Kubernetes中的調度器,是做爲單獨組件運行,通常運行在Master中,和Master數量保持一致。經過Raft協議選出一個實例做爲Leader工做,其餘實例Backup。 當Master故障,其餘實例之間繼續經過Raft協議選出新的Master工做。
其工做模式以下:git
pod.Spec.NodeName
設置爲所分配的那個節點。kube-scheduling 是一個獨立運行的組件,主要工做內容在 Run 函數 。github
這裏面主要作幾件事情:算法
sched
,傳入各類Informer,爲關心的資源變化創建監聽並註冊handler,例如維護podQuenesched.run()
。 若是設置 --leader-elect=true
,表明啓動多個實例,經過Raft選主,實例只有當被選爲master後運行主要工做函數sched.run
。調度核心內容在 sched.run()
函數,它會啓動一個go routine不斷運行sched.scheduleOne
, 每次運行表明一個調度週期。docker
func (sched *Scheduler) Run() { if !sched.config.WaitForCacheSync() { return } go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything) }
咱們看下 sched.scheduleOne
主要作什麼api
func (sched *Scheduler) scheduleOne() { pod := sched.config.NextPod() .... // do some pre check scheduleResult, err := sched.schedule(pod) if err != nil { if fitError, ok := err.(*core.FitError); ok { if !util.PodPriorityEnabled() || sched.config.DisablePreemption { ..... // do some log } else { sched.preempt(pod, fitError) } } } ... // Assume volumes first before assuming the pod. allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost) ... fo func() { // Bind volumes first before Pod if !allBound { err := sched.bindVolumes(assumedPod) if err != nil { klog.Errorf("error binding volumes: %v", err) metrics.PodScheduleErrors.Inc() return } } err := sched.bind(assumedPod, &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: scheduleResult.SuggestedHost, }, }) } }
在sched.scheduleOne
中,主要會作幾件事情網絡
sched.config.NextPod()
, 從podQuene中取出podsched.schedule
,嘗試進行一次調度。sched.preempt
嘗試進行搶佔,驅逐一些pod,爲被調度的pod預留空間,在下一次調度中生效。sched.schedule
是主要的pod調度邏輯數據結構
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) { // Get node list nodes, err := nodeLister.List() // Filter filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes) if err != nil { return result, err } // Priority priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) if err != nil { return result, err } // SelectHost host, err := g.selectHost(priorityList) return ScheduleResult{ SuggestedHost: host, EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap), FeasibleNodes: len(filteredNodes), }, err }
調度主要分爲三個步奏:併發
接下來咱們繼續拆解, 分別看下這三個步奏會怎麼作app
Filters 相對比較容易,調度器默認註冊了一系列的predicates方法, 調度過程爲併發調用每一個節點的predicates 方法。最終獲得一個node list,包含符合條件的節點對象。
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) { if len(g.predicates) == 0 { filtered = nodes } else { allNodes := int32(g.cache.NodeTree().NumNodes()) numNodesToFind := g.numFeasibleNodesToFind(allNodes) checkNode := func(i int) { nodeName := g.cache.NodeTree().Next() // 此處會調用這個節點的全部predicates 方法 fits, failedPredicates, err := podFitsOnNode( pod, meta, g.cachedNodeInfoMap[nodeName], g.predicates, g.schedulingQueue, g.alwaysCheckAllPredicates, ) if fits { length := atomic.AddInt32(&filteredLen, 1) if length > numNodesToFind { // 若是當前符合條件的節點數已經足夠,會中止計算。 cancel() atomic.AddInt32(&filteredLen, -1) } else { filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node() } } } // 併發調用checkNode 方法 workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode) filtered = filtered[:filteredLen] } return filtered, failedPredicateMap, nil }
值得注意的是, 1.13中引入了FeasibleNodes 機制,爲了提升大規模集羣的調度性能。容許咱們經過bad-percentage-of-nodes-to-score
參數, 設置filter的計算比例(默認50%), 當節點數大於100個, 在 filters的過程,只要知足條件的節點數超過這個比例,就會中止filter過程,而不是計算所有節點。
舉個例子,當節點數爲1000, 咱們設置的計算比例爲30%,那麼調度器認爲filter過程只須要找到知足條件的300個節點,filter過程當中當知足條件的節點數達到300個,filter過程結束。 這樣filter不用計算所有的節點,一樣也會下降Prioritize 的計算數量。 可是帶來的影響是pod有可能沒有被調度到最合適的節點。
Prioritize 的目的是幫助pod,爲每一個符合條件的節點打分,幫助pod找到最合適的節點。一樣調度器默認註冊了一系列Prioritize方法。這是Prioritize 對象的數據結構
// PriorityConfig is a config used for a priority function. type PriorityConfig struct { Name string Map PriorityMapFunction Reduce PriorityReduceFunction // TODO: Remove it after migrating all functions to // Map-Reduce pattern. Function PriorityFunction Weight int }
每一個PriorityConfig 表明一個評分的指標,會考慮服務的均衡性,節點的資源分配等因素。 一個 PriorityConfig 的主要Scoring過程分爲 Map和Reduce,
全部PriorityConfig 計算完畢後,將每一個PriorityConfig的數值乘以對應的權重,並按照節點再作一次聚合。
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) { nodeInfo := nodeNameToInfo[nodes[index].Name] for i := range priorityConfigs { var err error results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo) } }) for i := range priorityConfigs { wg.Add(1) go func(index int) { defer wg.Done() if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); }(i) } wg.Wait() // Summarize all scores. result := make(schedulerapi.HostPriorityList, 0, len(nodes)) for i := range nodes { result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0}) for j := range priorityConfigs { result[i].Score += results[j][i].Score * priorityConfigs[j].Weight } }
此外Filter和Prioritize 都支持extener scheduler 的調用,本文不作過多闡述。
目前kubernetes調度器的調度方式是Pod-by-Pod,也是當前調度器不足的地方。主要瓶頸以下
社區調度器的發展,也是爲了解決這些問題
接下來,咱們會分析一個具體的調度器方法實現,幫助理解拆解調度器的過程。 而且關注分析調度器的社區動態。
https://medium.com/jorgeacetozi/kubernetes-master-components-etcd-api-server-controller-manager-and-scheduler-3a0179fc8186
https://jvns.ca/blog/2017/07/27/how-does-the-kubernetes-scheduler-work/
本文做者:蕭元
本文爲雲棲社區原創內容,未經容許不得轉載。