go 的 goroutine 提供了一種較線程而言更廉價的方式處理併發場景, go 使用二級線程的模式, 將 goroutine 以 M:N 的形式複用到系統線程上, 節省了 cpu 調度的開銷, 也避免了用戶級線程(協程)進行系統調用時阻塞整個系統線程的問題。【1】html
但 goroutine 太多仍會致使調度性能降低、GC 頻繁、內存暴漲, 引起一系列問題。在面臨這樣的場景時, 限制 goroutine 的數量、重用 goroutine 顯然頗有價值。git
本文正是針對上述狀況而提供一種簡單的解決方案, 編寫一個協程池(任務池)來實現對 goroutine 的管控。github
要解決這個問題, 要思考兩個問題安全
說到限制和重用, 那麼最早想到的就是池化。好比 TCP 鏈接池, 線程池, 都是有效限制、重用資源的最好實踐。因此, 咱們能夠建立一個 goroutine 池, 用來管理 goroutine。併發
在使用原生 goroutine 的場景中, 運行一個任務直接啓動一個 goroutine 來運行, 在池化的場景而言, 任務也是要在 goroutine 中執行, 可是任務須要任務池來放入 goroutine。函數
在鏈接池中, 鏈接在使用時從池中取出, 用完後放入池中。對於 goroutine 而言, goroutine 經過語言關鍵字啓動, 沒法像鏈接同樣操做。那麼如何讓 goroutine 能夠執行任務, 且執行後能夠從新用來執行其它任務呢?這裏就須要使用生產者消費者模型了:性能
生產者 --(生產任務)--> 隊列 --(消費任務)--> 消費者測試
用來執行任務的 goroutine 能夠做爲消費者, 操做任務池的 goroutine 做爲生產者, 而隊列則可使用 go 的 buffer channel, 任務池的建模到此結束。ui
Talk is cheap. Show me the code.atom
任務要包含須要執行的函數、以及函數要傳的參數, 由於參數類型、個數不肯定, 這裏使用可變參數和空接口的形式
type Task struct { Handler func(v ...interface{}) Params []interface{} }
任務池的定義包括了池的容量 capacity、當前運行的 worker(goroutine)數量 runningWorkers、任務隊列(channel)taskC、關閉任務池的 channel closeC 以及任務池的狀態 state(運行中或已關閉, 用於安全關閉任務池)
type Pool struct { capacity uint64 runningWorkers uint64 state int64 taskC chan *Task closeC chan bool }
任務池的構造函數:
var ErrInvalidPoolCap = errors.New("invalid pool cap") const ( RUNNING = 1 STOPED = 0 ) func NewPool(capacity uint64) (*Pool, error) { if capacity <= 0 { return nil, ErrInvalidPoolCap } return &Pool{ capacity: capacity, state: RUNNING, // 初始化任務隊列, 隊列長度爲容量 taskC: make(chan *Task, capacity), closeC: make(chan bool), }, nil }
新建 run() 方法做爲啓動 worker 的方法:
func (p *Pool) run() { p.runningWorkers++ // 運行中的任務加一 go func() { defer func() { p.runningWorkers-- // worker 結束, 運行中的任務減一 }() for { select { // 阻塞等待任務、結束信號到來 case task, ok := <-p.taskC: // 從 channel 中消費任務 if !ok { // 若是 channel 被關閉, 結束 worker 運行 return } // 執行任務 task.Handler(task.Params...) case <-p.closeC: // 若是收到關閉信號, 結束 worker 運行 return } } }() }
上述代碼中, runningWorkers 的加減直接使用了自增運算, 可是考慮到啓動多個 worker 時, runningWorkers 就會有數據競爭, 因此咱們使用 sync.atomic 包來保證 runningWorkers 的自增操做是原子的。
對 runningWorkers 的操做進行封裝:
func (p *Pool) incRunning() { // runningWorkers + 1 atomic.AddUint64(&p.runningWorkers, 1) } func (p *Pool) decRunning() { // runningWorkers - 1 atomic.AddUint64(&p.runningWorkers, ^uint64(0)) } func (p *Pool) GetRunningWorkers() uint64 { return atomic.LoadUint64(&p.runningWorkers) }
打鐵乘熱, 對於 capacity 的操做也考慮數據競爭, 封裝 GetCap() 方法:
func (p *Pool) GetCap() uint64 { return atomic.LoadUint64(&p.capacity) }
run() 方法改造:
func (p *Pool) run() { p.incRunning() go func() { defer func() { p.decRunning() }() for { select { case task, ok := <-p.taskC: if !ok { return } task.Handler(task.Params...) case <-p.closeC: return } } }() }
新建 Put() 方法用來將任務放入池中:
func (p *Pool) Put(task *Task) { if p.GetRunningWorkers() < p.GetCap() { // 若是任務池滿, 則再也不建立 worker // 建立啓動一個 worker p.run() } // 將任務推入隊列, 等待消費 p.taskC <- task }
當有關閉任務池來節省 goroutine 資源的場景時, 咱們須要有一個關閉任務池的方法。
直接銷燬 worker 關閉 channel 並不合適, 由於此時可能還有任務在隊列中沒有被消費掉。要確保全部任務被安全消費後再銷燬掉 worker。
首先, 在關閉任務池時, 須要先關閉掉生產任務的入口。改造 Put() 方法:
var ErrPoolAlreadyClosed = errors.New("pool already closed") func (p *Pool) Put(task *Task) error { if p.state == STOPED { // 若是任務池處於關閉狀態, 再 put 任務會返回 ErrPoolAlreadyClosed 錯誤 return ErrPoolAlreadyClosed } if p.GetRunningWorkers() < p.GetCap() { p.run() } p.taskC <- task return nil }
在 run() 方法中已經對 closeC 進行了監聽, 銷燬 worker 只需等待任務被消費完後向 closeC 發出信號。Close() 方法以下:
func (p *Pool) Close() { p.state = STOPED // 設置 state 爲已中止 for len(p.taskC) > 0 { // 阻塞等待全部任務被 worker 消費 } p.closeC <- true // 發送銷燬 worker 信號 close(p.taskC) // 關閉任務隊列 }
每一個 worker 都是一個 goroutine, 若是 goroutine 中產生了 panic, 會致使整個程序崩潰。爲了保證程序的安全進行, 任務池須要對每一個 worker 中的 panic 進行 recover 操做, 並提供可訂製的 panic handler。
更新任務池定義:
type Pool struct { capacity uint64 runningWorkers uint64 state int64 taskC chan *Task closeC chan bool PanicHandler func(interface{}) }
更新 run() 方法:
func (p *Pool) run() { p.incRunning() go func() { defer func() { p.decRunning() if r := recover(); r != nil { // 恢復 panic if p.PanicHandler != nil { // 若是設置了 PanicHandler, 調用 p.PanicHandler(r) } else { // 默認處理 log.Printf("Worker panic: %s\n", r) } } }() for { select { case task, ok := <-p.taskC: if !ok { return } task.Handler(task.Params...) case <-p.closeC: return } } }() }
OK, 咱們的任務池就這麼簡單的寫好了, 試試:
func main() { // 建立任務池 pool, err := NewPool(10) if err != nil { panic(err) } for i := 0; i < 20; i++ { // 任務放入池中 pool.Put(&Task{ Handler: func(v ...interface{}) { fmt.Println(v) }, Params: []interface{}{i}, }) } time.Sleep(1e9) // 等待執行 }
詳細例子見 mortar/examples
做爲協程池, 性能和內存佔用的指標測試確定是少不了的, 測試數據纔是最有說服力的
100w 次執行,原子增量操做
測試任務:
var sum int64 func demoTask(v ...interface{}) { for i := 0; i < 100; i++ { atomic.AddInt64(&sum, 1) } }
測試方法:
var runTimes = 1000000 // 原生 goroutine func BenchmarkGoroutineSetTimes(b *testing.B) { for i := 0; i < runTimes; i++ { go demoTask() } } // 使用協程池 func BenchmarkPutSetTimes(b *testing.B) { pool, err := NewPool(20) if err != nil { b.Error(err) } ctx := context.Background() task := &Task{ Handler: demoTask, } for i := 0; i < runTimes; i++ { pool.Put(ctx, task) } }
模式 | 操做時間消耗 ns/op | 內存分配大小 B/op | 內存分配次數 allocs/op |
---|---|---|---|
原生 goroutine (100w goroutine) | 1596177880 | 103815552 | 240022 |
任務池開啓 20 個 worker 20 goroutine) | 1378909099 | 15312 | 89 |
使用任務池和原生 goroutine 性能相近(略好於原生)
使用任務池比直接 goroutine 內存分配節省 7000 倍左右, 內存分配次數減小 2700 倍左右
該項目的所有源碼詳見 mortar
【1】線程的 3 種實現方式