本文主要研究一下dbsync的jobsgit
//Job represents db sync job type Job struct { ID string Error string Status string Progress Progress Items []*Transferable Chunked bool mutex *sync.Mutex StartTime time.Time EndTime *time.Time } //NewJob creates a new job func NewJob(id string) *Job { return &Job{ ID: id, StartTime: time.Now(), mutex: &sync.Mutex{}, Items: make([]*Transferable, 0), } }
Job方法定義了ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime
//Update updates job progress func (j *Job) Update() { if len(j.Items) == 0 { return } sourceCount := 0 destCount := 0 transferred := 0 for i := range j.Items { if j.Items[i].Status == nil { continue } sourceCount += j.Items[i].Source.Count() destCount += j.Items[i].Dest.Count() transferred += int(atomic.LoadUint32(&j.Items[i].Transferred)) } j.Progress.Transferred = transferred j.Progress.SourceCount = sourceCount j.Progress.DestCount = destCount if sourceCount > 0 { j.Progress.Pct = transferred / sourceCount } }
Update方法遍歷Items,統計transferred、sourceCount、destCount
//Done flag job as done func (j *Job) Done(now time.Time) { if j.Status != shared.StatusError { j.Status = shared.StatusDone } j.EndTime = &now }
Done方法更新Status和EndTime
//Add add transferable func (j *Job) Add(transferable *Transferable) { j.mutex.Lock() defer j.mutex.Unlock() j.Items = append(j.Items, transferable) }
Add方法往transferable添加Items
//IsRunning returns true if jos has running status func (j *Job) IsRunning() bool { return j.Status == shared.StatusRunning || j.EndTime == nil }
IsRunning方法經過status和EndTime來判斷是不是running
//Service represents a job service type Service interface { //List lists all active or recently active jobs List(request *ListRequest) *ListResponse //Create creates a new job Create(ID string) *core.Job //Get returns a job for supplied ID or nil Get(ID string) *core.Job } type service struct { registry *registry } //New create a job service func New() Service { return &service{ registry: newRegistry(), } }
Service接口定義了List、Create、Get
//Get returns job by ID or nil func (s *service) Get(ID string) *core.Job { jobs := s.registry.list() for i := range jobs { if jobs[i].ID == ID { jobs[i].Update() return jobs[i] } } return nil }
Get方法先執行registry.list(),而後遍歷list找到ID對應的job,而後執行Update
//List lists all jobs func (s *service) List(request *ListRequest) *ListResponse { jobs := s.registry.list() if len(request.IDs) == 0 { return &ListResponse{ Jobs: jobs, } } var requestedIDs = make(map[string]bool) for i := range request.IDs { requestedIDs[request.IDs[i]] = true } var filtered = make([]*core.Job, 0) for i := range jobs { if _, has := requestedIDs[jobs[i].ID]; !has { continue } jobs[i].Update() filtered = append(filtered, jobs[i]) } return &ListResponse{ Jobs: filtered, } }
List方法先執行registry.list(),以後根據requestedIDs找出對應的job,執行Update,最後返回
//Create creates a new job func (s *service) Create(ID string) *core.Job { job := core.NewJob(ID) s.registry.add(job) return job }
Create方法經過core.NewJob(ID)建立job,而後執行registry.add(job)
dbsync的Schedulable定義了URL、ID、*contract.Sync、Schedule、Status、status屬性,它提供了Clone、Done、IsRunning、ScheduleNexRun、Init、Validate方法。github