轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客:https://www.luozhiyun.comhtml
因爲這部分的代碼是在client-go 中,因此使用的源碼版本是client-go 1.19node
此次講解我用了很一些圖,儘量的把這個模塊給描述清楚,若是感受對你有所幫助不妨發一封郵件激勵一下我~linux
Informer主要有兩個做用:git
Informer運行原理以下:github
根據流程圖來解釋一下Informer中幾個組件的做用:web
Reflector 包會和 apiServer 創建長鏈接,並使用 ListAndWatch 方法獲取並監聽某一個資源的變化。List 方法將會獲取某個資源的全部實例,Watch 方法則監聽資源對象的建立、更新以及刪除事件,而後將事件放入到DeltaFIFO Queue中;mongodb
而後Informer會不斷的從 Delta FIFO Queue 中 pop 增量事件,並根據事件的類型來決定新增、更新或者是刪除本地緩存;接着Informer 根據事件類型來觸發事先註冊好的 Event Handler觸發回調函數,而後而後將該事件丟到 Work Queue 這個工做隊列中。windows
將到了go-client部分的代碼,咱們能夠直接經過實例來進行上手跑動,Informers Example代碼示例以下:api
package main import ( "flag" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "log" "path/filepath" "time" ) func main() { var kubeconfig *string //若是是windows,那麼會讀取C:\Users\xxx\.kube\config 下面的配置文件 //若是是linux,那麼會讀取~/.kube/config下面的配置文件 if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } stopCh := make(chan struct{}) defer close(stopCh) //表示每分鐘進行一次resync,resync會週期性地執行List操做 sharedInformers := informers.NewSharedInformerFactory(clientset, time.Minute) informer := sharedInformers.Core().V1().Pods().Informer() informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("New Pod Added to Store: %s", mObj.GetName()) }, UpdateFunc: func(oldObj, newObj interface{}) { oObj := oldObj.(v1.Object) nObj := newObj.(v1.Object) log.Printf("%s Pod Updated to %s", oObj.GetName(),nObj.GetName()) }, DeleteFunc: func(obj interface{}) { mObj := obj.(v1.Object) log.Printf("Pod Deleted from Store: %s", mObj.GetName()) }, }) informer.Run(stopCh) }
要運行這段代碼,須要咱們將k8s服務器上的~/.kube代碼拷貝到本地,我是win10的機器因此拷貝到C:\Users\xxx\.kube
中。緩存
informers.NewSharedInformerFactory會傳入兩個參數,第1個參數clientset是用於與k8s apiserver交互的客戶端,第2個參數是表明每分鐘會執行一次resync,resync會週期性執行List將全部資源存放再Informer Store中,若是該參數是0,則禁用resync功能。
經過informer.AddEventHandler函數能夠爲pod資源添加資源事件回調方法,支持3種資源事件回調方法:
經過名稱咱們就能夠知道是新增、更新、刪除時會回調這些方法。
在咱們初次執行run方法的時候,能夠會將監控的k8s上pod存放到本地,並回調AddFunc方法,以下日誌:
2020/10/17 15:13:10 New Pod Added to Store: dns-test 2020/10/17 15:13:10 New Pod Added to Store: web-1 2020/10/17 15:13:10 New Pod Added to Store: fluentd-elasticsearch-nwqph 2020/10/17 15:13:10 New Pod Added to Store: kube-flannel-ds-amd64-bjmt2 2020/10/17 15:13:10 New Pod Added to Store: kubernetes-dashboard-65665f84db-jrw6k 2020/10/17 15:13:10 New Pod Added to Store: mongodb 2020/10/17 15:13:10 New Pod Added to Store: web-0 ....
shared Informer初始化的時候會調用到informers.NewSharedInformerFactory進行初始化。
文件位置:informers/factory.go
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { return NewSharedInformerFactoryWithOptions(client, defaultResync) } func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory }
NewSharedInformerFactory方法最終會調用到NewSharedInformerFactoryWithOptions初始化一個sharedInformerFactory,在初始化的時候會初始化一個informers,用來緩存不一樣類型的informer。
informer初始化會調用sharedInformerFactory的方法進行初始化,而且能夠調用不一樣資源的Informer。
podInformer := sharedInformers.Core().V1().Pods().Informer() nodeInformer := sharedInformers.Node().V1beta1().RuntimeClasses().Informer()
定義不一樣資源的Informer能夠用來監控node或pod。
經過調用Informer方法會根據類型來建立Informer,同一類資源會共享同一個informer。
文件路徑:informers/factory.go
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { //建立informer return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func (f *podInformer) Informer() cache.SharedIndexInformer { //傳入上面定義的defaultInformer方法,用於建立informer return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() //獲取informer類型 informerType := reflect.TypeOf(obj) //查找map緩存,若是存在,那麼直接返回 informer, exists := f.informers[informerType] if exists { return informer } //根據類型查找resync的週期 resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } //調用defaultInformer方法建立informer informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer }
調用InformerFor方法的時候會傳入defaultInformer方法用於建立informer。
InformerFor方法裏面首先會去sharedInformerFactory的map緩存中根據類型查找對應的informer,若是存在那麼直接返回,若是不存在,那麼則會調用newFunc方法建立informer,而後設置到informers緩存中。
下面咱們看一下NewFilteredPodInformer是如何建立Informer的:
文件位置:informers/core/v1/pod.go
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } //調用apiserver獲取pod列表 return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } //調用apiserver監控pod列表 return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) }
這裏是真正的建立一個informer,並註冊了List&Watch的回調函數,list回調函數的api相似下面這樣:
result = &v1.PodList{} err = c.client.Get(). Namespace(c.ns). Resource("pods"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(ctx). Into(result)
構造Informer經過NewSharedIndexInformer完成:
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer }
sharedIndexInformer裏面會建立sharedProcessor,設置List&Watch的回調函數,建立了一個indexer,咱們這裏看一下NewIndexer是怎麼建立indexer的:
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
NewIndexer方法建立了一個cache,它的keyFunc是DeletionHandlingMetaNamespaceKeyFunc,即接受一個object,生成它的namepace/name的字符串。cache裏面的數據會存放到cacheStorage中,它是一個threadSafeMap用來存儲資源對象並自帶索引功能的本地存儲。
EventHandler事件的註冊是經過informer的AddEventHandler方法進行的。在調用AddEventHandler方法的時候,傳入一個cache.ResourceEventHandlerFuncs結構體:
文件位置:tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { s.startedLock.Lock() defer s.startedLock.Unlock() ... //初始化監聽器 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) //若是informer還沒啓動,那麼直接將監聽器加入到processor監聽器列表中 if !s.started { s.processor.addListener(listener) return } //若是informer已經啓動,那麼須要加鎖 s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.addListener(listener) //而後將indexer中緩存的數據寫入到listener中 for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } }
AddEventHandler方法會調用到AddEventHandlerWithResyncPeriod方法中,而後調用newProcessListener初始化listener。
接着會校驗informer是否已經啓動,若是沒有啓動,那麼直接將監聽器加入到processor監聽器列表中並返回;若是informer已經啓動,那麼須要加鎖將監聽器加入到processor監聽器列表中,而後將indexer中緩存的數據寫入到listener中。
須要注意的是listener.add方法會調用processorListener的add方法,這個方法會將數據寫入到addCh管道中:
func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
addCh管道里面數據是用來處理事件回調的,後面我會說到。
大體的流程以下:
最後咱們在上面的demo中會使用sharedIndexInformer的Run方法來啓動Informer模塊。
文件位置:tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() //初始化DeltaFIFO隊列 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) cfg := &Config{ //設置Queue爲DeltaFIFO隊列 Queue: fifo, //設置List&Watch的回調函數 ListerWatcher: s.listerWatcher, ObjectType: s.objectType, //設置Resync週期 FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, //判斷有哪些監聽器到期須要被Resync ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() //異步建立controller s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) //調用run方法啓動processor wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true }() //啓動controller s.controller.Run(stopCh) }
這段代碼主要作了如下幾件事:
下面咱們看看sharedProcessor的run方法作了什麼:
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { ... //遍歷監聽器 for _, listener := range p.listeners { //下面兩個方法是核心的事件call back的方法 p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() ... }
run方法會調用processorListener的run方法和pop方法,這兩個方法合在一塊兒完成了事件回調。
func (p *processorListener) add(notification interface{}) { p.addCh <- notification } func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { notification = notificationToAdd nextCh = p.nextCh } else { p.pendingNotifications.WriteOne(notificationToAdd) } } } }
這段代碼,我把add方法也貼到這裏了,是由於監聽的事件都是從這個方法傳入的,而後寫入到addCh管道中。
pop方法在select代碼塊中會獲取addCh管道中的數據,第一個循環的時候notification是nil,因此會將nextCh設置爲p.nextCh;第二個循環的時候會將數據寫入到nextCh中。
當notification不爲空的時候是直接將數據存入pendingNotifications緩存中的,取也是從pendingNotifications中讀取。
下面咱們看看run方法:
func (p *processorListener) run() { stopCh := make(chan struct{}) wait.Until(func() { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed close(stopCh) }, 1*time.Second, stopCh) }
run每秒遍歷一次nextCh中的數據,而後根據不一樣的notification類型執行不一樣的回調方法,這裏會回調到咱們在main方法中註冊的eventHandler。
下面咱們再回到sharedIndexInformer的Run方法中往下走,會運行controller的Run方法。
文件位置:tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) { ... //建立Reflector r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) ... //啓動Reflector wg.StartWithChannel(stopCh, r.Run) //每秒中循環調用DeltaFIFO隊列的pop方法, wait.Until(c.processLoop, time.Second, stopCh) wg.Wait() }
這裏對應Informer運行原理裏面Informer上部分建立Reflector並進行監聽,和下部分循環調用DeltaFIFO隊列的pop方法進行分發。
Reflector的Run方法最後會調用到Reflector的ListAndWatch方法進行監聽獲取資源。ListAndWatch代碼會分爲兩部分,一部分是List,一部分是Watch。
咱們先看List部分代碼:
代碼位置:tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ... if err := func() error { ... go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { //根據參數獲取pod 列表 return r.listerWatcher.List(opts) })) ... list, paginatedResult, err = pager.List(context.Background(), options) ... close(listCh) }() ... //獲取資源版本號 resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") //將資源數據轉換成資源對象列表 items, err := meta.ExtractList(list) ... //將資源對象列表中的資源對象和資源版本號存儲至DeltaFIFO隊列中 if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("unable to sync list result: %v", err) } ... r.setLastSyncResourceVersion(resourceVersion) return nil }(); err != nil { return err } ... }
這部分的代碼會分爲以下幾個部分:
下面看看Watch部分的代碼:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ... for { ... //調用clientset客戶端api與apiServer創建長鏈接,監控指定資源的變動 w, err := r.listerWatcher.Watch(options) ... //處理資源的變動事件 if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { ... return nil } } }
這裏會循環調用clientset客戶端api與apiServer創建長鏈接,監控指定資源的變動,若是監控到有資源變動,那麼會調用watchHandler處理資源的變動事件。
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { ... loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): ... // 獲取資源版本號 newResourceVersion := meta.GetResourceVersion() switch event.Type { //將添加資源事件添加到DeltaFIFO隊列中 case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } //將更新資源事件添加到DeltaFIFO隊列中 case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } //將刪除資源事件添加到DeltaFIFO隊列中 case watch.Deleted: err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } ... *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } ... }
watchHandler方法會根據傳入的資源類型調用不一樣的方法轉換成不一樣的Delta而後存入到DeltaFIFO隊列中。
processLoop方法,以1s爲週期,週期性的執行。
文件位置:tools/cache/controller.go
func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
這裏會循環將DeltaFIFO隊列中數據pop出隊,而後交給Process方法進行處理,Process方法是在上面調用sharedIndexInformer的Run方法的數據設置,設置的方法是sharedIndexInformer的HandleDeltas方法。
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest //根據obj的Type類型進行分發 for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) //若是緩存中存在該對象 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { //更新indexr if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // Sync events are only propagated to listeners that requested resync isSync = true case d.Type == Replaced: //新老對象獲取版本號進行比較 if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { // Replaced events that didn't change resourceVersion are treated as resync events // and only propagated to listeners that requested resync isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) // 若是緩存中不存在該對象 } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
HandleDeltas會與indexer緩存交互更新咱們從Delta FIFO中取到的內容,以後經過s.processor.distribute()
進行消息的分發。
在distribute中,sharedProcesser經過listener.add(obj)
向每一個listener分發該object。而該函數中又執行了p.addCh <- notification
。
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } }
這裏能夠結合上面的p.wg.Start(listener.run)
和p.wg.Start(listener.pop)
方法來進行理解,這裏將notification傳入到addCh管道以後會觸發EventHandler事件。
這裏我用一張圖總結一下informer的Run方法流程:
至此,咱們分析完了informer的全部機制。
經過上面分析,咱們全面熟悉了k8s是如何經過Informer機制實現ListAndWatch獲取並監視 API 對象變化。
熟悉了Informer與Reflector是如何協同進行數據的傳遞,可是我這裏有點遺憾的是限於篇幅,沒有去詳細的講解DeltaFIFO隊列裏面是如何進行數據的存儲與獲取,實際上這個隊列的實現也是很是的有意思的。
對於Indexer來講,我在文章裏面也只說到了獲取DeltaFIFO隊列的數據後更新到Indexer的ThreadSafeMap中,可是並無講ThreadSafeMap這個存儲是如何作的,裏面的索引又是如何創建的,這些各位同窗感興趣的也能夠去研究一下。
https://www.kubernetes.org.cn/2693.html
https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md
https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/