SchedulerCache是kubernetes scheduler中負責本地數據緩存的核心數據結構, 其實現了Cache接口,負責存儲從apiserver獲取的數據,提供給Scheduler調度器獲取Node的信息,而後由調度算法的決策pod的最終node節點,其中Snapshot和節點打散算法很是值得借鑑node
SchedulerCache的數據從apiserver經過網絡感知,其數據的同步一致性主要是經過kubernetes中的Reflector組件來負責保證,SchedulerCache自己就是一個單純數據的存儲算法
當scheduler獲取一個待調度的pod,則須要從Cache中獲取當前集羣中的快照數據(當前此時集羣中node的統計信息), 用於後續調度流程中使用api
節點打散主要是指的調度器調度的時候,在知足調度需求的狀況下,爲了保證pod均勻分配到全部的node節點上,一般會按照逐個zone逐個node節點進行分配,從而讓pod節點打散在整個集羣中緩存
Scheduler進行完成調度流程的決策以後,爲pod選擇了一個node節點,此時還未進行後續的Bind操做,但實際上資源已經分配給該pod, 此時會先更新到本地緩存(),而後再等待apiserver進行數據的廣播而且最終被kubelet來進行實際的調度安全
但若是由於某些緣由致使pod後續的事件都沒有被監聽到,則須要將對應的pod資源進行刪除,並刪除對node資源的佔用微信
在scheduler cache中pod會一個內部的狀態機:initial、Assumed、Expired、Added、Delete,實際上全部的操做都是圍繞着該狀態機在進行,狀態以下: Initial: 初始化完成從apiserver監聽到(也多是監聽到一個已經完成分配的pod) Assumed: 在scheduler中完成分配最終完成bind操做的pod(未實際分配) Added: 首先監聽到事件多是一個已經完成實際調度的pod(即從initial到Added),其次多是通過調度決策後,被實際調度(從Assumed到Added),最後則是後續pod的更新(Update), Added語義上其實就是往Cache中添加一個Pod狀態 Deleted: 某個pod被監聽到刪除事件,只有被Added過的數據才能夠被Deleted Expired: Assumed pod通過一段時間後沒有感知到真正的分配事件被刪除網絡
type schedulerCache struct { stop <-chan struct{} ttl time.Duration period time.Duration // 保證數據的安全 mu sync.RWMutex // 存儲假定pod的信息集合,通過scheduler調度後假定pod被調度到某些節點,進行本地臨時存儲 // 主要是爲了進行node資源的佔用,能夠經過key在podStats查找到假定的pod信息 assumedPods map[string]bool // pod的狀態 podStates map[string]*podState // 存儲node的映射 nodes map[string]*nodeInfoListItem csiNodes map[string]*storagev1beta1.CSINode // node信息的鏈表,按照最近更新時間來進行鏈接 headNode *nodeInfoListItem // 存儲node、zone的映射信息 nodeTree *NodeTree // 鏡像信息 imageStates map[string]*imageState }
Snapshot數據結構主要負責存儲當前集羣中的node信息,而且經過Generation記錄當前更新的最後一個週期數據結構
type Snapshot struct { NodeInfoMap map[string]*NodeInfo Generation int64 }
建立主要位於kubernetes/pkg/scheduler/core/generic_scheduler.go,實際上就是建立一個空的snapshot對象app
nodeInfoSnapshot: framework.NodeInfoSnapshot(),
數據的更新則是經過snapshot方法來調用Cache的更新接口來進行更新ide
func (g *genericScheduler) snapshot() error { // Used for all fit and priority funcs. return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot) }
隨着集羣中node和pod的數量的增長,若是每次都全量獲取snapshot則會嚴重影響調度器的調度效率,在Cache中經過一個雙向鏈表和node的遞增計數(etcd實現)來實現增量更新
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error { cache.mu.Lock() defer cache.mu.Unlock() balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) // 獲取當前snapshot的Genration snapshotGeneration := nodeSnapshot.Generation // 遍歷雙向鏈表,更新snapshot信息 for node := cache.headNode; node != nil; node = node.next { if node.info.GetGeneration() <= snapshotGeneration { //全部node信息都更新完畢 break } if balancedVolumesEnabled && node.info.TransientInfo != nil { // Transient scheduler info is reset here. node.info.TransientInfo.ResetTransientSchedulerInfo() } if np := node.info.Node(); np != nil { nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone() } } // 更新snapshot的genration if cache.headNode != nil { nodeSnapshot.Generation = cache.headNode.info.GetGeneration() } // 若是snapshot裏面包含過時的pod信息則進行清理工做 if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) { for name := range nodeSnapshot.NodeInfoMap { if _, ok := cache.nodes[name]; !ok { delete(nodeSnapshot.NodeInfoMap, name) } } } return nil }
nodeTree主要負責節點的打散,用於讓pod均勻分配在多個zone中的node節點上
type NodeTree struct { tree map[string]*nodeArray // 存儲zone和zone下面的node信息 zones []string // 存儲zones zoneIndex int numNodes int mu sync.RWMutex }
其中zones和zoneIndex主要用於後面的節點打散算法使用,實現按zone逐個分配
nodeArray負責存儲一個zone下面的全部node節點,而且經過lastIndex記錄當前zone分配的節點索引
type nodeArray struct { nodes []string lastIndex int }
添加node其實很簡單,只須要獲取對應node的zone信息,而後加入對應zone的nodeArray中
func (nt *NodeTree) addNode(n *v1.Node) { // 獲取zone zone := utilnode.GetZoneKey(n) if na, ok := nt.tree[zone]; ok { for _, nodeName := range na.nodes { if nodeName == n.Name { klog.Warningf("node %q already exist in the NodeTree", n.Name) return } } // 吧節點加入到zone中 na.nodes = append(na.nodes, n.Name) } else { // 新加入zone nt.zones = append(nt.zones, zone) nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0} } klog.V(2).Infof("Added node %q in group %q to NodeTree", n.Name, zone) nt.numNodes++ }
數據打散算法很簡單,首先咱們存儲了zone和nodeArray的信息,而後咱們只須要經過兩個索引zoneIndex和nodeIndex就能夠實現節點的打散操做, 只有噹噹前集羣中全部zone裏面的全部節點都進行一輪分配後,而後重建分配索引
func (nt *NodeTree) Next() string { nt.mu.Lock() defer nt.mu.Unlock() if len(nt.zones) == 0 { return "" } // 記錄分配完全部node的zone的計數,用於進行狀態重置 // 好比有3個zone: 則當numExhaustedZones=3的時候,就會從新從頭開始進行分配 numExhaustedZones := 0 for { if nt.zoneIndex >= len(nt.zones) { nt.zoneIndex = 0 } // 按照zone索引來進行逐個zone分配 zone := nt.zones[nt.zoneIndex] nt.zoneIndex++ // 返回當前zone下面的next節點,若是exhausted爲True則代表當前zone全部的節點,在這一輪調度中都已經分配了一次 // 就須要從下個zone繼續獲取節點 nodeName, exhausted := nt.tree[zone].next() if exhausted { numExhaustedZones++ // 全部的zone下面的node都被分配了一次,這裏進行重置,從頭開始繼續分配 if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset. nt.resetExhausted() } } else { return nodeName } } }
重建索引則是將全部nodeArray的索引和當前zoneIndex進行歸零
func (nt *NodeTree) resetExhausted() {// 重置索引 for _, na := range nt.tree { na.lastIndex = 0 } nt.zoneIndex = 0 }
Cache要定時將以前在通過本地scheduler分配完成後的假設的pod的信息進行清理,若是這些pod在給定時間內仍然沒有感知到對應的pod真正的添加事件則就這些pod刪除
assumedPods map[string]bool
默認每30s進行清理一次
func (cache *schedulerCache) run() { go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) }
清理邏輯主要是針對那些已經完成綁定的pod來進行,若是一個pod完成了在scheduler裏面的全部操做後,會有一個過時時間,當前是30s,若是超過該時間即deadline小於當前的時間就刪除該pod
// cleanupAssumedPods exists for making test deterministic by taking time as input argument. func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { cache.mu.Lock() defer cache.mu.Unlock() // The size of assumedPods should be small for key := range cache.assumedPods { ps, ok := cache.podStates[key] if !ok { panic("Key found in assumed set but not in podStates. Potentially a logical error.") } // 未完成綁定的pod不會被進行清理 if !ps.bindingFinished { klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.", ps.pod.Namespace, ps.pod.Name) continue } // 在完成bind以後會設定一個過時時間,目前是30s,若是deadline即bind時間+30s小於當前時間就過時刪除 if now.After(*ps.deadline) { klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name) if err := cache.expirePod(key, ps); err != nil { klog.Errorf("ExpirePod failed for %s: %v", key, err) } } } }
清理pod主要分爲以下幾個部分: 1.對應pod假定分配node的信息 2.清理映射的podState信息
func (cache *schedulerCache) expirePod(key string, ps *podState) error { if err := cache.removePod(ps.pod); err != nil { return err } delete(cache.assumedPods, key) delete(cache.podStates, key) return nil }
核心數據結構數據流如上所示,其核心是經過nodes、headNode實現一個Snapshot爲調度器提供當前系統資源的快照,並經過nodeTree進行node節點的打散,最後內部經過一個pod的狀態機來進行系統內部的pod資源狀態的轉換,並經過後臺的定時任務來保證通過通過Reflector獲取的數據的最終一致性(刪除那些通過bind的可是卻沒被實際調度或者事件丟失的pod), 藉助這些其實一個最基礎的工業級調度器的本地cache功能就實現了
> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章
> 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈