Go 每日一庫之 cron

簡介

cron一個用於管理定時任務的庫,用 Go 實現 Linux 中crontab這個命令的效果。以前咱們也介紹過一個相似的 Go 庫——grongron代碼小巧,用於學習是比較好的。可是它功能相對簡單些,而且已經不維護了。若是有定時任務需求,仍是建議使用crongit

快速使用

文本代碼使用 Go Modules。github

建立目錄並初始化:golang

$ mkdir cron && cd cron
$ go mod init github.com/darjun/go-daily-lib/cron

安裝cron,目前最新穩定版本爲 v3:緩存

$ go get -u github.com/robfig/cron/v3

使用:安全

package main

import (
  "fmt"
  "time"

  "github.com/robfig/cron/v3"
)

func main() {
  c := cron.New()

  c.AddFunc("@every 1s", func() {
    fmt.Println("tick every 1 second")
  })

  c.Start()
  time.Sleep(time.Second * 5)
}

使用很是簡單,建立cron對象,這個對象用於管理定時任務。微信

調用cron對象的AddFunc()方法向管理器中添加定時任務。AddFunc()接受兩個參數,參數 1 以字符串形式指定觸發時間規則,參數 2 是一個無參的函數,每次觸發時調用。@every 1s表示每秒觸發一次,@every後加一個時間間隔,表示每隔多長時間觸發一次。例如@every 1h表示每小時觸發一次,@every 1m2s表示每隔 1 分 2 秒觸發一次。time.ParseDuration()支持的格式均可以用在這裏。併發

調用c.Start()啓動定時循環。app

注意一點,由於c.Start()啓動一個新的 goroutine 作循環檢測,咱們在代碼最後加了一行time.Sleep(time.Second * 5)防止主 goroutine 退出。函數

運行效果,每隔 1s 輸出一行字符串:oop

$ go run main.go 
tick every 1 second
tick every 1 second
tick every 1 second
tick every 1 second
tick every 1 second

時間格式

與Linux 中crontab命令類似,cron庫支持用 5 個空格分隔的域來表示時間。這 5 個域含義依次爲:

  • Minutes:分鐘,取值範圍[0-59],支持特殊字符* / , -
  • Hours:小時,取值範圍[0-23],支持特殊字符* / , -
  • Day of month:每個月的第幾天,取值範圍[1-31],支持特殊字符* / , - ?
  • Month:月,取值範圍[1-12]或者使用月份名字縮寫[JAN-DEC],支持特殊字符* / , -
  • Day of week:周曆,取值範圍[0-6]或名字縮寫[JUN-SAT],支持特殊字符* / , - ?

注意,月份和周曆名稱都是不區分大小寫的,也就是說SUN/Sun/sun表示一樣的含義(都是週日)。

特殊字符含義以下:

  • *:使用*的域能夠匹配任何值,例如將月份域(第 4 個)設置爲*,表示每月;
  • /:用來指定範圍的步長,例如將小時域(第 2 個)設置爲3-59/15表示第 3 分鐘觸發,之後每隔 15 分鐘觸發一次,所以第 2 次觸發爲第 18 分鐘,第 3 次爲 33 分鐘。。。直到分鐘大於 59;
  • ,:用來列舉一些離散的值和多個範圍,例如將周曆的域(第 5 個)設置爲MON,WED,FRI表示周1、三和五;
  • -:用來表示範圍,例如將小時的域(第 1 個)設置爲9-17表示上午 9 點到下午 17 點(包括 9 和 17);
  • ?:只能用在月曆和周曆的域中,用來代替*,表示每個月/周的任意一天。

瞭解規則以後,咱們能夠定義任意時間:

  • 30 * * * *:分鐘域爲 30,其餘域都是*表示任意。每小時的 30 分觸發;
  • 30 3-6,20-23 * * *:分鐘域爲 30,小時域的3-6,20-23表示 3 點到 6 點和 20 點到 23 點。3,4,5,6,20,21,22,23 時的 30 分觸發;
  • 0 0 1 1 *:1(第 4 個) 月 1(第 3 個) 號的 0(第 2 個) 時 0(第 1 個) 分觸發。

記熟了這幾個域的順序,再多練習幾回很容易就能掌握格式。熟悉規則了以後,就能熟練使用crontab命令了。

func main() {
  c := cron.New()

  c.AddFunc("30 * * * *", func() {
    fmt.Println("Every hour on the half hour")
  })

  c.AddFunc("30 3-6,20-23 * * *", func() {
    fmt.Println("On the half hour of 3-6am, 8-11pm")
  })

  c.AddFunc("0 0 1 1 *", func() {
    fmt.Println("Jun 1 every year")
  })

  c.Start()

  for {
    time.Sleep(time.Second)
  }
}

預約義時間規則

爲了方便使用,cron預約義了一些時間規則:

  • @yearly:也能夠寫做@annually,表示每一年第一天的 0 點。等價於0 0 1 1 *
  • @monthly:表示每個月第一天的 0 點。等價於0 0 1 * *
  • @weekly:表示每週第一天的 0 點,注意第一天爲週日,即週六結束,週日開始的那個 0 點。等價於0 0 * * 0
  • @daily:也能夠寫做@midnight,表示天天 0 點。等價於0 0 * * *
  • @hourly:表示每小時的開始。等價於0 * * * *

例如:

func main() {
  c := cron.New()

  c.AddFunc("@hourly", func() {
    fmt.Println("Every hour")
  })

  c.AddFunc("@daily", func() {
    fmt.Println("Every day on midnight")
  })

  c.AddFunc("@weekly", func() {
    fmt.Println("Every week")
  })

  c.Start()

  for {
    time.Sleep(time.Second)
  }
}

上面代碼只是演示用法,實際運行可能要等待很是長的時間纔能有輸出。

固定時間間隔

cron支持固定時間間隔,格式爲:

@every <duration>

含義爲每隔duration觸發一次。<duration>會調用time.ParseDuration()函數解析,因此ParseDuration支持的格式均可以。例如1h30m10s。在快速開始部分,咱們已經演示了@every的用法了,這裏就不贅述了。

時區

默認狀況下,全部時間都是基於當前時區的。固然咱們也能夠指定時區,有 2 兩種方式:

  • 在時間字符串前面添加一個CRON_TZ= + 具體時區,具體時區的格式在以前carbon的文章中有詳細介紹。東京時區爲Asia/Tokyo,紐約時區爲America/New_York
  • 建立cron對象時增長一個時區選項cron.WithLocation(location)locationtime.LoadLocation(zone)加載的時區對象,zone爲具體的時區格式。或者調用已建立好的cron對象的SetLocation()方法設置時區。

示例:

func main() {
  nyc, _ := time.LoadLocation("America/New_York")
  c := cron.New(cron.WithLocation(nyc))
  c.AddFunc("0 6 * * ?", func() {
    fmt.Println("Every 6 o'clock at New York")
  })

  c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", func() {
    fmt.Println("Every 6 o'clock at Tokyo")
  })

  c.Start()

  for {
    time.Sleep(time.Second)
  }
}

Job接口

除了直接將無參函數做爲回調外,cron還支持Job接口:

// cron.go
type Job interface {
  Run()
}

咱們定義一個實現接口Job的結構:

type GreetingJob struct {
  Name string
}

func (g GreetingJob) Run() {
  fmt.Println("Hello ", g.Name)
}

調用cron對象的AddJob()方法將GreetingJob對象添加到定時管理器中:

func main() {
  c := cron.New()
  c.AddJob("@every 1s", GreetingJob{"dj"})
  c.Start()

  time.Sleep(5 * time.Second)
}

運行效果:

$ go run main.go 
Hello  dj
Hello  dj
Hello  dj
Hello  dj
Hello  dj

使用自定義的結構可讓任務攜帶狀態(Name字段)。

實際上AddFunc()方法內部也調用了AddJob()方法。首先,cron基於func()類型定義一個新的類型FuncJob

// cron.go
type FuncJob func()

而後讓FuncJob實現Job接口:

// cron.go
func (f FuncJob) Run() {
  f()
}

AddFunc()方法中,將傳入的回調轉爲FuncJob類型,而後調用AddJob()方法:

func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
  return c.AddJob(spec, FuncJob(cmd))
}

線程安全

cron會建立一個新的 goroutine 來執行觸發回調。若是這些回調須要併發訪問一些資源、數據,咱們須要顯式地作同步。

自定義時間格式

cron支持靈活的時間格式,若是默認的格式不能知足要求,咱們能夠本身定義時間格式。時間規則字符串須要cron.Parser對象來解析。咱們先來看看默認的解析器是如何工做的。

首先定義各個域:

// parser.go
const (
  Second         ParseOption = 1 << iota
  SecondOptional                        
  Minute                                
  Hour                                  
  Dom                                   
  Month                                 
  Dow                                   
  DowOptional                           
  Descriptor                            
)

除了Minute/Hour/Dom(Day of month)/Month/Dow(Day of week)外,還能夠支持Second。相對順序都是固定的:

// parser.go
var places = []ParseOption{
  Second,
  Minute,
  Hour,
  Dom,
  Month,
  Dow,
}

var defaults = []string{
  "0",
  "0",
  "0",
  "*",
  "*",
  "*",
}

默認的時間格式使用 5 個域。

咱們能夠調用cron.NewParser()建立本身的Parser對象,以位格式傳入使用哪些域,例以下面的Parser使用 6 個域,支持Second(秒):

parser := cron.NewParser(
  cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)

調用cron.WithParser(parser)建立一個選項傳入構造函數cron.New(),使用時就能夠指定秒了:

c := cron.New(cron.WithParser(parser))
c.AddFunc("1 * * * * *", func () {
  fmt.Println("every 1 second")
})
c.Start()

這裏時間格式必須使用 6 個域,順序與上面的const定義一致。

由於上面的時間格式太常見了,cron定義了一個便捷的函數:

// option.go
func WithSeconds() Option {
  return WithParser(NewParser(
    Second | Minute | Hour | Dom | Month | Dow | Descriptor,
  ))
}

注意Descriptor表示對@every/@hour等的支持。有了WithSeconds(),咱們不用手動建立Parser對象了:

c := cron.New(cron.WithSeconds())

選項

cron對象建立使用了選項模式,咱們前面已經介紹了 3 個選項:

  • WithLocation:指定時區;
  • WithParser:使用自定義的解析器;
  • WithSeconds:讓時間格式支持秒,實際上內部調用了WithParser

cron還提供了另外兩種選項:

  • WithLogger:自定義Logger
  • WithChain:Job 包裝器。

WithLogger

WithLogger能夠設置cron內部使用咱們自定義的Logger

func main() {
  c := cron.New(
    cron.WithLogger(
      cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))))
  c.AddFunc("@every 1s", func() {
    fmt.Println("hello world")
  })
  c.Start()

  time.Sleep(5 * time.Second)
}

上面調用cron.VerbosPrintfLogger()包裝log.Logger,這個logger會詳細記錄cron內部的調度過程:

$ go run main.go
cron: 2020/06/26 07:09:14 start
cron: 2020/06/26 07:09:14 schedule, now=2020-06-26T07:09:14+08:00, entry=1, next=2020-06-26T07:09:15+08:00
cron: 2020/06/26 07:09:15 wake, now=2020-06-26T07:09:15+08:00
cron: 2020/06/26 07:09:15 run, now=2020-06-26T07:09:15+08:00, entry=1, next=2020-06-26T07:09:16+08:00
hello world
cron: 2020/06/26 07:09:16 wake, now=2020-06-26T07:09:16+08:00
cron: 2020/06/26 07:09:16 run, now=2020-06-26T07:09:16+08:00, entry=1, next=2020-06-26T07:09:17+08:00
hello world
cron: 2020/06/26 07:09:17 wake, now=2020-06-26T07:09:17+08:00
cron: 2020/06/26 07:09:17 run, now=2020-06-26T07:09:17+08:00, entry=1, next=2020-06-26T07:09:18+08:00
hello world
cron: 2020/06/26 07:09:18 wake, now=2020-06-26T07:09:18+08:00
hello world
cron: 2020/06/26 07:09:18 run, now=2020-06-26T07:09:18+08:00, entry=1, next=2020-06-26T07:09:19+08:00
cron: 2020/06/26 07:09:19 wake, now=2020-06-26T07:09:19+08:00
hello world
cron: 2020/06/26 07:09:19 run, now=2020-06-26T07:09:19+08:00, entry=1, next=2020-06-26T07:09:20+08:0

咱們看看默認的Logger是什麼樣的:

// logger.go
var DefaultLogger Logger = PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))

func PrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  return printfLogger{l, false}
}

func VerbosePrintfLogger(l interface{ Printf(string, ...interface{}) }) Logger {
  return printfLogger{l, true}
}

type printfLogger struct {
  logger  interface{ Printf(string, ...interface{}) }
  logInfo bool
}

WithChain

Job 包裝器能夠在執行實際的Job先後添加一些邏輯:

  • 捕獲panic
  • 若是Job上次運行還未結束,推遲本次執行;
  • 若是Job上次運行還未介紹,跳過本次執行;
  • 記錄每一個Job的執行狀況。

咱們能夠將Chain類比爲 Web 處理器的中間件。實際上就是在Job的執行邏輯外在封裝一層邏輯。咱們的封裝邏輯須要寫成一個函數,傳入一個Job類型,返回封裝後的Jobcron爲這種函數定義了一個類型JobWrapper

// chain.go
type JobWrapper func(Job) Job

而後使用一個Chain對象將這些JobWrapper組合到一塊兒:

type Chain struct {
  wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
  return Chain{c}
}

調用Chain對象的Then(job)方法應用這些JobWrapper,返回最終的`Job:

func (c Chain) Then(j Job) Job {
  for i := range c.wrappers {
    j = c.wrappers[len(c.wrappers)-i-1](j)
  }
  return j
}

注意應用JobWrapper的順序。

內置JobWrapper

cron內置了 3 個用得比較多的JobWrapper

  • Recover:捕獲內部Job產生的 panic;
  • DelayIfStillRunning:觸發時,若是上一次任務還未執行完成(耗時太長),則等待上一次任務完成以後再執行;
  • SkipIfStillRunning:觸發時,若是上一次任務還未完成,則跳過這次執行。

下面分別介紹。

Recover

先看看如何使用:

type panicJob struct {
  count int
}

func (p *panicJob) Run() {
  p.count++
  if p.count == 1 {
    panic("oooooooooooooops!!!")
  }

  fmt.Println("hello world")
}

func main() {
  c := cron.New()
  c.AddJob("@every 1s", cron.NewChain(cron.Recover(cron.DefaultLogger)).Then(&panicJob{}))
  c.Start()

  time.Sleep(5 * time.Second)
}

panicJob在第一次觸發時,觸發了panic。由於有cron.Recover()保護,後續任務還能執行:

go run main.go 
cron: 2020/06/27 14:02:00 panic, error=oooooooooooooops!!!, stack=...
goroutine 18 [running]:
github.com/robfig/cron/v3.Recover.func1.1.1(0x514ee0, 0xc0000044a0)
        D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:45 +0xbc
panic(0x4cf380, 0x513280)
        C:/Go/src/runtime/panic.go:969 +0x174
main.(*panicJob).Run(0xc0000140e8)
        D:/code/golang/src/github.com/darjun/go-daily-lib/cron/recover/main.go:17 +0xba
github.com/robfig/cron/v3.Recover.func1.1()
        D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:53 +0x6f
github.com/robfig/cron/v3.FuncJob.Run(0xc000070390)
        D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:136 +0x2c
github.com/robfig/cron/v3.(*Cron).startJob.func1(0xc00005c0a0, 0x514d20, 0xc000070390)
        D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:312 +0x68
created by github.com/robfig/cron/v3.(*Cron).startJob
        D:/code/golang/pkg/mod/github.com/robfig/cron/v3@v3.0.1/cron.go:310 +0x7a
hello world
hello world
hello world
hello world

咱們看看cron.Recover()的實現,很簡單:

// cron.go
func Recover(logger Logger) JobWrapper {
  return func(j Job) Job {
    return FuncJob(func() {
      defer func() {
        if r := recover(); r != nil {
          const size = 64 << 10
          buf := make([]byte, size)
          buf = buf[:runtime.Stack(buf, false)]
          err, ok := r.(error)
          if !ok {
            err = fmt.Errorf("%v", r)
          }
          logger.Error(err, "panic", "stack", "...\n"+string(buf))
        }
      }()
      j.Run()
    })
  }
}

就是在執行內層的Job邏輯前,添加recover()調用。若是Job.Run()執行過程當中有panic。這裏的recover()會捕獲到,輸出調用堆棧。

DelayIfStillRunning

仍是先看如何使用:

type delayJob struct {
  count int
}

func (d *delayJob) Run() {
  time.Sleep(2 * time.Second)
  d.count++
  log.Printf("%d: hello world\n", d.count)
}

func main() {
  c := cron.New()
  c.AddJob("@every 1s", cron.NewChain(cron.DelayIfStillRunning(cron.DefaultLogger)).Then(&delayJob{}))
  c.Start()

  time.Sleep(10 * time.Second)
}

上面咱們在Run()中增長了一個 2s 的延遲,輸出中間隔變爲 2s,而不是定時的 1s:

$ go run main.go 
2020/06/27 14:11:16 1: hello world
2020/06/27 14:11:18 2: hello world
2020/06/27 14:11:20 3: hello world
2020/06/27 14:11:22 4: hello world

看看源碼:

// chain.go
func DelayIfStillRunning(logger Logger) JobWrapper {
  return func(j Job) Job {
    var mu sync.Mutex
    return FuncJob(func() {
      start := time.Now()
      mu.Lock()
      defer mu.Unlock()
      if dur := time.Since(start); dur > time.Minute {
        logger.Info("delay", "duration", dur)
      }
      j.Run()
    })
  }
}

首先定義一個該任務共用的互斥鎖sync.Mutex,每次執行任務前獲取鎖,執行結束以後釋放鎖。因此在上一個任務結束前,下一個任務獲取鎖是沒法成功的,從而保證的任務的串行執行。

SkipIfStillRunning

仍是先看看如何使用:

type skipJob struct {
  count int32
}

func (d *skipJob) Run() {
  atomic.AddInt32(&d.count, 1)
  log.Printf("%d: hello world\n", d.count)
  if atomic.LoadInt32(&d.count) == 1 {
    time.Sleep(2 * time.Second)
  }
}

func main() {
  c := cron.New()
  c.AddJob("@every 1s", cron.NewChain(cron.SkipIfStillRunning(cron.DefaultLogger)).Then(&skipJob{}))
  c.Start()

  time.Sleep(10 * time.Second)
}

輸出:

$ go run main.go
2020/06/27 14:22:07 1: hello world
2020/06/27 14:22:10 2: hello world
2020/06/27 14:22:11 3: hello world
2020/06/27 14:22:12 4: hello world
2020/06/27 14:22:13 5: hello world
2020/06/27 14:22:14 6: hello world
2020/06/27 14:22:15 7: hello world
2020/06/27 14:22:16 8: hello world

注意觀察時間,第一個與第二個輸出之間相差 3s,由於跳過了兩次執行。

注意DelayIfStillRunningSkipIfStillRunning是有本質上的區別的,前者DelayIfStillRunning只要時間足夠長,全部的任務都會循序漸進地完成,只是可能前一個任務耗時過長,致使後一個任務的執行時間推遲了一點。SkipIfStillRunning會跳過一些執行。

看看源碼:

func SkipIfStillRunning(logger Logger) JobWrapper {
  return func(j Job) Job {
    var ch = make(chan struct{}, 1)
    ch <- struct{}{}
    return FuncJob(func() {
      select {
      case v := <-ch:
        j.Run()
        ch <- v
      default:
        logger.Info("skip")
      }
    })
  }
}

定義一個該任務共用的緩存大小爲 1 的通道chan struct{}。執行任務時,從通道中取值,若是成功,執行,不然跳過。執行完成以後再向通道中發送一個值,確保下一個任務能執行。初始發送一個值到通道中,保證第一個任務的執行。

總結

cron實現比較小巧,且優雅,代碼行數也很少,很是值得一看!

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. cron GitHub:https://github.com/robfig/cron
  2. Go 每日一庫之 carbon:https://darjun.github.io/2020/02/14/godailylib/carbon/
  3. Go 每日一庫之 gron:https://darjun.github.io/2020/04/20/godailylib/gron/
  4. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

個人博客:https://darjun.github.io

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

相關文章
相關標籤/搜索