Go 每日一庫之 ants

簡介

處理大量併發是 Go 語言的一大優點。語言內置了方便的併發語法,能夠很是方便的建立不少個輕量級的 goroutine 併發處理任務。相比於建立多個線程,goroutine 更輕量、資源佔用更少、切換速度更快、無線程上下文切換開銷更少。可是受限於資源總量,系統中可以建立的 goroutine 數量也是受限的。默認每一個 goroutine 佔用 8KB 內存,一臺 8GB 內存的機器滿打滿算也只能建立 8GB/8KB = 1000000 個 goroutine,更況且系統還須要保留一部份內存運行平常管理任務,go 運行時須要內存運行 gc、處理 goroutine 切換等。使用的內存超過機器內存容量,系統會使用交換區(swap),致使性能急速降低。咱們能夠簡單驗證一下建立過多 goroutine 會發生什麼:git

func main() {
  var wg sync.WaitGroup
  wg.Add(10000000)
  for i := 0; i < 10000000; i++ {
    go func() {
      time.Sleep(1 * time.Minute)
    }()
  }
  wg.Wait()
}

在個人機器上(8G內存)運行上面的程序會報errno 1455,即Out of Memory錯誤,這很好理解。謹慎運行github

另外一方面,goroutine 的管理也是一個問題。goroutine 只能本身運行結束,外部沒有任何手段能夠強制j結束一個 goroutine。若是一個 goroutine 由於某種緣由沒有自行結束,就會出現 goroutine 泄露。此外,頻繁建立 goroutine 也是一個開銷。golang

鑑於上述緣由,天然出現了與線程池同樣的需求,即 goroutine 池。通常的 goroutine 池自動管理 goroutine 的生命週期,能夠按需建立,動態縮容。向 goroutine 池提交一個任務,goroutine 池會自動安排某個 goroutine 來處理。微信

ants就是其中一個實現 goroutine 池的庫。數據結構

快速使用

本文代碼使用 Go Modules。閉包

建立目錄並初始化:併發

$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants

安裝ants庫,使用v2版本:app

$ go get -u github.com/panjf2000/ants/v2

咱們接下來要實現一個計算大量整數和的程序。首先建立基礎的任務結構,並實現其執行任務方法:函數

type Task struct {
  index int
  nums  []int
  sum   int
  wg    *sync.WaitGroup
}

func (t *Task) Do() {
  for _, num := range t.nums {
    t.sum += num
  }

  t.wg.Done()
}

很簡單,就是將一個切片中的全部整數相加。oop

而後咱們建立 goroutine 池,注意池使用完後須要手動關閉,這裏使用defer關閉:

p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()

func taskFunc(data interface{}) {
  task := data.(*Task)
  task.Do()
  fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}

上面調用了ants.NewPoolWithFunc()建立了一個 goroutine 池。第一個參數是池容量,即池中最多有 10 個 goroutine。第二個參數爲每次執行任務的函數。當咱們調用p.Invoke(data)的時候,ants池會在其管理的 goroutine 中找出一個空閒的,讓它執行函數taskFunc,並將data做爲參數。

接着,咱們模擬數據,作數據切分,生成任務,交給 ants 處理:

const (
  DataSize    = 10000
  DataPerTask = 100
)

nums := make([]int, DataSize, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  task := &Task{
    index: i + 1,
    nums:  nums[i*DataPerTask : (i+1)*DataPerTask],
    wg:    &wg,
  }

  tasks = append(tasks, task)
  p.Invoke(task)
}

wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())

隨機生成 10000 個整數,將這些整數分爲 100 份,每份 100 個,生成Task結構,調用p.Invoke(task)處理。wg.Wait()等待處理完成,而後輸出ants正在運行的 goroutine 數量,這時應該是 0。

最後咱們將結果彙總,並驗證一下結果,與直接相加獲得的結果作一個比較:

var sum int
for _, task := range tasks {
  sum += task.sum
}

var expect int
for _, num := range nums {
  expect += num
}

fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

運行:

$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172

確實,任務完成以後,正在運行的 goroutine 數量變爲 0。並且咱們驗證了,結果沒有誤差。另外須要注意,goroutine 池中任務的執行順序是隨機的,與提交任務的前後沒有關係。由上面運行打印的任務標識咱們也能發現這一點。

函數做爲任務

ants支持將一個不接受任何參數的函數做爲任務提交給 goroutine 運行。因爲不接受參數,咱們提交的函數要麼不須要外部數據,只須要處理自身邏輯,不然就必須用某種方式將須要的數據傳遞進去,例如閉包。

提交函數做爲任務的 goroutine 池使用ants.NewPool()建立,它只接受一個參數表示池子的容量。調用池子對象的Submit()方法來提交任務,將一個不接受任何參數的函數傳入。

最開始的例子能夠改寫一下。增長一個任務包裝函數,將任務須要的參數做爲包裝函數的參數。包裝函數返回實際的任務函數,該任務函數就能夠經過閉包訪問它須要的數據了:

type taskFunc func()

func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {
  return func() {
    for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {
      *sum += num
    }

    fmt.Printf("task:%d sum:%d\n", i+1, *sum)
    wg.Done()
  }
}

調用ants.NewPool(10)建立 goroutine 池,一樣池子用完須要釋放,這裏使用defer

p, _ := ants.NewPool(10)
defer p.Release()

生成模擬數據,切分任務。提交任務給ants池執行,這裏使用taskFuncWrapper()包裝函數生成具體的任務,而後調用p.Submit()提交:

nums := make([]int, DataSize, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()

彙總結果,驗證:

var sum int
for _, partSum := range partSums {
  sum += partSum
}

var expect int
for _, num := range nums {
  expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

這個程序的功能與最開始的徹底相同。

執行流程

GitHub 倉庫中有個執行流程圖,我從新繪製了一下:

執行流程以下:

  • 初始化 goroutine 池;
  • 提交任務給 goroutine 池,檢查是否有空閒的 goroutine:

    • 有,獲取空閒 goroutine
    • 無,檢查池中的 goroutine 數量是否已到池容量上限:

      • 已到上限,檢查 goroutine 池是不是非阻塞的:

        • 非阻塞,直接返回nil表示執行失敗
        • 阻塞,等待 goroutine 空閒
      • 未到上限,建立一個新的 goroutine 處理任務
  • 任務處理完成,將 goroutine 交還給池,以待處理下一個任務

選項

ants提供了一些選項能夠定製 goroutine 池的行爲。選項使用Options結構定義:

// src/github.com/panjf2000/ants/options.go
type Options struct {
  ExpiryDuration time.Duration
  PreAlloc bool
  MaxBlockingTasks int
  Nonblocking bool
  PanicHandler func(interface{})
  Logger Logger
}

各個選項含義以下:

  • ExpiryDuration:過時時間。表示 goroutine 空閒多長時間以後會被ants池回收
  • PreAlloc:預分配。調用NewPool()/NewPoolWithFunc()以後預分配worker(管理一個工做 goroutine 的結構體)切片。並且使用預分配與否會直接影響池中管理worker的結構。見下面源碼
  • MaxBlockingTasks:最大阻塞任務數量。即池中 goroutine 數量已到池容量,且全部 goroutine 都處理繁忙狀態,這時到來的任務會在阻塞列表等待。這個選項設置的是列表的最大長度。阻塞的任務數量達到這個值後,後續任務提交直接返回失敗
  • Nonblocking:池是否阻塞,默認阻塞。提交任務時,若是ants池中 goroutine 已到上限且所有繁忙,阻塞的池會將任務添加的阻塞列表等待(固然受限於阻塞列表長度,見上一個選項)。非阻塞的池直接返回失敗
  • PanicHandler:panic 處理。遇到 panic 會調用這裏設置的處理函數
  • Logger:指定日誌記錄器

NewPool()部分源碼:

if p.options.PreAlloc {
  if size == -1 {
    return nil, ErrInvalidPreAllocSize
  }
  p.workers = newWorkerArray(loopQueueType, size)
} else {
  p.workers = newWorkerArray(stackType, 0)
}

使用預分配時,建立loopQueueType類型的結構,反之建立stackType類型。這是ants定義的兩種管理worker的數據結構。

ants定義了一些With*函數來設置這些選項:

func WithOptions(options Options) Option {
  return func(opts *Options) {
    *opts = options
  }
}

func WithExpiryDuration(expiryDuration time.Duration) Option {
  return func(opts *Options) {
    opts.ExpiryDuration = expiryDuration
  }
}

func WithPreAlloc(preAlloc bool) Option {
  return func(opts *Options) {
    opts.PreAlloc = preAlloc
  }
}

func WithMaxBlockingTasks(maxBlockingTasks int) Option {
  return func(opts *Options) {
    opts.MaxBlockingTasks = maxBlockingTasks
  }
}

func WithNonblocking(nonblocking bool) Option {
  return func(opts *Options) {
    opts.Nonblocking = nonblocking
  }
}

func WithPanicHandler(panicHandler func(interface{})) Option {
  return func(opts *Options) {
    opts.PanicHandler = panicHandler
  }
}

func WithLogger(logger Logger) Option {
  return func(opts *Options) {
    opts.Logger = logger
  }
}

這裏使用了 Go 語言中很是常見的一種模式,我稱之爲選項模式,很是方便地構造有大量參數,且大部分有默認值或通常不須要顯式設置的對象。

咱們來驗證幾個選項。

最大等待隊列長度

ants池設置容量以後,若是全部的 goroutine 都在處理任務。這時提交的任務默認會進入等待隊列,WithMaxBlockingTasks(maxBlockingTasks int)能夠設置等待隊列的最大長度。超過這個長度,提交任務直接返回錯誤:

func wrapper(i int, wg *sync.WaitGroup) func() {
  return func() {
    fmt.Printf("hello from task:%d\n", i)
    time.Sleep(1 * time.Second)
    wg.Done()
  }
}

func main() {
  p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(8)
  for i := 1; i <= 8; i++ {
    go func(i int) {
      err := p.Submit(wrapper(i, &wg))
      if err != nil {
        fmt.Printf("task:%d err:%v\n", i, err)
        wg.Done()
      }
    }(i)
  }

  wg.Wait()
}

上面代碼中,咱們設置 goroutine 池的容量爲 4,最大阻塞隊列長度爲 2。而後一個 for 提交 8 個任務,指望結果是:4 個任務在執行,2 個任務在等待,2 個任務提交失敗。運行結果:

hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2

咱們看到提交任務失敗,打印too many goroutines blocked ...

代碼中有 4 點須要注意:

  • 提交任務必須並行進行。若是是串行提交,第 5 個任務提交時因爲池中沒有空閒的 goroutine 處理該任務,Submit()方法會被阻塞,後續任務就都不能提交了。也就達不到驗證的目的了
  • 因爲任務可能提交失敗,失敗的任務不會實際執行,因此實際上wg.Done()次數會小於 8。於是在err != nil分支中咱們須要調用一次wg.Done()。不然wg.Wait()會永遠阻塞
  • 爲了不任務執行過快,空出了 goroutine,觀察不到現象,每一個任務中我使用time.Sleep(1 * time.Second)休眠 1s
  • 因爲 goroutine 之間的執行順序未顯式同步,故每次執行的順序不肯定

因爲簡單起見,前面的例子中Submit()方法的返回值都被咱們忽略了。實際開發中必定不要忽略。

非阻塞

ants池默認是阻塞的,咱們可使用WithNonblocking(nonblocking bool)設置其爲非阻塞。非阻塞的ants池中,在全部 goroutine 都在處理任務時,提交新任務會直接返回錯誤:

func main() {
  p, _ := ants.NewPool(2, ants.WithNonblocking(true))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 3; i++ {
    err := p.Submit(wrapper(i, &wg))
    if err != nil {
      fmt.Printf("task:%d err:%v\n", i, err)
      wg.Done()
    }
  }

  wg.Wait()
}

使用上個例子中的wrapper()函數,ants池容量設置爲 2。連續提交 3 個任務,指望結果前兩個任務正常執行,第 3 個任務提交時返回錯誤:

hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1

panic 處理器

一個魯棒性強的庫必定不會忽視錯誤的處理,特別是宕機相關的錯誤。在 Go 語言中就是 panic,也被稱爲運行時恐慌,在程序運行的過程當中產生的嚴重性錯誤,例如索引越界,空指針解引用等,都會觸發 panic。若是不處理 panic,程序會直接意外退出,可能形成數據丟失的嚴重後果。

ants中若是 goroutine 在執行任務時發生panic,會終止當前任務的執行,將發生錯誤的堆棧輸出到os.Stderr注意,該 goroutine 仍是會被放回池中,下次能夠取出執行新的任務

func wrapper(i int, wg *sync.WaitGroup) func() {
  return func() {
    fmt.Printf("hello from task:%d\n", i)
    if i%2 == 0 {
      panic(fmt.Sprintf("panic from task:%d", i))
    }
    wg.Done()
  }
}

func main() {
  p, _ := ants.NewPool(2)
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 2; i++ {
    p.Submit(wrapper(i, &wg))
  }

  time.Sleep(1 * time.Second)
  p.Submit(wrapper(3, &wg))
  p.Submit(wrapper(5, &wg))
  wg.Wait()
}

咱們讓偶數個任務觸發panic。提交兩個任務,第二個任務必定會觸發panic。觸發panic以後,咱們還能夠繼續提交任務 三、5。注意這裏沒有 4,提交任務 4 仍是會觸發panic

上面的程序須要注意 2 點:

  • 任務函數中wg.Done()是在panic方法以後,若是觸發了panic,函數中的其餘正常邏輯就不會再繼續執行了。因此咱們雖然wg.Add(3),可是一共提交了 4 個任務,其中一個任務觸發了panicwg.Done()沒有正確執行。實際開發中,咱們通常使用defer語句來確保wg.Done()必定會執行
  • 在 for 循環以後,我添加了一行代碼time.Sleep(1 * time.Second)。若是沒有這一行,後續的兩條Submit()方法能夠直接執行,可能會致使任務很快就完成了,wg.Wait()直接返回了,這時panic的堆棧尚未輸出。你能夠嘗試註釋掉這行代碼運行看看結果

除了ants提供的默認 panic 處理器,咱們還可使用WithPanicHandler(paincHandler func(interface{}))指定咱們本身編寫的 panic 處理器。處理器的參數就是傳給panic的值:

func panicHandler(err interface{}) {
  fmt.Fprintln(os.Stderr, err)
}

p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()

其他代碼與上面的徹底相同,指定了panicHandler後觸發panic就會執行它。運行:

hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3

看到輸出了傳給panic函數的字符串(第二行輸出)。

默認池

爲了方便使用,不少 Go 庫都喜歡提供其核心功能類型的一個默認實現。能夠直接經過庫提供的接口調用。例如net/http,例如antsants庫中定義了一個默認的池,默認容量爲MaxInt32。goroutine 池的各個方法均可以直接經過ants包直接訪問:

// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)

func Submit(task func()) error {
  return defaultAntsPool.Submit(task)
}

func Running() int {
  return defaultAntsPool.Running()
}

func Cap() int {
  return defaultAntsPool.Cap()
}

func Free() int {
  return defaultAntsPool.Free()
}

func Release() {
  defaultAntsPool.Release()
}

func Reboot() {
  defaultAntsPool.Reboot()
}

直接使用:

func main() {
  defer ants.Release()

  var wg sync.WaitGroup
  wg.Add(2)
  for i := 1; i <= 2; i++ {
    ants.Submit(wrapper(i, &wg))
  }
  wg.Wait()
}

默認池也須要Release()

總結

本文介紹了 goroutine 池的由來,並藉由ants庫介紹了基本的使用方法,和一些細節。ants源碼很少,去掉測試的核心代碼只有 1k 行左右,建議有時間、感興趣的童鞋深刻閱讀。

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. ants GitHub:github.com/panjf2000/ants
  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

個人博客:https://darjun.github.io

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

相關文章
相關標籤/搜索