緩存和發佈訂閱都是後端開發中經常使用的手段,其中緩存主要是用於可丟失數據的暫存,發佈訂閱主要是用於消息傳遞,今天給你們介紹一個k8s中帶有發佈訂閱的緩存實現,其目標是給定一個時間,只關注該時間後續的事件,主要是用於近實時狀態數據的獲取後端
在k8s中的kubelet中支持不一樣的容器運行時,爲了緩存容器運行時當前全部可見的Pod/Container就構造了一個Cache結構,當一個事件發生後,kubelet接收到事件後,此時須要獲取當前Pod的狀態,此時要獲取的狀態,就必需要求是在事件產生後的最新的狀態,而不能是以前的狀態,緩存
狀態數據主要是存儲一個pod的狀態數據微信
type data struct { // 存儲Pod的狀態 status *PodStatus // 試圖檢測Pod狀態出錯信息 err error // 上次數據的修改時間 modified time.Time }
訂閱記錄其實指的是一個訂閱需求,其經過一個chan來進行數據通知,其中time字段是過濾條件,即只有時間大於time的記錄才容許被加入到chan中數據結構
type subRecord struct { time time.Time ch chan *data }
cache裏面的數據在kubelet每次進行PLEG更新的時候,都會更新timestamp,而且會從新獲取最新的Pod狀態進行填充cache,因此這裏會更新timestamp,寓意着讓以前舊的狀態都過時,而且會針對舊的訂閱的進行數據的返回app
// cache implements Cache. type cache struct { // 讀寫鎖 lock sync.RWMutex // 存儲Pod的狀態數據,用於知足不帶時間戳的狀態獲取 pods map[types.UID]*data // 全局時間戳,即當前緩存中的數據,至少都要比該時間戳新 timestamp *time.Time //存儲對應Pod的定語記錄列表 subscribers map[types.UID][]*subRecord }
普通狀態獲取即直接經過Map來進行數據的返回ide
func (c *cache) Get(id types.UID) (*PodStatus, error) { c.lock.RLock() defer c.lock.RUnlock() d := c.get(id) return d.status, d.err }
當發現當前的cahce中並不存在對應的數據,則是直接根據ID來生成一個默認的狀態數據函數
func (c *cache) get(id types.UID) *data { d, ok := c.pods[id] if !ok { return makeDefaultData(id) } return d } // 默認狀態構造器 func makeDefaultData(id types.UID) *data { return &data{status: &PodStatus{ID: id}, err: nil} }
會給定一個時間戳,只有噹噹前緩存的數據的時間在該時間戳以後,纔有效,不然返回nil,這裏有個關鍵點就是timestamp的相關設計,由於在每一個PLEG週期中,都會更新timestamp源碼分析
若是minTime<globaltimestamp, 則意味着在已經有新一輪的更新,而你這個事件仍是上一輪的事件,則可能就是事件的處理太慢,此時就會將以前緩存的狀態,直接返回,由於下一輪頗有可能會有新的事件到來 go func (c *cache) getifnewerthan(id types.uid, mintime time.time) *data { 獲取當前的狀態 d, ok :="range" 若是全局時間戳大於給定的時間,則會直接返回 globaltimestampisnewer !="nil" && c.timestamp.after(mintime)) if !ok 狀態沒有緩存,可是全局時間比最小時間新,就直接返回 return makedefaultdata(id) } 若是以前數據的時間在獲取時間以後,或者全局時間已經更新 (d.modified.after(mintime) || globaltimestampisnewer) d the pod status is not ready. nil
### 2.2.6 訂閱狀態管道構造 訂閱管道最終會返回一個狀態的管道,同時會進行檢查,若是發現當前有可用數據,則會直接丟進管道中,不然則建立一個subrecords訂閱記錄,並保存 subscribe(id timestamp chan ch *data, 1) c.lock.lock() defer c.lock.unlock() 獲取狀態數據 timestamp) 若是已經有狀態數據,則當即返回 <- 不然添加一個訂閱記錄到subscribers中對應的列表中 c.subscribers[id]="append(c.subscribers[id]," &subrecord{time: timestamp, ch: ch}) 2.2.7 通知清理過時管道 通知的時候回根據subrecord的訂閱時間進行檢測,若是訂閱時間已經超過當前的 timestamp則直接獲取數據進行返回,最後只會保留那些還未過時的訂閱記錄 notify(id 獲取事件的id列表 list, no one to notify. newlist 遍歷全部的訂閱記錄subrecords for i, r list 若是這些訂閱記錄的時間在timestamp以前,就不進行操做, 即當前管道時間>timestamp if timestamp.Before(r.time) { newList = append(newList, list[i]) continue } // 獲取一個數據返回, 同時關閉管道 r.ch <- c.get(id) close(r.ch) } if len(newList) == 0 { // 若是不存在訂閱記錄,則就刪除對應的key delete(c.subscribers, id) } else { // 剩餘的訂閱列表 c.subscribers[id] = newList } }學習
### 2.2.8 全局時間戳更新 全局時間戳更新,則會遍歷全部的訂閱,以最新的全局時間戳做爲時間,進行通知 ```go func (c *cache) UpdateTime(timestamp time.Time) { c.lock.Lock() defer c.lock.Unlock() c.timestamp = &timestamp // Notify all the subscribers if the condition is met. for id := range c.subscribers { c.notify(id, *c.timestamp) } }
更新的時候,則會調用notify來進行通知ui
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) { c.lock.Lock() defer c.lock.Unlock() // 進行事件的通知 defer c.notify(id, timestamp) // 保存最新的狀態數據 c.pods[id] = &data{status: status, err: err, modified: timestamp} }
今天就到這裏,這些數據結構和設計有不少值得學習地方,但願你們能多多交流,一塊兒學習雲原生相關的設計與關鍵實現
> 微信號:baxiaoshi2020 > 關注公告號閱讀更多源碼分析文章 > 更多文章關注 www.sreguide.com > 本文由博客一文多發平臺 OpenWrite 發佈</globaltimestamp,>