這篇文章主要講 RabbitMQ 中 消費者 ack 以及 生產者 confirms。html
如上圖,生產者把消息發送到 RabbitMQ,而後 RabbitMQ 再把消息投遞到消費者。git
生產者和 RabbitMQ,以及 RabbitMQ 和消費者都是經過 TCP 鏈接,可是他們之間是經過信道(Channel)傳遞數據的。多個線程共享一個鏈接,可是每一個線程擁有獨自的信道。github
問題:怎麼保證 RabbitMQ 投遞的消息被成功投遞到了消費者?redis
RabbitMQ 投遞的消息,剛投遞一半,產生了網絡抖動,就有可能到不了消費者。spring
解決辦法:瀏覽器
RabbitMQ 對消費者說:「若是你成功接收到了消息,給我說確認收到了,否則我就當你沒有收到,我還會從新投遞」網絡
在 RabbitMQ 中,有兩種 acknowledgement 模式。app
這也稱做發後即忘模式。ide
在這種模式下,RabbitMQ 投遞了消息,在投遞成功以前,若是消費者的 TCP 鏈接 或者 channel 關閉了,這條消息就會丟失。spring-boot
會有丟失消息問題。
在這種模式下,RabbitMQ 投遞了消息,在投遞成功以前,若是消費者的 TCP 鏈接 或者 channel 關閉了,致使這條消息沒有被 acked,RabbitMQ 會自動把當前消息從新入隊,再次投遞。
會有重複投遞消息的問題,因此消費者得準備好處理重複消息的問題,就是所謂的:冪等性。
爲了啓用 手動 ack 模式,消費者須要實現 ChannelAwareMessageListener
接口。
@Component public class Consumer implements ChannelAwareMessageListener { @Autowired private MessageConverter messageConverter; @Override public void onMessage(Message message, Channel channel) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); // 表明投遞的標識符,惟一標識了當前信道上的投遞,經過 deliveryTag ,消費者就能夠告訴 RabbitMQ 確認收到了當前消息,見下面的方法 long deliveryTag = messageProperties.getDeliveryTag(); // 若是是重複投遞的消息,redelivered 爲 true Boolean redelivered = messageProperties.getRedelivered(); // 獲取生產者發送的原始消息 Object originalMessage = messageConverter.fromMessage(message); Console.log("consume message = {} , deliveryTag = {} , redelivered = {}" , originalMessage, deliveryTag, redelivered); // 表明消費者確認收到當前消息,第二個參數表示一次是否 ack 多條消息 channel.basicAck(deliveryTag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } }
channel.basicAck
表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。
channel.basicNack
表明消費者拒絕一條或者多條消息。basicNack 算是 basicReject 的一個擴展,由於 basicReject 不能一次拒絕多條消息。
channel.basicReject
表明消費者拒絕這條消息,語義上表示消費者沒有處理當前消息。
對於 basicNack 和 basicReject ,若是參數 boolean requeue
傳入 false
,消息仍是會從隊列裏面刪除。這三個方法只是語義上的不一樣。
deliveryTag
deliveryTag 是 64 bit long 值,從 1 開始,不停的遞增 1。不一樣的 channel 有獨立的 deliveryTag。好比有兩個消費者,你會發現,都是從 1 開始遞增,互不影響。
因爲上面建立的消費者,沒有指明監聽那個隊列,因此還須要建立一個 MessageListenerContainer
。
@Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, ChannelAwareMessageListener listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 指定消費者 container.setMessageListener(listener); // 指定監聽的隊列 container.setQueueNames(QUEUE_NAME); // 設置消費者的 ack 模式爲手動確認模式 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setPrefetchCount(300); return container; }
這樣就開啓了消費者手動 ack 模式。
若是開啓了消費者手動 ack 模式,可是又沒有調用手動確認方法(好比:channel.basicAck),那問題就大了,RabbitMQ 會在當前 channel 上一直阻塞,等待消費者 ack。
問題:怎麼保證生產者發送的消息被 RabbitMQ 成功接收?
生產者發送的消息,剛發送一半,產生了網絡抖動,就有可能到不了 RabbitMQ。
解決辦法:
生產者對 RabbitMQ 說:「若是你成功接收到了消息,給我說確認收到了,否則我就當你沒有收到」
/** * 自定義消息元數據 */ @NoArgsConstructor @Data public class RabbitMetaMessage implements Serializable{ /** * 是不是 returnCallback */ private boolean returnCallback; /** * 承載原始消息數據數據 */ private Object payload; public RabbitMetaMessage(Object payload) { this.payload = payload; } }
先把消息存儲到 redis,再發送到 rabbitmq
@RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; @Autowired private DefaultKeyGenerator keyGenerator; @GetMapping("/sendMessage") public Object sendMessage() { new Thread(() -> { HashOperations hashOperations = redisTemplate.opsForHash(); for (int i = 0; i < 1; i++) { String id = keyGenerator.generateKey() + ""; String value = "message " + i; RabbitMetaMessage rabbitMetaMessage = new RabbitMetaMessage(value); // 先把消息存儲到 redis hashOperations.put(RedisConfig.RETRY_KEY, id, rabbitMetaMessage); Console.log("send message = {}", value); // 再發送到 rabbitmq rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value, (message) -> { message.getMessageProperties().setMessageId(id); return message; }, new CorrelationData(id)); } }).start(); return "ok"; } }
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.238.132", 5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 設置 生產者 confirms connectionFactory.setPublisherConfirms(true); // 設置 生產者 Returns connectionFactory.setPublisherReturns(true); return connectionFactory; }
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 必須設置爲 true,否則當 發送到交換器成功,可是沒有匹配的隊列,不會觸發 ReturnCallback 回調 // 並且 ReturnCallback 比 ConfirmCallback 先回調,意思就是 ReturnCallback 執行完了纔會執行 ConfirmCallback rabbitTemplate.setMandatory(true); // 設置 ConfirmCallback 回調 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { Console.log("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause); // 若是發送到交換器都沒有成功(好比說刪除了交換器),ack 返回值爲 false // 若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack 返回值爲仍是 true (這是一個坑,須要注意) if (ack) { String messageId = correlationData.getId(); RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId); Console.log("rabbitMetaMessage = {}", rabbitMetaMessage); if (!rabbitMetaMessage.isReturnCallback()) { // 到這一步才能徹底保證消息成功發送到了 rabbitmq // 刪除 redis 裏面的消息 redisTemplate.opsForHash().delete(RedisConfig.RETRY_KEY, messageId); } } }); // 設置 ReturnCallback 回調 // 若是發送到交換器成功,可是沒有匹配的隊列,就會觸發這個回調 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { Console.log("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey); // 從 redis 取出消息,設置 returnCallback 設置爲 true String messageId = message.getMessageProperties().getMessageId(); RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId); rabbitMetaMessage.setReturnCallback(true); redisTemplate.opsForHash().put(RedisConfig.RETRY_KEY, messageId, rabbitMetaMessage); }); return rabbitTemplate; }
必須 rabbitTemplate.setMandatory(true)
,否則當 發送到交換器成功,可是沒有匹配的隊列,不會觸發 ReturnCallback 回調。並且 ReturnCallback 比 ConfirmCallback 先回調。
如何模擬 發送到交換器成功,可是沒有匹配的隊列,先把項目啓動,而後再把隊列解綁,再發送消息,就會觸發 ReturnCallback 回調,並且發現消息也丟失了,沒有到任何隊列。
這樣就解綁了。
運行項目,而後打開瀏覽器,輸入 http://localhost:9999/sendMessage
控制檯打出以下日誌
這樣就觸發了 ReturnCallback 回調 ,從 redis 取出消息,設置 returnCallback 設置爲 true。你會發現 ConfirmCallback 的 ack 返回值仍是 true。
這裏有個須要注意的地方,若是發送到交換器成功,可是沒有匹配的隊列(好比說取消了綁定),ack 返回值爲仍是 true (這是一個坑,須要注意,就像上面那種狀況!!!)。因此不能單靠這個來判斷消息真的發送成功了。這個時候會先觸發 ReturnCallback 回調,咱們把 returnCallback 設置爲 true,因此還得判斷 returnCallback 是否爲 true,若是爲 ture,表示消息發送不成功,false 才能徹底保證消息成功發送到了 rabbitmq。
如何模擬 ack 返回值爲 false,先把項目啓動,而後再把交換器刪除,就會發現 ConfirmCallback 的 ack 爲 false。
運行項目,而後打開瀏覽器,輸入 http://localhost:9999/sendMessage
控制檯打出以下日誌
你會發現 ConfirmCallback 的 ack 返回值纔是 false。
不能單單依靠 ConfirmCallback 的 ack 返回值爲 true,就判定當前消息發送成功了。
Consumer Acknowledgements and Publisher Confirms
因爲本人知識和能力有限,文中若有沒說清楚的地方,但願你們能在評論區指出,以幫助我將博文寫得更好。