前言
哈嘍,你們好,我是asong,此次給你們介紹一個go的異步任務框架machinery。使用過python的同窗們都知道
Celery
框架,machinery
框架就相似於Celery
框架。下面咱們就來學習一下machinery
的基本使用。python本身翻譯一個粗略版的
machinery
中文文檔,有須要的夥伴們公衆號自取無水印版:後臺回覆:machinery便可領取。git或者github下載:https://github.com/asong2020/Golang_Dream/tree/master/machinerygithub
拋磚引玉
咱們在使用某些APP時,登錄系統後通常會收到一封郵件或者一個短信提示咱們在某個時間某某地點登錄了。而郵件或短信都是在咱們已經登錄後才收到,這裏就是採用的異步機制。你們有沒有想過這裏爲何沒有使用同步機制實現呢?咱們來分析一下。假設咱們如今採用同步的方式實現,用戶在登陸時,首先會去檢驗一下帳號密碼是否正確,驗證經過後去給用戶發送登錄提示信息,假如在這一步出錯了,那麼就會致使用戶登錄失敗,這樣是大大影響用戶的體驗感的,一個登錄提示的優先級別並非很高,因此咱們徹底能夠採用異步的機制實現,即便失敗了也不會影響用戶的體驗。前面說了這麼多,那麼異步機制該怎麼實現呢?對,沒錯,就是machinery
框架,據說大家還不會使用它,今天我就寫一個小例子,咱們一塊兒來學習一下他吧。golang
特性
上面只是簡單舉了個例子,任務隊列有着普遍的應用場景,好比大批量的計算任務,當有大量數據插入,經過拆分並分批插入任務隊列,從而實現串行鏈式任務處理或者實現分組並行任務處理,提升系統魯棒性,提升系統併發度;或者對數據進行預處理,按期的從後端存儲將數據同步到到緩存系統,從而在查詢請求發生時,直接去緩存系統中查詢,提升查詢請求的響應速度。適用任務隊列的場景有不少,這裏就不一一列舉了。迴歸本文主題,既然咱們要學習machinery
,就要先了解一下他都有哪些特性呢?web
-
任務重試機制 -
延遲任務支持 -
任務回調機制 -
任務結果記錄 -
支持Workflow模式:Chain,Group,Chord -
多Brokers支持:Redis, AMQP, AWS SQS -
多Backends支持:Redis, Memcache, AMQP, MongoDB
架構
任務隊列,簡而言之就是一個放大的生產者消費者模型,用戶請求會生成任務,任務生產者不斷的向隊列中插入任務,同時,隊列的處理器程序充當消費者不斷的消費任務。基於這種框架設計思想,咱們來看下machinery的簡單設計結構圖例:面試
-
Sender:業務推送模塊,生成具體任務,可根據業務邏輯中,按交互進行拆分; -
Broker:存儲具體序列化後的任務,machinery中目前支持到Redis, AMQP,和SQS; -
Worker:工做進程,負責消費者功能,處理具體的任務; -
Backend:後端存儲,用於存儲任務執行狀態的數據;
e.g
學習一門新東西,我都習慣先寫一個demo,先學會了走,再學會跑。因此先來看一個例子,功能很簡單,異步計算1到10的和。redis
先看一下配置文件代碼:後端
broker: redis://localhost:6379
default_queue: "asong"
result_backend: redis://localhost:6379
redis:
max_idle: 3
max_active: 3
max_idle_timeout: 240
wait: true
read_timeout: 15
write_timeout: 15
connect_timeout: 15
normal_tasks_poll_period: 1000
delayed_tasks_poll_period: 500
delayed_tasks_key: "asong"
這裏broker
與result_backend
來實現。設計模式
主代碼,完整版github獲取:緩存
func main() {
cnf,err := config.NewFromYaml("./config.yml",false)
if err != nil{
log.Println("config failed",err)
return
}
server,err := machinery.NewServer(cnf)
if err != nil{
log.Println("start server failed",err)
return
}
// 註冊任務
err = server.RegisterTask("sum",Sum)
if err != nil{
log.Println("reg task failed",err)
return
}
worker := server.NewWorker("asong", 1)
go func() {
err = worker.Launch()
if err != nil {
log.Println("start worker error",err)
return
}
}()
//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
}
asyncResult, err := server.SendTask(signature)
if err != nil {
log.Fatal(err)
}
res, err := asyncResult.Get(1)
if err != nil {
log.Fatal(err)
}
log.Printf("get res is %v\n", tasks.HumanReadableResults(res))
}
運行結果:
INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55
好啦,如今咱們開始講一講上面的代碼流程,
-
讀取配置文件,這一步是爲了配置 broker
和result_backend
,這裏我都選擇的是redis
,由於電腦正好有這個環境,就直接用了。 -
Machinery
庫必須在使用前實例化。實現方法是建立一個Server
實例。Server
是Machinery
配置和註冊任務的基本對象。 -
在你的 workders
能消費一個任務前,你須要將它註冊到服務器。這是經過給任務分配一個惟一的名稱來實現的。 -
爲了消費任務,你需有有一個或多個worker正在運行。運行worker所須要的只是一個具備已註冊任務的 Server
實例。每一個worker將只使用已註冊的任務。對於隊列中的每一個任務,Worker.Process()方法將在一個goroutine中運行。可使用server.NewWorker
的第二參數來限制併發運行的worker.Process()調用的數量(每一個worker)。 -
能夠經過將 Signature
實例傳遞給Server
實例來調用任務。 -
調用 HumanReadableResults
這個方法能夠處理反射值,獲取到最終的結果。
多功能
1. 延時任務
上面的代碼只是一個簡單machinery
使用示例,其實machiney
也支持延時任務的,能夠經過在任務signature
上設置ETA時間戳字段來延遲任務。
eta := time.Now().UTC().Add(time.Second * 20)
signature.ETA = &eta
2. 重試任務
在將任務聲明爲失敗以前,能夠設置屢次重試嘗試。斐波那契序列將用於在一段時間內分隔重試請求。這裏可使用兩種方法,第一種直接對tsak signature
中的retryTimeout
和RetryCount
字段進行設置,就能夠,重試時間將按照斐波那契數列進行疊加。
//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
RetryTimeout: 100,
RetryCount: 3,
}
或者,你可使用return.tasks.ErrRetryTaskLater
返回任務並指定重試的持續時間。
func Sum(args []int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, tasks.NewErrRetryTaskLater("我說他錯了", 4 * time.Second)
}
3. 工做流
上面咱們講的都是運行一個異步任務,可是咱們每每作項目時,一個需求是須要多個異步任務以編排好的方式執行的,因此咱們就可使用machinery
的工做流來完成。
3.1 Groups
Group
是一組任務,它們將相互獨立地並行執行。仍是畫個圖吧,這樣看起來更明瞭:
一塊兒來看一個簡單的例子:
// group
group,err :=tasks.NewGroup(signature1,signature2,signature3)
if err != nil{
log.Println("add group failed",err)
}
asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
if err != nil {
log.Println(err)
}
for _, asyncResult := range asyncResults{
results,err := asyncResult.Get(1)
if err != nil{
log.Println(err)
continue
}
log.Printf(
"%v %v %v\n",
asyncResult.Signature.Args[0].Value,
tasks.HumanReadableResults(results),
)
}
group
中的任務是並行執行的。
3.2 chords
咱們在作項目時,每每會有一些回調場景,machiney
也爲咱們考慮到了這一點,Chord
容許你定一個回調任務在groups
中的全部任務執行結束後被執行。
來看一段代碼:
callback := &tasks.Signature{
Name: "call",
}
group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chord, err := tasks.NewChord(group, callback)
if err != nil {
log.Printf("Error creating chord: %s", err)
return
}
chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
if err != nil {
log.Printf("Could not send chord: %s", err.Error())
return
}
results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chord result failed with error: %s", err.Error())
return
}
log.Printf("%v\n", tasks.HumanReadableResults(results))
上面的例子並行執行task一、task二、task3,聚合它們的結果並將它們傳遞給callback任務。
3.3 chains
chain
就是一個接一個執行的任務集,每一個成功的任務都會觸發chain
中的下一個任務。
看這樣一段代碼:
//chain
chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
if err != nil {
log.Printf("Could not send chain: %s", err.Error())
return
}
results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chain result failed with error: %s", err.Error())
}
log.Printf(" %v\n", tasks.HumanReadableResults(results))
上面的例子執行task1,而後是task2,而後是task3。當一個任務成功完成時,結果被附加到chain
中下一個任務的參數列表的末尾,最終執行callback
任務。
文中代碼地址:https://github.com/asong2020/Golang_Dream/tree/master/machinery/example
總結
這一篇文章到這裏就結束了,machinery
還有不少用法,好比定時任務、定時任務組等等,就不在這一篇文章介紹了。更多使用方法解鎖能夠觀看machinery
文檔。由於machiney
沒有中文文檔,因此我在學習的過程本身翻譯了一篇中文文檔,須要的小夥伴們自取。
獲取步驟:關注公衆號【Golang夢工廠】,後臺回覆:machiney便可獲取無水印版~~~
好啦,這一篇文章到這就結束了,咱們下期見~~。但願對大家有用,又不對的地方歡迎指出,可添加個人golang交流羣,咱們一塊兒學習交流。
結尾給你們發一個小福利吧,最近我在看[微服務架構設計模式]這一本書,講的很好,本身也收集了一本PDF,有須要的小夥能夠到自行下載。獲取方式:關注公衆號:[Golang夢工廠],後臺回覆:[微服務],便可獲取。
我翻譯了一份GIN中文文檔,會按期進行維護,有須要的小夥伴後臺回覆[gin]便可下載。
我是asong,一名普普統統的程序猿,讓gi我一塊兒慢慢變強吧。我本身建了一個golang
交流羣,有須要的小夥伴加我vx
,我拉你入羣。歡迎各位的關注,咱們下期見~~~

推薦往期文章:
-
手把手教姐姐寫消息隊列 -
常見面試題之緩存雪崩、緩存穿透、緩存擊穿 -
詳解Context包,看這一篇就夠了!!! -
go-ElasticSearch入門看這一篇就夠了(一) -
面試官:go中for-range使用過嗎?這幾個問題你能解釋一下緣由嗎 -
學會wire依賴注入、cron定時任務其實就這麼簡單! -
據說你還不會jwt和swagger-飯我都不吃了帶着實踐項目我就來了 -
掌握這些Go語言特性,你的水平將提升N個檔次(二) -
go實現多人聊天室,在這裏你想聊什麼均可以的啦!!! -
grpc實踐-學會grpc就是這麼簡單 -
go標準庫rpc實踐 -
2020最新Gin框架中文文檔 asong又撿起來了英語,用心翻譯 -
基於gin的幾種熱加載方式
本文分享自微信公衆號 - Golang夢工廠(AsongDream)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。