Goroutine + Channel 實踐

goroutine不一樣於thread,threads是操做系統中的對於一個獨立運行實例的描述,不一樣操做系統,對於thread的實現也不盡相同;可是,操做系統並不知道goroutine的存在,goroutine的調度是有Golang運行時進行管理的。啓動thread雖然比process所需的資源要少,可是多個thread之間的上下文切換仍然是須要大量的工做的(寄存器/Program Count/Stack Pointer/...),Golang有本身的調度器,許多goroutine的數據都是共享的,所以goroutine之間的切換會快不少,啓動goroutine所耗費的資源也不多,一個Golang程序同時存在幾百個goroutine是很正常的。前端

channel,即「管道」,是用來傳遞數據(叫消息更爲合適)的一個數據結構,便可以從channel裏面塞數據,也能夠從中獲取數據。channel自己並無什麼神奇的地方,可是channel加上了goroutine,就造成了一種既簡單又強大的請求處理模型,即N個工做goroutine將處理的中間結果或者最終結果放入一個channel,另外有M個工做goroutine從這個channel拿數據,再進行進一步加工,經過組合這種過程,從而勝任各類複雜的業務模型。golang

模型

本身在實踐的過程當中,產生了幾種經過goroutine + channel實現的工做模型,本文分別對這些模型進行介紹。json

V0.1: go關鍵字

直接加上go關鍵字,就可讓一個函數脫離原先的主函數獨立運行,即主函數直接繼續進行剩下的操做,而不須要等待某個十分耗時的操做完成。好比咱們在寫一個服務模塊,接收到前端請求以後,而後去作一個比較耗時的任務。好比下面這個:後端

func (m *SomeController) PorcessSomeTask() {
    var task models.Task
    if err := task.Parse(m.Ctx.Request); err != nil {
        m.Data["json"] = err 
        m.ServeJson()
        return
    }
    task.Process()
    m.ServeJson()

若是Process函數須要耗費大量時間的話,這個請求就會被block住。有時候,前端只須要發出一個請求給後端,而且不須要後端當即所處響應。遇到這樣的需求,直接在耗時的函數前面加上go關鍵字就能夠將請求之間返回給前端了,保證了體驗數據結構

func (m *SomeController) PorcessSomeTask() {
    var task models.Task
    if err := task.Parse(m.Ctx.Request); err != nil {
        m.Data["json"] = err 
        m.ServeJson()
        return
    }
    go task.Process()
    m.ServeJson()

不過,這種作法也是有許多限制的。好比:架構

  • 只能在前端不須要當即獲得後端處理的結果的狀況下
  • 這種請求的頻率不該該很大,由於目前的作法沒有控制併發

V0.2: 併發控制

上一個方案有一個缺點就是沒法控制併發,若是這一類請求同一個時間段有不少的話,每個請求都啓動一個goroutine,若是每一個goroutine中還須要使用其餘系統資源,消耗將是不可控的。併發

遇到這種狀況,一個解決方案是:將請求都轉發給一個channel,而後初始化多個goroutine讀取這個channel中的內容,並進行處理。假設咱們能夠新建一個全局的channel函數

var TASK_CHANNEL = make(chan models.Task)

而後,啓動多個goroutine:優化

for i := 0; i < WORKER_NUM; i ++ {
    go func() {
        for {
            select {
            case task := <- TASK_CHANNEL:
                task.Process()
            }
        }
    } ()
}

服務端接收到請求以後,將任務傳入channel中便可:spa

func (m *SomeController) PorcessSomeTask() {
    var task models.Task
    if err := task.Parse(m.Ctx.Request); err != nil {
        m.Data["json"] = err 
        m.ServeJson()
        return
    }
    //go task.Process()
    TASK_CHANNEL <- task
    m.ServeJson()
}

這樣一來,這個操做的併發度就能夠經過WORKER_NUM來控制了。

V0.3: 處理channel滿的狀況

不過,上面方案有一個bug:那就是channel初始化時是沒有設置長度的,所以當全部WORKER_NUM個goroutine都正在處理請求時,再有請求過來的話,仍然會出現被block的狀況,並且會比沒有通過優化的方案還要慢(由於須要等某一個goroutine結束時才能處理它)。所以,須要在channel初始化時增長一個長度:

var TASK_CHANNEL = make(chan models.Task, TASK_CHANNEL_LEN)

這樣一來,咱們將TASK_CHANNEL_LEN設置得足夠大,請求就能夠同時接收TASK_CHANNEL_LEN個請求而不用擔憂被block。不過,這其實仍是有問題的:那若是真的同時有大於TASK_CHANNEL_LEN個請求過來呢?一方面,這就應該算是架構方面的問題了,能夠經過對模塊進行擴容等操做進行解決。另外一方面,模塊自己也要考慮如何進行「優雅降級了」。遇到這種狀況,咱們應該但願模塊可以及時告知調用方,「我已經達處處理極限了,沒法給你處理請求了」。其實,這種需求,能夠很簡單的在Golang中實現:若是channel發送以及接收操做在select語句中執行而且發生阻塞,default語句就會當即執行

select {
case TASK_CHANNEL <- task:
    //do nothing
default:
    //warnning!
    return fmt.Errorf("TASK_CHANNEL is full!")
}
//...

V0.4: 接收發送給channel以後返回的結果

若是處理程序比較複雜的時候,一般都會出如今一個goroutine中,還會發送一些中間處理的結果發送給其餘goroutine去作,通過多道「工序」才能最終將結果產出。

那麼,咱們既須要把某一箇中間結果發送給某個channel,也要能獲取處處理此次請求的結果。解決的方法是:將一個channel實例包含在請求中,goroutine處理完成後將結果寫回這個channel

type TaskResponse struct {
    //...
}

type Task struct {
    TaskParameter   SomeStruct
    ResChan         *chan TaskResponse
}

//...

task := Task {
    TaskParameter   : xxx,
    ResChan         : make(chan TaskResponse),
}

TASK_CHANNEL <- task
res := <- task.ResChan
//...

(這邊可能會有疑問:爲何不把一個複雜的任務都放在一個goroutine中依次的執行呢?是由於這裏須要考慮到不一樣子任務,所消耗的系統資源不盡相同,有些是CPU集中的,有些是IO集中的,因此須要對這些子任務設置不一樣的併發數,所以須要經由不一樣的channel + goroutine去完成。)

V0.5: 等待一組goroutine的返回

將任務通過分組,交由不一樣的goroutine進行處理,最終再將每一個goroutine處理的結果進行合併,這個是比較常見的處理流程。這裏須要用到WaitGroup來對一組goroutine進行同步。通常的處理流程以下:

var wg sync.WaitGroup
for i := 0; i < someLen; i ++ {
    wg.Add(1)
    go func(t Task) {
        defer wg.Done()
        //對某一段子任務進行處理
    } (tasks[i])
}

wg.Wait()
//處理剩下的工做

 

V0.6: 超時機制

即便是複雜、耗時的任務,也必須設置超時時間。一方面多是業務對此有時限要求(用戶必須在XX分鐘內看到結果),另外一方面模塊自己也不能都消耗在一直沒法結束的任務上,使得其餘請求沒法獲得正常處理。所以,也須要對處理流程增長超時機制。

我通常設置超時的方案是:和以前提到的「接收發送給channel以後返回的結果」結合起來,在等待返回channel的外層添加select,並在其中經過time.After()來判斷超時。

task := Task {
    TaskParameter   : xxx,
    ResChan         : make(chan TaskResponse),
}

select {
case res := <- task.ResChan:
    //...
case <- time.After(PROCESS_MAX_TIME):
    //處理超時
}

V0.7: 廣播機制

既然有了超時機制,那也須要一種機制來告知其餘goroutine結束手上正在作的事情並退出。很明顯,仍是須要利用channel來進行交流,第一個想到的確定就是向某一個chan發送一個struct便可。好比執行任務的goroutine在參數中,增長一個chan struct{}類型的參數,當接收到該channel的消息時,就退出任務。可是,還須要解決兩個問題:

  1. 怎樣能在執行任務的同時去接收這個消息呢?
  2. 如何通知全部的goroutine?

對於第一個問題,比較優雅的做法是:使用另一個channel做爲函數d輸出,再加上select,就能夠一邊輸出結果,一邊接收退出信號了。

另外一方面,對於同時有未知數目個執行goroutine的狀況,一次次調用done <-struct{}{},顯然沒法實現。這時候,就會用到golang對於channel的tricky用法:當關閉一個channel時,全部由於接收該channel而阻塞的語句會當即返回。示例代碼以下:

// 執行方
func doTask(done <-chan struct{}, tasks <-chan Task) (chan Result) {
    out := make(chan Result)
    go func() {
        // close 是爲了讓調用方的range可以正常退出
        defer close(out)
        for t := range tasks {
            select {
            case result <-f(task):
            case <-done:
                return
            }
        }
    }()

    return out
}

// 調用方
func Process(tasks <-chan Task, num int) {
    done := make(chan struct{})
    out := doTask(done, tasks)

    go func() {
        <- time.After(MAX_TIME)
        //done <-struct{}{}

        //通知全部的執行goroutine退出
        close(done)
    }()

    // 由於goroutine執行完畢,或者超時,致使out被close,range退出
    for res := range out {
        fmt.Println(res)
        //...
    }
}
相關文章
相關標籤/搜索