Golang 在異步處理上有着上佳的表現。由於 goroutines 和 channels 是很是容易使用且有效的異步處理手段。下面咱們一塊兒來看一看 Golang 的簡易任務隊列golang
有些時候,咱們須要作異步處理可是並不須要一個任務對列,這類問題咱們使用 Golang 能夠很是簡單的實現。以下:安全
go process(job)
這的確是不少場景下的絕佳選擇,好比操做一個HTTP請求等待結果。然而,在一些相對複雜高併發的場景下,你就不能簡單的使用該方法來實現異步處理。這時候,你須要一個隊列來管理須要處理的任務,而且按照必定的順序來處理這些任務。服務器
接下來看一個最簡單的任務隊列和工做者模型。併發
func worker(jobChan <-chan Job) { for job := range jobChan { process(job) } } // make a channel with a capacity of 100. jobChan := make(chan Job, 100) // start the worker go worker(jobChan) // enqueue a job jobChan <- job
代碼中建立了一個 Job 對象的 channel , 容量爲100。而後開啓一個工做者協程從 channel 中去除任務並執行。任務的入隊操做就是將一個 Job 對象放入任務 channel 中。異步
雖然上面只有短短的幾行代碼,卻完成了不少的工做。咱們實現了一個簡易的線程安全的、支持併發的、可靠的任務隊列。高併發
上面的例子中,咱們初始化了一個容量爲 100 的任務 channel。工具
// make a channel with a capacity of 100. jobChan := make(chan Job, 100)
這意味着任務的入隊操做十分簡單,以下:ui
// enqueue a job jobChan <- job
這樣一來,當 job channel 中已經放入 100 個任務的時候,入隊操做將會阻塞,直至有任務被工做者處理完成。這一般不是一個好的現象,由於咱們一般不但願程序出現阻塞等待。這時候,咱們一般但願有一個超時機制來告訴服務調用方,當前服務忙,稍後重試。我以前的博文--我讀《經過Go來處理每分鐘達百萬的數據請求》介紹過相似的限流策略。這裏方法相似,就是當隊列滿的時候,返回503,告訴調用方服務忙。代碼以下:線程
// TryEnqueue tries to enqueue a job to the given job channel. Returns true if // the operation was successful, and false if enqueuing would not have been // possible without blocking. Job is not enqueued in the latter case. func TryEnqueue(job Job, jobChan <-chan Job) bool { select { case jobChan <- job: return true default: return false } }
這樣一來,咱們嘗試入隊的時候,若是入隊失敗,放回一個 false ,這樣咱們再對這個返回值處理以下:設計
if !TryEnqueue(job, chan) { http.Error(w, "max capacity reached", 503) return }
這樣就簡單的實現了限流操做。當 jobChan 滿的時候,程序會走到 default 返回 false ,從而告知調用方當前的服務器狀況。
到上面的步驟,限流已經能夠解決,那麼咱們接下來考慮,怎麼才能優雅的關閉工做者?假設咱們決定再也不向任務隊列插入任務,咱們但願讓全部的已入隊任務執行完成,咱們能夠很是簡單的實現:
close(jobChan)
沒錯,就是這一行代碼,咱們就可讓任務隊列再也不接收新任務(仍然能夠從 channel 讀取 job ),若是咱們想執行隊列裏的已經存在的任務,只須要:
for job := range jobChan {...}
全部已經入隊的 job 會正常被 woker 取走執行。可是,這樣實際上還存在一個問題,就是主協成不會等待工做者執行完工做就會退出。它不知道工做者協成何時可以處理完以上的任務。能夠運行的例子以下:
package main import ( "fmt" ) var jobChan chan int func worker(jobChan <- chan int) { for job := range jobChan{ fmt.Printf("執行任務 %d \n", job) } } func main() { jobChan = make(chan int, 100) //入隊 for i := 1; i <= 10; i++{ jobChan <- i } close(jobChan) go worker(jobChan) }
運行發現,woker 沒法保證執行完 channel 中的 job 就退出了。那咱們怎麼解決這個問題?
使用 sysc.WaitGroup:
package main import ( "fmt" "sync" ) var jobChan chan int var wg sync.WaitGroup func worker(jobChan <- chan int) { defer wg.Done() for job := range jobChan{ fmt.Printf("執行任務 %d \n", job) } } func main() { jobChan = make(chan int, 100) //入隊 for i := 1; i <= 10; i++{ jobChan <- i } wg.Add(1) close(jobChan) go worker(jobChan) wg.Wait() }
使用這種協程間同步的方法,協成會等待 worker 執行完 job 纔會退出。運行結果:
執行任務 1 執行任務 2 執行任務 3 執行任務 4 執行任務 5 執行任務 6 執行任務 7 執行任務 8 執行任務 9 執行任務 10 Process finished with exit code 0
這樣是完美的麼?在設計功能的時候,爲了防止協程假死,咱們應該給協程設置一個超時。
上面的例子中 wg.Wait() 會一直等待,直到 wg.Done() 被調用。可是若是這個操做假死,沒法調用,將永遠等待。這是咱們不但願看到的,所以,咱們能夠給他設置一個超時時間。方法以下:
package main import ( "fmt" "sync" "time" ) var jobChan chan int var wg sync.WaitGroup func worker(jobChan <-chan int) { defer wg.Done() for job := range jobChan { fmt.Printf("執行任務 %d \n", job) time.Sleep(1 * time.Second) } } func main() { jobChan = make(chan int, 100) //入隊 for i := 1; i <= 10; i++ { jobChan <- i } wg.Add(1) close(jobChan) go worker(jobChan) res := WaitTimeout(&wg, 5*time.Second) if res { fmt.Println("執行完成退出") } else { fmt.Println("執行超時退出") } } //超時機制 func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { ch := make(chan struct{}) go func() { wg.Wait() close(ch) }() select { case <-ch: return true case <-time.After(timeout): return false } }
執行結果以下:
執行任務 1 執行任務 2 執行任務 3 執行任務 4 執行任務 5 執行超時退出 Process finished with exit code 0
這樣,5s 超時生效,雖然不是全部的任務被執行,因爲超時,也會退出。
有時候咱們但願 woker 丟棄在執行的工做,也就是 cancel 操做,怎麼處理?
咱們能夠藉助 context.Context 實現。以下:
package main import ( "context" "fmt" "sync" "time" ) var jobChan chan int var ctx context.Context var cancel context.CancelFunc func worker(jobChan <-chan int, ctx context.Context) { for { select { case <-ctx.Done(): return case job := <-jobChan: fmt.Printf("執行任務 %d \n", job) time.Sleep(1 * time.Second) } } } func main() { jobChan = make(chan int, 100) //帶有取消功能的 contex ctx, cancel = context.WithCancel(context.Background()) //入隊 for i := 1; i <= 10; i++ { jobChan <- i } close(jobChan) go worker(jobChan, ctx) time.Sleep(2 * time.Second) //調用cancel cancel() }
結果以下:
執行任務 1 執行任務 2 Process finished with exit code 0
能夠看出,咱們等待2s後,咱們主動調用了取消操做,woker 協程主動退出。
這是藉助 context 包實現了取消操做,實質上也是監聽一個 channel 的操做,那咱們有沒有可能不借助 context 實現取消操做呢?
不使用 context 的超時機制實現取消:
package main import ( "fmt" "time" ) var jobChan chan int func worker(jobChan <-chan int, cancelChan <-chan struct{}) { for { select { case <-cancelChan: return case job := <-jobChan: fmt.Printf("執行任務 %d \n", job) time.Sleep(1 * time.Second) } } } func main() { jobChan = make(chan int, 100) //經過chan 取消操做 cancelChan := make(chan struct{}) //入隊 for i := 1; i <= 10; i++ { jobChan <- i } close(jobChan) go worker(jobChan, cancelChan) time.Sleep(2 * time.Second) //關閉chan close(cancelChan) }
這樣,咱們使用一個關閉 chan 的信號實現了取消操做。緣由是無緩衝 chan 讀取會阻塞,當關閉後,能夠讀取到空,所以會執行 select 裏的 return.
照例總結一波,本文介紹了 golang 協程間的同步和通訊的一些方法,任務隊列的最簡單實現。關於工做者池的實現,我在其餘博文也寫到了,這裏很少寫。本文更可能是工具性的代碼,寫功能時候能夠借用,好比超時、取消、chan的操做等。