golang實現worker

注意:以下代碼摘抄自 Harbor:https://github.com/vmware/harbor

/*
   Copyright (c) 2016 VMware, Inc. All Rights Reserved.
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package job

import (
	"github.com/vmware/harbor/src/common/dao"
	"github.com/vmware/harbor/src/jobservice/config"
	"github.com/vmware/harbor/src/common/models"
	"github.com/vmware/harbor/src/common/utils/log"
)

// workerPool defines the pool of workers.
type workerPool struct {
	// workerChan carries workers that have announced themselves in Start;
	// Dispatch receives from it to pick a worker for a job.
	workerChan chan *Worker
	// workerList holds every worker created by InitWorkerPool; StopJobs scans it.
	workerList []*Worker
}

// WorkerPool is a set of workers; each worker is associated with a state machine for handling jobs.
// It consists of a channel for free workers and a list of all workers.
// It is nil until InitWorkerPool assigns it.
	var WorkerPool *workerPool

// StopJobs walks the given job IDs in order and, for every worker in the pool
// whose state machine currently carries that job ID, asks the state machine to
// stop the job. Job IDs that match no worker are skipped silently.
func (wp *workerPool) StopJobs(jobs []int64) {
	log.Debugf("Works working on jobs: %v will be stopped", jobs)
	for i := range jobs {
		jobID := jobs[i]
		for _, worker := range wp.workerList {
			if worker.SM.JobID != jobID {
				continue
			}
			log.Debugf("found a worker whose job ID is %d, will try to stop it", jobID)
			worker.SM.Stop(jobID)
		}
	}
}

// Worker consists of a channel for job from which worker gets the next job to handle, and a pointer to a statemachine,
// the actual work to handle the job is done via state machine.
// This is the worker struct.
type Worker struct {
	// ID is the identifier passed to NewWorker; it is used in log messages.
	ID      int
	// RepJobs delivers the IDs of the jobs this worker should handle.
	RepJobs chan int64
	// SM is the state machine that performs the actual work for a job.
	SM      *SM
	// quit delivers the stop signal sent by Stop; a true value ends the worker loop.
	quit    chan bool
}

// Start launches the worker's goroutine and returns immediately.
//
// On every iteration the worker first sends itself on WorkerPool.workerChan to
// announce that it can take a job, then blocks until either a job ID arrives on
// RepJobs (which is processed by handleRepJob) or a value arrives on quit.
// A true quit value ends the goroutine; a false one starts the next iteration.
//
// NOTE: the worker has already put itself on the pool channel by the time it
// receives the quit signal, and nothing here takes it back off.
func (w *Worker) Start() {
	loop := func() {
		for {
			// Register this worker with the pool as available.
			WorkerPool.workerChan <- w
			select {
			case stop := <-w.quit:
				if !stop {
					continue
				}
				log.Debugf("worker: %d, will stop.", w.ID)
				return
			case jobID := <-w.RepJobs:
				// A job was assigned to this worker.
				log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
				w.handleRepJob(jobID)
			}
		}
	}
	go loop()
}

// Stop signals the worker to exit without blocking the caller: the quit value
// is sent from a separate goroutine, which stays blocked until the worker's
// loop receives it.
func (w *Worker) Stop() {
	sendQuit := func() {
		// Deliver the exit instruction to the worker loop.
		w.quit <- true
	}
	go sendQuit()
}

// handleRepJob processes a single job that was assigned to this worker.
//
// It resets the worker's state machine for the given job ID. If the reset
// fails, the job status is set to models.JobError and the job is abandoned.
// If the state machine's Parms.Enabled is 0 after the reset, the job status is
// set to models.JobCanceled and the job is not run. Otherwise the state machine
// is started in the models.JobRunning state.
//
// Failures to persist a status change are logged; they are not returned because
// the method has no error result.
func (w *Worker) handleRepJob(id int64) {
	if err := w.SM.Reset(id); err != nil {
		log.Errorf("Worker %d, failed to re-initialize statemachine for job: %d, error: %v", w.ID, id, err)
		if err2 := dao.UpdateRepJobStatus(id, models.JobError); err2 != nil {
			log.Errorf("Failed to update job status to ERROR, job: %d, error:%v", id, err2)
		}
		return
	}

	if w.SM.Parms.Enabled == 0 {
		log.Debugf("The policy of job:%d is disabled, will cancel the job", id)
		// Previously this error was discarded; log it so a status that was not
		// persisted is visible, mirroring the handling of the ERROR update above.
		if err := dao.UpdateRepJobStatus(id, models.JobCanceled); err != nil {
			log.Errorf("Failed to update job status to CANCELED, job: %d, error:%v", id, err)
		}
		w.SM.Logger.Info("The job has been canceled")
		return
	}

	w.SM.Start(models.JobRunning)
}

// NewWorker builds a worker with the given ID, unbuffered job and quit
// channels, and a freshly initialised state machine. The worker is not running
// until Start is called on it.
func NewWorker(id int) *Worker {
	worker := new(Worker)
	worker.ID = id                    // identifier of the worker
	worker.RepJobs = make(chan int64) // channel on which job IDs are delivered
	worker.quit = make(chan bool)     // channel used to tell the goroutine to exit
	worker.SM = new(SM)               // state machine that does the actual work
	worker.SM.Init()
	return worker
}

// InitWorkerPool create workers according to configuration.
func InitWorkerPool() {
	WorkerPool = &workerPool{
		workerChan: make(chan *Worker, config.MaxJobWorkers()), //初始化工人池
		workerList: make([]*Worker, 0, config.MaxJobWorkers()),     //和業務有關,沒必要理會
	}
	for i := 0; i < config.MaxJobWorkers(); i++ {
		worker := NewWorker(i)
		WorkerPool.workerList = append(WorkerPool.workerList, worker)     //和業務有關,沒必要理會
		worker.Start() //啓動
		log.Debugf("worker %d started", worker.ID)
	}
}

// Dispatch will listen to the jobQueue of job service and try to pick a free worker from the worker pool and assign the job to it.
func Dispatch() {
	for {
		select {
		case job := <-jobQueue: //監放任務隊列,若是存在就取出
			go func(jobID int64) {
				log.Debugf("Trying to dispatch job: %d", jobID)
				worker := <-WorkerPool.workerChan //從工人池當中取出一個工人
				worker.RepJobs <- jobID //把取到的工做交由工人去處理
			}(job)
		}
	}
}
相關文章
相關標籤/搜索