圖解kubernetes調度器ScheduleAlgorithm核心實現學習框架設計

ScheduleAlgorithm是一個接口負責爲pod選擇一個合適的node節點,本節主要解析如何實現一個可擴展、可配置的通用算法框架來實現通用調度,如何進行算法的統一註冊和構建,如何進行metadata和調度流程上下文數據的傳遞node

1. 設計思考

1.1 調度設計

1.1.1 調度與搶佔

image.png 當接收到pod須要被調度後,默認首先調用schedule來進行正常的業務調度嘗試從當前集羣中選擇一個合適的node image.png 若是調度失敗則嘗試搶佔調度,根據優先級搶佔低優先級的pod運行高優先級pod算法

1.1.2 調度階段

image.png 在k8s的調度算法運行流程中,主要分爲兩個階段:預選和優選,即從當前集羣中選擇符合要求的node,再從這些node中選擇最合適的節點api

1.1.3 節點選擇

隨着集羣的增長集羣中的node數量愈來愈多,k8s並非遍歷全部集羣資源,而是隻選取部分節點,同時藉助以前說的 schedulerCache來實現pod節點的分散微信

1.2 框架設計

1.2.1 註冊表與算法工廠

針對不一樣的算法,聲明不一樣的註冊表,負責集羣中當前全部算法的註冊,從而提供給調度配置決策加載那些插件,實現算法的可擴展性 image.png 並經過工廠模式來進行統一管理,解耦算法的註冊與具體調度流程中的使用,由每一個算法的工廠方法來接受參數進行具體算法的建立數據結構

1.2.3 metadata與PluginContext

image.png 在調度實際運行的過程當中,須要集合當前集羣中的元數據信息(node和pod)來進行具體算法的決策,scheduler採用PredicateMetadataProducer和PriorityMetadataProducer來進行元數據的構建, 其次針對一些可能被多個算法都使用的數據,也會在這裏完成構建,好比親和性的pod、拓撲等閉包

並經過PluginContext進行本次調度上下文數據的存儲,用於在多個調度算法之間存儲數據進行交互框架

1.2.4  Provider

Provider主要是封裝一組具體的預選和優選算法,並經過註冊來實現統一管理, 其中系統內置了DefaultProvideride

1.2.5 framework

framework是一種內部的擴展機制,經過定製給定的階段函數,進行調度流程的影響,本節先不介紹函數

1.2.6 extender

一種外部的擴展機制,能夠根據須要進行動態的配置,其實就是外部的一個service,可是相比framework可使用本身獨立的數據存儲,實現對調度器的擴展源碼分析

2. 源碼分析

2.1 數據結構

type genericScheduler struct {
	cache                    internalcache.Cache
	schedulingQueue          internalqueue.SchedulingQueue
	predicates               map[string]predicates.FitPredicate
	priorityMetaProducer     priorities.PriorityMetadataProducer
	predicateMetaProducer    predicates.PredicateMetadataProducer
	prioritizers             []priorities.PriorityConfig
	framework                framework.Framework
	extenders                []algorithm.SchedulerExtender
	alwaysCheckAllPredicates bool
	nodeInfoSnapshot         *schedulernodeinfo.Snapshot
	volumeBinder             *volumebinder.VolumeBinder
	pvcLister                corelisters.PersistentVolumeClaimLister
	pdbLister                algorithm.PDBLister
	disablePreemption        bool
	percentageOfNodesToScore int32
	enableNonPreempting      bool
}

2.1.1 集羣數據

集羣元數據主要分爲三部分: Cache: 存儲從apiserver獲取的數據  SchedulingQueue: 存儲當前隊列中等待調度和通過調度可是未真正運行的pod

cache                    internalcache.Cache
	schedulingQueue          internalqueue.SchedulingQueue
	nodeInfoSnapshot         *schedulernodeinfo.Snapshot

2.1.1 預選算法相關

預選算法主要包含兩部分:當前使用的預選調度算法結合和元數據構建器

predicates               map[string]predicates.FitPredicate
	predicateMetaProducer    predicates.PredicateMetadataProducer

2.1.3 優先級算法相關

優選算法與預選算法不太相同,在後續文章中會進行介紹

priorityMetaProducer     priorities.PriorityMetadataProducer
	prioritizers             []priorities.PriorityConfig

2.1.4 擴展相關

framework                framework.Framework
	extenders                []algorithm.SchedulerExtender

2.2 調度算法註冊表

image.png Priority會複雜一點,這裏就不介紹了,其核心設計都是同樣的

2.2.1 工廠註冊表

fitPredicateMap        = make(map[string]FitPredicateFactory)

2.2.2 註冊表註冊

註冊主要分兩類:若是後續算法不會使用當前Args裏面的數據,只須要使用metadata裏面的,就直接返回註冊算法,下面的函數就是返回一個工廠方法,可是不會使用Args參數

func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
	return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) predicates.FitPredicate { return predicate })
}

最終註冊都是經過下面的工廠註冊函數實現,經過mutex和map實現

func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
	schedulerFactoryMutex.Lock()
	defer schedulerFactoryMutex.Unlock()
	validateAlgorithmNameOrDie(name)
	fitPredicateMap[name] = predicateFactory
	return name
}

2.2.3 生成預選算法

經過插件工廠參數影響和Factory構建具體的預選算法,上面構建的工廠方法,下面則給定參數,經過工廠方法利用閉包的方式來進行真正算法的生成

func getFitPredicateFunctions(names sets.String, args PluginFactoryArgs) (map[string]predicates.FitPredicate, error) {
	schedulerFactoryMutex.RLock()
	defer schedulerFactoryMutex.RUnlock()

	fitPredicates := map[string]predicates.FitPredicate{}
	for _, name := range names.List() {
		factory, ok := fitPredicateMap[name]
		if !ok {
			return nil, fmt.Errorf("invalid predicate name %q specified - no corresponding function found", name)
		}
		fitPredicates[name] = factory(args)
	}

	// k8s中默認包含一些強制性的策略,不容許用戶本身進行刪除,這裏是加載這些參數
	for name := range mandatoryFitPredicates {
		if factory, found := fitPredicateMap[name]; found {
			fitPredicates[name] = factory(args)
		}
	}

	return fitPredicates, nil
}

2.2.4 根據當前feature進行算法刪除

當咱們在系統演進的時候,也能夠借鑑這種思想,來避免用戶使用那些當前或者將來版本中可能逐漸被放棄的設計

if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
		// Remove "CheckNodeCondition", "CheckNodeMemoryPressure", "CheckNodePIDPressure"
		// and "CheckNodeDiskPressure" predicates
		factory.RemoveFitPredicate(predicates.CheckNodeConditionPred)
		factory.RemoveFitPredicate(predicates.CheckNodeMemoryPressurePred)
	}

2.3 predicateMetadataProducer

image.png

2.3.1 PredicateMetadata

// PredicateMetadata interface represents anything that can access a predicate metadata.
type PredicateMetadata interface {
	ShallowCopy() PredicateMetadata
	AddPod(addedPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) error
	RemovePod(deletedPod *v1.Pod, node *v1.Node) error
}

2.3.2 聲明

predicateMetadataProducer PredicateMetadataProducerFactory

工廠函數

// PredicateMetadataProducerFactory produces PredicateMetadataProducer from the given args.
type PredicateMetadataProducerFactory func(PluginFactoryArgs) predicates.PredicateMetadataProducer

PredicateMetadataProducer經過上面的工廠函數建立而來,其接受當前須要調度的pod和snapshot裏面的node信息,從而構建當前的PredicateMetadata

// PredicateMetadataProducer is a function that computes predicate metadata for a given pod.
type PredicateMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata

2.3.2 註冊

// RegisterPredicateMetadataProducerFactory registers a PredicateMetadataProducerFactory.
func RegisterPredicateMetadataProducerFactory(factory PredicateMetadataProducerFactory) {
	schedulerFactoryMutex.Lock()
	defer schedulerFactoryMutex.Unlock()
	predicateMetadataProducer = factory
}

2.3.4 意義

PredicateMetadata其本質上就是當前系統中的元數據,其設計的主要目標是爲了當前的調度流程中後續多個調度算法中均可能須要計算的數據,進行統一的計算,好比節點的親和性、反親和、拓撲分佈等,都在此進行統一的控制, 當前版本的實現時PredicateMetadataFactory,這裏不進行展開

2.4 Provider

2.4.1 AlgorithmProviderConfig

// AlgorithmProviderConfig is used to store the configuration of algorithm providers.
type AlgorithmProviderConfig struct {
	FitPredicateKeys     sets.String
	PriorityFunctionKeys sets.String
}

2.4.2 註冊中心

algorithmProviderMap   = make(map[string]AlgorithmProviderConfig)

2.4.3 註冊

func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
	schedulerFactoryMutex.Lock()
	defer schedulerFactoryMutex.Unlock()
	validateAlgorithmNameOrDie(name)
	algorithmProviderMap[name] = AlgorithmProviderConfig{
		FitPredicateKeys:     predicateKeys,
		PriorityFunctionKeys: priorityKeys,
	}
	return name
}

2.4.4 默認Provider註冊

func init() {
	// 註冊算法DefaulrProvider 的算法provider
	registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}

 

2.5 核心調度流程

image.png 核心調度流程,這裏面只介紹主線的流程,至於怎麼預選和優選則在下一篇文章進行更新,由於稍微有點複雜,而framework和extender則在後續介紹完這兩部分在進行介紹, 其中extender的調用則是在PrioritizeNodes進行優先級算中進行調用

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
	// 省略非核心代碼
	// 調用framework的RunPreFilterPlugins
	preFilterStatus := g.framework.RunPreFilterPlugins(pluginContext, pod)
	if !preFilterStatus.IsSuccess() {
		return result, preFilterStatus.AsError()
	}

	// 獲取當前的node數量
	numNodes := g.cache.NodeTree().NumNodes()
	if numNodes == 0 {
		return result, ErrNoNodesAvailable
	}

	// 更新snapshot
	if err := g.snapshot(); err != nil {
		return result, err
	}
	// 預選階段
	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
	if err != nil {
		return result, err
	}

	// 將預選結果調用framework的postfilter
	postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
	if !postfilterStatus.IsSuccess() {
		return result, postfilterStatus.AsError()
	}

	if len(filteredNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           numNodes,e
			FailedPredicates:      failedPredicateMap,
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}

	startPriorityEvalTime := time.Now()
	// 若是隻有一個節點則直接返回
	if len(filteredNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  filteredNodes[0].Name,
			EvaluatedNodes: 1 + len(failedPredicateMap),
			FeasibleNodes:  1,
		}, nil
	}

	// 獲取全部的調度策略
	metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
	// 獲取全部node的優先級,此處會將extenders進行傳入,實現擴展接口的調用
    priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, pluginContext)
	if err != nil {
		return result, err
	}
	// 從優先級中選擇出合適的node
	host, err := g.selectHost(priorityList)
	trace.Step("Selecting host done")
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
		FeasibleNodes:  len(filteredNodes),
	}, err
}

3. 設計總結

image.png 在調度算法框架中大量使用了工廠方法來進行算法、元數據等的構建,並經過封裝MetadataProducer來進行公共業務邏輯接口的封裝,經過PluginContext進行調度流程中上下文數據的傳遞,而且用戶能夠經過定製Provider來進行具體調度算法的選擇

本文只介紹了大的框架設計,諸如具體的算法註冊和構建其大多都是在構建scheduler命令行參數處經過加載對應的包和init函數來實現,本文沒有介紹一些具體的細節連搶佔也沒有介紹,後續文章裏面會進行一一展開,感興趣的同窗,歡迎一塊兒學習交流

> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 21天大棚 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈

相關文章
相關標籤/搜索