一文帶你更方便的控制 goroutine

上一篇咱們講了 go-zero 中的併發工具包 core/syncxnode

從總體分析來看,併發組件主要經過 channel + mutex 控制程序中協程之間溝通。git

Do not communicate by sharing memory; instead, share memory by communicating.github

不要經過共享內存來通訊,而應經過通訊來共享內存。安全

本篇來聊 go-zero 對 Go 中 goroutine 支持的併發組件。微信

咱們回顧一下,go原生支持的 goroutine 控制的工具備哪些?閉包

  1. go func() 開啓一個協程
  2. sync.WaitGroup 控制多個協程任務編排
  3. sync.Cond 協程喚醒或者是協程等待

那可能會問 go-zero 爲何還要拿出來說這些?回到 go-zero 的設計理念:工具大於約定和文檔併發

那麼就來看看,go-zero 提供哪些工具?框架

threading

雖然 go func() 已經很方便,可是有幾個問題:函數

  • 若是協程異常退出,沒法追蹤異常棧
  • 某個異常請求觸發panic,應該作故障隔離,而不是整個進程退出,容易被攻擊

咱們看看 core/threading 包提供了哪些額外選擇:微服務

func GoSafe(fn func()) {
    go RunSafe(fn)
}

func RunSafe(fn func()) {
    defer rescue.Recover()
    fn()
}

func Recover(cleanups ...func()) {
    for _, cleanup := range cleanups {
        cleanup()
    }

    if p := recover(); p != nil {
        logx.ErrorStack(p)
    }
}

GoSafe

threading.GoSafe() 就幫你解決了這個問題。開發者能夠將本身在協程中須要完成邏輯,以閉包的方式傳入,由 GoSafe() 內部 go func()

當開發者的函數出現異常退出時,會在 Recover() 中打印異常棧,以便讓開發者更快肯定異常發生點和調用棧。

NewWorkerGroup

咱們再看第二個:WaitGroup。平常開發,其實 WaitGroup 沒什麼好說的,你須要 N 個協程協做 :wg.Add(N) ,等待所有協程完成任務:wg.Wait(),同時完成一個任務須要手動 wg.Done()

能夠看的出來,在任務開始 -> 結束 -> 等待,整個過程須要開發者關注任務的狀態而後手動修改狀態。

NewWorkerGroup 就幫開發者減輕了負擔,開發者只須要關注:

  1. 任務邏輯【函數】
  2. 任務數【workers

而後啓動 WorkerGroup.Start(),對應任務數就會啓動:

func (wg WorkerGroup) Start() {
  // 包裝了sync.WaitGroup
    group := NewRoutineGroup()
    for i := 0; i < wg.workers; i++ {
    // 內部維護了 wg.Add(1) wg.Done()
    // 同時也是 goroutine 安全模式下進行的
        group.RunSafe(wg.job)
    }
    group.Wait()
}

worker 的狀態會自動管理,能夠用來固定數量的 worker 來處理消息隊列的任務,用法以下:

func main() {
  group := NewWorkerGroup(func() {
    // process tasks
    }, runtime.NumCPU())
    group.Start()
}

Pool

這裏的 Pool 不是 sync.Poolsync.Pool 有個不方便的地方是它池化的對象可能會被垃圾回收掉,這個就讓開發者疑惑了,不知道本身建立並存入的對象何時就沒了。

go-zero 中的 pool

  1. pool 中的對象會根據使用時間作懶銷燬;
  2. 使用 cond 作對象消費和生產的通知以及阻塞;
  3. 開發者能夠自定義本身的生產函數,銷燬函數;

那我來看看生產對象,和消費對象在 pool 中時怎麼實現的:

func (p *Pool) Get() interface{} {
  // 調用 cond.Wait 時必需要持有c.L的鎖
    p.lock.Lock()
    defer p.lock.Unlock()

    for {
    // 1. pool中對象池是一個用鏈表鏈接的nodelist
        if p.head != nil {
            head := p.head
            p.head = head.next
      // 1.1 若是當前節點:當前時間 >= 上次使用時間+對象最大存活時間
            if p.maxAge > 0 && head.lastUsed+p.maxAge < timex.Now() {
                p.created--
        // 說明當前節點已通過期了 -> 銷燬節點對應的對象,而後繼續尋找下一個節點
        // 【⚠️:不是銷燬節點,而是銷燬節點對應的對象】
                p.destroy(head.item)
                continue
            } else {
                return head.item
            }
        }
        // 2. 對象池是懶加載的,get的時候纔去建立對象鏈表
        if p.created < p.limit {
            p.created++
      // 由開發者本身傳入:生產函數
            return p.create()
        }
        
        p.cond.Wait()
    }
}
func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    // 互斥訪問 pool 中nodelist
    p.lock.Lock()
    defer p.lock.Unlock()

    p.head = &node{
        item:     x,
        next:     p.head,
        lastUsed: timex.Now(),
    }
  // 放入head,通知其餘正在get的協程【極爲關鍵】
    p.cond.Signal()
}

上述就是 go-zeroCond 的使用。能夠類比 生產者-消費者模型,只是在這裏沒有使用 channel 作通訊,而是用 Cond 。這裏有幾個特性:

  • Cond和一個Locker關聯,能夠利用這個Locker對相關的依賴條件更改提供保護。
  • Cond能夠同時支持 SignalBroadcast 方法,而 Channel 只能同時支持其中一種。

總結

工具大於約定和文檔,一直是 go-zero 設計主旨之一;也同時將平時業務沉澱到組件中,這纔是框架和組件的意義。

關於 go-zero 更多的設計和實現文章,能夠持續關注咱們。歡迎你們去關注和使用。

項目地址

https://github.com/tal-tech/go-zero

歡迎使用 go-zero 並 star 支持咱們!

微信交流羣

關注『微服務實踐』公衆號並回復 進羣 獲取社區羣二維碼。

go-zero 系列文章見『微服務實踐』公衆號
相關文章
相關標籤/搜索