在咱們的生產環境中搭了兩臺rabbitmq, 前面架設了一臺HAProxy作負載均衡,當咱們的客戶端鏈接到HAProxy,而後由HAProxy負責將連接分配給其中一臺rabbitmq,客戶端須要須要負責斷線重連,須要將獲取的數據,分配消息給相應的處理方法,而後還須要回覆給rabbitmq ACK,這其中客戶端須要負責斷線重連的邏輯是很重要的,由於有可能客戶端和HAProxy的鏈接是正常的,可是HAProxy和rabbitmq的連接由於網絡波動斷開了,那麼這個時候客戶端實際上是沒有工做的,而且會在rabbitmq中不斷積累消息。mysql
下面的內容給出了一個比較完善的處理邏輯,以供參考。git
從以前的說明來看,這是一個典型的觀察者模式,由RabbitMQ對象負責維護鏈接,獲取消息,而後定義若干個接收者註冊到RabbitMQ對象中,這時候RabbitMQ對象一旦收到了由RabbitMQ發來的數據,就能夠將該消息分發到相應的接收者去處理,當接收者處理完成後告訴RabbitMQ對象消息消費成功,而後由RabbitMQ對象回覆RabbitMQ ACK,固然能夠在其中加上重試機制,接收者有可能由於某種狀況處理失敗,那麼每隔必定的時間RabbitMQ對象須要從新調用一次接收者從新處理,直至成功,而後再返回ACK。github
先來看看基本的接口約定redis
// Receiver 觀察者模式須要的接口 // 觀察者用於接收指定的queue到來的數據 type Receiver interface { QueueName() string // 獲取接收者須要監聽的隊列 RouterKey() string // 這個隊列綁定的路由 OnError(error) // 處理遇到的錯誤,當RabbitMQ對象發生了錯誤,他須要告訴接收者處理錯誤 OnReceive([]byte) bool // 處理收到的消息, 這裏須要告知RabbitMQ對象消息是否處理成功 }
這樣就將接收者和RabbitMQ對象之間就解耦了,這樣後期若是須要添加新的接收者那就很容易了。sql
下面來看一看RabbitMQ對象的定義:
這裏用到的RabbitMQ client是RabbitMQ官方的 Github數據庫
// RabbitMQ 用於管理和維護rabbitmq的對象 type RabbitMQ struct { wg sync.WaitGroup channel *amqp.Channel exchangeName string // exchange的名稱 exchangeType string // exchange的類型 receivers []Receiver } // New 建立一個新的操做RabbitMQ的對象 func New() *RabbitMQ { // 這裏能夠根據本身的須要去定義 return &RabbitMQ{ exchangeName: ExchangeName, exchangeType: ExchangeType, } }
這裏RabbitMQ對象須要初始化交換機,註冊接收者並初始化接收者監聽的Queue,以及斷線重連的機制網絡
// prepareExchange 準備rabbitmq的Exchange func (mq *RabbitMQ) prepareExchange() error { // 申明Exchange err := mq.channel.ExchangeDeclare( mq.exchangeName, // exchange mq.exchangeType, // type true, // durable false, // autoDelete false, // internal false, // noWait nil, // args ) if nil != err { return err } return nil } // run 開始獲取鏈接並初始化相關操做 func (mq *RabbitMQ) run() { if !config.Global.RabbitMQ.Refresh() { log.Errorf("rabbit刷新鏈接失敗,將要重連: %s", config.Global.RabbitMQ.URL) return } // 獲取新的channel對象 mq.channel = config.Global.RabbitMQ.Channel() // 初始化Exchange mq.prepareExchange() for _, receiver := range mq.receivers { mq.wg.Add(1) go mq.listen(receiver) // 每一個接收者單獨啓動一個goroutine用來初始化queue並接收消息 } mq.wg.Wait() log.Errorf("全部處理queue的任務都意外退出了") // 理論上mq.run()在程序的執行過程當中是不會結束的 // 一旦結束就說明全部的接收者都退出了,那麼意味着程序與rabbitmq的鏈接斷開 // 那麼則須要從新鏈接,這裏嘗試銷燬當前鏈接 config.Global.RabbitMQ.Distory() } // Start 啓動Rabbitmq的客戶端 func (mq *RabbitMQ) Start() { for { mq.run() // 一旦鏈接斷開,那麼須要隔一段時間去重連 // 這裏最好有一個時間間隔 time.Sleep(3 * time.Second) } }
// RegisterReceiver 註冊一個用於接收指定隊列指定路由的數據接收者 func (mq *RabbitMQ) RegisterReceiver(receiver Receiver) { mq.receivers = append(mq.receivers, receiver) } // Listen 監聽指定路由發來的消息 // 這裏須要針對每個接收者啓動一個goroutine來執行listen // 該方法負責從每個接收者監聽的隊列中獲取數據,並負責重試 func (mq *RabbitMQ) listen(receiver Receiver) { defer mq.wg.Done() // 這裏獲取每一個接收者須要監聽的隊列和路由 queueName := receiver.QueueName() routerKey := receiver.RouterKey() // 申明Queue _, err := mq.channel.QueueDeclare( queueName, // name true, // durable false, // delete when usused false, // exclusive(排他性隊列) false, // no-wait nil, // arguments ) if nil != err { // 當隊列初始化失敗的時候,須要告訴這個接收者相應的錯誤 receiver.OnError(fmt.Errorf("初始化隊列 %s 失敗: %s", queueName, err.Error())) } // 將Queue綁定到Exchange上去 err = mq.channel.QueueBind( queueName, // queue name routerKey, // routing key mq.exchangeName, // exchange false, // no-wait nil, ) if nil != err { receiver.OnError(fmt.Errorf("綁定隊列 [%s - %s] 到交換機失敗: %s", queueName, routerKey, err.Error())) } // 獲取消費通道 mq.channel.Qos(1, 0, true) // 確保rabbitmq會一個一個發消息 msgs, err := mq.channel.Consume( queueName, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if nil != err { receiver.OnError(fmt.Errorf("獲取隊列 %s 的消費通道失敗: %s", queueName, err.Error())) } // 使用callback消費數據 for msg := range msgs { // 當接收者消息處理失敗的時候, // 好比網絡問題致使的數據庫鏈接失敗,redis鏈接失敗等等這種 // 經過重試能夠成功的操做,那麼這個時候是須要重試的 // 直到數據處理成功後再返回,而後纔會回覆rabbitmq ack for !receiver.OnReceive(msg.Body) { log.Warnf("receiver 數據處理失敗,將要重試") time.Sleep(1 * time.Second) } // 確認收到本條消息, multiple必須爲false msg.Ack(false) } }
接收者的邏輯這裏就不寫的,只要根據實際的業務邏輯並實現了接口就能夠了,這個比較容易。app
獲取RabbitMQ的鏈接負載均衡
rabbitmqConn, err = amqp.Dial(url) if err != nil { panic("RabbitMQ 初始化失敗: " + err.Error()) } rabbitmqChannel, err = rabbitmqConn.Channel() if err != nil { panic("打開Channel失敗: " + err.Error()) }
// 啓動並開始處理數據 func main() { // 假設這裏有一個AReceiver和BReceiver aReceiver := NewAReceiver() bReceiver := NewBReceiver() mq := rabbitmq.New() // 將這個接收者註冊到 mq.RegisterReceiver(aReceiver) mq.RegisterReceiver(bReceiver) mq.Start() }
舉一個咱們本身用於生產環境的例子:框架
咱們主要是用於接收Mysql的變動,並增量更新Elasticsearch的索引,負責數據庫變動監聽的服務用的是Canel,它假裝成一個mysql slave,用於接收mysql binlog的變動通知,而後將變動的數據格式化後寫入RabbitMQ,而後由go實現的消費者去訂閱數據庫的變動通知。
因爲客戶端並不關心表中哪些字段發生了變化,只須要知道數據庫指定的表有變動,那麼就將這次變動寫入Elasticsearch,這個邏輯對於每一張監聽的表都是同樣的,那麼這樣咱們就能夠將須要監聽表變動的操做徹底配置化,我只要再配置文件中指定一個接收者並指定待消費的隊列,而後就能夠由程序自動生成若干的接收者而且依次註冊進RabbitMQ對象中,這樣咱們只須要針對一些特殊的操做寫相應地代碼便可,這樣大大簡化了咱們地工做量,來看一看配置文件:
[[autoReceivers]] receiverName = "article_receiver" database = "blog" tableName = "articles" primaryKey = "articleId" queueName = "articles_queue" routerKey = "blog.articles.*" esIndex = "articles_idx" [[autoReceivers]] receiverName = "comment_receiver" database = "blog" tableName = "comments" primaryKey = "commentId" queueName = "comments_queue" routerKey = "blog.comments.*" esIndex = "comments_idx"
這個時候就須要調整一下接收者地註冊函數了:
// WalkReceivers 使用callback遍歷處理全部的接收者 // 這裏地callback就是上面提到地 mq.RegisterReceiver func WalkReceivers(callback func(rabbitmq.Receiver)) { successCount := 0 // 遍歷每個配置項,依次生成須要自動建立接收者 // 這裏的congfig是統一獲取配置地對象,你們根據實際狀況處理就能夠了 for _, receiverCfg := range config.Global.AutoReceivers { // 驗證每個接收者的合法性 err := receiverCfg.Validate() if err != nil { log.Criticalf("生成 %s 失敗: %s, 使用該配置: %+v", receiverCfg.ReceiverName, err.Error(), receiverCfg) continue } // 將接收者註冊到監聽rabbitmq的對象中 callback(NewAutoReceiver(receiverCfg)) log.Infof("生成 %s 成功使用該配置: %+v", receiverCfg.ReceiverName, receiverCfg) successCount++ } if successCount != len(config.Global.AutoReceivers) || successCount == 0 { panic("沒法啓動全部的接收者,請檢查配置") } // 若有必要,這裏能夠繼續添加須要手工建立的接收者 }
啓動地流程也須要進行微調一下:
func registeAndStart() { mq := rabbitmq.New() // 遍歷全部的receiver,將他們註冊到rabbitmq中去 WalkReceivers(mq.RegisterReceiver) log.Info("初始化全部的Receiver成功") mq.Start() }
這樣就定義好了兩個receiver,啓動程序後,就能夠接收到數據庫地變動並更新elasticsearch中地索引了,很是地方便。
這個是對平時工做地一點總結,但願能夠給你們帶來幫助,若是文中有紕漏之處,還望指正,這裏完整地代碼就不貼了,文章裏已經搭起了一個完整地框架了,剩下地就是業務邏輯了,若是有必要地化,我會整理成一個完整地項目放到github上。