在kubernetes的scheduler調度器的設計中爲用戶預留了兩種擴展機制:SchedulerExtender與Framework。本文主要淺談一下SchedulerExtender的實現;由於還有一篇介紹Framework的文章,因此本文的k8s代碼切到1.18版本。
SchedulerExtender是kubernetes的外部擴展方式,用戶可以根據需求獨立構建調度服務,實現對應的遠程調用接口(目前是http)。scheduler在調度的對應階段會根據用戶定義的資源和接口來進行遠程調用,對應的service根據自身的資源數據和scheduler傳遞過來的中間調度結果來進行決策。
extender只需要實現對應插件的接口,並編寫yaml文件來註冊對應的服務接口,就可以實現scheduler的擴展;不需要修改任何調度器的代碼,即可實現調度插件的插拔。
由於是獨立的服務,extender可以實現自定義資源的存儲與獲取,甚至可以不依賴於etcd、使用第三方的存儲來進行資源的存儲,主要是用於kubernetes中不支持的那些資源的調度擴展。
SchedulerExtender接口主要用於擴展scheduler,其定義如下:
// SchedulerExtender is the set of operations an extender exposes to the
// scheduler: filtering, prioritizing, binding and preemption processing,
// plus capability queries.
type SchedulerExtender interface {
	// Name returns a unique name that identifies the extender.
	Name() string

	// Filter is used in the predicate (filtering) phase to narrow down the
	// candidate nodes for pod. It returns the nodes that passed, a map of
	// failed nodes, and an error.
	Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)

	// Prioritize is used in the priority (scoring) phase. It returns per-host
	// priorities together with the weight to apply to them.
	Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)

	// Bind delegates the pod binding operation to the extender.
	Bind(binding *v1.Binding) error

	// IsBinder reports whether the extender supports bind.
	IsBinder() bool

	// IsInterested reports whether the extender is interested in the
	// resources of the given pod.
	IsInterested(pod *v1.Pod) bool

	// ProcessPreemption is used in the preemption phase; it receives the
	// candidate node-to-victims map and returns a node-to-victims map.
	ProcessPreemption(
		pod *v1.Pod,
		nodeToVictims map[*v1.Node]*extenderv1.Victims,
		nodeInfos listers.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error)

	// SupportsPreemption reports whether the extender supports preemption.
	SupportsPreemption() bool

	// IsIgnorable returns true indicates scheduling should not fail when this extender
	// is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well.
	IsIgnorable() bool
}
// HTTPExtender implements the algorithm.SchedulerExtender interface.
type HTTPExtender struct {
	// extenderURL is the base URL of the extender; send trims trailing
	// slashes from it and appends "/" plus the verb.
	extenderURL string

	// The verbs below are the URL path suffixes passed to send for each
	// operation. Filter, Prioritize and Bind treat an empty verb as "this
	// operation is not configured".
	preemptVerb    string
	filterVerb     string
	prioritizeVerb string
	bindVerb       string

	// weight is returned by Prioritize alongside the extender's scores.
	weight int64

	// client performs the HTTP requests to the extender.
	client *http.Client

	// nodeCacheCapable selects what is sent to the extender: when true only
	// node names are sent, otherwise the full node objects are sent.
	nodeCacheCapable bool

	// managedResources is the set of resource names this extender manages;
	// IsInterested consults it.
	managedResources sets.String

	// NOTE(review): presumably backs IsIgnorable — its use is not shown here.
	ignorable bool
}
extender的默認實現是通過HTTPExtender完成的,即基於http協議通過json來進行數據傳遞,其核心數據結構即上面的HTTPExtender結構體。
其實通信很簡單:通過http協議,以json序列化的方式進行遠程POST提交,並反序列化返回的結果。
// Helper function to send messages to the extender func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error { // 序列化 out, err := json.Marshal(args) if err != nil { return err } // 拼接url url := strings.TrimRight(h.extenderURL, "/") + "/" + action req, err := http.NewRequest("POST", url, bytes.NewReader(out)) if err != nil { return err } // 設置http header req.Header.Set("Content-Type", "application/json") // 發送數據接收結果 resp, err := h.client.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, url, resp.StatusCode) } // 序列化返回結果 return json.NewDecoder(resp.Body).Decode(result) }
nodeCacheCapable是聲明extender的一個參數,即對應的extender是否會緩存node的數據。如果緩存數據,則只需要傳遞node的名字,而不會進行所有元數據的傳遞,可以減小通信的數據包大小。
if h.nodeCacheCapable {
	nodeNameSlice := make([]string, 0, len(nodes))
	for _, node := range nodes {
		// Only the node name is sent.
		nodeNameSlice = append(nodeNameSlice, node.Name)
	}
	nodeNames = &nodeNameSlice
} else {
	nodeList = &v1.NodeList{}
	for _, node := range nodes {
		// The full node object is sent.
		nodeList.Items = append(nodeList.Items, *node)
	}
}
// Build the arguments to send; only one of Nodes / NodeNames is non-nil.
args = &extenderv1.ExtenderArgs{
	Pod:       pod,
	Nodes:     nodeList,
	NodeNames: nodeNames,
}
在調用extender的時候,會先檢測extender是否對對應pod的container的資源感興趣:如果感興趣,則進行調用,否則跳過。
func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool { if h.managedResources.Len() == 0 { return true } // pod的容器 if h.hasManagedResources(pod.Spec.Containers) { return true } // pod的初始化容器 if h.hasManagedResources(pod.Spec.InitContainers) { return true } return false } func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool { for i := range containers { container := &containers[i] // 檢查container的requests裏面是否有感興趣的資源 for resourceName := range container.Resources.Requests { if h.managedResources.Has(string(resourceName)) { return true } } // 檢查container的limits裏面是否有感興趣的資源 for resourceName := range container.Resources.Limits { if h.managedResources.Has(string(resourceName)) { return true } } } return false }
Filter主要是用於在預選階段完成後調用extender進行二次過濾。
在findNodesThatPassExtenders中會遍歷所有的extender來確定是否關心對應的資源,如果關心就會調用Filter接口來進行遠程調用,並將篩選結果傳遞給下一個extender,逐步縮小篩選集合。注意這個階段的插件調用是串行的,因爲每個插件都以上個插件的結果來繼續篩選。
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { for _, extender := range g.extenders { if len(filtered) == 0 { break } // 判斷對應的extender是否關心pod中容器的資源 if !extender.IsInterested(pod) { continue } // 進行遠程過程的調用 filteredList, failedMap, err := extender.Filter(pod, filtered) if err != nil { if extender.IsIgnorable() { klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set", extender, err) continue } return nil, err } // 經過結果 for failedNodeName, failedMsg := range failedMap { if _, found := statuses[failedNodeName]; !found { statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg) } else { statuses[failedNodeName].AppendReason(failedMsg) } } // 傳遞給下一個extender以前的FIlter結果 filtered = filteredList } return filtered, nil }
func (h *HTTPExtender) Filter( pod *v1.Pod, nodes []*v1.Node, ) ([]*v1.Node, extenderv1.FailedNodesMap, error) { var ( result extenderv1.ExtenderFilterResult nodeList *v1.NodeList nodeNames *[]string nodeResult []*v1.Node args *extenderv1.ExtenderArgs ) fromNodeName := make(map[string]*v1.Node) for _, n := range nodes { fromNodeName[n.Name] = n } if h.filterVerb == "" { return nodes, extenderv1.FailedNodesMap{}, nil } // 根據nodeCacheCapable來進行參數的傳遞 if h.nodeCacheCapable { nodeNameSlice := make([]string, 0, len(nodes)) for _, node := range nodes { nodeNameSlice = append(nodeNameSlice, node.Name) } nodeNames = &nodeNameSlice } else { nodeList = &v1.NodeList{} for _, node := range nodes { nodeList.Items = append(nodeList.Items, *node) } } args = &extenderv1.ExtenderArgs{ Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } // 調用對應service的filter接口 if err := h.send(h.filterVerb, args, &result); err != nil { return nil, nil, err } if result.Error != "" { return nil, nil, fmt.Errorf(result.Error) } // 根據nodeCacheCapable和結果來進行結果數據的組合 if h.nodeCacheCapable && result.NodeNames != nil { nodeResult = make([]*v1.Node, len(*result.NodeNames)) for i, nodeName := range *result.NodeNames { if n, ok := fromNodeName[nodeName]; ok { nodeResult[i] = n } else { return nil, nil, fmt.Errorf( "extender %q claims a filtered node %q which is not found in the input node list", h.extenderURL, nodeName) } } } else if result.Nodes != nil { nodeResult = make([]*v1.Node, len(result.Nodes.Items)) for i := range result.Nodes.Items { nodeResult[i] = &result.Nodes.Items[i] } } return nodeResult, result.FailedNodes, nil }
優先級階段調用extender插件是並行的:並行地調用各extender獲取主機得分,然後在互斥鎖保護下串行地彙總結果,計算方式爲:主機得分 += 得分 * 當前extender的權重(weight)。
// mu guards combinedScores, which the goroutines below update.
var mu sync.Mutex
var wg sync.WaitGroup
combinedScores := make(map[string]int64, len(nodes))
for i := range g.extenders {
	if !g.extenders[i].IsInterested(pod) {
		continue
	}
	wg.Add(1)
	// Call each interested extender in its own goroutine.
	go func(extIndex int) {
		metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc()
		defer func() {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec()
			wg.Done()
		}()
		prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
		if err != nil {
			// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
			return
		}
		mu.Lock()
		// Merge this extender's scores while holding the lock.
		for i := range *prioritizedList {
			host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
			if klog.V(10) {
				klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score)
			}
			// Add the host's score multiplied by this extender's weight.
			combinedScores[host] += score * weight
		}
		mu.Unlock()
	}(i)
}
// wait for all go routines to finish
wg.Wait()
彙總後的得分還需要換算到scheduler使用的分值範圍,在當前版本中的計算爲:主機最終得分 += extender彙總得分 * (MaxNodeScore / MaxExtenderPriority),即乘以(100/10)。
// Add each node's combined extender score, rescaled, to its existing score.
for i := range result {
	// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, therefore we need to scale the score returned by extenders to the score range used by the scheduler.
	result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
}
優先級調用接口跟Filter在流程上是一樣的,只需要拼接傳遞數據,然後返回結果即可;不同的是返回結果中會附帶當前extender的權重(weight),用於後續計算。
func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) { var ( result extenderv1.HostPriorityList nodeList *v1.NodeList nodeNames *[]string args *extenderv1.ExtenderArgs ) if h.prioritizeVerb == "" { result := extenderv1.HostPriorityList{} for _, node := range nodes { result = append(result, extenderv1.HostPriority{Host: node.Name, Score: 0}) } return &result, 0, nil } // 根據node cache來進行傳遞參數的構建 if h.nodeCacheCapable { nodeNameSlice := make([]string, 0, len(nodes)) for _, node := range nodes { nodeNameSlice = append(nodeNameSlice, node.Name) } nodeNames = &nodeNameSlice } else { nodeList = &v1.NodeList{} for _, node := range nodes { nodeList.Items = append(nodeList.Items, *node) } } args = &extenderv1.ExtenderArgs{ Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } if err := h.send(h.prioritizeVerb, args, &result); err != nil { return nil, 0, err } // 返回結果 return &result, h.weight, nil }
綁定階段其實只需要把當前的綁定結果傳遞給對應的插件即可。
func (h *HTTPExtender) Bind(binding *v1.Binding) error { var result extenderv1.ExtenderBindingResult if !h.IsBinder() { // This shouldn't happen as this extender wouldn't have become a Binder. return fmt.Errorf("Unexpected empty bindVerb in extender") } req := &extenderv1.ExtenderBindingArgs{ PodName: binding.Name, PodNamespace: binding.Namespace, PodUID: binding.UID, Node: binding.Target.Name, } if err := h.send(h.bindVerb, &req, &result); err != nil { return err } if result.Error != "" { return fmt.Errorf(result.Error) } return nil }
新年回來第一次更新,文章內容相對簡單一點,今天就到這裏了,謝謝大佬們觀看,希望對大佬們有用。擴展機制的後續總結會在分析完framework之後,希望大佬們能幫忙轉發下,謝謝大家 > 微信號:baxiaoshi2020 > 關注公衆號閱讀更多源碼分析文章
> 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈