33. 如何手動實現一個協程池?

Hi,你們好,我是明哥。git

在本身學習 Golang 的這段時間裏,我寫了詳細的學習筆記放在個人我的微信公衆號 《Go編程時光》,對於 Go 語言,我也算是個初學者,所以寫的東西應該會比較適合剛接觸的同窗,若是你也是剛學習 Go 語言,不防關注一下,一塊兒學習,一塊兒成長。github

個人在線博客:http://golang.iswbm.com
個人 Github:github.com/iswbm/GolangCodingTimegolang


在 Golang 中要建立一個協程是一件無比簡單的事情,你只要定義一個函數,並使用 go 關鍵字去執行它就好了。編程

若是你接觸過其餘語言,會發現你在使用使用線程時,爲了減小線程頻繁建立銷燬還來的開銷,一般咱們會使用線程池來複用線程。微信

池化技術就是利用複用來提高性能的,那在 Golang 中須要協程池嗎?函數

在 Golang 中,goroutine 是一個輕量級的線程,他的建立、調度都是在用戶態進行,並不須要進入內核,這意味着建立銷燬協程帶來的開銷是很是小的。性能

所以,我認爲大多數狀況下,開發人員是不太須要使用協程池的。學習

但也不排除有某些場景下是須要這樣作,由於我尚未遇到就不說了。線程

拋開是否必要這個問題,單純從技術的角度來看,咱們能夠怎樣實現一個通用的協程池呢?code

下面就來一塊兒學習一下個人寫法

首先定義一個協程池(Pool)結構體,包含兩個屬性,都是 chan 類型的。

一個是 work,用於接收 task 任務

一個是 sem,用於設置協程池大小,便可同時執行的協程數量

type Pool struct {
    work chan func()   // 任務
    sem  chan struct{} // 數量
}

而後定義一個 New 函數,用於建立一個協程池對象,有一個細節須要注意

work 是一個無緩衝通道

而 sem 是一個緩衝通道,size 大小即爲協程池大小

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

最後給協程池對象綁定兩個函數

一、NewTask:往協程池中添加任務

當第一次調用 NewTask 添加任務的時候,因爲 work 是無緩衝通道,因此會必定會走第二個 case 的分支:使用 go worker 開啓一個協程。

func (p *Pool) NewTask(task func()) { 
    select {
        case p.work <- task:
        case p.sem <- struct{}{}:
            go p.worker(task)
    }
}

二、worker:用於執行任務

爲了可以實現協程的複用,這個使用了 for 無限循環,使這個協程在執行完任務後,也不退出,而是一直在接收新的任務。

func (p *Pool) worker(task func()) { 
    defer func() { <-p.sem }()
    for {
        task()
        task = <-p.work
    }
}

這兩個函數是協程池實現的關鍵函數,裏面的邏輯很值得推敲:

一、若是設定的協程池數大於 2,此時第二次傳入往 NewTask 傳入task,select case 的時候,若是第一個協程還在運行中,就必定會走第二個case,從新建立一個協程執行task

二、若是傳入的任務數大於設定的協程池數,而且此時全部的任務都還在運行中,那此時再調用 NewTask 傳入 task ,這兩個 case 都不會命中,會一直阻塞直到有任務執行完成,worker 函數裏的 work 通道才能接收到新的任務,繼續執行。

以上即是協程池的實現過程。

使用它也很簡單,看下面的代碼你就明白了

func main()  {
    pool := New(128)
    pool.NewTask(func(){
        fmt.Println("run task")
    })
}

爲了讓你看到效果,我設置協程池數爲 2,開啓四個任務,都是 sleep 2 秒後,打印當前時間。

func main()  {
    pool := New(2)

    for i := 1; i <5; i++{
        pool.NewTask(func(){
            time.Sleep(2 * time.Second)
            fmt.Println(time.Now())
        })
    }
    
    // 保證全部的協程都執行完畢
    time.Sleep(5 * time.Second)
}

執行結果以下,能夠看到總共 4 個任務,因爲協程池大小爲 2,因此 4 個任務分兩批執行(從打印的時間能夠看出)

2020-05-24 23:18:02.014487 +0800 CST m=+2.005207182
2020-05-24 23:18:02.014524 +0800 CST m=+2.005243650
2020-05-24 23:18:04.019755 +0800 CST m=+4.010435443
2020-05-24 23:18:04.019819 +0800 CST m=+4.010499440

相關文章
相關標籤/搜索