調度框架定義了一組擴展點,用戶能夠實現擴展點定義的接口來定義本身的調度邏輯(咱們稱之爲擴展),並將擴展註冊到擴展點上,調度框架在執行調度工做流時,遇到對應的擴展點時,將調用用戶註冊的擴展。調度框架在預留擴展點時,都是有特定的目的,有些擴展點上的擴展能夠改變調度程序的決策方法,有些擴展點上的擴展只是發送一個通知。node
咱們知道每當調度一個 Pod 時,都會按照兩個過程來執行:調度過程和綁定過程。nginx
調度過程爲 Pod 選擇一個合適的節點,綁定過程則將調度過程的決策應用到集羣中(也就是在被選定的節點上運行 Pod),將調度過程和綁定過程合在一塊兒,稱之爲調度上下文(scheduling context)。須要注意的是調度過程是 同步運行的(同一時間點只爲一個 Pod 進行調度),綁定過程可異步運行(同一時間點可併發爲多個 Pod 執行綁定)。git
調度過程和綁定過程遇到以下狀況時會中途退出:github
調度程序認爲當前沒有該 Pod 的可選節點api
這個時候,該 Pod 將被放回到 待調度隊列,並等待下次重試。markdown
下圖展現了調度框架中的調度上下文及其中的擴展點,一個擴展能夠註冊多個擴展點,以即可以執行更復雜的有狀態的任務。
網絡
QueueSort 擴展用於對 Pod 的待調度隊列進行排序,以決定先調度哪一個 Pod, QueueSort 擴展本質上只須要實現一個方法 Less(Pod1,Pod2) 用於比較兩個 Pod 誰更優先得到調度便可,同一時間點只能有一個 QueueSort 插件生效。併發
Pre-filter 擴展用於對 Pod 的信息進行預處理,或者檢查一些集羣或 Pod 必須知足的前提條件,若是 pre-filter 返回了 error,則調度過程終止。app
Filter 擴展用於排除那些不能運行該 Pod 的節點,對於每個節點,調度器將按順序執行 filter 擴展;若是任何一個 filter 將節點標記爲不可選,則餘下的 filter 擴展將不會被執行。調度器能夠同時對多個節點執行 filter 擴展。框架
Post-filter 是一個通知類型的擴展點,調用該擴展的參數是 filter 階段結束後被篩選爲可選節點的節點列表,能夠在擴展中使用這些信息更新內部狀態,或者產生日誌或 metrics 信息。
Scoring 擴展用於爲全部可選節點進行打分,調度器將針對每個節點調用 Soring 擴展,評分結果是一個範圍內的整數。在 normalize scoring 階段,調度器將會把每一個 scoring 擴展對具體某個節點的評分結果和該擴展的權重合並起來,做爲最終評分結果。
Normalizescoring 擴展在調度器對節點進行最終排序以前修改每一個節點的評分結果,註冊到該擴展點的擴展在被調用時,將得到同一個插件中的 scoring 擴展的評分結果做爲參數,調度框架每執行一次調度,都將調用全部插件中的一個 normalize scoring 擴展一次。
Reserve 是一個通知性質的擴展點,有狀態的插件可使用該擴展點來得到節點上爲 Pod 預留的資源,該事件發生在調度器將 Pod 綁定到節點以前,目的是避免調度器在等待 Pod 與節點綁定的過程當中調度新的 Pod 到節點上時,發生實際使用資源超出可用資源的狀況。(由於綁定 Pod 到節點上是異步發生的)。這是調度過程的最後一個步驟,Pod 進入 reserved 狀態之後,要麼在綁定失敗時觸發 Unreserve 擴展,要麼在綁定成功時,由 Post-bind 擴展結束綁定過程。
approve(批准):當全部的 permit 擴展都 approve 了 Pod 與節點的綁定,調度器將繼續執行綁定過程
deny(拒絕):若是任何一個 permit 擴展 deny 了 Pod 與節點的綁定,Pod 將被放回到待調度隊列,此時將觸發 Unreserve 擴展
Pre-bind 擴展用於在 Pod 綁定以前執行某些邏輯。例如,pre-bind 擴展能夠將一個基於網絡的數據卷掛載到節點上,以便 Pod 可使用。若是任何一個 pre-bind 擴展返回錯誤,Pod 將被放回到待調度隊列,此時將觸發 Unreserve 擴展。
只有全部的 pre-bind 擴展都成功執行了,bind 擴展才會執行
調度框架按照 bind 擴展註冊的順序逐個調用 bind 擴展
具體某個 bind 擴展能夠選擇處理或者不處理該 Pod
Post-bind 擴展在 Pod 成功綁定到節點上以後被動調用
若是咱們要實現本身的插件,必須向調度框架註冊插件並完成配置,另外還必須實現擴展點接口,對應的擴展點接口咱們能夠在源碼 pkg/scheduler/framework/v1alpha1/interface.go 文件中找到,以下所示:
// Plugin is the parent type for all the scheduling framework plugins. type Plugin interface { Name() string } type QueueSortPlugin interface { Plugin Less(*PodInfo, *PodInfo) bool } // PreFilterPlugin is an interface that must be implemented by "prefilter" plugins. // These plugins are called at the beginning of the scheduling cycle. type PreFilterPlugin interface { Plugin PreFilter(pc *PluginContext, p *v1.Pod) *Status } // FilterPlugin is an interface for Filter plugins. These plugins are called at the // filter extension point for filtering out hosts that cannot run a pod. // This concept used to be called 'predicate' in the original scheduler. // These plugins should return "Success", "Unschedulable" or "Error" in Status.code. // However, the scheduler accepts other valid codes as well. // Anything other than "Success" will lead to exclusion of the given host from // running the pod. type FilterPlugin interface { Plugin Filter(pc *PluginContext, pod *v1.Pod, nodeName string) *Status } // PostFilterPlugin is an interface for Post-filter plugin. Post-filter is an // informational extension point. Plugins will be called with a list of nodes // that passed the filtering phase. A plugin may use this data to update internal // state or to generate logs/metrics. type PostFilterPlugin interface { Plugin PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status } // ScorePlugin is an interface that must be implemented by "score" plugins to rank // nodes that passed the filtering phase. type ScorePlugin interface { Plugin Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status) } // ScoreWithNormalizePlugin is an interface that must be implemented by "score" // plugins that also need to normalize the node scoring results produced by the same // plugin's "Score" method. type ScoreWithNormalizePlugin interface { ScorePlugin NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status } // ReservePlugin is an interface for Reserve plugins. These plugins are called // at the reservation point. These are meant to update the state of the plugin. // This concept used to be called 'assume' in the original scheduler. // These plugins should return only Success or Error in Status.code. However, // the scheduler accepts other valid codes as well. Anything other than Success // will lead to rejection of the pod. type ReservePlugin interface { Plugin Reserve(pc *PluginContext, p *v1.Pod, nodeName string) *Status } // PreBindPlugin is an interface that must be implemented by "prebind" plugins. // These plugins are called before a pod being scheduled. type PreBindPlugin interface { Plugin PreBind(pc *PluginContext, p *v1.Pod, nodeName string) *Status } // PostBindPlugin is an interface that must be implemented by "postbind" plugins. // These plugins are called after a pod is successfully bound to a node. type PostBindPlugin interface { Plugin PostBind(pc *PluginContext, p *v1.Pod, nodeName string) } // UnreservePlugin is an interface for Unreserve plugins. This is an informational // extension point. If a pod was reserved and then rejected in a later phase, then // un-reserve plugins will be notified. Un-reserve plugins should clean up state // associated with the reserved Pod. type UnreservePlugin interface { Plugin Unreserve(pc *PluginContext, p *v1.Pod, nodeName string) } // PermitPlugin is an interface that must be implemented by "permit" plugins. // These plugins are called before a pod is bound to a node. type PermitPlugin interface { Plugin Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration) } // BindPlugin is an interface that must be implemented by "bind" plugins. Bind // plugins are used to bind a pod to a Node. type BindPlugin interface { Plugin Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status }
對於調度框架插件的啓用或者禁用,咱們一樣可使用上面的 KubeSchedulerConfiguration(https://godoc.org/k8s.io/kubernetes/pkg/scheduler/apis/config#KubeSchedulerConfiguration) 資源對象來進行配置。下面的例子中的配置啓用了一個實現了 reserve 和 preBind 擴展點的插件,而且禁用了另一個插件,同時爲插件 foo 提供了一些配置信息:
apiVersion: kubescheduler.config.k8s.io/v1alpha1 kind: KubeSchedulerConfiguration ... plugins: reserve: enabled: - name: foo - name: bar disabled: - name: baz preBind: enabled: - name: foo disabled: - name: baz pluginConfig: - name: foo args: > foo插件能夠解析的任意內容
擴展的調用順序以下:
若是某個擴展點沒有配置對應的擴展,調度框架將使用默認插件中的擴展
若是爲某個擴展點配置且激活了擴展,則調度框架將先調用默認插件的擴展,再調用配置中的擴展
默認插件的擴展始終被最早調用,而後按照 KubeSchedulerConfiguration 中擴展的激活 enabled 順序逐個調用擴展點的擴展
假設默認插件 foo 實現了 reserve 擴展點,此時咱們要添加一個插件 bar,想要在 foo 以前被調用,則應該先禁用 foo 再按照 bar foo 的順序激活。示例配置以下所示:
apiVersion: kubescheduler.config.k8s.io/v1alpha1 kind: KubeSchedulerConfiguration ... plugins: reserve: enabled: - name: bar - name: foo disabled: - name: foo
在源碼目錄 pkg/scheduler/framework/plugins/examples 中有幾個示範插件,咱們能夠參照其實現方式。
其實要實現一個調度框架的插件,並不難,咱們只要實現對應的擴展點,而後將插件註冊到調度器中便可,下面是默認調度器在初始化的時候註冊的插件:
func NewRegistry() Registry { return Registry{ // FactoryMap: // New plugins are registered here. // example: // { // stateful_plugin.Name: stateful.NewStatefulMultipointExample, // fooplugin.Name: fooplugin.New, // } } }
可是能夠看到默認並無註冊一些插件,因此要想讓調度器可以識別咱們的插件代碼,就須要本身來實現一個調度器了,固然這個調度器咱們徹底不必徹底本身實現,直接調用默認的調度器,而後在上面的 NewRegistry() 函數中將咱們的插件註冊進去便可。在 kube-scheduler 的源碼文件 kubernetes/cmd/kube-scheduler/app/server.go 中有一個 NewSchedulerCommand 入口函數,其中的參數是一個類型爲 Option 的列表,而這個 Option 剛好就是一個插件配置的定義:
// Option configures a framework.Registry. type Option func(framework.Registry) error // NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions func NewSchedulerCommand(registryOptions ...Option) *cobra.Command { ...... }
因此咱們徹底就能夠直接調用這個函數來做爲咱們的函數入口,而且傳入咱們本身實現的插件做爲參數便可,並且該文件下面還有一個名爲 WithPlugin 的函數能夠來建立一個 Option 實例:
// WithPlugin creates an Option based on plugin name and factory. func WithPlugin(name string, factory framework.PluginFactory) Option { return func(registry framework.Registry) error { return registry.Register(name, factory) } }
因此最終咱們的入口函數以下所示:
func main() { rand.Seed(time.Now().UTC().UnixNano()) command := app.NewSchedulerCommand( app.WithPlugin(sample.Name, sample.New), ) logs.InitLogs() defer logs.FlushLogs() if err := command.Execute(); err != nil { _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } }
其中 app.WithPlugin(sample.Name,sample.New) 就是咱們接下來要實現的插件,從 WithPlugin 函數的參數也能夠看出咱們這裏的 sample.New 必須是一個 framework.PluginFactory 類型的值,而 PluginFactory 的定義就是一個函數:
type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)
因此 sample.New 實際上就是上面的這個函數,在這個函數中咱們能夠獲取到插件中的一些數據而後進行邏輯處理便可,插件實現以下所示,咱們這裏只是簡單獲取下數據打印日誌,若是你有實際需求的能夠根據獲取的數據就行處理便可,咱們這裏只是實現了 PreFilter、 Filter、 PreBind 三個擴展點,其餘的能夠用一樣的方式來擴展便可:
// 插件名稱 const Name = "sample-plugin" type Sample struct { handle framework.FrameworkHandle } func (s *Sample) Name() string { return Name } func (s *Sample) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status { klog.V(3).Infof("prefilter pod: %v", pod.Name) return framework.NewStatus(framework.Success, "") } func (s *Sample) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { klog.V(3).Infof("filter pod: %v, node: %v", pod.Name, nodeName) return framework.NewStatus(framework.Success, "") } func (s *Sample) PreBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status { if nodeInfo, ok := s.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]; !ok { return framework.NewStatus(framework.Error, fmt.Sprintf("prebind get node info error: %+v", nodeName)) } else { klog.V(3).Infof("prebind node info: %+v", nodeInfo.Node()) return framework.NewStatus(framework.Success, "") } } //type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error) func New(configuration *runtime.Unknown, f framework.FrameworkHandle) (framework.Plugin, error) { var args interface{} if err := framework.DecodeInto(configuration, args); err != nil { return nil, err } klog.V(3).Infof("--------> args: %+v", args) return &Sample{ handle: f, }, nil }
完整代碼能夠前往倉庫 https://github.com/cnych/sample-scheduler-framework 獲取。
實現完成後,編譯打包成鏡像便可,而後咱們就能夠當成普通的應用用一個 Deployment 控制器來部署便可,因爲咱們須要去獲取集羣中的一些資源對象,因此固然須要申請 RBAC 權限,而後一樣經過 --config 參數來配置咱們的調度器,一樣仍是使用一個 KubeSchedulerConfiguration 資源對象配置,能夠經過 plugins 來啓用或者禁用咱們實現的插件,也能夠經過 pluginConfig 來傳遞一些參數值給插件:
kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 metadata: name: sample-scheduler-clusterrole rules: - apiGroups: - "" resources: - endpoints - events verbs: - create - get - update - apiGroups: - "" resources: - nodes verbs: - get - list - watch - apiGroups: - "" resources: - pods verbs: - delete - get - list - watch - update - apiGroups: - "" resources: - bindings - pods/binding verbs: - create - apiGroups: - "" resources: - pods/status verbs: - patch - update - apiGroups: - "" resources: - replicationcontrollers - services verbs: - get - list - watch - apiGroups: - apps - extensions resources: - replicasets verbs: - get - list - watch - apiGroups: - apps resources: - statefulsets verbs: - get - list - watch - apiGroups: - policy resources: - poddisruptionbudgets verbs: - get - list - watch - apiGroups: - "" resources: - persistentvolumeclaims - persistentvolumes verbs: - get - list - watch - apiGroups: - "" resources: - configmaps verbs: - get - list - watch - apiGroups: - "storage.k8s.io" resources: - storageclasses - csinodes verbs: - get - list - watch - apiGroups: - "coordination.k8s.io" resources: - leases verbs: - create - get - list - update - apiGroups: - "events.k8s.io" resources: - events verbs: - create - patch - update --- apiVersion: v1 kind: ServiceAccount metadata: name: sample-scheduler-sa namespace: kube-system --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: sample-scheduler-clusterrolebinding namespace: kube-system roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: sample-scheduler-clusterrole subjects: - kind: ServiceAccount name: sample-scheduler-sa namespace: kube-system --- apiVersion: v1 kind: ConfigMap metadata: name: scheduler-config namespace: kube-system data: scheduler-config.yaml: | apiVersion: kubescheduler.config.k8s.io/v1alpha1 kind: KubeSchedulerConfiguration schedulerName: sample-scheduler leaderElection: leaderElect: true lockObjectName: sample-scheduler lockObjectNamespace: kube-system plugins: preFilter: enabled: - name: "sample-plugin" filter: enabled: - name: "sample-plugin" preBind: enabled: - name: "sample-plugin" pluginConfig: - name: "sample-plugin" args: {"master": "master", "kubeconfig": "kubeconfig"} --- apiVersion: apps/v1 kind: Deployment metadata: name: sample-scheduler namespace: kube-system labels: component: sample-scheduler spec: replicas: 1 selector: matchLabels: component: sample-scheduler template: metadata: labels: component: sample-scheduler spec: serviceAccount: sample-scheduler-sa priorityClassName: system-cluster-critical volumes: - name: scheduler-config configMap: name: scheduler-config containers: - name: scheduler-ctrl image: cnych/sample-scheduler:v0.1.0 imagePullPolicy: IfNotPresent args: - sample-scheduler-framework - --config=/etc/kubernetes/scheduler-config.yaml - --v=3 resources: requests: cpu: "50m" volumeMounts: - name: scheduler-config mountPath: /etc/kubernetes
直接部署上面的資源對象便可,這樣咱們就部署了一個名爲 sample-scheduler 的調度器了,接下來咱們能夠部署一個應用來使用這個調度器進行調度:
apiVersion: apps/v1 kind: Deployment metadata: name: test-scheduler spec: replicas: 1 selector: matchLabels: app: test-scheduler template: metadata: labels: app: test-scheduler spec: schedulerName: sample-scheduler containers: - image: nginx imagePullPolicy: IfNotPresent name: nginx ports: - containerPort: 80
這裏須要注意的是咱們如今手動指定了一個 schedulerName 的字段,將其設置成上面咱們自定義的調度器名稱 sample-scheduler。
咱們直接建立這個資源對象,建立完成後查看咱們自定義調度器的日誌信息:
$ kubectl get pods -n kube-system -l component=sample-scheduler NAME READY STATUS RESTARTS AGE sample-scheduler-7c469787f-rwhhd 1/1 Running 0 13m $ kubectl logs -f sample-scheduler-7c469787f-rwhhd -n kube-system I0104 08:24:22.087881 1 scheduler.go:530] Attempting to schedule pod: default/test-scheduler-6d779d9465-rq2bb I0104 08:24:22.087992 1 plugins.go:23] prefilter pod: test-scheduler-6d779d9465-rq2bb I0104 08:24:22.088657 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node1 I0104 08:24:22.088797 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node2 I0104 08:24:22.088871 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node3 I0104 08:24:22.088946 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node4 I0104 08:24:22.088992 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-master I0104 08:24:22.090653 1 plugins.go:36] prebind node info: &Node{ObjectMeta:{ydzs-node3 /api/v1/nodes/ydzs-node3 1ff6e228-4d98-4737-b6d3-30a5d55ccdc2 15466372 0 2019-11-10 09:05:09 +0000 UTC <nil> <nil> ......} I0104 08:24:22.091761 1 factory.go:610] Attempting to bind test-scheduler-6d779d9465-rq2bb to ydzs-node3 I0104 08:24:22.104994 1 scheduler.go:667] pod default/test-scheduler-6d779d9465-rq2bb is bound successfully on node "ydzs-node3", 5 nodes evaluated, 4 nodes were found feasible. Bound node resource: "Capacity: CPU<4>|Memory<8008820Ki>|Pods<110>|StorageEphemeral<17921Mi>; Allocatable: CPU<4>|Memory<7906420Ki>|Pods<110>|StorageEphemeral<16912377419>.".
能夠看到當咱們建立完 Pod 後,在咱們自定義的調度器中就出現了對應的日誌,而且在咱們定義的擴展點上面都出現了對應的日誌,證實咱們的示例成功了,也能夠經過查看 Pod 的 schedulerName 來驗證:
$ kubectl get pods NAME READY STATUS RESTARTS AGE test-scheduler-6d779d9465-rq2bb 1/1 Running 0 22m $ kubectl get pod test-scheduler-6d779d9465-rq2bb -o yaml ...... restartPolicy: Always schedulerName: sample-scheduler securityContext: {} serviceAccount: default ......
在最新的 Kubernetes v1.17 版本中, SchedulerFramework 內置的預選和優選函數已經所有插件化,因此要擴展調度器咱們應該掌握並理解調度框架這種方式。