Go併發調用的超時處理

以前有聊過 golang 的協程,我發覺彷佛還很理論,特別是在併發安全上,因此特結合網上的一些例子,來試驗下go routine中 的 channel, select, context 的妙用。git

場景-微服務調用

咱們用 gin(一個web框架) 做爲處理請求的工具,需求是這樣的: 一個請求 X 會去並行調用 A, B, C 三個方法,並把三個方法返回的結果加起來做爲 X 請求的 Response。 可是咱們這個 Response 是有時間要求的(不能超過5秒的響應時間),github

可能 A, B, C 中任意一個或兩個,處理邏輯十分複雜,或者數據量超大,致使處理時間超出預期, 那麼咱們就立刻切斷,並返回已經拿到的任意個返回結果之和。golang

咱們先來定義主函數:web

func main() {
	r := gin.New()
	r.GET("/calculate", calHandler)
	http.ListenAndServe(":8008", r)
}
複製代碼

很是簡單,普通的請求接受和 handler 定義。其中 calHandler 是咱們用來處理請求的函數。安全

分別定義三個假的微服務,其中第三個將會是咱們超時的哪位~markdown

func microService1() int {
	time.Sleep(1*time.Second)
	return 1
}

func microService2() int {
	time.Sleep(2*time.Second)
	return 2
}

func microService3() int {
	time.Sleep(10*time.Second)
	return 3
}
複製代碼

接下來,咱們看看 calHandler 裏究竟是什麼併發

func calHandler(c *gin.Context) {
    ...
	c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
	return
}
複製代碼

一個典型的 gin Response,咱們先不用在乎 sum 是什麼。框架

要點1--併發調用

直接用 go 就行了嘛~ 因此一開始咱們可能就這麼寫:函數

go microService1()
go microService2()
go microService3()
複製代碼

很簡單有沒有,可是等等,說好的返回值我怎麼接呢? 爲了可以並行地接受處理結果,咱們很容易想到用 channel 去接。 因此咱們把調用服務改爲這樣:微服務

var resChan = make(chan int, 3) // 由於有3個結果,因此咱們建立一個能夠容納3個值的 int channel。
go func() {
    resChan <- microService1()
}()

go func() {
    resChan <- microService2()
}()

go func() {
    resChan <- microService3()
}()
複製代碼

有東西接,那也要有方法去算,因此咱們加一個一直循環拿 resChan 中結果並計算的方法:

var resContainer, sum int
for {
    resContainer = <-resChan
    sum += resContainer
}
複製代碼

這樣一來咱們就有一個 sum 來計算每次從 resChan 中拿出的結果了。

要點2--超時信號

還沒結束,說好的超時處理呢? 爲了實現超時處理,咱們須要引入一個東西,就是 context,什麼是 context ? 咱們這裏只使用 context 的一個特性,超時通知(其實這個特性徹底能夠用 channel 來替代)。

能夠看在定義 calHandler 的時候咱們已經將 c *gin.Context 做爲參數傳了進來,那咱們就不用本身在聲明瞭。 gin.Context 簡單理解爲貫穿整個 gin 聲明週期的上下文容器,有點像是分身,亦或是量子糾纏的感受。

有了這個 gin.Context, 咱們就能在一個地方對 context 作出操做,而其餘正在使用 context 的函數或方法,也會感覺到 context 作出的變化。

ctx, _ := context.WithTimeout(c, 3*time.Second) //定義一個超時的 context
複製代碼

只要時間到了,咱們就能用 ctx.Done() 獲取到一個超時的 channel(通知),而後其餘用到這個 ctx 的地方也會停掉,並釋放 ctx。 通常來講,ctx.Done() 是結合 select 使用的。 因此咱們又須要一個循環來監聽 ctx.Done()

for {
    select {
    case <- ctx.Done():
        // 返回結果
}
複製代碼

如今咱們有兩個 for 了,是否是可以合併下?

for {
    select {
    case resContainer = <-resChan:
        sum += resContainer
        fmt.Println("add", resContainer)
    case <- ctx.Done():
        fmt.Println("result:", sum)
        return
    }
}
複製代碼

誒嘿,看上去不錯。 不過咱們怎麼在正常完成微服務調用的時候輸出結果呢? 看來咱們還須要一個 flag

var count int
for {
    select {
    case resContainer = <-resChan:
        sum += resContainer
        count ++
        fmt.Println("add", resContainer)
        if count > 2 {
            fmt.Println("result:", sum)
            return
        }
    case <- ctx.Done():
        fmt.Println("timeout result:", sum)
        return
    }
}
複製代碼

咱們加入一個計數器,由於咱們只是調用3次微服務,因此當 count 大於2的時候,咱們就應該結束並輸出結果了。

要點3--併發中的等待

上面的計時器是一種偷懶的方法,由於咱們知道了調用微服務的次數,若是咱們並不知道,或者以後還要添加呢? 手動每次改 count 的判斷閾值會不會太不優雅了?這時候咱們就能夠加入 sync 包。 咱們將會使用的 sync 的一個特性是 WaitGroup。它的做用是等待一組協程運行完畢後,執行接下去的步驟。

咱們來改下以前微服務調用的代碼塊:

var success = make(chan int, 1) // 成功的通道標識
wg := sync.WaitGroup{} // 建立一個 waitGroup 組
wg.Add(3) // 咱們往組裏加3個標識,由於咱們要運行3個任務
go func() {
    resChan <- microService1()
    wg.Done() // 完成一個,Done()一個
}()

go func() {
    resChan <- microService2()
    wg.Done()
}()

go func() {
    resChan <- microService3()
    wg.Done()
}()
wg.Wait() // 直到咱們前面三個標識都被 Done 了,不然程序一直會阻塞在這裏
success <- 1 // 咱們發送一個成功信號到通道中
複製代碼

注意:若是咱們直接把上面的代碼放到 calHandler 裏,會出現一個問題,WaitGroup不論怎麼樣都會堵塞咱們的正常狀況輸出(死活都要讓你超時)。 因此,咱們把上面這段和業務邏輯相關的代碼單獨抽離出來,幷包裝一下。

// rc 是結果 channel, success 是成功與否的 flag channel
func MyLogic(rc chan<- int, success chan<- int) {
	wg := sync.WaitGroup{} // 建立一個 waitGroup 組
	wg.Add(3) // 咱們往組裏加3個標識,由於咱們要運行3個任務
	go func() {
		rc <- microService1()
		wg.Done() // 完成一個,Done()一個
	}()

	go func() {
		rc <- microService2()
		wg.Done()
	}()

	go func() {
		rc <- microService3()
		wg.Done()
	}()

	wg.Wait() // 直到咱們前面三個標識都被 Done 了,不然程序一直會阻塞在這裏
	success <- 1 // 咱們發送一個成功信號到通道中
}
複製代碼

最終,這個 MyLogic 仍是要做爲一個協程運行的。 (多謝@TomorrowWu和@chenqinghe提醒)

既然咱們有了 success 這個信號,那麼再把它加入到監控 for 循環中,並作些修改,刪除原來 count 判斷的部分。

for {
	select {
	case resContainer = <-resChan:
		sum += resContainer
		fmt.Println("add", resContainer)
	case <- success:
		fmt.Println("result:", sum)
		return
	case <- ctx.Done():
		fmt.Println("result:", sum)
		return
	}
}
複製代碼

三個 case,分工明確,

case resContainer = <-resChan:用來拿邏輯的輸出的結果並計算

case <- success:是理想狀況下的正常輸出

case <- ctx.Done():是超時狀況下的輸出

咱們再潤色一下,把後兩個 case 的 fmt.Println("result:", sum)改成 gin 的標準 http Response

c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
return
複製代碼

至此,全部的主要代碼都完成了。下面是徹底版

package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
)

// 一個請求會觸發調用三個服務,每一個服務輸出一個 int,
// 請求要求結果爲三個服務輸出 int 之和
// 請求返回時間不超過3秒,大於3秒只輸出已經得到的 int 之和
func calHandler(c *gin.Context) {
	var resContainer, sum int
	var success, resChan = make(chan int), make(chan int, 3)
	ctx, cancel := context.WithTimeout(c, 5*time.Second)
	defer cancel()

	// 真正的業務邏輯
	go MyLogic(resChan, success)

	for {
		select {
		case resContainer = <-resChan:
			sum += resContainer
			fmt.Println("add", resContainer)
		case <- success:
			c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
			return
		case <- ctx.Done():
			c.JSON(http.StatusOK, gin.H{"code":200, "result": sum})
			return
		}
	}
}

func main() {
	r := gin.New()
	r.GET("/calculate", calHandler)

	http.ListenAndServe(":8008", r)
}

func MyLogic(rc chan<- int, success chan<- int) {
	wg := sync.WaitGroup{} // 建立一個 waitGroup 組
	wg.Add(3) // 咱們往組裏加3個標識,由於咱們要運行3個任務
	go func() {
		rc <- microService1()
		wg.Done() // 完成一個,Done()一個
	}()

	go func() {
		rc <- microService2()
		wg.Done()
	}()

	go func() {
		rc <- microService3()
		wg.Done()
	}()

	wg.Wait() // 直到咱們前面三個標識都被 Done 了,不然程序一直會阻塞在這裏
	success <- 1 // 咱們發送一個成功信號到通道中
}

func microService1() int {
	time.Sleep(1*time.Second)
	return 1
}

func microService2() int {
	time.Sleep(2*time.Second)
	return 2
}

func microService3() int {
	time.Sleep(6*time.Second)
	return 3
}
複製代碼

上面的程序只是簡單描述了一個調用其餘微服務超時的處理場景。 實際過程當中還須要加不少不少調料,才能保證接口的對外完整性。

相關文章
相關標籤/搜索