圖解kubernetes調度器SchedulerExtender擴展

在kubernetes的scheduler調度器的設計中,爲用戶預留了兩種擴展機制:SchedulerExtender與Framework。本文主要淺談一下SchedulerExtender的實現;由於後續還有一篇關於Framework的文章,因此本文的k8s代碼切到1.18版本。

1. 設計思路

image.png

1.1 實現機制

SchedulerExtender是kubernetes的外部擴展方式,用戶可以根據需求獨立構建調度服務,實現對應的遠程調用接口(目前是http)。scheduler在調度的對應階段會根據用戶定義的資源和接口來進行遠程調用,對應的service根據自己的資源數據和scheduler傳遞過來的中間調度結果來進行決策。

1.2 服務插拔

extender只需要實現對應插件的接口,並編寫yaml文件來註冊對應的服務接口,就可以實現scheduler的擴展,不需要修改任何調度器的代碼,即可實現調度插件的插拔。

1.3 資源存儲

由於是獨立的服務,extender可以實現自定義資源的存儲與獲取,甚至可以不依賴etcd而使用第三方存儲來保存資源,主要用於kubernetes中不支持的那些資源的調度擴展。

2. SchedulerExtender

2.1 接口與實現

2.1.1 接口聲明

SchedulerExtender接口主要用於擴展scheduler的調度流程,其聲明如下:

// SchedulerExtender is the interface an external scheduling extension must
// implement so that the scheduler can call it during the scheduling stages.
type SchedulerExtender interface {
	// Name returns a unique name that identifies the extender.
	Name() string

	// Filter is called in the predicate (pre-selection) stage to filter nodes.
	// It returns the nodes that passed, a map describing the nodes that
	// failed, and an error.
	Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)

	// Prioritize is called in the priority stage and contributes to node
	// scoring. It returns the per-host priorities together with the weight
	// to apply to them.
	Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)

	// Bind lets the extender perform the bind operation described by binding.
	Bind(binding *v1.Binding) error

	// IsBinder reports whether the extender supports bind.
	IsBinder() bool

	// IsInterested reports whether the extender is interested in the
	// resources of the given pod.
	IsInterested(pod *v1.Pod) bool
	// ProcessPreemption is called in the preemption stage. It receives the
	// candidate nodes with their victims and returns a map of the same shape.
	ProcessPreemption(
		pod *v1.Pod,
		nodeToVictims map[*v1.Node]*extenderv1.Victims,
		nodeInfos listers.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error)

	// SupportsPreemption reports whether the extender supports preemption.
	SupportsPreemption() bool

	// IsIgnorable returns true indicates scheduling should not fail when this extender
	// is unavailable. This gives scheduler ability to fail fast and tolerate non-critical extenders as well.
	IsIgnorable() bool
}

2.1.2 默認實現

// HTTPExtender implements the algorithm.SchedulerExtender interface.
// It reaches the extender service with HTTP POST requests carrying JSON.
type HTTPExtender struct {
	// extenderURL is the base URL; the verb of the action is appended to it.
	extenderURL      string
	preemptVerb      string
	// filterVerb is the action used by Filter; when empty, Filter is a no-op.
	filterVerb       string
	// prioritizeVerb is the action used by Prioritize; when empty, every
	// node gets a zero score.
	prioritizeVerb   string
	// bindVerb is the action used by Bind.
	bindVerb         string
	weight           int64		  // weight returned by Prioritize alongside the scores
	client           *http.Client // HTTP client used for the remote calls
	nodeCacheCapable bool		  // when true, only node names are sent instead of full node objects
	managedResources sets.String  // resources managed by this extender
	ignorable        bool
}

extender的默認實現是通過HTTPExtender完成的,即基於http協議、通過json來進行數據傳遞,其核心數據結構如上。

2.2 關鍵實現機制

2.2.1 遠程通訊接口

image.png 其實通訊很簡單:通過http協議、以json序列化的方式進行遠程post提交,並對返回的結果進行反序列化。

// send POSTs args, encoded as JSON, to the extender endpoint for action and
// decodes the JSON response body into result.
//
// The endpoint is extenderURL with trailing slashes removed, followed by "/"
// and action. Any response status other than 200 OK is returned as an error.
func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
	payload, err := json.Marshal(args)
	if err != nil {
		return err
	}

	endpoint := strings.TrimRight(h.extenderURL, "/") + "/" + action
	request, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := h.client.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	// Only a 200 response carries a body worth decoding.
	if response.StatusCode == http.StatusOK {
		return json.NewDecoder(response.Body).Decode(result)
	}
	return fmt.Errorf("Failed %v with extender at URL %v, code %v", action, endpoint, response.StatusCode)
}

2.2.2 node cache

image.png nodeCacheCapable是聲明extender時的一個參數,表示對應的extender是否會緩存node的數據。如果緩存了數據,則只需要傳遞node的名字,而不會傳遞全部元數據,可以減小通訊的數據包大小。

// Excerpt: choose the node payload according to nodeCacheCapable.
if h.nodeCacheCapable {
		nodeNameSlice := make([]string, 0, len(nodes))
		for _, node := range nodes {
            // only the node name is sent
			nodeNameSlice = append(nodeNameSlice, node.Name)
		}
		nodeNames = &nodeNameSlice
	} else {
		nodeList = &v1.NodeList{}
		for _, node := range nodes {
            // the complete node object is sent
			nodeList.Items = append(nodeList.Items, *node)
		}
	}
	// build the arguments for the remote call; exactly one of Nodes and
	// NodeNames was populated above
	args = &extenderv1.ExtenderArgs{
		Pod:       pod,
		Nodes:     nodeList,
		NodeNames: nodeNames,
	}

2.2.3 managedResources

image.png 在調用extender的時候,會先檢測extender是否對對應pod的container的資源感興趣,如果感興趣則進行調用,否則就會跳過。若extender沒有聲明任何managedResources,則視爲對所有pod都感興趣。

// IsInterested reports whether this extender should be consulted for pod.
// An extender with no managed resources is interested in every pod.
// Otherwise it is interested only when one of the pod's containers or init
// containers refers to a managed resource.
func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
	if h.managedResources.Len() == 0 {
		return true
	}
	spec := &pod.Spec
	// Regular containers are checked first, then init containers.
	return h.hasManagedResources(spec.Containers) || h.hasManagedResources(spec.InitContainers)
}

// hasManagedResources reports whether any of the containers names a resource
// managed by this extender in its resource requests or limits.
func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
	for idx := range containers {
		resources := &containers[idx].Resources
		// Requests are inspected before limits; one match is enough.
		for _, list := range []v1.ResourceList{resources.Requests, resources.Limits} {
			for name := range list {
				if h.managedResources.Has(string(name)) {
					return true
				}
			}
		}
	}
	return false
}

2.3 過濾接口Filter

Filter主要用於在預選階段完成後調用extender進行二次過濾。

2.3.1 循環串行調用

在findNodesThatPassExtenders中會遍歷所有的extender,確定其是否關心對應的資源;如果關心,就會調用Filter接口進行遠程調用,並將篩選結果傳遞給下一個extender,逐步縮小篩選集合。注意這個階段的插件調用是串行的,因爲每個插件都以上一個插件的結果爲基礎繼續篩選。

// findNodesThatPassExtenders runs the candidate nodes through every
// interested extender in order, each one filtering the output of the
// previous one, and returns the nodes that survive.
//
// Failure messages reported by an extender are recorded in statuses. An
// error from an extender aborts the pass unless that extender is ignorable,
// in which case it is logged and skipped.
func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, filtered []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
	for _, ext := range g.extenders {
		// Nothing left to filter.
		if len(filtered) == 0 {
			break
		}
		// Skip extenders that do not care about this pod's resources.
		if !ext.IsInterested(pod) {
			continue
		}

		remaining, failures, err := ext.Filter(pod, filtered)
		if err != nil {
			if !ext.IsIgnorable() {
				return nil, err
			}
			klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
				ext, err)
			continue
		}

		// Record why each rejected node failed, extending an existing status
		// when the node already has one.
		for nodeName, reason := range failures {
			if status, seen := statuses[nodeName]; seen {
				status.AppendReason(reason)
			} else {
				statuses[nodeName] = framework.NewStatus(framework.Unschedulable, reason)
			}
		}

		// The next extender only sees what this one let through.
		filtered = remaining
	}
	return filtered, nil
}

2.3.2 遠程過濾接口

// Filter asks the remote extender which of nodes are acceptable for pod.
//
// It returns the nodes accepted by the extender, the map of rejected nodes
// reported by the extender, and an error if the call failed or the extender
// reported one. When no filter verb is configured the input nodes are
// returned unchanged with an empty failure map.
//
// With nodeCacheCapable set, only node names are exchanged and the returned
// names are mapped back to the input nodes; a name that was not part of the
// input is an error. Otherwise full node objects are exchanged.
func (h *HTTPExtender) Filter(
	pod *v1.Pod,
	nodes []*v1.Node,
) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
	if h.filterVerb == "" {
		return nodes, extenderv1.FailedNodesMap{}, nil
	}

	var (
		result     extenderv1.ExtenderFilterResult
		nodeList   *v1.NodeList
		nodeNames  *[]string
		nodeResult []*v1.Node
	)

	// Build the request payload according to nodeCacheCapable.
	if h.nodeCacheCapable {
		nodeNameSlice := make([]string, 0, len(nodes))
		for _, node := range nodes {
			nodeNameSlice = append(nodeNameSlice, node.Name)
		}
		nodeNames = &nodeNameSlice
	} else {
		nodeList = &v1.NodeList{}
		for _, node := range nodes {
			nodeList.Items = append(nodeList.Items, *node)
		}
	}

	args := &extenderv1.ExtenderArgs{
		Pod:       pod,
		Nodes:     nodeList,
		NodeNames: nodeNames,
	}
	// Call the filter endpoint of the extender service.
	if err := h.send(h.filterVerb, args, &result); err != nil {
		return nil, nil, err
	}
	if result.Error != "" {
		// The message comes from the remote side; never use it as a format
		// string, otherwise any '%' in it would be misinterpreted.
		return nil, nil, fmt.Errorf("%s", result.Error)
	}

	// Assemble the result according to nodeCacheCapable and the response.
	if h.nodeCacheCapable && result.NodeNames != nil {
		// The name lookup is only needed on this path, so build it here.
		fromNodeName := make(map[string]*v1.Node, len(nodes))
		for _, n := range nodes {
			fromNodeName[n.Name] = n
		}
		nodeResult = make([]*v1.Node, len(*result.NodeNames))
		for i, nodeName := range *result.NodeNames {
			n, ok := fromNodeName[nodeName]
			if !ok {
				return nil, nil, fmt.Errorf(
					"extender %q claims a filtered node %q which is not found in the input node list",
					h.extenderURL, nodeName)
			}
			nodeResult[i] = n
		}
	} else if result.Nodes != nil {
		nodeResult = make([]*v1.Node, len(result.Nodes.Items))
		for i := range result.Nodes.Items {
			nodeResult[i] = &result.Nodes.Items[i]
		}
	}

	return nodeResult, result.FailedNodes, nil
}

2.4 優先級接口Prioritize

2.4.1 並行優先級統計

優先級階段調用extender插件是並行的:先並行地調用各個extender獲取主機得分,然後再串行地彙總結果,計算方式爲:主機得分 += 得分 × 當前extender的權重(weight)。

// Excerpt: query every interested extender concurrently and accumulate the
// weighted scores per host. mu guards combinedScores; wg tracks the goroutines.
var mu sync.Mutex
		var wg sync.WaitGroup
		combinedScores := make(map[string]int64, len(nodes))
		for i := range g.extenders {
			if !g.extenders[i].IsInterested(pod) {
				continue
			}
			wg.Add(1)
            // call the extenders in parallel, one goroutine each
			go func(extIndex int) {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc()
				defer func() {
					metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec()
					wg.Done()
				}()
				prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
				if err != nil {
					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
					return
				}
				mu.Lock()
                // merge this extender's results while holding the mutex
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					if klog.V(10) {
						klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score)
					}
                    // host total += score * weight returned by this extender
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(i)
		}
		// wait for all go routines to finish
		wg.Wait()

2.4.2 合併優先級結果

結果彙總的得分,在當前版本中的計算方式爲:主機最終得分 = 主機原得分 + extender彙總得分 × (MaxNodeScore / MaxExtenderPriority),即 ×(100/10)。

// Excerpt: add the scaled, combined extender score to each node's score.
for i := range result {
			// MaxExtenderPriority may diverge from the max priority used in the
			// scheduler and defined by MaxNodeScore, therefore we need to scale
			// the score returned by extenders to the score range used by the
			// scheduler.
			result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
		}

2.4.3 優先級接口調用

優先級接口的調用跟Filter在流程上是一樣的,只需要拼接傳遞的數據,然後返回結果即可;不同的是,返回結果中還會帶上當前extender的權重(weight),以用於後續計算。

// Prioritize asks the remote extender to score nodes for pod.
//
// It returns the host priorities reported by the extender together with the
// extender's configured weight. When no prioritize verb is configured, every
// node is given a score of 0 and the returned weight is 0.
func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) {
	if h.prioritizeVerb == "" {
		zeroed := make(extenderv1.HostPriorityList, len(nodes))
		for idx, node := range nodes {
			zeroed[idx] = extenderv1.HostPriority{Host: node.Name, Score: 0}
		}
		return &zeroed, 0, nil
	}

	// Send only names when the extender caches nodes, full objects otherwise.
	args := &extenderv1.ExtenderArgs{Pod: pod}
	if h.nodeCacheCapable {
		names := make([]string, 0, len(nodes))
		for _, node := range nodes {
			names = append(names, node.Name)
		}
		args.NodeNames = &names
	} else {
		list := &v1.NodeList{}
		for _, node := range nodes {
			list.Items = append(list.Items, *node)
		}
		args.Nodes = list
	}

	var scores extenderv1.HostPriorityList
	if err := h.send(h.prioritizeVerb, args, &scores); err != nil {
		return nil, 0, err
	}
	return &scores, h.weight, nil
}

2.5 綁定階段

綁定階段其實只需要把當前的綁定結果傳遞給對應的插件即可。

// Bind delegates the binding described by binding to the remote extender.
//
// It sends the pod name, namespace, UID and target node to the bind endpoint
// and returns an error if the extender is not a binder, the call fails, or
// the extender reports an error in its response.
func (h *HTTPExtender) Bind(binding *v1.Binding) error {
	if !h.IsBinder() {
		// This shouldn't happen as this extender wouldn't have become a Binder.
		return fmt.Errorf("Unexpected empty bindVerb in extender")
	}

	args := &extenderv1.ExtenderBindingArgs{
		PodName:      binding.Name,
		PodNamespace: binding.Namespace,
		PodUID:       binding.UID,
		Node:         binding.Target.Name,
	}
	var result extenderv1.ExtenderBindingResult
	// args is already a pointer; pass it directly rather than a pointer to it.
	if err := h.send(h.bindVerb, args, &result); err != nil {
		return err
	}
	if result.Error != "" {
		// The message comes from the remote side; never use it as a format
		// string, otherwise any '%' in it would be misinterpreted.
		return fmt.Errorf("%s", result.Error)
	}
	return nil
}

新年回來第一次更新,文章內容相對簡單一點,今天就到這裏了,謝謝大佬們觀看,但願對大佬們有用,擴展機制的後續總結會在分析完framework以後,但願大佬們能幫轉發下,謝謝你們 > 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 21天大棚 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈

相關文章
相關標籤/搜索