轉到Go已經將近三個月,寫業務代碼又找到了屬於Go的條件反射了。git
後置聲明和多參數返回這些Go風格代碼寫起來也不會那麼蹩腳,甚至還有點小適應~github
反而,前幾天在寫Java的時候,發現Java怎麼啓動這麼慢,Java怎麼可以容忍這些用不到的代碼還義正詞嚴的躺在那……等等,這些話在哪聽過相似的???bash
「Go爲何要後置聲明,多彆扭啊」閉包
「Go裏面爲啥要定義這麼多的struct,看的頭暈」app
……框架
其實,沒有最好的語言,只有最適合的。less
前面《Go語言學習》系列主要介紹了一些Go的基礎知識和相較於Java的一些新特性。後續若是有相關的體會和新的還會繼續更新。dom
從這篇開始,開始學習Go的一些工具類庫和開源組件,但願在學習這些優秀的開源項目過程當中,更深刻的瞭解Go,發現Go的威力。函數
robfig/cron是一個第三方開源的任務調度庫,也就是咱們平時說的定時任務。工具
Github:github.com/robfig/cron
一、新建文件cron-demo.go
package main
import (
"fmt"
"github.com/robfig/cron"
"time"
)
func main() {
c := cron.New()
c.AddFunc("*/3 * * * * *", func() {
fmt.Println("every 3 seconds executing")
})
go c.Start()
defer c.Stop()
select {
case <-time.After(time.Second * 10):
return
}
}
複製代碼
二、在cron-demo.go文件下執行go build
本項目採用go mod進行包管理,因此執行go build命令後,會在go.mod文件中生成對應的依賴版本如圖所示
三、運行cron-demo.go
能夠看出每3秒執行一次,直到10秒後過時退出進程,任務結束。
代碼參見項目:go-demo項目(github.com/DMinerJacki…)
看上去這個任務調度仍是蠻好用的,那麼具體是如何實現的呢,看了下源碼,也是很是的短小精悍,目錄結構以下。
下面經過幾個問題一塊兒看下cron是如何實現任務調度。
上例咱們看到添加「*/3 * * * * *」這樣的表達式,就能實現每3秒執行一次。
顯然,這個表達式只是對人友好的一種約定表達形式,要真正在指定時間執行任務,cron確定是要讀取並解析這個c表達式,轉化爲具體的時間再執行。
那咱們來看看,這個具體是如何執行的。
進入AddFunc函數實現
// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func()) error {
return c.AddJob(spec, FuncJob(cmd))
}
複製代碼
這只是套了個殼,具體還要進入AddJob函數
// AddJob adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
schedule, err := Parse(spec)
if err != nil {
return err
}
c.Schedule(schedule, cmd)
return nil
}
複製代碼
該函數第一行就是解析cron表達式,順藤摸瓜,咱們看到具體實現以下
// Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid.
// It accepts crontab specs and features configured by NewParser.
func (p Parser) Parse(spec string) (Schedule, error) {
if len(spec) == 0 {
return nil, fmt.Errorf("Empty spec string")
}
if spec[0] == '@' && p.options&Descriptor > 0 {
return parseDescriptor(spec)
}
// Figure out how many fields we need
max := 0
for _, place := range places {
if p.options&place > 0 {
max++
}
}
min := max - p.optionals
// Split fields on whitespace
fields := strings.Fields(spec) // 使用空白符拆分cron表達式
// Validate number of fields
if count := len(fields); count < min || count > max {
if min == max {
return nil, fmt.Errorf("Expected exactly %d fields, found %d: %s", min, count, spec)
}
return nil, fmt.Errorf("Expected %d to %d fields, found %d: %s", min, max, count, spec)
}
// Fill in missing fields
fields = expandFields(fields, p.options)
var err error
field := func(field string, r bounds) uint64 { // 抽象出filed函數,方便下面調用
if err != nil {
return 0
}
var bits uint64
bits, err = getField(field, r)
return bits
}
var (
second = field(fields[0], seconds)
minute = field(fields[1], minutes)
hour = field(fields[2], hours)
dayofmonth = field(fields[3], dom)
month = field(fields[4], months)
dayofweek = field(fields[5], dow)
)
if err != nil {
return nil, err
}
return &SpecSchedule{
Second: second,
Minute: minute,
Hour: hour,
Dom: dayofmonth,
Month: month,
Dow: dayofweek,
}, nil
}
複製代碼
該函數主要是將cron表達式映射爲「Second, Minute, Hour, Dom, Month, Dow」6個時間維度的結構體SpecSchedule。
SpecSchedule是實現了方法「Next(time.Time) time.Time」的結構體,而「Next(time.Time) time.Time」是定義在Schedule接口中的
// The Schedule describes a job's duty cycle.
type Schedule interface {
// Return the next activation time, later than the given time.
// Next is invoked initially, and then each time the job is run.
Next(time.Time) time.Time
}
複製代碼
因此,最終能夠理解是將cron解析後轉換爲下一次要執行的時刻,等待執行。
咱們知道經過parser.go能夠將人很好理解的表達式轉換爲cron能夠讀懂的要執行的時間。
有了要執行的時間點,那麼cron具體是如何執行這些任務的呢?
咱們看下Start函數的具體實現
// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() {
if c.running {
return
}
c.running = true
go c.run()
}
複製代碼
這裏會經過斷定Cron的running字段是否在運行來鉅額聽是否要啓動任務。
顯然這裏running是false,由於在調用c.New初始化的時候running被設置爲false。
因此,這裏新啓一個協程用於執行定時任務,再次順藤摸瓜,咱們看到run函數的實現
// Run the scheduler. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
}
for {
// Determine the next entry to run.
sort.Sort(byTime(c.entries))
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // 若是沒有要執行的任務或者第一個任務的待執行時間爲空,則睡眠
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
timer = time.NewTimer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now)) // 不然新建一個距離如今到下一個要觸發執行的Timer
}
for {
select {
case now = <-timer.C: // 觸發時間到,執行任務
now = now.In(c.location)
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
go c.runWithRecovery(e.Job)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
}
case newEntry := <-c.add: // 添加任務
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
case <-c.snapshot: // 調用c.Entries()返回一個現有任務列表的snapshot
c.snapshot <- c.entrySnapshot()
continue
case <-c.stop: // 任務結束,退出
timer.Stop()
return
}
break
}
}
}
複製代碼
裏層的for循環纔是重頭戲,下面主要分析這個for循環裏面的任務加入和執行。
在此以前,須要瞭解下go標準庫的timer
timer用於指定在某個時間間隔後,調用函數或者表達式。
使用NewTimer就能夠建立一個Timer,在指定時間間隔到達後,能夠經過<-timer.C接收值。
package main
import (
"fmt"
"time"
)
func main() {
timer1 := time.NewTimer(2 * time.Second)
<-timer1.C
fmt.Println("Timer 1 expired")
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 expired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}
}
複製代碼
執行結果爲
Timer 1 expired
Timer 2 stopped
複製代碼
timer1表示2秒後到期,在此以前都是阻塞狀態,2秒後<-timer1.C接收到信號,執行下面的打印語句。
timer2表示1秒後到期,可是中途被Stop掉了,至關於清除了定時功能。
有了這個背景以後,咱們再來看run函數的裏層for循環。
接收到c.add信道
case newEntry := <-c.add: // 添加任務
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
複製代碼
將timer停掉,清除設置的定時功能,並以當前時間點爲起點,設置添加任務的下一次執行時間,並添加到entries任務隊列中。
接收到timer.C信道
case now = <-timer.C: // 觸發時間到,執行任務
now = now.In(c.location)
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
go c.runWithRecovery(e.Job)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
}
複製代碼
當定任務到點後,time.C就會接收到值,並新開協程執行真正須要執行的Job,以後再更新下一個要執行的任務列表。
咱們進入runWithRecovery函數,該函數從函數名就能夠看出,即便出現panic也能夠從新recovery,保證其餘任務不受影響。
func (c *Cron) runWithRecovery(j Job) {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
c.logf("cron: panic running job: %v\n%s", r, buf)
}
}()
j.Run()
}
複製代碼
追根溯源,咱們發現真正執行Job的是j.Run()的執行。進入這個Run函數的實現,咱們看到
func (f FuncJob) Run() { f() }
複製代碼
沒錯,咱們要執行的任務一直從AddFunc一直往下傳遞,直到這裏,咱們經過調用Run函數,將包裝的FuncJob類型的函數經過f()的形式進行執行。
這裏說的可能比較模糊,舉個例子,Go裏面的閉包定義
func () {
fmt.Println("test")
}()
複製代碼
若是這裏定義後面沒有"()"該函數就不會執行,因此結合這個看上面的定時任務是如何執行就更容易理解了。
一、channel的奧妙
經過channel可讓感知變得垂手可得,好比timer.C就像是時間到了,天然會有人來敲門告訴你。而不須要咱們本身主動去獲取是否到期了。
二、經常使用類庫的使用
好比在parser裏面咱們看到了"fields := strings.Fields(spec)",在平常開發中,咱們能夠靈活使用這些API,避免本身造輪子的狀況。
三、多思考
以前作Java的時候,更多的是沉浸在各類工具和框架的使用,對於這些工具和框架的實現關注的很少。好比從Quartz到Spring Job,咱們須要更新的是愈來愈好用的定時任務工具,而底層的實現升級Spring都幫咱們考慮好了。這種對業務對項目有友好的,能夠快速的實現業務功能開發,可是對於開發者並不友好,友好的設計麻痹了開發者對於底層原理的深究的慾望。