kubernetes垃圾回收器GarbageCollector源碼分析(一)

kubernetes版本:1.13.2

背景

因爲operator建立的redis集羣,在kubernetes apiserver重啓後,redis集羣被異常刪除(包括redis exporter statefulset、redis statefulset)。刪除後operator將其重建,從新組建集羣,實例IP發生變動(中間件容器化,咱們開發了固定IP,當statefulset刪除後,IP會被回收),致使建立集羣失敗,最終集羣不可用。html

經屢次復現,apiserver重啓後,經過查詢redis operator日誌,並無發現主動去刪除redis集羣(redis statefulset)、監控實例(redis exporter)。進一步去查看kube-controller-manager的日誌,將其日誌級別設置--v=5,繼續復現,最終在kube-controller-manager日誌中發現以下日誌:
在這裏插入圖片描述
能夠看到是garbage collector觸發刪除操做的。這個問題在apiserver正常的時候是不存在,要想弄其究竟,就得看看kube-controller-manager內置組件garbage collector這個控制器的邏輯。前端

因爲內容偏長,分爲多節來說:
①、monitors做爲生產者將變化的資源放入graphChanges隊列;同時restMapper按期檢測集羣內資源類型,刷新monitors
②、runProcessGraphChangesgraphChanges隊列中取出變化的item,根據狀況放入attemptToDelete隊列;runAttemptToDeleteWorker取出處理垃圾資源;
③、runProcessGraphChangesgraphChanges隊列中取出變化的item,根據狀況放入attemptToOrphan隊列;runAttemptToOrphanWorker取出處理該該孤立的資源;
在這裏插入圖片描述java


正文

想要啓用GC,須要在kube-apiserverkube-controller-manager的啓動參數中都設置--enable-garbage-collectortrue,1.13.2版本中默認開啓GCnode

須要注意:兩組件該參數必須保持同步。python


kube-controller-manager啓動入口,app.NewControllerManagerCommand()中加載controller manager默認啓動參數,建立* cobra.Command對象:mysql

func main() {
        rand.Seed(time.Now().UnixNano())
        //加載controller manager默認啓動參數,建立* cobra.Command對象
        command := app.NewControllerManagerCommand()
        //......省略.......
        //執行cobra.command,並啓動controller-manager
        if err := command.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
}

如下代碼處去啓動kube-controller-manager
在這裏插入圖片描述
NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)加載各個控制器的配置:git

//NewKubeControllerManagerOptions使用默認配置建立一個新的KubeControllerManagerOptions
func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
    //加載各個控制器的默認配置
    componentConfig, err := NewDefaultComponentConfig(ports.InsecureKubeControllerManagerPort)
    if err != nil {
        return nil, err
    }

    s := KubeControllerManagerOptions{
        Generic:         cmoptions.NewGenericControllerManagerConfigurationOptions(componentConfig.Generic),
        //.....省略
        GarbageCollectorController: &GarbageCollectorControllerOptions{
            ConcurrentGCSyncs:      componentConfig.GarbageCollectorController.ConcurrentGCSyncs,
            EnableGarbageCollector: componentConfig.GarbageCollectorController.EnableGarbageCollector,
        },
        //.....省略
    }
    //gc忽略的資源對象列表
    gcIgnoredResources := make([]kubectrlmgrconfig.GroupResource, 0, len(garbagecollector.DefaultIgnoredResources()))
    for r := range garbagecollector.DefaultIgnoredResources() {
        gcIgnoredResources = append(gcIgnoredResources, kubectrlmgrconfig.GroupResource{Group: r.Group, Resource: r.Resource})
    }
    s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources
    return &s, nil
}
// NewDefaultComponentConfig返回kube-controller管理器配置對象
func NewDefaultComponentConfig(insecurePort int32) (kubectrlmgrconfig.KubeControllerManagerConfiguration, error) {
    scheme := runtime.NewScheme()
    if err := kubectrlmgrschemev1alpha1.AddToScheme(scheme); err != nil {
        return kubectrlmgrconfig.KubeControllerManagerConfiguration{}, err
    }
    if err := kubectrlmgrconfig.AddToScheme(scheme); err != nil {
        return kubectrlmgrconfig.KubeControllerManagerConfiguration{}, err
    }

    versioned := kubectrlmgrconfigv1alpha1.KubeControllerManagerConfiguration{}
    //加載默認參數
    scheme.Default(&versioned)

    internal := kubectrlmgrconfig.KubeControllerManagerConfiguration{}
    if err := scheme.Convert(&versioned, &internal, nil); err != nil {
        return internal, err
    }
    internal.Generic.Port = insecurePort
    return internal, nil
}
// 根據Object,獲取提供的默認參數
func (s *Scheme) Default(src Object) {
    if fn, ok := s.defaulterFuncs[reflect.TypeOf(src)]; ok {
        fn(src)
    }
}

s.defaulterFuncs類型爲map[reflect.Type]func(interface{}),用於根據指針類型獲取默認值函數。該map中的數據從哪裏來的呢?面試

代碼位於srck8s.iokubernetespkgcontrollerapisconfigv1alpha1zz_generated.defaults.go
在這裏插入圖片描述
能夠看到默認參數中garbage collector中默認開啓gc(EnableGarbageCollector),併發數爲20(ConcurrentGCSyncs)redis

func SetDefaults_GarbageCollectorControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.GarbageCollectorControllerConfiguration) {
    if obj.EnableGarbageCollector == nil {
        obj.EnableGarbageCollector = utilpointer.BoolPtr(true)
    }
    if obj.ConcurrentGCSyncs == 0 {
        obj.ConcurrentGCSyncs = 20
    }
}

回到Run函數,裏面調用了NewControllerInitializers啓動全部控制器:
在這裏插入圖片描述
重點來到啓動garbage collector的startGarbageCollectorController函數:spring

func startGarbageCollectorController(ctx ControllerContext) (http.Handler, bool, error) {
    //k8s 1.13.2中默認爲true,可在kube-apiserver和kube-controller-manager的啓動參數中加--enable-garbage-conllector=false設置
    //需保證這兩個組件中參數值一致
    if !ctx.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
        return nil, false, nil
    }

    //k8s各類原生資源對象客戶端集合(默認啓動參數中用SimpleControllerClientBuilder構建)
    gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector")
    discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery())

    //生成rest config
    config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector")
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        return nil, true, err
    }

    // Get an initial set of deletable resources to prime the garbage collector.
    //獲取一組初始可刪除資源以填充垃圾收集器。
    deletableResources := garbagecollector.GetDeletableResources(discoveryClient)
    ignoredResources := make(map[schema.GroupResource]struct{})

    //忽略gc的資源類型
    for _, r := range ctx.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
        ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
    }
    garbageCollector, err := garbagecollector.NewGarbageCollector(
        dynamicClient,
        ctx.RESTMapper,
        deletableResources,
        ignoredResources,
        ctx.InformerFactory,
        ctx.InformersStarted,
    )
    if err != nil {
        return nil, true, fmt.Errorf("Failed to start the generic garbage collector: %v", err)
    }

    // Start the garbage collector.
    //啓動參數中默認是20個協程
    workers := int(ctx.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
    //啓動monitors和deleteWorkers、orphanWorkers
    go garbageCollector.Run(workers, ctx.Stop)

    // Periodically refresh the RESTMapper with new discovery information and sync
    // the garbage collector.
    //使用新的發現信息按期刷新RESTMapper並同步垃圾收集器。
    go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)

    //gc提供debug dot grap依賴關係圖接口
    return garbagecollector.NewDebugHandler(garbageCollector), true, nil
}

該函數主要做用有:
一、deletableResources := garbagecollector.GetDeletableResources(discoveryClient)獲取集羣內全部可刪除的資源對象;排除掉忽略的資源對象。
二、構建garbageCollector結構體對象;
三、garbageCollector.Run(workers, ctx.Stop)啓動一個monitors用來監聽資源對象的變化(對應的由runProcessGraphChanges死循環處理),和默認20個deleteWorkers協程處理可刪除的資源對象、20個orphanWorkers協程處理孤兒對象。
四、garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop) 定時去獲取一個集羣內是否有新類型的資源對象的加入,並從新刷新monitors,以監聽新類型的資源對象。
五、garbagecollector.NewDebugHandler(garbageCollector)註冊debug接口,用來提供獲取dot流程圖接口:

curl http://127.0.0.1:10252/debug/controllers/garbagecollector/graph?uid=11211212edsaddkqedmk12

使用graphviz提供的dot.exe能夠生成svg格式的圖,可用google瀏覽器查看以下:
在這裏插入圖片描述

// curl http://127.0.0.1:10252/debug/controllers/garbagecollector/graph?uid=11211212edsaddkqedmk12
func (h *debugHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    if req.URL.Path != "/graph" {
        http.Error(w, "", http.StatusNotFound)
        return
    }

    var graph graph.Directed
    if uidStrings := req.URL.Query()["uid"]; len(uidStrings) > 0 {
        uids := []types.UID{}
        for _, uidString := range uidStrings {
            uids = append(uids, types.UID(uidString))
        }
        graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraphForObj(uids...)

    } else {
        graph = h.controller.dependencyGraphBuilder.uidToNode.ToGonumGraph()
    }

    //生成dot流程圖數據,用graphviz工具中的dot.exe工具轉換爲svg圖(用google瀏覽器打開)或者png圖
    //API參考:https://godoc.org/gonum.org/v1/gonum/graph
    //graphviz下載地址:https://graphviz.gitlab.io/_pages/Download/Download_windows.html
    //dot.exe test.dot -T svg -o test.svg
    data, err := dot.Marshal(graph, "full", "", "  ", false)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    w.Write(data)
    w.WriteHeader(http.StatusOK)
}

在這裏插入圖片描述
GarbageCollector經過restMapper按期重置可刪除的資源類型,更新GraphBuilder中的monitors,monitors將建立全部資源類型的變動通知回調函數,將變化的資源對象加入到GraphBuilder的graphChanges隊列,GraphBuilder的runProcessGraphChanges()會一直從隊列中獲取變化,構建一個緩存對象之間依賴關係的圖形,以及觸發dependencyGraphBuilder將可能被垃圾收集的對象排隊到attemptToDelete隊列,並將其依賴項須要孤立的對象排隊到attemptToOrphan隊列。GarbageCollector具備使用這兩個隊列的工做人員runAttemptToDeleteWorker和runAttemptToOrphanWorker死循環,分別從attemptToDelete隊列和attemptToOrphan隊列取出,向API服務器發送請求以相應地刪除更新對象。

// GarbageCollector運行反射器來監視託管API對象的更改,將結果彙總到單線程dependencyGraphBuilder,
// 構建一個緩存對象之間依賴關係的圖形。由圖變化觸發,dependencyGraphBuilder將可能被垃圾收集的對象
// 排隊到`attemptToDelete`隊列,並將其依賴項須要孤立的對象排隊到`attemptToOrphan`隊列。
// GarbageCollector具備使用這兩個隊列的工做人員,向API服務器發送請求以相應地刪除更新對象。
// 請注意,讓dependencyGraphBuilder通知垃圾收集器確保垃圾收集器使用至少與發送通知同樣最新的圖形進行操做。
type GarbageCollector struct {
    // resettableRESTMapper是一個RESTMapper,它可以在discovery資源類型時重置本身
    restMapper    resettableRESTMapper
    // dynamicClient提供操做集羣內全部資源對象的接口方法,包括k8s內置、CRD生成的自定義資源
    dynamicClient dynamic.Interface
    //垃圾收集器嘗試在時間成熟時刪除attemptToDelete隊列中的item
    attemptToDelete workqueue.RateLimitingInterface
    //垃圾收集器嘗試孤立attemptToOrphan隊列中item的依賴項,而後刪除item
    attemptToOrphan        workqueue.RateLimitingInterface
    dependencyGraphBuilder *GraphBuilder
    // 有owner的資源對象,纔會給absentOwnerCache填充不存在的Owner信息
    absentOwnerCache *UIDCache
    sharedInformers  informers.SharedInformerFactory

    workerLock sync.RWMutex
}
// GraphBuilder:基於informers提供的事件,GraphBuilder更新
// uidToNode,一個緩存咱們所知的依賴關係的圖,並將
// 項放入attemptToDelete和attemptToOrphan隊列
type GraphBuilder struct {
    restMapper meta.RESTMapper

    //每一個監視器列表/監視資源,結果聚集到dependencyGraphBuilder
    monitors    monitors
    monitorLock sync.RWMutex
    // informersStarted is closed after after all of the controllers have been initialized and are running.
    // After that it is safe to start them here, before that it is not.
    // informersStarted在全部控制器初始化並運行後關閉。以後在這裏啓動它們是安全的,在此以前它不是。
    informersStarted <-chan struct{}

    // stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
    // This channel is also protected by monitorLock.
    // stopCh驅動器關閉當來自它的接收解除阻塞時,監視器將關閉。 此channel也受monitorLock保護。
    stopCh <-chan struct{}

    // running tracks whether Run() has been called.
    // it is protected by monitorLock.
    //運行軌道是否已調用Run()它受monitorLock保護。
    running bool

    dynamicClient dynamic.Interface
    // monitors are the producer of the graphChanges queue, graphBuilder alters
    // the in-memory graph according to the changes.
    // monitor是graphChanges隊列的生成者,graphBuilder根據更改改變了內存中的圖形。
    graphChanges workqueue.RateLimitingInterface

    // uidToNode doesn't require a lock to protect, because only the
    // single-threaded GraphBuilder.processGraphChanges() reads/writes it.
    //uidToNode不須要鎖保護,由於只有單線程GraphBuilder.processGraphChanges()讀寫它。
    uidToNode *concurrentUIDToNode

    // GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
    // GraphBuilder是attemptToDelete和attemptToOrphan的生產者,GC是消費者。
    attemptToDelete workqueue.RateLimitingInterface
    attemptToOrphan workqueue.RateLimitingInterface

    // GraphBuilder and GC share the absentOwnerCache. Objects that are known to
    // be non-existent are added to the cached.
    // GraphBuilder和GC共享absentOwnerCache。已知不存在的對象將添加到緩存中。
    absentOwnerCache *UIDCache

    //全部k8s資源對象集的informer
    sharedInformers  informers.SharedInformerFactory

    //監視器忽略的資源對象集
    ignoredResources map[schema.GroupResource]struct{}
}

建立NewGarbageCollector結構體:

func NewGarbageCollector(
    dynamicClient dynamic.Interface,
    mapper resettableRESTMapper,
    deletableResources map[schema.GroupVersionResource]struct{},
    ignoredResources map[schema.GroupResource]struct{},
    sharedInformers informers.SharedInformerFactory,
    informersStarted <-chan struct{},
) (*GarbageCollector, error) {
    attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
    attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
    absentOwnerCache := NewUIDCache(500)
    gc := &GarbageCollector{
        dynamicClient:    dynamicClient,
        restMapper:       mapper,
        attemptToDelete:  attemptToDelete,
        attemptToOrphan:  attemptToOrphan,
        absentOwnerCache: absentOwnerCache,
    }
    gb := &GraphBuilder{
        dynamicClient:    dynamicClient,
        informersStarted: informersStarted,
        restMapper:       mapper,
        graphChanges:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
        uidToNode: &concurrentUIDToNode{
            uidToNode: make(map[types.UID]*node),
        },
        attemptToDelete:  attemptToDelete,
        attemptToOrphan:  attemptToOrphan,
        absentOwnerCache: absentOwnerCache,
        sharedInformers:  sharedInformers,
        ignoredResources: ignoredResources,
    }
    //初始化各個資源對象的monitors,啓動各資源對象的監聽器,變化時觸發回調,將其加入graphChanges 隊列
    if err := gb.syncMonitors(deletableResources); err != nil {
        utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err))
    }
    gc.dependencyGraphBuilder = gb

    return gc, nil
}

主要功能:
一、構建GarbageCollector結構體;
二、構建依賴結構圖維護結構體GraphBuilder,和GarbageCollector共用attemptToDelete和attemptToOrphan隊列,GraphBuilder做爲生產着將適當資源放到attemptToDelete或者attemptToOrphan隊列,供GarbageCollector中的worker進行消費;
三、初始化各個資源對象的monitors,啓動各資源對象的監聽器,變化時觸發回調,將其加入graphChanges 隊列。

gb.syncMonitors(deletableResources)方法中最主要的是c, s, err := gb.controllerFor(resource, kind)

func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
    handlers := cache.ResourceEventHandlerFuncs{
        // add the event to the dependencyGraphBuilder's graphChanges.
        // 將事件添加到dependencyGraphBuilder的graphChanges中。
        AddFunc: func(obj interface{}) {
            event := &event{
                eventType: addEvent,
                obj:       obj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            // TODO: check if there are differences in the ownerRefs,
            // finalizers, and DeletionTimestamp; if not, ignore the update.
            //TODO:檢查ownerRefs, finalizers和DeletionTimestamp是否存在差別;若是沒有,請忽略更新。
            event := &event{
                eventType: updateEvent,
                obj:       newObj,
                oldObj:    oldObj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
        DeleteFunc: func(obj interface{}) {
            // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
            // delta fifo能夠將對象包裝在cache.DeletedFinalStateUnknown中,解包它
            if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
                obj = deletedFinalStateUnknown.Obj
            }
            event := &event{
                eventType: deleteEvent,
                obj:       obj,
                gvk:       kind,
            }
            gb.graphChanges.Add(event)
        },
    }
    shared, err := gb.sharedInformers.ForResource(resource)
    if err == nil {
        klog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
        // need to clone because it's from a shared cache
        shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
        return shared.Informer().GetController(), shared.Informer().GetStore(), nil
    } else {
        //獲取資源對象時出錯會到這裏,好比非k8s內置RedisCluster、clusterbases、clusters、esclusters、volumeproviders、stsmasters、appapps、mysqlclusters、brokerclusters、clustertemplates;
        //內置的networkPolicies、apiservices、customresourcedefinitions
        klog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
    }

    // TODO: consider store in one storage.
    // TODO: 考慮存儲在一個存儲中。
    klog.V(5).Infof("create storage for resource %s", resource)
    //上面失敗的資源對象的store和controller
    store, monitor := cache.NewInformer(
        listWatcher(gb.dynamicClient, resource),
        nil,
        ResourceResyncTime,
        // don't need to clone because it's not from shared cache
        //不須要克隆,由於它不是來自共享緩存
        handlers,
    )
    return monitor, store, nil
}

該方法主要功能是:
一、將新增、更改、刪除的資源對象構建爲event結構體,放入GraphBuilder的graphChanges隊列裏,最終被runProcessGraphChanges這個worker消費;
二、構建大多數內置資源的SharedInformerFactory,構建失敗的用cache.NewInformer構建(經過CRD定義的對象以及部分k8s內置對象)

代碼繼續回到k8s.iokubernetescmdkube-controller-managerappcore.go中的startGarbageCollectorController中,看`
garbageCollector.Run(workers, ctx.Stop)`方法:

func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer gc.attemptToDelete.ShutDown()
    defer gc.attemptToOrphan.ShutDown()
    defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

    klog.Infof("Starting garbage collector controller")
    defer klog.Infof("Shutting down garbage collector controller")

    //協程運行生產者monitors
    go gc.dependencyGraphBuilder.Run(stopCh)

    //等待dependencyGraphBuilder緩存開始同步
    if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
        return
    }

    //垃圾收集器:全部資源監視器都已同步。繼續收集垃圾
    klog.Infof("Garbage collector: all resource monitors have synced. Proceeding to collect garbage")

    // gc workers
    //協程運行消費者DeleteWorkers和OrphanWorkers
    for i := 0; i < workers; i++ {
        //默認參數爲20個併發協程嘗試delete worker
        go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh)
        //默認參數爲20個併發協程嘗試orphan worker
        go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh)
    }

    <-stopCh
}

gc.dependencyGraphBuilder.Run(stopCh)主要功能:
一、gb.startMonitors()啓動監聽資源變化的informer;
二、wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)開啓從隊列GraphBuilder.graphChanges中消費的worker

啓動20個runAttemptToDeleteWorker和20個runAttemptToOrphanWorker

參考:

k8s官方文檔garbage-collection英文版:
https://kubernetes.io/docs/co...

依賴圖標生成庫gonum Api文檔:
https://godoc.org/gonum.org/v...

graphviz下載:
https://graphviz.gitlab.io/_p...



本公衆號免費提供csdn下載服務,海量IT學習資源,若是你準備入IT坑,勵志成爲優秀的程序猿,那麼這些資源很適合你,包括但不限於java、go、python、springcloud、elk、嵌入式 、大數據、面試資料、前端 等資源。同時咱們組建了一個技術交流羣,裏面有不少大佬,會不定時分享技術文章,若是你想來一塊兒學習提升,能夠公衆號後臺回覆【2】,免費邀請加技術交流羣互相學習提升,會不按期分享編程IT相關資源。


掃碼關注,精彩內容第一時間推給你

image

相關文章
相關標籤/搜索