Kubernetes代碼解讀-apiserver之list-watch

list-watch,做爲k8s系統中統一的異步消息傳遞方式,對系統的性能、數據一致性起到關鍵性的做用。今天我想從代碼這邊探究一下list-watch的實現方式。並看是否能在後面的工做中優化這個過程。

list-watch的需求node

Kubernetes代碼解讀-apiserver之list-watchKubernetes代碼解讀-apiserver之list-watch

上圖是一個典型的Pod建立過程,在這個過程當中,每次當kubectl建立了ReplicaSet對象後,controller-manager都是經過list-watch這種方式獲得了最新的ReplicaSet對象,並執行本身的邏輯來建立Pod對象。其餘的幾個組件,Scheduler/Kubelet也是同樣,經過list-watch得知變化並進行處理。這是組件的處理端代碼:api

go
c.NodeLister.Store, c.nodePopulator = framework.NewInformer(
    c.createNodeLW(),                                            ...(1)
    &api.Node{},                                                 ...(2)
    0,                                                           ...(3)
    framework.ResourceEventHandlerFuncs{                         ...(4)
        AddFunc:    c.addNodeToCache,                            ...(5)
        UpdateFunc: c.updateNodeInCache,
        DeleteFunc: c.deleteNodeFromCache,
    },
)

其中(1)是list-watch函數,(4)(5)則是相應事件觸發操做的入口。緩存

list-watch操做須要作這麼幾件事安全

由組件向apiserver而不是etcd發起watch請求,在組件啓動時就進行訂閱,告訴apiserver須要知道什麼數據發生變化。Watch是一個典型的發佈-訂閱模式。restful

組件向apiserver發起的watch請求是能夠帶條件的,例如,scheduler想要watch的是全部未被調度的Pod,也就是知足Pod.destNode=」」的Pod來進行調度操做;而kubelet只關心本身節點上的Pod列表。apiserver向etcd發起的watch是沒有條件的,只能知道某個數據發生了變化或建立、刪除,但不能過濾具體的值。也就是說對象數據的條件過濾必須在apiserver端而不是etcd端完成。數據結構

list是watch失敗,數據太過陳舊後的彌補手段,這方面詳見 基於list-watch的Kubernetes異步事件處理框架詳解-客戶端部分。list自己是一個簡單的列表操做,和其它apiserver的增刪改操做同樣,再也不多描述細節。app

watch的API處理框架

既然watch自己是一個apiserver提供的http restful的API,那麼就按照API的方式去閱讀它的代碼,按照apiserver的基礎功能實現一文所描述,咱們來看它的代碼,異步

關鍵的處理API註冊代碼pkg/apiserver/api_installer.go函數

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage,...

...
lister, isLister := storage.(rest.Lister)
watcher, isWatcher := storage.(rest.Watcher)                     ...(1)
...    
    case "LIST": // List all resources of a kind.                ...(2)
        doc := "list objects of kind " + kind
        if hasSubresource {
            doc = "list " + subresource + " of objects of kind " + kind
        }
        handler := metrics.InstrumentRouteFunc(action.Verb, resource, ListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) ...(3)

一個rest.Storage對象會被轉換爲watcher和lister對象,提供list和watch服務的入口是同一個,在API接口中是經過 GET /pods?watch=true 這種方式來區分是list仍是watch,API處理函數是由lister和watcher通過ListResource()合體後完成的。

那麼就看看ListResource()的具體實現吧,/pkg/apiserver/resthandler.go

func ListResource(r rest.Lister, rw rest.Watcher,... {
...
    if (opts.Watch || forceWatch) && rw != nil {
        watcher, err := rw.Watch(ctx, &opts)           ...(1)
        ....
        serveWatch(watcher, scope, req, res, timeout)
        return
    }
    result, err := r.List(ctx, &opts)                  ...(2)           
    write(http.StatusOK, scope.Kind.GroupVersion(), scope.Serializer, result, w, req.Request)

每次有一個watch的url請求過來,都會調用rw.Watch()建立一個watcher,好吧這裏的名字和上面那一層的名字重複了,但咱們能夠區分開,而後使用serveWatch()來處理這個請求。watcher的生命週期是每一個http請求的,這一點很是重要。list在這裏是另一個分支,和watch分別處理,能夠忽略。

響應http請求的過程serveWatch()的代碼在/pkg/apiserver/watch.go裏面

func serveWatch(watcher watch.Interface... {
server.ServeHTTP(res.ResponseWriter, req.Request)
}

func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for {
    select {
    case event, ok := < -s.watching.ResultChan():

        obj := event.Object

        if err := s.embeddedEncoder.EncodeToStream(obj, buf); 
...
}

這段的操做基本毫無技術含量,就是從watcher的結果channel中讀取一個event對象,而後持續不斷的編碼寫入到http response的流當中。

這是整個過程的圖形化描述:
Kubernetes代碼解讀-apiserver之list-watchKubernetes代碼解讀-apiserver之list-watch

因此,咱們的問題就回到了

watcher這個對象,嚴格來講是watch.Interface的對象,位置在pkg/watch/watch.go中,是怎麼被建立出來的?

這個watcher對象是怎麼從etcd中得到變化的數據的?又是怎麼過濾條件的?

在代碼迷宮中追尋watcher

回到上面的代碼追蹤過程來看,watcher(watch.Interface)對象是被Rest.Storage對象建立出來的。Rest.Storage分兩層,一層是每一個對象本身的邏輯,另外一層則是經過通用的操做來搞定,像watch這樣的操做應該是通用的,因此咱們看這個源代碼

/pkg/registry/generic/registry/store.go
func (e *Store) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) {
...
return e.WatchPredicate(ctx, e.PredicateFunc(label, field), resourceVersion)
}

func (e *Store) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {

        return e.Storage.Watch(ctx, key, resourceVersion, filterFunc)   ...(1)

return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc)
}

果真,咱們在(1)這裏找到了生成Watch的函數,但這個工做是由e.Storage來完成的,因此咱們須要找一個具體的Storage的生成過程,以Pod爲例子

/pkg/registry/pod/etcd/etcd.go
func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
prefix := "/pods"

storageInterface := opts.Decorator(
    opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, prefix, pod.Strategy, newListFunc)                                        ...(1)

store := ®istry.Store{
    ...
    Storage: storageInterface,                                        ...(2)
}
return PodStorage{
    Pod:         &REST{store, proxyTransport},                        ...(3)

這(1)就是Storage的生成現場,傳入的參數包括了一個緩存Pod的數量。(2)(3)是和上面代碼的鏈接點。那麼如今問題就轉化爲追尋Decorator這個東西具體是怎麼生成的,須要重複剛纔的過程,往上搜索opts是怎麼搞進來的。

/pkg/master/master.go - GetRESTOptionsOrDie()

/pkg/genericapiserver/genericapiserver.go - StorageDecorator()

/pkg/registry/generic/registry/storage_factory.go - StorageWithCacher()

/pkg/storage/cacher.go

OK,這樣咱們就來到正題,一個具體的watch緩存的實現了!

把上面這個過程用一幅圖表示:
Kubernetes代碼解讀-apiserver之list-watchKubernetes代碼解讀-apiserver之list-watch

watch緩存的具體實現

看代碼,首要看的是數據結構,以及考慮這個數據結構和須要解決的問題之間的關係。

3.1 Cacher(pkg/storage/cacher.go)
對於cacher這結構來講,咱們從外看需求,能夠知道這是一個Storage,用於提供某個類型的數據,例如Pod的增刪改查請求,同時它又用於watch,用於在client端須要對某個key的變化感興趣時,建立一個watcher來源源不斷的提供新的數據給客戶端。

那麼cacher是怎麼知足這些需求的呢?答案就在它的結構裏面:

type Cacher struct {
// Underlying storage.Interface.
storage Interface

// "sliding window" of recent changes of objects and the current state.
watchCache *watchCache
reflector  *cache.Reflector

// Registered watchers.
watcherIdx int
watchers   map[int]*cacheWatcher
}

略去裏面的鎖(在看代碼的時候一開始要忽略鎖的存在,鎖是後期爲了不破壞數據再加上去的,不影響數據流),略去裏面的一些非關鍵的成員,如今咱們剩下這3段重要的成員,其中

storage是鏈接etcd的,也就是背後的裸存儲
watchCache並不只僅是和註釋裏面說的那樣,是個滑動窗口,裏面存儲了全部數據+滑動窗口
watchers這是爲每一個請求建立的struct,每一個watch的client上來後都會被建立一個,因此這裏有個map
固然,這3個成員的做用是我看了全部代碼後,總結出來的,一開始讀代碼時不妨先在腦子裏面有個定位,而後在看下面的方法時不斷修正這個定位。那麼,接下來就看看具體的方法是怎麼讓數據在這些結構裏面流動的吧!

初始化方法

func NewCacherFromConfig(config CacherConfig) *Cacher { 
...
                cacher.startCaching(stopCh)
}

func (c *Cacher) startCaching(stopChannel < -chan struct{}) {
...
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
    glog.Errorf("unexpected ListAndWatch error: %v", err)
}
}

其餘的部分都是陳詞濫調,只有startCaching()這段有點意思,這裏啓動一個go協程,最後啓動了c.reflector.ListAndWatch()這個方法,若是對k8s的基本有了解的話,這個其實就是一個把遠端數據源源不斷的同步到本地的方法,那麼數據落在什麼地方呢?往上看能夠看到

reflector:  cache.NewReflector(listerWatcher, config.Type, watchCache, 0),

也就是說從建立cacher的實例開始,就會從etcd中把全部Pod的數據同步到watchCache裏面來。這也就印證了watchCache是數據從etcd過來的第一站。
Kubernetes代碼解讀-apiserver之list-watchKubernetes代碼解讀-apiserver之list-watch

增刪改方法

func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}

大部分方法都很無聊,就是短路到底層的storage直接執行。

Watch方法
// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {

initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)

watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
c.watchers[c.watcherIdx] = watcher
c.watcherIdx++
return watcher, nil
}

這裏的邏輯就比較清晰,首先從watchCache中拿到從某個resourceVersion以來的全部數據——initEvents,而後用這個數據建立了一個watcher返回出去爲某個客戶端提供服務。

List方法
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {

filterFunc := filterFunction(key, c.keyFunc, filter)

objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
if err != nil {
    return fmt.Errorf("failed to wait for fresh list: %v", err)
}
for _, obj := range objs {
    if filterFunc(object) {
        listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
    }
}

}

從這段代碼中咱們能夠看出2件事,一是list的數據都是從watchCache中獲取的,二是獲取後經過filterFunc過濾了一遍而後返回出去。
Kubernetes代碼解讀-apiserver之list-watchKubernetes代碼解讀-apiserver之list-watch

3.2 WatchCache(pkg/storage/watch_cache.go)
這個結構應該是緩存的核心結構,從上一層的代碼分析中咱們已經知道了對這個結構的需求,包括存儲全部這個類型的數據,包括當有新的數據過來時把數據扔到cacheWatcher裏面去,總之,提供List和Watch兩大輸出。

type watchCache struct {
// cache is used a cyclic buffer - its first element (with the smallest
// resourceVersion) is defined by startIndex, its last element is defined
// by endIndex (if cache is full it will be startIndex + capacity).
// Both startIndex and endIndex can be greater than buffer capacity -
// you should always apply modulo capacity to get an index in cache array.
cache      []watchCacheElement
startIndex int
endIndex   int

// store will effectively support LIST operation from the "end of cache
// history" i.e. from the moment just after the newest cached watched event.
// It is necessary to effectively allow clients to start watching at now.
store cache.Store
}

這裏的關鍵數據結構依然是2個

cache 環形隊列,存儲有限個數的最新數據

store 底層其實是個線程安全的hashMap,存儲全量數據

那麼繼續看看方法是怎麼運轉的吧~

增刪改方法

func (w *watchCache) Update(obj interface{}) error {
event := watch.Event{Type: watch.Modified, Object: object}
f := func(obj runtime.Object) error { return w.store.Update(obj) }
return w.processEvent(event, resourceVersion, f)
}


func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(runtime.Object) error) error {

previous, exists, err := w.store.Get(event.Object)
watchCacheEvent := watchCacheEvent{event.Type, event.Object, prevObject, resourceVersion}
    w.onEvent(watchCacheEvent)
w.updateCache(resourceVersion, watchCacheEvent)

}

// Assumes that lock is already held for write.
func (w *watchCache) updateCache(resourceVersion uint64, event watchCacheEvent) {
w.cache[w.endIndex%w.capacity] = watchCacheElement{resourceVersion, event}
w.endIndex++
}

全部的增刪改方法作的事情都差很少,就是在store裏面存具體的數據,而後調用processEvent()去增長環形隊列裏面的數據,若是詳細看一下onEvent的操做,就會發現這個操做的本質是落在cacher.go裏面:

func (c *Cacher) processEvent(event watchCacheEvent) { for _, watcher := range c.watchers {

watcher.add(event)

}

}

往全部的watcher裏面挨個添加數據。整體來講,咱們能夠從上面的代碼中得出一個結論:cache裏面存儲的是Event,也就是有prevObject的,對於全部操做都會在cache裏面保存,但對於store來講,只存儲當下的數據,刪了就刪了,改了就改了。
Kubernetes代碼解讀-apiserver之list-watchKubernetes代碼解讀-apiserver之list-watch

WaitUntilFreshAndList()

這裏原本應該討論List()方法的,但在cacher裏面的List()實際上使用的是這個,因此咱們看這個方法。

func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
startTime := w.clock.Now()
go func() {
    w.cond.Broadcast()
}()

for w.resourceVersion < resourceVersion {
    w.cond.Wait()
}
return w.store.List(), w.resourceVersion, nil
}

這個方法比較繞,前面使用了一堆cond通知來和其餘協程通訊,最後仍是調用了store.List()把數據返回出去。後面來具體分析這裏的協調機制。

GetAllEventsSinceThreadUnsafe()
這個方法在cacher的建立cacheWatcher裏面使用,把當前store裏面的全部數據都搞出來,而後把store裏面的數據都轉換爲AddEvent,配上cache裏面的Event,所有返回出去。

3.3 CacheWatcher(pkg/storage/cacher.go)
這個結構是每一個watch的client都會擁有一個的,從上面的分析中咱們也能得出這個結構的需求,就是從watchCache裏面搞一些數據,而後寫到客戶端那邊。

// cacherWatch implements watch.Interface
type cacheWatcher struct {
sync.Mutex
input   chan watchCacheEvent
result  chan watch.Event
filter  FilterFunc
stopped bool
forget  func(bool)
}

這段代碼比較簡單,就不去分析方法了,簡單說就是數據在增長的時候放到input這個channel裏面去,經過filter而後輸出到result這個channel裏面去。

結語

把數據結構和需求對比着看,碰到邏輯複雜的畫個圖來進行記憶,在分析的時候把想到的問題記錄下來,而後在後面專門去考慮,這裏我看完代碼後有這些問題:

這個cache機制是list-watch操做中最短的板嗎?在實際生產中,對這List和Wath的使用頻率和方式是怎麼樣的?顯然這二者存在競爭關係,目前的數據結構是不是最優的?還有更好的方式嗎?須要一個單元測試來對性能進行測試,而後做爲調優的基礎,etcd v3的一些代碼對咱們的機制有什麼影響?這個目錄在/pkg/storage/etcd3裏面。

相關文章
相關標籤/搜索