協程池主要是爲了減小 go 協程頻繁建立、銷燬帶來的性能損耗,雖然能夠忽略不計,可是網上說特殊狀況仍是有用的。app
那這個協程池通俗易懂來說,好比老闆給員工分配任務:性能
老闆領了一堆任務,得找工人幹活呀, 那領導就拿出一個任務,給一個空閒的員工 A,再把下一個任務,給另一個空閒的員工 B 。fetch
這時候 A 或者 B,指不定誰先忙完了spa
若是有人忙完了,領導就把下一個任務,給先忙完的人。A/B 就是協程池裏面的兩個協程code
下面這段代碼,完成了以下功能協程
package gopool import ( "context" "log" "sync" "time") type Task func() // boss 老闆 type GoPool struct { MaxWorkerIdleTime time.Duration // worker 最大空閒時間 MaxWorkerNum int32 // 協程最大數量 TaskEntryChan chan *Task // 任務入列 Workers []*worker // 已建立worker FreeWorkerChan chan *worker // 空閒worker Lock sync.Mutex } const ( WorkerStatusStop = 1 WorkerStatusLive = 0 ) // 幹活的人 type worker struct { Pool *GoPool StartTime time.Time // 開始時間 TaskChan chan *Task // 執行隊列 LastWorkTime time.Time // 最後執行時間 Ctx context.Context Cancel context.CancelFunc Status int32 // 被過時刪掉的標記 } var defaultPool = func() *GoPool { return NewPool() }() // 初始化 func NewPool() *GoPool { g := &GoPool{ MaxWorkerIdleTime: 10 * time.Second, MaxWorkerNum: 20, TaskEntryChan: make(chan *Task, 2000), FreeWorkerChan: make(chan *worker, 2000), } // 分發任務 go g.dispatchTask() //清理空閒worker go g.fireWorker() return g } // 按期清理空閒worker func (g *GoPool) fireWorker() { for { select { // 10秒執行一次 case <-time.After(10 * time.Second): for k, w := range g.Workers { if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime { log.Printf("overtime %v %p", k, w) // 終止協程 w.Cancel() // 清理Free w.Status = WorkerStatusStop } } g.Lock.Lock() g.Workers = g.cleanWorker(g.Workers) g.Lock.Unlock() } } } // 遞歸清理無用worker func (g *GoPool) cleanWorker(workers []*worker) []*worker { for k, w := range workers { if time.Now().Sub(w.LastWorkTime) > g.MaxWorkerIdleTime { workers = append(workers[:k], workers[k+1:]...) // 刪除中間1個元素 return g.cleanWorker(workers) } } return workers } // 分發任務 func (g *GoPool) dispatchTask() { for { select { case t := <-g.TaskEntryChan: log.Printf("dispatch task %p", t) // 獲取worker w := g.fetchWorker() // 將任務扔給worker w.accept(t) } } } // 獲取可用worker func (g *GoPool) fetchWorker() *worker { for { select { // 獲取空閒worker case w := <-g.FreeWorkerChan: if w.Status == WorkerStatusLive { return w } default: // 建立新的worker if int32(len(g.Workers)) < g.MaxWorkerNum { w := &worker{ Pool: g, StartTime: time.Now(), LastWorkTime: time.Now(), TaskChan: make(chan *Task, 1), Ctx: context.Background(), Status: WorkerStatusLive, } ctx, cancel := context.WithCancel(w.Ctx) w.Cancel = cancel // 接到任務本身去執行吧 go w.execute(ctx) g.Lock.Lock() g.Workers = append(g.Workers, w) g.Lock.Unlock() g.FreeWorkerChan <- w log.Printf("worker create %p", w) } } } } // 添加任務 func (g *GoPool) addTask(t Task) { // 將任務放到入口任務隊列 g.TaskEntryChan <- &t } // 接受任務 func (w *worker) accept(t *Task) { // 每一個worker本身的工做隊列 w.TaskChan <- t } // 執行任務 func (w *worker) execute(ctx context.Context) { for { select { case t := <-w.TaskChan: // 執行 (*t)() // 記錄工做狀態 w.LastWorkTime = time.Now() w.Pool.FreeWorkerChan <- w case <-ctx.Done(): log.Printf("worker done %p", w) return } } } // 執行 func SafeGo(t Task) { defaultPool.addTask(t) }