golang協程池throttler實現解析

此次來介紹下,golang協程池的使用,以throttler實現爲例。git

首先介紹如何使用(拿做者github的例子爲例)~github

func ExampleThrottler() {
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	參數1:啓動的協程數量
	參數2:須要執行的任務數
	t := New(2, len(urls))
	for _, url := range urls {

		// goroutine 啓動
		go func(url string) {
			// 請求url
			err := http.Get(url)
			//讓 throttler知道goroutines什麼時候完成,而後throttler會新任命一個worker
		    t.Done(err)
		}(url)
		errorCount := t.Throttle()
		if errorCount > 0 {
			break
		}
	}
}
複製代碼

雖然做者的readme.md沒寫,可是咱們也可用這樣用golang

package main

import (
	"github.com/throttler"
	"fmt"
)

func main() {

	p := throttler.New(10, 5)

	go func() {
		fmt.Println("hello world1")
		defer p.Done(nil)
	}()
	fmt.Println(1)
	p.Throttle()
	go func() {
		fmt.Println("hello world2")
		p.Done(nil)
	}()
	fmt.Println(2)
	p.Throttle()
	go func() {
		fmt.Println("hello world3")
		p.Done(nil)
	}()
	fmt.Println(3)
	p.Throttle()
	//fmt.Println(err + 3)
	go func() {
		fmt.Println("hello world4")
		p.Done(nil)
	}()
	fmt.Println(4)
	p.Throttle()
	//fmt.Println(err + 2)
	go func() {
		fmt.Println("hello world5")
		p.Done(nil)
	}()
	fmt.Println(5)
	p.Throttle()
}

複製代碼

以上就是Throttle的使用例子,看起來很是簡單,那麼它是如何實現的呢?數組

首先咱們看下throttle的主體結構,後續的操做都圍繞着主體結構實現的安全

// Throttler stores all the information about the number of workers, the active workers and error information
type Throttler struct {
	maxWorkers    int32				// 最大的worker數
	workerCount   int32				// 正在工做的worker數量
	batchingTotal int32
	batchSize     int32				// 
	totalJobs     int32    			// 任務數量的和
	jobsStarted   int32  			// 任務開始的數量(初始值爲0)
	jobsCompleted int32	 			// 任務完成的數量
	doneChan      chan struct{}		// 非緩衝隊列,存儲的一半是count(totalJobs)
	errsMutex     *sync.Mutex		// errMutex的併發
	errs          []error 			// 錯誤數組的集合,通常是業務處理返回的error
	errorCount    int32
}
複製代碼

New操做建立一個協程池bash

func New(maxWorkers, totalJobs int) *Throttler {
	// 若是小於1 panic
	if maxWorkers < 1 {
		panic("maxWorkers has to be at least 1")
	}

	return &Throttler{
		// 最大協程數量
		maxWorkers: int32(maxWorkers),
		batchSize:  1,
		// 全部的任務數
		totalJobs:  int32(totalJobs),
		doneChan:   make(chan struct{}, totalJobs),
		errsMutex:  &sync.Mutex{},
	}
}
複製代碼

當完成一個協程動做併發

func (t *Throttler) Done(err error) {
	if err != nil {
		// 若是出現錯誤,將錯誤追加到struct裏面,由於struct非線程安全,因此須要加鎖
		t.errsMutex.Lock()
		t.errs = append(t.errs, err)
		// errorCount ++
		atomic.AddInt32(&t.errorCount, 1)
		t.errsMutex.Unlock()
	} 
	// 每當一個goroutine進來,向struct寫入一條數據
	t.doneChan <- struct{}{}
}
複製代碼

等待協程完成的函數實現,可能稍微有點複雜app

func (t *Throttler) Throttle() int {
	// 加載任務數  < 1 返回錯誤的數量
	if atomic.LoadInt32(&t.totalJobs) < 1 {
		return int(atomic.LoadInt32(&t.errorCount))
	}

	// jobStarted + 1 
	atomic.AddInt32(&t.jobsStarted, 1)
	// workerCount + 1
	atomic.AddInt32(&t.workerCount, 1)


	// 檢查當前worker的數量是否和maxworker數量一致,等待這個workers完成

	// 實際上就是協程數量到達上限,須要等待運行中的協程釋放資源
	if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
		// 完成jobsCompleted - 1
		atomic.AddInt32(&t.jobsCompleted, 1)
		// workerCount - 1
		atomic.AddInt32(&t.workerCount, -1)
		<-t.doneChan
	}

	// check to see if all of the jobs have been started, and if so, wait until all
	// jobs have been completed before continuing

	// 若是任務開始的數量和總共的任務數一致
	if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
		// 若是完成的數量小於總job數 等待Job完成
		for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
			// jobcomplete + 1
			atomic.AddInt32(&t.jobsCompleted, 1)
			<-t.doneChan
		}
	}

	return int(atomic.LoadInt32(&t.errorCount))
}
複製代碼

簡單枚舉了下實現的流程:函數

假設有2個請求限制,3個請求,它的時序圖是這樣的ui

第一輪

totaljobs = 3
jobstarted = 1 workercount = 1   jobscompleted = 0 totaljobs = 3
複製代碼

第二輪

jobstarted = 2 worker count = 2   jobscompleted = 0 totaljobs = 3
複製代碼

第三輪

jobstarted = 3 worker count = 3 jobscompleted = 0 totaljobs = 3

// 操做1:由於goroutine限制爲2,當前wokercount爲3,須要阻塞,等待協程池釋放

// 協程池釋放:
jobstarted = 3 worker count = 2 jobscompleted = 1 totaljobs = 3

// 操做2:當前jobstarted與totaljobs相等,說明全部任務都已經池化了,則開始阻塞處理


//執行結束:

jobstarted = 3 worker count = 2 jobscompleted = 3 totaljobs = 3

複製代碼

總的來講,該實現也是借用了channel的能力進行阻塞,實現起來仍是很是簡單的~

相關文章
相關標籤/搜索