一條消費成功被消費經歷了生產者->MQ->消費者,所以在這三個步驟中都有可能形成消息丟失。java
AMQP
協議提供了事務機制,在投遞消息時開啓事務支持,若是消息投遞失敗,則回滾事務。spring
自定義事務管理器緩存
@Configuration public class RabbitTranscation { @Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ return new RabbitTemplate(connectionFactory); } }
修改ymlide
spring: rabbitmq: # 消息在未被隊列收到的狀況下返回 publisher-returns: true
開啓事務支持性能
rabbitTemplate.setChannelTransacted(true);
消息未接收時調用ReturnCallbackfetch
rabbitTemplate.setMandatory(true);
生產者投遞消息this
@Service public class ProviderTranscation implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ // 設置channel開啓事務 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setReturnCallback(this); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("這條消息發送失敗了"+message+",請處理"); } @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager") public void publishMessage(String message) throws Exception { rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); } }
可是,不多有人這麼幹,由於這是同步操做,一條消息發送以後會使發送端阻塞,以等待RabbitMQ-Server的迴應,以後才能繼續發送下一條消息,生產者生產消息的吞吐量和性能都會大大下降。code
發送消息時將信道設置爲confirm
模式,消息進入該信道後,都會被指派給一個惟一ID,一旦消息被投遞到所匹配的隊列後,RabbitMQ
就會發送給生產者一個確認。rabbitmq
開啓消息確認機制隊列
spring: rabbitmq: # 消息在未被隊列收到的狀況下返回 publisher-returns: true # 開啓消息確認機制 publisher-confirm-type: correlated
消息未接收時調用ReturnCallback
rabbitTemplate.setMandatory(true);
生產者投遞消息
@Service public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("確認了這條消息:"+correlationData); }else{ System.out.println("確認失敗了:"+correlationData+";出現異常:"+cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("這條消息發送失敗了"+message+",請處理"); } public void publisMessage(String message){ rabbitTemplate.setMandatory(true); rabbitTemplate.convertAndSend("javatrip",message); } }
若是消息確認失敗後,咱們能夠進行消息補償,也就是消息的重試機制。當未收到確認信息時進行消息的從新投遞。設置以下配置便可完成。
spring: rabbitmq: # 支持消息發送失敗後重返隊列 publisher-returns: true # 開啓消息確認機制 publisher-confirm-type: correlated listener: simple: retry: # 開啓重試 enabled: true # 最大重試次數 max-attempts: 5 # 重試時間間隔 initial-interval: 3000
消息在MQ中有可能發生丟失,這時候咱們就須要將隊列和消息都進行持久化。
@Queue註解爲咱們提供了隊列相關的一些屬性,具體以下:
arguments:隊列的其餘屬性參數,有以下可選項,可參看圖2的arguments:
持久化隊列
建立隊列的時候將持久化屬性durable設置爲true,同時要將autoDelete設置爲false
@Queue(value = "javatrip",durable = "false",autoDelete = "false")
持久化消息
發送消息的時候將消息的deliveryMode設置爲2,在Spring Boot中消息默認就是持久化的。
消費者剛消費了消息,尚未處理業務,結果發生異常。這時候就須要關閉自動確認,改成手動確認消息。
修改yml爲手動簽收模式
spring: rabbitmq: listener: simple: # 手動簽收模式 acknowledge-mode: manual # 每次簽收一條消息 prefetch: 1
消費者手動簽收
@Component @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true")) public class Consumer { @RabbitHandler public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{ System.out.println(message); // 惟一的消息ID Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 確認該條消息 if(...){ channel.basicAck(deliverTag,false); }else{ // 消費失敗,消息重返隊列 channel.basicNack(deliverTag,false,true); } } }
生產者、MQ、消費者都有可能形成消息丟失