Golang 任務隊列策略 -- 讀《JOB QUEUES IN GO》

Golang 在異步處理上有着上佳的表現。由於 goroutines 和 channels 是很是容易使用且有效的異步處理手段。下面咱們一塊兒來看一看 Golang 的簡易任務隊列golang

一種"非任務隊列"的任務隊列

有些時候,咱們須要作異步處理可是並不須要一個任務對列,這類問題咱們使用 Golang 能夠很是簡單的實現。以下:安全

go process(job)

這的確是不少場景下的絕佳選擇,好比操做一個HTTP請求等待結果。然而,在一些相對複雜高併發的場景下,你就不能簡單的使用該方法來實現異步處理。這時候,你須要一個隊列來管理須要處理的任務,而且按照必定的順序來處理這些任務。服務器

最簡單的任務隊列

接下來看一個最簡單的任務隊列和工做者模型。併發

func worker(jobChan <-chan Job) {
    for job := range jobChan {
        process(job)
    }
}

// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)

// start the worker
go worker(jobChan)

// enqueue a job
jobChan <- job

代碼中建立了一個 Job 對象的 channel , 容量爲100。而後開啓一個工做者協程從 channel 中去除任務並執行。任務的入隊操做就是將一個 Job 對象放入任務 channel 中。異步

雖然上面只有短短的幾行代碼,卻完成了不少的工做。咱們實現了一個簡易的線程安全的、支持併發的、可靠的任務隊列。高併發

限流

上面的例子中,咱們初始化了一個容量爲 100 的任務 channel。工具

// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)

這意味着任務的入隊操做十分簡單,以下:ui

// enqueue a job
jobChan <- job

這樣一來,當 job channel 中已經放入 100 個任務的時候,入隊操做將會阻塞,直至有任務被工做者處理完成。這一般不是一個好的現象,由於咱們一般不但願程序出現阻塞等待。這時候,咱們一般但願有一個超時機制來告訴服務調用方,當前服務忙,稍後重試。我以前的博文--我讀《經過Go來處理每分鐘達百萬的數據請求》介紹過相似的限流策略。這裏方法相似,就是當隊列滿的時候,返回503,告訴調用方服務忙。代碼以下:線程

// TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful, and false if enqueuing would not have been
// possible without blocking. Job is not enqueued in the latter case.
func TryEnqueue(job Job, jobChan <-chan Job) bool {
    select {
    case jobChan <- job:
        return true
    default:
        return false
    }
}

這樣一來,咱們嘗試入隊的時候,若是入隊失敗,放回一個 false ,這樣咱們再對這個返回值處理以下:設計

if !TryEnqueue(job, chan) {
    http.Error(w, "max capacity reached", 503)
    return
}

這樣就簡單的實現了限流操做。當 jobChan 滿的時候,程序會走到 default 返回 false ,從而告知調用方當前的服務器狀況。

關閉工做者

到上面的步驟,限流已經能夠解決,那麼咱們接下來考慮,怎麼才能優雅的關閉工做者?假設咱們決定再也不向任務隊列插入任務,咱們但願讓全部的已入隊任務執行完成,咱們能夠很是簡單的實現:

close(jobChan)

沒錯,就是這一行代碼,咱們就可讓任務隊列再也不接收新任務(仍然能夠從 channel 讀取 job ),若是咱們想執行隊列裏的已經存在的任務,只須要:

for job := range jobChan {...}

全部已經入隊的 job 會正常被 woker 取走執行。可是,這樣實際上還存在一個問題,就是主協成不會等待工做者執行完工做就會退出。它不知道工做者協成何時可以處理完以上的任務。能夠運行的例子以下:

package main

import (
    "fmt"
)

var jobChan chan int

func worker(jobChan <- chan int)  {
    for job := range jobChan{
        fmt.Printf("執行任務 %d \n", job)
    }
}

func main() {
    jobChan = make(chan int, 100)
    //入隊
    for i := 1; i <= 10; i++{
        jobChan <- i
    }
    
    close(jobChan)
    go worker(jobChan)

}

運行發現,woker 沒法保證執行完 channel 中的 job 就退出了。那咱們怎麼解決這個問題?

等待 woker 執行完成

使用 sysc.WaitGroup:

package main

import (
    "fmt"
    "sync"
)

var jobChan chan int
var wg sync.WaitGroup

func worker(jobChan <- chan int)  {
    defer wg.Done()
    for job := range jobChan{
        fmt.Printf("執行任務 %d \n", job)
    }
}

func main() {
    jobChan = make(chan int, 100)
    //入隊
    for i := 1; i <= 10; i++{
        jobChan <- i
    }

    wg.Add(1)
    close(jobChan)

    go worker(jobChan)
    wg.Wait()
}

使用這種協程間同步的方法,協成會等待 worker 執行完 job 纔會退出。運行結果:

執行任務 1 
執行任務 2 
執行任務 3 
執行任務 4 
執行任務 5 
執行任務 6 
執行任務 7 
執行任務 8 
執行任務 9 
執行任務 10 

Process finished with exit code 0

這樣是完美的麼?在設計功能的時候,爲了防止協程假死,咱們應該給協程設置一個超時。

超時設置

上面的例子中 wg.Wait() 會一直等待,直到 wg.Done() 被調用。可是若是這個操做假死,沒法調用,將永遠等待。這是咱們不但願看到的,所以,咱們能夠給他設置一個超時時間。方法以下:

package main

import (
    "fmt"
    "sync"
    "time"
)

var jobChan chan int
var wg sync.WaitGroup

func worker(jobChan <-chan int) {
    defer wg.Done()
    for job := range jobChan {
        fmt.Printf("執行任務 %d \n", job)
        time.Sleep(1 * time.Second)
    }
}

func main() {
    jobChan = make(chan int, 100)
    //入隊
    for i := 1; i <= 10; i++ {
        jobChan <- i
    }

    wg.Add(1)
    close(jobChan)

    go worker(jobChan)
    res := WaitTimeout(&wg, 5*time.Second)
    if res {
        fmt.Println("執行完成退出")
    } else {
        fmt.Println("執行超時退出")
    }
}

//超時機制
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    ch := make(chan struct{})
    go func() {
        wg.Wait()
        close(ch)
    }()
    select {
    case <-ch:
        return true
    case <-time.After(timeout):
        return false
    }
}

執行結果以下:

執行任務 1 
執行任務 2 
執行任務 3 
執行任務 4 
執行任務 5 
執行超時退出

Process finished with exit code 0

這樣,5s 超時生效,雖然不是全部的任務被執行,因爲超時,也會退出。

有時候咱們但願 woker 丟棄在執行的工做,也就是 cancel 操做,怎麼處理?

Cancel Worker

咱們能夠藉助 context.Context 實現。以下:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

var jobChan chan int
var ctx context.Context
var cancel context.CancelFunc

func worker(jobChan <-chan int, ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case job := <-jobChan:
            fmt.Printf("執行任務 %d \n", job)
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    jobChan = make(chan int, 100)
    //帶有取消功能的 contex
    ctx, cancel = context.WithCancel(context.Background())
    //入隊
    for i := 1; i <= 10; i++ {
        jobChan <- i
    }

    close(jobChan)

    go worker(jobChan, ctx)
    time.Sleep(2 * time.Second)
    //調用cancel
    cancel()
}

結果以下:

執行任務 1 
執行任務 2 

Process finished with exit code 0

能夠看出,咱們等待2s後,咱們主動調用了取消操做,woker 協程主動退出。

這是藉助 context 包實現了取消操做,實質上也是監聽一個 channel 的操做,那咱們有沒有可能不借助 context 實現取消操做呢?

不使用 context 的超時機制實現取消:

package main

import (
    "fmt"
    "time"
)

var jobChan chan int

func worker(jobChan <-chan int, cancelChan <-chan struct{}) {
    for {
        select {
        case <-cancelChan:
            return
        case job := <-jobChan:
            fmt.Printf("執行任務 %d \n", job)
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    jobChan = make(chan int, 100)
    //經過chan 取消操做
    cancelChan := make(chan struct{})
    //入隊
    for i := 1; i <= 10; i++ {
        jobChan <- i
    }

    close(jobChan)

    go worker(jobChan, cancelChan)
    time.Sleep(2 * time.Second)
    //關閉chan
    close(cancelChan)
}

這樣,咱們使用一個關閉 chan 的信號實現了取消操做。緣由是無緩衝 chan 讀取會阻塞,當關閉後,能夠讀取到空,所以會執行 select 裏的 return.

總結

照例總結一波,本文介紹了 golang 協程間的同步和通訊的一些方法,任務隊列的最簡單實現。關於工做者池的實現,我在其餘博文也寫到了,這裏很少寫。本文更可能是工具性的代碼,寫功能時候能夠借用,好比超時、取消、chan的操做等。

相關文章
相關標籤/搜索