如何讓消息隊列達到最大吞吐量?

你在使用消息隊列的時候關注過吞吐量嗎?git

思考過吞吐量的影響因素嗎?github

考慮過怎麼提升嗎?redis

總結過最佳實踐嗎?微信

本文帶你一塊兒探討下消息隊列消費端高吞吐的 Go 框架實現。Let’s go!網絡

關於吞吐量的一些思考

  • 寫入消息隊列吞吐量取決於如下兩個方面框架

    • 網絡帶寬
    • 消息隊列(好比Kafka)寫入速度

    最佳吞吐量是讓其中之一打滿,而通常狀況下內網帶寬都會很是高,不太可能被打滿,因此天然就是講消息隊列的寫入速度打滿,這就就有兩個點須要平衡函數

    • 批量寫入的消息量大小或者字節數多少
    • 延遲多久寫入

    go-zero 的 PeriodicalExecutorChunkExecutor 就是爲了這種狀況設計的微服務

  • 從消息隊列裏消費消息的吞吐量取決於如下兩個方面ui

    • 消息隊列的讀取速度,通常狀況下消息隊列自己的讀取速度相比於處理消息的速度都是足夠快的
    • 處理速度,這個依賴於業務

    這裏有個核心問題是不能不考慮業務處理速度,而讀取過多的消息到內存裏,不然可能會引發兩個問題:atom

    • 內存佔用太高,甚至出現OOM,pod 也是有 memory limit
    • 中止 pod 時堆積的消息來不及處理而致使消息丟失

解決方案和實現

借用一下 Rob Pike 的一張圖,這個跟隊列消費殊途同歸。左邊4個 gopher 從隊列裏取,右邊4個 gopher 接過去處理。比較理想的結果是左邊和右邊速率基本一致,沒有誰浪費,沒有誰等待,中間交換處也沒有堆積。

咱們來看看 go-zero 是怎麼實現的:

  • Producer
for {
        select {
        case <-q.quit:
            logx.Info("Quitting producer")
            return
        default:
            if v, ok := q.produceOne(producer); ok {
                q.channel <- v
            }
        }
    }

沒有退出事件就會經過 produceOne 去讀取一個消息,成功後寫入 channel。利用 chan 就能夠很好的解決讀取和消費的銜接問題。

  • Consumer
for {
        select {
        case message, ok := <-q.channel:
            if ok {
                q.consumeOne(consumer, message)
            } else {
                logx.Info("Task channel was closed, quitting consumer...")
                return
            }
        case event := <-eventChan:
            consumer.OnEvent(event)
        }
    }

這裏若是拿到消息就去處理,當 okfalse 的時候表示 channel 已被關閉,能夠退出整個處理循環了。同時咱們還在 redis queue 上支持了 pause/resume,咱們原來在社交場景裏大量使用這樣的隊列,能夠通知 consumer 暫停和繼續。

  • 啓動 queue,有了這些咱們就能夠經過控制 producer/consumer 的數量來達到吞吐量的調優了
func (q *Queue) Start() {
    q.startProducers(q.producerCount)
    q.startConsumers(q.consumerCount)

    q.producerRoutineGroup.Wait()
    close(q.channel)
    q.consumerRoutineGroup.Wait()
}

這裏須要注意的是,先要停掉 producer,再去等 consumer 處理完。

到這裏核心控制代碼基本就講完了,其實看起來仍是挺簡單的,也能夠到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整實現。

如何使用

基本的使用流程:

  1. 建立 producerconsumer
  2. 啓動 queue
  3. 生產消息 / 消費消息

對應到 queue 中,大體以下:

建立 queue

// 生產者建立工廠
producer := newMockedProducer()
// 消費者建立工廠
consumer := newMockedConsumer()
// 將生產者以及消費者的建立工廠函數傳遞給 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

咱們看看 NewQueue 須要什麼參數:

  1. producer 工廠方法
  2. consumer 工廠方法

producer & consumer 的工廠函數傳遞 queue ,由它去負責建立。框架提供了 ProducerConsumer 的接口以及工廠方法定義,而後整個流程的控制 queue 實現會自動完成。

生產 message

咱們經過自定義一個 mockedProducer 來模擬:

type mockedProducer struct {
    total int32
    count int32
  // 使用waitgroup來模擬任務的完成
    wait  sync.WaitGroup
}
// 實現 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
    if atomic.AddInt32(&p.count, 1) <= p.total {
        p.wait.Done()
        return "item", true
    }
    time.Sleep(time.Second)
    return "", false
}

queue 中的生產者編寫都必須實現:

  • Produce():由開發者編寫生產消息的邏輯
  • AddListener():添加事件 listener

消費 message

咱們經過自定義一個 mockedConsumer 來模擬:

type mockedConsumer struct {
    count  int32
}

func (c *mockedConsumer) Consume(string) error {
    atomic.AddInt32(&c.count, 1)
    return nil
}

啓動 queue

啓動,而後驗證咱們上述的生產者和消費者之間的數據是否傳輸成功:

func main() {
    // 建立 queue
    q := NewQueue(func() (Producer, error) {
        return newMockedProducer(), nil
    }, func() (Consumer, error) {
        return newMockedConsumer(), nil
    })
  // 啓動panic了也能夠確保stop被執行以清理資源
  defer q.Stop()
    // 啓動
    q.Start()
}

以上就是 queue 最簡易的實現示例。咱們經過這個 core/queue 框架實現了基於 rediskafka 等的消息隊列服務,在不一樣業務場景中通過了充分的實踐檢驗。你也能夠根據本身的業務實際狀況,實現本身的消息隊列服務。

總體設計

總體流程如上圖:

  1. 全體的通訊都由 channel 進行
  2. ProducerConsumer 的數量能夠設定以匹配不一樣業務需求
  3. ProduceConsume 具體實現由開發者定義,queue 負責總體流程

總結

本篇文章講解了如何經過 channel 來平衡從隊列中讀取和處理消息的速度,以及如何實現一個通用的消息隊列處理框架,並經過 mock 示例簡單展現瞭如何基於 core/queue 實現一個消息隊列處理服務。你能夠經過相似的方式實現一個基於 rocketmq 等的消息隊列處理服務。

關於 go-zero 更多的設計和實現文章,能夠關注『微服務實踐』公衆號。

項目地址

https://github.com/tal-tech/go-zero

歡迎使用 go-zero 並 star 支持咱們!

微信交流羣

關注『微服務實踐』公衆號並點擊 進羣 獲取社區羣二維碼。

go-zero 系列文章見『微服務實踐』公衆號
相關文章
相關標籤/搜索