Go 每日一庫之 tunny

簡介

以前寫過一篇文章介紹了ants這個 goroutine 池實現。當時在網上查看相關資料的時候,發現了另一個實現tunny。趁着時間相近,正好研究一番。也比如較一下這兩個庫。那就讓咱們開始吧。git

快速開始

本文代碼使用 Go Modules。github

建立目錄並初始化:golang

$ mkdir tunny && cd tunny
$ go mod init github.com/darjun/go-daily-lib/tunny

使用go get從 GitHub 獲取tunny庫:編程

$ go get -u github.com/Jeffail/tunny

爲了方便地和ants作一個對比,咱們將ants中的示例從新用tunny實現一遍:仍是那個分段求和的例子:數組

const (
  DataSize    = 10000
  DataPerTask = 100
)

func main() {
  numCPUs := runtime.NumCPU()
  p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
    var sum int
    for _, n := range payload.([]int) {
      sum += n
    }
    return sum
  })
  defer p.Close()
  // ...
}

使用也很是簡單,首先建立一個Pool,這裏使用tunny.NewFunc()微信

第一個參數爲池子大小,即同時有多少個 worker (也即 goroutine)在工做,這裏設置成邏輯 CPU 個數,對於 CPU 密集型任務,這個值設置太大無心義,反而有可能致使 goroutine 切換頻繁而下降性能。併發

第二個參數傳入一個func(interface{})interface{}的參數做爲任務處理函數。後續傳入數據就會調用這個函數處理。app

池子使用完須要關閉,這裏使用defer p.Close()在程序退出前關閉。異步

而後,生成測試數據,仍是 10000 個隨機數,分紅 100 組:函數

nums := make([]int, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

處理每組數據:

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partialSums := make([]int, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  go func(i int) {
    partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)
    wg.Done()
  }(i)
}

wg.Wait()

調用p.Process()方法,傳入任務數據,池子中會選擇空閒的 goroutine 來處理這個數據。因爲咱們上面設置了處理函數,goroutine 會直接調用該函數,將這個切片做爲參數傳入。

tunnyants不一樣的是,tunny的任務處理是同步的,即調用p.Process()方法以後,當前 goroutine 會掛起,直到任務處理完成以後纔會被喚醒。因爲是同步的,因此p.Process()方法能夠直接返回處理結果。這也是上面程序在分發任務的時候,啓動多個 goroutine 的緣由。若是不是每一個任務都啓動一個 goroutine,p.Process()方法會一直等待任務完成,那麼後面的任務要等到前面的任務所有執行完以後才能執行。這樣就發揮不了併發的優點了。

這裏注意一個小細節,我將for循環變量做爲參數傳給 goroutine 函數了。若是不這樣作,全部 goroutine 都共用外層的i,並且 goroutine 開始運行時,for循環大機率已經結束了,這時i = DataSize/DataPerTask,索引nums[i*DataPerTask : (i+1)*DataPerTask]會越界觸發 panic。

最後統計數據,驗證結果:

var sum int
for _, s := range partialSums {
  sum += s
}

var expect int
for _, num := range nums {
  expect += num
}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

運行:

$ go run main.go
finish all tasks, result is 5010172 expect:5010172

超時

默認狀況下,p.Process()會一直阻塞直到任務完成,即便當前沒有空閒 worker 也會阻塞。咱們也可使用帶超時的Process()方法:ProcessTimed()。傳入一個超時時間間隔,若是超過這個時間尚未空閒 worker,或者任務尚未處理完成,就會終止,並返回一個錯誤。

超時有 2 種狀況:

  • 等不到空閒的 worker:全部 worker 一直處理繁忙狀態,正在處理的任務比較耗時,沒法短期內完成;
  • 任務自己比較耗時。

下面咱們編寫一個計算斐波那契的函數,使用遞歸這種低效的實現方法:

func fib(n int) int {
  if n <= 1 {
    return 1
  }

  return fib(n-1) + fib(n-2)
}

咱們先看任務比較耗時的狀況,建立Pool對象。爲了觀察更明顯,在處理函數中添加了time.Sleep()語句:

p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
  n := payload.(int)
  result := fib(n)
  time.Sleep(5 * time.Second)
  return result
})
defer p.Close()

生成與池容量相等的任務數,調用p.ProcessTimed()方法,設置超時爲 1s:

var wg sync.WaitGroup
wg.Add(numCPUs)
for i := 0; i < numCPUs; i++ {
  go func(i int) {
    n := rand.Intn(30)
    result, err := p.ProcessTimed(n, time.Second)
    nowStr := time.Now().Format("2006-01-02 15:04:05")
    if err != nil {
      fmt.Printf("[%s]task(%d) failed:%v\n", nowStr, i, err)
    } else {
      fmt.Printf("[%s]fib(%d) = %d\n", nowStr, n, result)
    }
    wg.Done()
  }(i)
}

wg.Wait()

由於處理函數中 sleep 5s,因此任務在執行過程當中就超時了。運行:

$ go run main.go 
[2021-06-10 16:36:26]task(7) failed:job request timed out
[2021-06-10 16:36:26]task(4) failed:job request timed out
[2021-06-10 16:36:26]task(1) failed:job request timed out
[2021-06-10 16:36:26]task(6) failed:job request timed out
[2021-06-10 16:36:26]task(5) failed:job request timed out
[2021-06-10 16:36:26]task(0) failed:job request timed out
[2021-06-10 16:36:26]task(3) failed:job request timed out
[2021-06-10 16:36:26]task(2) failed:job request timed out

都在同一秒中超時。

咱們將任務數量翻倍,再將處理函數中的 sleep 改成 990ms,保證前一批任務能順利完成,後續任務或者因爲等不到空閒 worker,或者因爲執行時間過長而超時返回。運行:

$ go run main.go
[2021-06-10 16:42:46]fib(11) = 144
[2021-06-10 16:42:46]fib(25) = 121393
[2021-06-10 16:42:46]fib(27) = 317811
[2021-06-10 16:42:46]fib(1) = 1
[2021-06-10 16:42:46]fib(18) = 4181
[2021-06-10 16:42:46]fib(29) = 832040
[2021-06-10 16:42:46]fib(17) = 2584
[2021-06-10 16:42:46]fib(20) = 10946
[2021-06-10 16:42:46]task(5) failed:job request timed out
[2021-06-10 16:42:46]task(14) failed:job request timed out
[2021-06-10 16:42:46]task(8) failed:job request timed out
[2021-06-10 16:42:46]task(7) failed:job request timed out
[2021-06-10 16:42:46]task(13) failed:job request timed out
[2021-06-10 16:42:46]task(12) failed:job request timed out
[2021-06-10 16:42:46]task(11) failed:job request timed out
[2021-06-10 16:42:46]task(6) failed:job request timed out

context

context 是協調 goroutine 的工具。tunny支持帶context.Context參數的方法:ProcessCtx()。當前 context 狀態變爲Done以後,任務也會中止執行。context 會因爲超時、取消等緣由切換爲Done狀態。仍是拿上面的例子:

go func(i int) {
  n := rand.Intn(30)
  ctx, cancel := context.WithCancel(context.Background())
  if i%2 == 0 {
    go func() {
      time.Sleep(500 * time.Millisecond)
      cancel()
    }()
  }

  result, err := p.ProcessCtx(ctx, n)
  if err != nil {
     fmt.Printf("task(%d) failed:%v\n", i, err)
  } else {
     fmt.Printf("fib(%d) = %d\n", n, result)
  }
  wg.Done()
}(i)

其餘代碼都同樣,咱們調用p.ProcessCtx()方法來執行任務。參數是一個可取消的Context。對於序號爲偶數的任務,咱們啓動一個 goroutine 在 500ms 以後cancel()掉這個Context。代碼運行結果以下:

$ go run main.go
task(4) failed:context canceled
task(6) failed:context canceled
task(0) failed:context canceled
task(2) failed:context canceled
fib(27) = 317811
fib(25) = 121393
fib(1) = 1
fib(18) = 4181

咱們看到偶數序號的任務都被取消了。

源碼

tunny的源碼更少,除去測試代碼和註釋,連 500 行都不到。那麼就一塊兒來看一下吧。Pool結構以下:

// src/github.com/Jeffail/tunny.go
type Pool struct {
  queuedJobs int64

  ctor    func() Worker
  workers []*workerWrapper
  reqChan chan workRequest

  workerMut sync.Mutex
}

Pool結構中有一個ctor字段,這是一個函數對象,用於返回一個實現Worker接口的值:

type Worker interface {
  Process(interface{}) interface{}
  BlockUntilReady()
  Interrupt()
  Terminate()
}

這個接口不一樣的方法在任務執行的不一樣階段調用。最重要的當屬Process(interface{}) interface{}方法了。這個就是執行任務的函數。tunny提供New()方法建立Pool對象,這個方法須要咱們本身構造ctor函數對象,使用多有不便。tunny提供了另外兩個默認實現closureWorkercallbackWorker

type closureWorker struct {
  processor func(interface{}) interface{}
}

func (w *closureWorker) Process(payload interface{}) interface{} {
  return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
  f, ok := payload.(func())
  if !ok {
    return ErrJobNotFunc
  }
  f()
  return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

tunny.NewFunc()方法使用的就是closureWorker

func NewFunc(n int, f func(interface{}) interface{}) *Pool {
  return New(n, func() Worker {
    return &closureWorker{
      processor: f,
    }
  })
}

建立的closureWorker直接將參數f做爲任務處理函數。

tunny.NewCallback()方法使用callbackWorker

func NewCallback(n int) *Pool {
  return New(n, func() Worker {
    return &callbackWorker{}
  })
}

callbackWorker結構中沒有處理函數,只能給它發送無參無返回值的函數對象做爲任務,它的Process()方法就是執行這個函數。

建立Pool對象後,都是調用它的SetSize()方法,設置 worker 數量。在這個方法中會啓動相應數量的 goroutine:

func (p *Pool) SetSize(n int) {
  p.workerMut.Lock()
  defer p.workerMut.Unlock()

  lWorkers := len(p.workers)
  if lWorkers == n {
    return
  }

  for i := lWorkers; i < n; i++ {
    p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
  }

  // 中止過多的 worker
  for i := n; i < lWorkers; i++ {
    p.workers[i].stop()
  }

  // 等待 worker 中止
  for i := n; i < lWorkers; i++ {
    p.workers[i].join()
    // -----------------
  }
  p.workers = p.workers[:n]
}

SetSize()其實在擴容和縮容的時候也會調用。對於擴容,它會建立相應數量的 worker。對於縮容,它會將多餘的 worker 停掉。與ants不一樣,tunny的擴容縮容都是即時生效的。

代碼中,我用-----------------標出來的地方我以爲有點問題。對於縮容,由於底層的數組沒有變化,workers切片長度縮小以後,數組中後面的元素實際上就訪問不到了,可是數組還持有它的引用,算是一種內存泄漏吧。因此穩妥起見最好加上p.workers[i] = nil

這裏建立的 worker 其實是包裝了一層的workerWrapper結構:

// src/github.com/Jeffail/worker.go
type workerWrapper struct {
  worker        Worker
  interruptChan chan struct{}
  reqChan chan<- workRequest
  closeChan chan struct{}
  closedChan chan struct{}
}

func newWorkerWrapper(
  reqChan chan<- workRequest,
  worker Worker,
) *workerWrapper {
  w := workerWrapper{
    worker:        worker,
    interruptChan: make(chan struct{}),
    reqChan:       reqChan,
    closeChan:     make(chan struct{}),
    closedChan:    make(chan struct{}),
  }

  go w.run()
  return &w
}

workerWrapper結構建立以後會馬上調用run()方法啓動一個 goroutine:

func (w *workerWrapper) run() {
  jobChan, retChan := make(chan interface{}), make(chan interface{})
  defer func() {
    w.worker.Terminate()
    close(retChan)
    close(w.closedChan)
  }()

  for {
    w.worker.BlockUntilReady()
    select {
    case w.reqChan <- workRequest{
      jobChan:       jobChan,
      retChan:       retChan,
      interruptFunc: w.interrupt,
    }:
      select {
      case payload := <-jobChan:
        result := w.worker.Process(payload)
        select {
        case retChan <- result:
        case <-w.interruptChan:
          w.interruptChan = make(chan struct{})
        }
      case _, _ = <-w.interruptChan:
        w.interruptChan = make(chan struct{})
      }
    case <-w.closeChan:
      return
    }
  }
}

每一個 worker goroutine 都在嘗試向w.reqChan通道中發送一個workRequest結構數據,發送成功以後,從jobChan中獲取任務數據,而後調用Worker.Process()方法執行任務,最後將結果發送到retChan通道中。這裏其實有好幾個交互。須要結合Process()方法來看才更清晰:

func (p *Pool) Process(payload interface{}) interface{} {
  request, open := <-p.reqChan
  request.jobChan <- payload
  payload, open = <-request.retChan
  return payload
}

刪掉無相關的代碼,最後就是上面這樣。咱們在調用池對象的Process()方法時,嘗試從通道reqChan中接收數據,而後將任務數據發送到jobChan通道中,最後從retChan通道中接收結果。與上面的run流程結合來看,實際上在正常執行一個任務時,PoolworkerWrapper有 3 次交互。

觀察Pool建立到workerWrapper建立的流程,咱們能夠看到實際上Pool結構中的reqChanworkerWrapper結構中的reqChan是同一個通道。即workerWrapper啓動後,會阻塞在向reqChan通道發送數據上,直到調用了PoolProcess*()方法,從通道reqChan取出數據。Process()方法獲得workRequest會向它的jobChan通道中發送任務數據。而workerWrapper.run()方法成功發送數據到reqChan以後就開始等待從jobChan通道中接收數據,這時接收到Process()方法發送過來的數據。開始執行w.worker.Process()方法,而後向retChan通道發送結果數據,Process()方法在成功發送數據到jobChan以後,就開始等待從retChan通道中接收數據。接收成功以後,Process()方法返回,workerWrapper.run()繼續阻塞在w.reqChan <-這條語句上,等待處理下一個任務。注意jobChanretChan都是workerWrapper.run()方法中建立的通道。

那麼超時是怎麼實現的呢?看方法ProcessTimed()的實現:

func (p *Pool) ProcessTimed(
  payload interface{},
  timeout time.Duration,
) (interface{}, error) {
  tout := time.NewTimer(timeout)
  var request workRequest
  select {
  case request, open = <-p.reqChan:
  case <-tout.C:
    return nil, ErrJobTimedOut
  }

  select {
  case request.jobChan <- payload:
  case <-tout.C:
    request.interruptFunc()
    return nil, ErrJobTimedOut
  }

  select {
  case payload, open = <-request.retChan:
  case <-tout.C:
    request.interruptFunc()
    return nil, ErrJobTimedOut
  }

  tout.Stop()
  return payload, nil
}

一樣地,刪除不相干的代碼。首先,建立一個timer,超時時間由傳入參數指定。後面有 3 個select語句:

  • 等待從p.reqChan取數據,即等待有 worker 空閒;
  • 等待發送數據到jobChan,即等待 worker 從jobChan取出任務數據;
  • 等待從retChan取數據,即等待 worker 將結果發送到retChan

第一種狀況,若是超時了,說明 worker 都處於繁忙狀態,直接返回任務超時。後面兩種狀況其實是任務已經開始執行了,可是在規定的時間內沒有完成。這兩種狀況,須要終止任務的執行。咱們看到上面調用了workerRequest.interruptFunc()方法,也就是workerWrapper.interrupt()方法:

func (w *workerWrapper) interrupt() {
  close(w.interruptChan)
  w.worker.Interrupt()
}

這個方法就是簡單關閉了interrupteChan通道,而後調用worker對象的Interrupt()方法,默認實現中這個方法都是空的。

interruptChan通道關閉後,goroutine 中等待從jobChan接收數據和等待向retChan發送數據的操做都會取消:

select {
case payload := <-jobChan:
  result := w.worker.Process(payload)
  select {
  case retChan <- result:
  case <-w.interruptChan:
    w.interruptChan = make(chan struct{})
  }
case _, _ = <-w.interruptChan:
  w.interruptChan = make(chan struct{})
}

ProcessCtx()實現也是相似的。

最後調用workerWrapper.stop()會關閉closeChan通道,這會致使workerWrapper.run()方法中的for循環跳出,進而執行defer函數中的close(retChan)close(closedChan)

defer func() {
  w.worker.Terminate()
  close(retChan)
  close(w.closedChan)
}()

這裏須要關閉retChan通道是爲了防止Process*()方法在等待retChan數據。

closedChan通道關閉後,workerWrapper.join()方法就返回了。

func (w *workerWrapper) join() {
  <-w.closedChan
}

Worker幾個方法的調用時機:

  • Process():執行任務時;
  • Interrupt():任務由於超時會被 context 取消時;
  • BlockUntilReady():每次執行新任務前,可能須要準備一些資源;
  • Terminate()workerWrapper.run()中的 defer 函數中,即中止 worker 後。

這些時機在代碼中都能清晰地看到。

基於源碼,我畫了一個流程圖:

圖中省略了中斷的流程。

tunny vs ants

tunny設計的思路與ants有較大的區別:

tunny只支持同步的方式執行任務,雖然任務在另外一個 goroutine 執行,可是提交任務的 goroutine 必須等待結果返回或超時。不能作其餘事情。正是因爲這一點,致使tunny的設計稍微一點複雜,並且爲了支持超時和取消,設計了多個通道用於和執行任務的 goroutine 通訊。一次任務執行的過程涉及屢次通訊,性能是有損失的。從另外一方面說,同步的編程方式更符合人類的直覺。

ants徹底是異步的任務執行流程,相比tunny性能是稍高一些的。可是也由於它的異步特性,致使沒有任務超時、取消這些機制。並且若是須要收集結果,必需要本身編寫額外的代碼。

總結

本文介紹了另外一個 goroutine 池的實現tunny。它以同步的方式來處理任務,編寫代碼更加直觀,對任務的執行流程有更強的控制,如超時、取消等。固然實現也複雜一些。tunny代碼不走 500 行,很是建議讀一讀。

你們若是發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. tunny GitHub:https://github.com/Jeffail/tunny
  2. ants GitHub:github.com/panjf2000/ants
  3. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

個人博客:https://darjun.github.io

歡迎關注個人微信公衆號【GoUpUp】,共同窗習,一塊兒進步~

相關文章
相關標籤/搜索