Framework是kubernetes擴展的第二種實現,相比SchedulerExtender基於遠程獨立Service的擴展,Framework核心則實現了一種基於擴展點的本地化的規範流程管理機制node
Framework的設計在官方文檔中已經有明確的描述,當前尚未Stable, 本文目前基於1.18版本聊一聊除了官方描述外的實現的上的一些細節git
目前官方主要是圍繞着以前的預選和優選階段進行擴展,提供了更多的擴展點,其中每一個擴展點都是一類插件,咱們能夠根據咱們的須要在對應的階段來進行擴展插件的編寫,實現調度加強github
在當前版本中優先級插件已經抽取到了framework中,後續應該會繼續將預選插件來進行抽取,這塊應該還得一段時間才能穩定編程
在Framework的實現中,每一個插件擴展階段調用都會傳遞context和CycleState兩個對象,其中context與咱們在大多數go編程中的用法相似,這裏主要是用於多階段並行處理的時候的統一退出操做,而CycleState則存儲當前這一個調度週期內的全部數據,這是一個併發安全的結構,內部包含一個讀寫鎖數組
Permit是在進行Bind綁定操做以前進行的一項操做,其主要設計目標是在進行bind以前,進行最後一道決策,即當前pod是否准許進行最終的Bind操做,具備一票否決權,若是裏面的插件拒絕,則對應的pod會從新進行調度安全
Framework的核心數據結構簡單的來講分爲三部分:插件集合(針對每一個擴展階段都會有本身的集合)、元數據獲取接口(集羣和快照數據的獲取)、等待Pod集合微信
插件集合中會根據不一樣的插件類型,來進行分類保存, 其中還有一個插件的優先級存儲map,目前只有在優選階段使用,後續可能會加入預選的優先級數據結構
pluginNameToWeightMap map[string]int queueSortPlugins []QueueSortPlugin preFilterPlugins []PreFilterPlugin filterPlugins []FilterPlugin postFilterPlugins []PostFilterPlugin scorePlugins []ScorePlugin reservePlugins []ReservePlugin preBindPlugins []PreBindPlugin bindPlugins []BindPlugin postBindPlugins []PostBindPlugin unreservePlugins []UnreservePlugin permitPlugins []PermitPlugin
主要是集羣中的一些數據獲取接口的實現,主要是爲了實現FrameworkHandle, 該接口主要是提供一些數據的獲取的接口和集羣操做的接口給插件使用閉包
clientSet clientset.Interface informerFactory informers.SharedInformerFactory volumeBinder *volumebinder.VolumeBinder snapshotSharedLister schedulerlisters.SharedLister
等待pod集合主要是存儲在Permit階段進行等待的pod,若是在等待週期中pod被刪除,則會直接拒絕併發
waitingPods *waitingPodsMap
經過插件工廠來存儲全部註冊的插件工廠,而後經過插件工廠構建具體的插件
registry Registry
工廠函數即傳入對應的參數,構建一個Plugin,其中FrameworkHandle主要是用於獲取快照和集羣的其餘數據
type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)
在go裏面大多數插件工廠的實現都是經過map來實現這裏也是同樣,對外暴露Register和UnRegister接口
type Registry map[string]PluginFactory // Register adds a new plugin to the registry. If a plugin with the same name // exists, it returns an error. func (r Registry) Register(name string, factory PluginFactory) error { if _, ok := r[name]; ok { return fmt.Errorf("a plugin named %v already exists", name) } r[name] = factory return nil } // Unregister removes an existing plugin from the registry. If no plugin with // the provided name exists, it returns an error. func (r Registry) Unregister(name string) error { if _, ok := r[name]; !ok { return fmt.Errorf("no plugin named %v exists", name) } delete(r, name) return nil } // Merge merges the provided registry to the current one. func (r Registry) Merge(in Registry) error { for name, factory := range in { if err := r.Register(name, factory); err != nil { return err } } return nil }
這裏以preFilterPlugins爲例展現整個流程的註冊
Plugins在配置階段進行構造,其會保存當前framework中註冊的全部的插件,其經過PluginSet保存對應的容許和禁用的插件
type Plugins struct { // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. QueueSort *PluginSet // PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework. PreFilter *PluginSet // Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod. Filter *PluginSet // PostFilter is a list of plugins that are invoked after filtering out infeasible nodes. PostFilter *PluginSet // Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase. Score *PluginSet // Reserve is a list of plugins invoked when reserving a node to run the pod. Reserve *PluginSet // Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod. Permit *PluginSet // PreBind is a list of plugins that should be invoked before a pod is bound. PreBind *PluginSet // Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework. // The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success. Bind *PluginSet // PostBind is a list of plugins that should be invoked after a pod is successfully bound. PostBind *PluginSet // Unreserve is a list of plugins invoked when a pod that was previously reserved is rejected in a later phase. Unreserve *PluginSet }
該方法主要是爲了實現對應插件類型和framework中保存對應插件類型數組的映射, 好比Prefilter與其關聯的preFilterPlugins切片,string(插件類型)->[]PreFilterPlugin(&reflect.SliceHeader切片頭)
func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint { return []extensionPoint{ {plugins.PreFilter, &f.preFilterPlugins}, {plugins.Filter, &f.filterPlugins}, {plugins.Reserve, &f.reservePlugins}, {plugins.PostFilter, &f.postFilterPlugins}, {plugins.Score, &f.scorePlugins}, {plugins.PreBind, &f.preBindPlugins}, {plugins.Bind, &f.bindPlugins}, {plugins.PostBind, &f.postBindPlugins}, {plugins.Unreserve, &f.unreservePlugins}, {plugins.Permit, &f.permitPlugins}, {plugins.QueueSort, &f.queueSortPlugins}, } }
其會遍歷全部的上面的映射,可是此處不會根據類型註冊到對應的切片中,而是全部的註冊到gpMAp中
func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { pgMap := make(map[string]config.Plugin) if plugins == nil { return pgMap } // 構建匿名函數,利用閉包來修改pgMap保存全部容許的插件集合 find := func(pgs *config.PluginSet) { if pgs == nil { return } for _, pg := range pgs.Enabled { // 遍歷全部容許的插件集合 pgMap[pg.Name] = pg // 保存到map中 } } // 遍歷上面的全部映射表 for _, e := range f.getExtensionPoints(plugins) { find(e.plugins) } return pgMap }
會調用生成的插件工廠註冊表,來經過每一個插件的Factory構建Plugin插件實例, 保存到pluginsMap中
pluginsMap := make(map[string]Plugin) for name, factory := range r { // pg即上面生成的pgMap,這裏只會生成須要使用的插件 if _, ok := pg[name]; !ok { continue } p, err := factory(pluginConfig[name], f) if err != nil { return nil, fmt.Errorf("error initializing plugin %q: %v", name, err) } pluginsMap[name] = p // 進行權重保存 f.pluginNameToWeightMap[name] = int(pg[name].Weight) if f.pluginNameToWeightMap[name] == 0 { f.pluginNameToWeightMap[name] = 1 } // Checks totalPriority against MaxTotalScore to avoid overflow if int64(f.pluginNameToWeightMap[name])*MaxNodeScore > MaxTotalScore-totalPriority { return nil, fmt.Errorf("total score of Score plugins could overflow") } totalPriority += int64(f.pluginNameToWeightMap[name]) * MaxNodeScore }
這裏主要是經過e.slicePtr利用反射,結合以前的構造的pluginsMap和反射來進行具體類型插件的註冊
for _, e := range f.getExtensionPoints(plugins) { if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil { return nil, err } }
updatePluginList主要是經過反射來進行的,經過上面的getExtensionPoints獲取的framework中對應的slice的地址,而後利用反射來進行插件的註冊和合法性效驗
func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]Plugin) error { if pluginSet == nil { return nil } // 首先經過Elem獲取當前數組的類型 plugins := reflect.ValueOf(pluginList).Elem() // 經過數組類型來獲取數組內部元素的類型 pluginType := plugins.Type().Elem() set := sets.NewString() for _, ep := range pluginSet.Enabled { pg, ok := pluginsMap[ep.Name] if !ok { return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name) } // 合法性檢查:若是發現當前插件未實現當前接口,則報錯 if !reflect.TypeOf(pg).Implements(pluginType) { return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name()) } if set.Has(ep.Name) { return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name()) } set.Insert(ep.Name) // 追加插件到slice中,並保存指針指向 newPlugins := reflect.Append(plugins, reflect.ValueOf(pg)) plugins.Set(newPlugins) } return nil }
CycleState主要是負責調度流程中數據的保存和克隆,其對外暴露了讀寫鎖接口,各擴展點插件能夠根據需求獨立進行加鎖選擇
CycleState實現並複雜主要保存StateData數據,只須要實現一個clone接口便可,CycleState裏面的數據,能夠被當前framework全部的插件進行數據增長和修改,裏面會經過讀寫鎖來保證線程安全,但並不會針對插件進行限制,即信任全部插件,能夠任意進行增刪
type CycleState struct { mx sync.RWMutex storage map[StateKey]StateData // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. recordPluginMetrics bool } // StateData is a generic type for arbitrary data stored in CycleState. type StateData interface { // Clone is an interface to make a copy of StateData. For performance reasons, // clone should make shallow copies for members (e.g., slices or maps) that are not // impacted by PreFilter's optional AddPod/RemovePod methods. Clone() StateData }
對外接口的實現,須要對應的插件主動選擇進行加讀鎖或者加寫鎖,而後進行相關數據的讀取和修改
func (c *CycleState) Read(key StateKey) (StateData, error) { if v, ok := c.storage[key]; ok { return v, nil } return nil, errors.New(NotFound) } // Write stores the given "val" in CycleState with the given "key". // This function is not thread safe. In multi-threaded code, lock should be // acquired first. func (c *CycleState) Write(key StateKey, val StateData) { c.storage[key] = val } // Delete deletes data with the given key from CycleState. // This function is not thread safe. In multi-threaded code, lock should be // acquired first. func (c *CycleState) Delete(key StateKey) { delete(c.storage, key) } // Lock acquires CycleState lock. func (c *CycleState) Lock() { c.mx.Lock() } // Unlock releases CycleState lock. func (c *CycleState) Unlock() { c.mx.Unlock() } // RLock acquires CycleState read lock. func (c *CycleState) RLock() { c.mx.RLock() } // RUnlock releases CycleState read lock. func (c *CycleState) RUnlock() { c.mx.RUnlock() }
waitingPodMap主要是存儲Permit階段插件設置的須要Wait等待的pod,即時通過以前的優選後,這裏面的pod也可能會被某些插件給拒絕掉
waitingPodsMAp其內部經過pod的uid保存一個map映射,同時經過讀寫鎖來進行數據保護
type waitingPodsMap struct { pods map[types.UID]WaitingPod mu sync.RWMutex }
waitingPod則是一個具體的pod的等待實例,其內部經過pendingPlugins保存插件的定義的 timer等待時間,對外經過chan *status來接受當前pod的狀態,並經過讀寫鎖來進行串行化
type waitingPod struct { pod *v1.Pod pendingPlugins map[string]*time.Timer s chan *Status mu sync.RWMutex }
會根據每一個plugin的wait等待時間構建N個timer, 若是任一的timer到期,則就拒絕
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod { wp := &waitingPod{ pod: pod, s: make(chan *Status), } wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime)) // The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the // lock here so that time.AfterFunc can only execute after newWaitingPod finishes. wp.mu.Lock() defer wp.mu.Unlock() // 根據插件的等待時間來構建timer,若是有任一timer到期,還不曾有任何plugin Allow則會進行Rejectj㐇 for k, v := range pluginsMaxWaitTime { plugin, waitTime := k, v wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() { msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v", waitTime, plugin) wp.Reject(msg) }) } return wp }
任一一個plugin的定時器到期,或者plugin主動發起reject操做,則都會暫停全部的定時器,並進行消息廣播
func (w *waitingPod) Reject(msg string) bool { w.mu.RLock() defer w.mu.RUnlock() // 中止全部的timer for _, timer := range w.pendingPlugins { timer.Stop() } // 經過管道發送拒絕事件 select { case w.s <- NewStatus(Unschedulable, msg): return true default: return false } }
容許操做必須等待全部的plugin都Allow後,才能發送容許事件
func (w *waitingPod) Allow(pluginName string) bool { w.mu.Lock() defer w.mu.Unlock() if timer, exist := w.pendingPlugins[pluginName]; exist { // 中止當前plugin的定時器 timer.Stop() delete(w.pendingPlugins, pluginName) } // Only signal success status after all plugins have allowed if len(w.pendingPlugins) != 0 { return true } // 只有當全部的plugin都容許,纔會發生成功容許事件 select { case w.s <- NewStatus(Success, ""): // 發送事件 return true default: return false } }
首先會遍歷全部的插件,而後若是發現狀態設置爲Wait,則會根據插件的等待時間進行wait操做
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) }() pluginsWaitTime := make(map[string]time.Duration) statusCode := Success for _, pl := range f.permitPlugins { status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message()) klog.V(4).Infof(msg) return NewStatus(status.Code(), msg) } if status.Code() == Wait { // Not allowed to be greater than maxTimeout. if timeout > maxTimeout { timeout = maxTimeout } // 記錄當前plugin的等待時間 pluginsWaitTime[pl.Name()] = timeout statusCode = Wait } else { msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) return NewStatus(Error, msg) } } } // We now wait for the minimum duration if at least one plugin asked to // wait (and no plugin rejected the pod) if statusCode == Wait { startTime := time.Now() // 根據插件等待時間構建waitingPod w := newWaitingPod(pod, pluginsWaitTime) // 加入到waitingPods中 f.waitingPods.add(w) // 移除 defer f.waitingPods.remove(pod.UID) klog.V(4).Infof("waiting for pod %q at permit", pod.Name) // 等待狀態消息 s := <-w.s metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime)) if !s.IsSuccess() { if s.IsUnschedulable() { msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message()) klog.V(4).Infof(msg) return NewStatus(s.Code(), msg) } msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message()) klog.Error(msg) return NewStatus(Error, msg) } } return nil }
上面已經將插件進行註冊,而且介紹了調度流程中數據的保存和等待機制的實現,其實剩下的就是每類插件執行調用的具體實現了,除了優選階段,其實剩下的階段,都是幾乎沒有什麼邏輯處理了,而優選階段就跟以前系列分享裏面的優選階段的設計相似,這裏也不在進行贅述了
流程看起來都蠻簡單的,注意這個地方有任一一個插件拒絕,則就會直接調度失敗
func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) { startTime := time.Now() defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) }() for _, pl := range f.preFilterPlugins { status = f.runPreFilterPlugin(ctx, pl, state, pod) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message()) klog.V(4).Infof(msg) return NewStatus(status.Code(), msg) } msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) return NewStatus(Error, msg) } } return nil }
跟以前的相似,只不過會根據runAllFilters參數肯定是否要運行全部的插件,默認是不運行,由於已經失敗了了嘛
unc (f *framework) RunFilterPlugins( ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) PluginToStatus { var firstFailedStatus *Status startTime := time.Now() defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, firstFailedStatus.Code().String()).Observe(metrics.SinceInSeconds(startTime)) }() statuses := make(PluginToStatus) for _, pl := range f.filterPlugins { pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) if len(statuses) == 0 { firstFailedStatus = pluginStatus } if !pluginStatus.IsSuccess() { if !pluginStatus.IsUnschedulable() { // Filter plugins are not supposed to return any status other than // Success or Unschedulable. firstFailedStatus = NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message())) return map[string]*Status{pl.Name(): firstFailedStatus} } statuses[pl.Name()] = pluginStatus if !f.runAllFilters { // 不須要運行全部插件進行退出 return statuses } } } return statuses }
今天就到這裏吧,調度器修改仍是蠻大的,可是能夠預見的是,爲了更多的調度插件可能都會集中到framework中,對kubernetes scheduler系列的學習,也算是告一段落了,做爲一個kubernetes新手學習起來仍是有點費勁,還好調度器設計的跟其餘模塊的耦合性相對小一點
> 微信號:baxiaoshi2020 歡迎一塊兒交流學習分享,有個小羣歡迎大佬光臨 > 我的博客: www.sreguide.com
> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章
> 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈