上一篇文章裏,咱們瞭解瞭如何保證消息被可靠投遞到RabbitMQ的交換機中,但還有一些不完美的地方,試想一下,若是向RabbitMQ服務器發送一條消息,服務器確實也接收到了這條消息,因而給你返回了ACK確認消息,但服務器拿到這條消息一看,找不到路由它的隊列,因而就把它丟進了垃圾桶,emmm,我猜應該屬於可回收垃圾。java
若是你對上面的描述還不是很清楚,那我再用代碼來講明一次。服務器
在僅開啓了生產者確認機制的狀況下,交換機接收到消息後,會直接給消息生產者發送確認消息,若是發現該消息不可路由,那麼消息會被直接丟棄,此時,生產者是不知道消息被丟棄這個事件的。dom
咱們將上一篇中的交換機類型改成DirectExchange,這樣就只有當消息的 RoutingKey 和隊列綁定時設置的 Bindingkey (這裏即「key」)一致時,纔會真正將該消息進行路由。ide
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";
// 聲明業務 Exchange
@Bean("businessExchange")
public DirectExchange businessExchange(){
return new DirectExchange(BUSINESS_EXCHANGE_NAME);
}
// 聲明業務隊列
@Bean("businessQueue")
public Queue businessQueue(){
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
}
// 聲明業務隊列綁定關係
@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key");
}
複製代碼
對消息生產者也稍做修改:函數
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setConfirmCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息確認成功, id:{}", id);
} else {
log.error("消息未成功投遞, id:{}, cause:{}", id, s);
}
}
複製代碼
而後咱們調用該方法,發送兩條消息測試一下:學習
消息id:ba6bf502-9381-4220-8dc9-313d6a289a4e, msg:1
消息id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2, msg:1
消息確認成功, id:ba6bf502-9381-4220-8dc9-313d6a289a4e
消息確認成功, id:f0040a41-dc02-4e45-b8af-e3cfa8a118b2
收到業務消息:1
複製代碼
能夠看到,發送了兩條消息,第一條消息的 RoutingKey 爲 「key」,第二條消息的 RoutingKey 爲 「key2」,兩條消息都成功被交換機接收,也收到了交換機的確認回調,但消費者只收到了一條消息,由於第二條消息的 RoutingKey 與隊列的 BindingKey 不一致,也沒有其它隊列能接收這個消息,全部第二條消息被直接丟棄了。測試
那麼,如何讓消息被路由到隊列後再返回ACK呢?或者沒法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好本身處理啊。優化
別慌別慌,RabbitMQ裏有兩個機制恰好能夠解決咱們上面的疑問:ui
一、mandatory 參數 二、備份交換機this
設置 mandatory 參數能夠在當消息傳遞過程當中不可達目的地時將消息返回給生產者。
當把 mandotory 參數設置爲 true 時,若是交換機沒法將消息進行路由時,會將該消息返回給生產者,而若是該參數設置爲false,若是發現消息沒法進行路由,則直接丟棄。
那麼如何設置這個參數呢?在發送消息的時候,只須要在初始化方法添加一行代碼便可:
rabbitTemplate.setMandatory(true);
複製代碼
開啓以後咱們再從新運行前面的代碼:
消息id:19729f33-15c4-4c1b-8d48-044c301e2a8e, msg:1
消息id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb, msg:1
消息確認成功, id:19729f33-15c4-4c1b-8d48-044c301e2a8e
Returned message but no callback available
消息確認成功, id:4aea5c57-3e71-4a7b-8a00-1595d2b568eb
收到業務消息:1
複製代碼
咱們看到中間多了一行提示 Returned message but no callback available
這是什麼意思呢?
咱們上面提到,設置 mandatory 參數後,若是消息沒法被路由,則會返回給生產者,是經過回調的方式進行的,因此,生產者須要設置相應的回調函數才能接受該消息。
爲了進行回調,咱們須要實現一個接口 RabbitTemplate.ReturnCallback
。
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息確認成功, id:{}", id);
} else {
log.error("消息未成功投遞, id:{}, cause:{}", id, s);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息被服務器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
複製代碼
而後咱們再來從新運行一次:
消息id:2e5c336a-883a-474e-b40e-b6e3499088ef, msg:1
消息id:85c771cb-c88f-47dd-adea-f0da57138423, msg:1
消息確認成功, id:2e5c336a-883a-474e-b40e-b6e3499088ef
消息沒法被路由,被服務器退回。msg:1, replyCode:312. replyText:NO_ROUTE, exchange:rabbitmq.tx.demo.simple.business.exchange, routingKey :key2
消息確認成功, id:85c771cb-c88f-47dd-adea-f0da57138423
收到業務消息:1
複製代碼
能夠看到,咱們接收到了被退回的消息,並帶上了消息被退回的緣由:NO_ROUTE
。可是要注意的是, mandatory 參數僅僅是在當消息沒法被路由的時候,讓生產者能夠感知到這一點,只要開啓了生產者確認機制,不管是否設置了 mandatory 參數,都會在交換機接收到消息時進行消息確認回調,並且一般消息的退回回調會在消息的確認回調以前。
有了 mandatory 參數,咱們得到了對沒法投遞消息的感知能力,有機會在生產者的消息沒法被投遞時發現並處理。但有時候,咱們並不知道該如何處理這些沒法路由的消息,最多打個日誌,而後觸發報警,再來手動處理。而經過日誌來處理這些沒法路由的消息是很不優雅的作法,特別是當生產者所在的服務有多臺機器的時候,手動複製日誌會更加麻煩並且容易出錯。
並且設置 mandatory 參數會增長生產者的複雜性,須要添加處理這些被退回的消息的邏輯。若是既不想丟失消息,又不想增長生產者的複雜性,該怎麼作呢?
前面在設置死信隊列的文章中,咱們提到,能夠爲隊列設置死信交換機來存儲那些處理失敗的消息,但是這些不可路由消息根本沒有機會進入到隊列,所以沒法使用死信隊列來保存消息。
不要慌,在 RabbitMQ 中,有一種備份交換機的機制存在,能夠很好的應對這個問題。
什麼是備份交換機呢?備份交換機能夠理解爲 RabbitMQ 中交換機的「備胎」,當咱們爲某一個交換機聲明一個對應的備份交換機時,就是爲它建立一個備胎,當交換機接收到一條不可路由消息時,將會將這條消息轉發到備份交換機中,由備份交換機來進行轉發和處理,一般備份交換機的類型爲 Fanout ,這樣就能把全部消息都投遞到與其綁定的隊列中,而後咱們在備份交換機下綁定一個隊列,這樣全部那些原交換機沒法被路由的消息,就會都進入這個隊列了。固然,咱們還能夠創建一個報警隊列,用獨立的消費者來進行監測和報警。
聽的不太明白?不要緊,看個圖就知道是怎麼回事了。
(emmm,調整了一下配色,感受仍是很醜- - 。急需一個UI來拯救我。)
接下來,咱們就來設置一下備份交換機:
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.backup.test.exchange";
public static final String BUSINESS_QUEUE_NAME = "rabbitmq.backup.test.queue";
public static final String BUSINESS_BACKUP_EXCHANGE_NAME = "rabbitmq.backup.test.backup-exchange";
public static final String BUSINESS_BACKUP_QUEUE_NAME = "rabbitmq.backup.test.backup-queue";
public static final String BUSINESS_BACKUP_WARNING_QUEUE_NAME = "rabbitmq.backup.test.backup-warning-queue";
// 聲明業務 Exchange
@Bean("businessExchange")
public DirectExchange businessExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(BUSINESS_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}
// 聲明備份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange(BUSINESS_BACKUP_EXCHANGE_NAME)
.durable(true);
return (FanoutExchange)exchangeBuilder.build();
}
// 聲明業務隊列
@Bean("businessQueue")
public Queue businessQueue(){
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).build();
}
// 聲明業務隊列綁定關係
@Bean
public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key");
}
// 聲明備份隊列
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BUSINESS_BACKUP_QUEUE_NAME).build();
}
// 聲明報警隊列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(BUSINESS_BACKUP_WARNING_QUEUE_NAME).build();
}
// 聲明備份隊列綁定關係
@Bean
public Binding backupBinding(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 聲明備份報警隊列綁定關係
@Bean
public Binding backupWarningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
複製代碼
這裏咱們使用 ExchangeBuilder
來建立交換機,併爲其設置備份交換機:
.withArgument("alternate-exchange", BUSINESS_BACKUP_EXCHANGE_NAME);
複製代碼
爲業務交換機綁定了一個隊列,爲備份交換機綁定了兩個隊列,一個用來存儲不可投遞消息,待以後人工處理,一個專門用來作報警用途。
接下來,分別爲業務交換機和備份交換機建立消費者:
@Slf4j
@Component
public class BusinessMsgConsumer {
@RabbitListener(queues = BUSINESS_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到業務消息:{}", msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
複製代碼
@Slf4j
@Component
public class BusinessWaringConsumer {
@RabbitListener(queues = BUSINESS_BACKUP_WARNING_QUEUE_NAME)
public void receiveMsg(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.error("發現不可路由消息:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
複製代碼
接下來咱們分別發送一條可路由消息和不可路由消息:
@Slf4j
@Component
public class BusinessMsgProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
}
複製代碼
消息以下:
消息id:5c3a33c9-0764-4d1f-bf6a-a00d771dccb4, msg:1
消息id:42ac8c35-1d0a-4413-a1df-c26a85435354, msg:1
收到業務消息:1
發現不可路由消息:1
複製代碼
這裏僅僅使用 error 日誌配合日誌系統進行報警,若是是敏感數據,可使用郵件、釘釘、短信、電話等報警方式來提升時效性。
那麼問題來了,mandatory 參數與備份交換機能夠一塊兒使用嗎?設置 mandatory 參數會讓交換機將不可路由消息退回給生產者,而備份交換機會讓交換機將不可路由消息轉發給它,那麼若是二者同時開啓,消息究竟何去何從??
emmm,想這麼多幹嗎,試試不就知道了。
修改一下生產者便可:
@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(this);
}
public void sendCustomMsg(String exchange, String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("消息id:{}, msg:{}", correlationData.getId(), msg);
rabbitTemplate.convertAndSend(exchange, "key2", msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if (b) {
log.info("消息確認成功, id:{}", id);
} else {
log.error("消息未成功投遞, id:{}, cause:{}", id, s);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息被服務器退回。msg:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
}
複製代碼
再來測試一下:
消息id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4, msg:1
消息id:d8c9e010-e120-46da-a42e-1ba21026ff06, msg:1
消息確認成功, id:0a3eca1e-d937-418c-a7ce-bfb8ce25fdd4
消息確認成功, id:d8c9e010-e120-46da-a42e-1ba21026ff06
發現不可路由消息:1
收到業務消息:1
複製代碼
能夠看到,兩條消息均可以收到確認成功回調,可是不可路由消息不會被回退給生產者,而是直接轉發給備份交換機。可見備份交換機的處理優先級更高。
上一篇中,咱們介紹了事務機制和生產者確認機制來確保消息的可靠投遞,相對而言,生產者確認機制更加高效和靈活。本篇中,咱們介紹了另外兩種確保生產者的消息不丟失的機制,即經過 mandatory 參數和備份交換機來處理不可路由消息。
經過以上幾種機制,咱們總算是能夠確保消息被萬無一失的投遞到目的地了。到此,咱們的消息可靠投遞也就告一段落了。消息可靠投遞是咱們使用MQ時沒法逃避的話題,一次性搞定它,就不會再爲其所困。總的來講,方法總比問題多,但若是你不知道這些方法,那麼當問題來臨時,也許就會不知所措了。
相信經過這幾篇關於 RabbitMQ 文章的學習,對於 RabbitMQ 的理解已經突破天際,那還在等什麼,趕忙把接入 RabbitMQ 的項目好好優化一下吧,相信如今你就不會再被那些不知所云的配置和代碼所迷惑了。
到此爲止,本篇就完美落幕了,但願能給你帶來一些啓發,也歡迎關注個人公衆號進行留言交流。