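Kubernetes v1.5.0
The various Kubernetes components operate on resource objects by talking to the API server, and all of those objects are ultimately persisted in etcd.
For every RESTful resource it serves, Kubernetes implements a generic, REST-compliant etcd operation interface; each service interface handles one kind (Kind) of resource object.
These resource objects include pods, bindings, podTemplates, RC (replication controllers), services, and so on.
To understand how the etcd operation interface is implemented, we first need to look at the Master.GenericAPIServer.storage structure: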
storage map[string]rest.Storage
The storage variable is a map: the key is the REST API path and the value is the rest.Storage interface, a generic, REST-compliant resource storage interface.
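To make the shape of that map concrete, here is a minimal sketch. The Storage interface, the kindStorage type, and the lookup helper are hypothetical stand-ins invented for illustration; they are not the real rest.Storage API.

```go
package main

import "fmt"

// Storage is a hypothetical stand-in for rest.Storage: it only knows how to
// name the kind of object it serves.
type Storage interface {
	New() string
}

// kindStorage is an illustrative implementation that reports a kind name.
type kindStorage struct{ kind string }

func (s kindStorage) New() string { return s.kind }

// lookup resolves a REST path to the storage registered for it.
func lookup(storage map[string]Storage, path string) (Storage, bool) {
	s, ok := storage[path]
	return s, ok
}

func main() {
	// Keys are REST paths, values are the storage objects that serve them.
	storage := map[string]Storage{
		"pods":        kindStorage{"Pod"},
		"pods/status": kindStorage{"Pod"},
		"services":    kindStorage{"Service"},
	}
	if s, ok := lookup(storage, "pods/status"); ok {
		fmt.Println("pods/status is served by the storage for kind", s.New())
	}
}
```

Note that a subresource such as pods/status is just another key in the same map, which matches how the real resource list registers it.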
The resource list for the core group is built by NewLegacyRESTStorage() in pkg/registry/core/rest/storage_core.go:
Call flow: main --> App.Run --> config.Complete().New() --> m.InstallLegacyAPI() --> NewLegacyRESTStorage()
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	....
	// Create podStorage
	podStorage := podetcd.NewStorage(
		restOptionsGetter(api.Resource("pods")),
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)
	...
	// Resource list
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.Binding,

		"podTemplates": podTemplateStorage,

		"replicationControllers":        controllerStorage.Controller,
		"replicationControllers/status": controllerStorage.Status,

		"services":        serviceRest.Service,
		"services/proxy":  serviceRest.Proxy,
		"services/status": serviceStatusStorage,

		"endpoints": endpointsStorage,
		...
		"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
	}
	if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
		restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
	}
	if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1beta1"}) {
		restStorageMap["pods/eviction"] = podStorage.Eviction
	}
	apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap

	return restStorage, apiGroupInfo, nil
}
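This function was already covered in chapter 2 of the API server source analysis, in the part on resource registration. Here we focus on how the etcd backend storage interface is implemented.
We use the Pod resource as the example:
Path: pkg/registry/core/pod/etcd/etcd.go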
func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
	// Build the prefix
	prefix := "/" + opts.ResourcePrefix

	newListFunc := func() runtime.Object { return &api.PodList{} }
	// Call the storage decorator; it returns the etcd operation interface
	// for this storage and a function that destroys its resources.
	// opts is passed in by the caller; see restOptionsFactory.NewFor in
	// master.go one level up.
	storageInterface, dFunc := opts.Decorator(
		opts.StorageConfig,
		// The parameters below are only used when the cache is enabled
		cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
		&api.Pod{},
		prefix,
		pod.Strategy,
		newListFunc,
		pod.NodeNameTriggerFunc,
	)

	// Create the Store object
	store := &registry.Store{
		NewFunc:     func() runtime.Object { return &api.Pod{} },
		NewListFunc: newListFunc,
		KeyRootFunc: func(ctx api.Context) string {
			return registry.NamespaceKeyRootFunc(ctx, prefix)
		},
		KeyFunc: func(ctx api.Context, name string) (string, error) {
			return registry.NamespaceKeyFunc(ctx, prefix, name)
		},
		ObjectNameFunc: func(obj runtime.Object) (string, error) {
			return obj.(*api.Pod).Name, nil
		},
		PredicateFunc:           pod.MatchPod,
		QualifiedResource:       api.Resource("pods"),
		EnableGarbageCollection: opts.EnableGarbageCollection,
		DeleteCollectionWorkers: opts.DeleteCollectionWorkers,

		CreateStrategy:      pod.Strategy,
		UpdateStrategy:      pod.Strategy,
		DeleteStrategy:      pod.Strategy,
		ReturnDeletedObject: true,

		Storage:     storageInterface,
		DestroyFunc: dFunc,
	}

	statusStore := *store
	statusStore.UpdateStrategy = pod.StatusStrategy

	return PodStorage{
		Pod:         &REST{store, proxyTransport},
		Binding:     &BindingREST{store: store},
		Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
		Status:      &StatusREST{store: &statusStore},
		Log:         &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
		Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
		Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
		PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
	}
}
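This function calls opts.Decorator(), which returns the key storage interface together with the function that cleans up its resources.
To see how that is implemented, we have to start from where opts is created.
restOptionsGetter(api.Resource("pods")) is the step that creates opts. api.Resource("pods") simply assembles a GroupResource struct, so we need to trace restOptionsGetter back to its origin.
Path: pkg/master/master.go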
func (c completedConfig) New() (*Master, error) {
	...
	restOptionsFactory := restOptionsFactory{
		deleteCollectionWorkers: c.DeleteCollectionWorkers,
		enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
		storageFactory:          c.StorageFactory,
	}

	// Check whether the watch cache is enabled; a different implementation
	// is assigned depending on the answer.
	// restOptionsFactory.storageDecorator decorates each resource's REST
	// (CRUD) storage interface. NewStorage() calls it later and gets back
	// the CRUD interface plus a destroy function.
	// See NewStorage() in pkg/registry/core/pod/etcd/etcd.go.
	// The only difference between the two: with the cache, the returned
	// interface operates on the cache; without it, the returned interface
	// operates on etcd directly.
	if c.EnableWatchCache {
		restOptionsFactory.storageDecorator = registry.StorageWithCacher
	} else {
		restOptionsFactory.storageDecorator = generic.UndecoratedStorage
	}

	// install legacy rest storage
	if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:       c.StorageFactory,
			ProxyTransport:       c.ProxyTransport,
			KubeletClientConfig:  c.KubeletClientConfig,
			EventTTL:             c.EventTTL,
			ServiceIPRange:       c.ServiceIPRange,
			ServiceNodePortRange: c.ServiceNodePortRange,
			LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
		}
		m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
	}
	...
}
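This function initializes a restOptionsFactory variable that specifies the maximum number of goroutines used for delete-collection cleanup, whether GC is enabled, and the storageFactory. Depending on whether the watch cache is enabled, it also assigns the decorator that NewStorage() later calls.
restOptionsFactory.NewFor is passed down the call chain until NewLegacyRESTStorage() invokes it to create opts. Here is its implementation:
Path: pkg/master/master.go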
type restOptionsFactory struct {
	deleteCollectionWorkers int
	enableGarbageCollection bool
	storageFactory          genericapiserver.StorageFactory
	storageDecorator        generic.StorageDecorator
}

func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.RESTOptions {
	// Create the storage config for this resource
	storageConfig, err := f.storageFactory.NewConfig(resource)
	if err != nil {
		glog.Fatalf("Unable to find storage destination for %v, due to %v", resource, err.Error())
	}
	// The return value is a RESTOptions, the type of opts seen earlier.
	// Pay attention to where f.storageDecorator comes from.
	return generic.RESTOptions{
		// Config used to build the Storage
		StorageConfig:           storageConfig,
		Decorator:               f.storageDecorator,
		DeleteCollectionWorkers: f.deleteCollectionWorkers,
		EnableGarbageCollection: f.enableGarbageCollection,
		ResourcePrefix:          f.storageFactory.ResourcePrefix(resource),
	}
}
This function is straightforward: it initializes a generic.RESTOptions value, which is opts. To find where opts.Decorator comes from, look at the EnableWatchCache check in the previous function.
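The factory idea can be sketched in a few lines: shared settings live in one struct, and a NewFor method stamps out per-resource options that carry the shared decorator. Decorator, Options, and optionsFactory below are simplified, hypothetical types, not the real generic.RESTOptions or restOptionsFactory.

```go
package main

import "fmt"

// Decorator is a hypothetical, simplified stand-in for generic.StorageDecorator.
type Decorator func(prefix string) string

// Options mirrors the idea of generic.RESTOptions with only three fields.
type Options struct {
	ResourcePrefix string
	Workers        int
	Decorator      Decorator
}

// optionsFactory holds the settings shared by every resource.
type optionsFactory struct {
	workers   int
	decorator Decorator
}

// NewFor builds the options for one resource; only the prefix is per-resource.
func (f optionsFactory) NewFor(resource string) Options {
	return Options{
		ResourcePrefix: resource,
		Workers:        f.workers,
		Decorator:      f.decorator,
	}
}

func main() {
	f := optionsFactory{
		workers:   1,
		decorator: func(prefix string) string { return "storage for /" + prefix },
	}
	// The method value f.NewFor can be handed down a call chain as a plain
	// function, which is how a getter like restOptionsGetter can be passed on.
	getter := f.NewFor
	opts := getter("pods")
	fmt.Println(opts.Decorator(opts.ResourcePrefix))
}
```

Passing the method value rather than the factory keeps callers unaware of how the options are assembled; they only see a function from resource to options.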
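opts.Decorator ultimately returns the storage interface and the function that cleans up its resources. The cached and uncached implementations necessarily differ, so the two cases are distinguished here:
registry.StorageWithCacher: returns an interface that operates on the cache, plus a function that tears the cache down.
generic.UndecoratedStorage: returns an etcd operation interface chosen by the configured backend type (etcd2, etcd3, and so on). In effect it sets up the etcd connection used for the resource objects, sends the various commands over that connection, and also returns a function that closes the connection.
So the two implementations are completely different: one works against the cache, the other against etcd itself.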
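The choice between the two decorators, and the way the cached variant layers its own cleanup on top of the raw one, can be sketched as follows. All names here (Interface, Decorator, raw, cacher, pick) are hypothetical simplifications; the trace slice exists only so the cleanup order is observable.

```go
package main

import "fmt"

// Interface is a one-method stand-in for storage.Interface.
type Interface interface{ Describe() string }

// DestroyFunc releases whatever the storage holds open.
type DestroyFunc func()

// Decorator plays the role of generic.StorageDecorator, heavily simplified.
type Decorator func(trace *[]string) (Interface, DestroyFunc)

// raw stands in for the storage that talks to etcd directly.
type raw struct{}

func (raw) Describe() string { return "raw" }

// cacher wraps another storage, as the watch cache wraps the raw storage.
type cacher struct{ inner Interface }

func (c cacher) Describe() string { return "cacher(" + c.inner.Describe() + ")" }

// undecorated returns the raw storage and a function that closes its connection.
func undecorated(trace *[]string) (Interface, DestroyFunc) {
	return raw{}, func() { *trace = append(*trace, "close connection") }
}

// withCacher builds the raw storage first, then wraps it; its destroy
// function stops the cacher before closing the underlying connection.
func withCacher(trace *[]string) (Interface, DestroyFunc) {
	s, d := undecorated(trace)
	destroy := func() {
		*trace = append(*trace, "stop cacher")
		d()
	}
	return cacher{inner: s}, destroy
}

// pick mirrors the EnableWatchCache branch: one flag selects the decorator.
func pick(enableWatchCache bool) Decorator {
	if enableWatchCache {
		return withCacher
	}
	return undecorated
}

func main() {
	var trace []string
	s, destroy := pick(true)(&trace)
	destroy()
	fmt.Println(s.Describe(), trace)
}
```

Because both decorators share one signature, the caller never needs to know which one it received.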
First, the implementation of registry.StorageWithCacher():
Path: pkg/registry/generic/registry/storage_factory.go
func StorageWithCacher(
	storageConfig *storagebackend.Config,
	capacity int,
	objectType runtime.Object,
	resourcePrefix string,
	scopeStrategy rest.NamespaceScopedStrategy,
	newListFunc func() runtime.Object,
	triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {

	// storageConfig is the backend storage config: storage type, server
	// list, TLS certificate info, cache size, and so on.
	// This call is all that generic.UndecoratedStorage() does;
	// StorageWithCacher() adds the cacher steps below.
	s, d := generic.NewRawStorage(storageConfig)
	// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
	// Currently it has two layers of same storage interface -- cacher and low level kv.
	cacherConfig := storage.CacherConfig{
		CacheCapacity:        capacity,
		Storage:              s,
		Versioner:            etcdstorage.APIObjectVersioner{},
		Type:                 objectType,
		ResourcePrefix:       resourcePrefix,
		NewListFunc:          newListFunc,
		TriggerPublisherFunc: triggerFunc,
		Codec:                storageConfig.Codec,
	}
	// Assign KeyFunc depending on whether the resource is namespaced.
	// KeyFunc returns the key of an object:
	//   namespaced:     prefix + "/" + Namespace + "/" + name
	//   not namespaced: prefix + "/" + name
	if scopeStrategy.NamespaceScoped() {
		cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
			return storage.NamespaceKeyFunc(resourcePrefix, obj)
		}
	} else {
		cacherConfig.KeyFunc = func(obj runtime.Object) (string, error) {
			return storage.NoNamespaceKeyFunc(resourcePrefix, obj)
		}
	}
	// Create the cacher from the config initialized above.
	// This step is important and is covered later.
	cacher := storage.NewCacherFromConfig(cacherConfig)
	destroyFunc := func() {
		cacher.Stop()
		d()
	}

	return cacher, destroyFunc
}
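The two key layouts named in the comments of StorageWithCacher() can be illustrated with a small sketch. The object type and the helper names are hypothetical; only the key formats (prefix/namespace/name and prefix/name) come from the code above. The empty-field checks are an assumption added for the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical stand-in for the metadata of a runtime.Object.
type object struct {
	Namespace string
	Name      string
}

// namespaceKey builds prefix + "/" + namespace + "/" + name.
func namespaceKey(prefix string, obj object) (string, error) {
	if obj.Namespace == "" || obj.Name == "" {
		return "", errors.New("namespaced object needs both namespace and name")
	}
	return prefix + "/" + obj.Namespace + "/" + obj.Name, nil
}

// noNamespaceKey builds prefix + "/" + name.
func noNamespaceKey(prefix string, obj object) (string, error) {
	if obj.Name == "" {
		return "", errors.New("object has no name")
	}
	return prefix + "/" + obj.Name, nil
}

// keyFuncFor picks the key function once, based on whether the resource is
// namespaced, and binds the prefix into the returned closure.
func keyFuncFor(prefix string, namespaced bool) func(object) (string, error) {
	if namespaced {
		return func(o object) (string, error) { return namespaceKey(prefix, o) }
	}
	return func(o object) (string, error) { return noNamespaceKey(prefix, o) }
}

func main() {
	podKey := keyFuncFor("/pods", true)
	nodeKey := keyFuncFor("/nodes", false)

	k1, _ := podKey(object{Namespace: "default", Name: "web"})
	k2, _ := nodeKey(object{Name: "node-1"})
	fmt.Println(k1)
	fmt.Println(k2)
}
```

Choosing the closure once at construction time means the per-object path never has to re-check the scope.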
It first calls NewRawStorage() to create a storage backend. Here is that function:
Path: pkg/registry/generic/storage_decorator.go
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
	s, d, err := factory.Create(*config)
	if err != nil {
		glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
	}
	return s, d
}
Nothing much to say here; on to Create():
Path: pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	// Switch on the storage type: etcd2 or etcd3
	switch c.Type {
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD2:
		return newETCD2Storage(c)
	case storagebackend.StorageTypeETCD3:
		// TODO: We have the following features to implement:
		// - Support secure connection by using key, cert, and CA files.
		// - Honor "https" scheme to support secure connection in gRPC.
		// - Support non-quorum read.
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}
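The dispatch in Create() treats an unset type the same as etcd2, which is what makes etcd2 the default in this version. A minimal sketch of that pattern follows; the constant values and the string results are assumptions for illustration, and no real backend is constructed.

```go
package main

import "fmt"

// Assumed string values for the storage types; illustrative only.
const (
	typeUnset = ""
	typeETCD2 = "etcd2"
	typeETCD3 = "etcd3"
)

// create returns a label for the backend that would be built for the given
// type. An unset type falls through to the etcd2 case, making it the default.
func create(storageType string) (string, error) {
	switch storageType {
	case typeUnset, typeETCD2:
		return "etcd2 storage", nil
	case typeETCD3:
		return "etcd3 storage", nil
	default:
		return "", fmt.Errorf("unknown storage type: %s", storageType)
	}
}

func main() {
	for _, t := range []string{"", "etcd2", "etcd3", "consul"} {
		backend, err := create(t)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("type %q -> %s\n", t, backend)
	}
}
```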
Take etcd2 as the example:
Path: pkg/storage/storagebackend/factory/etcd2.go
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	// Build an http.Transport from the configured TLS certificate info
	tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
	if err != nil {
		return nil, nil, err
	}
	// Create the etcd2 client; the result is an httpClusterClient
	client, err := newETCD2Client(tr, c.ServerList)
	if err != nil {
		return nil, nil, err
	}
	// Initialize an etcdHelper, which implements storage.Interface
	s := etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize)
	// Return the etcdHelper and the function that closes the connections
	return s, tr.CloseIdleConnections, nil
}
The first two steps only create the client that connects to etcd; the last step is the important one:
Path: pkg/storage/etcd/etcd_helper.go
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
	return &etcdHelper{
		// Create an httpMembersAPI value, which carries many methods
		etcdMembersAPI: etcd.NewMembersAPI(client),
		// Create an httpKeysAPI value, which likewise carries many methods
		etcdKeysAPI: etcd.NewKeysAPI(client),
		// Used for encoding and decoding
		codec:     codec,
		versioner: APIObjectVersioner{},
		// Used for serialization/deserialization, conversion between
		// versions, compatibility, and so on
		copier:     api.Scheme,
		pathPrefix: path.Join("/", prefix),
		quorum:     quorum,
		// Create the cache structure
		cache: utilcache.NewCache(cacheSize),
	}
}
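The initialization itself is simple. The part to note is the set of generic RESTful methods defined on etcdHelper, which together implement every method that storage.Interface requires.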
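In Go, "implements every required method" can be verified by the compiler rather than by reading method lists. The sketch below shows the idiom with a hypothetical three-method Interface and an in-memory helper type; the real storage.Interface has more methods and different signatures.

```go
package main

import "fmt"

// Interface is a hypothetical, reduced version of storage.Interface.
type Interface interface {
	Create(key, value string) error
	Get(key string) (string, error)
	Delete(key string) error
}

// helper is an in-memory stand-in for etcdHelper.
type helper struct{ data map[string]string }

func newHelper() *helper { return &helper{data: make(map[string]string)} }

// Create stores a new key and refuses to overwrite an existing one.
func (h *helper) Create(key, value string) error {
	if _, exists := h.data[key]; exists {
		return fmt.Errorf("key %q already exists", key)
	}
	h.data[key] = value
	return nil
}

// Get returns the value stored under key.
func (h *helper) Get(key string) (string, error) {
	v, ok := h.data[key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return v, nil
}

// Delete removes an existing key.
func (h *helper) Delete(key string) error {
	if _, ok := h.data[key]; !ok {
		return fmt.Errorf("key %q not found", key)
	}
	delete(h.data, key)
	return nil
}

// Compile-time check: the build fails if *helper stops satisfying Interface.
var _ Interface = (*helper)(nil)

func main() {
	var s Interface = newHelper()
	if err := s.Create("/pods/default/web", "v1"); err != nil {
		fmt.Println("create failed:", err)
		return
	}
	v, _ := s.Get("/pods/default/web")
	fmt.Println("stored value:", v)
}
```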
Back in StorageWithCacher(), the next step initializes CacherConfig, which we skip; we go straight to the function that creates the cacher:
Path: pkg/storage/cacher.go
func NewCacherFromConfig(config CacherConfig) *Cacher {
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)

	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
		}
	}

	cacher := &Cacher{
		ready:       newReady(),
		storage:     config.Storage,
		objectType:  reflect.TypeOf(config.Type),
		watchCache:  watchCache,
		reflector:   cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
		versioner:   config.Versioner,
		triggerFunc: config.TriggerPublisherFunc,
		watcherIdx:  0,
		watchers: indexedWatchers{
			allWatchers:   make(map[int]*cacheWatcher),
			valueWatchers: make(map[string]watchersMap),
		},
		// TODO: Figure out the correct value for the buffer size.
		incoming: make(chan watchCacheEvent, 100),
		// We need to (potentially) stop both:
		// - wait.Until go-routine
		// - reflector.ListAndWatch
		// and there are no guarantees on the order that they will stop.
		// So we will be simply closing the channel, and synchronizing on the WaitGroup.
		stopCh: make(chan struct{}),
	}
	watchCache.SetOnEvent(cacher.processEvent)
	go cacher.dispatchEvents()

	stopCh := cacher.stopCh
	cacher.stopWg.Add(1)
	go func() {
		defer cacher.stopWg.Done()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					cacher.startCaching(stopCh)
				}
			},
			time.Second,
			stopCh,
		)
	}()
	return cacher
}
This function starts the cacher. The cache serves only WATCH and LIST requests.
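The shutdown scheme described in the comments of NewCacherFromConfig(), closing a stop channel and then synchronizing on a WaitGroup, is a general Go pattern. Here is a minimal sketch of it using only the standard library; the worker type, its tick counter, and the millisecond parameter are hypothetical, and a ticker loop replaces wait.Until.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker is a hypothetical stand-in for the Cacher's background loop.
type worker struct {
	ticks  int64 // updated atomically by the background goroutine
	stopCh chan struct{}
	stopWg sync.WaitGroup
}

// start launches a goroutine that does periodic work until stopCh is closed.
func start(periodMs int) *worker {
	w := &worker{stopCh: make(chan struct{})}
	w.stopWg.Add(1)
	go func() {
		defer w.stopWg.Done()
		ticker := time.NewTicker(time.Duration(periodMs) * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-w.stopCh:
				return
			case <-ticker.C:
				atomic.AddInt64(&w.ticks, 1)
			}
		}
	}()
	return w
}

// Stop closes the channel to signal the goroutine, then waits on the
// WaitGroup so the caller knows the goroutine has actually exited.
// It must be called at most once, because closing a closed channel panics.
func (w *worker) Stop() {
	close(w.stopCh)
	w.stopWg.Wait()
}

// Ticks reports how many periods the goroutine has processed.
func (w *worker) Ticks() int64 { return atomic.LoadInt64(&w.ticks) }

func main() {
	w := start(1)
	time.Sleep(10 * time.Millisecond)
	w.Stop()
	fmt.Println("ticks observed before stop:", w.Ticks())
}
```

Closing the channel broadcasts to every receiver at once, which is why it suits the case where several goroutines must stop and their stop order is not guaranteed.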
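The Cacher struct necessarily implements the methods required by storage.Interface as well.
Because the Cacher serves only WATCH and LIST requests, every other method in its API simply calls the API of the storage created earlier.
For example, Cacher.Create and Cacher.Delete: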
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Create(ctx, key, obj, out, ttl)
}

func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
	return c.storage.Delete(ctx, key, out, preconditions)
}
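The split between pass-through writes and cache-served lists can be sketched like this. Everything here is a hypothetical simplification: the real Cacher keeps its cache current through a reflector and watch events, whereas this sketch uses an explicit Sync() call to stand in for that mechanism.

```go
package main

import (
	"fmt"
	"sort"
)

// Interface is a reduced, hypothetical version of storage.Interface.
type Interface interface {
	Create(key, value string) error
	Delete(key string) error
	List() []string
}

// backend stands in for the etcd-backed storage and counts List calls.
type backend struct {
	data  map[string]string
	lists int
}

func (b *backend) Create(key, value string) error {
	if _, exists := b.data[key]; exists {
		return fmt.Errorf("key %q already exists", key)
	}
	b.data[key] = value
	return nil
}

func (b *backend) Delete(key string) error {
	if _, exists := b.data[key]; !exists {
		return fmt.Errorf("key %q not found", key)
	}
	delete(b.data, key)
	return nil
}

func (b *backend) List() []string {
	b.lists++
	keys := make([]string, 0, len(b.data))
	for k := range b.data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// cacher forwards writes to the wrapped storage and answers List from memory.
type cacher struct {
	storage  Interface
	snapshot []string
}

func (c *cacher) Create(key, value string) error { return c.storage.Create(key, value) }
func (c *cacher) Delete(key string) error        { return c.storage.Delete(key) }
func (c *cacher) List() []string                 { return c.snapshot }

// Sync refreshes the snapshot; it is an assumption standing in for the
// watch-driven updates of the real cacher.
func (c *cacher) Sync() { c.snapshot = c.storage.List() }

func main() {
	b := &backend{data: make(map[string]string)}
	c := &cacher{storage: b}
	_ = c.Create("/pods/default/web", "v1")
	c.Sync()
	fmt.Println(c.List(), "backend List calls:", b.lists)
}
```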
That concludes registry.StorageWithCacher(). Now back to the other function mentioned earlier, generic.UndecoratedStorage():
Path: pkg/registry/generic/storage_decorator.go
func UndecoratedStorage(
	config *storagebackend.Config,
	capacity int,
	objectType runtime.Object,
	resourcePrefix string,
	scopeStrategy rest.NamespaceScopedStrategy,
	newListFunc func() runtime.Object,
	trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
	return NewRawStorage(config)
}
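It calls the same NewRawStorage() that registry.StorageWithCacher() calls; the only difference is that it skips the cache layer.
This is our first contact with the cache; the next section covers its implementation in detail.
--watch-cache: this API server flag defaults to true and turns on the watch cache.
--watch-cache-sizes: since the cache can be enabled, its size can be set too, and the size can be given per resource. Format: resource#size
--storage-backend: the persistent storage backend type; the options are etcd2 (the default) and etcd3.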
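To illustrate the resource#size format accepted by --watch-cache-sizes, here is a small parsing sketch. The function name parseCacheSizes is hypothetical, and treating multiple entries as comma-separated is an assumption; this is not the API server's own parsing code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCacheSizes turns a string such as "pods#100,nodes#50" into a map
// from resource name to cache size. Entries are assumed comma-separated.
func parseCacheSizes(spec string) (map[string]int, error) {
	sizes := make(map[string]int)
	if spec == "" {
		return sizes, nil
	}
	for _, item := range strings.Split(spec, ",") {
		parts := strings.SplitN(item, "#", 2)
		if len(parts) != 2 || parts[0] == "" {
			return nil, fmt.Errorf("invalid entry %q, want resource#size", item)
		}
		size, err := strconv.Atoi(parts[1])
		if err != nil {
			return nil, fmt.Errorf("invalid size in %q: %v", item, err)
		}
		sizes[parts[0]] = size
	}
	return sizes, nil
}

func main() {
	sizes, err := parseCacheSizes("pods#100,nodes#50")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("pods:", sizes["pods"], "nodes:", sizes["nodes"])
}
```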