asynq:一個由 Go 開發的輕量級的異步定時任務系統

最近開發了一個Go的簡單高效的異步任務處理庫:Asyqngit


安裝

要安裝asynq庫和asynqmon命令行工具,請運行如下命令:github

go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon
複製代碼

入門

在本asynq教程中,咱們將建立兩個程序。redis

producer.go將建立並定時要由consumer異步處理的任務。安全

consumer.go 將處理producer建立的任務。bash

假定在上運行Redis服務器localhost:6379。在開始以前,請確保已安裝並運行Redis。服務器

咱們須要作的第一件事是建立兩個主文件:併發

mkdir producer consumer
touch producer/producer.go consumer/consumer.go
複製代碼

導入asynq兩個文件:app

import "github.com/hibiken/asynq"
複製代碼

Asynq使用Redis做爲消息代理。使用一種RedisConnOpt類型來指定如何鏈接到Redis。咱們這裏將使用RedisClientOpt異步

// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}
複製代碼

producer.go,咱們將建立一個Client實例來建立和定時任務。 在asynq,要執行的工做單元被封裝在稱爲的結構中Task。其中有兩個字段:TypePayload函數

// Task represents a task to be performed.
type Task struct {
    // Type indicates the type of task to be performed.
    Type string

    // Payload holds data needed to perform the task.
    Payload Payload
}
複製代碼

要建立任務,請使用NewTask函數,併爲任務傳遞類型和有效負載。

能夠經過Client.Schedule傳入任務和須要處理的時間來計劃任務。

// producer.go
func main() {
    client := asynq.NewClient(redis)

    // Create a task with typename and payload.
    t1 := asynq.NewTask(
        "send_welcome_email",
        map[string]interface{}{"user_id": 42})

    t2 := asynq.NewTask(
        "send_reminder_email",
        map[string]interface{}{"user_id": 42})

    // Process the task immediately.
    err := client.Schedule(t1, time.Now())
    if err != nil {
        log.Fatal(err)
    }

    // Process the task 24 hours later.
    err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
    if err != nil {
        log.Fatal(err)
    }
}
複製代碼

consumer.go,建立一個Background例來處理任務。

NewBackground函數須要RedisConnOpt和Config

您能夠查看有關文檔,Config以查看可用的選項。

在此示例中,咱們僅指定併發。

// consumer.go
func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    bg.Run(handler)
}
複製代碼

參數t(*asynq.Background).Runasynq.Handler具備一種方法的接口ProcessTask

// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried.
type Handler interface {
    ProcessTask(*Task) error
}
複製代碼

實現處理程序的最簡單方法是定義一個具備相同type的函數,並asynq.HandlerFunc在將其傳遞給時使用適配器類型Run

func handler(t *asynq.Task) error {
    switch t.Type {
    case "send_welcome_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Welcome Email to User %d\n", id)

    case "send_reminder_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Reminder Email to User %d\n", id)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type)
    }
    return nil
}

func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    // Use asynq.HandlerFunc adapter for a handler function
    bg.Run(asynq.HandlerFunc(handler))
}
複製代碼

咱們能夠繼續向該處理函數添加案例,可是在實際應用中,在單獨的函數中爲每種案例定義邏輯很方便。爲了重構咱們的代碼,讓咱們建立一個簡單的調度程序,將任務類型映射到其處理程序:

// consumer.go

// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
    mapping map[string]asynq.HandlerFunc
}

// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
    d.mapping[taskType] = fn
}

// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
    fn, ok := d.mapping[task.Type]
    if !ok {
        return fmt.Errorf("no handler registered for %q", task.Type)
    }
    return fn(task)
}

func main() {
    d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
    d.HandleFunc("send_welcome_email", sendWelcomeEmail)
    d.HandleFunc("send_reminder_email", sendReminderEmail)

    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })
    bg.Run(d)
}

func sendWelcomeEmail(t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

func sendReminderEmail(t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}
複製代碼

如今咱們既有任務生產者又有消費者,咱們能夠運行這兩個程序。

go run producer.go
複製代碼

這將建立兩項任務:一項應當即處理,另外一項將在24小時後處理。

讓咱們使用asynqmon工具檢查任務。

asynqmon stats
複製代碼

你應該能看到,有一個任務Enqueued狀態,另外一個在Scheduled狀態。

注意:如需瞭解每種狀態的含義,請參閱Wiki頁面上Life of Task

讓咱們運行asynqmonwatch命令,以便咱們可以連續運行的命令看到的變化。

watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds
複製代碼

最後,讓咱們啓動consumer程序來處理定時的任務。

go run consumer.go
複製代碼

注意:在您發送信號終止程序以前,此操做不會退出。有關如何安全終止後臺處理的最佳實踐,請參見Signal Wiki頁面

您應該可以看到在終端上打印的文本,代表該任務已成功處理。

這是一次asynq基礎的快速教程。要了解有關其全部功能(如**優先級隊列自定義重試**)的更多信息,請參見的Wiki頁面

命令行工具

Asynq附帶了一個命令行工具來檢查隊列和任務的狀態。

要安裝,請運行如下命令:

go get github.com/hibiken/asynq/tools/asynqmon
複製代碼

完成!

例圖:

asynqmon_stats.gif

詳情請參考:Asyqn-https://github.com/hibiken/asynq

相關文章
相關標籤/搜索