ScheduleAlgorithm是一個接口負責爲pod選擇一個合適的node節點,本節主要解析如何實現一個可擴展、可配置的通用算法框架來實現通用調度,如何進行算法的統一註冊和構建,如何進行metadata和調度流程上下文數據的傳遞node
當接收到pod須要被調度後,默認首先調用schedule來進行正常的業務調度嘗試從當前集羣中選擇一個合適的node
若是調度失敗則嘗試搶佔調度,根據優先級搶佔低優先級的pod運行高優先級pod算法
在k8s的調度算法運行流程中,主要分爲兩個階段:預選和優選,即從當前集羣中選擇符合要求的node,再從這些node中選擇最合適的節點api
隨着集羣的增長集羣中的node數量愈來愈多,k8s並非遍歷全部集羣資源,而是隻選取部分節點,同時藉助以前說的 schedulerCache來實現pod節點的分散微信
針對不一樣的算法,聲明不一樣的註冊表,負責集羣中當前全部算法的註冊,從而提供給調度配置決策加載那些插件,實現算法的可擴展性並經過工廠模式來進行統一管理,解耦算法的註冊與具體調度流程中的使用,由每一個算法的工廠方法來接受參數進行具體算法的建立數據結構
在調度實際運行的過程當中,須要集合當前集羣中的元數據信息(node和pod)來進行具體算法的決策,scheduler採用PredicateMetadataProducer和PriorityMetadataProducer來進行元數據的構建, 其次針對一些可能被多個算法都使用的數據,也會在這裏完成構建,好比親和性的pod、拓撲等閉包
並經過PluginContext進行本次調度上下文數據的存儲,用於在多個調度算法之間存儲數據進行交互框架
Provider主要是封裝一組具體的預選和優選算法,並經過註冊來實現統一管理, 其中系統內置了DefaultProvideride
framework是一種內部的擴展機制,經過定製給定的階段函數,進行調度流程的影響,本節先不介紹函數
一種外部的擴展機制,能夠根據須要進行動態的配置,其實就是外部的一個service,可是相比framework能夠使用本身獨立的數據存儲,實現對調度器的擴展源碼分析
type genericScheduler struct {
cache internalcache.Cache
schedulingQueue internalqueue.SchedulingQueue
predicates map[string]predicates.FitPredicate
priorityMetaProducer priorities.PriorityMetadataProducer
predicateMetaProducer predicates.PredicateMetadataProducer
prioritizers []priorities.PriorityConfig
framework framework.Framework
extenders []algorithm.SchedulerExtender
alwaysCheckAllPredicates bool
nodeInfoSnapshot *schedulernodeinfo.Snapshot
volumeBinder *volumebinder.VolumeBinder
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister algorithm.PDBLister
disablePreemption bool
percentageOfNodesToScore int32
enableNonPreempting bool
}複製代碼
集羣元數據主要分爲三部分:Cache: 存儲從apiserver獲取的數據 SchedulingQueue: 存儲當前隊列中等待調度和通過調度可是未真正運行的pod
cache internalcache.Cache
schedulingQueue internalqueue.SchedulingQueue
nodeInfoSnapshot *schedulernodeinfo.Snapshot複製代碼
預選算法主要包含兩部分:當前使用的預選調度算法結合和元數據構建器
predicates map[string]predicates.FitPredicate
predicateMetaProducer predicates.PredicateMetadataProducer複製代碼
優選算法與預選算法不太相同,在後續文章中會進行介紹
priorityMetaProducer priorities.PriorityMetadataProducer
prioritizers []priorities.PriorityConfig複製代碼
framework framework.Framework
extenders []algorithm.SchedulerExtender複製代碼
Priority會複雜一點,這裏就不介紹了,其核心設計都是同樣的
fitPredicateMap = make(map[string]FitPredicateFactory)複製代碼
註冊主要分兩類:若是後續算法不會使用當前Args裏面的數據,只須要使用metadata裏面的,就直接返回註冊算法,下面的函數就是返回一個工廠方法,可是不會使用Args參數
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) predicates.FitPredicate { return predicate })
}複製代碼
最終註冊都是經過下面的工廠註冊函數實現,經過mutex和map實現
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
fitPredicateMap[name] = predicateFactory
return name
}複製代碼
經過插件工廠參數影響和Factory構建具體的預選算法,上面構建的工廠方法,下面則給定參數,經過工廠方法利用閉包的方式來進行真正算法的生成
func getFitPredicateFunctions(names sets.String, args PluginFactoryArgs) (map[string]predicates.FitPredicate, error) {
schedulerFactoryMutex.RLock()
defer schedulerFactoryMutex.RUnlock()
fitPredicates := map[string]predicates.FitPredicate{}
for _, name := range names.List() {
factory, ok := fitPredicateMap[name]
if !ok {
return nil, fmt.Errorf("invalid predicate name %q specified - no corresponding function found", name)
}
fitPredicates[name] = factory(args)
}
// k8s中默認包含一些強制性的策略,不容許用戶本身進行刪除,這裏是加載這些參數
for name := range mandatoryFitPredicates {
if factory, found := fitPredicateMap[name]; found {
fitPredicates[name] = factory(args)
}
}
return fitPredicates, nil
}複製代碼
當咱們在系統演進的時候,也能夠借鑑這種思想,來避免用戶使用那些當前或者將來版本中可能逐漸被放棄的設計
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
// Remove "CheckNodeCondition", "CheckNodeMemoryPressure", "CheckNodePIDPressure"
// and "CheckNodeDiskPressure" predicates
factory.RemoveFitPredicate(predicates.CheckNodeConditionPred)
factory.RemoveFitPredicate(predicates.CheckNodeMemoryPressurePred)
}
複製代碼
// PredicateMetadata interface represents anything that can access a predicate metadata.
type PredicateMetadata interface {
ShallowCopy() PredicateMetadata
AddPod(addedPod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) error
RemovePod(deletedPod *v1.Pod, node *v1.Node) error
}
複製代碼
predicateMetadataProducer PredicateMetadataProducerFactory複製代碼
工廠函數
// PredicateMetadataProducerFactory produces PredicateMetadataProducer from the given args.
type PredicateMetadataProducerFactory func(PluginFactoryArgs) predicates.PredicateMetadataProducer複製代碼
PredicateMetadataProducer經過上面的工廠函數建立而來,其接受當前須要調度的pod和snapshot裏面的node信息,從而構建當前的PredicateMetadata
// PredicateMetadataProducer is a function that computes predicate metadata for a given pod.
type PredicateMetadataProducer func(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo) PredicateMetadata複製代碼
// RegisterPredicateMetadataProducerFactory registers a PredicateMetadataProducerFactory.
func RegisterPredicateMetadataProducerFactory(factory PredicateMetadataProducerFactory) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
predicateMetadataProducer = factory
}複製代碼
PredicateMetadata其本質上就是當前系統中的元數據,其設計的主要目標是爲了當前的調度流程中後續多個調度算法中均可能須要計算的數據,進行統一的計算,好比節點的親和性、反親和、拓撲分佈等,都在此進行統一的控制, 當前版本的實現時PredicateMetadataFactory,這裏不進行展開
// AlgorithmProviderConfig is used to store the configuration of algorithm providers.
type AlgorithmProviderConfig struct {
FitPredicateKeys sets.String
PriorityFunctionKeys sets.String
}複製代碼
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)複製代碼
func RegisterAlgorithmProvider(name string, predicateKeys, priorityKeys sets.String) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
algorithmProviderMap[name] = AlgorithmProviderConfig{
FitPredicateKeys: predicateKeys,
PriorityFunctionKeys: priorityKeys,
}
return name
}複製代碼
func init() {
// 註冊算法DefaulrProvider 的算法provider
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}複製代碼
核心調度流程,這裏面只介紹主線的流程,至於怎麼預選和優選則在下一篇文章進行更新,由於稍微有點複雜,而framework和extender則在後續介紹完這兩部分在進行介紹, 其中extender的調用則是在PrioritizeNodes進行優先級算中進行調用
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
// 省略非核心代碼
// 調用framework的RunPreFilterPlugins
preFilterStatus := g.framework.RunPreFilterPlugins(pluginContext, pod)
if !preFilterStatus.IsSuccess() {
return result, preFilterStatus.AsError()
}
// 獲取當前的node數量
numNodes := g.cache.NodeTree().NumNodes()
if numNodes == 0 {
return result, ErrNoNodesAvailable
}
// 更新snapshot
if err := g.snapshot(); err != nil {
return result, err
}
// 預選階段
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
if err != nil {
return result, err
}
// 將預選結果調用framework的postfilter
postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
if !postfilterStatus.IsSuccess() {
return result, postfilterStatus.AsError()
}
if len(filteredNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: numNodes,e
FailedPredicates: failedPredicateMap,
FilteredNodesStatuses: filteredNodesStatuses,
}
}
startPriorityEvalTime := time.Now()
// 若是隻有一個節點則直接返回
if len(filteredNodes) == 1 {
return ScheduleResult{
SuggestedHost: filteredNodes[0].Name,
EvaluatedNodes: 1 + len(failedPredicateMap),
FeasibleNodes: 1,
}, nil
}
// 獲取全部的調度策略
metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
// 獲取全部node的優先級,此處會將extenders進行傳入,實現擴展接口的調用
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, pluginContext)
if err != nil {
return result, err
}
// 從優先級中選擇出合適的node
host, err := g.selectHost(priorityList)
trace.Step("Selecting host done")
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
FeasibleNodes: len(filteredNodes),
}, err
}複製代碼
在調度算法框架中大量使用了工廠方法來進行算法、元數據等的構建,並經過封裝MetadataProducer來進行公共業務邏輯接口的封裝,經過PluginContext進行調度流程中上下文數據的傳遞,而且用戶能夠經過定製Provider來進行具體調度算法的選擇
本文只介紹了大的框架設計,諸如具體的算法註冊和構建其大多都是在構建scheduler命令行參數處經過加載對應的包和init函數來實現,本文沒有介紹一些具體的細節連搶佔也沒有介紹,後續文章裏面會進行一一展開,感興趣的同窗,歡迎一塊兒學習交流
微信號:baxiaoshi2020
關注公告號閱讀更多源碼分析文章
更多文章關注 www.sreguide.com
本文由博客一文多發平臺 OpenWrite 發佈