go自從出生就身帶「高併發」的標籤,其併發編程就是由groutine實現的,因其消耗資源低,性能高效,開發成本低的特性而被普遍應用到各類場景,例如服務端開發中使用的HTTP服務,在golang net/http包中,每個被監聽到的tcp連接都是由一個groutine去完成處理其上下文的,由此使得其擁有極其優秀的併發量吞吐量git
for { // 監聽tcp rw, e := l.Accept() if e != nil { ....... } tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew) // before Serve can return // 啓動協程處理上下文 go c.serve(ctx) }
雖然建立一個groutine佔用的內存極小(大約2KB左右,線程一般2M左右),可是在實際生產環境無限制的開啓協程顯然是不科學的,好比上圖的邏輯,若是來幾千萬個請求就會開啓幾千萬個groutine,當沒有更多內存可用時,go的調度器就會阻塞groutine最終致使內存溢出乃至嚴重的崩潰,因此本文將經過實現一個簡單的協程池,以及剖析幾個開源的協程池源碼來探討一下對groutine的併發控制以及多路複用的設計和實現。github
過年前作過一波小需求,是將主播管理系統中信息不完整的主播找出來而後再到其相對應的直播平臺爬取完整信息並補全,當時考慮到每個主播的數據都要訪問一次直播平臺因此就用應對每個主播開啓一個groutine去抓取數據,雖然這個業務量還遠遠遠遠達不到能形成groutine性能瓶頸的地步,可是內心老是不舒服,因而放假回來後將其優化成從協程池中控制groutine數量再開啓爬蟲進行數據抓取。思路其實很是簡單,用一個channel當作任務隊列,初始化groutine池時肯定好併發量,而後以設置好的併發量開啓groutine同時讀取channel中的任務並執行, 模型以下圖golang
type SimplePool struct { wg sync.WaitGroup work chan func() //任務隊列 } func NewSimplePoll(workers int) *SimplePool { p := &SimplePool{ wg: sync.WaitGroup{}, work: make(chan func()), } p.wg.Add(workers) //根據指定的併發量去讀取管道並執行 for i := 0; i < workers; i++ { go func() { defer func() { // 捕獲異常 防止waitGroup阻塞 if err := recover(); err != nil { fmt.Println(err) p.wg.Done() } }() // 從workChannel中取出任務執行 for fn := range p.work { fn() } p.wg.Done() }() } return p } // 添加任務 func (p *SimplePool) Add(fn func()) { p.work <- fn } // 執行 func (p *SimplePool) Run() { close(p.work) p.wg.Wait() }
測試設定爲在併發數量爲20的協程池中併發抓取一百我的的信息, 由於代碼包含較多業務邏輯因此sleep 1秒模擬爬蟲過程,理論上執行時間爲5秒數據庫
func TestSimplePool(t *testing.T) { p := NewSimplePoll(20) for i := 0; i < 100; i++ { p.Add(parseTask(i)) } p.Run() } func parseTask(i int) func() { return func() { // 模擬抓取數據的過程 time.Sleep(time.Second * 1) fmt.Println("finish parse ", i) } }
這樣一來最簡單的一個groutine池就完成了編程
上面的groutine池雖然簡單,可是對於每個併發任務的狀態,pool的狀態缺乏控制,因此又去看了一下go-playground/pool的源碼實現,先從每個須要執行的任務入手,該庫中對併發單元作了以下的結構體,能夠看到除工做單元的值,錯誤,執行函數等,還用了三個分別表示,取消,取消中,寫 的三個併發安全的原子操做值來標識其運行狀態。安全
// 須要加入pool 中執行的任務 type WorkFunc func(wu WorkUnit) (interface{}, error) // 工做單元 type workUnit struct { value interface{} // 任務結果 err error // 任務的報錯 done chan struct{} // 通知任務完成 fn WorkFunc cancelled atomic.Value // 任務是否被取消 cancelling atomic.Value // 是否正在取消任務 writing atomic.Value // 任務是否正在執行 }
接下來看Pool的結構併發
type limitedPool struct { workers uint // 併發量 work chan *workUnit // 任務channel cancel chan struct{} // 用於通知結束的channel closed bool // 是否關閉 m sync.RWMutex // 讀寫鎖,主要用來保證 closed值的併發安全 }
初始化groutine池, 以及啓動設定好數量的groutineapp
// 初始化pool,設定併發量 func NewLimited(workers uint) Pool { if workers == 0 { panic("invalid workers '0'") } p := &limitedPool{ workers: workers, } p.initialize() return p } func (p *limitedPool) initialize() { p.work = make(chan *workUnit, p.workers*2) p.cancel = make(chan struct{}) p.closed = false for i := 0; i < int(p.workers); i++ { // 初始化併發單元 p.newWorker(p.work, p.cancel) } } // passing work and cancel channels to newWorker() to avoid any potential race condition // betweeen p.work read & write func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) { go func(p *limitedPool) { var wu *workUnit defer func(p *limitedPool) { // 捕獲異常,結束掉異常的工做單元,並將其再次做爲新的任務啓動 if err := recover(); err != nil { trace := make([]byte, 1<<16) n := runtime.Stack(trace, true) s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))])) iwu := wu iwu.err = &ErrRecovery{s: s} close(iwu.done) // need to fire up new worker to replace this one as this one is exiting p.newWorker(p.work, p.cancel) } }(p) var value interface{} var err error for { select { // workChannel中讀取任務 case wu = <-work: // 防止channel 被關閉後讀取到零值 if wu == nil { continue } // 先判斷任務是否被取消 if wu.cancelled.Load() == nil { // 執行任務 value, err = wu.fn(wu) wu.writing.Store(struct{}{}) // 任務執行完在寫入結果時須要再次檢查工做單元是否被取消,防止產生競爭條件 if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil { wu.value, wu.err = value, err close(wu.done) } } // pool是否被中止 case <-cancel: return } } }(p) }
往POOL中添加任務,並檢查pool是否關閉tcp
func (p *limitedPool) Queue(fn WorkFunc) WorkUnit { w := &workUnit{ done: make(chan struct{}), fn: fn, } go func() { p.m.RLock() if p.closed { w.err = &ErrPoolClosed{s: errClosed} if w.cancelled.Load() == nil { close(w.done) } p.m.RUnlock() return } // 將工做單元寫入workChannel, pool啓動後將由上面newWorker函數中讀取執行 p.work <- w p.m.RUnlock() }() return w }
在go-playground/pool包中, limitedPool的批量併發執行還須要藉助batch.go來完成函數
// batch contains all information for a batch run of WorkUnits type batch struct { pool Pool // 上面的limitedPool實現了Pool interface m sync.Mutex // 互斥鎖,用來判斷closed units []WorkUnit // 工做單元的slice, 這個主要用在不設併發限制的場景,這裏忽略 results chan WorkUnit // 結果集,執行完後的workUnit會更新其value,error,能夠從結果集channel中讀取 done chan struct{} // 通知batch是否完成 closed bool wg *sync.WaitGroup }
// go-playground/pool 中有設置併發量和不設併發量的批量任務,都實現Pool interface,初始化batch批量任務時會將以前建立好的Pool傳入newBatch func newBatch(p Pool) Batch { return &batch{ pool: p, units: make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times. results: make(chan WorkUnit), done: make(chan struct{}), wg: new(sync.WaitGroup), } } // 往批量任務中添加workFunc任務 func (b *batch) Queue(fn WorkFunc) { b.m.Lock() if b.closed { b.m.Unlock() return } //往上述的limitPool中添加workFunc wu := b.pool.Queue(fn) b.units = append(b.units, wu) // keeping a reference for cancellation purposes b.wg.Add(1) b.m.Unlock() // 執行完後將workUnit寫入結果集channel go func(b *batch, wu WorkUnit) { wu.Wait() b.results <- wu b.wg.Done() }(b, wu) } // 通知批量任務再也不接受新的workFunc, 若是添加完workFunc不執行改方法的話將致使取結果集時done channel一直阻塞 func (b *batch) QueueComplete() { b.m.Lock() b.closed = true close(b.done) b.m.Unlock() } // 獲取批量任務結果集 func (b *batch) Results() <-chan WorkUnit { go func(b *batch) { <-b.done b.m.Lock() b.wg.Wait() b.m.Unlock() close(b.results) }(b) return b.results }
func SendMail(int int) pool.WorkFunc { fn := func(wu pool.WorkUnit) (interface{}, error) { // sleep 1s 模擬發郵件過程 time.Sleep(time.Second * 1) // 模擬異常任務須要取消 if int == 17 { wu.Cancel() } if wu.IsCancelled() { return false, nil } fmt.Println("send to", int) return true, nil } return fn } func TestBatchWork(t *testing.T) { // 初始化groutine數量爲20的pool p := pool.NewLimited(20) defer p.Close() batch := p.Batch() // 設置一個批量任務的過時超時時間 t := time.After(10 * time.Second) go func() { for i := 0; i < 100; i++ { batch.Queue(SendMail(i)) } batch.QueueComplete() }() // 由於 batch.Results 中要close results channel 因此不能將其放在LOOP中執行 r := batch.Results() LOOP: for { select { case <-t: // 登臺超時通知 fmt.Println("recived timeout") break LOOP case email, ok := <-r: // 讀取結果集 if ok { if err := email.Error(); err != nil { fmt.Println("err", err.Error()) } fmt.Println(email.Value()) } else { fmt.Println("finish") break LOOP } } } }
接近理論值5s, 通知模擬被取消的work也正常取消
go-playground/pool在比起以前簡單的協程池的基礎上, 對pool, worker的狀態有了很好的管理。可是,可是問題來了,在第一個實現的簡單groutine池和go-playground/pool中,都是先啓動預約好的groutine來完成任務執行,在併發量遠小於任務量的狀況下確實可以作到groutine的複用,若是任務量很少則會致使任務分配到每一個groutine不均勻,甚至可能出現啓動的groutine根本不會執行任務從而致使浪費,並且對於協程池也沒有動態的擴容和縮小。因此我又去看了一下ants的設計和實現。
ants是一個受fasthttp啓發的高性能協程池, fasthttp號稱是比go原生的net/http快10倍,其快速高性能的緣由之一就是採用了各類池化技術(這個往後再開新坑去讀源碼), ants相比以前兩種協程池,其模型更像是以前接觸到的數據庫鏈接池,須要從空餘的worker中取出一個來執行任務, 當無可用空餘worker的時候再去建立,而當pool的容量達到上線以後,剩餘的任務阻塞等待當前進行中的worker執行完畢將worker放回pool, 直至pool中有空閒worker。 ants在內存的管理上作得很好,除了按期清除過時worker(必定時間內沒有分配到任務的worker),ants還實現了一種適用於大批量相同任務的pool, 這種pool與一個須要大批量重複執行的函數鎖綁定,避免了調用方不停的建立,更加節省內存。
先看一下ants的pool 結構體 (pool.go)
type Pool struct { // 協程池的容量 (groutine數量的上限) capacity int32 // 正在執行中的groutine running int32 // 過時清理間隔時間 expiryDuration time.Duration // 當前可用空閒的groutine workers []*Worker // 表示pool是否關閉 release int32 // lock for synchronous operation. lock sync.Mutex // 用於控制pool等待獲取可用的groutine cond *sync.Cond // 確保pool只被關閉一次 once sync.Once // worker臨時對象池,在複用worker時減小新對象的建立並加速worker從pool中的獲取速度 workerCache sync.Pool // pool引起panic時的執行函數 PanicHandler func(interface{}) }
接下來看pool的工做單元 worker (worker.go)
type Worker struct { // worker 所屬的poo; pool *Pool // 任務隊列 task chan func() // 回收時間,即該worker的最後一次結束運行的時間 recycleTime time.Time }
執行worker的代碼 (worker.go)
func (w *Worker) run() { // pool中正在執行的worker數+1 w.pool.incRunning() go func() { defer func() { if p := recover(); p != nil { //若worker因各類問題引起panic, //pool中正在執行的worker數 -1, //若是設置了Pool中的PanicHandler,此時會被調用 w.pool.decRunning() if w.pool.PanicHandler != nil { w.pool.PanicHandler(p) } else { log.Printf("worker exits from a panic: %v", p) } } }() // worker 執行任務隊列 for f := range w.task { //任務隊列中的函數所有被執行完後, //pool中正在執行的worker數 -1, //將worker 放回對象池 if f == nil { w.pool.decRunning() w.pool.workerCache.Put(w) return } f() //worker 執行完任務後放回Pool //使得其他正在阻塞的任務能夠獲取worker w.pool.revertWorker(w) } }() }
瞭解了工做單元worker如何執行任務以及與pool交互後,回到pool中查看其實現, pool的核心就是取出可用worker提供給任務執行 (pool.go)
// 向pool提交任務 func (p *Pool) Submit(task func()) error { if 1 == atomic.LoadInt32(&p.release) { return ErrPoolClosed } // 獲取pool中的可用worker並向其任務隊列中寫入任務 p.retrieveWorker().task <- task return nil } // **核心代碼** 獲取可用worker func (p *Pool) retrieveWorker() *Worker { var w *Worker p.lock.Lock() idleWorkers := p.workers n := len(idleWorkers) - 1 // 當前pool中有可用worker, 取出(隊尾)worker並執行 if n >= 0 { w = idleWorkers[n] idleWorkers[n] = nil p.workers = idleWorkers[:n] p.lock.Unlock() } else if p.Running() < p.Cap() { p.lock.Unlock() // 當前pool中無空閒worker,且pool數量未達到上線 // pool會先從臨時對象池中尋找是否有已完成任務的worker, // 若臨時對象池中不存在,則從新建立一個worker並將其啓動 if cacheWorker := p.workerCache.Get(); cacheWorker != nil { w = cacheWorker.(*Worker) } else { w = &Worker{ pool: p, task: make(chan func(), workerChanCap), } } w.run() } else { // pool中沒有空餘worker且達到併發上限 // 任務會阻塞等待當前運行的worker完成任務釋放會pool for { p.cond.Wait() // 等待通知, 暫時阻塞 l := len(p.workers) - 1 if l < 0 { continue } // 當有可用worker釋放回pool以後, 取出 w = p.workers[l] p.workers[l] = nil p.workers = p.workers[:l] break } p.lock.Unlock() } return w } // 釋放worker回pool func (p *Pool) revertWorker(worker *Worker) { worker.recycleTime = time.Now() p.lock.Lock() p.workers = append(p.workers, worker) // 通知pool中已經獲取鎖的groutine, 有一個worker已完成任務 p.cond.Signal() p.lock.Unlock() }
在批量併發任務的執行過程當中, 若是有超過5納秒(ants中默認worker過時時間爲5ns)的worker未被分配新的任務,則將其做爲過時worker清理掉,從而保證pool中可用的worker都能發揮出最大的做用以及將任務分配得更均勻
(pool.go)
// 該函數會在pool初始化後在協程中啓動 func (p *Pool) periodicallyPurge() { // 建立一個5ns定時的心跳 heartbeat := time.NewTicker(p.expiryDuration) defer heartbeat.Stop() for range heartbeat.C { currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 { p.lock.Unlock() return } n := -1 for i, w := range idleWorkers { // 由於pool 的worker隊列是先進後出的,因此正序遍歷可用worker時前面的每每裏當前時間越久 if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } // 若是worker最後一次運行時間距如今超過5納秒,視爲過時,worker收到nil, 執行上述worker.go中 if n == nil 的操做 n = i w.task <- nil idleWorkers[i] = nil } if n > -1 { // 所有過時 if n >= len(idleWorkers)-1 { p.workers = idleWorkers[:0] } else { // 部分過時 p.workers = idleWorkers[n+1:] } } p.lock.Unlock() } }
func TestAnts(t *testing.T) { wg := sync.WaitGroup{} pool, _ := ants.NewPool(20) defer pool.Release() for i := 0; i < 100; i++ { wg.Add(1) pool.Submit(sendMail(i, &wg)) } wg.Wait() } func sendMail(i int, wg *sync.WaitGroup) func() { return func() { time.Sleep(time.Second * 1) fmt.Println("send mail to ", i) wg.Done() } }
這裏雖只簡單的測試批量併發任務的場景, 若是你們有興趣能夠去看看ants的壓力測試, ants的吞吐量可以比原生groutine高出N倍,內存節省10到20倍, 可謂是協程池中的神器。
借用ants做者的原話來講: 然而又有多少場景是單臺機器須要扛100w甚至1000w同步任務的?基本沒有啊!結果就是造出了屠龍刀,但是世界上沒有龍啊!也是無情…
一口氣從簡單到複雜總結了三個協程池的實現,受益不淺, 感謝各開源庫的做者, 雖然世界上沒有龍,可是屠龍技是必須練的,由於它就像存款,不必定要所有都用了,可是必定不能沒有!