注:本次使用的client-go版本爲:client-go 11.0,主要參考CSDN上的深刻淺出kubernetes之client-go系列,建議看本文前先參考該文檔。本文檔爲CSDN文檔的深挖和補充。
本文中的visio能夠從這裏獲取
下圖爲來自官方的Client-go架構圖html
圖1.node
下圖也能夠做爲參考git
圖2.github
Indexerapi
Indexer提供了對底層存儲的操做,給出了存儲對象以及索引對象的框架。Indexer的接口定義以下,它繼承了Store接口,Store中定義了對對象的增刪改查等方法。數組
Indexer保存了來自apiServer的資源,使用listWatch方式來維護資源的增量變化。經過這種方式能夠減少對apiServer的訪問,減輕apiServer端的壓力緩存
// client-go/tools/cache/index.go
type Indexer interface { Store // Retrieve list of objects that match on the named indexing function Index(indexName string, obj interface{}) ([]interface{}, error) // IndexKeys returns the set of keys that match on the named indexing function. IndexKeys(indexName, indexKey string) ([]string, error) // ListIndexFuncValues returns the list of generated values of an Index func ListIndexFuncValues(indexName string) []string // ByIndex lists object that match on the named indexing function with the exact key ByIndex(indexName, indexKey string) ([]interface{}, error) // GetIndexer return the indexers GetIndexers() Indexers // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error }
// client-go/tools/cache/store.go
type Store interface { Add(obj interface{}) error Update(obj interface{}) error Delete(obj interface{}) error List() []interface{} ListKeys() []string Get(obj interface{}) (item interface{}, exists bool, err error) GetByKey(key string) (item interface{}, exists bool, err error) // Replace will delete the contents of the store, using instead the // given list. Store takes ownership of the list, you should not reference // it after calling this function. Replace([]interface{}, string) error Resync() error }
cache實現了Indexer和Store接口,同時也實現了ThreadSafeStore接口(能夠看到ThreadSafeStore基本包含了Indexer接口的全部方法),但cache是包內私有的(首字母小寫),只能經過包內封裝的函數進行調用。session
// client-go/tools/cache/store.go
type cache struct { // cacheStorage bears the burden of thread safety for the cache cacheStorage ThreadSafeStore // keyFunc is used to make the key for objects stored in and retrieved from items, and // should be deterministic. keyFunc KeyFunc }
// client-go/tools/cache/thread_safe_store.go type ThreadSafeStore interface { Add(key string, obj interface{}) Update(key string, obj interface{}) Delete(key string) Get(key string) (item interface{}, exists bool) List() []interface{} ListKeys() []string Replace(map[string]interface{}, string) Index(indexName string, obj interface{}) ([]interface{}, error) IndexKeys(indexName, indexKey string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexKey string) ([]interface{}, error) GetIndexers() Indexers // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error Resync() error }
例如能夠經過函數NewStore和NewIndexer初始化cache來返回一個Store或Indexer指針(cache實現了Store和Indexer接口)。NewStore和NewIndexer返回的Store和Indexer接口的數據載體爲threadSafeMap,threadSafeMap經過NewThreadSafeStore函數進行初始化。架構
注:運行go語言接口中的方法即運行該方法的實現。以threadSafeMap爲例,在運行cache.Add函數中的「c.cacheStorage.Add(key, obj)」時,實際是在運行」(&threadSafeMap{items:map[string]interface{}{}, indexers: indexers, indices: indices}).Add(key, obj)「app
// client-go/tools/cache/store.go func (c *cache) Add(obj interface{}) error { key, err := c.keyFunc(obj) if err != nil { return KeyError{obj, err} } c.cacheStorage.Add(key, obj) return nil }
// client-go/tools/cache/store.go
// NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc) Store { return &cache{ cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), keyFunc: keyFunc, } } // NewIndexer returns an Indexer implemented simply with a map and a lock. func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }
能夠經過下圖理解threadSafeMap中各類索引之間的關係
默認的indexFunc以下,根據對象的namespace進行分類
// client-go/tools/cache/index.go
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } return []string{meta.GetNamespace()}, nil }
cache結構中的keyFunc用於生成objectKey,下面是默認的keyFunc。
//client-go/tools/cache/thread_safe_store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) { if key, ok := obj.(ExplicitKey); ok { return string(key), nil } meta, err := meta.Accessor(obj) if err != nil { return "", fmt.Errorf("object has no meta: %v", err) } if len(meta.GetNamespace()) > 0 { return meta.GetNamespace() + "/" + meta.GetName(), nil } return meta.GetName(), nil }
DeltaFIFO
DeltaFIFO的源碼註釋寫的比較清楚,它是一個生產者-消費者隊列,生產者爲Reflector,消費者爲Pop()函數,從架構圖中能夠看出DeltaFIFO的數據來源爲Reflector,經過Pop操做消費數據,將數據存儲到localstore中。須要注意的是,Pop的單位是一個Deltas,而不是Delta。
DeltaFIFO同時實現了Queue和Store接口。DeltaFIFO使用Deltas保存了對象狀態的變動(Add/Delete/Update)信息(如Pod的刪除添加等),Deltas緩存了針對相同對象的多個狀態變動信息。最老的狀態變動信息爲Newest(),最新的狀態變動信息爲Oldest(),使用中獲取DeltaFIFO中對象的key以及獲取DeltaFIFO都以最新狀態爲準。
//client-go/tools/cache/delta_fifo.go type Delta struct { Type DeltaType Object interface{} } // Deltas is a list of one or more 'Delta's to an individual object. // The oldest delta is at index 0, the newest delta is the last one. type Deltas []Delta
DeltaFIFO結構中比較難以理解的是knownObjects,它的結構以下。其接口中的方法ListKeys和GetByKey也是Store接口中的方法,所以knownObjects可以被賦值爲實現了Store的類型指針;一樣地,因爲Indexer繼承了Store方法,所以knownObjects可以被賦值爲實現了Indexer的類型指針。
//client-go/tools/cache/delta_fifo.go type DeltaFIFO struct { // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond // We depend on the property that items in the set are in // the queue and vice versa, and that all Deltas in this // map have at least one Delta. items map[string]Deltas queue []string // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool // initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int // keyFunc is used to make the key used for queued item // insertion and retrieval, and should be deterministic. keyFunc KeyFunc // knownObjects list keys that are "known", for the // purpose of figuring out which items have been deleted // when Replace() or Delete() is called. knownObjects KeyListerGetter // Indication the queue is closed. // Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex }
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister
KeyGetter
}
// A KeyLister is anything that knows how to list its keys.
type KeyLister interface { ListKeys() []string } // A KeyGetter is anything that knows how to get the value stored under a given key. type KeyGetter interface { GetByKey(key string) (interface{}, bool, error) }
在NewSharedIndexInformer(client-go/tools/cache/shared_informer.go)函數中使用下面進行初始化一個sharedIndexInformer,即便用函數DeletionHandlingMetaNamespaceKeyFunc初始化indexer,並在sharedIndexInformer.Run中將該indexer做爲knownObjects入參,最終初始化爲一個DeltaFIFO。
NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) //NewDeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) //sharedIndexInformer.Run
DeltaFIFO實現了Queue接口。能夠看到Queue接口同時也(Indexer繼承了Store)繼承了Store接口。
//client-go/tools/cache/delta_fifo.go type Queue interface { Store // Pop blocks until it has something to process. // It returns the object that was process and the result of processing. // The PopProcessFunc may return an ErrRequeue{...} to indicate the item // should be requeued before releasing the lock on the queue. Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent adds a value previously // returned by Pop back into the queue as long // as nothing else (presumably more recent) // has since been added. AddIfNotPresent(interface{}) error // HasSynced returns true if the first batch of items has been popped HasSynced() bool // Close queue Close() }
knownObjects實際使用時爲Indexer,它對應圖2中的localStore,DeltaFIFO根據其保存的對象狀態變動消息處理(增/刪/改/同步)knownObjects中相應的對象。其中同步(Sync)操做對Detals中即將被刪除的對象是沒有意義的(參見willObjectBeDeletedLocked函數)。
Replace(client-go/tools/cache/delta_fifo.go)函數中會對DeltaFIFO進行全量更新,包括3個步驟:
因爲全量更新的最終目的是爲了更新knownObjects中的對象,故第三步中直接使用knownObjects而非DeltaFIFO進行比對刪除。
ListWatch
Lister用於獲取某個資源(如Pod)的全量,Watcher用於獲取某個資源的增量變化。實際使用中Lister和Watcher都從apiServer獲取資源信息,Lister通常用於首次獲取某資源的全量信息,而Watcher用於持續獲取該資源的增量變化信息。Lister和Watcher的接口定義以下,使用NewListWatchFromClient函數來初始化ListerWatcher
// client-go/tools/cache/listwatch.go type Lister interface { // List should return a list type object; the Items field will be extracted, and the // ResourceVersion field will be used to start the watch in the right place. List(options metav1.ListOptions) (runtime.Object, error) } // Watcher is any object that knows how to start a watch on a resource. type Watcher interface { // Watch should begin a watch at the specified version. Watch(options metav1.ListOptions) (watch.Interface, error) } // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. type ListerWatcher interface { Lister Watcher }
在workqueue的例子中能夠看到調用NewListWatchFromClient的地方,該例子會從clientset.CoreV1().RESTClient()獲取"pods"的相關信息。
// client-go/examples/workqueue/main.go // create the pod watcher podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
除了能夠從CoreV1版本的API group獲取RESTClient信息外,還能夠從下面Clientset結構體定義的API group中獲取信息
// client-go/kubernetes/clientset.go type Clientset struct { *discovery.DiscoveryClient admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client appsV1 *appsv1.AppsV1Client appsV1beta1 *appsv1beta1.AppsV1beta1Client appsV1beta2 *appsv1beta2.AppsV1beta2Client auditregistrationV1alpha1 *auditregistrationv1alpha1.AuditregistrationV1alpha1Client authenticationV1 *authenticationv1.AuthenticationV1Client authenticationV1beta1 *authenticationv1beta1.AuthenticationV1beta1Client authorizationV1 *authorizationv1.AuthorizationV1Client authorizationV1beta1 *authorizationv1beta1.AuthorizationV1beta1Client autoscalingV1 *autoscalingv1.AutoscalingV1Client autoscalingV2beta1 *autoscalingv2beta1.AutoscalingV2beta1Client autoscalingV2beta2 *autoscalingv2beta2.AutoscalingV2beta2Client batchV1 *batchv1.BatchV1Client batchV1beta1 *batchv1beta1.BatchV1beta1Client batchV2alpha1 *batchv2alpha1.BatchV2alpha1Client certificatesV1beta1 *certificatesv1beta1.CertificatesV1beta1Client coordinationV1beta1 *coordinationv1beta1.CoordinationV1beta1Client coordinationV1 *coordinationv1.CoordinationV1Client coreV1 *corev1.CoreV1Client eventsV1beta1 *eventsv1beta1.EventsV1beta1Client extensionsV1beta1 *extensionsv1beta1.ExtensionsV1beta1Client networkingV1 *networkingv1.NetworkingV1Client networkingV1beta1 *networkingv1beta1.NetworkingV1beta1Client nodeV1alpha1 *nodev1alpha1.NodeV1alpha1Client nodeV1beta1 *nodev1beta1.NodeV1beta1Client policyV1beta1 *policyv1beta1.PolicyV1beta1Client rbacV1 *rbacv1.RbacV1Client rbacV1beta1 *rbacv1beta1.RbacV1beta1Client rbacV1alpha1 *rbacv1alpha1.RbacV1alpha1Client schedulingV1alpha1 *schedulingv1alpha1.SchedulingV1alpha1Client schedulingV1beta1 *schedulingv1beta1.SchedulingV1beta1Client schedulingV1 *schedulingv1.SchedulingV1Client settingsV1alpha1 *settingsv1alpha1.SettingsV1alpha1Client storageV1beta1 *storagev1beta1.StorageV1beta1Client storageV1 *storagev1.StorageV1Client storageV1alpha1 *storagev1alpha1.StorageV1alpha1Client }
RESTClient()的返回值爲Interface接口類型,該類型中包含以下對資源的操做方法,如Get()就封裝了HTTP的Get方法。NewListWatchFromClient初始化ListWatch的時候使用了Get方法
// client-go/rest/client.go type Interface interface { GetRateLimiter() flowcontrol.RateLimiter Verb(verb string) *Request Post() *Request Put() *Request Patch(pt types.PatchType) *Request Get() *Request Delete() *Request APIVersion() schema.GroupVersion }
Reflector
reflector使用listerWatcher獲取資源,並將其保存在store中,此處的store就是DeltaFIFO,Reflector核心處理函數爲ListAndWatch(client-go/tools/cache/reflector.go)
// client-go/tools/cache/reflector.go type Reflector struct { // name identifies this reflector. By default it will be a file:line if possible. name string // metrics tracks basic metric information about the reflector metrics *reflectorMetrics // The type of object we expect to place in the store. expectedType reflect.Type // The destination to sync up with the watch source store Store // listerWatcher is used to perform lists and watches. listerWatcher ListerWatcher // period controls timing between one watch ending and // the beginning of the next one. period time.Duration resyncPeriod time.Duration ShouldResync func() bool // clock allows tests to manipulate time clock clock.Clock // lastSyncResourceVersion is the resource version token last // observed when doing a sync with the underlying store // it is thread safe, but not synchronized with the underlying store lastSyncResourceVersion string // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion lastSyncResourceVersionMutex sync.RWMutex // WatchListPageSize is the requested chunk size of initial and resync watch lists. // Defaults to pager.PageSize. WatchListPageSize int64 }
ListAndWatch在Reflector.Run函數中啓動,並以Reflector.period週期性進行調度。ListAndWatch使用resourceVersion來獲取資源的增量變化:在List時會獲取資源的首個resourceVersion值,在Watch的時候會使用List獲取的resourceVersion來獲取資源的增量變化,而後將獲取到的資源的resourceVersion保存起來,最後下一次Watch的基線。
// client-go/tools/cache/reflector.go func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name) wait.Until(func() { if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } }, r.period, stopCh) }
如可使用以下命令獲取Pod的resourceVersion
# oc get pod $PodName -oyaml|grep resourceVersion: resourceVersion: "4993804"
Controller
controller的結構以下,其包含一個配置變量config,在註釋中能夠看到Config.Queue就是DeltaFIFO。controller定義瞭如何調度Reflector。
// client-go/tools/cache/controller.go type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock }
// client-go/tools/cache/controller.go type Config struct { // The queue for your objects - has to be a DeltaFIFO due to // assumptions in the implementation. Your Process() function // should accept the output of this Queue's Pop() method. Queue // Something that can list and watch your objects. ListerWatcher // Something that can process your objects. Process ProcessFunc // The type of your objects. ObjectType runtime.Object // Reprocess everything at least this often. // Note that if it takes longer for you to clear the queue than this // period, you will end up processing items in the order determined // by FIFO.Replace(). Currently, this is random. If this is a // problem, we can change that replacement policy to append new // things to the end of the queue instead of replacing the entire // queue. FullResyncPeriod time.Duration // ShouldResync, if specified, is invoked when the controller's reflector determines the next // periodic sync should occur. If this returns true, it means the reflector should proceed with // the resync. ShouldResync ShouldResyncFunc // If true, when Process() returns an error, re-enqueue the object. // TODO: add interface to let you inject a delay/backoff or drop // the object completely if desired. Pass the object in // question to this interface as a parameter. RetryOnError bool }
controller的框架比較簡單它使用wg.StartWithChannel啓動Reflector.Run,至關於啓動了一個DeltaFIFO的生產者(wg.StartWithChannel(stopCh, r.Run)表示能夠將r.Run放在獨立的協程運行,並可使用stopCh來中止r.Run);使用wait.Until來啓動一個消費者(wait.Until(c.processLoop, time.Second, stopCh)表示每秒會觸發一次c.processLoop,但若是c.processLoop在1秒以內沒有結束,則運行c.processLoop繼續運行,不會結束其運行狀態)
// client-go/tools/cache/controller.go func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() wg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh) }
processLoop的框架也很簡單,它運行了DeltaFIFO.Pop函數,用於消費DeltaFIFO中的對象,並在DeltaFIFO.Pop運行失敗後可能從新處理該對象(AddIfNotPresent)
注:c.config.RetryOnError在目前版本中初始化爲False
// client-go/tools/cache/controller.go func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == FIFOClosedError { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
//client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, }
...
ShareInformer
下圖爲SharedInformer的運行圖。能夠看出SharedInformer啓動了controller,reflector,並將其與Indexer結合起來。
注:不一樣顏色表示不一樣的chan,相同顏色表示在同一個chan中的處理
SharedInformer.Run啓動了兩個chan,s.c.Run爲controller的入口,s.c.Run函數中會Pop DeltaFIFO中的元素,並根據DeltaFIFO的元素的類型(Sync/Added/Updated/Deleted)進兩類處理,一類會使用indexer.Update,indexer,Add,indexer.Delete對保存的在Store中的數據進行處理;另外一類會根據DeltaFIFO的元素的類型將其封裝爲sharedInformer內部類型updateNotification,addNotification,deleteNotification,傳遞給s.processor.Listeners.addCh,後續給註冊的pl.handler處理。
s.processor.run主要用於處理註冊的handler,processorListener.run函數接受processorListener.nextCh中的值,將其做爲參數傳遞給handler進行處理。而processorListener.pop負責將processorListener.addCh中的元素緩存到p.pendingNotifications,並讀取p.pendingNotifications中的元素,將其傳遞到processorListener.nextCh。即processorListener.pop負責管理數據,processorListener.run負責使用processorListener.pop管理的數據進行處理。
// client-go/tools/cache/controller.go type ResourceEventHandler interface { OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) OnDelete(obj interface{}) }
sharedIndexInformer有3個狀態:啓動前,啓動後,中止後,由started, stopped兩個bool值表示。
stopped=true表示inforer再也不運做且不能添加新的handler(由於即便添加了也不會運行)
informer啓動前和中止後容許添加新的indexer(sharedIndexInformer.AddIndexers),但不能在informer運行時添加,由於此時須要經過watchlist以及handler等一系列處理來操做sharedIndexInformer.inxder。若是容許同時使用sharedIndexInformer.AddIndexers,可能會形成數據不一致。
還有一個狀態sharedProcessor.listenersStarted,用於表示是否全部的s.processor.Listeners都已經啓動,若是已經啓動,則在添加新的processorListener時,須要運行新添加的processorListener,不然僅僅添加便可(添加後一樣會被sharedProcessor.run調度)
// client-go/tools/cache/shared_informer.go type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector CacheMutationDetector // This block is tracked to handle late initialization of the controller listerWatcher ListerWatcher objectType runtime.Object // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call // shouldResync to check if any of our listeners need a resync. resyncCheckPeriod time.Duration // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default // value). defaultEventHandlerResyncPeriod time.Duration // clock allows for testability clock clock.Clock started, stopped bool startedLock sync.Mutex // blockDeltas gives a way to stop all event distribution so that a late event handler // can safely join the shared informer. blockDeltas sync.Mutex }
SharedInformerFactory
sharedInformerFactory接口的內容以下,它按照group和version對informer進行了分類。
// client-go/informers/factory.go type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Admissionregistration() admissionregistration.Interface Apps() apps.Interface Auditregistration() auditregistration.Interface Autoscaling() autoscaling.Interface Batch() batch.Interface Certificates() certificates.Interface Coordination() coordination.Interface Core() core.Interface Events() events.Interface Extensions() extensions.Interface Networking() networking.Interface Node() node.Interface Policy() policy.Interface Rbac() rbac.Interface Scheduling() scheduling.Interface Settings() settings.Interface Storage() storage.Interface }
注:下圖來自https://blog.csdn.net/weixin_42663840/article/details/81980022
sharedInformerFactory負責在不一樣的chan中啓動不一樣的informer(或shared_informer)
// client-go/informers/factory.go func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } }
那sharedInformerFactory啓動的informer又是怎麼註冊到sharedInformerFactory.informers中的呢?informer的註冊函數統一爲InformerFor,代碼以下,全部類型的informer都會調用該函數註冊到sharedInformerFactory
// client-go/informers/factory.go
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer }
下面以(Core,v1,podInformer)爲例結合client-go中提供的代碼進行講解。代碼以下,在調用informers.Core().V1().Pods().Informer()的時候會同時調用informers.InformerFor註冊到sharedInformerFactory,後續直接調用informers.Start啓動註冊的informer。
// client-go/examples/fake-client/main_test.go func TestFakeClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Create the fake client. client := fake.NewSimpleClientset() // We will create an informer that writes added pods to a channel. pods := make(chan *v1.Pod, 1) informers := informers.NewSharedInformerFactory(client, 0) //建立一個新的shareInformerFactory podInformer := informers.Core().V1().Pods().Informer() //建立一個podInformer,並調用InformerFor函數進行註冊 podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*v1.Pod) t.Logf("pod added: %s/%s", pod.Namespace, pod.Name) pods <- pod }, }) // Make sure informers are running. informers.Start(ctx.Done()) //啓動全部的informer
...
workqueue
indexer用於保存apiserver的資源信息,而workqueue用於保存informer中的handler處理以後的數據。workqueue的接口定義以下:
// client-go/util/workqueue/queue.go type Interface interface { Add(item interface{}) Len() int Get() (item interface{}, shutdown bool) Done(item interface{}) ShutDown() ShuttingDown() bool }
參見上圖能夠看到真正處理的元素來自queue,dirty和queue中的元素可能不一致,不一致點來自於當Get一個元素後且Done執行前,此時Get操做會刪除dirty中的該元素,若是此時發生了Add正在處理的元素的操做,因爲此時dirty中沒有該元素且processing中存在該元素,會發生dirty中的元素大於queue中元素的狀況。但對某一元素的不一致會在Done完成後消除,即Done函數中會判斷該元素是否在dirty中,若是存在則會將該元素append到queue中。總之,dirty中的數據都會被append到queue中,後續queue中的數據會insert到processing中進行處理()
dType實現了Interface接口。包含下面幾個變量:
// client-go/util/workqueue/queue.go // Type is a work queue (see the package comment). type Type struct { // queue defines the order in which we will work on items. Every // element of queue should be in the dirty set and not in the // processing set. queue []t // dirty defines all of the items that need to be processed. dirty set // Things that are currently being processed are in the processing set. // These things may be simultaneously in the dirty set. When we finish // processing something and remove it from this set, we'll check if // it's in the dirty set, and if so, add it to the queue. processing set cond *sync.Cond shuttingDown bool metrics queueMetrics unfinishedWorkUpdatePeriod time.Duration clock clock.Clock }
workqueue的使用例子能夠參見client-go/util/workqueue/queue_test.go
延時隊列
延時隊列接口繼承了queue的Interface接口,僅新增了一個AddAfter方法,它用於在duration時間以後將元素添加到queue中。
// client-go/util/workqueue/delaying_queue.go type DelayingInterface interface { Interface // AddAfter adds an item to the workqueue after the indicated duration has passed AddAfter(item interface{}, duration time.Duration) }
delayingType實現了DelayingInterface接口使用waitingForAddCh來傳遞須要添加到queue的元素,
// client-go/util/workqueue/delaying_queue.go type delayingType struct { Interface // clock tracks time for delayed firing clock clock.Clock // stopCh lets us signal a shutdown to the waiting loop stopCh chan struct{} // stopOnce guarantees we only signal shutdown a single time stopOnce sync.Once // heartbeat ensures we wait no more than maxWait before firing heartbeat clock.Ticker // waitingForAddCh is a buffered channel that feeds waitingForAdd waitingForAddCh chan *waitFor // metrics counts the number of retries metrics retryMetrics deprecatedMetrics retryMetrics }
delayingType.waitingForAddCh中的元素若是沒有超過延時時間會添加到waitForPriorityQueue中,不然直接加入queue中。
// client-go/util/workqueue/delaying_queue.go
type waitForPriorityQueue []*waitFor
延時隊列實現邏輯比較簡單,須要注意的是waitingForQueue是以heap方式實現的隊列,隊列的pop和push等操做使用的是heap.pop和heap.push
限速隊列
限速隊列實現了3個接口,When用於返回元素的重試時間,Forget用於清除元素的重試記錄,NumRequeues返回元素的重試次數
//client-go/util/workqueue/default_rate_limiter.go type RateLimiter interface { // When gets an item and gets to decide how long that item should wait When(item interface{}) time.Duration // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing // or for success, we'll stop tracking it Forget(item interface{}) // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int }
ItemExponentialFailureRateLimiter對使用指數退避的方式進行失敗重試,當failures增長時,下次重試的時間就變爲了baseDelay.Nanoseconds()) * math.Pow(2, float64(exp),maxDelay用於限制重試時間的最大值,當計算的重試時間超過maxDelay時則採用maxDelay
// client-go/util/workqueue/default_rate_limiters.go type ItemExponentialFailureRateLimiter struct { failuresLock sync.Mutex failures map[interface{}]int baseDelay time.Duration maxDelay time.Duration }
ItemFastSlowRateLimiter針對失敗次數採用不一樣的重試時間。當重試次數小於maxFastAttempts時,重試時間爲fastDelay,不然我爲slowDelay。
// client-go/util/workqueue/default_rate_limiters.go type ItemFastSlowRateLimiter struct { failuresLock sync.Mutex failures map[interface{}]int maxFastAttempts int fastDelay time.Duration slowDelay time.Duration }
MaxOfRateLimiter爲一個限速隊列列表,它的實現中返回列表中重試時間最長的限速隊列的值。
// client-go/util/workqueue/default_rate_limiters.go type MaxOfRateLimiter struct { limiters []RateLimiter }
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration { ret := time.Duration(0) for _, limiter := range r.limiters { curr := limiter.When(item) if curr > ret { ret = curr } } return ret }
BucketRateLimiter
使用令牌桶實現一個固定速率的限速器
// client-go/util/workqueue/default_rate_limiters.go type BucketRateLimiter struct { *rate.Limiter }
限速隊列的調用
全部的限速隊列實際上就是根據不一樣的需求,最終提供一個延時時間,在延時時間到後經過AddAfter函數將元素添加添加到隊列中。在queue.go中給出了workqueue的基本框架,delaying_queue.go擴展了workqueue的功能,提供了限速的功能,而default_rate_limiters.go提供了多種限速隊列,用於給delaying_queue.go中的AddAfter提供延時參數,最後rate_limiting_queue.go給出了使用使用限速隊列的入口。
RateLimitingInterface爲限速隊列入口,AddRateLimited
// client-g0/util/workqueue/rate_limiting_queue.go type RateLimitingInterface interface { DelayingInterface // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok AddRateLimited(item interface{}) // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you // still have to call `Done` on the queue. Forget(item interface{}) // NumRequeues returns back how many times the item was requeued NumRequeues(item interface{}) int }
rateLimitingType實現了RateLimitingInterface接口,第二個參數就時限速隊列接口。
// client-g0/util/workqueue/rate_limiting_queue.go type rateLimitingType struct { DelayingInterface rateLimiter RateLimiter }
下面是限速隊列的使用:
// client-go/util/workqueue/rate_limiting_queue_test.go func TestRateLimitingQueue(t *testing.T) { limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second) queue := NewRateLimitingQueue(limiter).(*rateLimitingType) fakeClock := clock.NewFakeClock(time.Now()) delayingQueue := &delayingType{ Interface: New(), clock: fakeClock, heartbeat: fakeClock.NewTicker(maxWait), stopCh: make(chan struct{}), waitingForAddCh: make(chan *waitFor, 1000), metrics: newRetryMetrics(""), deprecatedMetrics: newDeprecatedRetryMetrics(""), } queue.DelayingInterface = delayingQueue queue.AddRateLimited("one") waitEntry := <-delayingQueue.waitingForAddCh if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a { t.Errorf("expected %v, got %v", e, a) } queue.Forget("one") if e, a := 0, queue.NumRequeues("one"); e != a { t.Errorf("expected %v, got %v", e, a) } }
PS:後續會使用client-go編寫簡單程序
TIPS:
參考:
https://rancher.com/using-kubernetes-api-go-kubecon-2017-session-recap/
https://kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/
https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/