golang實現rabbitmq消費者模式 斷線重連機制

在鏈接第三方組件的時候實現斷線重連機制是頗有必要的事情,由於你不知道在啥時候他忽然就抽風掛掉了。以rabbitmq爲例,在忽然面對大流量寫入,或者鏈接數被打滿(好比在資訊,直播等模塊使用rabbitmq(慘痛的教訓))mq就掛掉了,若是你的消費者沒有重連機制,你的消費者進程在mq掛掉後,也會自動掛掉。而後等運維修復了mq,可是你的消費者進程卻沒法再次鏈接消費了,這就有點恐怖了。要不就寫個shell 腳本 ,每隔30S或者1分鐘去檢測進程活度,掛掉了就重啓消費者。固然最簡單的仍是在消費者的代碼裏實現斷線重連機制。代碼以下git

package rabbitmq

import (
   "encoding/json"
   "fmt"
   "github.com/streadway/amqp"
   "time"
)

//測試用例
type Obj struct {
   Item1 string `json:"item1"`
   Item2 string `json:"item2"`
   Item3 string `json:"item3"`
}

func StartAMQPConsume() {
   defer func() {
      if err := recover(); err != nil {
         time.Sleep(3 * time.Second)
         fmt.Println("休息3秒")
         StartAMQPConsume()
      }
   }()
   conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/") //mq鏈接地址
   if err != nil {
      fmt.Println(err)
   }
   defer conn.Close()
   ch, err := conn.Channel()
   if err != nil {
      fmt.Println(err)
   }
   defer ch.Close()
   closeChan := make(chan *amqp.Error, 1)
   notifyClose := ch.NotifyClose(closeChan) //一旦消費者的channel有錯誤,產生一個amqp.Error,channel監聽並捕捉到這個錯誤
   closeFlag := false
   msgs, err := ch.Consume(
      "xly.test.queue",
      "",
      true,
      false,
      false,
      false, nil)
   var obj Obj
   for {
      select {
      case e := <-notifyClose:
         fmt.Println("chan通道錯誤,e:%s", e.Error())
         close(closeChan)
         time.Sleep(5 * time.Second)
         StartAMQPConsume()
         closeFlag = true
      case msg := <-msgs:
         //fmt.Println()
         if err := json.Unmarshal(msg.Body, &obj); err != nil {

            fmt.Println(err.Error())
         }
         fmt.Println(obj.Item1)

      }
      if closeFlag {
         break
      }
   }
}

只須要在main 方法裏 調用這個方法便可,或者go協程調用 ,效果以下github

當我關閉mq服務時會有以下效果shell

再次啓動mqjson

以上就是rabbitmq 消費者斷線重連的完整代碼運維

相關文章
相關標籤/搜索