<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
建立發送配置java
/** * @Author: bdsbdg * @Date: 2021/1/30 15:15 */ @Configuration public class ApplicationConfig { // 建立隊列 @Bean public Queue confirmQueue(){ return new Queue("confirm-queue", true); } // 建立交換機 @Bean public Exchange confirmExchange(){ return new DirectExchange("confirm-exchange", true, false); } // 隊列綁定到交換機 @Bean public Binding queueBindToExchangeByConfirm(Queue confirmQueue, Exchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm-routing-key").noargs(); } }
發送消息spring
/** * @Author: bdsbdg * @Date: 2021/1/30 15:40 */ @SpringBootTest @RunWith(SpringRunner.class) public class demo { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testConfirm() { // 將消息發送到指定交換機的指定路由 rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key", "測試消息發送。。。"); } }
建立監聽器監聽指定隊列的消息緩存
@Component public class ConsumerListener { @RabbitListener(queues = "confirm-queue") public void myListener1(String data) throws Exception { System.out.println("消費者接收到的消息data爲:" + data); } }
在使用RabbitMQ的時候,做爲消息的發送方但願杜絕任何消息丟失或者投遞失敗的場景。若是消息投遞失敗,RabbitMQ爲咱們提供了兩種模式用來控制消息的可靠投遞。springboot
confirm模式:ide
return模式:函數
修改配置文件spring-boot
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /day2 publisher-confirms: true # 開啓confirm模式 publisher-returns: true # 開啓return模式
RabbitTemplate設置回調post
/** * @Author: bdsbdg * @Date: 2021/1/30 15:40 */ @SpringBootTest @RunWith(SpringRunner.class) public class demo { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testConfirm() { // confirm rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ System.out.println("發送消息成功"); }else { System.out.println("發送消息失敗!!!"+cause); } } }); // return rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // System.out.println("消息路由失敗會執行該方法。。。"); // System.out.println("發送的消息體:" + new String(message.getBody())); System.out.println("響應碼:" + replyCode); // System.out.println("響應信息:" + replyText); } }); // 將消息發送到指定交換機的指定路由 rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key", "測試消息發送。。。"); rabbitTemplate.convertAndSend("confirm-exchange-1", "confirm-routing-key", "測試消息發送confirm模式失敗。。。"); rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key-1", "測試消息發送return模式失敗。。。"); } }
若是在處理消息的過程當中,消費者的服務在處理消息的時候出現異常,那麼可能這條正在處理的消息就沒有完成消息消費,數據就會丟失。爲了確保數據不會丟失,RabbitMQ支持確認機制ACK (Acknowledge)。測試
消費端接收到消息後有三種ack方式:fetch
自動確認是指,消息一旦被consumer接收到則自動確認收到,並將相應的message從RabbitMQ的消息緩存中移除。可是在實際的業務處理中,極可能是消息被接收到了,可是業務處理出現了異常,那麼消息從緩存中移除即該消息就被丟棄了。若是設置了手動確認,則須要在業務處理成功後,調用channel.basicAck()方法手動簽收,若是出現了異常,則調用channel.basicNack()方法,讓其自動重發消息。
修改消費端配置文件
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest # 消息確認方式 listener: simple: acknowledge-mode: manual # 手動確認
修改消費端消息處理代碼
@Component public class ConsumerListener { @RabbitListener(queues = "confirm-queue") public void myListener1(String data, Message message, Channel channel) throws Exception { // System.out.println("消費者接收到的消息data爲:" + data); // Thread.sleep(5000); byte[] body = message.getBody(); System.out.println("消費者接收到的消息body爲:" + new String(body)); // 消息id long id = message.getMessageProperties().getDeliveryTag(); System.out.println("id:"+id); try { if (id%2==0){ int a = 1/0; } System.out.println("處理業務"); // 業務處理成功:手動簽收 channel.basicAck(id,true); System.out.println("處理業務成功!!!"); }catch (Exception e){ System.out.println("處理業務失敗!!!"); // 業務處理失敗:拒收,而且讓消息重回隊列 channel.basicNack(id,true,true); } } }
未設置前
設置後
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest # 確認方式 listener: simple: acknowledge-mode: manual # 每次最多處理消息的個數 prefetch: 5
// 修改隊列建立代碼便可 // 建立隊列 @Bean public Queue confirmTtlQueue(){ // 建立隊列 設置裏面內容十秒過時 return QueueBuilder.durable("confirm-ttl-queue").withArgument("x-message-ttl",10000) .build(); } // 當該隊列中的消息進入十秒後將自動銷燬
在消息上設置過時時間
@Test public void testTTL(){ // 能夠設置消息的屬性消息 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 設置消息的過時時間 5s message.getMessageProperties().setExpiration("5000"); return message; } }; rabbitTemplate.convertAndSend("confirm-ttl-exchange","confirm-routing-key", "ttl消息", messagePostProcessor); }
一、生成者將消息發送到交換機後,由交換機路由到指定的隊列
二、當該消息成爲了死信後而且將該消息發送給DLX。
成爲死信的三種狀況
三、DLX再將這個消息路由給專門處理死信的隊列,而且由對應的消費者消費
建立死信隊列
// 建立死信交換機 @Bean public Exchange dlxExhange(){ return new DirectExchange("dlx-exchange"); } // 建立死信隊列 @Bean public Queue dlxQueue(){ return new Queue("dlx-queue"); } // 將死信隊列綁定到死信交換機上 @Bean public Binding dlxBinding(Queue dlxQueue, Exchange dlxExhange){ return BindingBuilder.bind(dlxQueue).to(dlxExhange).with("dlx-routing-key").noargs(); }
建立普通消息隊列時綁定死信隊列
// 建立普通隊列 @Bean public Queue delayQueue(){ Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 20000); // 隊列過時時間 args.put("x-max-length", 10000000); // 隊列中消息數量 args.put("x-dead-letter-exchange", "dlx-exchange"); // 綁定死信交換機 args.put("x-dead-letter-routing-key", "dlx-routing-key"); // 綁定死信路由器 return QueueBuilder.durable("delay-queue").withArguments(args).build(); }
發送消息
消息過時