以前工做中有項目用到goroutine池,抽空封裝了下。golang
package pool
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Pool struct {
mu *sync.Mutex
tasks chan ITask
workerNum int
workers []*worker
freeWorker int32
done chan struct{}
}
type worker struct {
id int
p *Pool
release chan struct{}
}
type ITask interface {
Execute()
}
func NewPool(workerNum int) *Pool {
return &Pool{
tasks: make(chan ITask),
workerNum: workerNum,
workers: make([]*worker, 0),
done: make(chan struct{}),
freeWorker: int32(workerNum),
mu: &sync.Mutex{},
}
}
func (p *Pool) Put(task ITask) {
p.tasks <- task
}
func (p *Pool) Run() {
for i := 0; i < p.workerNum; i++ {
w := newWorker(i, p)
p.workers = append(p.workers, w)
go w.run()
}
select {
case <-p.done:
return
}
}
func (p *Pool) Resize(workerNum int) error {
if workerNum < 0 {
return fmt.Errorf("invalid worker num: %d", workerNum)
}
p.mu.Lock()
defer p.mu.Unlock()
if workerNum > p.workerNum {
newWorkerNum := workerNum - p.workerNum
for i := 0; i < newWorkerNum; i++ {
w := newWorker(i, p)
p.workers = append(p.workers, w)
go w.run()
}
p.workerNum = len(p.workers)
} else {
releaseNum := p.workerNum - workerNum
if releaseNum == p.workerNum {
return fmt.Errorf("invalid operation, pool size should not be zero")
}
for i := 0; i < releaseNum && i < p.workerNum; i++ {
close(p.workers[0].release)
if len(p.workers) > 1 {
p.workers = p.workers[1:]
} else {
p.workers = []*worker{}
}
}
p.workerNum = len(p.workers)
}
return nil
}
func (p *Pool) FreeSize() int {
return int(atomic.LoadInt32(&p.freeWorker))
}
func (p *Pool) Size() int {
p.mu.Lock()
defer p.mu.Unlock()
return p.workerNum
}
func (p *Pool) Close() {
close(p.done)
}
func newWorker(id int, p *Pool) *worker {
return &worker{
id: id,
p: p,
release: make(chan struct{}),
}
}
func (w *worker) run() {
p := w.p
for {
select {
case <-w.release:
return
case <-p.done:
return
case t := <-p.tasks:
atomic.AddInt32(&p.freeWorker, -1)
t.Execute()
atomic.AddInt32(&p.freeWorker, 1)
}
}
}
複製代碼
調用app
type Task struct {
}
func (t *Task) Execute() {
fmt.Println("task printf", time.Now())
}
func main() {
task := &Task{}
p := NewPool(6)
go p.Run()
defer p.Close()
time.Sleep(time.Second * 1)
for i := 0; i < 5; i++ {
p.Put(task)
time.Sleep(time.Second * 1)
fmt.Println("before: ", p.Size())
p.Resize(p.Size() / 2)
fmt.Println(p.Size())
fmt.Println()
// p.Resize(p.Size() + 10)
}
}
複製代碼
大概寫了下,先存着備忘,還沒怎麼測試,測完有bug再進行修復。測試