cron
一個用於管理定時任務的庫,用 Go 實現 Linux 中crontab
這個命令的效果。以前咱們也介紹過一個相似的 Go 庫——gron
。gron
代碼小巧,用於學習是比較好的。可是它功能相對簡單些,而且已經不維護了。若是有定時任務需求,仍是建議使用cron
。git
文本代碼使用 Go Modules。github
建立目錄並初始化:golang
$ mkdir cron && cd cron
$ go mod init github.com/darjun/go-daily-lib/cron
複製代碼
安裝cron
,目前最新穩定版本爲 v3:緩存
$ go get -u github.com/robfig/cron/v3
複製代碼
使用:安全
package main
import (
"fmt"
"time"
"github.com/robfig/cron/v3"
)
func main() {
c := cron.New()
c.AddFunc("@every 1s", func() {
fmt.Println("tick every 1 second")
})
c.Start()
time.Sleep(time.Second * 5)
}
複製代碼
使用很是簡單,建立cron
對象,這個對象用於管理定時任務。微信
調用cron
對象的AddFunc()
方法向管理器中添加定時任務。AddFunc()
接受兩個參數,參數 1 以字符串形式指定觸發時間規則,參數 2 是一個無參的函數,每次觸發時調用。@every 1s
表示每秒觸發一次,@every
後加一個時間間隔,表示每隔多長時間觸發一次。例如@every 1h
表示每小時觸發一次,@every 1m2s
表示每隔 1 分 2 秒觸發一次。time.ParseDuration()
支持的格式均可以用在這裏。併發
調用c.Start()
啓動定時循環。app
注意一點,由於c.Start()
啓動一個新的 goroutine 作循環檢測,咱們在代碼最後加了一行time.Sleep(time.Second * 5)
防止主 goroutine 退出。函數
運行效果,每隔 1s 輸出一行字符串:oop
$ go run main.go
tick every 1 second
tick every 1 second
tick every 1 second
tick every 1 second
tick every 1 second
複製代碼
與Linux 中crontab
命令類似,cron
庫支持用 5 個空格分隔的域來表示時間。這 5 個域含義依次爲:
Minutes
:分鐘,取值範圍[0-59]
,支持特殊字符* / , -
;Hours
:小時,取值範圍[0-23]
,支持特殊字符* / , -
;Day of month
:每個月的第幾天,取值範圍[1-31]
,支持特殊字符* / , - ?
;Month
:月,取值範圍[1-12]
或者使用月份名字縮寫[JAN-DEC]
,支持特殊字符* / , -
;Day of week
:周曆,取值範圍[0-6]
或名字縮寫[JUN-SAT]
,支持特殊字符* / , - ?
。注意,月份和周曆名稱都是不區分大小寫的,也就是說SUN/Sun/sun
表示一樣的含義(都是週日)。
特殊字符含義以下:
*
:使用*
的域能夠匹配任何值,例如將月份域(第 4 個)設置爲*
,表示每月;/
:用來指定範圍的步長,例如將小時域(第 2 個)設置爲3-59/15
表示第 3 分鐘觸發,之後每隔 15 分鐘觸發一次,所以第 2 次觸發爲第 18 分鐘,第 3 次爲 33 分鐘。。。直到分鐘大於 59;,
:用來列舉一些離散的值和多個範圍,例如將周曆的域(第 5 個)設置爲MON,WED,FRI
表示周1、三和五;-
:用來表示範圍,例如將小時的域(第 1 個)設置爲9-17
表示上午 9 點到下午 17 點(包括 9 和 17);?
:只能用在月曆和周曆的域中,用來代替*
,表示每個月/周的任意一天。瞭解規則以後,咱們能夠定義任意時間:
30 * * * *
:分鐘域爲 30,其餘域都是*
表示任意。每小時的 30 分觸發;30 3-6,20-23 * * *
:分鐘域爲 30,小時域的3-6,20-23
表示 3 點到 6 點和 20 點到 23 點。3,4,5,6,20,21,22,23 時的 30 分觸發;0 0 1 1 *
:1(第 4 個) 月 1(第 3 個) 號的 0(第 2 個) 時 0(第 1 個) 分觸發。記熟了這幾個域的順序,再多練習幾回很容易就能掌握格式。熟悉規則了以後,就能熟練使用crontab
命令了。
func main() {
c := cron.New()
c.AddFunc("30 * * * *", func() {
fmt.Println("Every hour on the half hour")
})
c.AddFunc("30 3-6,20-23 * * *", func() {
fmt.Println("On the half hour of 3-6am, 8-11pm")
})
c.AddFunc("0 0 1 1 *", func() {
fmt.Println("Jun 1 every year")
})
c.Start()
for {
time.Sleep(time.Second)
}
}
複製代碼
爲了方便使用,cron
預約義了一些時間規則:
@yearly
:也能夠寫做@annually
,表示每一年第一天的 0 點。等價於0 0 1 1 *
;@monthly
:表示每個月第一天的 0 點。等價於0 0 1 * *
;@weekly
:表示每週第一天的 0 點,注意第一天爲週日,即週六結束,週日開始的那個 0 點。等價於0 0 * * 0
;@daily
:也能夠寫做@midnight
,表示天天 0 點。等價於0 0 * * *
;@hourly
:表示每小時的開始。等價於0 * * * *
。例如:
func main() {
c := cron.New()
c.AddFunc("@hourly", func() {
fmt.Println("Every hour")
})
c.AddFunc("@daily", func() {
fmt.Println("Every day on midnight")
})
c.AddFunc("@weekly", func() {
fmt.Println("Every week")
})
c.Start()
for {
time.Sleep(time.Second)
}
}
複製代碼
上面代碼只是演示用法,實際運行可能要等待很是長的時間纔能有輸出。
cron
支持固定時間間隔,格式爲:
@every <duration>
複製代碼
含義爲每隔duration
觸發一次。<duration>
會調用time.ParseDuration()
函數解析,因此ParseDuration
支持的格式均可以。例如1h30m10s
。在快速開始部分,咱們已經演示了@every
的用法了,這裏就不贅述了。
默認狀況下,全部時間都是基於當前時區的。固然咱們也能夠指定時區,有 2 兩種方式:
CRON_TZ=
+ 具體時區,具體時區的格式在以前carbon
的文章中有詳細介紹。東京時區爲Asia/Tokyo
,紐約時區爲America/New_York
;cron
對象時增長一個時區選項cron.WithLocation(location)
,location
爲time.LoadLocation(zone)
加載的時區對象,zone
爲具體的時區格式。或者調用已建立好的cron
對象的SetLocation()
方法設置時區。示例:
func main() {
nyc, _ := time.LoadLocation("America/New_York")
c := cron.New(cron.WithLocation(nyc))
c.AddFunc("0 6 * * ?", func() {
fmt.Println("Every 6 o'clock at New York")
})
c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", func() {
fmt.Println("Every 6 o'clock at Tokyo")
})
c.Start()
for {
time.Sleep(time.Second)
}
}
複製代碼
Job
接口除了直接將無參函數做爲回調外,cron
還支持Job
接口:
// cron.go
type Job interface {
Run()
}
複製代碼
咱們定義一個實現接口Job
的結構:
type GreetingJob struct {
Name string
}
func (g GreetingJob) Run() {
fmt.Println("Hello ", g.Name)
}
複製代碼
調用cron
對象的AddJob()
方法將GreetingJob
對象添加到定時管理器中:
func main() {
c := cron.New()
c.AddJob("@every 1s", GreetingJob{"dj"})
c.Start()
time.Sleep(5 * time.Second)
}
複製代碼
運行效果:
$ go run main.go
Hello dj
Hello dj
Hello dj
Hello dj
Hello dj
複製代碼
使用自定義的結構可讓任務攜帶狀態(Name
字段)。
實際上AddFunc()
方法內部也調用了AddJob()
方法。首先,cron
基於func()
類型定義一個新的類型FuncJob
:
// cron.go
type FuncJob func() 複製代碼
而後讓FuncJob
實現Job
接口:
// cron.go
func (f FuncJob) Run() {
f()
}
複製代碼
在AddFunc()
方法中,將傳入的回調轉爲FuncJob
類型,而後調用AddJob()
方法:
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
}
複製代碼
cron
會建立一個新的 goroutine 來執行觸發回調。若是這些回調須要併發訪問一些資源、數據,咱們須要顯式地作同步。
cron
支持靈活的時間格式,若是默認的格式不能知足要求,咱們能夠本身定義時間格式。時間規則字符串須要cron.Parser
對象來解析。咱們先來看看默認的解析器是如何工做的。
首先定義各個域:
// parser.go
const (
Second ParseOption = 1 << iota
SecondOptional
Minute
Hour
Dom
Month
Dow
DowOptional
Descriptor
)
複製代碼
除了Minute/Hour/Dom(Day of month)/Month/Dow(Day of week)
外,還能夠支持Second
。相對順序都是固定的:
// parser.go
var places = []ParseOption{
Second,
Minute,
Hour,
Dom,
Month,
Dow,
}
var defaults = []string{
"0",
"0",
"0",
"*",
"*",
"*",
}
複製代碼
默認的時間格式使用 5 個域。
咱們能夠調用cron.NewParser()
建立本身的Parser
對象,以位格式傳入使用哪些域,例以下面的Parser
使用 6 個域,支持Second
(秒):
parser := cron.NewParser(
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)
複製代碼
調用cron.WithParser(parser)
建立一個選項傳入構造函數cron.New()
,使用時就能夠指定秒了:
c := cron.New(cron.WithParser(parser))
c.AddFunc("1 * * * * *", func () {
fmt.Println("every 1 second")
})
c.Start()
複製代碼
這裏時間格式必須使用 6 個域,順序與上面的const
定義一致。
由於上面的時間格式太常見了,cron
定義了一個便捷的函數:
// option.go
func WithSeconds() Option {
return WithParser(NewParser(
Second | Minute | Hour | Dom | Month | Dow | Descriptor,
))
}
複製代碼
注意Descriptor
表示對@every/@hour
等的支持。有了WithSeconds()
,咱們不用手動建立Parser
對象了:
c := cron.New(cron.WithSeconds())
複製代碼
cron
對象建立使用了選項模式,咱們前面已經介紹了 3 個選項:
WithLocation
:指定時區;WithParser
:使用自定義的解析器;WithSeconds
:讓時間格式支持秒,實際上內部調用了WithParser
。cron
還提供了另外兩種選項:
WithLogger
:自定義Logger
;WithChain
:Job 包裝器。WithLogger
WithLogger
能夠設置cron
內部使用咱們自定義的Logger
:
func main() {
c := cron.New(
cron.WithLogger(
cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))))
c.AddFunc("@every 1s", func() {
fmt.Println("hello world")
})
c.Start()
time.Sleep(5 * time.Second)
}
複製代碼
上面調用cron.VerbosPrintfLogger()
包裝log.Logger
,這個logger
會詳細記錄cron
內部的調度過程:
$ go run main.go
cron: 2020/06/26 07:09:14 start cron: 2020/06/26 07:09:14 schedule, now=2020-06-26T07:09:14+08:00, entry=1, next=2020-06-26T07:09:15+08:00 cron: 2020/06/26 07:09:15 wake, now=2020-06-26T07:09:15+08:00 cron: 2020/06/26 07:09:15 run, now=2020-06-26T07:09:15+08:00, entry=1, next=2020-06-26T07:09:16+08:00 hello world cron: 2020/06/26 07:09:16 wake, now=2020-06-26T07:09:16+08:00 cron: 2020/06/26 07:09:16 run, now=2020-06-26T07:09:16+08:00, entry=1, next=2020-06-26T07:09:17+08:00 hello world cron: 2020/06/26 07:09:17 wake, now=2020-06-26T07:09:17+08:00 cron: 2020/06/26 07:09:17 run, now=2020-06-26T07:09:17+08:00, entry=1, next=2020-06-26T07:09:18+08:00 hello world cron: 2020/06/26 07:09:18 wake, now=2020-06-26T07:09:18+08:00 hello world cron: 2020/06/26 07:09:18 run, now=2020-06-26T07:09:18+08:00, entry=1, next=2020-06-26T07:09:19+08:00 cron: 2020/06/26 07:09:19 wake, now=2020-06-26T07:09:19+08:00 hello world cron: 2020/06/26 07:09:19 run, now=2020-06-26T07:09:19+08:00, entry=1, next=2020-06-26T07:09:20+08:0 複製代碼
咱們看看默認的Logger
是什麼樣的:
// logger.go
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
return printfLogger{l, false}
}
func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
return printfLogger{l, true}
}
type printfLogger struct {
logger interface{ Printf(string, ...interface{}) }
logInfo bool
}
複製代碼
WithChain
Job 包裝器能夠在執行實際的Job
先後添加一些邏輯:
panic
;Job
上次運行還未結束,推遲本次執行;Job
上次運行還未介紹,跳過本次執行;Job
的執行狀況。咱們能夠將Chain
類比爲 Web 處理器的中間件。實際上就是在Job
的執行邏輯外在封裝一層邏輯。咱們的封裝邏輯須要寫成一個函數,傳入一個Job
類型,返回封裝後的Job
。cron
爲這種函數定義了一個類型JobWrapper
:
// chain.go
type JobWrapper func(Job) Job 複製代碼
而後使用一個Chain
對象將這些JobWrapper
組合到一塊兒:
type Chain struct {
wrappers []JobWrapper
}
func NewChain(c ...JobWrapper) Chain {
return Chain{c}
}
複製代碼
調用Chain
對象的Then(job)
方法應用這些JobWrapper
,返回最終的`Job:
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
}
return j
}
複製代碼
注意應用JobWrapper
的順序。
JobWrapper
cron
內置了 3 個用得比較多的JobWrapper
:
Recover
:捕獲內部Job
產生的 panic;DelayIfStillRunning
:觸發時,若是上一次任務還未執行完成(耗時太長),則等待上一次任務完成以後再執行;SkipIfStillRunning
:觸發時,若是上一次任務還未完成,則跳過這次執行。下面分別介紹。
Recover
先看看如何使用:
type panicJob struct {
count int
}
func (p *panicJob) Run() {
p.count++
if p.count == 1 {
panic("oooooooooooooops!!!")
}
fmt.Println("hello world")
}
func main() {
c := cron.New()
c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(&panicJob{}))
c.Start()
time.Sleep(5 * time.Second)
}
複製代碼
panicJob
在第一次觸發時,觸發了panic
。由於有cron.Recover()
保護,後續任務還能執行:
go run main.go
cron: 2020/06/27 14:02:00 panic, error=oooooooooooooops!!!, stack=... goroutine 18 [running]: github.com/robfig/cron/v3.Recover.func1.1.1(0x514ee0, 0xc0000044a0) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:45 +0xbc panic(0x4cf380, 0x513280) C:/Go/src/runtime/panic.go:969 +0x174 main.(*panicJob).Run(0xc0000140e8) D:/code/golang/src/github.com/darjun/go-daily-lib/cron/recover/main.go:17 +0xba github.com/robfig/cron/v3.Recover.func1.1() D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:53 +0x6f github.com/robfig/cron/v3.FuncJob.Run(0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:136 +0x2c github.com/robfig/cron/v3.(*Cron).startJob.func1(0xc00005c0a0, 0x514d20, 0xc000070390) D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:312 +0x68 created by github.com/robfig/cron/v3.(*Cron).startJob D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:310 +0x7a hello world hello world hello world hello world 複製代碼
咱們看看cron.Recover()
的實現,很簡單:
// cron.go
func Recover(logger Logger) JobWrapper {
return func(j Job) Job {
return FuncJob(func() {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err, ok := r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
logger.Error(err, "panic", "stack", "...\n"+string(buf))
}
}()
j.Run()
})
}
}
複製代碼
就是在執行內層的Job
邏輯前,添加recover()
調用。若是Job.Run()
執行過程當中有panic
。這裏的recover()
會捕獲到,輸出調用堆棧。
DelayIfStillRunning
仍是先看如何使用:
type delayJob struct {
count int
}
func (d *delayJob) Run() {
time.Sleep(2 * time.Second)
d.count++
log.Printf("%d: hello world\n", d.count)
}
func main() {
c := cron.New()
c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{}))
c.Start()
time.Sleep(10 * time.Second)
}
複製代碼
上面咱們在Run()
中增長了一個 2s 的延遲,輸出中間隔變爲 2s,而不是定時的 1s:
$ go run main.go
2020/06/27 14:11:16 1: hello world
2020/06/27 14:11:18 2: hello world
2020/06/27 14:11:20 3: hello world
2020/06/27 14:11:22 4: hello world
複製代碼
看看源碼:
// chain.go
func DelayIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return FuncJob(func() {
start := time.Now()
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
logger.Info("delay", "duration", dur)
}
j.Run()
})
}
}
複製代碼
首先定義一個該任務共用的互斥鎖sync.Mutex
,每次執行任務前獲取鎖,執行結束以後釋放鎖。因此在上一個任務結束前,下一個任務獲取鎖是沒法成功的,從而保證的任務的串行執行。
SkipIfStillRunning
仍是先看看如何使用:
type skipJob struct {
count int32
}
func (d *skipJob) Run() {
atomic.AddInt32(&d.count, 1)
log.Printf("%d: hello world\n", d.count)
if atomic.LoadInt32(&d.count) == 1 {
time.Sleep(2 * time.Second)
}
}
func main() {
c := cron.New()
c.AddJob("@every 1s", cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(&skipJob{}))
c.Start()
time.Sleep(10 * time.Second)
}
複製代碼
輸出:
$ go run main.go
2020/06/27 14:22:07 1: hello world
2020/06/27 14:22:10 2: hello world
2020/06/27 14:22:11 3: hello world
2020/06/27 14:22:12 4: hello world
2020/06/27 14:22:13 5: hello world
2020/06/27 14:22:14 6: hello world
2020/06/27 14:22:15 7: hello world
2020/06/27 14:22:16 8: hello world
複製代碼
注意觀察時間,第一個與第二個輸出之間相差 3s,由於跳過了兩次執行。
注意DelayIfStillRunning
與SkipIfStillRunning
是有本質上的區別的,前者DelayIfStillRunning
只要時間足夠長,全部的任務都會循序漸進地完成,只是可能前一個任務耗時過長,致使後一個任務的執行時間推遲了一點。SkipIfStillRunning
會跳過一些執行。
看看源碼:
func SkipIfStillRunning(logger Logger) JobWrapper {
return func(j Job) Job {
var ch = make(chan struct{}, 1)
ch <- struct{}{}
return FuncJob(func() {
select {
case v := <-ch:
j.Run()
ch <- v
default:
logger.Info("skip")
}
})
}
}
複製代碼
定義一個該任務共用的緩存大小爲 1 的通道chan struct{}
。執行任務時,從通道中取值,若是成功,執行,不然跳過。執行完成以後再向通道中發送一個值,確保下一個任務能執行。初始發送一個值到通道中,保證第一個任務的執行。
cron
實現比較小巧,且優雅,代碼行數也很少,很是值得一看!
你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
個人博客:darjun.github.io
歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~