kubernetes源碼剖析之client-go(二) Informer機制

kubernetes源碼剖析之client-go(一) Informer機制

  Kubernetes通過informer機制,實現在不依賴任何中間件的情況下保證消息的實時性、可靠性、順序性。其他Kubernetes組件通過client-go的informer機制與Api Server進行通信。Informer的核心組件包括:

  • Reflector
     用於監控(Watch)指定Kubernetes資源
  • DeltaFIFO
      Delta的先進先出隊列,Reflector爲生產者,Controller爲消費者
  • Indexer
     自帶索引功能的本地存儲,用於存儲資源對象

Informers

運行原理

kubernetes源碼剖析之client-go(二) Informer機制

代碼示例

package main

import (
    "log"
    "time"

    v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

// main is a minimal Informer example: it builds a clientset from a kubeconfig
// file, obtains the shared Pod informer, registers add/update/delete
// callbacks that log the Pod name, and then runs the informer until stopCh is
// closed (which, in this example, only happens when main returns).
func main() {

    // Load the client configuration from the kubeconfig file.
    config, err := clientcmd.BuildConfigFromFlags("", "D:\\coding\\config")
    if err != nil {
        panic(err)
    }

    // The informer talks to the API server through this clientset.
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // stopCh tells the informer to shut down before the process exits.
    stopCh := make(chan struct{})
    defer close(stopCh)

    // Create the shared informer factory. clientset is used to talk to the
    // API server; time.Minute is the resync period (0 disables resync).
    // The factory keeps its informers in a map
    // (informers map[reflect.Type]cache.SharedIndexInformer) so the informer
    // for a given resource is instantiated only once and then shared.
    sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute)
    // Get the informer for Pods. Every built-in resource exposes both an
    // Informer() and a Lister() accessor.
    informer := sharedInformers.Core().V1().Pods().Informer()

    // Register the resource event callbacks.
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        // Called when a resource is created.
        AddFunc: func(obj interface{}) {
            mObj, ok := obj.(v1.Object)
            if !ok {
                log.Printf("add event: unexpected object type %T", obj)
                return
            }
            log.Printf("New Pod Added to Store: %s", mObj.GetName())
        },
        // Called when a resource is updated (also fired on every resync).
        UpdateFunc: func(oldObj, newObj interface{}) {
            oObj, ok := oldObj.(v1.Object)
            if !ok {
                log.Printf("update event: unexpected old object type %T", oldObj)
                return
            }
            nObj, ok := newObj.(v1.Object)
            if !ok {
                log.Printf("update event: unexpected new object type %T", newObj)
                return
            }
            log.Printf("%s Pod Updated to %s", oObj.GetName(), nObj.GetName())
        },
        // Called when a resource is deleted.
        DeleteFunc: func(obj interface{}) {
            // If the informer missed the actual delete event it hands us a
            // cache.DeletedFinalStateUnknown tombstone instead of the object
            // itself; unwrap it so the assertion below does not panic.
            if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                obj = tombstone.Obj
            }
            mObj, ok := obj.(v1.Object)
            if !ok {
                log.Printf("delete event: unexpected object type %T", obj)
                return
            }
            log.Printf("Pod Deleted from Store : %s", mObj.GetName())
        },
    })

    // Run blocks until stopCh is closed.
    informer.Run(stopCh)
}

Reflector

 Informer對Kubernetes的Api Server資源進行監控(Watch)操作。其中最核心的功能是Reflector,Reflector用於監控指定的Kubernetes資源,當監控的資源發生變化時,觸發相應的變更事件,並將其資源對象存放到本地緩存DeltaFIFO中。

 通過NewReflector方法實例化Reflector對象,方法必須傳入ListerWatcher數據接口對象。ListerWatcher擁有List和Watch方法,用於獲取和監控資源列表,只要實現了List和Watch方法的對象都可以成爲ListerWatcher。

#源碼路徑 vendor\k8s.io\client-go\tools\cache\reflector.go

// NewReflector builds a Reflector that keeps store in sync with the server's
// view of the resource served by lw. Only objects of expectedType are placed
// in the store, unless expectedType is nil. A non-zero resyncPeriod makes the
// reflector re-list on that interval, so consumers can periodically reprocess
// everything in addition to handling incremental changes.
//
// The reflector's name is derived from the call site (skipping
// internalPackages); construction itself is delegated to NewNamedReflector.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    callerName := naming.GetNameFromCallsite(internalPackages...)
    return NewNamedReflector(callerName, lw, expectedType, store, resyncPeriod)
}

  Reflector通過Run函數啓動監控進程,並處理監控的事件。其中最主要的是ListAndWatch函數,它負責List和Watch指定的Kubernetes Api Server資源。

#源碼路徑 vendor\k8s.io\client-go\tools\cache\reflector.go
// Run keeps the reflector listing and watching until stopCh is closed.
// Each pass is one ListAndWatch call; when a pass ends, wait.Until starts the
// next one after r.period. Errors returned by a pass are reported through
// utilruntime.HandleError rather than terminating the loop.
func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)

    // listAndWatchOnce performs a single ListAndWatch pass and reports its error, if any.
    listAndWatchOnce := func() {
        err := r.ListAndWatch(stopCh)
        if err == nil {
            return
        }
        utilruntime.HandleError(err)
    }
    wait.Until(listAndWatchOnce, r.period, stopCh)
}

ListAndWatch函數

  ListAndWatch第一次運行時,通過List獲取資源下的全部對象和版本信息,後續通過版本號進行Watch。

#源碼路徑 vendor\k8s.io\client-go\tools\cache\reflector.go
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
//
// The function runs in three stages:
//  1. List the resource (starting from resource version "0"), extract the
//     items and the list's resource version, and pass them to syncWith.
//  2. Start a background goroutine that calls r.store.Resync() whenever the
//     resync channel fires and r.ShouldResync allows it.
//  3. Loop issuing Watch requests from the last known resource version and
//     hand each watch to watchHandler.
//
// stopCh aborts both the resync goroutine and the watch loop. Note that parts
// of the listing closure are elided ("...") in this excerpt.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    // Explicitly set "0" as resource version - it's fine for the List()
    // to be served from cache and potentially be delayed relative to
    // etcd contents. Reflector framework will catch up via Watch() eventually.
    // The initial list therefore always starts from resource version "0".
    options := metav1.ListOptions{ResourceVersion: "0"}

    if err := func() error {
        // (code elided in this excerpt)
        ... 
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                // Fetch the objects of the resource through the ListerWatcher. The
                // options built above use ResourceVersion "0", which (per the comment
                // there) lets the list be served from cache.
                return r.listerWatcher.List(opts)
            }))
            if r.WatchListPageSize != 0 {
                pager.PageSize = r.WatchListPageSize
            }
            // Pager falls back to full list if paginated list calls fail due to an "Expired" error.
            list, err = pager.List(context.Background(), options)
            close(listCh)
        }()
    // (code elided in this excerpt)
    ......

        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        // Read the resource version of the list result; the watch below starts from it.
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        // Convert the list result into a slice of individual objects.
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        initTrace.Step("Objects extracted")
        // Hand the extracted objects and the resource version to syncWith
        // (per this article, this is what stores them in the DeltaFIFO).
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        initTrace.Step("SyncWith done")
        // Record the resource version of this successful list as the latest synced one.
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    // resyncerrc has capacity 1 so the resync goroutine can report one error
    // without blocking; it is passed to watchHandler below.
    resyncerrc := make(chan error, 1)
    // cancelCh is closed when ListAndWatch returns, which stops the resync goroutine.
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            // Block until the resync channel fires, or exit on stop/cancel.
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            // A nil ShouldResync means "always resync".
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            // Release the current resync channel and obtain a fresh one for the next round.
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        // rand.Float64() is in [0, 1), so the timeout is randomized between
        // minWatchTimeout and twice minWatchTimeout.
        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        // Watch options carry the current resource version. watchHandler receives
        // &resourceVersion below, so it can advance the value between iterations.
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any watchers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

        // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
        start := r.clock.Now()
        // Open a watch on the resource through the ListerWatcher.
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if utilnet.IsConnectionRefused(err) {
                time.Sleep(time.Second)
                continue
            }
            // Any other watch error ends this call with a nil error; Run's
            // wait.Until loop will invoke ListAndWatch again, which re-lists.
            return nil
        }

        // Process the change events delivered on the watch.
        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            // A requested stop is not logged; an expired resource version is
            // logged at verbosity 4, anything else as a warning.
            if err != errorStopRequested {
                switch {
                case apierrs.IsResourceExpired(err):
                    klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

DeltaFIFO

  DeltaFIFO可以分開理解爲FIFO和Delta。FIFO是一個先進先出隊列,擁有隊列的基本操作方法。Delta是資源對象存儲,可以保存資源對象的操作類型。DeltaFIFO隊列中,Reflector是生產者,Controller是消費者。DeltaFIFO結構如下:

#源碼路徑 vendor\k8s.io\client-go\tools\cache\delta_fifo.go
// DeltaFIFO is a first-in-first-out queue of per-object change lists
// (Deltas). Only the fields relevant to this article are shown; the
// remaining fields are elided ("...").
type DeltaFIFO struct {
    // lock/cond protects access to 'items' and 'queue'.
    lock sync.RWMutex
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    //
    // items maps an object's key to the Deltas accumulated for it;
    // queue holds those keys in the order in which they will be popped.
    items map[string]Deltas
    queue []string

    // (remaining fields elided in this excerpt)
    ...
}

生產者方法

 DeltaFIFO隊列中的資源對象在Added、Updated、Deleted等事件被調用時都調用了queueActionLocked方法,它是DeltaFIFO實現的關鍵。

#源碼路徑 vendor\k8s.io\client-go\tools\cache\delta_fifo.go

## Add、Update方法
// Add records an Added delta for obj and makes sure obj's key is queued.
// The key is appended to the queue only when the object is not already
// present in the set. The FIFO is marked as populated.
func (f *DeltaFIFO) Add(obj interface{}) error {
    // Hold the write lock for the whole operation; queueActionLocked requires it.
    f.lock.Lock()
    defer f.lock.Unlock()

    f.populated = true
    err := f.queueActionLocked(Added, obj)
    return err
}

// Update behaves exactly like Add except that the delta it records is of
// type Updated.
func (f *DeltaFIFO) Update(obj interface{}) error {
    // Hold the write lock for the whole operation; queueActionLocked requires it.
    f.lock.Lock()
    defer f.lock.Unlock()

    f.populated = true
    err := f.queueActionLocked(Updated, obj)
    return err
}

## queueActionLocked方法
// queueActionLocked adds a delta of the given actionType for obj to that
// object's delta list and wakes up any waiting consumers.
// The caller must already hold f.lock.
//
// It returns a KeyError when the object's key cannot be computed.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // Compute the key under which the object is tracked.
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // Append the new delta to whatever is already pending for this key,
    // then collapse duplicates.
    newDeltas := dedupDeltas(append(f.items[id], Delta{actionType, obj}))

    if len(newDeltas) == 0 {
        // We need to remove this from our map (extra items in the queue are
        // ignored if they are not in the map).
        delete(f.items, id)
        return nil
    }

    // Enqueue the key only the first time it shows up in the set.
    if _, exists := f.items[id]; !exists {
        f.queue = append(f.queue, id)
    }
    f.items[id] = newDeltas
    // Wake every consumer blocked in cond.Wait so it can re-check the queue.
    f.cond.Broadcast()
    return nil
}

消費者

#源碼路徑 vendor\k8s.io\client-go\tools\cache\delta_fifo.go

// Pop waits until the queue holds at least one key, removes the key at the
// head together with its Deltas, and passes those Deltas to process.
// Keys are served in the order they were added/updated. Because the entry is
// removed from both the queue and the store before process runs, an entry
// that is not processed successfully has to be put back with
// AddIfNotPresent().
//
// process is invoked while the FIFO lock is held, so it may safely update
// data structures that must stay in sync with the queue (e.g. knownKeys).
// If process returns an ErrRequeue, the entry is re-added (the equivalent of
// calling AddIfNotPresent under the lock) and the nested error is returned.
//
// The returned value is a 'Deltas': the complete list of changes that
// happened to the object while it sat in the queue. When the FIFO has been
// closed and the queue is empty, Pop returns ErrFIFOClosed.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for {
        // Nothing queued: sleep on the condition variable until a producer
        // broadcasts. Close() also broadcasts, which lets this loop notice
        // the closed state and return.
        for len(f.queue) == 0 {
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }
            f.cond.Wait()
        }

        // Take the key at the head of the queue.
        key := f.queue[0]
        f.queue = f.queue[1:]

        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }

        deltas, found := f.items[key]
        if found {
            delete(f.items, key)
            // Let the consumer handle the accumulated deltas.
            procErr := process(deltas)
            if requeue, isRequeue := procErr.(ErrRequeue); isRequeue {
                // The consumer asked for a retry: put the entry back and
                // surface the nested error instead.
                f.addIfNotPresent(key, deltas)
                procErr = requeue.Err
            }
            // No copy of the deltas is needed: ownership moves to the caller.
            return deltas, procErr
        }
        // The key was queued but its entry has since been deleted; try the next one.
    }
}
相關文章
相關標籤/搜索