使用go做爲RabbitMQ消費者的正確姿式

寫在前面

在咱們的生產環境中搭了兩臺rabbitmq, 前面架設了一臺HAProxy作負載均衡,當咱們的客戶端鏈接到HAProxy,而後由HAProxy負責將連接分配給其中一臺rabbitmq,客戶端須要須要負責斷線重連,須要將獲取的數據,分配消息給相應的處理方法,而後還須要回覆給rabbitmq ACK,這其中客戶端須要負責斷線重連的邏輯是很重要的,由於有可能客戶端和HAProxy的鏈接是正常的,可是HAProxy和rabbitmq的連接由於網絡波動斷開了,那麼這個時候客戶端實際上是沒有工做的,而且會在rabbitmq中不斷積累消息。mysql

下面的內容給出了一個比較完善的處理邏輯,以供參考。git

實戰

定義接口

從以前的說明來看,這是一個典型的觀察者模式,由RabbitMQ對象負責維護鏈接,獲取消息,而後定義若干個接收者註冊到RabbitMQ對象中,這時候RabbitMQ對象一旦收到了由RabbitMQ發來的數據,就能夠將該消息分發到相應的接收者去處理,當接收者處理完成後告訴RabbitMQ對象消息消費成功,而後由RabbitMQ對象回覆RabbitMQ ACK,固然能夠在其中加上重試機制,接收者有可能由於某種狀況處理失敗,那麼每隔必定的時間RabbitMQ對象須要從新調用一次接收者從新處理,直至成功,而後再返回ACK。github

先來看看基本的接口約定redis

// Receiver 觀察者模式須要的接口
// 觀察者用於接收指定的queue到來的數據
type Receiver interface {
    QueueName() string     // 獲取接收者須要監聽的隊列
    RouterKey() string     // 這個隊列綁定的路由
    OnError(error)         // 處理遇到的錯誤,當RabbitMQ對象發生了錯誤,他須要告訴接收者處理錯誤
    OnReceive([]byte) bool // 處理收到的消息, 這裏須要告知RabbitMQ對象消息是否處理成功
}

這樣就將接收者和RabbitMQ對象之間就解耦了,這樣後期若是須要添加新的接收者那就很容易了。sql

下面來看一看RabbitMQ對象的定義:
這裏用到的RabbitMQ client是RabbitMQ官方的 Github數據庫

// RabbitMQ 用於管理和維護rabbitmq的對象
type RabbitMQ struct {
    wg sync.WaitGroup

    channel      *amqp.Channel
    exchangeName string // exchange的名稱
    exchangeType string // exchange的類型
    receivers    []Receiver
}

// New 建立一個新的操做RabbitMQ的對象
func New() *RabbitMQ {
    // 這裏能夠根據本身的須要去定義
    return &RabbitMQ{
        exchangeName: ExchangeName,
        exchangeType: ExchangeType,
    }
}

RabbitMQ對象的初始化操做

這裏RabbitMQ對象須要初始化交換機,註冊接收者並初始化接收者監聽的Queue,以及斷線重連的機制網絡

// prepareExchange 準備rabbitmq的Exchange
func (mq *RabbitMQ) prepareExchange() error {
    // 申明Exchange
    err := mq.channel.ExchangeDeclare(
        mq.exchangeName, // exchange
        mq.exchangeType, // type
        true,            // durable
        false,           // autoDelete
        false,           // internal
        false,           // noWait
        nil,             // args
    )

    if nil != err {
        return err
    }

    return nil
}


// run 開始獲取鏈接並初始化相關操做
func (mq *RabbitMQ) run() {
    if !config.Global.RabbitMQ.Refresh() {
        log.Errorf("rabbit刷新鏈接失敗,將要重連: %s", config.Global.RabbitMQ.URL)
        return
    }

    // 獲取新的channel對象
    mq.channel = config.Global.RabbitMQ.Channel()

    // 初始化Exchange
    mq.prepareExchange()

    for _, receiver := range mq.receivers {
        mq.wg.Add(1)
        go mq.listen(receiver) // 每一個接收者單獨啓動一個goroutine用來初始化queue並接收消息
    }

    mq.wg.Wait()

    log.Errorf("全部處理queue的任務都意外退出了")

    // 理論上mq.run()在程序的執行過程當中是不會結束的
    // 一旦結束就說明全部的接收者都退出了,那麼意味着程序與rabbitmq的鏈接斷開
    // 那麼則須要從新鏈接,這裏嘗試銷燬當前鏈接
    config.Global.RabbitMQ.Distory()
}

// Start 啓動Rabbitmq的客戶端
func (mq *RabbitMQ) Start() {
    for {
        mq.run()
        
        // 一旦鏈接斷開,那麼須要隔一段時間去重連
        // 這裏最好有一個時間間隔
        time.Sleep(3 * time.Second)
    }
}

註冊接收者

// RegisterReceiver 註冊一個用於接收指定隊列指定路由的數據接收者
func (mq *RabbitMQ) RegisterReceiver(receiver Receiver) {
    mq.receivers = append(mq.receivers, receiver)
}

// Listen 監聽指定路由發來的消息
// 這裏須要針對每個接收者啓動一個goroutine來執行listen
// 該方法負責從每個接收者監聽的隊列中獲取數據,並負責重試
func (mq *RabbitMQ) listen(receiver Receiver) {
    defer mq.wg.Done()

    // 這裏獲取每一個接收者須要監聽的隊列和路由
    queueName := receiver.QueueName()
    routerKey := receiver.RouterKey()

    // 申明Queue
    _, err := mq.channel.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when usused
        false,     // exclusive(排他性隊列)
        false,     // no-wait
        nil,       // arguments
    )
    if nil != err {
        // 當隊列初始化失敗的時候,須要告訴這個接收者相應的錯誤
        receiver.OnError(fmt.Errorf("初始化隊列 %s 失敗: %s", queueName, err.Error()))
    }

    // 將Queue綁定到Exchange上去
    err = mq.channel.QueueBind(
        queueName,       // queue name
        routerKey,       // routing key
        mq.exchangeName, // exchange
        false,           // no-wait
        nil,
    )
    if nil != err {
        receiver.OnError(fmt.Errorf("綁定隊列 [%s - %s] 到交換機失敗: %s", queueName, routerKey, err.Error()))
    }

    // 獲取消費通道
    mq.channel.Qos(1, 0, true) // 確保rabbitmq會一個一個發消息
    msgs, err := mq.channel.Consume(
        queueName, // queue
        "",        // consumer
        false,     // auto-ack
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
    if nil != err {
        receiver.OnError(fmt.Errorf("獲取隊列 %s 的消費通道失敗: %s", queueName, err.Error()))
    }

    // 使用callback消費數據
    for msg := range msgs {
        // 當接收者消息處理失敗的時候,
        // 好比網絡問題致使的數據庫鏈接失敗,redis鏈接失敗等等這種
        // 經過重試能夠成功的操做,那麼這個時候是須要重試的
        // 直到數據處理成功後再返回,而後纔會回覆rabbitmq ack
        for !receiver.OnReceive(msg.Body) {
            log.Warnf("receiver 數據處理失敗,將要重試")
            time.Sleep(1 * time.Second)
        }

        // 確認收到本條消息, multiple必須爲false
        msg.Ack(false)
    }
}

整合到一塊兒

接收者的邏輯這裏就不寫的,只要根據實際的業務邏輯並實現了接口就能夠了,這個比較容易。app

獲取RabbitMQ的鏈接負載均衡

rabbitmqConn, err = amqp.Dial(url)
if err != nil {
    panic("RabbitMQ 初始化失敗: " + err.Error())
}
rabbitmqChannel, err = rabbitmqConn.Channel()
if err != nil {
    panic("打開Channel失敗: " + err.Error())
}
// 啓動並開始處理數據    
func main() {

    // 假設這裏有一個AReceiver和BReceiver
    aReceiver := NewAReceiver()
    bReceiver := NewBReceiver()
    
    mq := rabbitmq.New()
    // 將這個接收者註冊到
    mq.RegisterReceiver(aReceiver)
    mq.RegisterReceiver(bReceiver)
    mq.Start()
}

應用場景

舉一個咱們本身用於生產環境的例子:框架

咱們主要是用於接收Mysql的變動,並增量更新Elasticsearch的索引,負責數據庫變動監聽的服務用的是Canel,它假裝成一個mysql slave,用於接收mysql binlog的變動通知,而後將變動的數據格式化後寫入RabbitMQ,而後由go實現的消費者去訂閱數據庫的變動通知。

因爲客戶端並不關心表中哪些字段發生了變化,只須要知道數據庫指定的表有變動,那麼就將這次變動寫入Elasticsearch,這個邏輯對於每一張監聽的表都是同樣的,那麼這樣咱們就能夠將須要監聽表變動的操做徹底配置化,我只要再配置文件中指定一個接收者並指定待消費的隊列,而後就能夠由程序自動生成若干的接收者而且依次註冊進RabbitMQ對象中,這樣咱們只須要針對一些特殊的操做寫相應地代碼便可,這樣大大簡化了咱們地工做量,來看一看配置文件:

[[autoReceivers]]
    receiverName  = "article_receiver"
    database     = "blog"
    tableName    = "articles"
    primaryKey   = "articleId"
    queueName    = "articles_queue"
    routerKey    = "blog.articles.*"
    esIndex      = "articles_idx"

[[autoReceivers]]
    receiverName  = "comment_receiver"
    database     = "blog"
    tableName    = "comments"
    primaryKey   = "commentId"
    queueName    = "comments_queue"
    routerKey    = "blog.comments.*"
    esIndex      = "comments_idx"

這個時候就須要調整一下接收者地註冊函數了:

// WalkReceivers 使用callback遍歷處理全部的接收者
// 這裏地callback就是上面提到地 mq.RegisterReceiver
func WalkReceivers(callback func(rabbitmq.Receiver)) {
    successCount := 0

    // 遍歷每個配置項,依次生成須要自動建立接收者
    // 這裏的congfig是統一獲取配置地對象,你們根據實際狀況處理就能夠了
    for _, receiverCfg := range config.Global.AutoReceivers {
        // 驗證每個接收者的合法性
        err := receiverCfg.Validate()
        if err != nil {
            log.Criticalf("生成 %s 失敗: %s, 使用該配置: %+v", receiverCfg.ReceiverName, err.Error(), receiverCfg)
            continue
        }

        // 將接收者註冊到監聽rabbitmq的對象中
        callback(NewAutoReceiver(receiverCfg))

        log.Infof("生成 %s 成功使用該配置: %+v", receiverCfg.ReceiverName, receiverCfg)
        successCount++
    }

    if successCount != len(config.Global.AutoReceivers) || successCount == 0 {
        panic("沒法啓動全部的接收者,請檢查配置")
    }

    // 若有必要,這裏能夠繼續添加須要手工建立的接收者
}

啓動地流程也須要進行微調一下:

func registeAndStart() {
    mq := rabbitmq.New()

    // 遍歷全部的receiver,將他們註冊到rabbitmq中去
    WalkReceivers(mq.RegisterReceiver)
    log.Info("初始化全部的Receiver成功")

    mq.Start()
}

這樣就定義好了兩個receiver,啓動程序後,就能夠接收到數據庫地變動並更新elasticsearch中地索引了,很是地方便。

寫在最後

這個是對平時工做地一點總結,但願能夠給你們帶來幫助,若是文中有紕漏之處,還望指正,這裏完整地代碼就不貼了,文章裏已經搭起了一個完整地框架了,剩下地就是業務邏輯了,若是有必要地化,我會整理成一個完整地項目放到github上。

相關文章
相關標籤/搜索