Deep Dive into K8s Job (Part 2): Job Controller Source Code Analysis

k8s version: v1.11.0

author: lbl167612@alibaba-inc.com

Source code flowchart

(Figure: job controller flowchart)

The JobController struct

Path: pkg/controller/job/job_controller.go

type JobController struct {
    // Client for talking to kube-apiserver;
    // needed to look up job, pod and other object metadata.
    kubeClient clientset.Interface
    // Pod control helper, used to create and delete pods.
    podControl controller.PodControlInterface
    // Used to update the job status.
    updateHandler func(job *batch.Job) error
    // Core interface of the job controller, used to sync a job.
    syncHandler   func(jobKey string) (bool, error)
    // The job controller syncs the job & pod caches when it starts.
    // Used to check whether the pod cache has been synced.
    podStoreSynced cache.InformerSynced
    // Used to check whether the job cache has been synced.
    jobStoreSynced cache.InformerSynced
    // Expectations cache: records the expected pod adds & dels for each job,
    // and provides interfaces to adjust them until the expected values are reached.
    expectations controller.ControllerExpectationsInterface

    // jobLister is used to fetch job metadata and to match jobs against pod labels.
    // The interfaces used by this controller are:
    // 1. GetPodJobs(): map a pod back to its jobs
    // 2. Get(): fetch job metadata by namespace & name
    jobLister batchv1listers.JobLister

    // podStore provides interfaces for listing all pods managed by a given job.
    podStore corelisters.PodLister

    // Jobs queue
    // The job controller watches job & pod changes (add, delete, update) via kubeClient
    // and operates on this queue accordingly.
    // It starts workers that call syncJob to process the jobs in this queue.
    queue workqueue.RateLimitingInterface
    // Job-related events are broadcast through this recorder.
    recorder record.EventRecorder
}
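
To make the expectations cache more concrete, here is a minimal sketch of how a controller typically drives controller.ControllerExpectations. It assumes the internal package k8s.io/kubernetes/pkg/controller is available as a dependency; the job key and the counts are invented for illustration:

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/controller"
)

func main() {
    exp := controller.NewControllerExpectations()
    jobKey := "default/pi" // hypothetical namespace/name key

    // Before creating 3 pods, record that 3 adds are expected.
    exp.ExpectCreations(jobKey, 3)
    fmt.Println(exp.SatisfiedExpectations(jobKey)) // false: 3 adds still outstanding

    // As the informer delivers the pod Add events, observe them one by one.
    for i := 0; i < 3; i++ {
        exp.CreationObserved(jobKey)
    }
    fmt.Println(exp.SatisfiedExpectations(jobKey)) // true: safe to call manageJob() again
}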

startJobController()

Path: cmd/kube-controller-manager/app/batch.go

startJobController() is the entry point for starting the job controller. It is registered in the NewControllerInitializers() map of the kube-controller-manager component.
How kube-controller-manager itself starts up is worth reading in the source on its own; here we only focus on the job controller implementation.

func startJobController(ctx ControllerContext) (bool, error) {
    // Before starting the job controller, check whether the job resource is enabled.
    // When creating a cluster, users can choose which resources to enable via the kube-apiserver --runtime-config flag.
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] {
        return false, nil
    }
    // Initialize the JobController struct and Run it.
    // Run() is given the number of goroutines to start; each goroutine is one worker.
    go job.NewJobController(
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Batch().V1().Jobs(),
        ctx.ClientBuilder.ClientOrDie("job-controller"),
    ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
    return true, nil
}
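
For context, NewControllerInitializers() in cmd/kube-controller-manager/app/controllermanager.go registers every controller's start function under a name, and the controller manager then starts them one by one. The following is a simplified, self-contained sketch of that pattern; the InitFunc signature and the runner loop are stripped down here, and only the names startJobController and NewControllerInitializers come from the real code:

package main

import "fmt"

// InitFunc mirrors the kube-controller-manager pattern: each controller exposes a start
// function that reports whether it started and any error (the context argument is omitted here).
type InitFunc func() (bool, error)

func startJobController() (bool, error) {
    fmt.Println("job controller started")
    return true, nil
}

// NewControllerInitializers registers each controller under a name; only "job" is shown.
func NewControllerInitializers() map[string]InitFunc {
    return map[string]InitFunc{
        "job": startJobController,
    }
}

func main() {
    for name, start := range NewControllerInitializers() {
        if started, err := start(); err != nil || !started {
            fmt.Printf("controller %q not started: %v\n", name, err)
        }
    }
}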

NewJobController()

Path: pkg/controller/job/job_controller.go

func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
    // Initialize the event broadcaster,
    // used by this controller to send job-related events.
    eventBroadcaster := record.NewBroadcaster()
    // Register a function that logs event messages.
    // eventBroadcaster.StartEventWatcher() creates a goroutine and starts watching events,
    // handling each event with the registered handler; here the handler simply logs via glog.Infof.
    eventBroadcaster.StartLogging(glog.Infof)
    // EventSinkImpl wraps an EventInterface, which implements Create/Update/Delete/Get/Watch/Patch, etc.
    // Like the step above, this registers an EventInterface implementation via eventBroadcaster.StartEventWatcher(),
    // which receives events from the broadcaster and sends them to the given sink.
    // The k8s event machinery deserves its own source-code analysis and is worth studying.
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    // Kubernetes' internal client-side rate limiting.
    // From the apiserver's point of view, every controller and the scheduler are clients,
    // so rate limiting on their side matters as well.
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    // Initialize the JobController
    jm := &JobController{
        // Client for connecting to kube-apiserver
        kubeClient: kubeClient,
        // podControl, used by manageJob() to create and delete pods
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
        },
        // Cache of the expected pod state, with interfaces to correct it.
        // For example, it stores the expected pod adds & dels per job and provides interfaces to modify those two values.
        expectations: controller.NewControllerExpectations(),
        // The jobs queue; a matching number of workers will later process the jobs in this queue.
        queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
        // Event recorder, used to send job-related events
        recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
    }
    // Register the Add, Update and Delete handlers of the jobInformer.
    // When the controller receives a job Add, Update or Delete event, it calls the corresponding function.
    // At their core, these functions all operate on the queue above so that syncJob processes the jobs in it.
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
    })
    // Already described in the struct above
    jm.jobLister = jobInformer.Lister()
    jm.jobStoreSynced = jobInformer.Informer().HasSynced

    // Register the Add, Update and Delete handlers of the podInformer.
    // A job ultimately runs through its pods, so pod events must be watched as well.
    // This podInformer watches all pod changes, so each handler checks whether the pod's controllerRef points to a Job;
    // if so, it updates the corresponding expectations & queue and triggers syncJob.
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: jm.deletePod,
    })
    // Already described in the struct above
    jm.podStore = podInformer.Lister()
    jm.podStoreSynced = podInformer.Informer().HasSynced

    // Register the function that updates the job status
    jm.updateHandler = jm.updateJobStatus
    // Register the sync job handler.
    // This is the core of the controller.
    jm.syncHandler = jm.syncJob

    return jm
}

Run()

Path: pkg/controller/job/job_controller.go

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer jm.queue.ShutDown()

    glog.Infof("Starting job controller")
    defer glog.Infof("Shutting down job controller")

    // On every start, wait for the job & pod caches to be synced, i.e. for the informer stores to be populated.
    // Every worker pulls its work from the queue, so workers should only be created once the caches have data.
    if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
        return
    }

    // Create the specified number of goroutines.
    // Each goroutine runs worker(); when a run finishes it sleeps for 1s and then loops again.
    for i := 0; i < workers; i++ {
        go wait.Until(jm.worker, time.Second, stopCh)
    }

    <-stopCh
}

Now let's look at the worker implementation:

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
    for jm.processNextWorkItem() {
    }
}

func (jm *JobController) processNextWorkItem() bool {
    // Get a job key from the queue.
    // Key format: namespace + "/" + name
    key, quit := jm.queue.Get()
    if quit {
        return false
    }
    defer jm.queue.Done(key)

    // Call the syncJob() registered during initialization.
    // If it succeeds and forget == true, tell the queue's rate limiter to forget this key.
    forget, err := jm.syncHandler(key.(string))
    if err == nil {
        if forget {
            jm.queue.Forget(key)
        }
        return true
    }
    // If syncJob() returned an error, log it.
    // utilruntime.HandleError() records the time of the most recent error and rate-limits itself to avoid flooding the log.
    utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
    // If syncJob() failed, put the job key back into the queue to be synced again later.
    jm.queue.AddRateLimited(key)

    return true
}
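
The key mentioned above is the standard namespace/name key from client-go. Below is a minimal sketch of what enqueueing effectively stores and what syncJob() later recovers; the job object here is invented for illustration:

package main

import (
    "fmt"

    batchv1 "k8s.io/api/batch/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pi"}}

    // What enqueueController() effectively puts into the workqueue.
    key, _ := cache.MetaNamespaceKeyFunc(job)
    fmt.Println(key) // "default/pi"

    // What syncJob() recovers from that key.
    ns, name, _ := cache.SplitMetaNamespaceKey(key)
    fmt.Println(ns, name) // "default" "pi"
}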

syncJob()

The heart of the worker is the call to syncJob; let's look at what this function actually does:

func (jm *JobController) syncJob(key string) (bool, error) {
    // The usual trick: measure how long each sync takes
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
    }()

    // Split the key into the job's namespace & name
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return false, err
    }
    if len(ns) == 0 || len(name) == 0 {
        return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    // Fetch the job.
    // If the job is not found, it has been deleted, so remove the key from ControllerExpectations.
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            glog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            return true, nil
        }
        return false, err
    }
    job := *sharedJob

    // Decide whether the job has finished, based on whether job.Status.Conditions contains "JobComplete" or "JobFailed".
    // If it has already finished, return directly.
    if IsJobFinished(&job) {
        return true, nil
    }

    // Compute how many times this job has already been retried, from the number of times its key has been requeued.
    // By default a job gets 6 retries.
    previousRetry := jm.queue.NumRequeues(key)

    // Decide whether this key needs a manageJob() sync. It does if any of the following holds:
    // 1. The key's adds and dels in ControllerExpectations are both <= 0
    // 2. The key's entry in ControllerExpectations has not been updated for more than 5 minutes
    // 3. The key cannot be found in ControllerExpectations
    // 4. The GetExpectations() call failed
    jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

    // List all pods managed by this job
    pods, err := jm.getPodsForJob(&job)
    if err != nil {
        return false, err
    }

    // Filter for the active pods
    activePods := controller.FilterActivePods(pods)
    // Count the active & succeeded & failed pods
    active := int32(len(activePods))
    succeeded, failed := getStatus(pods)
    conditions := len(job.Status.Conditions)
    // If this is the first time the job has started, set StartTime;
    // and if job.Spec.ActiveDeadlineSeconds is set, re-enqueue the key after ActiveDeadlineSeconds seconds.
    if job.Status.StartTime == nil {
        now := metav1.Now()
        job.Status.StartTime = &now
        // enqueue a sync to check if job past ActiveDeadlineSeconds
        if job.Spec.ActiveDeadlineSeconds != nil {
            glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
                key, *job.Spec.ActiveDeadlineSeconds)
            jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
        }
    }

    var manageJobErr error
    jobFailed := false
    var failureReason string
    var failureMessage string

    // Check whether the job has newly failed pods
    jobHaveNewFailure := failed > job.Status.Failed
    // Check whether the retry count has exceeded the limit
    exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
        (int32(previousRetry)+1 > *job.Spec.BackoffLimit)

    // If the job has been retried more than job.Spec.BackoffLimit times (default 6), mark it failed and record the reason.
    // The retry count also depends on the restart policy of the job's pod template: with "RestartPolicyOnFailure",
    // the retry count is the sum of the RestartCount of all pods' InitContainerStatuses and ContainerStatuses,
    // and that sum must also be checked against BackoffLimit.
    if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
        jobFailed = true
        failureReason = "BackoffLimitExceeded"
        failureMessage = "Job has reached the specified backoff limit"
    // If the job has been active longer than ActiveDeadlineSeconds, mark it failed and record the reason
    } else if pastActiveDeadline(&job) {
        jobFailed = true
        failureReason = "DeadlineExceeded"
        failureMessage = "Job was active longer than specified deadline"
    }

    // If the job failed, delete all active pods concurrently and wait for those deletions to finish;
    // then update job.Status.Conditions and send an event with the failure reason recorded above.
    if jobFailed {
        errCh := make(chan error, active)
        jm.deleteJobPods(&job, activePods, errCh)
        select {
        case manageJobErr = <-errCh:
            if manageJobErr != nil {
                break
            }
        default:
        }

        failed += active
        active = 0
        job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
        jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
    // If the job has not been marked as failed
    } else {
        // If the job needs a sync (as determined above) and has not been deleted, call manageJob().
        // manageJob() is analyzed separately below.
        if jobNeedsSync && job.DeletionTimestamp == nil {
            active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
        }
        completions := succeeded
        complete := false
        // job.Spec.Completions means the job is only complete once that many pods have succeeded.
        // If it is unset, the job is complete as soon as any single pod has succeeded,
        // but note that if pods are still running, the job is only marked complete after they have all exited.
        if job.Spec.Completions == nil {
            if succeeded > 0 && active == 0 {
                complete = true
            }
        // If Completions is set, the job is complete once the number of succeeded pods >= Completions.
        // Some warning events are also sent, e.g. when there are still active pods after the completion count has been reached,
        // or when the number of successes exceeds the requested count; both are unexpected situations.
        } else {
            if completions >= *job.Spec.Completions {
                complete = true
                if active > 0 {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
                }
                if completions > *job.Spec.Completions {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
                }
            }
        }
        // If the job completed successfully, update job.Status.Conditions && job.Status.CompletionTime
        if complete {
            job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
            now := metav1.Now()
            job.Status.CompletionTime = &now
        }
    }

    forget := false
    // If new pods succeeded during this sync, forget this job key
    if job.Status.Succeeded < succeeded {
        forget = true
    }

    // Update job.Status
    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
        job.Status.Active = active
        job.Status.Succeeded = succeeded
        job.Status.Failed = failed
        // If updating the job fails, the job key will be put back into the queue.
        if err := jm.updateHandler(&job); err != nil {
            return forget, err
        }
        // If the job has newly failed pods and is not yet finished, put the job key back into the queue
        if jobHaveNewFailure && !IsJobFinished(&job) {
            // returning an error will re-enqueue Job after the backoff period
            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
        }
        // Otherwise, forget the job
        forget = true
    }

    return forget, manageJobErr
}
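
The RestartPolicyOnFailure case described in the comments boils down to summing container restart counts across the job's pods and comparing the total with BackoffLimit. Below is a simplified sketch of that idea; it is not the verbatim upstream pastBackoffLimitOnFailure (which also filters pods by phase), and the function name is invented:

package jobutil

import v1 "k8s.io/api/core/v1"

// restartCountExceedsBackoff sums the RestartCount of every init container and container
// of the given pods and reports whether the total has reached backoffLimit.
func restartCountExceedsBackoff(pods []*v1.Pod, backoffLimit int32) bool {
    var total int32
    for _, pod := range pods {
        for _, s := range pod.Status.InitContainerStatuses {
            total += s.RestartCount
        }
        for _, s := range pod.Status.ContainerStatuses {
            total += s.RestartCount
        }
    }
    if backoffLimit == 0 {
        // With BackoffLimit == 0, any restart at all counts as exceeding the limit.
        return total > 0
    }
    return total >= backoffLimit
}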

manageJob()

syncJob() relies on one key function, manageJob(). Its main task is to check whether the number of currently active pods matches the parallelism configured on the job, and to adjust it if it does not.
The implementation is as follows:

func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
    var activeLock sync.Mutex
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism
    // Build the job key by joining namespace + "/" + name.
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
        return 0, nil
    }

    var errCh chan error
    // If there are more active pods than the job's parallelism, delete the excess active pods concurrently.
    // Note that active pods are deleted in a specific priority order:
    // not-ready < ready; unscheduled < scheduled; pending < running.
    // activePods is first sorted by this priority, then pods are deleted starting from the front.
    // If deleting a pod fails, the previously adjusted ControllerExpectations and active values are rolled back.
    if active > parallelism {
        diff := active - parallelism
        errCh = make(chan error, diff)
        jm.expectations.ExpectDeletions(jobKey, int(diff))
        glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
        sort.Sort(controller.ActivePods(activePods))

        active -= diff
        wait := sync.WaitGroup{}
        wait.Add(int(diff))
        for i := int32(0); i < diff; i++ {
            go func(ix int32) {
                defer wait.Done()
                if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
                    defer utilruntime.HandleError(err)
                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
                    jm.expectations.DeletionObserved(jobKey)
                    activeLock.Lock()
                    active++
                    activeLock.Unlock()
                    errCh <- err
                }
            }(i)
        }
        wait.Wait()
    // If there are fewer active pods than the configured parallelism, first compute the diff, which depends on Completions and Parallelism:
    // 1. job.Spec.Completions == nil && succeeded pods > 0: diff = 0;
    // 2. job.Spec.Completions == nil && succeeded pods == 0: diff = Parallelism;
    // 3. job.Spec.Completions != nil: diff = min(job.Spec.Completions - succeeded, parallelism) - active, clamped at zero;
    // The diff is the number of pods still to be created. Since that number can be very large, creation happens in slow-start batches:
    // the first batch creates 1 pod, the second 2, and each subsequent batch doubles, but a batch never exceeds the remaining diff
    // (diff is decremented by the batch size after each batch).
    // If creating a pod times out, return directly;
    // if creating a pod fails, roll back the ControllerExpectations adds and the active value, and skip the remaining uncreated pods.
    } else if active < parallelism {
        wantActive := int32(0)
        if job.Spec.Completions == nil {
            if succeeded > 0 {
                wantActive = active
            } else {
                wantActive = parallelism
            }
        } else {
            wantActive = *job.Spec.Completions - succeeded
            if wantActive > parallelism {
                wantActive = parallelism
            }
        }
        diff := wantActive - active
        if diff < 0 {
            utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
            diff = 0
        }
        jm.expectations.ExpectCreations(jobKey, int(diff))
        errCh = make(chan error, diff)
        glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

        active += diff
        wait := sync.WaitGroup{}
        
        // Create diff pods in slow-start batches
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
            errorCount := len(errCh)
            wait.Add(int(batchSize))
            for i := int32(0); i < batchSize; i++ {
                go func() {
                    defer wait.Done()
                    err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                    if err != nil && errors.IsTimeout(err) {
                        return
                    }
                    if err != nil {
                        defer utilruntime.HandleError(err)
                        glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                        jm.expectations.CreationObserved(jobKey)
                        activeLock.Lock()
                        active--
                        activeLock.Unlock()
                        errCh <- err
                    }
                }()
            }
            wait.Wait()

            // If any creation in this batch failed, do not process the remaining uncreated pods:
            // compute how many pods were skipped and adjust the ControllerExpectations adds and the active value accordingly.
            skippedPods := diff - batchSize
            if errorCount < len(errCh) && skippedPods > 0 {
                glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
                active -= skippedPods
                for i := int32(0); i < skippedPods; i++ {
                    jm.expectations.CreationObserved(jobKey)
                }
                break
            }
            diff -= batchSize
        }
    }

    select {
    case err := <-errCh:
        // If any error occurred above, return it; the job will then be re-enqueued and synced again later
        if err != nil {
            return active, err
        }
    default:
    }

    return active, nil
}
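
The slow-start batching is easiest to see with a concrete number. The standalone sketch below (not the upstream code) reproduces the batch-size progression for diff = 10, which yields batches of 1, 2, 4 and finally the remaining 3 pods:

package main

import "fmt"

func min32(a, b int32) int32 {
    if a < b {
        return a
    }
    return b
}

func main() {
    diff := int32(10)                   // pods still to create
    const slowStartInitialBatchSize = 1 // matches controller.SlowStartInitialBatchSize

    for batchSize := min32(diff, slowStartInitialBatchSize); diff > 0; batchSize = min32(2*batchSize, diff) {
        fmt.Printf("creating a batch of %d pods (%d still needed before this batch)\n", batchSize, diff)
        diff -= batchSize
    }
}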

That concludes the walkthrough of the job controller. A follow-up post will analyze the cronJob controller source code!
