Implementing a goroutine pool (workpool) in Golang

Background

Because this comes from my work, all the data in this article has been altered, but the logic is the same.

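My service, ServerA, requests data from another service, ServerH. ServerH's API allows only M requests within any N seconds and returns false once that limit is exceeded (error 103). My service's burst volume is far above M. I therefore used a goroutine pool that stops its workers for N seconds when a task receives error 103, and then starts them again.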

Goroutine pool concepts

A goroutine pool involves three things: a pool of fixed capacity, the tasks stored in it that are waiting to run, and a number of worker goroutines that execute them.

The pool must support three operations: start, stop, and sleep.

Below I start from scratch and record my thought process and the problems I ran into.

Basic version

This version already implements a basic goroutine pool. Everything is in place except the sleep feature.

// workpool.go
package workpool

import (
	"context"
	"sync"
)

type TaskFunc func()

type Task struct {
	f TaskFunc
}

type WorkPool struct {
	pool        chan *Task
	workerCount int

	stopCtx        context.Context
	stopCancelFunc context.CancelFunc
	wg             sync.WaitGroup
}

func (t *Task) Execute() {
	t.f()
}

func New(workerCount, poolLen int) *WorkPool {
	return &WorkPool{
		workerCount: workerCount,
		pool:        make(chan *Task, poolLen),
	}
}

func (w *WorkPool) PushTask(t *Task) {
	w.pool <- t
}

func (w *WorkPool) PushTaskFunc(f TaskFunc) {
	w.pool <- &Task{
		f: f,
	}
}

func (w *WorkPool) work() {
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case t := <-w.pool:
			t.Execute()
		}
	}
}

func (w *WorkPool) Start() *WorkPool {
	w.wg.Add(w.workerCount)
	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
	for i := 0; i < w.workerCount; i++ {
		go w.work()
	}
	return w
}

func (w *WorkPool) Stop() {
	w.stopCancelFunc()
	w.wg.Wait()
}

It looks fine, and quite concise. It isn't...

The program below creates a workpool with capacity 50 and uses 3 workers to print 100 numbers.

// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
)

func TestWorkPool_Start(t *testing.T) {
	wg := sync.WaitGroup{}
	wp := New(3, 50).Start()
	length := 100
	wg.Add(length)
	for i := 0; i < length; i++ {
		wp.PushTaskFunc(func() {
			defer wg.Done()
			fmt.Print(i, " ")
		})
	}
	wg.Wait()
}

Running it produces the following output:

50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 51 51 51 51 69 72 78 78 80 81 81 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 84 84 84 84 50 84
100 100 100 100 100 100 100 100 100 100 50 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 84 100 100 100

This is far from the expected output of 0 to 99.

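The reason is that a closure refers to outer variables by reference and does not copy their values. By the time the task runs, i has long since changed, because in Go versions before 1.22 all iterations of a for loop share a single variable i. Here is a simple closure example: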

package main

func main() {
	x := 1
	f := func() {
		println(x)
	}
	x = 2
	x = 3
	f() // 3
}

The moment f() is called corresponds to t.Execute() in the goroutine pool.
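The following is a small sketch of the same effect that does not depend on the Go version. `sharedCapture`, `copiedCapture` and `callAll` are hypothetical helpers written only for this illustration. The counter `n` is declared outside the loop, so every iteration shares one variable; this matches how a `for i := ...` loop behaved before Go 1.22. Since Go 1.22, when go.mod declares `go 1.22` or later, variables declared in a for statement's init clause are per-iteration, so the original test would print 0 to 99 in some order. On older versions the usual fix is a per-iteration copy such as `v := n`, which is what `copiedCapture` does.

```go
package main

import "fmt"

// callAll invokes each closure and collects the values they return.
func callAll(fs []func() int) []int {
	out := make([]int, 0, len(fs))
	for _, f := range fs {
		out = append(out, f())
	}
	return out
}

// sharedCapture: n is declared outside the loop, so every closure captures
// the same variable and sees its final value when called later.
func sharedCapture() []int {
	var fs []func() int
	n := 0
	for ; n < 3; n++ {
		fs = append(fs, func() int { return n })
	}
	return callAll(fs)
}

// copiedCapture: v := n creates a fresh variable per iteration, so each
// closure captures its own copy.
func copiedCapture() []int {
	var fs []func() int
	n := 0
	for ; n < 3; n++ {
		v := n
		fs = append(fs, func() int { return v })
	}
	return callAll(fs)
}

func main() {
	fmt.Println(sharedCapture(), copiedCapture()) // [3 3 3] [0 1 2]
}
```

The pool behaves like `sharedCapture`. The closure is built in the loop but only runs later inside `t.Execute()`.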

Fixing the closure reference problem

The problem comes from the closure referencing i, so the task should not read i through the closure at all.

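Instead, pass the arguments into the function. The pool cannot know how many arguments the function will need, or of what types. TaskFunc therefore takes a variadic list of interface{} values, and the function asserts their types when it uses them.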

Only the changed parts are shown below:

// workpool.go
type TaskFunc func(args ...interface{})

type Task struct {
	f    TaskFunc
	args []interface{}
}

func (t *Task) Execute() {
	t.f(t.args...)
}

func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
	w.pool <- &Task{
		f:    f,
		args: args,
	}
}
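With `args ...interface{}`, a task body such as `args[0].(int)` panics inside the worker if the task was pushed with no argument or with a value of another type. The following is a minimal sketch of a more defensive task body that uses the comma-ok form of the type assertion, which reports failure instead of panicking. It assumes the variadic TaskFunc signature above; `describeInt` and `printIntTask` are hypothetical names.

```go
package main

import "fmt"

// TaskFunc matches the variadic signature from the changed workpool.go.
type TaskFunc func(args ...interface{})

// describeInt (hypothetical) checks argument count and type with the comma-ok
// assertion, which returns ok == false instead of panicking like args[0].(int).
func describeInt(args ...interface{}) (string, error) {
	if len(args) != 1 {
		return "", fmt.Errorf("want 1 argument, got %d", len(args))
	}
	n, ok := args[0].(int)
	if !ok {
		return "", fmt.Errorf("want int, got %T", args[0])
	}
	return fmt.Sprint(n), nil
}

// printIntTask (hypothetical) adapts describeInt to TaskFunc and reports bad
// input instead of crashing the worker goroutine.
var printIntTask TaskFunc = func(args ...interface{}) {
	s, err := describeInt(args...)
	if err != nil {
		fmt.Println("skip task:", err)
		return
	}
	fmt.Print(s, " ")
}

func main() {
	printIntTask(42)     // prints "42 "
	printIntTask("oops") // prints "skip task: want int, got string"
	printIntTask()       // prints "skip task: want 1 argument, got 0"
}
```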

Here is the test program:

// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
)

func TestWorkPool_Start(t *testing.T) {
	wg := sync.WaitGroup{}
	wp := New(3, 50).Start()
	length := 100
	wg.Add(length)
	for i := 0; i < length; i++ {
		wp.PushTaskFunc(func(args ...interface{}) {
			defer wg.Done()
			fmt.Print(args[0].(int), " ")
		}, i)
	}
	wg.Wait()
}

Output:

0 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 2 1 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 26 48 49 51 52 53 54 55 56 50 58 59 57 61 62 63 64 65 66 25 68 69 70 71 72 73 67 75 76 77 74 79 78 81 82 83 84 60 86 87 88 89 90 91 92 85 94 95 96 97 98 99 80 93

The order is scrambled, which is expected because three workers run concurrently. The closure reference problem is solved.
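If the toolchain is Go 1.18 or later, generics offer an alternative to `interface{}` arguments. This is a swapped-in technique, not what the author uses. The sketch below assumes a hypothetical, stripped-down `miniPool` whose tasks are plain `func()` values, and a hypothetical helper `pushTyped`. `arg` is a parameter of `pushTyped`, so each call captures its own copy, and the task receives a typed value without any runtime assertion.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// miniPool is a hypothetical pool used only for this sketch: a buffered
// channel of no-argument functions drained by a fixed set of workers.
type miniPool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func newMiniPool(workers, size int) *miniPool {
	p := &miniPool{tasks: make(chan func(), size)}
	p.wg.Add(workers)
	for k := 0; k < workers; k++ {
		go func() {
			defer p.wg.Done()
			for f := range p.tasks { // ends once tasks is closed and drained
				f()
			}
		}()
	}
	return p
}

// shutdown stops accepting tasks and waits for the queued ones to finish.
func (p *miniPool) shutdown() {
	close(p.tasks)
	p.wg.Wait()
}

// pushTyped (hypothetical) binds arg to f. Each call has its own arg, so the
// closure never sees a later value, and f gets a typed T.
func pushTyped[T any](p *miniPool, f func(T), arg T) {
	p.tasks <- func() { f(arg) }
}

// collect pushes 0..n-1 through a 3-worker pool and returns the values the
// tasks saw, sorted because workers finish in arbitrary order.
func collect(n int) []int {
	p := newMiniPool(3, 50)
	var mu sync.Mutex
	var seen []int
	for i := 0; i < n; i++ {
		pushTyped(p, func(v int) {
			mu.Lock()
			seen = append(seen, v)
			mu.Unlock()
		}, i)
	}
	p.shutdown()
	sort.Ints(seen)
	return seen
}

func main() {
	got := collect(100)
	fmt.Println(len(got), got[0], got[99]) // 100 0 99
}
```

Go methods cannot declare their own type parameters, so `pushTyped` is a plain function and not a method on the pool.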

Adding the sleep feature

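In the scenario from the beginning, once any task run by a worker receives error 103 from ServerH, all workers must stop for a while. Continuing to send requests is pointless until the limit resets.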

This version is already very close to the one I am currently using.

// workpool.go
package workpool

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Flag int64

const (
	FLAG_OK    Flag = 1 << iota
	FLAG_RETRY Flag = 1 << iota
)

type TaskFunc func(w *WorkPool, args ...interface{}) Flag

type Task struct {
	f    TaskFunc
	args []interface{}
}

type WorkPool struct {
	pool        chan *Task
	workerCount int

	// stop-related fields
	stopCtx        context.Context
	stopCancelFunc context.CancelFunc
	wg             sync.WaitGroup

	// sleep-related fields
	sleepCtx        context.Context
	sleepCancelFunc context.CancelFunc
	sleepSeconds    int64
	sleepNotify     chan bool
}

func (t *Task) Execute(w *WorkPool) Flag {
	return t.f(w, t.args...)
}

func New(workerCount, poolLen int) *WorkPool {
	return &WorkPool{
		workerCount: workerCount,
		pool:        make(chan *Task, poolLen),
		sleepNotify: make(chan bool),
	}
}

func (w *WorkPool) PushTask(t *Task) {
	w.pool <- t
}

func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
	w.pool <- &Task{
		f:    f,
		args: args,
	}
}

func (w *WorkPool) work(i int) {
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case <-w.sleepCtx.Done():
			time.Sleep(time.Duration(w.sleepSeconds) * time.Second)
		case t := <-w.pool:
			flag := t.Execute(w)
			if flag&FLAG_RETRY != 0 {
				w.PushTask(t)
				fmt.Printf("work %v PushTask,pool length %v\n", i, len(w.pool))
			}
		}
	}
}

func (w *WorkPool) Start() *WorkPool {
	fmt.Printf("workpool run %d worker\n", w.workerCount)
	w.wg.Add(w.workerCount + 1)
	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
	w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
	go w.sleepControl()
	for i := 0; i < w.workerCount; i++ {
		go w.work(i)
	}
	return w
}

func (w *WorkPool) Stop() {
	w.stopCancelFunc()
	w.wg.Wait()
}

func (w *WorkPool) sleepControl() {
	fmt.Println("sleepControl start...")
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case <-w.sleepNotify:
			fmt.Printf("receive sleep notify start...\n")
			w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
			w.sleepCancelFunc()
			fmt.Printf("sleepControl will star sleep %v s\n", w.sleepSeconds)
			time.Sleep(time.Duration(w.sleepSeconds) * time.Second)
			w.sleepSeconds = 0
			fmt.Println("sleepControl was end sleep")
		}
	}
}


func (w *WorkPool) SleepNotify(seconds int64) {
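	// sleepSeconds is an int64 rather than a time.Duration because it is updated with CAS
	// The notification is sent only after the value has been set successfully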
	if atomic.CompareAndSwapInt64(&w.sleepSeconds, 0, seconds) {
		fmt.Printf("sleepSeconds set %v\n", seconds)
		w.sleepNotify <- true
	}
}
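The CAS in `SleepNotify` means that when several workers hit the limit at the same moment, only the first one stores its value and sends on `sleepNotify`. The others see a non-zero `sleepSeconds` and return. The following is a self-contained sketch of just that guard. It uses a hypothetical `notifier` type, and a buffered channel stands in for the pool's unbuffered `sleepNotify` so that the sketch needs no receiver goroutine. Note that `sleepControl` above resets the field with a plain `w.sleepSeconds = 0`, and `work` also reads it without atomics. Mixing atomic and plain accesses to the same variable is a data race under the Go memory model, and `atomic.StoreInt64` / `atomic.LoadInt64` would be the matching calls.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// notifier (hypothetical) mirrors the sleepSeconds / sleepNotify pair.
type notifier struct {
	sleepSeconds int64
	notify       chan int64
}

// SleepNotify sends only if sleepSeconds was still 0, so exactly one of
// several concurrent callers wins until the value is reset.
func (n *notifier) SleepNotify(seconds int64) bool {
	if atomic.CompareAndSwapInt64(&n.sleepSeconds, 0, seconds) {
		n.notify <- seconds
		return true
	}
	return false
}

// race fires k concurrent SleepNotify calls and counts how many got through.
func race(k int) int {
	n := &notifier{notify: make(chan int64, k)}
	var wg sync.WaitGroup
	var winners int64
	for g := 0; g < k; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if n.SleepNotify(5) {
				atomic.AddInt64(&winners, 1)
			}
		}()
	}
	wg.Wait()
	return int(atomic.LoadInt64(&winners))
}

func main() {
	fmt.Println(race(10)) // 1
}
```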

The test program below simulates ServerH. The usage pattern is much the same as in my actual work.

// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

// Simulates ServerH's rate limiting (at most max requests every interval seconds)
var serverh = &server{max: 10, interval: 5}

type server struct {
	count    int
	max      int
	lasttime time.Time
	interval int64
	mu       sync.Mutex
}

func (s *server) Access(i int) bool {
	now := time.Now()

	s.mu.Lock()
	defer s.mu.Unlock()

	time.Sleep(100 * time.Millisecond)

	if s.lasttime.Unix() <= 0 || s.count >= s.max {
		if now.After(s.lasttime) {
			s.count = 1
			s.lasttime = time.Unix(now.Unix()+s.interval, 0)
			return true
		}
		fmt.Printf("Access false,i=%d \n", i)
		return false
	} else {
		s.count++
		fmt.Printf("Access true,i=%d s.count %d\n", i, s.count)
		return true
	}
}

// The logic of my own service
func TestWorkPool_Start(t *testing.T) {
	wp := New(3, 100).Start()
	for i := 0; i < 100; i++ {
		time.Sleep(100 * time.Millisecond)
		wp.PushTaskFunc(func(w *WorkPool, args ...interface{}) Flag {
			if !serverh.Access(args[0].(int)) {
				// Notify the pool to sleep for 5 seconds
				w.SleepNotify(5)
				// This attempt failed, so put the task back into the pool
				return FLAG_RETRY
			}
			return FLAG_OK
		}, i)
	}
	time.Sleep(100 * time.Second)
}

Output:

workpool run 3 worker
sleepControl start...
Access true,i=1 s.count 2
Access true,i=2 s.count 3
Access true,i=3 s.count 4
Access true,i=4 s.count 5
Access true,i=5 s.count 6
Access true,i=6 s.count 7
Access true,i=7 s.count 8
Access true,i=8 s.count 9
Access true,i=9 s.count 10
Access false,i=10 
sleepSeconds set 5
work 1 PushTask,pool length 0
receive sleep notify start...
sleepControl will star sleep 5 s
Access false,i=10 
work 0 PushTask,pool length 1
Access false,i=10 
work 0 PushTask,pool length 2
Access false,i=11 
work 2 PushTask,pool length 3
Access false,i=12 
work 1 PushTask,pool length 5
Access false,i=13 
work 0 PushTask,pool length 6
Access false,i=14 
work 0 PushTask,pool length 7
Access false,i=10 
work 1 PushTask,pool length 8
Access false,i=15 
work 1 PushTask,pool length 9
Access false,i=11 
work 0 PushTask,pool length 11
Access false,i=12 
work 0 PushTask,pool length 11
Access false,i=16 
work 0 PushTask,pool length 12
sleepControl was end sleep
Access true,i=17 s.count 2
Access true,i=14 s.count 3
Access true,i=18 s.count 4
Access true,i=10 s.count 5
Access true,i=15 s.count 6
Access true,i=20 s.count 7
Access true,i=19 s.count 8
Access true,i=12 s.count 9
Access true,i=11 s.count 10
Access false,i=21 
sleepSeconds set 5
work 0 PushTask,pool length 53
receive sleep notify start...
sleepControl will star sleep 5 s
Access false,i=16 
work 1 PushTask,pool length 54
Access false,i=22 
work 2 PushTask,pool length 55
Access false,i=23 
work 0 PushTask,pool length 57
Access false,i=24 
...........
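In the output above, ServerH keeps receiving requests (the `Access false` lines) between `will star sleep 5 s` and `was end sleep`. One likely reason is how `work` selects. Once sleepCtx is cancelled, a worker's select has two ready cases, and Go picks uniformly at random among ready cases, so a worker sometimes takes another task instead of sleeping. The following sketch uses hypothetical names and closed channels in place of `ctx.Done()`. It shows the random choice, and then a common pattern that gives the pause priority: a non-blocking check comes first, and the normal select runs only when no pause is pending.

```go
package main

import "fmt"

// pickCounts (hypothetical) runs a select over two channels that are both
// ready on every trial and counts which case wins.
func pickCounts(trials int) (paused, worked int) {
	pause := make(chan struct{})
	close(pause) // a closed channel is always ready, like a cancelled ctx.Done()
	tasks := make(chan int, 1)
	for t := 0; t < trials; t++ {
		tasks <- t // keep one task queued so this case is ready too
		select {
		case <-pause:
			paused++
			<-tasks // drop the queued task so the buffer is empty next round
		case <-tasks:
			worked++
		}
	}
	return paused, worked
}

// nextAction (hypothetical) checks the pause first without blocking, then
// falls back to the normal select only when no pause is pending.
func nextAction(pause <-chan struct{}, tasks <-chan int) string {
	select {
	case <-pause:
		return "sleep"
	default:
	}
	select {
	case <-pause:
		return "sleep"
	case t := <-tasks:
		return fmt.Sprint("run task ", t)
	}
}

func main() {
	p, w := pickCounts(1000)
	fmt.Println(p > 0 && w > 0) // true with overwhelming probability: both cases win sometimes
	pause := make(chan struct{})
	close(pause)
	tasks := make(chan int, 1)
	tasks <- 7
	fmt.Println(nextAction(pause, tasks)) // sleep
}
```

The priority check only narrows the window. A worker that is already inside `t.Execute()` when the pause starts will still finish that request.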

To be added:

Retry-count logic
