kubernetes 之 Job 技術內幕大揭祕

本章將從kubernetes源碼層次,對Job內幕原理進行大揭祕。 golang

在開始本篇內容以前,您須要具有以下知識:
golang命令行庫:Cobra
推薦chenjian和Jsharkc兩篇快速入門教程。
k8s實操教程
最好的方法:官網,若是感受本身英文不夠好,也能夠查閱K8smeetup中文本地化翻譯文檔。 api

登堂入室 併發

下面咱們將進入K8s大廈中的一層:kube-controller-manager。本着言簡意賅的原則,我把以下關鍵代碼貼出來,將kube-controller-manager的邏輯按照順序走到Job controller這個房間。 app

cmd/kube-controller-manager/controller-manager.go:函數

func main() {
command := app.NewControllerManagerCommand()
command.Execute()
}

cmd/kube-controller-manager/app/controllermanager.go:oop

func NewControllerManagerCommand() \*cobra.Command{
s, err := options.NewKubeControllerManagerOptions()
cmd := &cobra.Command{
    Use: "kube-controller-manager",
    Run: func() {
        c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
        Run(c.Complete(), wait.NeverStop)
    }
}
return cmd
}

func KnownControllers() \[\]string {
ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
 ret.Insert(
saTokenControllerName,
)
return ret.List()
}

func NewControllerInitializers(loopMode ControllerLoopMode) map\[string\]InitFunc {
controllers\["cronjob"\] = startCronJobController
controllers\["job"\] = startJobController
controllers\["deployment"\] = startDeploymentController
...
}

cmd/kube-controller-manager/app/batch.go:ui

func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
go job.NewJobController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
}

經過上面幾步層層的問詢,咱們終於要到了Job的房間號: this

pkg/controller/job/job_controller.go:spa

func NewJobController() \*JobController{...}
func (jm \*JobController) Run() {...}

玄機就在job_controller.go這個房間裏,一場揭祕之旅就此開始! 命令行

Job大揭祕

kube-Controller內部實現邏輯 pic1.jpg

主要使用到 Informer和workqueue兩個核心組件。Controller能夠有一個或多個informer來跟蹤某一個resource。Informter跟API server保持通信獲取資源的最新狀態並更新到本地的cache中,一旦跟蹤的資源有變化,informer就會調用callback。把關心的變動的Object放到workqueue裏面。而後woker執行真正的業務邏輯,計算和比較worker queue裏items的當前狀態和指望狀態的差異,而後經過client-go向API server發送請求,直到驅動這個集羣向用戶要求的狀態演化。

Informer和workqueue兩個組件的原理將在另外的章節進行揭祕,接下來將以貼出關鍵部分源碼的方式針對worker業務邏輯進行分析。

JobController結構

type JobController struct {
//訪問kube-apiserver的client,獲取pod,job信息
kubeClient clientset.Interface
//pod controller,used for creat and delete pod
podControl controller.PodControlInterface
//To allow injection of updateJobStatus for testing.
undateHandler func(job \*batch.Job) error {}
//Job Controller核心接口,用於sync job
syncHandler func(jobKey string) (bool, error){}
//podStoreSynced returns true if the pod store has been synced at least once.
podStoreSynced cache.InformerSynced
//jobStoreSynced returns true if the job store has been synced at least once.
jobStoreSynced cache.InformerSynced

//A TTLCache of pod creates/deletes each rc expects to see
expectations controller.ControllerExpectationsInterface
//A store of jobs
//jobLister 用於獲取job元數據及根據pod的labels來匹配jobs
//該controller 會使用到的接口以下:
//1. GetPodJobs(): 用於根據pod反推jobs
//2. Get(): 根據namespace & name 獲取job 元數據
jobLister batchv1listers.JobLister

//A store of pods, populated by the podController
//podStore 提供了接口用於獲取指定job下管理的全部pods
podStore corelisters.PodLister

//Jobs that need to be updated
//job controller經過kubeClient watch jobs & pods的數據變動,
//好比add、delete、update,來操做該queue。
//並啓動相應的worker,調用syncJob處理該queue中的jobs。
queue workqueue.RateLimitingInterface
//jobs的相關events,經過該recorder進行廣播
recorder record.EventRecorder
}

NewJobController( )

func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) \*JobController {
jm := &JobController{
    // 鏈接kube-apiserver的client
    kubeClient: kubeClient,
    // podControl,用於manageJob()中建立和刪除pod
podControl: controller.RealPodControl{
  KubeClient: kubeClient,
  Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
    },
    // 維護指望值
    expectations: controller.NewControllerExpectations(),
    // jobs queue存儲要變動的object, 後面會建立對應數量的workers 從該queue 中處理各個jobs。
queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
}
// 註冊 jobInformer、podInformer 的Add、Update、Delete 函數
// 該controller 獲取到job 的Add、Update、Delete事件以後,會調用對應的function
// 這些function 的核心仍是去操做了上面的queue,讓syncJob 處理queue 中的jobs
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
  jm.enqueueController(obj, true)
},
UpdateFunc: jm.updateJob,
DeleteFunc: func(obj interface{}) {
  jm.enqueueController(obj, true)
},
})
jm.jobLister = jobInformer.Lister()
jm.jobStoreSynced = jobInformer.Informer().HasSynced

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:    jm.addPod,
UpdateFunc: jm.updatePod,
DeleteFunc: jm.deletePod,
})
jm.podStore = podInformer.Lister()
jm.podStoreSynced = podInformer.Informer().HasSynced

jm.updateHandler = jm.updateJobStatus
jm.syncHandler = jm.syncJob

return jm

Run( )

func (jm \*JobController) Run(workers int, stopCh <-chan struct{}) {
// 每次啓動都會先等待Job & Pod cache 是否有同步過,即指queue是否已經同步過數據,
// 由於每一個worker乾的活都是從queue中獲取,因此只有queue有數據才應該繼續往下建立worker。
if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh)
}

worker( )

func (jm \*JobController) worker() {
for jm.processNextWorkItem() {
}
}

func (jm \*JobController) processNextWorkItem() bool {
// 從queque 中獲取job key
// key 構成: namespace + "/" + name
key, quit := jm.queue.Get()
// 調用初始化時註冊的 syncJob()
// 若是執行成功,且forget = true, 則從queue 中刪除該 key。
forget, err := jm.syncHandler(key.(string))
if err == nil {
    if forget {
        jm.queue.Forget(key)
    }
    return true
}
// 若是syncJob() 出錯,則把該job key 繼續丟回queue 中, 等待下次sync。
jm.queue.AddRateLimited(key)
return true
}

syncJob( )

func (jm \*JobController) syncJob(key string) (bool, error) {
// 把key 拆分紅job namespace & name
ns, name, err := cache.SplitMetaNamespaceKey(key)
// 獲取job 信息
// 若是沒有找到該job的話,表示已經被刪除,並從ControllerExpectations中刪除該key
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
  jm.expectations.DeleteExpectations(key)
  return true, nil
}
return false, err
}
job := \*sharedJob
// 根據job.Status.Conditions是否處於「JobComplete」 or "JobFailed", 來判斷該job 是否已經完成。
// 若是已經完成的話,直接return
if IsJobFinished(&job) {
return true, nil
}
// 根據該 job key 失敗的次數來計算該job 已經重試的次數。
// job 默認會有6次的重試機會
previousRetry := jm.queue.NumRequeues(key)

// 判斷該key 是否須要調用manageJob()進行sync,條件以下:
// 1. 該key 在ControllerExpectations中的adds和dels 都 <= 0
// 2. 該key 在ControllerExpectations中已經超過5min沒有更新了
// 3. 該key 在ControllerExpectations中沒有查到
// 4. 調用GetExpectations()接口失敗
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
// 獲取該job管理的全部pods
pods, err := jm.getPodsForJob(&job)
// 獲取處於active 的pods
activePods := controller.FilterActivePods(pods)
// 獲取active & succeeded & failed pods數量
active := int32(len(activePods))
succeeded, failed := getStatus(pods)
conditions := len(job.Status.Conditions)
// job first start
// 看下該job是不是第一次啓動,是的話,設置StartTime;
// 並判斷是否設置了job.Spec.ActiveDeadlineSeconds, 若是設置了的話,在ActiveDeadlineSeconds秒後,在將該key 丟入queue
if job.Status.StartTime == nil {
now := metav1.Now()
job.Status.StartTime = &now
if job.Spec.ActiveDeadlineSeconds != nil {
  jm.queue.AddAfter(key, time.Duration(\*job.Spec.ActiveDeadlineSeconds)\*time.Second)
}
}

// 確認該job是否有新的pod failed
jobHaveNewFailure := failed > job.Status.Failed
// 確認重試次數是否有超出預期值
exceedsBackoffLimit := jobHaveNewFailure && (active != \*job.Spec.Parallelism) &&
(int32(previousRetry)+1 > \*job.Spec.BackoffLimit)
// 若是job重試的次數超過了job.Spec.BackoffLimit(默認是6次),則標記該job爲failed並指明緣由;
// 計算job重試的次數,還跟job中的pod template設置的重啓策略有關,若是設置成「RestartPolicyOnFailure」,
// job重試的次數 = 全部pods InitContainerStatuses 和 ContainerStatuses 的RestartCount 之和,
// 也須要判斷這個重試次數是否超過 BackoffLimit;
if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
// check if the number of pod restart exceeds backoff (for restart OnFailure only)
// OR if the number of failed jobs increased since the last syncJob
jobFailed = true
failureReason = "BackoffLimitExceeded"
    failureMessage = "Job has reached the specified backoff limit"
    // 若是job 運行的時間超過了ActiveDeadlineSeconds,則標記該job爲failed並指明緣由
} else if pastActiveDeadline(&job) {
jobFailed = true
failureReason = "DeadlineExceeded"
failureMessage = "Job was active longer than specified deadline"
}
// 若是job failed,則併發等待全部active pods刪除結束;
// 修改job.Status.Conditions, 而且根據以前記錄的失敗信息發送event
if jobFailed {
errCh := make(chan error, active)
jm.deleteJobPods(&job, activePods, errCh)
select {
case manageJobErr = <-errCh:
  if manageJobErr != nil {
    break
  }
default:
}
// update status values accordingly
failed += active
active = 0
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
} else {
    // 根據以前判斷的job是否須要sync,且該job 還未被刪除,則調用mangeJob()。
    // manageJob() 後面單獨解析
if jobNeedsSync && job.DeletionTimestamp == nil {
  active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
}
completions := succeeded
    complete := false
    // job.Spec.Completions 表示該job只有成功建立這些數量的pods,纔算完成。
    // 若是該值沒有設置,表示只要其中有一個pod 成功過,該job 就算完成了,
    // 可是須要注意,若是當前還有正在運行的pods,則須要等待這些pods都退出,才能標記該job完成任務了。
if job.Spec.Completions == nil {
  if succeeded > 0 && active == 0 {
    complete = true
        }
        // 若是設置了Completions值,只要該job下成功建立的pods數量 >= Completions,該job就成功結束了。
        // 還須要發送一些異常events, 好比已經達到要求的成功建立的數量後,還有處於active的pods;
        // 或者成功的次數 > 指定的次數,這些應該都是預期以外的事件。
} else {
  if completions >= \*job.Spec.Completions {
    complete = true
    if active > 0 {
      jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
    }
    if completions > \*job.Spec.Completions {
      jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
    }
  }
    }
    // 若是job成功結束,則更新job.Status.Conditions && job.Status.CompletionTime
if complete {
  job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
  now := metav1.Now()
  job.Status.CompletionTime = &now
}
}
forget := false
// 若是此次有成功的pod 產生,則forget 該次job key
if job.Status.Succeeded < succeeded {
forget = true
}

// // 更新job.Status
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
job.Status.Active = active
job.Status.Succeeded = succeeded
job.Status.Failed = failed
    // 更新job失敗的話,將該job key繼續丟入queue中。
if err := jm.updateHandler(&job); err != nil {
  return forget, err
}
    // 若是此次job 有新的pod failed,且該job還未完成,則繼續把該job key丟入queue中
if jobHaveNewFailure && !IsJobFinished(&job) {
  // returning an error will re-enqueue Job after the backoff period
  return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
}
    // 不然forget job
forget = true
}

return forget, manageJobErr
}

manageJob( )

在syncJob()中有個關鍵函數 manageJob(),它主要作的事情就是根據 job 配置的併發數來確認當前處於 active 的 pods 數量是否合理,若是不合理的話則進行調整。  
  
具體實現以下:  

func (jm \*JobController) manageJob(activePods \[\]\*v1.Pod, succeeded int32, job \*batch.Job) (int32, error) {
var activeLock sync.Mutex
active := int32(len(activePods))
parallelism := \*job.Spec.Parallelism
// 獲取job key, 根據 namespace + "/" + name進行拼接。
jobKey, err := controller.KeyFunc(job)

var errCh chan error
// 若是處於active pods 大於job設置的併發數,則併發刪除超出部分的active pods。
// 須要注意的是,須要刪除的active pods是有必定的優先級的:
// not-ready < ready;unscheduled < scheduled;pending < running。
// 先基於上面的優先級對activePods 進行排序,而後再從頭執行刪除操做。
// 若是刪除pods失敗,則須要回滾以前設置的ControllerExpectations 和 active 值。
if active > parallelism {
diff := active - parallelism
errCh = make(chan error, diff)
jm.expectations.ExpectDeletions(jobKey, int(diff))
sort.Sort(controller.ActivePods(activePods))

active -= diff
wait := sync.WaitGroup{}
wait.Add(int(diff))
for i := int32(0); i < diff; i++ {
  go func(ix int32) {
    defer wait.Done()
    if err := jm.podControl.DeletePod(job.Namespace, activePods\[ix\].Name, job); err != nil {
      defer utilruntime.HandleError(err)
      jm.expectations.DeletionObserved(jobKey)
      activeLock.Lock()
      active++
      activeLock.Unlock()
      errCh <- err
    }
  }(i)
}
    wait.Wait()
// 若是active pods少於設置的併發值,則先計算diff值,具體的計算跟Completions和Parallelism的配置有關。
// 1.job.Spec.Completions == nil && succeeded pods > 0, 則diff = 0;
// 2.job.Spec.Completions == nil && succeeded pods = 0,則diff = Parallelism;
// 3.job.Spec.Completions != nil 則diff等於(job.Spec.Completions - succeeded - active)和parallelism中的最小值(非負值);
// 計算好diff值即知道了還須要建立多少pods,因爲等待建立的pods數量可能會很是龐大,因此這裏有個分批建立的邏輯:
// 第一批建立1個,第二批建立2個,後續按2的倍數繼續往下分批建立,可是每次建立的數量都不會大於diff值(diff值每次都會減掉對應的分批數量)。
// 若是建立pod超時,則直接return;
// 若是建立pod失敗,則回滾ControllerExpectations的adds 和 active 值,並不在執行後續未執行的 pods
} else if active < parallelism {
wantActive := int32(0)
if job.Spec.Completions == nil {
  if succeeded > 0 {
    wantActive = active
  } else {
    wantActive = parallelism
  }
} else {
  wantActive = \*job.Spec.Completions - succeeded
  if wantActive > parallelism {
    wantActive = parallelism
  }
}
diff := wantActive - active
if diff < 0 {
  utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
  diff = 0
}
jm.expectations.ExpectCreations(jobKey, int(diff))
errCh = make(chan error, diff)
klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

active += diff
wait := sync.WaitGroup{}
    // 分批建立 diff 數量的 pods
for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2\*batchSize, diff) {
  errorCount := len(errCh)
  wait.Add(int(batchSize))
  for i := int32(0); i < batchSize; i++ {
    go func() {
      defer wait.Done()
      err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
      if err != nil && errors.IsTimeout(err) {
        return
      }
      if err != nil {
        defer utilruntime.HandleError(err)
        // Decrement the expected number of creates because the informer won't observe this pod
        klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
        jm.expectations.CreationObserved(jobKey)
        activeLock.Lock()
        active--
        activeLock.Unlock()
        errCh <- err
      }
    }()
  }
  wait.Wait()
  // 若是此次分批建立pods有失敗的狀況,則不在處理後續未執行的pods
        // 須要計算剩餘未執行的pods數量,並更新 ControllerExpectations 的 adds 和 active 值
  skippedPods := diff - batchSize
  if errorCount < len(errCh) && skippedPods > 0 {
    klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
    active -= skippedPods
    for i := int32(0); i < skippedPods; i++ {
      jm.expectations.CreationObserved(jobKey)
    }
    break
  }
  diff -= batchSize
}
}

select {
case err := <-errCh:
    // 只要前面有錯誤產生,則返回出錯並會將該job 繼續丟入queue,等待下次sync
if err != nil {
  return active, err
}
default:
}
return active, nil
}

以上是對Kubernetes Job技術內幕的大揭祕,意猶未盡的童鞋請持續關注本公衆號,接下來本文做者還將繼續揭祕cronjob 內幕原理。

相關文章
相關標籤/搜索