goroutine併發實踐(協程池+超時+錯誤快返回)

當咱們使用goroutine的時候讓函數併發執行的時候,能夠藉助着sync.WaitGroup{}的能力,其中代碼以下:golang

func testGoroutine() {
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			wg.Done()
			fmt.Println("hello world")
		}()
	}
	wg.Wait()
}
複製代碼

看完上述代碼此時咱們須要考慮的是,假設goroutine由於一些rpc請求過慢致使hang住,此時goroutine會一直卡住在wg.Wait(),最終致使請求失敗bash

除非你使用的框架提供了一個超時的能力,或者你go出去的rpc請求存在超時斷開的能力併發

那麼咱們如何讓代碼不被hang住呢?

最簡單的解法就是增長超時!框架

實際上超時也有不少解法函數

  • 基於ctxcontext.WithTimeOut()實現
  • 基於select實現

這裏我選擇基於select實現超時來給你們看下代碼如何實現post

func testWithGoroutineTimeOut() {
	var wg sync.WaitGroup
	done := make(chan struct{})
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
		}()
	}
	// wg.Wait()此時也要go出去,防止在wg.Wait()出堵住
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	// 正常結束完成
	case <-done:
	// 超時	
	case <-time.After(500 * time.Millisecond):
	}
}
複製代碼

能夠看到上述代碼,已經基於select實現了超時,是否是很是簡單呢~ui

可是咱們對於這個接口會有更高的要求。spa

  • goroutine沒有錯誤處理,
  • 此時go出去的goroutine數量是依賴for循環的數量,假設for循環100w次,形成goroutine過多的問題

能夠寫一個協程池解決goroutine過多,,那麼協程池如何實現呢?code

咱們能夠使用sync waitGroup+ 非阻塞channel實現 代碼以下:協程

package ezgopool

import "sync"

// goroutine pool
type GoroutinePool struct {
	c  chan struct{}
	wg *sync.WaitGroup
}

// 採用有緩衝channel實現,當channel滿的時候阻塞
func NewGoroutinePool(maxSize int) *GoroutinePool {
	if maxSize <= 0 {
		panic("max size too small")
	}
	return &GoroutinePool{
		c:  make(chan struct{}, maxSize),
		wg: new(sync.WaitGroup),
	}
}

// add
func (g *GoroutinePool) Add(delta int) {
	g.wg.Add(delta)
	for i := 0; i < delta; i++ {
		g.c <- struct{}{}
	}

}

// done
func (g *GoroutinePool) Done() {
	<-g.c
	g.wg.Done()
}

// wait
func (g *GoroutinePool) Wait() {
	g.wg.Wait()
}

複製代碼

以上就是協程池的實現,實際是很是簡單的,個人博客也記錄了另外一個golang協程池的開源實現,具體見 juejin.im/post/5d4f9f…

而後最後咱們的超時+錯誤快返回+協程池模型就完成了~

func testGoroutineWithTimeOut() {
	 wg :=sync.WaitGroup{}
	done := make(chan struct{})
	// 新增阻塞chan
	errChan := make(chan error)

	pool.NewGoroutinePool(10)
	for i := 0; i < 10; i++ {
		pool.Add(1)
		go func() {
			pool.Done()
			if err!=nil{
				errChan<-errors.New("error")
			}
		}()
	}

	go func() {
		pool.Wait()
		close(done)
	}()

	select {
	// 錯誤快返回,適用於get接口
	case err := <-errChan:
		return nil, err
	case <-done:
	case <-time.After(500 * time.Millisecond):
	}
}

複製代碼

謝謝

相關文章
相關標籤/搜索