Source-code walkthrough of tunny, a goroutine pool for Go

tunny

GitHub: https://github.com/Jeffail/tunny

Project structure

(image: project structure)
The layout of tunny is very simple; the core files are just tunny.go and worker.go.

Overall design

(image: tunny architecture diagram)
tunny connects the pool and its workers through the reqChan channel. The number of workers equals the pool size and is fixed when the pool is initialized; the workers compete to receive data from reqChan, process it, and hand the result back to the pool.

Code walkthrough

type Pool struct {
    queuedJobs int64

    ctor    func() Worker
    workers []*workerWrapper
    reqChan chan workRequest

    workerMut sync.Mutex
}

The Pool struct:

  • queuedJobs — the number of jobs currently pending in the pool
  • ctor — the constructor that builds a concrete Worker
  • workers — the workers the pool actually owns
  • reqChan — the channel through which the pool talks to all of its workers; every worker shares this one channel with the pool
  • workerMut — a mutex taken during SetSize, so that concurrent goroutines cannot resize the pool at the same time

type Worker interface {
    // Process will synchronously perform a job and return the result.
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

In tunny the worker is designed as an interface because, as the later code shows, there can be many different Worker implementations. As argued in an earlier post of mine on Go coding tips: program against interfaces to decouple your code.
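
To make the interface concrete, here is a minimal hand-rolled Worker. upperWorker is a hypothetical example of mine, not part of tunny; with the real library you would hand a constructor for it to tunny.New. Only Process does real work here; the lifecycle hooks are no-ops, which is common for workers that need no per-job setup or cleanup.

```go
package main

import (
	"fmt"
	"strings"
)

// Worker reproduces the interface from the tunny source quoted above.
type Worker interface {
	Process(interface{}) interface{}
	BlockUntilReady()
	Interrupt()
	Terminate()
}

// upperWorker is a hypothetical custom Worker that upper-cases strings.
type upperWorker struct{}

func (w *upperWorker) Process(payload interface{}) interface{} {
	s, ok := payload.(string)
	if !ok {
		return nil
	}
	return strings.ToUpper(s)
}
func (w *upperWorker) BlockUntilReady() {}
func (w *upperWorker) Interrupt()       {}
func (w *upperWorker) Terminate()       {}

func main() {
	var w Worker = &upperWorker{}
	fmt.Println(w.Process("tunny")) // TUNNY
}
```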

Two kinds of worker

// closureWorker is a minimal Worker implementation that simply wraps a
// func(interface{}) interface{}
type closureWorker struct {
    processor func(interface{}) interface{}
}

The closure worker is the most commonly used kind: it does its work by running the processor function it was given at initialization.
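
The methods not quoted above are essentially a one-line delegation to the stored closure plus no-op lifecycle hooks; a lightly abridged sketch of how they look in the source:

```go
package main

import "fmt"

type closureWorker struct {
	processor func(interface{}) interface{}
}

// Process simply delegates to the stored closure; the remaining
// lifecycle hooks are no-ops (sketch based on the tunny source).
func (w *closureWorker) Process(payload interface{}) interface{} {
	return w.processor(payload)
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

func main() {
	w := &closureWorker{processor: func(v interface{}) interface{} {
		return v.(int) + 1
	}}
	fmt.Println(w.Process(41)) // 42
}
```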

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
    f, ok := payload.(func())
    if !ok {
        return ErrJobNotFunc
    }
    f()
    return nil
}

The callback worker requires its payload to be a function, which it simply calls.
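
A quick self-contained demonstration of that behaviour (ErrJobNotFunc is redefined locally here so the snippet runs standalone; its message may differ from tunny's):

```go
package main

import (
	"errors"
	"fmt"
)

// ErrJobNotFunc stands in for the error of the same name in tunny.
var ErrJobNotFunc = errors.New("job not a func()")

type callbackWorker struct{}

// Process treats the payload itself as the work: it must be a func().
func (w *callbackWorker) Process(payload interface{}) interface{} {
	f, ok := payload.(func())
	if !ok {
		return ErrJobNotFunc
	}
	f()
	return nil
}

func main() {
	w := &callbackWorker{}
	n := 0
	w.Process(func() { n++ }) // the function payload is executed
	fmt.Println(n)            // 1
	fmt.Println(w.Process("not a func") == ErrJobNotFunc) // true
}
```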


// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
    return New(n, func() Worker {
        return &closureWorker{
            processor: f,
        }
    })
}

Initializing the pool takes two arguments: the pool size n, and the function you want the pool to run. That function ends up inside a closure worker, which actually processes the data at run time.

func New(n int, ctor func() Worker) *Pool {
    p := &Pool{
        ctor:    ctor,
        reqChan: make(chan workRequest),
    }
    p.SetSize(n)

    return p
}

Here reqChan appears for the first time; in the code that follows it is the core link between the pool and its workers.
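
Note that reqChan is created unbuffered, which is what gives the pool its hand-off semantics: a send blocks until a receiver is ready, so a worker "offering itself" and the pool "claiming a worker" form a single synchronous rendezvous, with no requests ever queued inside the channel. A tiny illustration:

```go
package main

import "fmt"

func main() {
	// An unbuffered channel: the send below blocks until main receives,
	// so the two goroutines meet at a single synchronization point —
	// the same rendezvous that reqChan provides in tunny.
	ch := make(chan string)
	go func() { ch <- "worker ready" }()
	fmt.Println(<-ch) // worker ready
}
```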

What does SetSize do?

func (p *Pool) SetSize(n int) {
    p.workerMut.Lock()
    defer p.workerMut.Unlock()

    lWorkers := len(p.workers)
    if lWorkers == n {
        return
    }

    // Add extra workers if N > len(workers)
    for i := lWorkers; i < n; i++ {
        p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
    }

    // Asynchronously stop all workers > N
    for i := n; i < lWorkers; i++ {
        p.workers[i].stop()
    }

    // Synchronously wait for all workers > N to stop
    for i := n; i < lWorkers; i++ {
        p.workers[i].join()
    }

    // Remove stopped workers from slice
    p.workers = p.workers[:n]
}

First, the function takes the lock, so that multiple goroutines cannot run SetSize concurrently.
Then, if the current number of workers is smaller than the requested size, new workers are added;
if it is larger, the surplus workers are stopped and removed.
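The stop-then-join shutdown order can be sketched with the two channels each wrapper carries (closeChan and closedChan, which appear in newWorkerWrapper below). workerHandle and its constructor are simplified stand-ins of mine, not tunny's types:

```go
package main

import "fmt"

// workerHandle sketches tunny's stop/join pair: stop() signals the
// worker goroutine to exit, join() blocks until it has fully shut down.
type workerHandle struct {
	closeChan  chan struct{} // closed by stop()
	closedChan chan struct{} // closed by the worker goroutine on exit
}

func newWorkerHandle() *workerHandle {
	w := &workerHandle{
		closeChan:  make(chan struct{}),
		closedChan: make(chan struct{}),
	}
	go func() {
		defer close(w.closedChan)
		<-w.closeChan // stand-in for the worker's select loop
	}()
	return w
}

func (w *workerHandle) stop() { close(w.closeChan) }
func (w *workerHandle) join() { <-w.closedChan }

func main() {
	// Shrinking from 3 workers to 1: stop all surplus workers first
	// (asynchronously), then join them — the same order SetSize uses,
	// so the surplus workers wind down in parallel.
	workers := []*workerHandle{newWorkerHandle(), newWorkerHandle(), newWorkerHandle()}
	n := 1
	for i := n; i < len(workers); i++ {
		workers[i].stop()
	}
	for i := n; i < len(workers); i++ {
		workers[i].join()
	}
	workers = workers[:n]
	fmt.Println(len(workers)) // 1
}
```

Signalling all surplus workers before joining any of them lets the shutdowns overlap instead of happening one at a time.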

How exactly are workers added? The newWorkerWrapper function deserves a close look; note in particular that the pool passes its own reqChan into it, i.e. hands it over to the worker.

func newWorkerWrapper(
    reqChan chan<- workRequest,
    worker Worker,
) *workerWrapper {
    w := workerWrapper{
        worker:        worker,
        interruptChan: make(chan struct{}),
        reqChan:       reqChan,
        closeChan:     make(chan struct{}),
        closedChan:    make(chan struct{}),
    }

    go w.run()

    return &w
}

As you can see, newWorkerWrapper spawns a goroutine that executes w.run(). The worker here is whatever the constructor passed in earlier produces; in our case that constructor builds closure workers, so this worker is a closure worker.

func (w *workerWrapper) run() {
    jobChan, retChan := make(chan interface{}), make(chan interface{})
    defer func() {
        w.worker.Terminate()
        close(retChan)
        close(w.closedChan)
    }()

    for {
        // NOTE: Blocking here will prevent the worker from closing down.
        w.worker.BlockUntilReady()
        select {
        case w.reqChan <- workRequest{
            jobChan:       jobChan,
            retChan:       retChan,
            interruptFunc: w.interrupt,
        }:
            select {
            case payload := <-jobChan:
                result := w.worker.Process(payload)
                select {
                case retChan <- result:
                case <-w.interruptChan:
                    w.interruptChan = make(chan struct{})
                }
            case _, _ = <-w.interruptChan:
                w.interruptChan = make(chan struct{})
            }
        case <-w.closeChan:
            return
        }
    }
}

This run function is the heart of the worker.
It is one big for loop wrapping a select.
On entering the select, the worker unconditionally tries to send a workRequest into reqChan; to see why, read this together with the pool's receiving side:

func (p *Pool) Process(payload interface{}) interface{} {
    atomic.AddInt64(&p.queuedJobs, 1)

    request, open := <-p.reqChan
    if !open {
        panic(ErrPoolNotRunning)
    }

    request.jobChan <- payload

    payload, open = <-request.retChan
    if !open {
        panic(ErrWorkerClosed)
    }

    atomic.AddInt64(&p.queuedJobs, -1)
    return payload
}

Because every worker is always trying to send a workRequest into reqChan, the pool is guaranteed to receive one into the request variable. payload is the actual data to be processed: the pool sends it into the workRequest's jobChan and then blocks waiting for the result on retChan. Since this jobChan is the very channel the worker created, the payload arrives in the worker's select:

select {
case payload := <-jobChan:
    result := w.worker.Process(payload)
    select {
    case retChan <- result:
    case <-w.interruptChan:
        w.interruptChan = make(chan struct{})
    }
    ...
In that case branch the payload is received and processed; the worker then enters the inner select and unconditionally sends the result into retChan. Since the worker's retChan and the one the pool holds are the same channel, the pool receives the result and returns it.

With multiple workers, they compete to receive from reqChan, but at any moment at most size workers are working — which is exactly how the pool caps the number of goroutines.
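
That cap can be verified empirically with a toy model (not tunny itself): size workers all read one shared job channel, and an atomic high-water mark records how many jobs were ever in flight at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxConcurrency pushes `jobs` jobs through `size` workers that all read
// one shared channel, and reports the high-water mark of jobs in flight.
func maxConcurrency(size, jobs int) int64 {
	var cur, max int64
	jobChan := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < size; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobChan {
				c := atomic.AddInt64(&cur, 1)
				for { // record the high-water mark
					m := atomic.LoadInt64(&max)
					if c <= m || atomic.CompareAndSwapInt64(&max, m, c) {
						break
					}
				}
				time.Sleep(5 * time.Millisecond) // simulate work
				atomic.AddInt64(&cur, -1)
			}
		}()
	}
	for i := 0; i < jobs; i++ {
		jobChan <- i
	}
	close(jobChan)
	wg.Wait()
	return atomic.LoadInt64(&max)
}

func main() {
	// Never more than `size` jobs run at once, no matter how many are queued.
	fmt.Println(maxConcurrency(2, 10) <= 2) // true
}
```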
