Go組件學習——cron定時器

1 前言

轉到Go已經將近三個月,寫業務代碼又找到了屬於Go的條件反射了。git

後置聲明和多參數返回這些Go風格代碼寫起來也不會那麼蹩腳,甚至還有點小適應~github

反而,前幾天在寫Java的時候,發現Java怎麼啓動這麼慢,Java怎麼可以容忍這些用不到的代碼還義正詞嚴的躺在那……等等,這些話在哪聽過相似的???bash

「Go爲何要後置聲明,多彆扭啊」閉包

「Go裏面爲啥要定義這麼多的struct,看的頭暈」app

……框架

其實,沒有最好的語言,只有最適合的。less

前面《Go語言學習》系列主要介紹了一些Go的基礎知識和相較於Java的一些新特性。後續若是有相關的體會和新的還會繼續更新。dom

從這篇開始,開始學習Go的一些工具類庫和開源組件,但願在學習這些優秀的開源項目過程當中,更深刻的瞭解Go,發現Go的威力。函數

2 cron簡介

robfig/cron是一個第三方開源的任務調度庫,也就是咱們平時說的定時任務。工具

Github:github.com/robfig/cron

官方文檔:godoc.org/github.com/…

3 cron如何使用

一、新建文件cron-demo.go

package main

import (
	"fmt"
	"github.com/robfig/cron"
	"time"
)

func main() {
	c := cron.New()
	c.AddFunc("*/3 * * * * *", func() {
		fmt.Println("every 3 seconds executing")
	})

	go c.Start()
	defer c.Stop()


	select {
	case <-time.After(time.Second * 10):
		return
	}
}
複製代碼
  • cron.New建立一個定時器管理器
  • c.AddFunc添加一個定時任務,第一個參數是cron時間表達式,第二個參數是要觸發執行的函數
  • go c.Start()新啓一個協程,運行定時任務
  • c.Stop是等待中止信號結束任務

二、在cron-demo.go文件下執行go build

本項目採用go mod進行包管理,因此執行go build命令後,會在go.mod文件中生成對應的依賴版本如圖所示

三、運行cron-demo.go

能夠看出每3秒執行一次,直到10秒後過時退出進程,任務結束。

代碼參見項目:go-demo項目(github.com/DMinerJacki…

看上去這個任務調度仍是蠻好用的,那麼具體是如何實現的呢,看了下源碼,也是很是的短小精悍,目錄結構以下。

下面經過幾個問題一塊兒看下cron是如何實現任務調度。

4 cron如何解析任務表達式

上例咱們看到添加「*/3 * * * * *」這樣的表達式,就能實現每3秒執行一次。

顯然,這個表達式只是對人友好的一種約定表達形式,要真正在指定時間執行任務,cron確定是要讀取並解析這個c表達式,轉化爲具體的時間再執行。

那咱們來看看,這個具體是如何執行的。

進入AddFunc函數實現

// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func()) error {
	return c.AddJob(spec, FuncJob(cmd))
}
複製代碼

這只是套了個殼,具體還要進入AddJob函數

// AddJob adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
	schedule, err := Parse(spec)
	if err != nil {
		return err
	}
	c.Schedule(schedule, cmd)
	return nil
}
複製代碼

該函數第一行就是解析cron表達式,順藤摸瓜,咱們看到具體實現以下

// Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid.
// It accepts crontab specs and features configured by NewParser.
func (p Parser) Parse(spec string) (Schedule, error) {
	if len(spec) == 0 {
		return nil, fmt.Errorf("Empty spec string")
	}
	if spec[0] == '@' && p.options&Descriptor > 0 {
		return parseDescriptor(spec)
	}

	// Figure out how many fields we need
	max := 0
	for _, place := range places {
		if p.options&place > 0 {
			max++
		}
	}
	min := max - p.optionals

	// Split fields on whitespace
	fields := strings.Fields(spec)	// 使用空白符拆分cron表達式

	// Validate number of fields
	if count := len(fields); count < min || count > max {
		if min == max {
			return nil, fmt.Errorf("Expected exactly %d fields, found %d: %s", min, count, spec)
		}
		return nil, fmt.Errorf("Expected %d to %d fields, found %d: %s", min, max, count, spec)
	}

	// Fill in missing fields
	fields = expandFields(fields, p.options)

	var err error
	field := func(field string, r bounds) uint64 {	// 抽象出filed函數,方便下面調用
		if err != nil {
			return 0
		}
		var bits uint64
		bits, err = getField(field, r)
		return bits
	}

	var (
		second     = field(fields[0], seconds)
		minute     = field(fields[1], minutes)
		hour       = field(fields[2], hours)
		dayofmonth = field(fields[3], dom)
		month      = field(fields[4], months)
		dayofweek  = field(fields[5], dow)
	)
	if err != nil {
		return nil, err
	}

	return &SpecSchedule{
		Second: second,
		Minute: minute,
		Hour:   hour,
		Dom:    dayofmonth,
		Month:  month,
		Dow:    dayofweek,
	}, nil
}
複製代碼

該函數主要是將cron表達式映射爲「Second, Minute, Hour, Dom, Month, Dow」6個時間維度的結構體SpecSchedule。

SpecSchedule是實現了方法「Next(time.Time) time.Time」的結構體,而「Next(time.Time) time.Time」是定義在Schedule接口中的

// The Schedule describes a job's duty cycle.
type Schedule interface {
	// Return the next activation time, later than the given time.
	// Next is invoked initially, and then each time the job is run.
	Next(time.Time) time.Time
}
複製代碼

因此,最終能夠理解是將cron解析後轉換爲下一次要執行的時刻,等待執行。

5 cron如何執行任務

咱們知道經過parser.go能夠將人很好理解的表達式轉換爲cron能夠讀懂的要執行的時間。

有了要執行的時間點,那麼cron具體是如何執行這些任務的呢?

咱們看下Start函數的具體實現

// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() {
	if c.running {
		return
	}
	c.running = true
	go c.run()
}
複製代碼

這裏會經過斷定Cron的running字段是否在運行來鉅額聽是否要啓動任務。

顯然這裏running是false,由於在調用c.New初始化的時候running被設置爲false。

因此,這裏新啓一個協程用於執行定時任務,再次順藤摸瓜,咱們看到run函數的實現

// Run the scheduler. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
	// Figure out the next activation times for each entry.
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
	}

	for {
		// Determine the next entry to run.
		sort.Sort(byTime(c.entries))

		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {	// 若是沒有要執行的任務或者第一個任務的待執行時間爲空,則睡眠
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))	// 不然新建一個距離如今到下一個要觸發執行的Timer
		}

		for {
			select {
			case now = <-timer.C:	// 觸發時間到,執行任務
				now = now.In(c.location)
				// Run every entry whose next time was less than now
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					go c.runWithRecovery(e.Job)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
				}

			case newEntry := <-c.add:	// 添加任務
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)

			case <-c.snapshot:	// 調用c.Entries()返回一個現有任務列表的snapshot
				c.snapshot <- c.entrySnapshot()
				continue

			case <-c.stop:	// 任務結束,退出
				timer.Stop()
				return
			}

			break
		}
	}
}
複製代碼
  • 進入該函數,首先遍歷因此任務,找到全部任務下一個要執行的時間。
  • 而後進入外層for循環,對於各個任務按照執行時間進行排序,保證離當前時間最近的先執行。
  • 再對任務列表進行斷定,是否有任務若是沒有,則休眠,不然初始化一個timer。

裏層的for循環纔是重頭戲,下面主要分析這個for循環裏面的任務加入和執行。

在此以前,須要瞭解下go標準庫的timer

timer用於指定在某個時間間隔後,調用函數或者表達式。

使用NewTimer就能夠建立一個Timer,在指定時間間隔到達後,能夠經過<-timer.C接收值。

package main

import (
	"fmt"
	"time"
)

func main() {
	timer1 := time.NewTimer(2 * time.Second)

	<-timer1.C
	fmt.Println("Timer 1 expired")

	timer2 := time.NewTimer(time.Second)
	go func() {
		<-timer2.C
		fmt.Println("Timer 2 expired")
	}()

	stop2 := timer2.Stop()
	if stop2 {
		fmt.Println("Timer 2 stopped")
	}
}
複製代碼

執行結果爲

Timer 1 expired
Timer 2 stopped
複製代碼

timer1表示2秒後到期,在此以前都是阻塞狀態,2秒後<-timer1.C接收到信號,執行下面的打印語句。

timer2表示1秒後到期,可是中途被Stop掉了,至關於清除了定時功能。

有了這個背景以後,咱們再來看run函數的裏層for循環。

接收到c.add信道

case newEntry := <-c.add:	// 添加任務
	timer.Stop()
	now = c.now()
	newEntry.Next = newEntry.Schedule.Next(now)
	c.entries = append(c.entries, newEntry)
複製代碼

將timer停掉,清除設置的定時功能,並以當前時間點爲起點,設置添加任務的下一次執行時間,並添加到entries任務隊列中。

接收到timer.C信道

case now = <-timer.C:	// 觸發時間到,執行任務
	now = now.In(c.location)
    // Run every entry whose next time was less than now
    for _, e := range c.entries {
    	if e.Next.After(now) || e.Next.IsZero() {
    		break
    	}
    go c.runWithRecovery(e.Job)
    e.Prev = e.Next
    e.Next = e.Schedule.Next(now)
    }
複製代碼

當定任務到點後,time.C就會接收到值,並新開協程執行真正須要執行的Job,以後再更新下一個要執行的任務列表。

咱們進入runWithRecovery函數,該函數從函數名就能夠看出,即便出現panic也能夠從新recovery,保證其餘任務不受影響。

func (c *Cron) runWithRecovery(j Job) {
	defer func() {
		if r := recover(); r != nil {
			const size = 64 << 10
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			c.logf("cron: panic running job: %v\n%s", r, buf)
		}
	}()
	j.Run()
}
複製代碼

追根溯源,咱們發現真正執行Job的是j.Run()的執行。進入這個Run函數的實現,咱們看到

func (f FuncJob) Run() { f() }
複製代碼

沒錯,咱們要執行的任務一直從AddFunc一直往下傳遞,直到這裏,咱們經過調用Run函數,將包裝的FuncJob類型的函數經過f()的形式進行執行。

這裏說的可能比較模糊,舉個例子,Go裏面的閉包定義

func () {
    fmt.Println("test")
}()
複製代碼

若是這裏定義後面沒有"()"該函數就不會執行,因此結合這個看上面的定時任務是如何執行就更容易理解了。

6 代碼閱讀體會

一、channel的奧妙

經過channel可讓感知變得垂手可得,好比timer.C就像是時間到了,天然會有人來敲門告訴你。而不須要咱們本身主動去獲取是否到期了。

二、經常使用類庫的使用

好比在parser裏面咱們看到了"fields := strings.Fields(spec)",在平常開發中,咱們能夠靈活使用這些API,避免本身造輪子的狀況。

三、多思考

以前作Java的時候,更多的是沉浸在各類工具和框架的使用,對於這些工具和框架的實現關注的很少。好比從Quartz到Spring Job,咱們須要更新的是愈來愈好用的定時任務工具,而底層的實現升級Spring都幫咱們考慮好了。這種對業務對項目有友好的,能夠快速的實現業務功能開發,可是對於開發者並不友好,友好的設計麻痹了開發者對於底層原理的深究的慾望。

相關文章
相關標籤/搜索