一文帶你理解最簡消息隊列實現

最近在看公司的 redis queue 時,發現底層使用的是 go-zeroqueue 。本篇文章來看看 queue 的設計,也但願能夠從裏面瞭解到 mq 的最小型設計實踐。git

使用

結合其餘 mq 的使用經歷,基本的使用流程:github

  1. 建立 producerconsumer
  2. 啓動 mq
  3. 生產消息/消費消息

對應到 queue 中,大體也是這個:redis

建立 queue

// 生產者建立工廠
producer := newMockedProducer()
// 消費者建立工廠
consumer := newMockedConsumer()
// 將生產者以及消費者的建立工廠函數傳遞給 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

咱們看看 NewQueue 須要什麼構建條件:微信

  1. producer constructor
  2. consumer constructor

將雙方的工廠函數傳遞給 queue ,由它去執行以及重試。架構

這兩個須要的目的是將生產者/消費者的構建和消息生產/消費都封裝在 mq 中,並且將生產者/消費者的整套邏輯交給開發者處理:函數

type (
    // 開發者須要實現此接口
    Producer interface {
        AddListener(listener ProduceListener)
        Produce() (string, bool)
    }
    ...
    // ProducerFactory定義了生成Producer的方法
    ProducerFactory func() (Producer, error)
)
  1. 其實也就是將生產者的邏輯交個開發者本身完成,mq 只負責生產者/消費者的消息傳遞和之間的調度。
  2. 工廠方法的設計,是將生產者自己和生產消息,這兩個任務都交給 queue 本身來作調度或者重試。

生產msg

生產消息固然要回到生產者自己:微服務

type mockedProducer struct {
    total int32
    count int32
  // 使用waitgroup來模擬任務的完成
    wait  sync.WaitGroup
}
// 實現 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
    if atomic.AddInt32(&p.count, 1) <= p.total {
        p.wait.Done()
        return "item", true
    }
    time.Sleep(time.Second)
    return "", false
}

queue 中的生產者編寫都必須實現:atom

  • Produce():由開發者編寫生產消息的邏輯
  • AddListener():生產者

消費msg

和生產者相似:spa

type mockedConsumer struct {
    count  int32
}

func (c *mockedConsumer) Consume(string) error {
    atomic.AddInt32(&c.count, 1)
    return nil
}

啓動 queue

啓動,而後驗證咱們上述的生產者和消費者之間的數據是否傳輸成功:設計

func TestQueue(t *testing.T) {
    producer := newMockedProducer(rounds)
    consumer := newMockedConsumer()
    // 建立 queue
    q := NewQueue(func() (Producer, error) {
        return producer, nil
    }, func() (Consumer, error) {
        return consumer, nil
    })
    // 當生產者生產完畢,執行 Stop() 關閉生產端生產
    go func() {
        producer.wait.Wait()
    // mq生產端中止生產,不是mq自己 Stop 運行
        q.Stop()
    }()
    // 啓動
    q.Start()
    // 驗證生產消費端是否消息消費完成
    assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}

以上就是 queue 最簡易的入門使用代碼。開發者能夠根據本身的業務實際狀況:自由定義生產者/消費者已經生產/消費邏輯。

總體設計

image-20210506224102836

總體流程如上圖:

  1. 全體的通訊都由 channel 進行
  2. 經過加入監聽器 listener ,以及事件觸發 event ,至關於將觸發器邏輯分離出來
  3. 生產者有 produceone ,這個是生產消息的邏輯,可是其中的 Produce() 是由開發者編寫【上面的 interface 中正是這個函數】
  4. 同理消費者,Consume()

基本的消息流動就入上圖以及上述描寫的,具體的代碼分析咱們就留到下一篇,咱們😁分析裏面,尤爲是如何控制 channel 是整個設計的核心。

總結

本篇文章從使用以及整個架構分析上簡略介紹了 queue 的設計。下篇咱們將深刻源碼,分析內部消息流轉以及 channel 控制。

關於 go-zero 更多的設計和實現文章,能夠持續關注咱們。歡迎你們去關注和使用。

項目地址

https://github.com/tal-tech/go-zero

歡迎使用 go-zero 並 star 支持咱們!

微信交流羣

關注『微服務實踐』公衆號並回復 進羣 獲取社區羣二維碼。

go-zero 系列文章見『微服務實踐』公衆號
相關文章
相關標籤/搜索