goroutine不一樣於thread,threads是操做系統中的對於一個獨立運行實例的描述,不一樣操做系統,對於thread的實現也不盡相同;可是,操做系統並不知道goroutine的存在,goroutine的調度是有Golang運行時進行管理的。啓動thread雖然比process所需的資源要少,可是多個thread之間的上下文切換仍然是須要大量的工做的(寄存器/Program Count/Stack Pointer/...),Golang有本身的調度器,許多goroutine的數據都是共享的,所以goroutine之間的切換會快不少,啓動goroutine所耗費的資源也不多,一個Golang程序同時存在幾百個goroutine是很正常的。前端
channel,即「管道」,是用來傳遞數據(叫消息更爲合適)的一個數據結構,便可以從channel裏面塞數據,也能夠從中獲取數據。channel自己並無什麼神奇的地方,可是channel加上了goroutine,就造成了一種既簡單又強大的請求處理模型,即N個工做goroutine將處理的中間結果或者最終結果放入一個channel,另外有M個工做goroutine從這個channel拿數據,再進行進一步加工,經過組合這種過程,從而勝任各類複雜的業務模型。golang
本身在實踐的過程當中,產生了幾種經過goroutine + channel實現的工做模型,本文分別對這些模型進行介紹。json
直接加上go
關鍵字,就可讓一個函數脫離原先的主函數獨立運行,即主函數直接繼續進行剩下的操做,而不須要等待某個十分耗時的操做完成。好比咱們在寫一個服務模塊,接收到前端請求以後,而後去作一個比較耗時的任務。好比下面這個:後端
func (m *SomeController) PorcessSomeTask() { var task models.Task if err := task.Parse(m.Ctx.Request); err != nil { m.Data["json"] = err m.ServeJson() return } task.Process() m.ServeJson()
若是Process函數須要耗費大量時間的話,這個請求就會被block住。有時候,前端只須要發出一個請求給後端,而且不須要後端當即所處響應。遇到這樣的需求,直接在耗時的函數前面加上go
關鍵字就能夠將請求之間返回給前端了,保證了體驗數據結構
func (m *SomeController) PorcessSomeTask() { var task models.Task if err := task.Parse(m.Ctx.Request); err != nil { m.Data["json"] = err m.ServeJson() return } go task.Process() m.ServeJson()
不過,這種作法也是有許多限制的。好比:架構
上一個方案有一個缺點就是沒法控制併發,若是這一類請求同一個時間段有不少的話,每個請求都啓動一個goroutine,若是每一個goroutine中還須要使用其餘系統資源,消耗將是不可控的。併發
遇到這種狀況,一個解決方案是:將請求都轉發給一個channel,而後初始化多個goroutine讀取這個channel中的內容,並進行處理。假設咱們能夠新建一個全局的channel函數
var TASK_CHANNEL = make(chan models.Task)
而後,啓動多個goroutine:優化
for i := 0; i < WORKER_NUM; i ++ { go func() { for { select { case task := <- TASK_CHANNEL: task.Process() } } } () }
服務端接收到請求以後,將任務傳入channel中便可:spa
func (m *SomeController) PorcessSomeTask() { var task models.Task if err := task.Parse(m.Ctx.Request); err != nil { m.Data["json"] = err m.ServeJson() return } //go task.Process() TASK_CHANNEL <- task m.ServeJson() }
這樣一來,這個操做的併發度就能夠經過WORKER_NUM
來控制了。
不過,上面方案有一個bug:那就是channel初始化時是沒有設置長度的,所以當全部WORKER_NUM
個goroutine都正在處理請求時,再有請求過來的話,仍然會出現被block的狀況,並且會比沒有通過優化的方案還要慢(由於須要等某一個goroutine結束時才能處理它)。所以,須要在channel初始化時增長一個長度:
var TASK_CHANNEL = make(chan models.Task, TASK_CHANNEL_LEN)
這樣一來,咱們將TASK_CHANNEL_LEN
設置得足夠大,請求就能夠同時接收TASK_CHANNEL_LEN
個請求而不用擔憂被block。不過,這其實仍是有問題的:那若是真的同時有大於TASK_CHANNEL_LEN
個請求過來呢?一方面,這就應該算是架構方面的問題了,能夠經過對模塊進行擴容等操做進行解決。另外一方面,模塊自己也要考慮如何進行「優雅降級了」。遇到這種狀況,咱們應該但願模塊可以及時告知調用方,「我已經達處處理極限了,沒法給你處理請求了」。其實,這種需求,能夠很簡單的在Golang中實現:若是channel發送以及接收操做在select語句中執行而且發生阻塞,default語句就會當即執行。
select { case TASK_CHANNEL <- task: //do nothing default: //warnning! return fmt.Errorf("TASK_CHANNEL is full!") } //...
若是處理程序比較複雜的時候,一般都會出如今一個goroutine中,還會發送一些中間處理的結果發送給其餘goroutine去作,通過多道「工序」才能最終將結果產出。
那麼,咱們既須要把某一箇中間結果發送給某個channel,也要能獲取處處理此次請求的結果。解決的方法是:將一個channel實例包含在請求中,goroutine處理完成後將結果寫回這個channel。
type TaskResponse struct { //... } type Task struct { TaskParameter SomeStruct ResChan *chan TaskResponse } //... task := Task { TaskParameter : xxx, ResChan : make(chan TaskResponse), } TASK_CHANNEL <- task res := <- task.ResChan //...
(這邊可能會有疑問:爲何不把一個複雜的任務都放在一個goroutine中依次的執行呢?是由於這裏須要考慮到不一樣子任務,所消耗的系統資源不盡相同,有些是CPU集中的,有些是IO集中的,因此須要對這些子任務設置不一樣的併發數,所以須要經由不一樣的channel + goroutine去完成。)
將任務通過分組,交由不一樣的goroutine進行處理,最終再將每一個goroutine處理的結果進行合併,這個是比較常見的處理流程。這裏須要用到WaitGroup
來對一組goroutine進行同步。通常的處理流程以下:
var wg sync.WaitGroup for i := 0; i < someLen; i ++ { wg.Add(1) go func(t Task) { defer wg.Done() //對某一段子任務進行處理 } (tasks[i]) } wg.Wait() //處理剩下的工做
即便是複雜、耗時的任務,也必須設置超時時間。一方面多是業務對此有時限要求(用戶必須在XX分鐘內看到結果),另外一方面模塊自己也不能都消耗在一直沒法結束的任務上,使得其餘請求沒法獲得正常處理。所以,也須要對處理流程增長超時機制。
我通常設置超時的方案是:和以前提到的「接收發送給channel以後返回的結果」結合起來,在等待返回channel的外層添加select
,並在其中經過time.After()
來判斷超時。
task := Task { TaskParameter : xxx, ResChan : make(chan TaskResponse), } select { case res := <- task.ResChan: //... case <- time.After(PROCESS_MAX_TIME): //處理超時 }
既然有了超時機制,那也須要一種機制來告知其餘goroutine結束手上正在作的事情並退出。很明顯,仍是須要利用channel來進行交流,第一個想到的確定就是向某一個chan發送一個struct便可。好比執行任務的goroutine在參數中,增長一個chan struct{}
類型的參數,當接收到該channel的消息時,就退出任務。可是,還須要解決兩個問題:
對於第一個問題,比較優雅的做法是:使用另一個channel做爲函數d輸出,再加上select
,就能夠一邊輸出結果,一邊接收退出信號了。
另外一方面,對於同時有未知數目個執行goroutine的狀況,一次次調用done <-struct{}{}
,顯然沒法實現。這時候,就會用到golang對於channel的tricky用法:當關閉一個channel時,全部由於接收該channel而阻塞的語句會當即返回。示例代碼以下:
// 執行方 func doTask(done <-chan struct{}, tasks <-chan Task) (chan Result) { out := make(chan Result) go func() { // close 是爲了讓調用方的range可以正常退出 defer close(out) for t := range tasks { select { case result <-f(task): case <-done: return } } }() return out } // 調用方 func Process(tasks <-chan Task, num int) { done := make(chan struct{}) out := doTask(done, tasks) go func() { <- time.After(MAX_TIME) //done <-struct{}{} //通知全部的執行goroutine退出 close(done) }() // 由於goroutine執行完畢,或者超時,致使out被close,range退出 for res := range out { fmt.Println(res) //... } }