上一篇文章 我用休眠作併發控制,搞垮了下游服務 發出去後獲得很多網友的迴應,有人問本身平時用的方案行不行,有人建議借鑑TCP的擁塞控制動態地調整發起的併發數,還有人問爲啥我要管下游抗不抗得住。git
今天我就來總結幾種調用下游服務時作併發控制的方案。github
由於咱們這篇文章是科普向的文章,主要目的是總結一下應該怎麼在享受併發帶來效率提高的同時作好併發控制讓整個系統的上下游都能更穩定一些,不對限流、控制到底該哪一個服務加,出了事故誰負責作討論。golang
前面咱們提到用休眠作併發控制的最大弊端是,沒有考慮下游服務的感覺,每次開固定數量的goroutine
去執行任務後,調用者休眠 1s 再來,而不是等待下游服務的反饋再開啓下一批任務執行。算法
func badConcurrency() {
batchSize := 500
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
break
}
for _, item := range data {
go func(i int) {
doSomething(i)
}(item)
}
time.Sleep(time.Second * 1)
}
}
複製代碼
此外上游還有請求分配不均的問題,休眠的時候徹底沒有請求,休眠結束後無論下游有沒有執行完成立刻又發起一批新的請求。編程
因此咱們應該從等待下游反饋和請求分配儘可能均勻兩個角度去作併發控制,固然實際項目中應該是兩方面結合才行。markdown
本文的可執行示例代碼請訪問下面的連接查看:併發
咱們在向下遊發起併發請求時能夠經過限流器作一下限流,若是達到限制就阻塞直到能再次發起請求。一聽到阻塞直到blabla 有的同窗是否是立刻心裏小激動想用 channel
去實現一個限流器啦,「此處應用咳嗽聲」其實徹底不必Golang 官方限流器 time/rate
包的 Wait 方法就能給咱們提供了這個功能。oop
func useRateLimit() {
limiter := rate.NewLimiter(rate.Every(1*time.Second), 500)
batchSize := 500
for {
data, _ :=queryDataWithSizeN(batchSize)
if len(data) == 0 {
fmt.Println("End of all data")
break
}
for _, item := range data {
// 阻塞直到令牌桶有充足的Token
err := limiter.Wait(context.Background())
if err != nil {
fmt.Println("Error: ", err)
return
}
go func(i int) {
doSomething(i)
}(item)
}
}
}
// 模擬調用下游服務
func doSomething(i int) {
time.Sleep(2 * time.Second)
fmt.Println("End:", i)
}
// 模擬查詢N條數據
func queryDataWithSizeN(size int) (dataList []int, err error) {
rand.Seed(time.Now().Unix())
dataList = rand.Perm(size)
return
}
複製代碼
time/rate
包提供的限流器採用的是令牌桶算法,使用Wait
方法是當桶中沒有足夠的令牌時調用者會阻塞直到能渠道令牌,固然也能夠經過Wait
方法接受的Context
參數設置等待超時時間。限流器往桶中放令牌的速率是恆定的這樣比單純使用time.Sleep
請求更均勻些。ui
關於time/rate 限流器的使用方法的詳解,請查看我以前的文章:Golang官方限流器的用法詳解
用了限流器了以後,只是讓咱們的併發請求分佈地更均勻了,最好咱們能在受到下游反饋完成後再開始下次併發。
咱們能夠等上批併發請求都執行完後再開始下一批任務,估計大部分同窗聽到這立刻就會想到應該加WaitGroup
WaitGroup適合用於併發-等待的場景:一個
goroutine
在檢查點(Check Point)等待一組執行任務的 workergoroutine
所有完成,若是在執行任務的這些workergoroutine
還沒所有完成,等待的goroutine
就會阻塞在檢查點,直到全部wokergoroutine
都完成後才能繼續執行。
func useWaitGroup() {
batchSize := 500
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
fmt.Println("End of all data")
break
}
var wg sync.WaitGroup
for _, item := range data {
wg.Add(1)
go func(i int) {
doSomething(i)
wg.Done()
}(item)
}
wg.Wait()
fmt.Println("Next bunch of data")
}
}
複製代碼
這裏調用程序會等待這一批任務都執行完後,再開始查下一批數據進行下一批請求,等待時間取決於這一批請求中最晚返回的那個響應用了多少時間。
若是你不想等一批所有完成後再開始下一批,也能夠採用一個完成後下一個補上的策略,這種比使用WaitGroup
作併發控制,若是下游資源夠,整個任務的處理時間會更快一些。這種策略須要使用信號量(Semaphore)作併發控制,Go 語言裏經過擴展庫golang.org/x/sync/semaphore
提供了信號量併發原語。
關於信號量的使用方法和實現原理,能夠讀讀我之前的文章:併發編程-信號量的使用方法和其實現原理
上面的程序改成使用信號量semaphore.Weighted
作併發控制的示例以下:
func useSemaphore() {
var concurrentNum int64 = 10
var weight int64 = 1
var batchSize int = 50
s := semaphore.NewWeighted(concurrentNum)
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
fmt.Println("End of all data")
break
}
for _, item := range data {
s.Acquire(context.Background(), weight)
go func(i int) {
doSomething(i)
s.Release(weight)
}(item)
}
}
}
複製代碼
也有很多讀者回復說得加線程池才行,由於每一個人公司裏可能都有在用的線程池實現,直接用就行,我在這裏就再也不獻醜給你們實現線程池了。在我看來咱們實際上是須要實現一個生產者和消費者模式,讓線程池幫助咱們限制只有固定梳理的消費者線程去作下游服務的調用,而生產者則是將數據存儲裏取出來。
channel
正好可以做爲二者之間的媒介。
func useChannel() {
batchSize := 50
dataChan := make(chan int)
var wg sync.WaitGroup
wg.Add(batchSize + 1)
// 生產者
go func() {
for {
data, _ := queryDataWithSizeN(batchSize)
if len(data) == 0 {
break
}
for _, item := range data {
dataChan <- item
}
}
close(dataChan)
wg.Done()
}()
// 消費者
go func() {
for i := 0; i < 50; i++ {
go func() {
for {
select {
case v, ok := <- dataChan:
if !ok {
wg.Done()
return
}
doSomething(v)
}
}
}()
}
}()
wg.Wait()
}
複製代碼
這個代碼實現裏,若是用ErrorGroup
代替WaitGroup
的話還能更簡化一些,這個就留給讀者本身探索吧。
關於
ErrorGroup
的用法總結,推薦閱讀文章:以爲WaitGroup很差用?試試ErrorGroup吧!
經過文章裏總結的一些方法,咱們也能看出來併發編程的場景下,除了關注發起的併發線程數外,更重要的是還須要關注被異步調用的下層服務的反饋,不是一味的加併發數就能解決問題的。理解咱們爲何在併發編程中要關注下層服務的反饋是很重要的,不然咱們列舉的那些方案其實均可以在goroutine
裏再開goroutine
,不關心是否執行完成直接返回,無限套娃下去。