最近在看公司的 redis queue
時,發現底層使用的是 go-zero
的 queue
。本篇文章來看看 queue
的設計,也但願能夠從裏面瞭解到 mq
的最小型設計實踐。git
結合其餘 mq
的使用經歷,基本的使用流程:github
producer
或 consumer
mq
對應到 queue
中,大體也是這個:redis
// 生產者建立工廠 producer := newMockedProducer() // 消費者建立工廠 consumer := newMockedConsumer() // 將生產者以及消費者的建立工廠函數傳遞給 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
咱們看看 NewQueue
須要什麼構建條件:微信
producer constructor
consumer constructor
將雙方的工廠函數傳遞給 queue
,由它去執行以及重試。架構
這兩個須要的目的是將生產者/消費者的構建和消息生產/消費都封裝在 mq
中,並且將生產者/消費者的整套邏輯交給開發者處理:函數
type ( // 開發者須要實現此接口 Producer interface { AddListener(listener ProduceListener) Produce() (string, bool) } ... // ProducerFactory定義了生成Producer的方法 ProducerFactory func() (Producer, error) )
mq
只負責生產者/消費者的消息傳遞和之間的調度。queue
本身來作調度或者重試。生產消息固然要回到生產者自己:微服務
type mockedProducer struct { total int32 count int32 // 使用waitgroup來模擬任務的完成 wait sync.WaitGroup } // 實現 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生產者編寫都必須實現:atom
Produce()
:由開發者編寫生產消息的邏輯AddListener()
:生產者和生產者相似:spa
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
啓動,而後驗證咱們上述的生產者和消費者之間的數據是否傳輸成功:設計
func TestQueue(t *testing.T) { producer := newMockedProducer(rounds) consumer := newMockedConsumer() // 建立 queue q := NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil }) // 當生產者生產完畢,執行 Stop() 關閉生產端生產 go func() { producer.wait.Wait() // mq生產端中止生產,不是mq自己 Stop 運行 q.Stop() }() // 啓動 q.Start() // 驗證生產消費端是否消息消費完成 assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) }
以上就是 queue
最簡易的入門使用代碼。開發者能夠根據本身的業務實際狀況:自由定義生產者/消費者已經生產/消費邏輯。
總體流程如上圖:
channel
進行listener
,以及事件觸發 event
,至關於將觸發器邏輯分離出來produceone
,這個是生產消息的邏輯,可是其中的 Produce()
是由開發者編寫【上面的 interface
中正是這個函數】Consume()
基本的消息流動就入上圖以及上述描寫的,具體的代碼分析咱們就留到下一篇,咱們😁分析裏面,尤爲是如何控制 channel
是整個設計的核心。
本篇文章從使用以及整個架構分析上簡略介紹了 queue
的設計。下篇咱們將深刻源碼,分析內部消息流轉以及 channel
控制。
關於 go-zero
更多的設計和實現文章,能夠持續關注咱們。歡迎你們去關注和使用。
https://github.com/tal-tech/go-zero
歡迎使用 go-zero 並 star 支持咱們!
關注『微服務實踐』公衆號並回復 進羣 獲取社區羣二維碼。
go-zero 系列文章見『微服務實踐』公衆號