本文主要研究一下tunny的workerWrappergit
type workerWrapper struct { worker Worker interruptChan chan struct{} // reqChan is NOT owned by this type, it is used to send requests for work. reqChan chan<- workRequest // closeChan can be closed in order to cleanly shutdown this worker. closeChan chan struct{} // closedChan is closed by the run() goroutine when it exits. closedChan chan struct{} } func newWorkerWrapper( reqChan chan<- workRequest, worker Worker, ) *workerWrapper { w := workerWrapper{ worker: worker, interruptChan: make(chan struct{}), reqChan: reqChan, closeChan: make(chan struct{}), closedChan: make(chan struct{}), } go w.run() return &w }
workerWrapper包裝了worker,定義了interruptChan、reqChan、closeChan、closedChan屬性
func (w *workerWrapper) interrupt() { close(w.interruptChan) w.worker.Interrupt() }
interrupt方法關閉w.interruptChan,執行w.worker.Interrupt()
func (w *workerWrapper) run() { jobChan, retChan := make(chan interface{}), make(chan interface{}) defer func() { w.worker.Terminate() close(retChan) close(w.closedChan) }() for { // NOTE: Blocking here will prevent the worker from closing down. w.worker.BlockUntilReady() select { case w.reqChan <- workRequest{ jobChan: jobChan, retChan: retChan, interruptFunc: w.interrupt, }: select { case payload := <-jobChan: result := w.worker.Process(payload) select { case retChan <- result: case <-w.interruptChan: w.interruptChan = make(chan struct{}) } case _, _ = <-w.interruptChan: w.interruptChan = make(chan struct{}) } case <-w.closeChan: return } } }
run首先建立jobChan、retChan,而後for循環執行select讀取reqChan,以後讀取jobChan的payload,進行處理,而後寫入到retChan
func (w *workerWrapper) stop() { close(w.closeChan) }
stop方法關閉w.closeChan
func (w *workerWrapper) join() { <-w.closedChan }
join方法則等待w.closedChan
tunny的workerWrapper包裝了worker,定義了interruptChan、reqChan、closeChan、closedChan屬性,它提供了interrupt、run、stop、join方法。github