Almost all controller managers and CRD controllers use client-go's informer functions to obtain the objects they care about via Watch or Get/List. Below we walk through the client-go informer machinery from a source-code perspective.
```go
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
	klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)

controller := NewController(kubeClient, exampleClient,
	kubeInformerFactory.Apps().V1().Deployments(),
	exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
kubeInformerFactory.Start(stopCh)
```
This example is excerpted from https://github.com/kubernetes/sample-controller/blob/master/main.go and uses the default Kubernetes Deployment informer as its running example. As you can see, using a client-go informer directly is quite straightforward. Setting aside for now what the NewController function does, let's follow the code and see what kubeInformerFactory.Start actually does.
```go
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
```
You can see that this iterates over f.informers. Let's take a look at the data structure in which informers is defined:
```go
type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration

	informers map[reflect.Type]cache.SharedIndexInformer
	// startedInformers is used for tracking which informers have been started.
	// This allows Start() to be called multiple times safely.
	startedInformers map[reflect.Type]bool
}
```
When our example runs, f.informers contains the following:
```
type:     *v1.Deployment
informer: &{0xc000379fa0 <nil> 0xc00038ccb0 {} 0xc000379f80 0xc00033bb00 30000000000 30000000000 0x28e5ec8 false false {0 0} {0 0}}
```
In other words, every Kubernetes type gets its own informer constructor. Next let's find out where that informer is registered, again using the Deployment informer as the example.
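Before tracing where that registration happens, here is a quick illustration of what "one informer per type" means in practice. This is a minimal sketch (not from sample-controller), assuming the kubeClient, kubeInformerFactory, and stopCh from the excerpt above:

```go
// Each typed accessor below registers (or reuses) an informer in f.informers,
// keyed by reflect.TypeOf of the resource, so the map gains one entry per type.
kubeInformerFactory.Apps().V1().Deployments().Informer() // key: *v1.Deployment
kubeInformerFactory.Core().V1().Pods().Informer()        // key: *v1.Pod
kubeInformerFactory.Core().V1().Services().Informer()    // key: *v1.Service

// Start then launches one Run goroutine per registered informer, as shown above.
kubeInformerFactory.Start(stopCh)
```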
First, go back to the code at the beginning where kubeClient was initialized:
```go
controller := NewController(kubeClient, exampleClient,
	kubeInformerFactory.Apps().V1().Deployments(),
	exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.handleObject,
	UpdateFunc: func(old, new interface{}) {
		newDepl := new.(*appsv1.Deployment)
		oldDepl := old.(*appsv1.Deployment)
		if newDepl.ResourceVersion == oldDepl.ResourceVersion {
			// Periodic resync will send update events for all known Deployments.
			// Two different versions of the same Deployment will always have different RVs.
			return
		}
		controller.handleObject(new)
	},
	DeleteFunc: controller.handleObject,
})
```
Note the argument passed here, kubeInformerFactory.Apps().V1().Deployments(): this expression creates an informer that cares only about Deployments.
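The namespace and tweakListOptions fields we saw in the sharedInformerFactory struct are how such an informer can be narrowed even further. A hedged sketch (not from sample-controller; the namespace and label selector are made-up values), assuming the same kubeClient as above:

```go
// WithNamespace / WithTweakListOptions populate the namespace and
// tweakListOptions fields of sharedInformerFactory, which
// NewFilteredDeploymentInformer later applies to every List and Watch call.
factory := kubeinformers.NewSharedInformerFactoryWithOptions(
	kubeClient,
	30*time.Second,
	kubeinformers.WithNamespace("demo"),
	kubeinformers.WithTweakListOptions(func(opts *metav1.ListOptions) {
		opts.LabelSelector = "app=demo"
	}),
)
deployments := factory.Apps().V1().Deployments() // only Deployments in "demo" labelled app=demo
```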
```go
controller := &Controller{
	kubeclientset:     kubeclientset,
	sampleclientset:   sampleclientset,
	deploymentsLister: deploymentInformer.Lister(),
	deploymentsSynced: deploymentInformer.Informer().HasSynced,
	foosLister:        fooInformer.Lister(),
	foosSynced:        fooInformer.Informer().HasSynced,
	workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
	recorder:          recorder,
}
```
Here, deploymentInformer.Lister() initializes a Deployment Lister. Let's look at what the Lister function does:
```go
// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}

func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

func (f *deploymentInformer) Lister() v1.DeploymentLister {
	return v1.NewDeploymentLister(f.Informer().GetIndexer())
}
```
Note the Lister function here: it calls Informer, which triggers f.factory.InformerFor, and that ultimately lands in sharedInformerFactory's InformerFor function:
```go
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}
```
Here you can see that the line informer = newFunc(f.client, resyncPeriod) is what finally creates the informer and registers it in the factory's informers map, answering the question we raised above.
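Because the lookup is keyed by reflect.TypeOf(obj), asking the factory for the same resource twice hands back the same shared informer. A tiny check to convince yourself (a sketch, again assuming the kubeInformerFactory from the opening example):

```go
i1 := kubeInformerFactory.Apps().V1().Deployments().Informer()
i2 := kubeInformerFactory.Apps().V1().Deployments().Informer()
fmt.Println(i1 == i2) // true: the second call hits the `exists` branch of InformerFor
```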
Now let's go back to the informer Start:
```go
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
```
As you can see, it iterates over all registered informers and launches each informer's Run method asynchronously in its own goroutine. Let's take a global look at the Run method.
```go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}
```
First, it creates a DeltaFIFO from the MetaNamespaceKeyFunc key function and the informer's indexer. This first-in, first-out queue is mainly used to hold the stream of events (deltas) for each object.
```go
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      keyFunc,
		knownObjects: knownObjects,
	}
	f.cond.L = &f.lock
	return f
}
```
As you can see, the queue itself is quite simple: a map holds the data (deltas keyed by object key) and a string slice keeps the ordering of those keys.
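To get a feel for how deltas accumulate per key, here is a small standalone sketch (not from the article's code) that drives a DeltaFIFO by hand. It assumes a client-go release matching the excerpts above, where NewDeltaFIFO still takes these two arguments (it is deprecated in newer releases):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// MetaNamespaceKeyFunc builds keys of the form "namespace/name";
	// knownObjects is nil because this toy example has no backing indexer.
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil)

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	fifo.Add(pod)    // queues an Added delta under "default/demo"
	fifo.Update(pod) // appends an Updated delta to the same key

	// Pop hands over all accumulated Deltas for one key, oldest first;
	// this is the same shape HandleDeltas receives later in this walkthrough.
	fifo.Pop(cache.PopProcessFunc(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type, d.Object.(*v1.Pod).Name) // Added demo, then Updated demo
		}
		return nil
	}))
}
```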
Next, a Config is assembled from the List and Watch functions built from the client, together with the queue, and a controller is initialized from that Config. This controller is client-go's cache controller, whose job is to maintain the local cache of objects obtained from the API server and keep those objects up to date.
Next, the call to focus on is:

```go
wg.StartWithChannel(processorStopCh, s.processor.run)
```

This is where the listeners actually start running.
```go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
```
Focus on the run method. Remember that custom handlers were registered earlier for Add, Update, and Delete? This is where those handlers are actually triggered.
```go
func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}
```
You can see that when an object arrives on the p.nextCh channel, the corresponding user-registered handler is invoked according to the notification type. So who feeds objects into this channel?
```go
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
```
The answer is this pop function: it reads newly added notifications from p.addCh and forwards them to p.nextCh, guaranteeing that each notification is delivered exactly once.
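The pattern here (two channels with an in-memory buffer between them, and a nil channel used to disable a select case) is worth seeing in isolation. Below is a stripped-down, self-contained illustration of that pattern, not the real client-go code; the real pop() uses a ring buffer (pendingNotifications) and simply returns on shutdown, whereas this toy version flushes so the demo prints everything:

```go
package main

import "fmt"

// pump shuttles values from addCh to nextCh, buffering in a slice so that a
// slow consumer never blocks the producer.
func pump(addCh <-chan string, nextCh chan<- string) {
	defer close(nextCh)

	var pending []string  // plays the role of pendingNotifications
	var out chan<- string // nil disables the send case while nothing is pending
	var current string

	for {
		select {
		case out <- current: // one notification dispatched
			if len(pending) == 0 {
				out = nil // nothing left to send; disable this case
				continue
			}
			current, pending = pending[0], pending[1:]
		case v, ok := <-addCh:
			if !ok {
				// addCh closed: flush what is still buffered, then stop.
				if out != nil {
					nextCh <- current
				}
				for _, p := range pending {
					nextCh <- p
				}
				return
			}
			if out == nil { // nothing waiting: hand it straight to the consumer
				current, out = v, nextCh
			} else { // something already queued: buffer the new one
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	addCh := make(chan string)
	nextCh := make(chan string)
	go pump(addCh, nextCh)

	go func() {
		for i := 0; i < 3; i++ {
			addCh <- fmt.Sprintf("event-%d", i) // producer never blocks on the consumer
		}
		close(addCh)
	}()

	for v := range nextCh {
		fmt.Println("handled", v)
	}
}
```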
Next comes the final piece, the controller's Run function; let's see what it actually does.
```go
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}
```
The key call here is wg.StartWithChannel(stopCh, r.Run), which starts the Reflector:
```go
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
	wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}
```
This calls the r.ListAndWatch method. ListAndWatch itself is fairly involved, so let's take it slowly; the part that matters here is watchHandler, which ListAndWatch calls to process the watch events:
```go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	start := r.clock.Now()
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()
	// update metrics
	defer func() {
		r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
		r.metrics.watchDuration.Observe(time.Since(start).Seconds())
	}()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrs.FromObject(event.Object)
			}
			if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
				utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
				continue
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}

	watchDuration := r.clock.Now().Sub(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		r.metrics.numberOfShortWatches.Inc()
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
	return nil
}
```
This is where the watch events are actually handled: depending on the event type, the object is added to, updated in, or deleted from the FIFO queue created earlier (r.store here is that DeltaFIFO).
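For a rough picture of what the Reflector is driving under the hood, here is a hedged sketch of consuming a watch directly. It assumes the kubeClient from the opening example and the same (older) client-go API as the excerpts in this article, where Watch takes only a ListOptions (newer releases also take a context.Context); "default" is a made-up namespace:

```go
// Not the Reflector itself, just the raw loop it builds on.
w, err := kubeClient.AppsV1().Deployments("default").Watch(metav1.ListOptions{})
if err != nil {
	klog.Fatalf("watch failed: %v", err)
}
defer w.Stop()

for event := range w.ResultChan() {
	switch event.Type {
	case watch.Added, watch.Modified, watch.Deleted:
		// watchHandler turns each of these into Add/Update/Delete on the DeltaFIFO
		fmt.Println("got event:", event.Type)
	case watch.Error:
		fmt.Println("watch error:", event.Object)
	}
}
```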
On the consuming side, the controller's processLoop pops items off that queue:
```go
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == FIFOClosedError {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
```
The earlier part put the watched objects onto the queue; this goroutine is what consumes them. The concrete consumer is the Process function configured earlier, i.e. sharedIndexInformer.HandleDeltas:
```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
```
For each delta in the incoming obj, this function first looks the object up in its own cache (the indexer). If it already exists, this counts as an update: the indexer entry is updated and an updateNotification is distributed, which ends up invoking the user-registered Update handler. If it does not exist, the object is added to the indexer and the user's Add handler is called; deletes are handled analogously.
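Once HandleDeltas has populated the indexer, the Lister created back in NewController reads straight from that local cache, with no API-server round trip. A short sketch, assuming the deploymentInformer and stopCh from the sample-controller example above; the namespace and name are made up:

```go
deploymentsLister := deploymentInformer.Lister()

// Wait until the initial List/Watch has filled the cache before trusting reads.
if !cache.WaitForCacheSync(stopCh, deploymentInformer.Informer().HasSynced) {
	klog.Fatal("failed to sync the Deployment cache")
}

// Served from the indexer that HandleDeltas maintains, not from the API server.
d, err := deploymentsLister.Deployments("default").Get("my-app")
if err != nil {
	klog.Infof("deployment not cached yet: %v", err)
} else {
	klog.Infof("cached deployment %s, available replicas: %d", d.Name, d.Status.AvailableReplicas)
}
```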
With that, the source-code walkthrough of the client-go informer flow is essentially complete.
This article is original content from the Yunqi Community and may not be reproduced without permission.