Kubernetes源碼閱讀筆記——Controller Manager(之二)

上一篇文章中,咱們看到了Controller Manager的基本運行邏輯,可是還有一些問題沒有解決,咱們將在本篇文章中進行分析。html

1、ListAndWatchapi

首先是Informer。上一篇中寫道,啓動Informer本質上是調用了controller的reflector的Run方法。下面咱們進入reflector的Run方法看看:緩存

k8s.io/cient-go/tools/cache/reflector.go

func (r *Reflector) Run(stopCh <-chan struct{}) {
    glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    wait.Until(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, stopCh)
}

能夠看到,方法經過調用wait.Until,每過period時間段就執行一次ListAndWatch方法。app

進入ListAndWatch方法:函數

k8s.io/client-go/tools/cache/reflector.go // ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
    var resourceVersion string

    // Explicitly set "0" as resource version - it's fine for the List()
    // to be served from cache and potentially be delayed relative to
    // etcd contents. Reflector framework will catch up via Watch() eventually.
    options := metav1.ListOptions{ResourceVersion: "0"}
    r.metrics.numberOfLists.Inc()
    start := r.clock.Now()
    list, err := r.listerWatcher.List(options)
    if err != nil {
        return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
    }
    r.metrics.listDuration.Observe(time.Since(start).Seconds())
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    }
    resourceVersion = listMetaInterface.GetResourceVersion()
    items, err := meta.ExtractList(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
    }
    r.metrics.numberOfItemsInList.Observe(float64(len(items)))
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    r.setLastSyncResourceVersion(resourceVersion)

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                glog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
        }

        r.metrics.numberOfWatches.Inc()
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }

        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }
}

這個方法的註釋已經寫明。首先,用默認的ResourceVersion 「0」 調用List方法,列出資源,並調用ListAccessor方法進行驗證。工具

其次,調用GetResourceVersion方法取得資源的實際的ResourceVersion,並調用syncWith方法進行同步。oop

再次,用go func()後面的一大段代碼處理reflector緩存的同步。ui

最後,在一個for循環裏調用Watch方法,對資源的變化進行watch,並調用watchHandler方法對變化進行相應處理。this

Watch方法本質上是在調用ListWatch的WatchFunc。這一字段是在建立具體的Controller對象時向informer添加的,後面會詳細介紹。總之,Watch的本質與用戶經過client-go工具鏈接API Server是同樣的。url

進入watchHandler方法:

k8s.io/client-go/tools/cache/reflector.go // watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    start := r.clock.Now()
    eventCount := 0

    defer w.Stop()
    // update metrics
    ...

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrs.FromObject(event.Object)
            }
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                continue
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            r.setLastSyncResourceVersion(newResourceVersion)
            eventCount++
        }
    }

    watchDuration := r.clock.Now().Sub(start)
    ...
    return nil
}

這個方法中,最重要的是中間的case選擇。對於watch到的結果,按照Added、Modified、Deleted分別調用store的Add、Update、Delete方法,在FIFO隊列中進行更新。這個FIFO隊列就是以前在執行informer的Run方法時所建立的。

Informer的大體邏輯就是這樣,經過list-watch機制,與API Server創建鏈接,監聽資源的變化,在緩存中進行更新。

2、StartControllers

上一篇文章中提到,Controller Manager經過在StartControllers方法中調用啓動函數來啓動全部的Controller。咱們先來看看StartControllers方法:

cmd/kube-controller-manager/app/controllermanager.go

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
    ...

    for controllerName, initFn := range controllers {
    if !ctx.IsControllerEnabled(controllerName) {
        klog.Warningf("%q is disabled", controllerName)
        continue
    }

    time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

    klog.V(1).Infof("Starting %q", controllerName)
    debugHandler, started, err := initFn(ctx)
    if err != nil {
        klog.Errorf("Error starting %q", controllerName)
        return err
    }
    if !started {
        klog.Warningf("Skipping %q", controllerName)
        continue
    }
    ...

    return nil
}

咱們看到,方法經過對controllers中每一個元素執行各自的initFn來執行啓動函數,由於每一個Controller都是以[string]function的格式保存在map中的,因此直接執行自身的function便可。

下面咱們進入一個Controller的啓動函數看看。以deployment爲例。從NewControllerInitializers中找到controllers["deployment"]=startDeploymentController,進入startDeploymentController,能夠看到方法位於app/apps.go中:

cmd/kube-controller-manager/app/apps.go

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
    return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
    ctx.InformerFactory.Apps().V1().Deployments(),
    ctx.InformerFactory.Apps().V1().ReplicaSets(),
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
    return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}

咱們看到,方法調用了NewDeploymentController方法建立一個Deployment,並經過GO協程調用Run方法運行它。

咱們先來看一下NewDeploymentController方法。

3、NewDeploymentController

pkg/controller/deployment/deployment_controller.go

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

    if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
    if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
        return nil, err
    }
    }
    dc := &DeploymentController{
    client:        client,
    eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
    queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
    KubeClient: client,
    Recorder:   dc.eventRecorder,
    }

    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addDeployment,
    UpdateFunc: dc.updateDeployment,
    // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
    DeleteFunc: dc.deleteDeployment,
    })
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addReplicaSet,
    UpdateFunc: dc.updateReplicaSet,
    DeleteFunc: dc.deleteReplicaSet,
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: dc.deletePod,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue

    dc.dLister = dInformer.Lister()
    dc.rsLister = rsInformer.Lister()
    dc.podLister = podInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    dc.rsListerSynced = rsInformer.Informer().HasSynced
    dc.podListerSynced = podInformer.Informer().HasSynced
    return dc, nil
}

咱們看到,這個方法大體作了三件事。

(1)建立了一個Broadcaster,用於作kubernetes中event資源相關的處理。

(2)建立DeploymentController

咱們知道,Deployment並不直接操做集羣中的pod,而是經過操做ReplicaSet來間接操做pod,所以咱們能夠看到,DeploymentController結構體中也包含rsControl這一字段,用於操做ReplicaSet。而queue字段則維護了一個deployment的隊列,用於將須要更新狀態的deployment元素存入,後面會講到。

(3)添加回調函數

Informer的AddEventHandler方法爲deployment controller的全部informer添加了回調函數,對deployment和ReplicaSet的添加、更新、刪除進行相應的處理。

下面咱們詳細分析一下添加回調函數操做,這裏分紅三部分說,一是Informer的具體建立,二是函數被調用的邏輯,三是回調函數自己。

4、Informer()

在Deployment Controller中包含了deployment、replicaset、pod三個informer,都是各自的Informer()方法建立各自的informer。

以方法中dInformer.Informer()方法爲例。方法本質上調用了InformerFor方法:

k8s.io/client-go/informers/apps/v1/deployment.go

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&apps_v1.Deployment{}, f.defaultInformer)
}

InformerFor方法比較直觀,就是調用defaultInformer方法,將生成的informer存入factory的informer字段中。

defaultInformer方法則是調用了NewFilteredDeploymentInformer方法:

k8s.io/client-go/informers/apps/v1/deployment.go

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1().Deployments(namespace).List(options) }, WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1().Deployments(namespace).Watch(options) }, }, &apps_v1.Deployment{}, resyncPeriod, indexers, ) }

這個方法直接就返回了一個NewSharedIndexInformer結構體。能夠看到,這個結構體內部,定義了這個Informer的List和Watch函數,與通常用戶經過client-go鏈接API Server並沒有二致。

5、AddEventHandler

首先看調用的邏輯。AddEventHandler方法除了爲Informer添加新的回調函數外,還同時向這些新回調函數如何分發消息,使這些回調函數被調用。AddEventHandler方法只有一行,即調用AddEventHandlerWithResyncPeriod方法。

進入AddEventHandlerWithResyncPeriod方法:

k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    if s.stopped {
        glog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
        return
    }

    if resyncPeriod > 0 {
          ...
    }

    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

    if !s.started {
        s.processor.addListener(listener)
        return
    }

    // in order to safely join, we have to
    // 1. stop sending add/update/delete notifications
    // 2. do a list against the store
    // 3. send synthetic "Add" events to the new handler
    // 4. unblock
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    s.processor.addListener(listener)
    for _, item := range s.indexer.List() {
        listener.add(addNotification{newObj: item})
    }
}

咱們看到,這裏引入了一個listener的概念。listener是informer用於獲取Thread Safe Store元素並處理的機制。經過調用addListener方法,爲informer添加並運行listener。

隨後,又遍歷Thread Safe Store中的元素,將全部的消息都發送給新添加的EventHandler。這裏的目的是讓新EventHandler先處理一遍Thread Safe Store中已有的元素,否則它們就只能接收到新消息,沒有機會處理舊消息了。

6、addListener

listener的添加和運行都經過addListener方法進行。進入addListener方法:

k8s.io/client-go/tools/cache/shared_informer.go

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

addListener方法經過goroutine的方式,調用listener的run和pop方法,運行listener。

分別進入run和pop方法:

k8s.io/client-go/tools/cache/shared_informer.go

func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        // this gives us a few quick retries before a long pause and then a few more quick retries
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                    utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
                }
            }
            // the only way to get here is if the p.nextCh is empty and closed
            return true, nil
        })

        // the only way to get here is if the p.nextCh is empty and closed
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}

 

k8s.io/client-go/tools/cache/shared_informer.go

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

pop方法用到好幾個channel,本質上還是從channel中取出一個notification,並經過run方法進行處理。能夠看到,在run方法中有對於notification的類型選擇,並分別調用OnUpdate、OnAdd、OnDelete方法進行處理,而這三個方法則分別調用了前面AddEventHandler方法中傳進來的回調函數,即:

pkg/controller/deployment/deployment_controller.go

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) { ... dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, DeleteFunc: dc.deleteDeployment, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod, }) ... }

那麼這些notification從何而來呢?答案是,經過listener的add方法添加,即前述AddEventHandlerWithResyncPeriod方法的最後一句:

k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
        ...
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

add方法只有一行,就是將信息傳入listener的channel中。而信息則是經過indexer的List方法,從Thread Safe Store中得到。此外,上一篇文章中也提到過,informer會不斷從FIFO中pop元素,進行處理後向listener發送信息。因此對於EventHandler來講,一方面在剛註冊時,從Thread Safe Store中一次性得到全部舊的消息,一方面從FIFO隊列中一個一個地得到消息。

至此,informer的邏輯就很清晰了。首先經過ListAndWatch方法,將watch到的資源狀態的變化存入FIFO隊列。以後,在啓動controller後,經過factory生成相應的informer,並經過AddEventHandler方法添加相應的回調函數。AddEventHandler方法會爲informer添加listener,將Thread Safe Store隊列中的內容取出,根據其狀態調用相應的回調函數進行處理。同時FIFO隊列中的元素也會一個一個pop出來,供informer消費。

剩下的內容,將在下一篇文章中分析。下一篇文章連接http://www.javashuo.com/article/p-vluyzvvi-p.html

相關文章
相關標籤/搜索