圖解kubernetes容器運行時狀態緩存數據結構

緩存和發佈訂閱都是後端開發中經常使用的手段,其中緩存主要是用於可丟失數據的暫存,發佈訂閱主要是用於消息傳遞,今天給你們介紹一個k8s中帶有發佈訂閱的緩存實現,其目標是給定一個時間,只關注該時間後續的事件,主要是用於近實時狀態數據的獲取後端

1. 業務背景

image.png在k8s中的kubelet中支持不一樣的容器運行時,爲了緩存容器運行時當前全部可見的Pod/Container就構造了一個Cache結構,當一個事件發生後,kubelet接收到事件後,此時須要獲取當前Pod的狀態,此時要獲取的狀態,就必需要求是在事件產生後的最新的狀態,而不能是以前的狀態,緩存

2. 核心實現

image.png

2.1 數據與訂閱記錄

2.1.1 狀態數據

狀態數據主要是存儲一個pod的狀態數據微信

type data struct {
    // 存儲Pod的狀態
    status *PodStatus
    // 試圖檢測Pod狀態出錯信息
    err error
    // 上次數據的修改時間
    modified time.Time
}
複製代碼

2.1.2 訂閱記錄

訂閱記錄其實指的是一個訂閱需求,其經過一個chan來進行數據通知,其中time字段是過濾條件,即只有時間大於time的記錄才容許被加入到chan中數據結構

type subRecord struct {
    time time.Time
    ch   chan *data
}複製代碼

2.2 Cache實現

2.2.1 核心成員結構

cache裏面的數據在kubelet每次進行PLEG更新的時候,都會更新timestamp,而且會從新獲取最新的Pod狀態進行填充cache,因此這裏會更新timestamp,寓意着讓以前舊的狀態都過時,而且會針對舊的訂閱的進行數據的返回app

// cache implements Cache.
type cache struct {
    // 讀寫鎖
    lock sync.RWMutex
    // 存儲Pod的狀態數據,用於知足不帶時間戳的狀態獲取
    pods map[types.UID]*data
    // 全局時間戳,即當前緩存中的數據,至少都要比該時間戳新
    timestamp *time.Time
    //存儲對應Pod的定語記錄列表
    subscribers map[types.UID][]*subRecord
}複製代碼

2.2.3 普通狀態數據獲取

普通狀態獲取即直接經過Map來進行數據的返回ide

func (c *cache) Get(id types.UID) (*PodStatus, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    d := c.get(id)
    return d.status, d.err
}

複製代碼

2.2.4 默認狀態構造器

當發現當前的cahce中並不存在對應的數據,則是直接根據ID來生成一個默認的狀態數據函數

func (c *cache) get(id types.UID) *data {
    d, ok := c.pods[id]
    if !ok {
        return makeDefaultData(id)
    }
    return d
}
// 默認狀態構造器
func makeDefaultData(id types.UID) *data {
    return &data{status: &PodStatus{ID: id}, err: nil}
}複製代碼

2.2.5 最新狀態數據獲取

會給定一個時間戳,只有噹噹前緩存的數據的時間在該時間戳以後,纔有效,不然返回nil,這裏有個關鍵點就是timestamp的相關設計,由於在每一個PLEG週期中,都會更新timestamp源碼分析

若是minTime 學習

func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
    // 獲取當前的狀態
    d, ok := c.pods[id]

    // 若是全局時間戳大於給定的時間,則會直接返回
    globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
    if !ok && globalTimestampIsNewer {
        // 狀態沒有緩存,可是全局時間比最小時間新,就直接返回
        return makeDefaultData(id)
    }
    // 若是以前數據的時間在獲取時間以後,或者全局時間已經更新
    if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
        return d
    }
    // The pod status is not ready.
    return nil
}複製代碼

2.2.6 訂閱狀態管道構造

訂閱管道最終會返回一個狀態的管道,同時會進行檢查,若是發現當前有可用數據,則會直接丟進管道中,不然則建立一個subRecords訂閱記錄,並保存ui

func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
    ch := make(chan *data, 1)
    c.lock.Lock()
    defer c.lock.Unlock()
    // 獲取狀態數據
    d := c.getIfNewerThan(id, timestamp)
    if d != nil {
        // 若是已經有狀態數據,則當即返回
        ch <- d
        return ch
    }
    // 不然添加一個訂閱記錄到subscribers中對應的列表中
    c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
    return ch
}複製代碼

2.2.7 通知清理過時管道

通知的時候回根據subRecord的訂閱時間進行檢測,若是訂閱時間已經超過當前的 timestamp則直接獲取數據進行返回,最後只會保留那些還未過時的訂閱記錄

func (c *cache) notify(id types.UID, timestamp time.Time) {
    // 獲取事件的ID列表
    list, ok := c.subscribers[id]
    if !ok {
        // No one to notify.
        return
    }
    newList := []*subRecord{}
    // 遍歷全部的訂閱記錄subRecords
    for i, r := range list {
        // 若是這些訂閱記錄的時間在timestamp以前,就不進行操做, 即當前管道時間>timestamp
        if timestamp.Before(r.time) {
            newList = append(newList, list[i])
            continue
        }
        // 獲取一個數據返回, 同時關閉管道
        r.ch <- c.get(id)
        close(r.ch)
    }
    if len(newList) == 0 {
        // 若是不存在訂閱記錄,則就刪除對應的key
        delete(c.subscribers, id)
    } else {
        // 剩餘的訂閱列表
        c.subscribers[id] = newList
    }
}複製代碼

2.2.8 全局時間戳更新

全局時間戳更新,則會遍歷全部的訂閱,以最新的全局時間戳做爲時間,進行通知

func (c *cache) UpdateTime(timestamp time.Time) {
    c.lock.Lock()
    defer c.lock.Unlock()
    c.timestamp = &timestamp
    // Notify all the subscribers if the condition is met.
    for id := range c.subscribers {
        c.notify(id, *c.timestamp)
    }
}
複製代碼

2.2.9 Pod事件更新通知函數

更新的時候,則會調用notify來進行通知

func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
    c.lock.Lock()
    defer c.lock.Unlock()
    // 進行事件的通知
    defer c.notify(id, timestamp)
    // 保存最新的狀態數據 
    c.pods[id] = &data{status: status, err: err, modified: timestamp}
}複製代碼

今天就到這裏,這些數據結構和設計有不少值得學習地方,但願你們能多多交流,一塊兒學習雲原生相關的設計與關鍵實現

微信號:baxiaoshi2020

關注公告號閱讀更多源碼分析文章 21天大棚

更多文章關注 www.sreguide.com

本文由博客一文多發平臺 OpenWrite 發佈

相關文章
相關標籤/搜索