chedulingQueue是kubernetes scheduler中負責進行等待調度pod存儲的對,Scheduler經過SchedulingQueue來獲取當前系統中等待調度的Pod,本文主要討論SchedulingQueue的設計與實現的各類實現, 瞭解探究其內部實現與底層源碼,本系列代碼基於kubernets1.1.6分析而來node
類型 | 描述 | 一般實現 |
---|---|---|
隊列 | 普通隊列是一個FIFO的數據結構,根據元素入隊的次序依次出隊 | 數組或者鏈表 |
優先級隊列 | 優先級隊列一般是指根據某些優先級策略,高優先級會優先被獲取 | 數組或者樹 |
其實在大多數的調度場景中,大多都是採用優先級隊列來實現,優先知足優先級比較高的任務或者需求,從而減小後續高優先級對低優先級的搶佔,scheduler中也是如此 算法
k8s中調度的單元是Pod,scheduler中根據pod的優先級的高低來進行優先級隊列的構建, 這個實際上是在kubernets的adminission准入插件中,會爲用戶建立的pod根據用戶的設置,進行優先級字段的計算編程
活動隊列存儲當前系統中全部正在等待調度的隊列數組
當pod的資源在當前集羣中不能被知足時,則會被加入到一個不可調度隊列中,而後等待稍後再進行嘗試緩存
backoff機制是併發編程中常見的一種機制,即若是任務反覆執行依舊失敗,則會按次增加等待調度時間,下降重試效率,從而避免反覆失敗浪費調度資源安全
針對調度失敗的pod會優先存儲在backoff隊列中,等待後續重試微信
當隊列中不存在等待調度的pod的時候,會阻塞scheduler等待有須要調度的pod的時候再喚醒調度器,獲取pod進行調度數據結構
nominatedPods存儲pod被提議運行的node,主要用於搶佔調度流程中使用,本節先不分析併發
kubernetes中默認的schedulingQueue實現是PriorityQueue,本章就以該數據結構來分析app
type PriorityQueue struct { stop <-chan struct{} clock util.Clock // 存儲backoff的pod計時器 podBackoff *PodBackoffMap lock sync.RWMutex // 用於協調通知由於獲取不到調度pod而阻塞的cond cond sync.Cond // 活動隊列 activeQ *util.Heap // backoff隊列 podBackoffQ *util.Heap // 不可調度隊列 unschedulableQ *UnschedulablePodsMap // 存儲pod和被提名的node, 實際上就是存儲pod和建議的node節點 nominatedPods *nominatedPodMap // schedulingCycle是一個調度週期的遞增序號,當pod pop的時候會遞增 schedulingCycle int64 // moveRequestCycle緩存schedulingCycle, 當未調度的pod從新被添加到activeQueue中 // 會保存schedulingCycle到moveRequestCycle中 moveRequestCycle int64 closed bool }
PriorityQueue做爲實現SchedulingQueue的實現,其核心數據結構主要包含三個隊列:activeQ、podBackoffQ、unscheduleQ內部經過cond來實現Pop操做的阻塞與通知,接下來先分析核心的調度流程,最後再分析util.Heap裏面的具體實現
存儲全部等待調度的Pod的隊列,默認是基於堆來實現,其中元素的優先級則經過對比pod的建立時間和pod的優先級來進行排序
// activeQ is heap structure that scheduler actively looks at to find pods to // schedule. Head of heap is the highest priority pod. activeQ *util.Heap
優先級比較函數
// activeQComp is the function used by the activeQ heap algorithm to sort pods. // It sorts pods based on their priority. When priorities are equal, it uses // PodInfo.timestamp. func activeQComp(podInfo1, podInfo2 interface{}) bool { pInfo1 := podInfo1.(*framework.PodInfo) pInfo2 := podInfo2.(*framework.PodInfo) prio1 := util.GetPodPriority(pInfo1.Pod) prio2 := util.GetPodPriority(pInfo2.Pod) // 首先根據優先級的高低進行比較,而後根據pod的建立時間,越高優先級的Pod越被優先調度 // 越早建立的pod越優先 return (prio1 > prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp)) }
podBackOffQ主要存儲那些在多個schedulingCycle中依舊調度失敗的狀況下,則會經過以前說的backOff機制,延遲等待調度的時間
// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff // are popped from this heap before the scheduler looks at activeQ podBackoffQ *util.Heap
上面提到podBackOffQ隊列中並無存儲pod的backOff的具體信息,好比backoff的計數器,最後一次更新的時間等,podBackOff則相似一個記分板,記錄這些信息,供podBackOffQ使用
// podBackoff tracks backoff for pods attempting to be rescheduled podBackoff *PodBackoffMap // PodBackoffMap is a structure that stores backoff related information for pods type PodBackoffMap struct { // lock for performing actions on this PodBackoffMap lock sync.RWMutex // initial backoff duration initialDuration time.Duration // 當前值是1秒 // maximal backoff duration maxDuration time.Duration // 當前值是1分鐘 // map for pod -> number of attempts for this pod podAttempts map[ktypes.NamespacedName]int // map for pod -> lastUpdateTime pod of this pod podLastUpdateTime map[ktypes.NamespacedName]time.Time }
存儲已經嘗試調度可是當前集羣資源不知足的pod的隊列
當由於集羣資源發生變化會嘗試進行unschedulableQ中的pod轉移到activeQ,moveRequestCycle就是存儲資源變動時的schedulingCycle
func (p *PriorityQueue) MoveAllToActiveQueue() { // 省略其餘代碼 p.moveRequestCycle = p.schedulingCycle }
schedulingCycle是一個遞增的序列每次從activeQ中pop出一個pod都會遞增
func (p *PriorityQueue) Pop() (*v1.Pod, error) { //省略其餘 p.schedulingCycle++ }
SchedulingQueue提供了一個Pop接口用於從獲取當前集羣中等待調度的pod,其內部實現主要經過上面cond與activeQ來實現
當前隊列中沒有可調度的pod的時候,則經過cond.Wait來進行阻塞,而後在忘activeQ中添加pod的時候經過cond.Broadcast來實現通知
func (p *PriorityQueue) Pop() (*v1.Pod, error) { p.lock.Lock() defer p.lock.Unlock() for p.activeQ.Len() == 0 { if p.closed { return nil, fmt.Errorf(queueClosed) } // p.cond.Wait() } obj, err := p.activeQ.Pop() if err != nil { return nil, err } pInfo := obj.(*framework.PodInfo) p.schedulingCycle++ return pInfo.Pod, err }
當pod加入活動隊列中,除了加入activeQ的優先級隊列中,還須要從podBackoffQ和unschedulableQ中移除當前的pod,最後進行廣播通知阻塞在Pop操做的scheudler進行最新pod的獲取
func (p *PriorityQueue) Add(pod *v1.Pod) error { p.lock.Lock() defer p.lock.Unlock() pInfo := p.newPodInfo(pod) // 加入activeQ if err := p.activeQ.Add(pInfo); err != nil { klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err) return err } // 從unschedulableQ刪除 if p.unschedulableQ.get(pod) != nil { klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name) p.unschedulableQ.delete(pod) } // Delete pod from backoffQ if it is backing off // 從podBackoffQ刪除 if err := p.podBackoffQ.Delete(pInfo); err == nil { klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name) } // 存儲pod和被提名的node p.nominatedPods.add(pod, "") p.cond.Broadcast() return nil }
致使調度週期schedulingCyclye變動主要因素以下:
1.當集羣資源發生變化的時候:好比新添加pv、node等資源,那以前在unschedulableQ中由於資源不知足需求的pod就能夠進行放入activeQ中或者podBackoffQ中,及時進行調度
2.pod被成功調度: 以前因爲親和性不知足被放入到unschedulableQ中的pod,此時也能夠進行嘗試,而沒必要等到超時以後,再加入
這兩種狀況下會分別觸發MoveAllToActiveQueue和movePodsToActiveQueue變動moveRequestCycle使其等於schedulingCycle
當前一個pod失敗的時候,有兩種選擇一是加入podBackoffQ中,二是加入unschedulableQ中,那麼針對一個失敗的pod如何選擇該進入那個隊列中呢
結合上面的moveRequestCycle變動時機,何時moveRequestCycle會大於等於podSchedulingCycle呢?答案就是當前集羣中進行過集羣資源的變動或者pod被成功分配,那這個時候咱們若是重試一個失敗的調度則可能會成功,由於集羣資源變動了可能有新的資源加入
if p.moveRequestCycle >= podSchedulingCycle { if err := p.podBackoffQ.Add(pInfo); err != nil { return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err) } } else { p.unschedulableQ.addOrUpdate(pInfo) }
在建立scheduler Config的時候會經過MakeDefaultErrorFunc注入一個失敗處理函數, 在scheduler調度的時候會進行調用
kubernetes/pkg/scheduler/factory/factory.go: MakeDefaultErrorFunc會將沒有調度到任何一個node的pod從新放回到優先級隊列中
podSchedulingCycle := podQueue.SchedulingCycle() // 省略非核心代碼 if len(pod.Spec.NodeName) == 0 { //從新放回隊列 if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil { klog.Error(err) } }
當調度pod的失敗的時候, scheduler會同時調用sched.Error就是上面注入的失敗處理邏輯,來將調度失敗未分配node的pod節點從新加入到隊裏鍾
kubernetes/pkg/scheduler/scheduler.go
func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) { // 錯誤回調 sched.Error(pod, err) sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message) if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{ Type: v1.PodScheduled, Status: v1.ConditionFalse, Reason: reason, Message: err.Error(), }); err != nil { klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err) } }
PodBackoffMap主要用於存儲pod的最後一次失敗的更新時間與實現次數,從而根據這些數據來進行pod的backoffTime的計算
type PodBackoffMap struct { // lock for performing actions on this PodBackoffMap lock sync.RWMutex // 初始化 backoff duration initialDuration time.Duration // 當前值是1秒 // 最大 backoff duration maxDuration time.Duration // 當前值是1分鐘 // 記錄pod重試的次數 podAttempts map[ktypes.NamespacedName]int // 記錄pod的最後一次的更新時間 podLastUpdateTime map[ktypes.NamespacedName]time.Time }
初始化的時候回設定initialDuration和maxDuration,在當前版本中分別是1s和10s,也就是backoffQ中的pod最長10s就會從新加入activeQ中(須要等待定時任務進行輔助)
在每次失敗回調的時候,都會進行BackoffPod方法來進行計數更新,在後續獲取pod的backoffTime的時候,只須要獲取次數而後結合initialDuration進行算法計算,結合pod最後一次的更新時間,就會獲取pod的backoffTime的終止時間
其實最終的計算很簡單就是2的N次冪
func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration { // initialDuration是1s backoffDuration := pbm.initialDuration if _, found := pbm.podAttempts[nsPod]; found { // podAttempts裏面包含pod的嘗試失敗的次數 for i := 1; i < pbm.podAttempts[nsPod]; i++ { backoffDuration = backoffDuration * 2 // 最大10s if backoffDuration > pbm.maxDuration { return pbm.maxDuration } } } return backoffDuration }
podBackoffQ實際上會根據pod的backoffTime來進行優先級排序,因此podBackoffQ的隊列頭部,就是最近一個要過時的pod
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool { pInfo1 := podInfo1.(*framework.PodInfo) pInfo2 := podInfo2.(*framework.PodInfo) bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod)) bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod)) return bo1.Before(bo2) }
若是調度失敗,而且moveRequestCycle=podSchedulingCycle的時候就加入podBackfoffQ中
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error { // 省略檢查性代碼 // 更新pod的backoff 信息 p.backoffPod(pod) // moveRequestCycle將pod從unscheduledQ大於pod的調度週期添加到 若是pod的調度週期小於當前的調度週期 if p.moveRequestCycle >= podSchedulingCycle { if err := p.podBackoffQ.Add(pInfo); err != nil { return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err) } } else { p.unschedulableQ.addOrUpdate(pInfo) } p.nominatedPods.add(pod, "") return nil }
在前面介紹的當集羣資源發生變動的時候,會觸發嘗試unschedulabelQ中的pod進行轉移,若是發現當前pod還未到達backoffTime,就加入到podBackoffQ中
if p.isPodBackingOff(pod) { if err := p.podBackoffQ.Add(pInfo); err != nil { klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err) addErrorPods = append(addErrorPods, pInfo) } } else { if err := p.activeQ.Add(pInfo); err != nil { klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err) addErrorPods = append(addErrorPods, pInfo) } }
在建立PriorityQueue的時候,會建立兩個定時任務其中一個就是講backoffQ中的pod到期後的轉移,每秒鐘嘗試一次
func (p *PriorityQueue) run() { go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop) go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop) }
由於是一個堆結果,因此只須要獲取堆頂的元素,而後肯定是否到期,若是到期後則進行pop處來,加入到activeQ中
func (p *PriorityQueue) flushBackoffQCompleted() { p.lock.Lock() defer p.lock.Unlock() for { // 獲取堆頂元素 rawPodInfo := p.podBackoffQ.Peek() if rawPodInfo == nil { return } pod := rawPodInfo.(*framework.PodInfo).Pod // 獲取到期時間 boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod)) if !found { // 若是當前已經不在podBackoff中,則就pop出來而後放入到activeQ klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod)) p.podBackoffQ.Pop() p.activeQ.Add(rawPodInfo) defer p.cond.Broadcast() continue } // 未超時 if boTime.After(p.clock.Now()) { return } // 超時就pop出來 _, err := p.podBackoffQ.Pop() if err != nil { klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod)) return } // 加入到activeQ中 p.activeQ.Add(rawPodInfo) defer p.cond.Broadcast() } }
調度失敗後,若是當前集羣資源沒有發生變動,就加入到unschedulable,緣由上面說過
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error { // 省略檢查性代碼 // 更新pod的backoff 信息 p.backoffPod(pod) // moveRequestCycle將pod從unscheduledQ大於pod的調度週期添加到 若是pod的調度週期小於當前的調度週期 if p.moveRequestCycle >= podSchedulingCycle { if err := p.podBackoffQ.Add(pInfo); err != nil { return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err) } } else { p.unschedulableQ.addOrUpdate(pInfo) } p.nominatedPods.add(pod, "") return nil }
定時任務每30秒執行一次
func (p *PriorityQueue) run() { go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop) }
邏輯其實就很是簡單若是當前時間-pod的最後調度時間大於60s,就從新調度,轉移到podBackoffQ或者activeQ中
func (p *PriorityQueue) flushUnschedulableQLeftover() { p.lock.Lock() defer p.lock.Unlock() var podsToMove []*framework.PodInfo currentTime := p.clock.Now() for _, pInfo := range p.unschedulableQ.podInfoMap { lastScheduleTime := pInfo.Timestamp // 若是該pod1分鐘內沒有被調度就加入到podsToMove if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval { podsToMove = append(podsToMove, pInfo) } } if len(podsToMove) > 0 { // podsToMove將這些pod移動到activeQ p.movePodsToActiveQueue(podsToMove) } }
從設計上三隊列分別存儲:活動隊列、bakcoff隊列、不可調度隊列,其中backoff中會根據任務的失敗來逐步遞增重試時間(最長10s)、unschedulableQ隊列則延遲60s
經過後臺定時任務分別將backoffQ隊列、unschedulableQ隊列來進行重試,加入到activeQ中,從而加快完成pod的失敗重試調度
schedulingCycle、moveRequestCycle兩個cycle其實本質上也是爲了加快失敗任務的重試調度,當集羣資源發生變化的時候,進行當即重試,那些失敗的優先級比較高、親和性問題的pod均可能會被優先調度
內部經過lock保證線程安全,並經過cond來實現阻塞等待,從而實現阻塞scheduler worker的通知
今天就分析到這裏,其實參考這個實現,咱們也能夠從中抽象出一些設計思想,實現本身的一個具備優先級、快速重試、高可用的任務隊列,先分析到這,下一個分析的組件是SchedulerCache, 感興趣能夠加我微信一塊兒交流學習,畢竟三個臭皮匠算計不過諸葛亮
k8s源碼閱讀電子書地址: https://www.yuque.com/baxiaoshi/tyado3