處理大量併發是 Go 語言的一大優點。語言內置了方便的併發語法,能夠很是方便的建立不少個輕量級的 goroutine 併發處理任務。相比於建立多個線程,goroutine 更輕量、資源佔用更少、切換速度更快、無線程上下文切換開銷更少。可是受限於資源總量,系統中可以建立的 goroutine 數量也是受限的。默認每一個 goroutine 佔用 8KB 內存,一臺 8GB 內存的機器滿打滿算也只能建立 8GB/8KB = 1000000 個 goroutine,更況且系統還須要保留一部份內存運行平常管理任務,go 運行時須要內存運行 gc、處理 goroutine 切換等。使用的內存超過機器內存容量,系統會使用交換區(swap),致使性能急速降低。咱們能夠簡單驗證一下建立過多 goroutine 會發生什麼:git
func main() { var wg sync.WaitGroup wg.Add(10000000) for i := 0; i < 10000000; i++ { go func() { time.Sleep(1 * time.Minute) }() } wg.Wait() }
在個人機器上(8G內存)運行上面的程序會報errno 1455
,即Out of Memory
錯誤,這很好理解。謹慎運行。github
另外一方面,goroutine 的管理也是一個問題。goroutine 只能本身運行結束,外部沒有任何手段能夠強制j結束一個 goroutine。若是一個 goroutine 由於某種緣由沒有自行結束,就會出現 goroutine 泄露。此外,頻繁建立 goroutine 也是一個開銷。golang
鑑於上述緣由,天然出現了與線程池同樣的需求,即 goroutine 池。通常的 goroutine 池自動管理 goroutine 的生命週期,能夠按需建立,動態縮容。向 goroutine 池提交一個任務,goroutine 池會自動安排某個 goroutine 來處理。微信
ants
就是其中一個實現 goroutine 池的庫。數據結構
本文代碼使用 Go Modules。閉包
建立目錄並初始化:併發
$ mkdir ants && cd ants $ go mod init github.com/darjun/go-daily-lib/ants
安裝ants
庫,使用v2
版本:app
$ go get -u github.com/panjf2000/ants/v2
咱們接下來要實現一個計算大量整數和的程序。首先建立基礎的任務結構,並實現其執行任務方法:函數
type Task struct { index int nums []int sum int wg *sync.WaitGroup } func (t *Task) Do() { for _, num := range t.nums { t.sum += num } t.wg.Done() }
很簡單,就是將一個切片中的全部整數相加。oop
而後咱們建立 goroutine 池,注意池使用完後須要手動關閉,這裏使用defer
關閉:
p, _ := ants.NewPoolWithFunc(10, taskFunc) defer p.Release() func taskFunc(data interface{}) { task := data.(*Task) task.Do() fmt.Printf("task:%d sum:%d\n", task.index, task.sum) }
上面調用了ants.NewPoolWithFunc()
建立了一個 goroutine 池。第一個參數是池容量,即池中最多有 10 個 goroutine。第二個參數爲每次執行任務的函數。當咱們調用p.Invoke(data)
的時候,ants
池會在其管理的 goroutine 中找出一個空閒的,讓它執行函數taskFunc
,並將data
做爲參數。
接着,咱們模擬數據,作數據切分,生成任務,交給 ants
處理:
const ( DataSize = 10000 DataPerTask = 100 ) nums := make([]int, DataSize, DataSize) for i := range nums { nums[i] = rand.Intn(1000) } var wg sync.WaitGroup wg.Add(DataSize / DataPerTask) tasks := make([]*Task, 0, DataSize/DataPerTask) for i := 0; i < DataSize/DataPerTask; i++ { task := &Task{ index: i + 1, nums: nums[i*DataPerTask : (i+1)*DataPerTask], wg: &wg, } tasks = append(tasks, task) p.Invoke(task) } wg.Wait() fmt.Printf("running goroutines: %d\n", ants.Running())
隨機生成 10000 個整數,將這些整數分爲 100 份,每份 100 個,生成Task
結構,調用p.Invoke(task)
處理。wg.Wait()
等待處理完成,而後輸出ants
正在運行的 goroutine 數量,這時應該是 0。
最後咱們將結果彙總,並驗證一下結果,與直接相加獲得的結果作一個比較:
var sum int for _, task := range tasks { sum += task.sum } var expect int for _, num := range nums { expect += num } fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
運行:
$ go run main.go ... task:96 sum:53275 task:88 sum:50090 task:62 sum:57114 task:45 sum:48041 task:82 sum:45269 running goroutines: 0 finish all tasks, result is 5010172 expect:5010172
確實,任務完成以後,正在運行的 goroutine 數量變爲 0。並且咱們驗證了,結果沒有誤差。另外須要注意,goroutine 池中任務的執行順序是隨機的,與提交任務的前後沒有關係。由上面運行打印的任務標識咱們也能發現這一點。
ants
支持將一個不接受任何參數的函數做爲任務提交給 goroutine 運行。因爲不接受參數,咱們提交的函數要麼不須要外部數據,只須要處理自身邏輯,不然就必須用某種方式將須要的數據傳遞進去,例如閉包。
提交函數做爲任務的 goroutine 池使用ants.NewPool()
建立,它只接受一個參數表示池子的容量。調用池子對象的Submit()
方法來提交任務,將一個不接受任何參數的函數傳入。
最開始的例子能夠改寫一下。增長一個任務包裝函數,將任務須要的參數做爲包裝函數的參數。包裝函數返回實際的任務函數,該任務函數就能夠經過閉包訪問它須要的數據了:
type taskFunc func() func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc { return func() { for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] { *sum += num } fmt.Printf("task:%d sum:%d\n", i+1, *sum) wg.Done() } }
調用ants.NewPool(10)
建立 goroutine 池,一樣池子用完須要釋放,這裏使用defer
:
p, _ := ants.NewPool(10) defer p.Release()
生成模擬數據,切分任務。提交任務給ants
池執行,這裏使用taskFuncWrapper()
包裝函數生成具體的任務,而後調用p.Submit()
提交:
nums := make([]int, DataSize, DataSize) for i := range nums { nums[i] = rand.Intn(1000) } var wg sync.WaitGroup wg.Add(DataSize / DataPerTask) partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask) for i := 0; i < DataSize/DataPerTask; i++ { p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg)) } wg.Wait()
彙總結果,驗證:
var sum int for _, partSum := range partSums { sum += partSum } var expect int for _, num := range nums { expect += num } fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)
這個程序的功能與最開始的徹底相同。
GitHub 倉庫中有個執行流程圖,我從新繪製了一下:
執行流程以下:
提交任務給 goroutine 池,檢查是否有空閒的 goroutine:
無,檢查池中的 goroutine 數量是否已到池容量上限:
已到上限,檢查 goroutine 池是不是非阻塞的:
nil
表示執行失敗ants
提供了一些選項能夠定製 goroutine 池的行爲。選項使用Options
結構定義:
// src/github.com/panjf2000/ants/options.go type Options struct { ExpiryDuration time.Duration PreAlloc bool MaxBlockingTasks int Nonblocking bool PanicHandler func(interface{}) Logger Logger }
各個選項含義以下:
ExpiryDuration
:過時時間。表示 goroutine 空閒多長時間以後會被ants
池回收PreAlloc
:預分配。調用NewPool()/NewPoolWithFunc()
以後預分配worker
(管理一個工做 goroutine 的結構體)切片。並且使用預分配與否會直接影響池中管理worker
的結構。見下面源碼MaxBlockingTasks
:最大阻塞任務數量。即池中 goroutine 數量已到池容量,且全部 goroutine 都處理繁忙狀態,這時到來的任務會在阻塞列表等待。這個選項設置的是列表的最大長度。阻塞的任務數量達到這個值後,後續任務提交直接返回失敗Nonblocking
:池是否阻塞,默認阻塞。提交任務時,若是ants
池中 goroutine 已到上限且所有繁忙,阻塞的池會將任務添加的阻塞列表等待(固然受限於阻塞列表長度,見上一個選項)。非阻塞的池直接返回失敗PanicHandler
:panic 處理。遇到 panic 會調用這裏設置的處理函數Logger
:指定日誌記錄器NewPool()
部分源碼:
if p.options.PreAlloc { if size == -1 { return nil, ErrInvalidPreAllocSize } p.workers = newWorkerArray(loopQueueType, size) } else { p.workers = newWorkerArray(stackType, 0) }
使用預分配時,建立loopQueueType
類型的結構,反之建立stackType
類型。這是ants
定義的兩種管理worker
的數據結構。
ants
定義了一些With*
函數來設置這些選項:
func WithOptions(options Options) Option { return func(opts *Options) { *opts = options } } func WithExpiryDuration(expiryDuration time.Duration) Option { return func(opts *Options) { opts.ExpiryDuration = expiryDuration } } func WithPreAlloc(preAlloc bool) Option { return func(opts *Options) { opts.PreAlloc = preAlloc } } func WithMaxBlockingTasks(maxBlockingTasks int) Option { return func(opts *Options) { opts.MaxBlockingTasks = maxBlockingTasks } } func WithNonblocking(nonblocking bool) Option { return func(opts *Options) { opts.Nonblocking = nonblocking } } func WithPanicHandler(panicHandler func(interface{})) Option { return func(opts *Options) { opts.PanicHandler = panicHandler } } func WithLogger(logger Logger) Option { return func(opts *Options) { opts.Logger = logger } }
這裏使用了 Go 語言中很是常見的一種模式,我稱之爲選項模式,很是方便地構造有大量參數,且大部分有默認值或通常不須要顯式設置的對象。
咱們來驗證幾個選項。
ants
池設置容量以後,若是全部的 goroutine 都在處理任務。這時提交的任務默認會進入等待隊列,WithMaxBlockingTasks(maxBlockingTasks int)
能夠設置等待隊列的最大長度。超過這個長度,提交任務直接返回錯誤:
func wrapper(i int, wg *sync.WaitGroup) func() { return func() { fmt.Printf("hello from task:%d\n", i) time.Sleep(1 * time.Second) wg.Done() } } func main() { p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2)) defer p.Release() var wg sync.WaitGroup wg.Add(8) for i := 1; i <= 8; i++ { go func(i int) { err := p.Submit(wrapper(i, &wg)) if err != nil { fmt.Printf("task:%d err:%v\n", i, err) wg.Done() } }(i) } wg.Wait() }
上面代碼中,咱們設置 goroutine 池的容量爲 4,最大阻塞隊列長度爲 2。而後一個 for 提交 8 個任務,指望結果是:4 個任務在執行,2 個任務在等待,2 個任務提交失敗。運行結果:
hello from task:8 hello from task:5 hello from task:4 hello from task:6 task:7 err:too many goroutines blocked on submit or Nonblocking is set task:3 err:too many goroutines blocked on submit or Nonblocking is set hello from task:1 hello from task:2
咱們看到提交任務失敗,打印too many goroutines blocked ...
。
代碼中有 4 點須要注意:
Submit()
方法會被阻塞,後續任務就都不能提交了。也就達不到驗證的目的了wg.Done()
次數會小於 8。於是在err != nil
分支中咱們須要調用一次wg.Done()
。不然wg.Wait()
會永遠阻塞time.Sleep(1 * time.Second)
休眠 1s因爲簡單起見,前面的例子中Submit()
方法的返回值都被咱們忽略了。實際開發中必定不要忽略。
ants
池默認是阻塞的,咱們可使用WithNonblocking(nonblocking bool)
設置其爲非阻塞。非阻塞的ants
池中,在全部 goroutine 都在處理任務時,提交新任務會直接返回錯誤:
func main() { p, _ := ants.NewPool(2, ants.WithNonblocking(true)) defer p.Release() var wg sync.WaitGroup wg.Add(3) for i := 1; i <= 3; i++ { err := p.Submit(wrapper(i, &wg)) if err != nil { fmt.Printf("task:%d err:%v\n", i, err) wg.Done() } } wg.Wait() }
使用上個例子中的wrapper()
函數,ants
池容量設置爲 2。連續提交 3 個任務,指望結果前兩個任務正常執行,第 3 個任務提交時返回錯誤:
hello from task:2 task:3 err:too many goroutines blocked on submit or Nonblocking is set hello from task:1
一個魯棒性強的庫必定不會忽視錯誤的處理,特別是宕機相關的錯誤。在 Go 語言中就是 panic,也被稱爲運行時恐慌,在程序運行的過程當中產生的嚴重性錯誤,例如索引越界,空指針解引用等,都會觸發 panic。若是不處理 panic,程序會直接意外退出,可能形成數據丟失的嚴重後果。
ants
中若是 goroutine 在執行任務時發生panic
,會終止當前任務的執行,將發生錯誤的堆棧輸出到os.Stderr
。注意,該 goroutine 仍是會被放回池中,下次能夠取出執行新的任務。
func wrapper(i int, wg *sync.WaitGroup) func() { return func() { fmt.Printf("hello from task:%d\n", i) if i%2 == 0 { panic(fmt.Sprintf("panic from task:%d", i)) } wg.Done() } } func main() { p, _ := ants.NewPool(2) defer p.Release() var wg sync.WaitGroup wg.Add(3) for i := 1; i <= 2; i++ { p.Submit(wrapper(i, &wg)) } time.Sleep(1 * time.Second) p.Submit(wrapper(3, &wg)) p.Submit(wrapper(5, &wg)) wg.Wait() }
咱們讓偶數個任務觸發panic
。提交兩個任務,第二個任務必定會觸發panic
。觸發panic
以後,咱們還能夠繼續提交任務 三、5。注意這裏沒有 4,提交任務 4 仍是會觸發panic
。
上面的程序須要注意 2 點:
wg.Done()
是在panic
方法以後,若是觸發了panic
,函數中的其餘正常邏輯就不會再繼續執行了。因此咱們雖然wg.Add(3)
,可是一共提交了 4 個任務,其中一個任務觸發了panic
,wg.Done()
沒有正確執行。實際開發中,咱們通常使用defer
語句來確保wg.Done()
必定會執行time.Sleep(1 * time.Second)
。若是沒有這一行,後續的兩條Submit()
方法能夠直接執行,可能會致使任務很快就完成了,wg.Wait()
直接返回了,這時panic
的堆棧尚未輸出。你能夠嘗試註釋掉這行代碼運行看看結果除了ants
提供的默認 panic 處理器,咱們還可使用WithPanicHandler(paincHandler func(interface{}))
指定咱們本身編寫的 panic 處理器。處理器的參數就是傳給panic
的值:
func panicHandler(err interface{}) { fmt.Fprintln(os.Stderr, err) } p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler)) defer p.Release()
其他代碼與上面的徹底相同,指定了panicHandler
後觸發panic
就會執行它。運行:
hello from task:2 panic from task:2 hello from task:1 hello from task:5 hello from task:3
看到輸出了傳給panic
函數的字符串(第二行輸出)。
爲了方便使用,不少 Go 庫都喜歡提供其核心功能類型的一個默認實現。能夠直接經過庫提供的接口調用。例如net/http
,例如ants
。ants
庫中定義了一個默認的池,默認容量爲MaxInt32
。goroutine 池的各個方法均可以直接經過ants
包直接訪問:
// src/github.com/panjf2000/ants/ants.go defaultAntsPool, _ = NewPool(DefaultAntsPoolSize) func Submit(task func()) error { return defaultAntsPool.Submit(task) } func Running() int { return defaultAntsPool.Running() } func Cap() int { return defaultAntsPool.Cap() } func Free() int { return defaultAntsPool.Free() } func Release() { defaultAntsPool.Release() } func Reboot() { defaultAntsPool.Reboot() }
直接使用:
func main() { defer ants.Release() var wg sync.WaitGroup wg.Add(2) for i := 1; i <= 2; i++ { ants.Submit(wrapper(i, &wg)) } wg.Wait() }
默認池也須要Release()
。
本文介紹了 goroutine 池的由來,並藉由ants
庫介紹了基本的使用方法,和一些細節。ants
源碼很少,去掉測試的核心代碼只有 1k 行左右,建議有時間、感興趣的童鞋深刻閱讀。
你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~