當咱們使用goroutine
的時候讓函數併發執行的時候,能夠藉助着sync.WaitGroup{}
的能力,其中代碼以下:golang
func testGoroutine() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
wg.Done()
fmt.Println("hello world")
}()
}
wg.Wait()
}
複製代碼
看完上述代碼此時咱們須要考慮的是,假設goroutine
由於一些rpc
請求過慢致使hang
住,此時goroutine
會一直卡住在wg.Wait()
,最終致使請求失敗bash
除非你使用的框架提供了一個超時的能力,或者你go
出去的rpc
請求存在超時斷開的能力併發
hang
住呢?最簡單的解法就是增長超時!框架
實際上超時也有不少解法函數
ctx
的context.WithTimeOut()
實現select
實現這裏我選擇基於select
實現超時來給你們看下代碼如何實現post
func testWithGoroutineTimeOut() {
var wg sync.WaitGroup
done := make(chan struct{})
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
}()
}
// wg.Wait()此時也要go出去,防止在wg.Wait()出堵住
go func() {
wg.Wait()
close(done)
}()
select {
// 正常結束完成
case <-done:
// 超時
case <-time.After(500 * time.Millisecond):
}
}
複製代碼
能夠看到上述代碼,已經基於select
實現了超時,是否是很是簡單呢~ui
可是咱們對於這個接口會有更高的要求。spa
goroutine
沒有錯誤處理,go
出去的goroutine
數量是依賴for
循環的數量,假設for
循環100w次,形成goroutine
過多的問題能夠寫一個協程池解決goroutine
過多,,那麼協程池如何實現呢?code
咱們能夠使用sync waitGroup
+ 非阻塞channel
實現 代碼以下:協程
package ezgopool
import "sync"
// goroutine pool
type GoroutinePool struct {
c chan struct{}
wg *sync.WaitGroup
}
// 採用有緩衝channel實現,當channel滿的時候阻塞
func NewGoroutinePool(maxSize int) *GoroutinePool {
if maxSize <= 0 {
panic("max size too small")
}
return &GoroutinePool{
c: make(chan struct{}, maxSize),
wg: new(sync.WaitGroup),
}
}
// add
func (g *GoroutinePool) Add(delta int) {
g.wg.Add(delta)
for i := 0; i < delta; i++ {
g.c <- struct{}{}
}
}
// done
func (g *GoroutinePool) Done() {
<-g.c
g.wg.Done()
}
// wait
func (g *GoroutinePool) Wait() {
g.wg.Wait()
}
複製代碼
以上就是協程池的實現,實際是很是簡單的,個人博客也記錄了另外一個golang
協程池的開源實現,具體見 juejin.im/post/5d4f9f…
而後最後咱們的超時+錯誤快返回+協程池模型就完成了~
func testGoroutineWithTimeOut() {
wg :=sync.WaitGroup{}
done := make(chan struct{})
// 新增阻塞chan
errChan := make(chan error)
pool.NewGoroutinePool(10)
for i := 0; i < 10; i++ {
pool.Add(1)
go func() {
pool.Done()
if err!=nil{
errChan<-errors.New("error")
}
}()
}
go func() {
pool.Wait()
close(done)
}()
select {
// 錯誤快返回,適用於get接口
case err := <-errChan:
return nil, err
case <-done:
case <-time.After(500 * time.Millisecond):
}
}
複製代碼
謝謝