對於很是健壯穩定的後臺系統,咱們必須得考慮到各類宕機的狀況:物理宕機,應用自身出錯崩潰等,而這個時候咱們的應用須要作到重啓後數據依舊不丟失,這個問題就是數據持久化,也就是說數據持久化到了磁盤。
在RabbitMQ中,若是要保證消息發送到broker
,咱們首先須要作到三點php
exchange
(交換器):聲明時開啓durable
選項queue
(隊列):聲明時開啓durable
選項message
:delivery_mode
設置爲2(php,python之類的庫,2能夠換成更友好的常量),在node的amqp.node
庫中是設置persistent
爲true
須要注意的一點是,持久化會形成性能損耗(寫磁盤操做),但爲了保證生產環境的數據一致性,咱們必須這麼作。html
其實光光作到以上三點,數據依舊有丟失的可能,由於在客戶端成功調用api存入消息以後,RabbitMQ還須要一段時間(很短,但不可忽略)才能落盤,RabbitMQ並非爲每條消息都作fsync的處理,可能僅僅保存到cache中而不是物理磁盤上,而在這段時間內RabbitMQ broker發生crash, 消息保存到cache可是還沒來得及落盤,那麼這些消息將會丟失。
爲了解決以上問題,咱們須要使用RabbitMQ的生產者確認模式。
爲了開啓確認模式,須要生產者將channel設置成confirm模式,一旦channel進入confirm模式,全部在該信道上面發佈的消息都將會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,broker就會發送一個確認給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會在將消息寫入磁盤以後發出,broker回傳給生產者的確認消息中delivery-tag
域包含了確認消息的序列號。node
confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息 (來自參考1)
示例代碼使用NodeJS實現,RabbitMQ服務可使用上一篇RabbitMQ二三事的docker-compose.yml
快速啓動python
const QUEUE_NAME = 'test_queue' const config = require("./config") const amqp = require('amqplib') async function getMQConnection() { return await amqp.connect({ protocol: 'amqp', hostname: config.MQ.host, port: config.MQ.port, username: config.MQ.user, password: config.MQ.pass, locale: 'en_US', frameMax: 0, heartbeat: 5, // 心跳 vhost: config.MQ.vhost, }) } async function run(rmqConn, msgArr) { try { const channel = await rmqConn.createConfirmChannel() // 開啓confirm const exchangeName = `${QUEUE_NAME}_exchange` await channel.assertExchange(exchangeName, 'direct', { durable: true, autoDelete: false }) // 不存在exchange就新建exchange await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) // 不存在queue就新建 await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) // 綁定交換器 // queue name當routing key msgArr.forEach(str => { channel.publish(exchangeName, QUEUE_NAME, Buffer.from(str), { persistent: true, mandatory: true }) }) await channel.waitForConfirms() console.log('發送批量數據成功') await channel.close() } catch(err) { // do something with err console.log('發送批量數據失敗:' + err.message) } } async function testSendBatchMsg() { const conn = await getMQConnection() await run(conn, [ 'cat', 'dog', 'pig', 'mouse', 'mouse', 'penguin' ]) await conn.close() } testSendBatchMsg()
assertExchange
和assertQueue
是保證交換器和隊列必定存在,這裏的exchange是簡單的direct
交換器ConfirmChannel#publish
方法不返回promise
docker
如今咱們須要考慮咱們的消費者了,消費者也會遇到程序出錯或者物理宕機問題,RabbitMQ官方也給出了一套解決方案,和confirm機制相似,就是ack機制(Message acknowledgment).
在ack機制中,消費者在本身處理完業務邏輯後,須要發送一個ack消息,而後broker才認爲這條消息被正確消費,而後從內存和磁盤中移除掉它,只要沒收到消費者的acknowledgment,broker就會一直保存着這條消息.若是一個消費者崩潰(斷開了鏈接)卻沒有發送ack,broker會理解爲這個消息沒有處理徹底,而後交給另外一個消費者去從新處理。在這樣的機制下,即便有一個消費者崩潰也不會丟失任何消息。segmentfault
const QUEUE_NAME = 'test_queue' const config = require("./config") const amqp = require('amqplib') async function getMQConnection() { return await amqp.connect({ protocol: 'amqp', hostname: config.MQ.host, port: config.MQ.port, username: config.MQ.user, password: config.MQ.pass, locale: 'en_US', frameMax: 0, heartbeat: 5, // 心跳 vhost: config.MQ.vhost, }) } async function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)) } async function start() { const mqConn = await getMQConnection() console.log('connecting RabbitMQ successfully!') const channel = await mqConn.createChannel() const exchangeName = `${QUEUE_NAME}_exchange` await channel.assertExchange(exchangeName, 'direct', { durable: true, autoDelete: false }) await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) channel.consume(QUEUE_NAME, async function(msg) { console.log("Received msg: %s from %s", QUEUE_NAME, msg.content.toString()) console.log('consuming message...') try { await sleep(500) // 模擬消費消息 console.log('consuming ends') channel.ack(msg) // 消費成功,發送ack } catch(e) { console.log('consuming failed: ' + e.message) channel.nack(msg) // 消費失敗,發送nack } }, {noAck: false}) // ack } start()
自動ack是默認打開的,也就是說消息發送到消費者的時候就被自動ack了,而不少狀況下,咱們想要手動ack,因此咱們須要顯式設置autoAsk=false
關閉這種機制(在示例中是noAck: false
)api
ack沒有任何超時限制;只有當消費者斷開時,broker
纔會從新投遞。即便處理一條消息會花費很長的時間。promise
amqp.node
這個庫提供了心跳檢測的功能(heartbeat
選項),可是沒有作自動重連的。
對於heartbeat
的值,RabbitMQ官網有說明網絡
Several years worth of feedback from the users and client library
maintainers suggest that values lower than 5 seconds are fairly likely
to cause false positives, and values of 1 second or lower are very
likely to do so. Values within the 5 to 20 seconds range are optimal
for most environments.
因此心跳不宜設置的過低(由於短暫的網絡擁塞或者流控制),過低容易致使誤報,根據經驗5s-20s是比較合理的。異步
參考文章: