Because this is work-related, the data in this article has been altered, but the logic is the same.
My service, ServerA, requests data from another service, ServerH, but ServerH's interface is rate-limited to M requests per N seconds, and returns false beyond that. Since my service's burst traffic far exceeds M requests, I adopted a goroutine pool that, on receiving a 103 error, stops its workers for N seconds and then starts them again.
The basic concepts of a goroutine pool: there is a pool of a fixed capacity that stores the tasks to be executed, plus a number of worker goroutines that execute them.
The pool needs start, stop, and sleep functionality.
What follows is a record of the thought process, and the problems hit along the way, starting from scratch.
This first version implements a basic goroutine pool with everything except the sleep feature.
```go
// workpool.go
package workpool

import (
	"context"
	"sync"
)

type TaskFunc func()

type Task struct {
	f TaskFunc
}

type WorkPool struct {
	pool           chan *Task
	workerCount    int
	stopCtx        context.Context
	stopCancelFunc context.CancelFunc
	wg             sync.WaitGroup
}

func (t *Task) Execute() {
	t.f()
}

func New(workerCount, poolLen int) *WorkPool {
	return &WorkPool{
		workerCount: workerCount,
		pool:        make(chan *Task, poolLen),
	}
}

func (w *WorkPool) PushTask(t *Task) {
	w.pool <- t
}

func (w *WorkPool) PushTaskFunc(f TaskFunc) {
	w.pool <- &Task{f: f}
}

func (w *WorkPool) work() {
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case t := <-w.pool:
			t.Execute()
		}
	}
}

func (w *WorkPool) Start() *WorkPool {
	w.wg.Add(w.workerCount)
	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
	for i := 0; i < w.workerCount; i++ {
		go w.work()
	}
	return w
}

func (w *WorkPool) Stop() {
	w.stopCancelFunc()
	w.wg.Wait()
}
```
It looks fine, and fairly concise. In fact it is not...
The program below creates a workpool with capacity 50 and prints 100 numbers via 3 workers.
```go
// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
)

func TestWorkPool_Start(t *testing.T) {
	wg := sync.WaitGroup{}
	wp := New(3, 50).Start()
	length := 100
	wg.Add(length)
	for i := 0; i < length; i++ {
		wp.PushTaskFunc(func() {
			defer wg.Done()
			fmt.Print(i, " ")
		})
	}
	wg.Wait()
}
```
Running it produces the following output:
50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 51 51 51 51 69 72 78 78 80 81 81 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 84 84 84 84 50 84 100 100 100 100 100 100 100 100 100 100 50 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 84 100 100 100
This is a far cry from the expected 0-99.
The reason is that a closure references its outer variables rather than copying them, so by the time the task function actually executes, the value of i has long since changed. (Note: Go 1.22 changed for-loop scoping so that each iteration gets its own copy of the loop variable; the behavior shown here is from an earlier Go version.) Below is a simple closure example.
```go
x := 1
f := func() { println(x) }
x = 2
x = 3
f() // 3
```
Think of the timing of the f() call here as corresponding to t.Execute() in the pool.
Since the problem comes from closures capturing by reference, one option is simply not to use a closure at all.
Parameters can instead be passed into the function explicitly. But since the number and types of the arguments a task function needs are not known in advance, the only choice is a variadic ...interface{} in TaskFunc, with type assertions at the point of use.
Only the changed parts are listed below:
```go
// workpool.go
type TaskFunc func(args ...interface{})

type Task struct {
	f    TaskFunc
	args []interface{}
}

func (t *Task) Execute() {
	t.f(t.args...)
}

func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
	w.pool <- &Task{
		f:    f,
		args: args,
	}
}
```
Here is the test program:
```go
// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
)

func TestWorkPool_Start(t *testing.T) {
	wg := sync.WaitGroup{}
	wp := New(3, 50).Start()
	length := 100
	wg.Add(length)
	for i := 0; i < length; i++ {
		wp.PushTaskFunc(func(args ...interface{}) {
			defer wg.Done()
			fmt.Print(args[0].(int), " ")
		}, i)
	}
	wg.Wait()
}
```
The output:
0 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 2 1 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 26 48 49 51 52 53 54 55 56 50 58 59 57 61 62 63 64 65 66 25 68 6 9 70 71 72 73 67 75 76 77 74 79 78 81 82 83 84 60 86 87 88 89 90 91 92 85 94 95 96 97 98 99 80 93
The order is scrambled, but that is expected with concurrent workers; the closure-reference problem is solved.
Back to the opening scenario: when any task executed by a worker gets a 103 error from ServerH, all workers should stop for a while, because continuing to send requests is pointless.
This version is already close to the one I am actually using.
```go
// workpool.go
package workpool

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Flag int64

const (
	FLAG_OK Flag = 1 << iota
	FLAG_RETRY
)

type TaskFunc func(w *WorkPool, args ...interface{}) Flag

type Task struct {
	f    TaskFunc
	args []interface{}
}

type WorkPool struct {
	pool        chan *Task
	workerCount int
	// stop-related
	stopCtx        context.Context
	stopCancelFunc context.CancelFunc
	wg             sync.WaitGroup
	// sleep-related
	sleepCtx        context.Context
	sleepCancelFunc context.CancelFunc
	sleepSeconds    int64
	sleepNotify     chan bool
}

func (t *Task) Execute(w *WorkPool) Flag {
	return t.f(w, t.args...)
}

func New(workerCount, poolLen int) *WorkPool {
	return &WorkPool{
		workerCount: workerCount,
		pool:        make(chan *Task, poolLen),
		sleepNotify: make(chan bool),
	}
}

func (w *WorkPool) PushTask(t *Task) {
	w.pool <- t
}

func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
	w.pool <- &Task{
		f:    f,
		args: args,
	}
}

func (w *WorkPool) work(i int) {
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case <-w.sleepCtx.Done():
			time.Sleep(time.Duration(w.sleepSeconds) * time.Second)
		case t := <-w.pool:
			flag := t.Execute(w)
			if flag&FLAG_RETRY != 0 {
				w.PushTask(t)
				fmt.Printf("work %v PushTask,pool length %v\n", i, len(w.pool))
			}
		}
	}
}

func (w *WorkPool) Start() *WorkPool {
	fmt.Printf("workpool run %d worker\n", w.workerCount)
	w.wg.Add(w.workerCount + 1) // +1 for the sleepControl goroutine
	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
	w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
	go w.sleepControl()
	for i := 0; i < w.workerCount; i++ {
		go w.work(i)
	}
	return w
}

func (w *WorkPool) Stop() {
	w.stopCancelFunc()
	w.wg.Wait()
}

func (w *WorkPool) sleepControl() {
	fmt.Println("sleepControl start...")
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case <-w.sleepNotify:
			fmt.Printf("receive sleep notify start...\n")
			// Replace the sleep context and cancel it immediately, so that
			// every worker sees sleepCtx.Done() and goes to sleep too.
			w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
			w.sleepCancelFunc()
			fmt.Printf("sleepControl will star sleep %v s\n", w.sleepSeconds)
			time.Sleep(time.Duration(w.sleepSeconds) * time.Second)
			w.sleepSeconds = 0
			fmt.Println("sleepControl was end sleep")
		}
	}
}

func (w *WorkPool) SleepNotify(seconds int64) {
	// sleepSeconds is an int64 rather than time.Duration because it is
	// set with a CAS operation; notify only after the CAS succeeds.
	if atomic.CompareAndSwapInt64(&w.sleepSeconds, 0, seconds) {
		fmt.Printf("sleepSeconds set %v\n", seconds)
		w.sleepNotify <- true
	}
}
```
The test program below simulates ServerH; the usage is much the same as in my real scenario at work.
```go
// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

// Simulates ServerH's rate limiting
var serverh = &server{max: 10, interval: 5}

type server struct {
	count    int
	max      int
	lasttime time.Time
	interval int64
	mu       sync.Mutex
}

func (s *server) Access(i int) bool {
	now := time.Now()
	s.mu.Lock()
	defer s.mu.Unlock()
	time.Sleep(100 * time.Millisecond)
	if s.lasttime.Unix() <= 0 || s.count >= s.max {
		if now.After(s.lasttime) {
			s.count = 1
			s.lasttime = time.Unix(now.Unix()+s.interval, 0)
			return true
		}
		fmt.Printf("Access false,i=%d \n", i)
		return false
	} else {
		s.count++
		fmt.Printf("Access true,i=%d s.count %d\n", i, s.count)
		return true
	}
}

// The caller-side logic of my service
func TestWorkPool_Start(t *testing.T) {
	wp := New(3, 100).Start()
	for i := 0; i < 100; i++ {
		time.Sleep(100 * time.Millisecond)
		wp.PushTaskFunc(func(w *WorkPool, args ...interface{}) Flag {
			if !serverh.Access(args[0].(int)) {
				// Tell the pool to sleep for 5 seconds
				w.SleepNotify(5)
				// This attempt failed; push the task back into the pool
				return FLAG_RETRY
			}
			return FLAG_OK
		}, i)
	}
	time.Sleep(100 * time.Second)
}
```
The output:
```
workpool run 3 worker
sleepControl start...
Access true,i=1 s.count 2
Access true,i=2 s.count 3
Access true,i=3 s.count 4
Access true,i=4 s.count 5
Access true,i=5 s.count 6
Access true,i=6 s.count 7
Access true,i=7 s.count 8
Access true,i=8 s.count 9
Access true,i=9 s.count 10
Access false,i=10
sleepSeconds set 5
work 1 PushTask,pool length 0
receive sleep notify start...
sleepControl will star sleep 5 s
Access false,i=10
work 0 PushTask,pool length 1
Access false,i=10
work 0 PushTask,pool length 2
Access false,i=11
work 2 PushTask,pool length 3
Access false,i=12
work 1 PushTask,pool length 5
Access false,i=13
work 0 PushTask,pool length 6
Access false,i=14
work 0 PushTask,pool length 7
Access false,i=10
work 1 PushTask,pool length 8
Access false,i=15
work 1 PushTask,pool length 9
Access false,i=11
work 0 PushTask,pool length 11
Access false,i=12
work 0 PushTask,pool length 11
Access false,i=16
work 0 PushTask,pool length 12
sleepControl was end sleep
Access true,i=17 s.count 2
Access true,i=14 s.count 3
Access true,i=18 s.count 4
Access true,i=10 s.count 5
Access true,i=15 s.count 6
Access true,i=20 s.count 7
Access true,i=19 s.count 8
Access true,i=12 s.count 9
Access true,i=11 s.count 10
Access false,i=21
sleepSeconds set 5
work 0 PushTask,pool length 53
receive sleep notify start...
sleepControl will star sleep 5 s
Access false,i=16
work 1 PushTask,pool length 54
Access false,i=22
work 2 PushTask,pool length 55
Access false,i=23
work 0 PushTask,pool length 57
Access false,i=24
...........
```
What remains is the retry-count logic.