The kubernetes scheduler module

1. Introduction to the kubernetes scheduler

[Figure: overall kubernetes architecture]
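This figure shows the overall architecture of kubernetes and its main components. Apart from the kubernetes api server itself, every component talks to the api server to get the resources it needs, using the list/watch mechanism. When a user creates a pod, the pod's nodeName field is empty. The scheduler picks up such pods and selects a suitable node for each of them. Choosing a node is a complex process that has to take many factors into account, such as zones, node affinity and anti-affinity, the host's CPU and memory, volume conflicts, and taints. Once a suitable host has been found, the scheduler sets the host name on the pod as a field, and the api server persists it in etcd. The kubelet on each node watches for pods whose nodeName is set to that node and runs them on its host.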
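The hand-off described above hinges on a single field. The toy Go model below is a sketch that assumes a hypothetical `Pod` struct with only `Name` and `NodeName`; it shows the two views of that field: the scheduler looks for pods whose `NodeName` is empty, and the kubelet on a node looks for pods whose `NodeName` equals its own name. In a real cluster both sides receive these pods through list/watch from the api server rather than by scanning a slice.

```go
package main

import "fmt"

// Pod is a hypothetical, heavily simplified pod: just enough fields to show
// how nodeName hands a pod from the scheduler to a kubelet.
type Pod struct {
	Name     string
	NodeName string // empty until the scheduler binds the pod to a node
}

// unscheduled returns the pods the scheduler still has to place.
func unscheduled(pods []Pod) []Pod {
	var out []Pod
	for _, p := range pods {
		if p.NodeName == "" {
			out = append(out, p)
		}
	}
	return out
}

// podsForNode returns the pods the kubelet on the given node should run.
func podsForNode(pods []Pod, node string) []Pod {
	var out []Pod
	for _, p := range pods {
		if p.NodeName == node {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pods := []Pod{{Name: "web-1"}, {Name: "db-1", NodeName: "node-a"}}
	fmt.Println(len(unscheduled(pods))) // 1: web-1 has no node yet

	// The bind step: the scheduler records its decision in NodeName.
	pods[0].NodeName = "node-a"
	fmt.Println(len(unscheduled(pods)), len(podsForNode(pods, "node-a"))) // 0 2
}
```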

2. Introduction to the kubernetes scheduling framework

[Figure: official scheduling framework diagram; every position in the scheduling flow is an extension point]
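This is the official framework diagram, showing the node-selection flow implemented in recent versions. Earlier versions had many pain points around custom plugins, because plugins were registered as predicates and priorities. Registration through the framework is clearer and more extensible; this is explained in more detail in the kubernetes extension section. Every position in the diagram is an extension point.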
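To make "extension point" concrete, here is a minimal Go sketch of framework-style registration. The interfaces `Plugin`, `FilterPlugin`, `ScorePlugin`, the `Registry` type and the `gpuReserved` plugin are all hypothetical simplifications, not the scheduler's actual API: the real interfaces also take a context, per-cycle state and full pod/node objects. What the sketch shows is the shape: plugins are registered by name through a factory, and one plugin can implement several extension points.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Plugin is a hypothetical, simplified base interface.
type Plugin interface {
	Name() string
}

// FilterPlugin marks a node feasible (nil) or infeasible (non-nil error).
type FilterPlugin interface {
	Plugin
	Filter(pod, node string) error
}

// ScorePlugin ranks a feasible node; higher is better.
type ScorePlugin interface {
	Plugin
	Score(pod, node string) int64
}

// PluginFactory builds a plugin instance; Registry maps names to factories.
type PluginFactory func() Plugin

type Registry map[string]PluginFactory

// Register adds a factory under a name, refusing duplicate names.
func (r Registry) Register(name string, f PluginFactory) error {
	if _, exists := r[name]; exists {
		return fmt.Errorf("plugin %q already registered", name)
	}
	r[name] = f
	return nil
}

// gpuReserved is a hypothetical plugin implementing two extension points.
type gpuReserved struct{}

func (gpuReserved) Name() string { return "GPUReserved" }

func (gpuReserved) Filter(pod, node string) error {
	if strings.HasPrefix(node, "gpu-") {
		return errors.New("node reserved for GPU workloads")
	}
	return nil
}

func (gpuReserved) Score(pod, node string) int64 { return 0 }

func main() {
	r := Registry{}
	if err := r.Register("GPUReserved", func() Plugin { return gpuReserved{} }); err != nil {
		panic(err)
	}
	p := r["GPUReserved"]()
	// In this sketch, type assertions find the extension points a plugin implements.
	if f, ok := p.(FilterPlugin); ok {
		fmt.Println(f.Filter("web-1", "gpu-1"))
	}
	if _, ok := p.(ScorePlugin); ok {
		fmt.Println("also a ScorePlugin")
	}
}
```

Registering plugins by name as functions in this way is also the idea behind approach 3 in section 3: a custom plugin is added to the registry and compiled into the scheduler binary.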
Registration based on predicates and priorities:

// NewLegacyRegistry returns a legacy algorithm registry of predicates and priorities.
func NewLegacyRegistry() *LegacyRegistry {
    registry := &LegacyRegistry{
        // MandatoryPredicates the set of keys for predicates that the scheduler will
        // be configured with all the time.
        MandatoryPredicates: sets.NewString(
            PodToleratesNodeTaintsPred,
            CheckNodeUnschedulablePred,
        ),

        // Used as the default set of predicates if Policy was specified, but predicates was nil.
        DefaultPredicates: sets.NewString(
            NoVolumeZoneConflictPred,
            MaxEBSVolumeCountPred,
            MaxGCEPDVolumeCountPred,
            MaxAzureDiskVolumeCountPred,
            MaxCSIVolumeCountPred,
            MatchInterPodAffinityPred,
            NoDiskConflictPred,
            GeneralPred,
            PodToleratesNodeTaintsPred,
            CheckVolumeBindingPred,
            CheckNodeUnschedulablePred,
        ),

        // Used as the default set of priorities if Policy was specified, but priorities was nil.
        DefaultPriorities: map[string]int64{
            SelectorSpreadPriority:      1,
            InterPodAffinityPriority:    1,
            LeastRequestedPriority:      1,
            BalancedResourceAllocation:  1,
            NodePreferAvoidPodsPriority: 10000,
            NodeAffinityPriority:        1,
            TaintTolerationPriority:     1,
            ImageLocalityPriority:       1,
        },

        PredicateToConfigProducer: make(map[string]ConfigProducer),
        PriorityToConfigProducer:  make(map[string]ConfigProducer),
    }

    registry.registerPredicateConfigProducer(GeneralPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            // GeneralPredicate is a combination of predicates.
            plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
            if args.NodeResourcesFitArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
            }
            plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(PodToleratesNodeTaintsPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, tainttoleration.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(PodFitsResourcesPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, noderesources.FitName, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, noderesources.FitName, nil)
            if args.NodeResourcesFitArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(noderesources.FitName, args.NodeResourcesFitArgs))
            }
            return
        })
    registry.registerPredicateConfigProducer(HostNamePred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodename.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(PodFitsHostPortsPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeports.Name, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, nodeports.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(MatchNodeSelectorPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeaffinity.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(CheckNodeUnschedulablePred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodeunschedulable.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(CheckVolumeBindingPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, volumebinding.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(NoDiskConflictPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, volumerestrictions.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(NoVolumeZoneConflictPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, volumezone.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxCSIVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CSIName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxEBSVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.EBSName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxGCEPDVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.GCEPDName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxAzureDiskVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.AzureDiskName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MaxCinderVolumeCountPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodevolumelimits.CinderName, nil)
            return
        })
    registry.registerPredicateConfigProducer(MatchInterPodAffinityPred,
        func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, interpodaffinity.Name, nil)
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, interpodaffinity.Name, nil)
            return
        })
    registry.registerPredicateConfigProducer(CheckNodeLabelPresencePred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, nodelabel.Name, nil)
            if args.NodeLabelArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(nodelabel.Name, args.NodeLabelArgs))
            }
            return
        })
    registry.registerPredicateConfigProducer(CheckServiceAffinityPred,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Filter = appendToPluginSet(plugins.Filter, serviceaffinity.Name, nil)
            if args.ServiceAffinityArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
            }
            plugins.PreFilter = appendToPluginSet(plugins.PreFilter, serviceaffinity.Name, nil)
            return
        })

    // Register Priorities.
    registry.registerPriorityConfigProducer(SelectorSpreadPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, defaultpodtopologyspread.Name, &args.Weight)
            plugins.PreScore = appendToPluginSet(plugins.PreScore, defaultpodtopologyspread.Name, nil)
            return
        })
    registry.registerPriorityConfigProducer(TaintTolerationPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.PreScore = appendToPluginSet(plugins.PreScore, tainttoleration.Name, nil)
            plugins.Score = appendToPluginSet(plugins.Score, tainttoleration.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(NodeAffinityPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, nodeaffinity.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(ImageLocalityPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, imagelocality.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(InterPodAffinityPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.PreScore = appendToPluginSet(plugins.PreScore, interpodaffinity.Name, nil)
            plugins.Score = appendToPluginSet(plugins.Score, interpodaffinity.Name, &args.Weight)
            if args.InterPodAffinityArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(interpodaffinity.Name, args.InterPodAffinityArgs))
            }
            return
        })
    registry.registerPriorityConfigProducer(NodePreferAvoidPodsPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, nodepreferavoidpods.Name, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(MostRequestedPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.MostAllocatedName, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(BalancedResourceAllocation,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.BalancedAllocationName, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(LeastRequestedPriority,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.LeastAllocatedName, &args.Weight)
            return
        })
    registry.registerPriorityConfigProducer(noderesources.RequestedToCapacityRatioName,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            plugins.Score = appendToPluginSet(plugins.Score, noderesources.RequestedToCapacityRatioName, &args.Weight)
            if args.RequestedToCapacityRatioArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(noderesources.RequestedToCapacityRatioName, args.RequestedToCapacityRatioArgs))
            }
            return
        })

    registry.registerPriorityConfigProducer(nodelabel.Name,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            // If there are n LabelPreference priorities in the policy, the weight for the corresponding
            // score plugin is n*weight (note that the validation logic verifies that all LabelPreference
            // priorities specified in Policy have the same weight).
            weight := args.Weight * int32(len(args.NodeLabelArgs.PresentLabelsPreference)+len(args.NodeLabelArgs.AbsentLabelsPreference))
            plugins.Score = appendToPluginSet(plugins.Score, nodelabel.Name, &weight)
            if args.NodeLabelArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(nodelabel.Name, args.NodeLabelArgs))
            }
            return
        })
    registry.registerPriorityConfigProducer(serviceaffinity.Name,
        func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
            // If there are n ServiceAffinity priorities in the policy, the weight for the corresponding
            // score plugin is n*weight (note that the validation logic verifies that all ServiceAffinity
            // priorities specified in Policy have the same weight).
            weight := args.Weight * int32(len(args.ServiceAffinityArgs.AntiAffinityLabelsPreference))
            plugins.Score = appendToPluginSet(plugins.Score, serviceaffinity.Name, &weight)
            if args.ServiceAffinityArgs != nil {
                pluginConfig = append(pluginConfig, NewPluginConfig(serviceaffinity.Name, args.ServiceAffinityArgs))
            }
            return
        })

    // The following two features are the last ones to be supported as predicate/priority.
    // Once they graduate to GA, there will be no more checking for feature gates here.
    // Only register EvenPodsSpread predicate & priority if the feature is enabled
    if utilfeature.DefaultFeatureGate.Enabled(features.EvenPodsSpread) {
        klog.Infof("Registering EvenPodsSpread predicate and priority function")

        registry.registerPredicateConfigProducer(EvenPodsSpreadPred,
            func(_ ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
                plugins.PreFilter = appendToPluginSet(plugins.PreFilter, podtopologyspread.Name, nil)
                plugins.Filter = appendToPluginSet(plugins.Filter, podtopologyspread.Name, nil)
                return
            })
        registry.DefaultPredicates.Insert(EvenPodsSpreadPred)

        registry.registerPriorityConfigProducer(EvenPodsSpreadPriority,
            func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
                plugins.PreScore = appendToPluginSet(plugins.PreScore, podtopologyspread.Name, nil)
                plugins.Score = appendToPluginSet(plugins.Score, podtopologyspread.Name, &args.Weight)
                return
            })
        registry.DefaultPriorities[EvenPodsSpreadPriority] = 1
    }

    // Prioritizes nodes that satisfy pod's resource limits
    if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
        klog.Infof("Registering resourcelimits priority function")

        registry.registerPriorityConfigProducer(ResourceLimitsPriority,
            func(args ConfigProducerArgs) (plugins config.Plugins, pluginConfig []config.PluginConfig) {
                plugins.PreScore = appendToPluginSet(plugins.PreScore, noderesources.ResourceLimitsName, nil)
                plugins.Score = appendToPluginSet(plugins.Score, noderesources.ResourceLimitsName, &args.Weight)
                return
            })
        registry.DefaultPriorities[ResourceLimitsPriority] = 1
    }

    return registry
}
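The registry above is essentially a translation table: each legacy predicate or priority name expands into one or more framework plugins at specific extension points (for example, `GeneralPredicates` enables `noderesources.FitName`, `nodename.Name`, `nodeports.Name` and `nodeaffinity.Name` at Filter). The Go sketch below reproduces that idea for a few predicates only. The table is a hand-written subset, and the plugin name strings are assumptions about what those `...Name` constants resolve to. The sketch also drops plugins that several predicates share, which is a choice of this sketch rather than a claim about the upstream code.

```go
package main

import "fmt"

// ExtensionPoint names a stage of the scheduling framework.
type ExtensionPoint string

const (
	PreFilter ExtensionPoint = "PreFilter"
	Filter    ExtensionPoint = "Filter"
)

// legacyPredicates is a hand-written subset of the registry above:
// legacy predicate name -> plugins enabled at each extension point.
var legacyPredicates = map[string]map[ExtensionPoint][]string{
	"PodFitsResources":       {PreFilter: {"NodeResourcesFit"}, Filter: {"NodeResourcesFit"}},
	"HostName":               {Filter: {"NodeName"}},
	"PodToleratesNodeTaints": {Filter: {"TaintToleration"}},
	"GeneralPredicates": {
		PreFilter: {"NodeResourcesFit", "NodePorts"},
		Filter:    {"NodeResourcesFit", "NodeName", "NodePorts", "NodeAffinity"},
	},
}

// translate expands a policy's predicate list into per-extension-point plugin
// lists, keeping first-seen order and skipping plugins already enabled.
func translate(predicates []string) (map[ExtensionPoint][]string, error) {
	out := map[ExtensionPoint][]string{}
	seen := map[ExtensionPoint]map[string]bool{PreFilter: {}, Filter: {}}
	for _, pred := range predicates {
		points, ok := legacyPredicates[pred]
		if !ok {
			return nil, fmt.Errorf("unknown predicate %q", pred)
		}
		for _, ep := range []ExtensionPoint{PreFilter, Filter} {
			for _, plugin := range points[ep] {
				if !seen[ep][plugin] {
					seen[ep][plugin] = true
					out[ep] = append(out[ep], plugin)
				}
			}
		}
	}
	return out, nil
}

func main() {
	plugins, err := translate([]string{"GeneralPredicates", "PodFitsResources", "HostName"})
	if err != nil {
		panic(err)
	}
	fmt.Println(plugins[PreFilter]) // [NodeResourcesFit NodePorts]
	fmt.Println(plugins[Filter])    // [NodeResourcesFit NodeName NodePorts NodeAffinity]
}
```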

Registration based on the framework:

// ListAlgorithmProviders lists registered algorithm providers.
func ListAlgorithmProviders() string {
    r := NewRegistry()
    var providers []string
    for k := range r {
        providers = append(providers, k)
    }
    sort.Strings(providers)
    return strings.Join(providers, " | ")
}

func getDefaultConfig() *schedulerapi.Plugins {
    return &schedulerapi.Plugins{
        QueueSort: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: queuesort.Name},
            },
        },
        PreFilter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.FitName},
                {Name: nodeports.Name},
                {Name: interpodaffinity.Name},
            },
        },
        Filter: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: nodeunschedulable.Name},
                {Name: noderesources.FitName},
                {Name: nodename.Name},
                {Name: nodeports.Name},
                {Name: nodeaffinity.Name},
                {Name: volumerestrictions.Name},
                {Name: tainttoleration.Name},
                {Name: nodevolumelimits.EBSName},
                {Name: nodevolumelimits.GCEPDName},
                {Name: nodevolumelimits.CSIName},
                {Name: nodevolumelimits.AzureDiskName},
                {Name: volumebinding.Name},
                {Name: volumezone.Name},
                {Name: interpodaffinity.Name},
            },
        },
        PreScore: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: interpodaffinity.Name},
                {Name: defaultpodtopologyspread.Name},
                {Name: tainttoleration.Name},
            },
        },
        Score: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: noderesources.BalancedAllocationName, Weight: 1},
                {Name: imagelocality.Name, Weight: 1},
                {Name: interpodaffinity.Name, Weight: 1},
                {Name: noderesources.LeastAllocatedName, Weight: 1},
                {Name: nodeaffinity.Name, Weight: 1},
                {Name: nodepreferavoidpods.Name, Weight: 10000},
                {Name: defaultpodtopologyspread.Name, Weight: 1},
                {Name: tainttoleration.Name, Weight: 1},
            },
        },
        Bind: &schedulerapi.PluginSet{
            Enabled: []schedulerapi.Plugin{
                {Name: defaultbinder.Name},
            },
        },
    }
}
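The default configuration above lists which plugins run at each extension point and, for Score, their weights. The Go sketch below shows, under simplifying assumptions, how the Filter and Score sets combine in one cycle: every node is checked concurrently against the filters, then each feasible node gets the sum of `weight × score` over the score plugins, and the highest total wins. `filterFunc`, `scoreFunc` and `weightedScore` are hypothetical stand-ins for the real plugin interfaces; scores are assumed to be already normalized to the range 0 to 100, and ties go to the first node here, whereas the real scheduler picks randomly among the nodes that share the highest score.

```go
package main

import (
	"fmt"
	"sync"
)

// filterFunc is a hypothetical stand-in for a Filter plugin:
// it returns true if the node can run the pod.
type filterFunc func(node string) bool

// scoreFunc is a hypothetical stand-in for a Score plugin; its result is
// assumed to be already normalized to the range [0, 100].
type scoreFunc func(node string) int64

// weightedScore pairs a score plugin with its configured weight.
type weightedScore struct {
	score  scoreFunc
	weight int64
}

// feasibleNodes checks all nodes concurrently. For each node the filters
// run in order and stop at the first one that rejects the node.
func feasibleNodes(nodes []string, filters []filterFunc) []string {
	ok := make([]bool, len(nodes)) // one slot per node, so no locking needed
	var wg sync.WaitGroup
	for i, n := range nodes {
		wg.Add(1)
		go func(i int, n string) {
			defer wg.Done()
			for _, f := range filters {
				if !f(n) {
					return // skip the remaining filters for this node
				}
			}
			ok[i] = true
		}(i, n)
	}
	wg.Wait()

	var out []string
	for i, n := range nodes {
		if ok[i] {
			out = append(out, n)
		}
	}
	return out
}

// pickNode sums weight*score over all score plugins for each node and
// returns the node with the highest total (the first one on a tie).
func pickNode(nodes []string, scorers []weightedScore) (string, bool) {
	best, bestTotal, found := "", int64(0), false
	for _, n := range nodes {
		var total int64
		for _, s := range scorers {
			total += s.weight * s.score(n)
		}
		if !found || total > bestTotal {
			best, bestTotal, found = n, total, true
		}
	}
	return best, found
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}
	freePct := map[string]int64{"node-a": 20, "node-b": 80, "node-c": 50}

	filters := []filterFunc{
		func(n string) bool { return n != "node-c" }, // pretend node-c is unschedulable
	}
	scorers := []weightedScore{
		// More free capacity scores higher, in the spirit of LeastAllocated.
		{score: func(n string) int64 { return freePct[n] }, weight: 1},
	}

	feasible := feasibleNodes(nodes, filters)
	best, _ := pickNode(feasible, scorers)
	fmt.Println(feasible, best) // [node-a node-b] node-b
}
```

The weights matter a great deal: if every plugin scores at most 100, the seven weight-1 score plugins in the default set can contribute at most 700 in total, while a 100-point difference from nodepreferavoidpods at weight 10000 is worth 1,000,000, so that plugin effectively decides on its own.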
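  • Pre-filter
    Plugins at the PreFilter extension point perform the necessary up-front checks. They run only once per scheduling cycle, and if one of them returns an error, the current scheduling attempt is aborted.
  • Filter
    Filter plugins remove the nodes that cannot run the pod. Nodes can be checked concurrently, and each node is evaluated by every filter plugin unless one of them rejects it or returns an error, in which case the remaining plugins are not run for that node. Filter plugins may be called more than once within a single scheduling cycle.
  • Score
    In this phase the score plugins rank the nodes that passed filtering. The weighted scores from all score plugins are summed for each node, and the node with the highest total is selected.
  • Bind
    This step binds the pod to the selected node: the node name is set on the pod as a field, and the api server persists the change.
  • Preemption
    If no suitable node is found in a scheduling round, the preemption logic runs. The overall steps are:
  1. Check that the preemptor is eligible to preempt
    The preemptor must have preemption enabled and a preemption policy that allows preempting. If it has already preempted once, i.e. its NominatedNodeName is already set to some node, and that node still has lower-priority pods that are in the process of being deleted, the preemptor is not allowed to preempt again.
  2. Find the nodes on which preemption could help
    The reasons for the failed scheduling attempt have already been recorded. Using these records, nodes that failed with an UnschedulableAndUnresolvable error are excluded, since evicting pods there would not make the pod fit.
  3. Find the node where preemption costs the least
    Nodes are filtered again taking PodDisruptionBudgets (PDBs) and affinity/anti-affinity into account, and the most suitable node is then chosen by criteria such as: the victims having the lowest priority, the fewest victim pods, and the smallest sum of victim priorities.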
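Step 3 is essentially a lexicographic comparison between candidate nodes. The Go sketch below picks the cheapest node from hypothetical `candidate` records, each listing the priorities of the pods that would be evicted and the number of PDBs that eviction would violate. The order of the criteria (PDB violations, then highest victim priority, then sum of victim priorities, then number of victims) is an assumption based on the upstream logic of this era; upstream also breaks remaining ties by the victims' start time, which is omitted here.

```go
package main

import (
	"fmt"
	"math"
)

// candidate is a hypothetical summary of one node after the preemption
// dry run: which pods would have to be evicted and how many PDBs that breaks.
type candidate struct {
	node          string
	pdbViolations int
	victims       []int32 // priorities of the pods that would be evicted
}

// highest returns the largest victim priority; a node with no victims gets
// the minimum value, so it wins this comparison.
func highest(p []int32) int32 {
	h := int32(math.MinInt32)
	for _, v := range p {
		if v > h {
			h = v
		}
	}
	return h
}

func sum(p []int32) int64 {
	var s int64
	for _, v := range p {
		s += int64(v)
	}
	return s
}

// cheaper reports whether preempting on a costs less than on b, comparing
// PDB violations, then the highest victim priority, then the priority sum,
// then the number of victims.
func cheaper(a, b candidate) bool {
	if a.pdbViolations != b.pdbViolations {
		return a.pdbViolations < b.pdbViolations
	}
	if ha, hb := highest(a.victims), highest(b.victims); ha != hb {
		return ha < hb
	}
	if sa, sb := sum(a.victims), sum(b.victims); sa != sb {
		return sa < sb
	}
	return len(a.victims) < len(b.victims)
}

// pickOne returns the cheapest candidate node, if there is any candidate.
func pickOne(cands []candidate) (string, bool) {
	if len(cands) == 0 {
		return "", false
	}
	best := cands[0]
	for _, c := range cands[1:] {
		if cheaper(c, best) {
			best = c
		}
	}
	return best.node, true
}

func main() {
	node, _ := pickOne([]candidate{
		{node: "node-a", victims: []int32{100, 5}},
		{node: "node-b", victims: []int32{10, 10, 10}},
		{node: "node-c", victims: []int32{10, 20}},
	})
	fmt.Println(node) // node-b: its most important victim has the lowest priority
}
```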

3. Introduction to kubernetes scheduler extensions

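The scheduler's built-in plugins only cover the basics; in practice you will almost certainly need plugins suited to your own business. The new framework offers users the following ways to extend the scheduler: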

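  1. Implement the plugin inside the scheduler source
    Add the plugin following the existing code organization of the source tree. This places high demands on the implementer, who must know the overall k8s architecture and the details of every component thoroughly; otherwise the whole cluster may misbehave. This approach is not recommended.
  2. Use an HTTP extender
    The scheduler source already implements the extension points for this: the implementer can use any language to write an HTTP server that receives the corresponding arguments and implements its own plugin logic. This is a fairly recommended approach, but because the scheduler talks to the extender over HTTP(S), every call adds latency, the extender has no control over cancelling scheduling, and it cannot share the scheduler's cache, so the implementer has to build a separate cache inside the extender server.
  3. Modify the main function of kube-scheduler
    Implement your own registry, register your plugins in it as functions, and compile them together with the scheduler source. This solves the problems of approach 2. It inevitably still requires changing a little source code, but far less than approach 1, and at least it does not affect the scheduler's core logic.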

4. References

  1. https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/20180409-scheduling-framework.md#configuring-plugins
  2. https://github.com/kubernetes/enhancements/tree/master/keps/sig-scheduling/1819-scheduler-extender
  3. https://github.com/kubernetes/community/blob/b3349d5b1354df814b67bbdee6890477f3c250cb/contributors/design-proposals/scheduling/pod-preemption.md
  4. https://draveness.me/system-design-scheduler/
  5. https://developer.ibm.com/technologies/containers/articles/creating-a-custom-kube-scheduler/
  6. https://github.com/AliyunContainerService/gpushare-scheduler-extender