golang源碼閱讀---tunny協程池的基本實現原理

前段時間由於一個爬蟲項目,最開始作的時候是無腦的一個下載任務就開一個協程,可是後期出現了比較大的內存問題,而且下載的效果也不是那麼的好,後面發現是由於協程開的太多了,而且下行的帶寬就只有那麼的大,因此並不能和想象中的那樣。哎,仍是由於too young,too simple,sometimes naive.git

這篇主要是講解的tunny是如何是如何實現並保持一個goroutine pool的。github

由於本人是小菜,加上時間倉促,因此要是有什麼問題的話但願大佬指正。安全

1.簡介

tunny地址:https://github.com/Jeffail/tunny
這是一個goroutine pool包,能夠設置或者動態改變goroutine pool中goroutine的數量,生成一個固定的數量的pool,實現goroutine的重複使用,而且能在必定程度上控制goroutineapp

2.源碼

1.基本的數據類型函數

經過tunny的源碼包文件數量並很少,只有3個文件,tonny.go和worker.go,沒有那麼多的文件層次結構,因此閱讀起來特別的方便。這也是我比較喜歡閱讀go語言代碼的緣由。this

tunny.go中spa

Pool結構
主要是用於對整個pool的管理,其中包括poolcode

type Pool struct {
    ctor    func() Worker //goroutine中用戶的業務邏輯函數
    workers []*workerWrapper //目前已經存在的goroutine信息,workerWrapper結構定義在worker.go的中,
    reqChan chan workRequest //任務調度管道,主要是用戶管理當前goroutine是否執行任務,它和workerWrapper中的reqChan 實際上是一個,可是workerWrapper的reqChan只是一個發送管道,這個後面會繼續講解

    workerMut  sync.Mutex //鎖
    queuedJobs int64 計數,表示當前已經在運行的任務
}

worker接口主要用戶包裝用戶的業務邏輯的funcorm

type Worker interface {
    // Process will synchronously perform a job and return the result.
    //
    Process(interface{}) interface{}

    // BlockUntilReady is called before each job is processed and must block the
    // calling goroutine until the Worker is ready to process the next job.
    BlockUntilReady()

    // Interrupt is called when a job is cancelled. The worker is responsible
    // for unblocking the Process implementation.
    Interrupt()

    // Terminate is called when a Worker is removed from the processing pool
    // and is responsible for cleaning up any held resources.
    Terminate()
}

closureWorker 顧明思議,主要是用於包裝用戶的業務邏輯,
而且是Worker的徹底接收者協程

type closureWorker struct {
    processor func(interface{}) interface{}
}

在worker.go中

type workerWrapper struct {
    worker        Worker  //用戶存放用戶定義的業務邏輯函數
    interruptChan chan struct{} //用於外部干預,使當前goroutine提早終止

    // reqChan is NOT owned by this type, it is used to send requests for work.
    reqChan chan<- workRequest //這個和pool.go中Pool類型中的reqChan是一個,只不過當前這個是一個發送管道

    // closeChan can be closed in order to cleanly shutdown this worker.
    closeChan chan struct{}  //這個是用於傳遞關閉當前goroutine的消息

    // closedChan is closed by the run() goroutine when it exits.
    closedChan chan struct{} //這個我感受並無太大的實際意義
}

這個主要是用於傳遞任務參數。以及返回任務執行結果的類型

type workRequest struct {
    // jobChan is used to send the payload to this worker.
    jobChan chan<- interface{}

    // retChan is used to read the result from this worker.
    retChan <-chan interface{}

    // interruptFunc can be called to cancel a running job. When called it is no
    // longer necessary to read from retChan.
    interruptFunc func()
}

2.如何建立一個goroutine pool

根據代碼的調用步驟,
首先是實例化一個Pool類型的數據,並將用戶用戶的業務func包裝成closureWorker類型並存儲在Pool類型實例中的ctor字段中

使用外部調用建立一個Pool對象:

clipboard.png

包中建立一個Pool的邏輯

clipboard.png

clipboard.png

邏輯很簡單,一眼就能看明白。
那麼在哪裏啓動一個goroutine,請看下面

clipboard.png

注意這裏的參數傳遞,這裏傳遞了一個channel類型的參數,衆所周知,在go中,分爲兩種類型,一種是值類型,一種是引用類型(map,slice,channel),說這麼多有什麼用呢,怎麼扯到引用類型上面去了呢,但這個很重要

咱們接下咱們看在newWorkerWrapper中的邏輯

clipboard.png

上面說到,咱們傳遞過去了兩個參數,其中一個是一個channel類型的,由於channel引用類型,因此他的傳遞是地址,因此在最後newWorkerWrapper中賦值的時候workerWrapper.reqChan和pool.reqChan實際指向的是同一個地址,區別就是workerWrapper.reqChan是一個發送管道罷了
咱們能夠輸出看看

clipboard.png

下面是run函數中的代碼

clipboard.png

run函數中的代碼算是是整個包中最重要的代碼了。

他的實現原理是比較簡單的,就是採用的是一個for+select+channel來實現的,而且select採用是嵌套的形式,可是其中仍是有些比較難以理解的(固然對我小白我來講哈,2333333)

我感受主要是這兩段
clipboard.png

這兩段的代碼,須要結合到下一個小姐來講,請看下一個。

2.調用goroutine pool

這裏調用很簡單,只須要ret := pool.Process(參數)就ok了

咱們來看看Process中是怎麼樣的

clipboard.png

Process中邏輯很簡單,上一個小姐咱們知道,pool的reqChan 和 pool.workers.reqChan 是指向的同一個地址,可是後者爲一個發送管道因此,在這樣來使用時安全的,數據是不會錯誤的 。

在前面個人run函數中,有兩段代碼還沒說明意思,如今我就說明一下,第一個就是這段,

clipboard.png

(1)在咱們定義reqChan管道的時候,咱們定義的是一個沒有緩衝區的管道,因此在沒有接受操做的狀況下,咱們向管道里面推送數據是會被阻塞住的。
(2)在go中select是在有IO操做的狀況下會被觸發,因此要是咱們沒有在Process函數中調用reqChan接收數據,當前goroutine是會被阻塞住的這樣當前select內層的select也會被阻塞住。

clipboard.png

而後咱們在來看經過reqChan傳遞過來的值

clipboard.png

上面講到,channel是引用類型,因此它在傳遞的時候是傳遞的地址,而不是值,因此,咱們接收到的jobChan和retChan和傳遞過來指向的是一樣的地址,這樣咱們就能實現共享通訊了。咱們能夠輸出裏面兩邊的地址看看,這裏我開了一個容量爲2的pool,而後我調用pool裏面的其中一個goroutine,咱們看打印的地址

clipboard.png

看。。。。沒錯吧。。。。。

3.Extra

有一個問題,就是當咱們的pool有2個goroutine的時候,可是咱們有200個任務須要完成,也就是須要調用200測goroutine,Tunny是怎麼樣實現調度的呢,這個後面的文章補充吧,下班。。。。。。。。

算了仍是在這裏寫吧。。。。。

對於前面的問題其實很簡單。由於我在建立了一個Pool的時候,就只開了2個goroutine,而且使用的是一個雙層的select,第一層是reqChan發送管道阻塞住的

clipboard.png

因此就算你這時候同時執行了100個pool.Process(10),可是每次同時也只能有兩個消息從reqChan發送管道發出,其餘的98個reqChan接收管道都會阻塞住。

相關文章
相關標籤/搜索