在鏈接第三方組件的時候實現斷線重連機制是頗有必要的事情,由於你不知道在啥時候他忽然就抽風掛掉了。以rabbitmq爲例,在忽然面對大流量寫入,或者鏈接數被打滿(好比在資訊,直播等模塊使用rabbitmq(慘痛的教訓))mq就掛掉了,若是你的消費者沒有重連機制,你的消費者進程在mq掛掉後,也會自動掛掉。而後等運維修復了mq,可是你的消費者進程卻沒法再次鏈接消費了,這就有點恐怖了。要不就寫個shell 腳本 ,每隔30S或者1分鐘去檢測進程活度,掛掉了就重啓消費者。固然最簡單的仍是在消費者的代碼裏實現斷線重連機制。代碼以下git
package rabbitmq import ( "encoding/json" "fmt" "github.com/streadway/amqp" "time" ) //測試用例 type Obj struct { Item1 string `json:"item1"` Item2 string `json:"item2"` Item3 string `json:"item3"` } func StartAMQPConsume() { defer func() { if err := recover(); err != nil { time.Sleep(3 * time.Second) fmt.Println("休息3秒") StartAMQPConsume() } }() conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/") //mq鏈接地址 if err != nil { fmt.Println(err) } defer conn.Close() ch, err := conn.Channel() if err != nil { fmt.Println(err) } defer ch.Close() closeChan := make(chan *amqp.Error, 1) notifyClose := ch.NotifyClose(closeChan) //一旦消費者的channel有錯誤,產生一個amqp.Error,channel監聽並捕捉到這個錯誤 closeFlag := false msgs, err := ch.Consume( "xly.test.queue", "", true, false, false, false, nil) var obj Obj for { select { case e := <-notifyClose: fmt.Println("chan通道錯誤,e:%s", e.Error()) close(closeChan) time.Sleep(5 * time.Second) StartAMQPConsume() closeFlag = true case msg := <-msgs: //fmt.Println() if err := json.Unmarshal(msg.Body, &obj); err != nil { fmt.Println(err.Error()) } fmt.Println(obj.Item1) } if closeFlag { break } } }
只須要在main 方法裏 調用這個方法便可,或者go協程調用 ,效果以下github
當我關閉mq服務時會有以下效果shell
再次啓動mqjson
以上就是rabbitmq 消費者斷線重連的完整代碼運維