本文主要研究一下machinery的TaskState。
const ( // StatePending - initial state of a task StatePending = "PENDING" // StateReceived - when task is received by a worker StateReceived = "RECEIVED" // StateStarted - when the worker starts processing the task StateStarted = "STARTED" // StateRetry - when failed task has been scheduled for retry StateRetry = "RETRY" // StateSuccess - when the task is processed successfully StateSuccess = "SUCCESS" // StateFailure - when processing of the task fails StateFailure = "FAILURE" ) type TaskState struct { TaskUUID string `bson:"_id"` TaskName string `bson:"task_name"` State string `bson:"state"` Results []*TaskResult `bson:"results"` Error string `bson:"error"` CreatedAt time.Time `bson:"created_at"` TTL int64 `bson:"ttl,omitempty"` } type TaskResult struct { Type string `bson:"type"` Value interface{} `bson:"value"` } // NewPendingTaskState ... func NewPendingTaskState(signature *Signature) *TaskState { return &TaskState{ TaskUUID: signature.UUID, TaskName: signature.Name, State: StatePending, CreatedAt: time.Now().UTC(), } } // NewReceivedTaskState ... func NewReceivedTaskState(signature *Signature) *TaskState { return &TaskState{ TaskUUID: signature.UUID, State: StateReceived, } } // NewStartedTaskState ... func NewStartedTaskState(signature *Signature) *TaskState { return &TaskState{ TaskUUID: signature.UUID, State: StateStarted, } } // NewSuccessTaskState ... func NewSuccessTaskState(signature *Signature, results []*TaskResult) *TaskState { return &TaskState{ TaskUUID: signature.UUID, State: StateSuccess, Results: results, } } // NewFailureTaskState ... func NewFailureTaskState(signature *Signature, err string) *TaskState { return &TaskState{ TaskUUID: signature.UUID, State: StateFailure, Error: err, } } // NewRetryTaskState ... func NewRetryTaskState(signature *Signature) *TaskState { return &TaskState{ TaskUUID: signature.UUID, State: StateRetry, } } // IsCompleted returns true if state is SUCCESS or FAILURE, // i.e. 
the task has finished processing and either succeeded or failed. func (taskState *TaskState) IsCompleted() bool { return taskState.IsSuccess() || taskState.IsFailure() } // IsSuccess returns true if state is SUCCESS func (taskState *TaskState) IsSuccess() bool { return taskState.State == StateSuccess } // IsFailure returns true if state is FAILURE func (taskState *TaskState) IsFailure() bool { return taskState.State == StateFailure }
TaskState定義了PENDING、RECEIVED、STARTED、RETRY、SUCCESS、FAILURE狀態;TaskState定義了TaskUUID、TaskName、State、Results、Error、CreatedAt、TTL屬性;它提供了NewPendingTaskState、NewReceivedTaskState、NewStartedTaskState、NewSuccessTaskState、NewFailureTaskState、NewRetryTaskState方法來根據Signature來建立不同state的TaskState;另外還提供了IsCompleted、IsSuccess、IsFailure方法。
// Signature represents a single task invocation type Signature struct { UUID string Name string RoutingKey string ETA *time.Time GroupUUID string GroupTaskCount int Args []Arg Headers Headers Priority uint8 Immutable bool RetryCount int RetryTimeout int OnSuccess []*Signature OnError []*Signature ChordCallback *Signature //MessageGroupId for Broker, e.g. SQS BrokerMessageGroupId string //ReceiptHandle of SQS Message SQSReceiptHandle string // StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq, // and don't want machinery to delete from source queue StopTaskDeletionOnError bool // IgnoreWhenTaskNotRegistered auto removes the request when there is no handeler available // When this is true a task with no handler will be ignored and not placed back in the queue IgnoreWhenTaskNotRegistered bool } // Arg represents a single argument passed to invocation fo a task type Arg struct { Name string `bson:"name"` Type string `bson:"type"` Value interface{} `bson:"value"` } // Headers represents the headers which should be used to direct the task type Headers map[string]interface{} // NewSignature creates a new task signature func NewSignature(name string, args []Arg) (*Signature, error) { signatureID := uuid.New().String() return &Signature{ UUID: fmt.Sprintf("task_%v", signatureID), Name: name, Args: args, }, nil }
Signature表示對task的調用,它定義了UUID、Name、RoutingKey、ETA、GroupUUID、GroupTaskCount、Args、Headers、Priority、Immutable、RetryCount、RetryTimeout、OnSuccess、OnError、ChordCallback、BrokerMessageGroupId、SQSReceiptHandle、StopTaskDeletionOnError、IgnoreWhenTaskNotRegistered屬性。
// Backend represents a Redis result backend type Backend struct { common.Backend host string password string db int pool *redis.Pool // If set, path to a socket file overrides hostname socketPath string redsync *redsync.Redsync redisOnce sync.Once common.RedisConnector } // SetStatePending updates task state to PENDING func (b *Backend) SetStatePending(signature *tasks.Signature) error { conn := b.open() defer conn.Close() taskState := tasks.NewPendingTaskState(signature) return b.updateState(conn, taskState) } // SetStateReceived updates task state to RECEIVED func (b *Backend) SetStateReceived(signature *tasks.Signature) error { conn := b.open() defer conn.Close() taskState := tasks.NewReceivedTaskState(signature) b.mergeNewTaskState(conn, taskState) return b.updateState(conn, taskState) } // SetStateStarted updates task state to STARTED func (b *Backend) SetStateStarted(signature *tasks.Signature) error { conn := b.open() defer conn.Close() taskState := tasks.NewStartedTaskState(signature) b.mergeNewTaskState(conn, taskState) return b.updateState(conn, taskState) } // SetStateRetry updates task state to RETRY func (b *Backend) SetStateRetry(signature *tasks.Signature) error { conn := b.open() defer conn.Close() taskState := tasks.NewRetryTaskState(signature) b.mergeNewTaskState(conn, taskState) return b.updateState(conn, taskState) } // SetStateSuccess updates task state to SUCCESS func (b *Backend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error { conn := b.open() defer conn.Close() taskState := tasks.NewSuccessTaskState(signature, results) b.mergeNewTaskState(conn, taskState) return b.updateState(conn, taskState) } // SetStateFailure updates task state to FAILURE func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error { conn := b.open() defer conn.Close() taskState := tasks.NewFailureTaskState(signature, err) b.mergeNewTaskState(conn, taskState) return b.updateState(conn, taskState) } // GetState returns 
the latest task state func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) { conn := b.open() defer conn.Close() return b.getState(conn, taskUUID) }
此Backend是基於Redis實現的,它提供了SetStatePending、SetStateReceived、SetStateStarted、SetStateRetry、SetStateSuccess、SetStateFailure、GetState方法
machinery的TaskState定義了PENDING、RECEIVED、STARTED、RETRY、SUCCESS、FAILURE狀態;TaskState定義了TaskUUID、TaskName、State、Results、Error、CreatedAt、TTL屬性;它提供了NewPendingTaskState、NewReceivedTaskState、NewStartedTaskState、NewSuccessTaskState、NewFailureTaskState、NewRetryTaskState方法來根據Signature來建立不同state的TaskState;另外還提供了IsCompleted、IsSuccess、IsFailure方法。