此次來介紹下,golang協程池的使用,以throttler
實現爲例。git
首先介紹如何使用(拿做者github的例子爲例)~github
func ExampleThrottler() {
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
參數1:啓動的協程數量
參數2:須要執行的任務數
t := New(2, len(urls))
for _, url := range urls {
// goroutine 啓動
go func(url string) {
// 請求url
err := http.Get(url)
//讓 throttler知道goroutines什麼時候完成,而後throttler會新任命一個worker
t.Done(err)
}(url)
errorCount := t.Throttle()
if errorCount > 0 {
break
}
}
}
複製代碼
雖然做者的readme.md
沒寫,可是咱們也可用這樣用golang
package main
import (
"github.com/throttler"
"fmt"
)
func main() {
p := throttler.New(10, 5)
go func() {
fmt.Println("hello world1")
defer p.Done(nil)
}()
fmt.Println(1)
p.Throttle()
go func() {
fmt.Println("hello world2")
p.Done(nil)
}()
fmt.Println(2)
p.Throttle()
go func() {
fmt.Println("hello world3")
p.Done(nil)
}()
fmt.Println(3)
p.Throttle()
//fmt.Println(err + 3)
go func() {
fmt.Println("hello world4")
p.Done(nil)
}()
fmt.Println(4)
p.Throttle()
//fmt.Println(err + 2)
go func() {
fmt.Println("hello world5")
p.Done(nil)
}()
fmt.Println(5)
p.Throttle()
}
複製代碼
以上就是Throttle
的使用例子,看起來很是簡單,那麼它是如何實現的呢?數組
首先咱們看下throttle
的主體結構,後續的操做都圍繞着主體結構實現的安全
// Throttler stores all the information about the number of workers, the active workers and error information
type Throttler struct {
maxWorkers int32 // 最大的worker數
workerCount int32 // 正在工做的worker數量
batchingTotal int32
batchSize int32 //
totalJobs int32 // 任務數量的和
jobsStarted int32 // 任務開始的數量(初始值爲0)
jobsCompleted int32 // 任務完成的數量
doneChan chan struct{} // 非緩衝隊列,存儲的一半是count(totalJobs)
errsMutex *sync.Mutex // errMutex的併發
errs []error // 錯誤數組的集合,通常是業務處理返回的error
errorCount int32
}
複製代碼
New
操做建立一個協程池bash
func New(maxWorkers, totalJobs int) *Throttler {
// 若是小於1 panic
if maxWorkers < 1 {
panic("maxWorkers has to be at least 1")
}
return &Throttler{
// 最大協程數量
maxWorkers: int32(maxWorkers),
batchSize: 1,
// 全部的任務數
totalJobs: int32(totalJobs),
doneChan: make(chan struct{}, totalJobs),
errsMutex: &sync.Mutex{},
}
}
複製代碼
當完成一個協程動做併發
func (t *Throttler) Done(err error) {
if err != nil {
// 若是出現錯誤,將錯誤追加到struct裏面,由於struct非線程安全,因此須要加鎖
t.errsMutex.Lock()
t.errs = append(t.errs, err)
// errorCount ++
atomic.AddInt32(&t.errorCount, 1)
t.errsMutex.Unlock()
}
// 每當一個goroutine進來,向struct寫入一條數據
t.doneChan <- struct{}{}
}
複製代碼
等待協程完成的函數實現,可能稍微有點複雜app
func (t *Throttler) Throttle() int {
// 加載任務數 < 1 返回錯誤的數量
if atomic.LoadInt32(&t.totalJobs) < 1 {
return int(atomic.LoadInt32(&t.errorCount))
}
// jobStarted + 1
atomic.AddInt32(&t.jobsStarted, 1)
// workerCount + 1
atomic.AddInt32(&t.workerCount, 1)
// 檢查當前worker的數量是否和maxworker數量一致,等待這個workers完成
// 實際上就是協程數量到達上限,須要等待運行中的協程釋放資源
if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
// 完成jobsCompleted - 1
atomic.AddInt32(&t.jobsCompleted, 1)
// workerCount - 1
atomic.AddInt32(&t.workerCount, -1)
<-t.doneChan
}
// check to see if all of the jobs have been started, and if so, wait until all
// jobs have been completed before continuing
// 若是任務開始的數量和總共的任務數一致
if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
// 若是完成的數量小於總job數 等待Job完成
for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
// jobcomplete + 1
atomic.AddInt32(&t.jobsCompleted, 1)
<-t.doneChan
}
}
return int(atomic.LoadInt32(&t.errorCount))
}
複製代碼
簡單枚舉了下實現的流程:函數
假設有2個請求限制,3個請求,它的時序圖是這樣的ui
第一輪
totaljobs = 3
jobstarted = 1 workercount = 1 jobscompleted = 0 totaljobs = 3
複製代碼
第二輪
jobstarted = 2 worker count = 2 jobscompleted = 0 totaljobs = 3
複製代碼
第三輪
jobstarted = 3 worker count = 3 jobscompleted = 0 totaljobs = 3
// 操做1:由於goroutine限制爲2,當前wokercount爲3,須要阻塞,等待協程池釋放
// 協程池釋放:
jobstarted = 3 worker count = 2 jobscompleted = 1 totaljobs = 3
// 操做2:當前jobstarted與totaljobs相等,說明全部任務都已經池化了,則開始阻塞處理
//執行結束:
jobstarted = 3 worker count = 2 jobscompleted = 3 totaljobs = 3
複製代碼
總的來講,該實現也是借用了channel
的能力進行阻塞,實現起來仍是很是簡單的~