你在使用消息隊列的時候關注過吞吐量嗎?git
思考過吞吐量的影響因素嗎?github
考慮過怎麼提升嗎?redis
總結過最佳實踐嗎?微信
本文帶你一塊兒探討下消息隊列消費端高吞吐的 Go
框架實現。Let’s go!網絡
寫入消息隊列吞吐量取決於如下兩個方面框架
最佳吞吐量是讓其中之一打滿,而通常狀況下內網帶寬都會很是高,不太可能被打滿,因此天然就是講消息隊列的寫入速度打滿,這就就有兩個點須要平衡函數
go-zero 的 PeriodicalExecutor
和 ChunkExecutor
就是爲了這種狀況設計的微服務
從消息隊列裏消費消息的吞吐量取決於如下兩個方面ui
這裏有個核心問題是不能不考慮業務處理速度,而讀取過多的消息到內存裏,不然可能會引發兩個問題:atom
pod
也是有 memory limit
的pod
時堆積的消息來不及處理而致使消息丟失
借用一下 Rob Pike
的一張圖,這個跟隊列消費殊途同歸。左邊4個 gopher
從隊列裏取,右邊4個 gopher
接過去處理。比較理想的結果是左邊和右邊速率基本一致,沒有誰浪費,沒有誰等待,中間交換處也沒有堆積。
咱們來看看 go-zero
是怎麼實現的:
Producer
端for { select { case <-q.quit: logx.Info("Quitting producer") return default: if v, ok := q.produceOne(producer); ok { q.channel <- v } } }
沒有退出事件就會經過 produceOne
去讀取一個消息,成功後寫入 channel
。利用 chan
就能夠很好的解決讀取和消費的銜接問題。
Consumer
端for { select { case message, ok := <-q.channel: if ok { q.consumeOne(consumer, message) } else { logx.Info("Task channel was closed, quitting consumer...") return } case event := <-eventChan: consumer.OnEvent(event) } }
這裏若是拿到消息就去處理,當 ok
爲 false
的時候表示 channel
已被關閉,能夠退出整個處理循環了。同時咱們還在 redis queue
上支持了 pause/resume
,咱們原來在社交場景裏大量使用這樣的隊列,能夠通知 consumer
暫停和繼續。
queue
,有了這些咱們就能夠經過控制 producer/consumer
的數量來達到吞吐量的調優了func (q *Queue) Start() { q.startProducers(q.producerCount) q.startConsumers(q.consumerCount) q.producerRoutineGroup.Wait() close(q.channel) q.consumerRoutineGroup.Wait() }
這裏須要注意的是,先要停掉 producer
,再去等 consumer
處理完。
到這裏核心控制代碼基本就講完了,其實看起來仍是挺簡單的,也能夠到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整實現。
基本的使用流程:
producer
或 consumer
queue
對應到 queue
中,大體以下:
// 生產者建立工廠 producer := newMockedProducer() // 消費者建立工廠 consumer := newMockedConsumer() // 將生產者以及消費者的建立工廠函數傳遞給 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
咱們看看 NewQueue
須要什麼參數:
producer
工廠方法consumer
工廠方法將 producer & consumer
的工廠函數傳遞 queue
,由它去負責建立。框架提供了 Producer
和 Consumer
的接口以及工廠方法定義,而後整個流程的控制 queue
實現會自動完成。
message
咱們經過自定義一個 mockedProducer
來模擬:
type mockedProducer struct { total int32 count int32 // 使用waitgroup來模擬任務的完成 wait sync.WaitGroup } // 實現 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生產者編寫都必須實現:
Produce()
:由開發者編寫生產消息的邏輯AddListener()
:添加事件 listener
message
咱們經過自定義一個 mockedConsumer
來模擬:
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
queue
啓動,而後驗證咱們上述的生產者和消費者之間的數據是否傳輸成功:
func main() { // 建立 queue q := NewQueue(func() (Producer, error) { return newMockedProducer(), nil }, func() (Consumer, error) { return newMockedConsumer(), nil }) // 啓動panic了也能夠確保stop被執行以清理資源 defer q.Stop() // 啓動 q.Start() }
以上就是 queue
最簡易的實現示例。咱們經過這個 core/queue
框架實現了基於 redis
和 kafka
等的消息隊列服務,在不一樣業務場景中通過了充分的實踐檢驗。你也能夠根據本身的業務實際狀況,實現本身的消息隊列服務。
總體流程如上圖:
channel
進行Producer
和 Consumer
的數量能夠設定以匹配不一樣業務需求Produce
和 Consume
具體實現由開發者定義,queue
負責總體流程本篇文章講解了如何經過 channel
來平衡從隊列中讀取和處理消息的速度,以及如何實現一個通用的消息隊列處理框架,並經過 mock
示例簡單展現瞭如何基於 core/queue
實現一個消息隊列處理服務。你能夠經過相似的方式實現一個基於 rocketmq
等的消息隊列處理服務。
關於 go-zero
更多的設計和實現文章,能夠關注『微服務實踐』公衆號。
https://github.com/tal-tech/go-zero
歡迎使用 go-zero 並 star 支持咱們!
關注『微服務實踐』公衆號並點擊 進羣 獲取社區羣二維碼。
go-zero 系列文章見『微服務實踐』公衆號