圖解kubernetes調度器預選設計實現學習

Scheduler中在進行node選舉的時候會首先進行一輪預選流程,即從當前集羣中選擇一批node節點,本文主要分析k8s在預選流程上一些優秀的篩選設計思想,歡迎大佬們指正node

1. 基礎設計

1.1 預選場景

預選顧名思義就是從當前集羣中的全部的node中,選擇出知足當前pod資源和親和性等需求的node節點,如何在集羣中快速選擇這樣的節點,是個複雜的問題算法

1.2 平均分佈

平均分佈主要是經過讓一個分配索引來進行即只有當全部的node都在本輪分配週期內分配一次後,纔開始從頭進行分配,從而保證集羣的平均分佈api

1.3 預選中斷

預選終端即在預選的過程當中若是發現node已經不能知足當前pod資源需求的時候,就進行中斷預選流程,嘗試下一個節點微信

1.4 並行篩選

在當前k8s版本中,默認會啓動16個goroutine來進行並行的預選,從而提升性能,從而提升預選的性能app

1.5 局部最優解

預選流程須要從當前集羣中選擇一臺符合要求的node隨着集羣規模的增加,若是每次遍歷全部集羣node則會必然致使性能的降低,因而經過局部最優解的方式,縮小篩選節點的數量ide

2. 源碼分析

預選的核心流程是經過findNodesThatFit來完成,其返回預選結果供優選流程使用函數

2.1 取樣邏輯

取樣是經過當前集羣中的node數量和默認的最小值來決定本次預選階段須要獲取的node節點數量源碼分析

// 獲取全部的節點數量,並經過計算百分比,獲取本次選舉選擇的節點數量
		allNodes := int32(g.cache.NodeTree().NumNodes())
		// 肯定要查找node數量
		numNodesToFind := g.numFeasibleNodesToFind(allNodes)

2.2 取樣算法

取樣算法很簡單從集羣中獲取指定百分比的節點默認是50%,若是50%的節點數量小於minFeasibleNodesToFind則按照minFeasibleNodesToFind(最小取樣節點數量)來取樣,性能

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	// 若是當前節點數量小於minFeasibleNodesToFind即小於100臺node
    // 同理百分好比果大於100就是全量取樣
    // 這兩種狀況都直接遍歷整個集羣中全部節點
    if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}

	adaptivePercentage := g.percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		adaptivePercentage = schedulerapi.DefaultPercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}

    // 正常取樣計算:好比numAllNodes爲5000,而adaptivePercentage爲50%
    // 則numNodes=50000*0.5/100=250
	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind { // 若是小於最少取樣則按照最少取樣進行取樣
		return minFeasibleNodesToFind
	}

	return numNodes
}

2.3 取樣元數據準備

經過filtered來進行預選結果的存儲,經過filteredLen來進行原子保護協做多個取樣goroutine, 並經過predicateMetaProducer和當前的snapshot來進行元數據構建ui

filtered = make([]*v1.Node, numNodesToFind)
		errs := errors.MessageCountMap{}
		var (
			predicateResultLock sync.Mutex
			filteredLen         int32
		)

		ctx, cancel := context.WithCancel(context.Background())

		// We can use the same metadata producer for all nodes.
		meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)

2.4 經過channel協做並行取樣

並行取樣主要經過調用下面的函數來啓動16個goroutine來進行並行取樣,並經過ctx來協調退出

workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

image.png 經過channel來構建取樣索引的管道,每一個worker會負責從channel獲取的指定索引取樣node的填充

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}

    // 生成指定數量索引,worker經過索引來進行預選成功節點的存儲
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	if pieces < workers {
		workers = pieces
	}

	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
        // 啓動多個goroutine
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
				select {
				case <-stop:
					return
				default:
                    //獲取索引,後續會經過該索引來進行結果的存儲
					doWorkPiece(piece)
				}
			}
		}()
	}
    // 等待退出
	wg.Wait()
}

2.5 取樣並行函數

checkNode := func(i int) {
			// 獲取一個節點
			nodeName := g.cache.NodeTree().Next()

            // 取樣核心流程是經過podFitsOnNode來肯定
			fits, failedPredicates, status, err := g.podFitsOnNode(
				pluginContext,
				pod,
				meta,
				g.nodeInfoSnapshot.NodeInfoMap[nodeName],
				g.predicates, // 傳遞預選算法
				g.schedulingQueue,
				g.alwaysCheckAllPredicates,
			)
			if err != nil {
				predicateResultLock.Lock()
				errs[err.Error()]++
				predicateResultLock.Unlock()
				return
			}
			if fits {
				// 若是當前以及查找到的數量大於預選的數量,就退出
				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
				}
			} else {
                // 進行錯誤狀態的保存 
				predicateResultLock.Lock()
				if !status.IsSuccess() {
					filteredNodesStatuses[nodeName] = status
				}
				if len(failedPredicates) != 0 {
					failedPredicateMap[nodeName] = failedPredicates
				}
				predicateResultLock.Unlock()
			}
		}

2.6 面向將來的篩選

image.png 在kubernetes中通過調度器調度後的pod結果會放入到SchedulingQueue中進行暫存,這些pod將來可能會通過後續調度流程運行在提議的node上,也可能由於某些緣由致使最終沒有運行,而預選流程爲了減小後續由於調度衝突(好比pod之間的親和性等問題,而且當前pod不能搶佔這些pod),則會在進行預選的時候,將這部分pod考慮進去

若是在這些pod存在的狀況下,node能夠知足當前pod的篩選條件,則能夠去除被提議的pod再進行篩選(若是這些提議的pod最終沒有調度到node,則當前node也須要知足各類親和性的需求)

2.6 取樣核心設計

image.png 結合上面說的面向將來的篩選,經過兩輪篩選在不管那些優先級高的pod是否被調度到當前node上,均可以知足pod的調度需求,在調度的流程中只須要獲取以前註冊的調度算法,完成預選檢測,若是發現有條件不經過則不會進行第二輪篩選,繼續選擇下一個節點

func (g *genericScheduler) podFitsOnNode(
	pluginContext *framework.PluginContext,
	pod *v1.Pod,
	meta predicates.PredicateMetadata,
	info *schedulernodeinfo.NodeInfo,
	predicateFuncs map[string]predicates.FitPredicate,
	queue internalqueue.SchedulingQueue,
	alwaysCheckAllPredicates bool,
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
	var failedPredicates []predicates.PredicateFailureReason
	var status *framework.Status

    // podsAdded主要用於標識當前是否有提議的pod若是沒有提議的pod則就不須要再進行一輪篩選了
	podsAdded := false
	
	for i := 0; i < 2; i++ {
		metaToUse := meta
		nodeInfoToUse := info
		if i == 0 {
			// 首先獲取那些提議的pod進行第一輪篩選, 若是第一輪篩選出錯,則不會進行第二輪篩選
			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
		} else if !podsAdded || len(failedPredicates) != 0 {
            // 若是
			break
		}
		for _, predicateKey := range predicates.Ordering() {
			var (
				fit     bool
				reasons []predicates.PredicateFailureReason
				err     error
			)
			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				// 預選算法計算
				fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
				if err != nil {
					return false, []predicates.PredicateFailureReason{}, nil, err
				}

				if !fit {
					// eCache is available and valid, and predicates result is unfit, record the fail reasons
					failedPredicates = append(failedPredicates, reasons...)
					// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
					if !alwaysCheckAllPredicates {
						klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
							"evaluation is short circuited and there are chances " +
							"of other predicates failing as well.")
						break
					}
				}
			}
		}

		status = g.framework.RunFilterPlugins(pluginContext, pod, info.Node().Name)
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, failedPredicates, status, status.AsError()
		}
	}

	return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}

> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 21天大棚 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈

相關文章
相關標籤/搜索