k8s version: v1.11.0author: lbl167612@alibaba-inc.comapi
cronJob controller 的實現比較簡單,使用 Cron - Wikipedia 的方法,肯定調度規則,底層的調度對象就是依賴了 job,它不會去檢查任何 Pod。併發
該 controller 也沒有依賴各類 informer,就簡單建立了一個循環運行的協程,每次遍歷現有的 jobs & cronJobs,整理它們的關係並進行管理。app
注意:kubernetes version >= 1.4 (ScheduledJob),>= 1.5(CronJob),須要給 apiserver 傳遞
--runtime-config=batch/v2alpha1=true
開啓 batch/v2alpha1 API 纔可用。
.spec.schedule
是 cronJob 的必填字段,該值是 Cron - Wikipedia 格式的字符串,例如:0 * * * *
,或者 @hourly
,來肯定調度策略。函數
.spec.startingDeadlineSeconds
是可選字段,表示啓動 Job 的期限(秒級別),若是由於任何緣由而錯過了被調度的時間,那麼錯誤執行時間的 Job 被認爲是失敗的。若是沒有指定,則沒有期限。ui
.spec.concurrencyPolicy
也是可選字段,指定了 cronJob 建立 Job 的併發執行策略:url
Allow
(默認):容許併發運行 Job。Forbid
:禁止併發運行,若是前一個尚未完成,則直接跳過。Replace
:取消當前正在運行的 Jobs,而後新建 Job 來替換。.spec.suspend
也是可選字段,若是設置爲 true
,則後續全部的執行都會被過濾掉,可是對當前已經在運行的 Job 不影響。默認爲false
。spa
.spec.successfulJobsHistoryLimit
和 .spec.failedJobsHistoryLimit
這兩個字段也是可選的。它們指定了能夠保留完成和失敗 Job 數量的限制。
默認沒有限制,全部成功和失敗的 Job 都會被保留。然而,當運行一個 Cron Job 時,很快就會堆積不少 Job,推薦設置這兩個字段的值。設置限制值爲 0,相關類型的 Job 完成後將不會被保留。code
路徑:pkg/controller/cronjob/cronjob_controller.go
orm
type CronJobController struct { // 訪問 kube-apiserver 的 client. kubeClient clientset.Interface // job 控制器,用於建立和刪除 job. jobControl jobControlInterface // cronJob 控制器,用於更新狀態. sjControl sjControlInterface // pod 控制器,用於list & delete pods // 在刪除 job 時,同時也清理 job 建立的 pods. podControl podControlInterface // cronJob 相關的events, 經過該 recorder 進行廣播 recorder record.EventRecorder }
注意:代碼中有不少
sj
,由於之前不叫 cronJob,叫 scheduled jobs。
路徑:cmd/kube-controller-manager/app/batch.go
server
startCronJobController() 是啓動 cronJob controller 的入口函數。它會初始化 CronJobController 對象,並Run().
func startCronJobController(ctx ControllerContext) (bool, error) { // 在啓動 cronJob controller 以前,判斷下 cronJob 是否有配置生效 // 用戶能夠在建立k8s clusters時,經過修改kube-apiserver --runtime-config配置想要生效的 resource if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] { return false, nil } // 初始化 CronJobController 對象 cjc, err := cronjob.NewCronJobController( ctx.ClientBuilder.ClientOrDie("cronjob-controller"), ) if err != nil { return true, fmt.Errorf("error creating CronJob controller: %v", err) } // Run go cjc.Run(ctx.Stop) return true, nil }
CronJobController Run() 方法比較簡單,就是每10s 循環調用 syncAll() 函數。
syncAll() 邏輯也比較清楚,根據初始化的 kubeClient, 獲取全部的 jobs 和 cronJobs,並遍歷全部 Jobs, 根據ObjectMeta.OwnerReferences 字段匹配是否由 cronJob controller 所建立。最後基於 cronJob 的UUID 進行整理。
最後處理全部的 cronJobs,確認須要調度的時間並根據並行策略建立 jobs,同步完後再清理全部已經 finished jobs。
func (jm *CronJobController) syncAll() { // 列出全部的 jobs jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err)) return } js := jl.Items glog.V(4).Infof("Found %d jobs", len(js)) // 列出全部的 cronJobs sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}) if err != nil { utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err)) return } sjs := sjl.Items glog.V(4).Infof("Found %d cronjobs", len(sjs)) // 遍歷全部的 jobs, 根據 ObjectMeta.OwnerReferences 字段肯定該 job 是否由 cronJob 所建立。 // 而後根據 cronJob uuid 進行排列 jobsBySj := groupJobsByParent(js) glog.V(4).Infof("Found %d groups", len(jobsBySj)) // 遍歷全部的 cronJobs for _, sj := range sjs { // 進行同步 // 肯定須要調度的時間,並根據 Spec.ConcurrencyPolicy 策略,確認如何來建立 jobs // 並更新 cronJob.Status syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder) // 清理全部已經完成的 jobs cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder) } }
該接口就是 cronJob controller 中實現同步的關鍵部分。
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) { nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name) // 遍歷全部獲取到的 jobs // 1.記錄到 childrenJobs 中,表示當前屬於該 cronJob 的全部 Jobs,便於後面清理 cronJob 中記錄的 active Jobs // 2.查看該 job 是否在 cronJob.Status.Active 的列表中 // - 若是在的話,且該 Job 已經 finished,則將該 job 從 active list 中刪除 // - 若是不在,且該 Job 尚未 finished,則發送異常事件 childrenJobs := make(map[types.UID]bool) for _, j := range js { childrenJobs[j.ObjectMeta.UID] = true found := inActiveList(*sj, j.ObjectMeta.UID) if !found && !IsJobFinished(&j) { recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name) } else if found && IsJobFinished(&j) { deleteFromActiveList(sj, j.ObjectMeta.UID) // TODO: event to call out failure vs success. recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name) } } // 遍歷 cronJob 全部的 active jobs, 根據前面的 childrenJobs 來判斷該繼續的 active job 是否還存在,若是不存在的話,也從 active list 中刪除。 for _, j := range sj.Status.Active { if found := childrenJobs[j.UID]; !found { recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name) deleteFromActiveList(sj, j.UID) } } // 上面更新了 cronJob.Status.Active 字段,因此須要更新一把 cronJob updatedSJ, err := sjc.UpdateStatus(sj) if err != nil { glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err) return } *sj = *updatedSJ // 若是 cronJob 已經被用戶刪除,則直接 return if sj.DeletionTimestamp != nil { return } // 若是 cronJob 已經被 suspend,也直接 return if sj.Spec.Suspend != nil && *sj.Spec.Suspend { glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog) return } // 根據 cronJob 的建立時間或最近一次的調度時間,和 cronJob.Spec.Schedule 配置,計算出到如今爲止全部應該調度的時間點。 times, err := getRecentUnmetScheduleTimes(*sj, now) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err) glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err) return } // 若是返回的時間點列表爲空,則表示該 cronJob 暫時還不須要調度,直接 return if len(times) == 0 { glog.V(4).Infof("No unmet start times for %s", nameForLog) return } // 有屢次未知足的調度時間 if len(times) > 1 { glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog) } // scheduledTime 取列表中的最後一次時間 scheduledTime := times[len(times)-1] tooLate := false // 若是用戶配置了 Spec.StartingDeadlineSeconds,則須要判斷 scheduledTime 是否知足條件 // 若是 now - scheduledTime > Spec.StartingDeadlineSeconds,則直接 return if sj.Spec.StartingDeadlineSeconds != nil { tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now) } if tooLate { glog.V(4).Infof("Missed starting window for %s", nameForLog) return } // scheduledTime 知足各類條件的狀況下,就須要查看 cronJob 配置的併發策略 // 若是 ForbidConcurrent,且 active jobs > 0, 則直接 return; // 不然繼續往下建立; if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 { glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog) return } // 若是 ReplaceConcurrent,則刪除全部的 active jobs, 等後面從新建立 if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { for _, j := range sj.Status.Active { glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog) job, err := jc.GetJob(j.Namespace, j.Name) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err) return } if !deleteJob(sj, job, jc, pc, recorder, "") { return } } } // 根據 cronJob.spec.JobTemplate,填充 job 的完整結構 // 好比 name, labels, OwnerReferences 等等。 jobReq, err := getJobFromTemplate(sj, scheduledTime) if err != nil { glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err) return } // 建立 job jobResp, err := jc.CreateJob(sj.Namespace, jobReq) if err != nil { recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) return } glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog) recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) // 根據建立 job 返回的 response,獲取 ObjectReference 結構 // 用於記錄到 cronJob.Status.Active 中 ref, err := getRef(jobResp) if err != nil { glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog) } else { sj.Status.Active = append(sj.Status.Active, *ref) } // 設置最近一次的調度時間 sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} // 更新 cronJob if _, err := sjc.UpdateStatus(sj); err != nil { glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err) } return }