緩存和發佈訂閱都是後端開發中經常使用的手段,其中緩存主要是用於可丟失數據的暫存,發佈訂閱主要是用於消息傳遞,今天給你們介紹一個k8s中帶有發佈訂閱的緩存實現,其目標是給定一個時間,只關注該時間後續的事件,主要是用於近實時狀態數據的獲取後端
在k8s中的kubelet中支持不一樣的容器運行時,爲了緩存容器運行時當前全部可見的Pod/Container就構造了一個Cache結構,當一個事件發生後,kubelet接收到事件後,此時須要獲取當前Pod的狀態,此時要獲取的狀態,就必需要求是在事件產生後的最新的狀態,而不能是以前的狀態,緩存
狀態數據主要是存儲一個pod的狀態數據微信
type data struct {
// 存儲Pod的狀態
status *PodStatus
// 試圖檢測Pod狀態出錯信息
err error
// 上次數據的修改時間
modified time.Time
}
複製代碼
訂閱記錄其實指的是一個訂閱需求,其經過一個chan來進行數據通知,其中time字段是過濾條件,即只有時間大於time的記錄才容許被加入到chan中數據結構
type subRecord struct {
time time.Time
ch chan *data
}複製代碼
cache裏面的數據在kubelet每次進行PLEG更新的時候,都會更新timestamp,而且會從新獲取最新的Pod狀態進行填充cache,因此這裏會更新timestamp,寓意着讓以前舊的狀態都過時,而且會針對舊的訂閱的進行數據的返回app
// cache implements Cache.
type cache struct {
// 讀寫鎖
lock sync.RWMutex
// 存儲Pod的狀態數據,用於知足不帶時間戳的狀態獲取
pods map[types.UID]*data
// 全局時間戳,即當前緩存中的數據,至少都要比該時間戳新
timestamp *time.Time
//存儲對應Pod的定語記錄列表
subscribers map[types.UID][]*subRecord
}複製代碼
普通狀態獲取即直接經過Map來進行數據的返回ide
func (c *cache) Get(id types.UID) (*PodStatus, error) {
c.lock.RLock()
defer c.lock.RUnlock()
d := c.get(id)
return d.status, d.err
}
複製代碼
當發現當前的cahce中並不存在對應的數據,則是直接根據ID來生成一個默認的狀態數據函數
func (c *cache) get(id types.UID) *data {
d, ok := c.pods[id]
if !ok {
return makeDefaultData(id)
}
return d
}
// 默認狀態構造器
func makeDefaultData(id types.UID) *data {
return &data{status: &PodStatus{ID: id}, err: nil}
}複製代碼
會給定一個時間戳,只有噹噹前緩存的數據的時間在該時間戳以後,纔有效,不然返回nil,這裏有個關鍵點就是timestamp的相關設計,由於在每一個PLEG週期中,都會更新timestamp源碼分析
若是minTime
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
// 獲取當前的狀態
d, ok := c.pods[id]
// 若是全局時間戳大於給定的時間,則會直接返回
globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
if !ok && globalTimestampIsNewer {
// 狀態沒有緩存,可是全局時間比最小時間新,就直接返回
return makeDefaultData(id)
}
// 若是以前數據的時間在獲取時間以後,或者全局時間已經更新
if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
return d
}
// The pod status is not ready.
return nil
}複製代碼
訂閱管道最終會返回一個狀態的管道,同時會進行檢查,若是發現當前有可用數據,則會直接丟進管道中,不然則建立一個subRecords訂閱記錄,並保存ui
func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
ch := make(chan *data, 1)
c.lock.Lock()
defer c.lock.Unlock()
// 獲取狀態數據
d := c.getIfNewerThan(id, timestamp)
if d != nil {
// 若是已經有狀態數據,則當即返回
ch <- d
return ch
}
// 不然添加一個訂閱記錄到subscribers中對應的列表中
c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
return ch
}複製代碼
通知的時候回根據subRecord的訂閱時間進行檢測,若是訂閱時間已經超過當前的 timestamp則直接獲取數據進行返回,最後只會保留那些還未過時的訂閱記錄
func (c *cache) notify(id types.UID, timestamp time.Time) {
// 獲取事件的ID列表
list, ok := c.subscribers[id]
if !ok {
// No one to notify.
return
}
newList := []*subRecord{}
// 遍歷全部的訂閱記錄subRecords
for i, r := range list {
// 若是這些訂閱記錄的時間在timestamp以前,就不進行操做, 即當前管道時間>timestamp
if timestamp.Before(r.time) {
newList = append(newList, list[i])
continue
}
// 獲取一個數據返回, 同時關閉管道
r.ch <- c.get(id)
close(r.ch)
}
if len(newList) == 0 {
// 若是不存在訂閱記錄,則就刪除對應的key
delete(c.subscribers, id)
} else {
// 剩餘的訂閱列表
c.subscribers[id] = newList
}
}複製代碼
全局時間戳更新,則會遍歷全部的訂閱,以最新的全局時間戳做爲時間,進行通知
func (c *cache) UpdateTime(timestamp time.Time) {
c.lock.Lock()
defer c.lock.Unlock()
c.timestamp = ×tamp
// Notify all the subscribers if the condition is met.
for id := range c.subscribers {
c.notify(id, *c.timestamp)
}
}
複製代碼
更新的時候,則會調用notify來進行通知
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
c.lock.Lock()
defer c.lock.Unlock()
// 進行事件的通知
defer c.notify(id, timestamp)
// 保存最新的狀態數據
c.pods[id] = &data{status: status, err: err, modified: timestamp}
}複製代碼
今天就到這裏,這些數據結構和設計有不少值得學習地方,但願你們能多多交流,一塊兒學習雲原生相關的設計與關鍵實現
微信號:baxiaoshi2020
關注公告號閱讀更多源碼分析文章
更多文章關注 www.sreguide.com
本文由博客一文多發平臺 OpenWrite 發佈