專一於大數據及容器雲核心技術解密,可提供全棧的大數據+雲原平生臺諮詢方案,請持續關注本套博客。若有任何學術交流,可隨時聯繫。更多內容請關注《數據雲技術社區》公衆號。 api
items解釋:
- items map[string] Deltas
- type Deltas []Delta // Delta數組
- f.items表示Map集合
- f.items[id] 表示某一個key(資源對象)對應的數組集合,即:資源操做變化數組
// 代碼源自client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
lock sync.RWMutex // 讀寫鎖,由於涉及到同時讀寫,讀寫鎖性能要高
cond sync.Cond // 給Pop()接口使用,在沒有對象的時候能夠阻塞,內部鎖複用讀寫鎖
items map[string]Deltas // 這個應該是Store的本質了,按照kv的方式存儲對象,可是存儲的是對象的Deltas數組
queue []string // 這個是爲先入先出實現的,存儲的就是對象的鍵
populated bool // 經過Replace()接口將第一批對象放入隊列,或者第一次調用增、刪、改接口時標記爲true
initialPopulationCount int // 經過Replace()接口將第一批對象放入隊列的對象數量
keyFunc KeyFunc // 對象鍵計算函數,在Indexer那篇文章介紹過
knownObjects KeyListerGetter // 前面介紹就是爲了這是用,該對象指向的就是Indexer,
closed bool // 是否已經關閉的標記
closedLock sync.Mutex // 專爲關閉設計的所,爲何不復用讀寫鎖?
}
複製代碼
id, err := f.KeyOf(obj) //獲得obj對應的Map的key
newDeltas := append(f.items[id], Delta{actionType, obj}) //追加,生成新數組
f.items[id] = newDeltas //更新DeltaFIFO.items集合
複製代碼
// 代碼源自client-go/tools/cache/delta_fifo.go
// 從函數名稱來看把「動做」放入隊列中,這個動做就是DeltaType,並且已經加鎖了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 前面提到的計算對象鍵的函數
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 若是是同步,而且對象將來會被刪除,那麼就直接返回,不必記錄這個動做了
// 確定有人會問爲何Add/Delete/Update這些動做能夠,由於同步對於已經刪除的對象是沒有意義的
// 已經刪除的對象後續跟添加、更新有可能,由於同名的對象又被添加了,刪除也是有可能
// 刪除有些複雜,後面會有說明
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
// 同一個對象的屢次操做,因此要追加到Deltas數組中
newDeltas := append(f.items[id], Delta{actionType, obj})
// 合併操做,去掉冗餘的delta
newDeltas = dedupDeltas(newDeltas)
// 判斷對象是否已經存在
_, exists := f.items[id]
// 合併後操做有可能變成沒有Delta麼?後面的代碼分析來看應該不會,因此暫時不知道這個判斷目的
if len(newDeltas) > 0 {
// 若是對象沒有存在過,那就放入隊列中,若是存在說明已經在queue中了,也就不必再添加了
if !exists {
f.queue = append(f.queue, id)
}
// 更新Deltas數組,通知全部調用Pop()的人
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
// 直接把對象刪除,這段代碼我不知道什麼條件會進來,由於dedupDeltas()確定有返回結果的
// 後面會有dedupDeltas()詳細說明
delete(f.items, id)
}
return nil
}
複製代碼
// 代碼源自client-go/tools/cache/delta_fifo.go
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
// 取出最後兩個
a := &deltas[n-1]
b := &deltas[n-2]
// 判斷若是是重複的,那就刪除這兩個delta把合併後的追加到Deltas數組尾部
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
// 判斷兩個Delta是不是重複的
func isDup(a, b *Delta) *Delta {
// 只有一個判斷,只能判斷是否爲刪除類操做,和咱們上面的判斷相同
// 這個函數的本意應該還能夠判斷多種類型的重複,當前來看只能有刪除這一種可以合併
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
// 判斷是否爲刪除類的重複
func isDeletionDup(a, b *Delta) *Delta {
// 兩者都是刪除那確定有一個是重複的
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// 理論上返回最後一個比較好,可是對象已經再也不繫統監控範圍,前一個刪除狀態是好的
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
複製代碼
// 代碼源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 遍歷全部的輸入目標
for _, item := range list {
// 計算目標鍵
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
// 記錄處理過的目標鍵,採用set存儲,是爲了後續快速查找
keys.Insert(key)
// 由於輸入是目標全量,因此每一個目標至關於從新同步了一次
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 若是沒有存儲的話,本身存儲的就是全部的老對象,目的要看看那些老對象不在全量集合中,那麼就是刪除的對象了
if f.knownObjects == nil {
// 遍歷全部的元素
for k, oldItem := range f.items {
// 這個目標在輸入的對象中存在就能夠忽略
if keys.Has(k) {
continue
}
// 輸入對象中沒有,說明對象已經被刪除了。
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
// 終於看到哪裏用到DeletedFinalStateUnknown了,隊列中存儲對象的Deltas數組中
// 可能已經存在Delete了,避免重複,採用DeletedFinalStateUnknown這種類型
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 若是populated尚未設置,說明是第一次而且尚未任何修改操做執行過
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) // 記錄第一次經過來的對象數量
}
return nil
}
// 下面處理的就是檢測某些目標刪除可是Delta沒有在隊列中
// 從存儲中獲取全部對象鍵
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
// 對象還存在那就忽略
if keys.Has(k) {
continue
}
// 獲取對象
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
// 累積刪除的對象數量
queuedDeletions++
// 把對象刪除的Delta放入隊列
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 和上面的代碼差很少,只是計算initialPopulationCount值的時候增長了刪除對象的數量
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
複製代碼
period time.Duration // 反射器在List和Watch的時候理論上是死循環,只有出現錯誤纔會退出
這個變量用在出錯後多長時間再執行List和Watch,默認值是1秒鐘
resyncPeriod time.Duration // 從新同步的週期,不少人確定認爲這個同步週期指的是從apiserver的同步週期
其實這裏面同步指的是shared_informer使用者須要按期同步全量對象
複製代碼
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners }() //初始化Reflector,更新Index, 併發送通知給Controller來回調處理 s.controller.Run(stopCh) } 複製代碼
本文綜合分析了Kubernetes 大量源碼,試圖從較高的視野來看問題,猛看錶,一天時間就過去了。辛苦成文,各自珍惜,謝謝!數組
專一於大數據及容器雲核心技術解密,可提供全棧的大數據+雲原平生臺諮詢方案,請持續關注本套博客。若有任何學術交流,可隨時聯繫。更多內容請關注《數據雲技術社區》公衆號。 bash