GitHub repo: https://github.com/Jeffail/tunny
tunny's project structure is very simple; the core files are tunny.go and worker.go.
tunny connects the pool and its workers through the reqChan channel. The number of workers equals the pool size and is decided when the pool is initialized; the workers compete to receive work from reqChan, process it, and hand the result back to the pool.
The Pool struct:

```go
type Pool struct {
	queuedJobs int64

	ctor    func() Worker
	workers []*workerWrapper
	reqChan chan workRequest

	workerMut sync.Mutex
}
```
```go
type Worker interface {
	// Process will synchronously perform a job and return the result.
	Process(interface{}) interface{}

	// BlockUntilReady is called before each job is processed and must block the
	// calling goroutine until the Worker is ready to process the next job.
	BlockUntilReady()

	// Interrupt is called when a job is cancelled. The worker is responsible
	// for unblocking the Process implementation.
	Interrupt()

	// Terminate is called when a Worker is removed from the processing pool
	// and is responsible for cleaning up any held resources.
	Terminate()
}
```
In tunny the worker is designed as an interface, because, as the later code shows, a worker can have many different implementations. As argued in an earlier post summarizing golang coding tips: we should code against interfaces, so that components stay decoupled.
Two kinds of workers
```go
// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
	processor func(interface{}) interface{}
}
```
The closure worker is the most commonly used implementation: it does its work by running the processor function assigned to it at initialization.
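To make this concrete, here is a minimal, self-contained sketch of a closure worker in action; the doubling processor and the `newDoubler` constructor are hypothetical, added only for illustration:

```go
package main

import "fmt"

// closureWorker wraps a processing function, as in tunny.
type closureWorker struct {
	processor func(interface{}) interface{}
}

// Process simply delegates to the wrapped function.
func (w *closureWorker) Process(payload interface{}) interface{} {
	return w.processor(payload)
}

// newDoubler builds a closureWorker with a hypothetical doubling processor.
func newDoubler() *closureWorker {
	return &closureWorker{processor: func(p interface{}) interface{} {
		return p.(int) * 2
	}}
}

func main() {
	fmt.Println(newDoubler().Process(21)) // prints 42
}
```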
```go
type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
	f, ok := payload.(func())
	if !ok {
		return ErrJobNotFunc
	}
	f()
	return nil
}
```
The callback worker requires each payload to be a function, which it simply invokes.
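A short self-contained sketch of this behavior; the `ErrJobNotFunc` value here is a stand-in for tunny's error of the same name:

```go
package main

import "fmt"

// ErrJobNotFunc stands in for tunny's error value of the same name.
var ErrJobNotFunc = fmt.Errorf("job not a func()")

// callbackWorker treats each payload as a func() and invokes it.
type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
	f, ok := payload.(func())
	if !ok {
		return ErrJobNotFunc
	}
	f()
	return nil
}

func main() {
	w := &callbackWorker{}
	n := 0
	w.Process(func() { n++ }) // the payload itself is the work
	fmt.Println(n)
	fmt.Println(w.Process(42) == ErrJobNotFunc) // non-func payloads are rejected
}
```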
```go
// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
	return New(n, func() Worker {
		return &closureWorker{
			processor: f,
		}
	})
}
```
Initializing the pool takes two arguments: the pool size n, and the function the pool should run. That function is handed to the closure workers, which use it to actually process the data at run time.
```go
func New(n int, ctor func() Worker) *Pool {
	p := &Pool{
		ctor:    ctor,
		reqChan: make(chan workRequest),
	}
	p.SetSize(n)
	return p
}
```
Note that reqChan first appears here; in the code that follows, it is the core link between the pool and its workers.
What does SetSize do?
```go
func (p *Pool) SetSize(n int) {
	p.workerMut.Lock()
	defer p.workerMut.Unlock()

	lWorkers := len(p.workers)
	if lWorkers == n {
		return
	}

	// Add extra workers if N > len(workers)
	for i := lWorkers; i < n; i++ {
		p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
	}

	// Asynchronously stop all workers > N
	for i := n; i < lWorkers; i++ {
		p.workers[i].stop()
	}

	// Synchronously wait for all workers > N to stop
	for i := n; i < lWorkers; i++ {
		p.workers[i].join()
	}

	// Remove stopped workers from slice
	p.workers = p.workers[:n]
}
```
First, the function takes a lock, to prevent multiple goroutines from calling SetSize concurrently.
If the current number of workers is smaller than n, workers are added;
if it is larger than n, the excess workers are stopped and removed.
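The grow-then-shrink pattern can be sketched with a toy worker that only knows how to stop and join; `toyWorker` and `toyPool` are simplified stand-ins invented for this sketch, not tunny's real types:

```go
package main

import (
	"fmt"
	"sync"
)

// toyWorker is a simplified stand-in for tunny's workerWrapper: its goroutine
// runs until stop() is called; join() waits for it to exit.
type toyWorker struct {
	closeChan  chan struct{}
	closedChan chan struct{}
}

func newToyWorker() *toyWorker {
	w := &toyWorker{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go func() {
		<-w.closeChan
		close(w.closedChan)
	}()
	return w
}

func (w *toyWorker) stop() { close(w.closeChan) }
func (w *toyWorker) join() { <-w.closedChan }

// toyPool resizes its worker slice the same way Pool.SetSize does.
type toyPool struct {
	mut     sync.Mutex
	workers []*toyWorker
}

func (p *toyPool) SetSize(n int) {
	p.mut.Lock()
	defer p.mut.Unlock()
	l := len(p.workers)
	if l == n {
		return
	}
	for i := l; i < n; i++ { // grow: spawn missing workers
		p.workers = append(p.workers, newToyWorker())
	}
	for i := n; i < l; i++ { // shrink: ask extra workers to stop...
		p.workers[i].stop()
	}
	for i := n; i < l; i++ { // ...then wait for each to finish
		p.workers[i].join()
	}
	p.workers = p.workers[:n]
}

func (p *toyPool) Size() int {
	p.mut.Lock()
	defer p.mut.Unlock()
	return len(p.workers)
}

func main() {
	p := &toyPool{}
	p.SetSize(4)
	fmt.Println(p.Size()) // 4
	p.SetSize(2)
	fmt.Println(p.Size()) // 2
}
```

Stopping asynchronously first and only then joining lets all excess workers shut down in parallel instead of one at a time.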
How exactly are workers added? The newWorkerWrapper function has several points worth noting; in particular, the pool passes its reqChan into this function, and therefore to the worker.
```go
func newWorkerWrapper(
	reqChan chan<- workRequest,
	worker Worker,
) *workerWrapper {
	w := workerWrapper{
		worker:        worker,
		interruptChan: make(chan struct{}),
		reqChan:       reqChan,
		closeChan:     make(chan struct{}),
		closedChan:    make(chan struct{}),
	}
	go w.run()
	return &w
}
```
After constructing the workerWrapper, newWorkerWrapper spawns a goroutine running w.run(). The worker here was produced by the constructor passed in earlier, which builds closure workers, so this worker is a closureWorker.
```go
func (w *workerWrapper) run() {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	defer func() {
		w.worker.Terminate()
		close(retChan)
		close(w.closedChan)
	}()

	for {
		// NOTE: Blocking here will prevent the worker from closing down.
		w.worker.BlockUntilReady()
		select {
		case w.reqChan <- workRequest{
			jobChan:       jobChan,
			retChan:       retChan,
			interruptFunc: w.interrupt,
		}:
			select {
			case payload := <-jobChan:
				result := w.worker.Process(payload)
				select {
				case retChan <- result:
				case <-w.interruptChan:
					w.interruptChan = make(chan struct{})
				}
			case _, _ = <-w.interruptChan:
				w.interruptChan = make(chan struct{})
			}
		case <-w.closeChan:
			return
		}
	}
}
```
Let's walk through this run function, the heart of the worker. It is one large for loop wrapping a select. On entering the select, the worker unconditionally offers a workRequest on reqChan; this has to be read together with the pool's receiving side:
```go
func (p *Pool) Process(payload interface{}) interface{} {
	atomic.AddInt64(&p.queuedJobs, 1)

	request, open := <-p.reqChan
	if !open {
		panic(ErrPoolNotRunning)
	}

	request.jobChan <- payload

	payload, open = <-request.retChan
	if !open {
		panic(ErrWorkerClosed)
	}

	atomic.AddInt64(&p.queuedJobs, -1)
	return payload
}
```
Because a worker is always trying to send a workRequest into reqChan, the pool is guaranteed to receive one into its request variable. payload is the actual data to be processed: the pool sends it into the request's jobChan and then blocks waiting for a result from retChan. Since this jobChan is the same channel as the worker's jobChan, the payload is received in the worker's

```go
select {
case payload := <-jobChan:
	result := w.worker.Process(payload)
	select {
	case retChan <- result:
	case <-w.interruptChan:
		w.interruptChan = make(chan struct{})
	}
	...
```

case branch and processed. The worker then enters the next select and unconditionally sends the result into retChan; since the worker's retChan and the pool's retChan are likewise the same channel, the pool receives the result and returns it.
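The whole handshake can be reproduced in a self-contained sketch: a mini pool whose workers offer a workRequest on a shared reqChan, and whose Process hands the payload over through jobChan and waits on retChan. `miniPool` and `newMiniPool` are simplified names invented for this sketch; interrupts, shutdown, and resizing are omitted:

```go
package main

import "fmt"

// workRequest carries the per-job channels from a worker to the pool.
type workRequest struct {
	jobChan chan<- interface{}
	retChan <-chan interface{}
}

// miniPool is a stripped-down tunny: n workers compete on a shared reqChan.
type miniPool struct {
	reqChan chan workRequest
}

func newMiniPool(n int, f func(interface{}) interface{}) *miniPool {
	p := &miniPool{reqChan: make(chan workRequest)}
	for i := 0; i < n; i++ {
		go func() {
			jobChan, retChan := make(chan interface{}), make(chan interface{})
			for {
				// Offer ourselves to the pool, then wait for a job.
				p.reqChan <- workRequest{jobChan: jobChan, retChan: retChan}
				payload := <-jobChan
				retChan <- f(payload)
			}
		}()
	}
	return p
}

// Process blocks until some worker picks up the job and returns its result.
func (p *miniPool) Process(payload interface{}) interface{} {
	req := <-p.reqChan     // take whichever worker offered first
	req.jobChan <- payload // hand over the job
	return <-req.retChan   // wait for the result
}

func main() {
	pool := newMiniPool(4, func(p interface{}) interface{} {
		return p.(int) * p.(int) // a hypothetical squaring job
	})
	sum := 0
	for i := 1; i <= 5; i++ {
		sum += pool.Process(i).(int)
	}
	fmt.Println(sum) // 1+4+9+16+25 = 55
}
```

Because the per-job channels travel inside the workRequest, the pool always replies to exactly the worker that won the race on reqChan; no other coordination is needed.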
With multiple workers, they compete to receive from reqChan, but it is guaranteed that at most size workers are working at any moment, which achieves the goal of capping the number of goroutines.