Kubernetes DeltaFIFO數據結構及Reflector同步機制源碼深刻剖析-Kubernetes商業環境實戰

專一於大數據及容器雲核心技術解密,可提供全棧的大數據+雲原平生臺諮詢方案,請持續關注本套博客。若有任何學術交流,可隨時聯繫。更多內容請關注《數據雲技術社區》公衆號。 api

1 DeltaFIFO數據結構(僅追加變化數據)

  • Delta其實就是kubernetes系統中對象的變化(增、刪、改、同步),FIFO比較好理解,是一個先入先出的隊列,那麼DeltaFIFO就是一個按序的(先入先出)kubernetes對象變化的隊列。
  • DeltaFIFO內部一直追加變化數據,例如:(DeltaFIFO.Replace)同步一次,就向DeltaFIFO 是全量插入sync變量Delta。
  • 刪除合併操做發生在最近兩次更新的操做中,僅取其中之一。dedupDeltas()就是這種貨色。
  • queueActionLocked就是DeltaFIFO的靈魂,任何的風吹草動(如:增、刪、改、同步),都會放進items map[string]Deltas中。
items解釋:
- items map[string] Deltas
- type Deltas []Delta  // Delta數組
- f.items表示Map集合
- f.items[id] 表示某一個key(資源對象)對應的數組集合,即:資源操做變化數組

// 代碼源自client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
    lock sync.RWMutex             // 讀寫鎖,由於涉及到同時讀寫,讀寫鎖性能要高
    cond sync.Cond                // 給Pop()接口使用,在沒有對象的時候能夠阻塞,內部鎖複用讀寫鎖
    items map[string]Deltas       // 這個應該是Store的本質了,按照kv的方式存儲對象,可是存儲的是對象的Deltas數組
    queue []string                // 這個是爲先入先出實現的,存儲的就是對象的鍵
    populated bool                // 經過Replace()接口將第一批對象放入隊列,或者第一次調用增、刪、改接口時標記爲true
    initialPopulationCount int    // 經過Replace()接口將第一批對象放入隊列的對象數量
    keyFunc KeyFunc               // 對象鍵計算函數,在Indexer那篇文章介紹過
    knownObjects KeyListerGetter  // 前面介紹就是爲了這是用,該對象指向的就是Indexer,
    closed     bool               // 是否已經關閉的標記
    closedLock sync.Mutex         // 專爲關閉設計的所,爲何不復用讀寫鎖?
}
複製代碼
  • 通常是先追加,生成新變化數組,而後更新DeltaFIFO.items集合,相似:id:[add:obj1,update:obj2,delete:obj3]
id, err := f.KeyOf(obj)  //獲得obj對應的Map的key
    newDeltas := append(f.items[id], Delta{actionType, obj})  //追加,生成新數組
    f.items[id] = newDeltas   //更新DeltaFIFO.items集合
複製代碼
  • queueActionLocked最終會解決資源變化追加的問題,代碼以下:
// 代碼源自client-go/tools/cache/delta_fifo.go
// 從函數名稱來看把「動做」放入隊列中,這個動做就是DeltaType,並且已經加鎖了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // 前面提到的計算對象鍵的函數
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    // 若是是同步,而且對象將來會被刪除,那麼就直接返回,不必記錄這個動做了
    // 確定有人會問爲何Add/Delete/Update這些動做能夠,由於同步對於已經刪除的對象是沒有意義的
    // 已經刪除的對象後續跟添加、更新有可能,由於同名的對象又被添加了,刪除也是有可能
    // 刪除有些複雜,後面會有說明
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }
    // 同一個對象的屢次操做,因此要追加到Deltas數組中
    newDeltas := append(f.items[id], Delta{actionType, obj})
    // 合併操做,去掉冗餘的delta
    newDeltas = dedupDeltas(newDeltas)
    // 判斷對象是否已經存在
    _, exists := f.items[id]
    // 合併後操做有可能變成沒有Delta麼?後面的代碼分析來看應該不會,因此暫時不知道這個判斷目的
    if len(newDeltas) > 0 {
        // 若是對象沒有存在過,那就放入隊列中,若是存在說明已經在queue中了,也就不必再添加了
        if !exists {
            f.queue = append(f.queue, id)
        }
        // 更新Deltas數組,通知全部調用Pop()的人
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else if exists {
        // 直接把對象刪除,這段代碼我不知道什麼條件會進來,由於dedupDeltas()確定有返回結果的
        // 後面會有dedupDeltas()詳細說明
        delete(f.items, id)
    }
    return nil
}
複製代碼
  • dedupDeltas 主要解決連續兩個刪除操做的變化的合併,id:[add:obj1,update:obj2,delete:obj3,delete:obj3],該函數會去掉最後一個,返回一個完整的f.items[id]: id:[add:obj1,update:obj2,delete:obj3]。最終對應的DeltaFIFO.items就是:Map( DeltaFIFO。key->id:[add:obj1,update:obj2,delete:obj3])
// 代碼源自client-go/tools/cache/delta_fifo.go
func dedupDeltas(deltas Deltas) Deltas {
    n := len(deltas)
    if n < 2 {
        return deltas
    }
    // 取出最後兩個
    a := &deltas[n-1]
    b := &deltas[n-2]
    // 判斷若是是重複的,那就刪除這兩個delta把合併後的追加到Deltas數組尾部
    if out := isDup(a, b); out != nil {
        d := append(Deltas{}, deltas[:n-2]...)
        return append(d, *out)
    }
    return deltas
}
// 判斷兩個Delta是不是重複的
func isDup(a, b *Delta) *Delta {
    // 只有一個判斷,只能判斷是否爲刪除類操做,和咱們上面的判斷相同
    // 這個函數的本意應該還能夠判斷多種類型的重複,當前來看只能有刪除這一種可以合併
    if out := isDeletionDup(a, b); out != nil {
        return out
    }
	
    return nil
}
// 判斷是否爲刪除類的重複
func isDeletionDup(a, b *Delta) *Delta {
    // 兩者都是刪除那確定有一個是重複的
    if b.Type != Deleted || a.Type != Deleted {
        return nil
    }
    // 理論上返回最後一個比較好,可是對象已經再也不繫統監控範圍,前一個刪除狀態是好的
    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
        return a
    }
    return b
}
複製代碼

2 DeltaFIFO.replace()(全量插入sync類型變量,並作Delete檢測)

  • Reflector經過List(),拿到全量變化,而後調用Replace, 而後每個item執行:if err := f.queueActionLocked(Sync, item)
  • replace操做仍是追加變化量,相似於kafka的Append日誌。
  • replace向DeltaFIFO 是全量插入sync變量,相似:append(f.items[id], Delta{Sync, obj}),並作Delete檢測,以追加if err := f.queueActionLocked(Deleted, item)
// 代碼源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))
    // 遍歷全部的輸入目標
    for _, item := range list {
        // 計算目標鍵
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        // 記錄處理過的目標鍵,採用set存儲,是爲了後續快速查找
        keys.Insert(key)
        // 由於輸入是目標全量,因此每一個目標至關於從新同步了一次
        if err := f.queueActionLocked(Sync, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }
    // 若是沒有存儲的話,本身存儲的就是全部的老對象,目的要看看那些老對象不在全量集合中,那麼就是刪除的對象了
    if f.knownObjects == nil {
        // 遍歷全部的元素
        for k, oldItem := range f.items {
            // 這個目標在輸入的對象中存在就能夠忽略
            if keys.Has(k) {
                continue
            }
            // 輸入對象中沒有,說明對象已經被刪除了。
            var deletedObj interface{}
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            // 終於看到哪裏用到DeletedFinalStateUnknown了,隊列中存儲對象的Deltas數組中
            // 可能已經存在Delete了,避免重複,採用DeletedFinalStateUnknown這種類型
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }
        
        // 若是populated尚未設置,說明是第一次而且尚未任何修改操做執行過
        if !f.populated {
            f.populated = true
            f.initialPopulationCount = len(list)  // 記錄第一次經過來的對象數量
        }
 
        return nil
    }
    // 下面處理的就是檢測某些目標刪除可是Delta沒有在隊列中
    // 從存儲中獲取全部對象鍵
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        // 對象還存在那就忽略
        if keys.Has(k) {
            continue
        }
        // 獲取對象
        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        // 累積刪除的對象數量
        queuedDeletions++
        // 把對象刪除的Delta放入隊列
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }    
    }
    // 和上面的代碼差很少,只是計算initialPopulationCount值的時候增長了刪除對象的數量
    if !f.populated {
        f.populated = true
        f.initialPopulationCount = len(list) + queuedDeletions
    }
 
    return nil
}
複製代碼

3 何爲同步?

period       time.Duration     // 反射器在List和Watch的時候理論上是死循環,只有出現錯誤纔會退出
                                  這個變量用在出錯後多長時間再執行List和Watch,默認值是1秒鐘
resyncPeriod time.Duration     // 從新同步的週期,不少人確定認爲這個同步週期指的是從apiserver的同步週期
                                  其實這裏面同步指的是shared_informer使用者須要按期同步全量對象
複製代碼
  • 週期同步,採用period,從apiserver的進行同步
  • 使用者須要按期同步全量對象
  • 這裏發現是從indexr中,來進行數據同步的,knownObjects

4 knownObjects是誰(index)

  • 發現Reflector就是一個經過NEW封裝的Controller,完成資源變化監聽,並放進DeltaFIFO和Index隊列中。
  • knownObjects是誰也一目瞭然,就是最終完成檢索

5 HandleDeltas打通任督二脈

  • sharedIndexInformer.run -> controler.run-> wait.Until(c.processLoop, time.Second, stopCh)->obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))->Process: s.HandleDeltas ->更新Index,併發送通知Controller 來回調處理
  • sharedIndexInformer是什麼?就是包含了兩個重要步驟:1:Reflector來實現變化更新到DeltaFIFO,注意Reflector自己初始化是在Controller內部,2:Controller來實現更新Index,併發送通知Controller來處理,這個Controller也是(s.controller = New(cfg))。
  • sharedIndexInformer是什麼?其實就是包含上述兩個步驟,而最終呈現的就是s.controller.Run(stopCh)。
  • sharedIndexInformer是什麼?其實就是對controller的配置,初始化了Config。該Config不只用來初始化Reflector,也用來初始化controller。
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners }() //初始化Reflector,更新Index, 併發送通知給Controller來回調處理 s.controller.Run(stopCh) } 複製代碼
  • 完成index的增刪改操做,也即最終的變量同步。

6 sharedIndexInformer是什麼?

  • sharedIndexInformer就是一個包裝,初始化Reflector,也用來初始化controller。最終呈現給你們的就是controller.run。
  • sharedIndexInformer和controller是一一對應的。

7 總結

本文綜合分析了Kubernetes 大量源碼,試圖從較高的視野來看問題,猛看錶,一天時間就過去了。辛苦成文,各自珍惜,謝謝!數組

專一於大數據及容器雲核心技術解密,可提供全棧的大數據+雲原平生臺諮詢方案,請持續關注本套博客。若有任何學術交流,可隨時聯繫。更多內容請關注《數據雲技術社區》公衆號。 bash

相關文章
相關標籤/搜索