kubernetes調度器以前已經分析過SchedulerCache、ScheduleAlgorithm、SchedulerExtender、Framework等核心數據結構,也分析了優選、調度、搶佔流程的核心實現,本文是本系列目前打算的最後一章, 也是當前階段對調度的學習的一個總結node
整個系列文檔我已經已經更新到語雀上了地址是,謝謝你們分享加微信一塊兒交流 https://www.yuque.com/baxiaoshi/tyado3/git
Binder負責將調度器的調度結果,傳遞給apiserver,即將一個pod綁定到選擇出來的node節點github
在scheduler/factory中會構建一個默認的binder算法
func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder { defaultBinder := &binder{client} return func(pod *v1.Pod) Binder { for _, extender := range extenders { if extender.IsBinder() && extender.IsInterested(pod) { return extender } } return defaultBinder } }
binder接口和簡單隻須要調用apiserver的pod的bind接口便可完成綁定操做api
// Implement Binder interface var _ Binder = &binder{} // Bind just does a POST binding RPC. func (b *binder) Bind(binding *v1.Binding) error { klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) }
執行綁定的操做位於Scheudler.bind接口,在調用Framework.RunBindPlugins後,只有當返回的狀態不是成功,而是SKIP的時候,才執行bind操做,真的不知道是怎麼想的,後續若是加入對應的bind插件,也須要返回SKIP,理解不了大神的思惟緩存
bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) var err error if !bindStatus.IsSuccess() { if bindStatus.Code() == framework.Skip { // 若是全部的插件都skip了菜容許將pod綁定到apiserver err = sched.GetBinder(assumed).Bind(&v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: targetNode, }, }) } else { err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message()) } }
調度器的參數的初始化已經都放到defaultSchedulerOptions中了,後續應該更多的都會採用改種方式,避免散落在構建參數的各個階段微信
var defaultSchedulerOptions = schedulerOptions{ schedulerName: v1.DefaultSchedulerName, schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{ Provider: defaultAlgorithmSourceProviderName(), }, hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, disablePreemption: false, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, bindTimeoutSeconds: BindTimeoutSeconds, podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), }
插件工廠註冊表的初始化分爲兩個部分in tree和out of tree即當前版本自帶的和用戶自定義的兩部分數據結構
// 首先進行當前版本的插件註冊表的註冊 registry := frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{ VolumeBinder: volumeBinder, }) // 加載用戶自定義的插件註冊表 if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { return nil, err }
綁定事件回調主要是經過AddAllEventHandlers主要是將各類資源數據經過SchedulerCache放入本地緩存中,同時針對未調度的pod(!assignedPod即沒有綁定Node的pod)加入到調度隊列中架構
func AddAllEventHandlers( sched *Scheduler, schedulerName string, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, ) {
當資源發生變化的時候,好比service、volume等就會對unschedulableQ中的以前調度失敗的pod進行重試,選擇將其轉移到activeQ或者backoffQ中併發
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) { p.lock.Lock() defer p.lock.Unlock() unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap)) // 獲取全部unschedulable的pod for _, pInfo := range p.unschedulableQ.podInfoMap { unschedulablePods = append(unschedulablePods, pInfo) } // 將unschedulable的pod轉移到backoffQ隊列或者activeQ隊列中 p.movePodsToActiveOrBackoffQueue(unschedulablePods, event) // 修改遷移調度器請求週期, 在失敗的時候會進行比較pod的moveRequestCycle是否>=schedulingCycle p.moveRequestCycle = p.schedulingCycle p.cond.Broadcast() }
最後則會啓動調度器,其核心流程是在scheduleOne中
func (sched *Scheduler) Run(ctx context.Context) { // 首先會進行同步緩存 if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } // 啓動調度隊列的後臺定時任務 sched.SchedulingQueue.Run() // 啓動調度流程 wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }
獲取等待調度的pod則直接經過NextPod拉進行,其實內部就是對schedulingQUeue.pop的封裝
// 從隊列中獲取等待調度的pod podInfo := sched.NextPod() // pod could be nil when schedulerQueue is closed if podInfo == nil || podInfo.Pod == nil { return }
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo { return func() *framework.PodInfo { podInfo, err := queue.Pop() if err == nil { klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name) return podInfo } klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) return nil } }
skipPodSchedule即檢查當前 pod是否能夠進行跳過,其中一個是pod已經被刪除,另一個就是pod已經被提議調度到某個節點,此時若是隻是版本的更新,即除了ResourceVersion、Annotations、NodeName三個字段其他的都不曾變化,就不須要進行重複的調度
if sched.skipPodSchedule(pod) { return }
檢測提議pod重複調度算法, 若是相等則不進行任何操做
f := func(pod *v1.Pod) *v1.Pod { p := pod.DeepCopy() p.ResourceVersion = "" p.Spec.NodeName = "" // Annotations must be excluded for the reasons described in // https://github.com/kubernetes/kubernetes/issues/52914. p.Annotations = nil return p } assumedPodCopy, podCopy := f(assumedPod), f(pod) // 若是pod的信息沒有發生變動則不須要進行更新 if !reflect.DeepEqual(assumedPodCopy, podCopy) { return false } return true
生成CycleState和context, 其中CycleState用於進行調度器週期上線文數據傳遞共享,而context則負責統一的退出協調管理
// 構建CycleState和context state := framework.NewCycleState() state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel()
調度流程中底層依賴的數據結構ScheduleAlgorithm內部實現以前的分析中已經詳細說過,這裏會省略一些諸如volume bind、framework階段鉤子的調用
正常調度只須要調度ScheduleAlgorithm來進行調度,具體實現細節能夠看以前的文章
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
若是一個Pod被提議存儲到某個節點,則會先將其加入到SchedulerCache中,同時從SchedulingQueue中移除,避免重複調度
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { assumed.Spec.NodeName = host // 存儲到SchedulerCache中這樣下個調度週期中,pod會佔用對應node的資源 if err := sched.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf("scheduler cache AssumePod failed: %v", err) return err } // if "assumed" is a nominated pod, we should remove it from internal cache // 從調度隊列中移除pod if sched.SchedulingQueue != nil { sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) } return nil }
bind階段與調度階段是並行的關係,當執行bind的時候,會啓動一個goroutine來單獨執行bind操做, 省略關於framework、extender相關的hook調用
在綁定流程中若是發現以前的Volumes未所有綁定,則會先進行volumes綁定操做
if !allBound { err := sched.bindVolumes(assumedPod)
綁定操做主要是位於scheduler.bind,會進行最終的節點綁定
err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
執行以前說的bind綁定操做,這裏是真正操縱apiserver發生pod與node綁定請求的地方
bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) var err error if !bindStatus.IsSuccess() { if bindStatus.Code() == framework.Skip { // 若是全部的插件都skip了才容許將pod綁定到apiserver err = sched.GetBinder(assumed).Bind(&v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: targetNode, }, }) } else { err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message()) } }
會調用SchedulerCache裏面提議節點的過時時間,若是超過指定的過時時間,則會進行移除操做,釋放node資源
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) }
若是在以前正常調度失敗的時候,首先會發一個在recordSchedulingFailure中調用sched.Error來將失敗的pod轉移到backoffQ或者unschedulableQ隊列中
sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
若是是預選失敗的,而且當前調度器容許搶佔功能,則會進行搶佔調度處理即sched.preempt
if fitError, ok := err.(*core.FitError); ok { // 若是是預選失敗則進行 if sched.DisablePreemption { klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." + " No preemption is performed.") } else { preemptionStartTime := time.Now() // 搶佔調度 sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) }
首先經過apiserver獲取當前須要執行搶佔的pod的最新Pod信息
preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err }
經過Preempt篩選要進行搶佔操做的node節點、待驅逐的pod、待驅逐的提議的pod
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr) if err != nil { klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) return "", err }
若是節點搶佔一個pod成功,則會更新隊列中的搶佔節點的提議節點信息,這樣在下個調度週期中,就可使用該信息
sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
這裏會直接調用apiserver中節點的提議節點信息,爲何要這樣作呢?由於當前pod已經搶佔了node上部分的節點信息,可是在被搶佔的pod徹底從節點上刪除以前的這段時間,該pod調度依然會失敗,可是此時不能繼續調用搶佔流程了,由於你已經執行了搶佔,此時只須要等待對應節點上的node都刪除,則再詞繼續嘗試調度
err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
刪除被驅逐節點直接調用apiserver進行操做,若是此時發現當前pod還在等待插件的Allow操做,則直接進行Reject
for _, victim := range victims { // 調用apiserver進行刪除pod if err := sched.podPreemptor.deletePod(victim); err != nil { klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) return "", err } // If the victim is a WaitingPod, send a reject message to the PermitPlugin if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil { waitingPod.Reject("preempted") } sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) }
針對那些已經被提議調度到當前node的pod,會將其node設置爲空,從新進行調度選擇
for _, p := range nominatedPodsToClear { // 清理這些提議的pod rErr := sched.podPreemptor.removeNominatedNodeName(p) if rErr != nil { klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr) // We do not return as this error is not critical. } }
爲了不太多的線交叉,這裏我只給出了大的核心的流程,同時針對SchedulerExtender和Framework我作了簡化,多個階段其實都有調用,可是我只在最下面畫出了數據結構和調用, 這張圖基本上包含了大多數的關鍵的數據結構以及數據流向,但願能給想學習調度器的朋友一些幫助
調度器代碼的閱讀從開始到如今,應該已經有快一個月的時間了,讀到如今也算是對調度器的核心流程和關鍵的數據結構有一點了解,固然不少具體的調度算法,目前也並無去細看,由於初衷其實只是想了解下調度方面的架構設計與關鍵數據結構
源碼閱讀的過程當中我想最大的問題,可能就是關於一些數據結構和算法的設計的理解,固然我目前也都是本身的臆測做者的設計初衷,好在我是作運維開發的不少場景上其實還蠻容易理解的,好比服務打散、調度隊列的Pod轉移、併發意圖等等,後續若是有人閱讀有不同的理解,歡迎交流,指正小弟的一些錯誤理解
調度器目前應該仍然在開發中,目前已經吧優選階段移入到Framework,後續的預選應該也在計劃中,其次針對流程上的設計應該也在變更,好比不少說的nodeTree也在修改中,調度器的構建也更加工程化,反而比以前更好理解了,因此有興趣閱讀的,不必定要選擇老的版本,新的版本可能更容易一些
調度器將來的優化點我感受除了在調度流程和算法管理Framework的演進,更多的優化仍是在預選階段,即如何選擇選擇出最合適 node節點,該流程的優化應該主要分爲兩個部分:新Pod的預選和舊Pod的預選,即針對已知和未知的預選優化
針對已知的優化,一般能夠經過保存更多的數據,以空間來換時間的設計來進行更多狀態的保存加速預選 針對未知的優化,若是不考慮批處理任務,則其實針對未知的優化是個僞命題,由於在實際場景中,你一把不可能獲取同時上線1000個新的服務,可是你能夠同時調度10000個pod,那這些pod在以前的調度流程中,其實能夠保存更多的狀態數據,來加速預選,可是更多的數據狀態保存則對當前的調度系統的不少設計可能都須要進行變動,估計應該須要等到整個調度器的流程和插件固化以後再考慮吧
好吧就胡說到這裏吧,明天要開始新的模塊的學習,也但願能交到更多的朋友,我會把這個系列的全部文章整理程pdf,畢竟微信公共號的閱讀體驗是真很差
> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章
> 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈