圖解kubernetes容器運行時狀態緩存數據結構

緩存和發佈訂閱都是後端開發中經常使用的手段,其中緩存主要是用於可丟失數據的暫存,發佈訂閱主要是用於消息傳遞,今天給你們介紹一個k8s中帶有發佈訂閱的緩存實現,其目標是給定一個時間,只關注該時間後續的事件,主要是用於近實時狀態數據的獲取後端

1. 業務背景

image.png 在k8s中的kubelet中支持不一樣的容器運行時,爲了緩存容器運行時當前全部可見的Pod/Container就構造了一個Cache結構,當一個事件發生後,kubelet接收到事件後,此時須要獲取當前Pod的狀態,此時要獲取的狀態,就必需要求是在事件產生後的最新的狀態,而不能是以前的狀態,緩存

2. 核心實現

image.png

2.1 數據與訂閱記錄

2.1.1 狀態數據

狀態數據主要是存儲一個pod的狀態數據微信

type data struct {
	// 存儲Pod的狀態
	status *PodStatus
	// 試圖檢測Pod狀態出錯信息
	err error
	// 上次數據的修改時間
	modified time.Time
}

2.1.2 訂閱記錄

訂閱記錄其實指的是一個訂閱需求,其經過一個chan來進行數據通知,其中time字段是過濾條件,即只有時間大於time的記錄才容許被加入到chan中數據結構

type subRecord struct {
	time time.Time
	ch   chan *data
}

2.2 Cache實現

2.2.1 核心成員結構

cache裏面的數據在kubelet每次進行PLEG更新的時候,都會更新timestamp,而且會從新獲取最新的Pod狀態進行填充cache,因此這裏會更新timestamp,寓意着讓以前舊的狀態都過時,而且會針對舊的訂閱的進行數據的返回app

// cache implements Cache.
type cache struct {
	// 讀寫鎖
	lock sync.RWMutex
	// 存儲Pod的狀態數據,用於知足不帶時間戳的狀態獲取
	pods map[types.UID]*data
    // 全局時間戳,即當前緩存中的數據,至少都要比該時間戳新
	timestamp *time.Time
	//存儲對應Pod的定語記錄列表
	subscribers map[types.UID][]*subRecord
}

2.2.3 普通狀態數據獲取

普通狀態獲取即直接經過Map來進行數據的返回ide

func (c *cache) Get(id types.UID) (*PodStatus, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	d := c.get(id)
	return d.status, d.err
}

2.2.4 默認狀態構造器

當發現當前的cahce中並不存在對應的數據,則是直接根據ID來生成一個默認的狀態數據函數

func (c *cache) get(id types.UID) *data {
	d, ok := c.pods[id]
	if !ok {
		return makeDefaultData(id)
	}
	return d
}
// 默認狀態構造器
func makeDefaultData(id types.UID) *data {
	return &data{status: &PodStatus{ID: id}, err: nil}
}

2.2.5 最新狀態數據獲取

會給定一個時間戳,只有噹噹前緩存的數據的時間在該時間戳以後,纔有效,不然返回nil,這裏有個關鍵點就是timestamp的相關設計,由於在每一個PLEG週期中,都會更新timestamp源碼分析

若是minTime<globaltimestamp, 則意味着在已經有新一輪的更新,而你這個事件仍是上一輪的事件,則可能就是事件的處理太慢,此時就會將以前緩存的狀態,直接返回,由於下一輪頗有可能會有新的事件到來 go func (c *cache) getifnewerthan(id types.uid, mintime time.time) *data { 獲取當前的狀態 d, ok :="range" 若是全局時間戳大於給定的時間,則會直接返回 globaltimestampisnewer !="nil" && c.timestamp.after(mintime)) if !ok 狀態沒有緩存,可是全局時間比最小時間新,就直接返回 return makedefaultdata(id) } 若是以前數據的時間在獲取時間以後,或者全局時間已經更新 (d.modified.after(mintime) || globaltimestampisnewer) d the pod status is not ready. nil ### 2.2.6 訂閱狀態管道構造 訂閱管道最終會返回一個狀態的管道,同時會進行檢查,若是發現當前有可用數據,則會直接丟進管道中,不然則建立一個subrecords訂閱記錄,並保存 subscribe(id timestamp chan ch *data, 1) c.lock.lock() defer c.lock.unlock() 獲取狀態數據 timestamp) 若是已經有狀態數據,則當即返回 <- 不然添加一個訂閱記錄到subscribers中對應的列表中 c.subscribers[id]="append(c.subscribers[id]," &subrecord{time: timestamp, ch: ch}) 2.2.7 通知清理過時管道 通知的時候回根據subrecord的訂閱時間進行檢測,若是訂閱時間已經超過當前的 timestamp則直接獲取數據進行返回,最後只會保留那些還未過時的訂閱記錄 notify(id 獲取事件的id列表 list, no one to notify. newlist 遍歷全部的訂閱記錄subrecords for i, r list 若是這些訂閱記錄的時間在timestamp以前,就不進行操做, 即當前管道時間>timestamp if timestamp.Before(r.time) { newList = append(newList, list[i]) continue } // 獲取一個數據返回, 同時關閉管道 r.ch <- c.get(id) close(r.ch) } if len(newList) == 0 { // 若是不存在訂閱記錄,則就刪除對應的key delete(c.subscribers, id) } else { // 剩餘的訂閱列表 c.subscribers[id] = newList } }學習

### 2.2.8 全局時間戳更新
全局時間戳更新,則會遍歷全部的訂閱,以最新的全局時間戳做爲時間,進行通知
```go
func (c *cache) UpdateTime(timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.timestamp = &amp;timestamp
	// Notify all the subscribers if the condition is met.
	for id := range c.subscribers {
		c.notify(id, *c.timestamp)
	}
}

2.2.9 Pod事件更新通知函數

更新的時候,則會調用notify來進行通知ui

func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
    // 進行事件的通知
	defer c.notify(id, timestamp)
    // 保存最新的狀態數據 
	c.pods[id] = &amp;data{status: status, err: err, modified: timestamp}
}

今天就到這裏,這些數據結構和設計有不少值得學習地方,但願你們能多多交流,一塊兒學習雲原生相關的設計與關鍵實現

> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 21天大棚 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈</globaltimestamp,>

相關文章
相關標籤/搜索