前段時間由於一個爬蟲項目,最開始作的時候是無腦的一個下載任務就開一個協程,可是後期出現了比較大的內存問題,而且下載的效果也不是那麼的好,後面發現是由於協程開的太多了,而且下行的帶寬就只有那麼的大,因此並不能和想象中的那樣。哎,仍是由於too young,too simple,sometimes naive.git
這篇主要是講解的tunny是如何是如何實現並保持一個goroutine pool的。github
由於本人是小菜,加上時間倉促,因此要是有什麼問題的話但願大佬指正。安全
tunny地址:https://github.com/Jeffail/tunny
這是一個goroutine pool包,能夠設置或者動態改變goroutine pool中goroutine的數量,生成一個固定的數量的pool,實現goroutine的重複使用,而且能在必定程度上控制goroutineapp
1.基本的數據類型函數
經過tunny的源碼包文件數量並很少,只有3個文件,tonny.go和worker.go,沒有那麼多的文件層次結構,因此閱讀起來特別的方便。這也是我比較喜歡閱讀go語言代碼的緣由。this
tunny.go中spa
Pool結構
主要是用於對整個pool的管理,其中包括poolcode
type Pool struct { ctor func() Worker //goroutine中用戶的業務邏輯函數 workers []*workerWrapper //目前已經存在的goroutine信息,workerWrapper結構定義在worker.go的中, reqChan chan workRequest //任務調度管道,主要是用戶管理當前goroutine是否執行任務,它和workerWrapper中的reqChan 實際上是一個,可是workerWrapper的reqChan只是一個發送管道,這個後面會繼續講解 workerMut sync.Mutex //鎖 queuedJobs int64 計數,表示當前已經在運行的任務 }
worker接口主要用戶包裝用戶的業務邏輯的funcorm
type Worker interface { // Process will synchronously perform a job and return the result. // Process(interface{}) interface{} // BlockUntilReady is called before each job is processed and must block the // calling goroutine until the Worker is ready to process the next job. BlockUntilReady() // Interrupt is called when a job is cancelled. The worker is responsible // for unblocking the Process implementation. Interrupt() // Terminate is called when a Worker is removed from the processing pool // and is responsible for cleaning up any held resources. Terminate() }
closureWorker 顧明思議,主要是用於包裝用戶的業務邏輯,
而且是Worker的徹底接收者協程
type closureWorker struct { processor func(interface{}) interface{} }
在worker.go中
type workerWrapper struct { worker Worker //用戶存放用戶定義的業務邏輯函數 interruptChan chan struct{} //用於外部干預,使當前goroutine提早終止 // reqChan is NOT owned by this type, it is used to send requests for work. reqChan chan<- workRequest //這個和pool.go中Pool類型中的reqChan是一個,只不過當前這個是一個發送管道 // closeChan can be closed in order to cleanly shutdown this worker. closeChan chan struct{} //這個是用於傳遞關閉當前goroutine的消息 // closedChan is closed by the run() goroutine when it exits. closedChan chan struct{} //這個我感受並無太大的實際意義 }
這個主要是用於傳遞任務參數。以及返回任務執行結果的類型
type workRequest struct { // jobChan is used to send the payload to this worker. jobChan chan<- interface{} // retChan is used to read the result from this worker. retChan <-chan interface{} // interruptFunc can be called to cancel a running job. When called it is no // longer necessary to read from retChan. interruptFunc func() }
2.如何建立一個goroutine pool
根據代碼的調用步驟,
首先是實例化一個Pool類型的數據,並將用戶用戶的業務func包裝成closureWorker類型並存儲在Pool類型實例中的ctor字段中
使用外部調用建立一個Pool對象:
包中建立一個Pool的邏輯
邏輯很簡單,一眼就能看明白。
那麼在哪裏啓動一個goroutine,請看下面
注意這裏的參數傳遞,這裏傳遞了一個channel類型的參數,衆所周知,在go中,分爲兩種類型,一種是值類型,一種是引用類型(map,slice,channel),說這麼多有什麼用呢,怎麼扯到引用類型上面去了呢,但這個很重要
咱們接下咱們看在newWorkerWrapper中的邏輯
上面說到,咱們傳遞過去了兩個參數,其中一個是一個channel類型的,由於channel引用類型,因此他的傳遞是地址,因此在最後newWorkerWrapper中賦值的時候workerWrapper.reqChan和pool.reqChan實際指向的是同一個地址,區別就是workerWrapper.reqChan是一個發送管道罷了
咱們能夠輸出看看
下面是run函數中的代碼
run函數中的代碼算是是整個包中最重要的代碼了。
他的實現原理是比較簡單的,就是採用的是一個for+select+channel來實現的,而且select採用是嵌套的形式,可是其中仍是有些比較難以理解的(固然對我小白我來講哈,2333333)
我感受主要是這兩段
這兩段的代碼,須要結合到下一個小姐來講,請看下一個。
2.調用goroutine pool
這裏調用很簡單,只須要ret := pool.Process(參數)就ok了
咱們來看看Process中是怎麼樣的
Process中邏輯很簡單,上一個小姐咱們知道,pool的reqChan 和 pool.workers.reqChan 是指向的同一個地址,可是後者爲一個發送管道因此,在這樣來使用時安全的,數據是不會錯誤的 。
在前面個人run函數中,有兩段代碼還沒說明意思,如今我就說明一下,第一個就是這段,
(1)在咱們定義reqChan管道的時候,咱們定義的是一個沒有緩衝區的管道,因此在沒有接受操做的狀況下,咱們向管道里面推送數據是會被阻塞住的。
(2)在go中select是在有IO操做的狀況下會被觸發,因此要是咱們沒有在Process函數中調用reqChan接收數據,當前goroutine是會被阻塞住的這樣當前select內層的select也會被阻塞住。
而後咱們在來看經過reqChan傳遞過來的值
上面講到,channel是引用類型,因此它在傳遞的時候是傳遞的地址,而不是值,因此,咱們接收到的jobChan和retChan和傳遞過來指向的是一樣的地址,這樣咱們就能實現共享通訊了。咱們能夠輸出裏面兩邊的地址看看,這裏我開了一個容量爲2的pool,而後我調用pool裏面的其中一個goroutine,咱們看打印的地址
看。。。。沒錯吧。。。。。
有一個問題,就是當咱們的pool有2個goroutine的時候,可是咱們有200個任務須要完成,也就是須要調用200測goroutine,Tunny是怎麼樣實現調度的呢,這個後面的文章補充吧,下班。。。。。。。。
算了仍是在這裏寫吧。。。。。
對於前面的問題其實很簡單。由於我在建立了一個Pool的時候,就只開了2個goroutine,而且使用的是一個雙層的select,第一層是reqChan發送管道阻塞住的
因此就算你這時候同時執行了100個pool.Process(10),可是每次同時也只能有兩個消息從reqChan發送管道發出,其餘的98個reqChan接收管道都會阻塞住。