圖解kubernetes scheduler基於map/reduce模式實現優選階段

優選階段經過分map/reduce模式來實現多個node和多種算法的並行計算,而且經過基於二級索引來設計最終的存儲結果,從而達到整個計算過程當中的無鎖設計,同時爲了保證分配的隨機性,針對同等優先級的採用了隨機的方式來進行最終節點的分配,若是你們後續有相似的需求,不妨能夠借鑑借鑑node

1. 設計基礎

1.1 兩階段: 單點與聚合

在進行優選的時候,除了最後一次計算,在進行鍼對單個算法的計算的時候,會分爲兩個階段:單點和聚合算法

在單點階段,會根據當前算法針對單個node計算 在聚合階段,則會根據當前單點階段計算完成後,來進行聚合api

1.2 並行: 節點與算法

單點和聚合兩階段在計算的時候,都是並行的,可是對象則不一樣,其中單點階段並行是針對單個node的計算,而聚合階段則是針對算法級別的計算,經過這種設計分離計算,從而避免多goroutine之間數據競爭,無鎖加速優選的計算數組

1.3 map與reduce

而map與reduce則是針對一個上面並行的兩種具體實現,其中map中負責單node打分,而reduce則是針對map階段的打分進行聚合後,根據彙總的結果進行二次打分計算微信

1.4 weight

map/reduce階段都是經過算法計算,若是咱們要進行自定義的調整,針對單個算法,咱們能夠調整其在預選流程中的權重,從而進行定製本身的預選流程 數據結構

1.5 隨機分佈

當進行優先級判斷的時候,確定會出現多個node優先級相同的狀況,在優選節點的時候,會進行隨機計算,從而決定是否用當前優先級相同的node替換以前的最合適的nodeapp

2. 源碼分析 

優選的核心流程主要是在PrioritizeNodes中,這裏只介紹其關鍵的核心數據結構設計ide

2.1 無鎖計算結果保存

無鎖計算結果的保存主要是經過下面的二維數組實現, 若是要存儲一個算法針對某個node的結果,其實只須要經過兩個索引便可:算法索引和節點索引,同理若是我吧針對單個node的索引分配給一個goroutine,則其去其餘的goroutine則就能夠並行計算 image.png函數

// 在計算的時候,會傳入nodes []*v1.Node的數組,存儲全部的節點,節點索引主要是指的該部分
results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

2.2 基於節點索引的Map計算

image.png 以前在預選階段介紹過ParallelizeUntil函數的實現,其根據傳入的數量來生成計算索引,放入chan中,後續多個goroutine從chan中取出數據直接進行計算便可源碼分析

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		// 根據節點和配置的算法進行計算
		nodeInfo := nodeNameToInfo[nodes[index].Name]
            // 獲取算法的索引
		for i := range priorityConfigs {
			if priorityConfigs[i].Function != nil {
				continue
			}

			var err error
                
                // 經過節點索引,來進行鍼對單個node的計算結果的保存
			results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Host = nodes[index].Name
			}
		}
	})

2.3 基於算法索引的Reduce計算

image.png 基於算法的並行,則是爲每一個算法的計算都啓動一個goroutine,每一個goroutine經過算法索引來進行該算法的全部map階段的結果的讀取,並進行計算,後續結果仍然存儲在對應的位置

// 計算策略的分值
	for i := range priorityConfigs {
		if priorityConfigs[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
				appendError(err)
			}
			if klog.V(10) {
				for _, hostPriority := range results[index] {
					klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
				}
			}
		}(i)
	}
	// Wait for all computations to be finished.
	wg.Wait()

2.4 優先級打分結果統計

根據以前的map/reduce階段,接下來就是將針對全部node的全部算法計算結果進行累加便可

// Summarize all scores.
	result := make(schedulerapi.HostPriorityList, 0, len(nodes))

	for i := range nodes {
		result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
		// 便利全部的算法配置
		for j := range priorityConfigs {
			result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
		}

		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

2.5 根據優先級隨機篩選host

這裏的隨機篩選是指的當多個host優先級相同的時候,會有必定的機率用當前的node替換以前的優先級相等的node(到目前爲止的優先級最高的node), 其主要經過cntOfMaxScore和rand.Intn(cntOfMaxScore)來進行實現

func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
	if len(priorityList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := priorityList[0].Score
	selected := priorityList[0].Host
	cntOfMaxScore := 1
	for _, hp := range priorityList[1:] {
		if hp.Score > maxScore {
			maxScore = hp.Score
			selected = hp.Host
			cntOfMaxScore = 1
		} else if hp.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = hp.Host
			}
		}
	}
	return selected, nil
}

3. 設計總結

優選階段經過分map/reduce模式來實現多個node和多種算法的並行計算,而且經過基於二級索引來設計最終的存儲結果,從而達到整個計算過程當中的無鎖設計,同時爲了保證分配的隨機性,針對同等優先級的採用了隨機的方式來進行最終節點的分配,若是你們後續有相似的需求,不妨能夠借鑑借鑑

本系列純屬我的臆測僅供參考,若是有看出錯誤的大佬歡迎指正

> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 21天大棚 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈

相關文章
相關標籤/搜索