前情提要:rabbitmq 管理界面查看姿式java
1、快速搭建/基本信息發送和消費git
一、引入依賴web
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、application.ymlspring
spring: rabbitmq: host: ipXXX port: 5672 username: 帳戶XXX password: 密碼XXX virtual-host: /wen # 交換器名稱
以 direct模式爲例數據庫
一、配置文件 import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class RabbitConfig { //隊列 起名:TestDirectQueue @Bean public Queue emailQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效 // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //通常設置一下隊列的持久化就好,其他兩個就是默認false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效 // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //通常設置一下隊列的持久化就好,其他兩個就是默認false return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效 // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //通常設置一下隊列的持久化就好,其他兩個就是默認false return new Queue("weixin.fanout.queue", true); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(16); map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒後過時 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } //Direct交換機 起名:TestDirectExchange @Bean public DirectExchange fanoutOrderExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("fanout_exchange", true, false); } //綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); } @Bean public Binding bindingDirect1() { return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with(""); } @Bean public Binding bindingDirect3() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with(""); } } 二、生產者 package com.pit.barberShop.common.MQ.Rabbit.fanout; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author :wenye * @date :Created in 2021/6/15 21:41 * @description:廣播模式 * @version: $ */ @RestController @RequestMapping("/rabbitmq") public class ProducerFanout { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = "fanout_exchange"; // 2: 路由key private String routeKey = ""; @RequestMapping("/fanout") public void markerFanout() { String message ="shua"; // 發送消息 rabbitTemplate.convertAndSend(exchangeName, routeKey, message); } @RequestMapping("/ttl") public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 設置過時時間,單位:毫秒 byte[] msgBytes = "測試消息自動過時".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message); return "ok"; } } 三、消費者 package com.pit.barberShop.common.MQ.Rabbit.fanout; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * @author :wenye * @date :Created in 2021/6/15 22:07 * @description:fanout消費者 * @version: $ */ @Component public class ConsumerFanout { @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。 value = @Queue(value = "sms.fanout.queue",autoDelete = "false"), // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "fanout_exchange", // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式 type = ExchangeTypes.DIRECT) )) public void messagerevice(String message){ // 此處省略發郵件的邏輯 System.out.println("sms-two111------------->" + message); } @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。 value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"), // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "fanout_exchange", // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式 type = ExchangeTypes.DIRECT) )) public void messageWXrevice(String message){ // 此處省略發郵件的邏輯 System.out.println("weixin----two---------->" + message); } }
2、過時時間緩存
一、生產者發送消息時設置過時時間 public String testTTL() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("20000"); // 設置過時時間,單位:毫秒 byte[] msgBytes = "測試消息自動過時".getBytes(); Message message = new Message(msgBytes, messageProperties); rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message); return "ok"; } 二、隊列中的全部消息設置過時時間 配置中添加 @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒後過時 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public Queue TTLQueue() { Map<String, Object> map = new HashMap<>(); map.put("x-message-ttl", 30000); // 隊列中的消息未被消費則30秒後過時 return new Queue("TTL_QUEUE", true, false, false, map); } @Bean public DirectExchange TTLExchange() { return new DirectExchange("TTL_EXCHANGE", true, false); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL"); }
3、消息確認機制配置
參考:https://blog.csdn.net/qq33098...
默認是自動應答app
spring: rabbitmq: # 開啓發送確認 publisher-confirms: true # 開啓發送失敗退回 publisher-returns: true
目前回調存在ConfirmCallback和ReturnCallback二者。他們的區別在於
若是消息沒有到exchange,則ConfirmCallback回調,ack=false,
若是消息到達exchange,則ConfirmCallback回調,ack=true
exchange到queue成功,則不回調ReturnCallback
rabbitMQ 消息生產者發送消息的流程
less
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /** * correlationData:對象內部只有一個 id 屬性,用來表示當前消息的惟一性。 * ack:消息投遞到broker 的狀態,true表示成功。 * cause:表示投遞失敗的緣由。 **/ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause){ if (!ack) { log.error("消息發送異常!"); } else { log.info("發送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } } import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { //重寫 returnedMessage() 方法,方法有五個參數message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey路由(隊列) @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); } } 配置文件 一、防止重複簽發ack須要在配置類中重寫 @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); //此處也設置爲手動ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } 二、從新建立設置交換器和隊列屬性 @Bean public Queue chongfuQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啓時仍然存在,暫存隊列:當前鏈接有效 // exclusive:默認也是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //通常設置一下隊列的持久化就好,其他兩個就是默認false return new Queue("chongfu.fanout.queue", true); } //Direct交換機 起名:TestDirectExchange @Bean public DirectExchange chongfuExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("chongfu_exchange", true, false); } @Bean public Binding bindingDirect4() { return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with(""); } 生產者 public void markerchongfu() { /** * 確保消息發送失敗後能夠從新返回到隊列中 * 注意:yml須要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消費者確認收到消息後,手動ack回執回調處理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投遞到隊列失敗回調處理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 發送消息 */ String s = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帥哥", message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(s)); } 消費者 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。 value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"), // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "chongfu_exchange", // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式 type = ExchangeTypes.DIRECT) )) public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); // log.info("序號:{}", message.getMessageProperties().getDeliveryTag()); // System.out.println(msg); //TODO 具體業務 // 收到消息 basicAck() channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重複處理失敗,拒絕再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息 } else { log.error("消息即將再次返回隊列處理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
消費消息有三種回執方法
一、basicAckdom
basicAck:表示成功確認,使用此回執方法後,消息會被rabbitmq broker 刪除。
void basicAck(long deliveryTag, boolean multiple)
分佈式
舉個栗子: 假設我先發送三條消息deliveryTag分別是五、六、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag爲8,multiple設置爲 true,會將五、六、七、8的消息所有進行確認。
二、basicNack
basicNack :表示失敗確認,通常在消費消息業務異常時用到此方法,能夠將消息從新投遞入隊列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
三、basicReject
basicReject:拒絕消息,與basicNack區別在於不能進行批量操做,其餘用法很類似。
void basicReject(long deliveryTag, boolean requeue)
4、死信隊列
死信隊列其實和普通的隊列沒啥大的區別,都須要建立本身的Queue、Exchange,而後經過RoutingKey綁定到Exchange上去,只不過死信隊列的RoutingKey和Exchange要做爲參數,綁定到正常的隊列上去,一種應用場景是正常隊列裏面的消息被basicNack或者reject時,消息就會被路由到正常隊列綁定的死信隊列中,還有一種還有經常使用的場景就是開啓了自動簽收,而後消費者消費消息時出現異常,超過了重試次數,那麼這條消息也會進入死信隊列,若是配置了話,
例子
//模擬異經常使用的交換器 ,topic交換器會通配符匹配,固然字符串如出一轍也會匹配 @Bean TopicExchange emailExchange() { return new TopicExchange("demoTopicExchange"); } //死信隊列 @Bean public Queue deadLetterQueue() { return new Queue("demo.dead.letter"); } //死信交換器 @Bean TopicExchange deadLetterExchange() { return new TopicExchange("demoDeadLetterTopicExchange"); } //綁定死信隊列 @Bean Binding bindingDeadLetterQueue() { return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter"); } 生產者 @RequestMapping("/sixin") public void sendEmailMessage() { CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData); log.info("---發送 email 消息---{}---messageId---{}","111",correlationData.getId()); } 消費者 /** * 郵件消費者 * @param message * @param channel * @throws IOException */ @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是隊列名字,這個名字你能夠自定隨便定義。 value = @Queue(value = "demo.email",autoDelete = "false", arguments = { @Argument(name = "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"), @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"), @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long") }), key = "demo.email", // order.fanout 交換機的名字 必須和生產者保持一致 exchange = @Exchange(value = "demoTopicExchange", // 這裏是肯定的rabbitmq模式是:fanout 是以廣播模式 、 發佈訂閱模式 type = ExchangeTypes.TOPIC) )) public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException { try { log.info("---接受到消息---{}",msg); //主動異常 int m=1/0; //手動簽收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { //異常,ture 從新入隊,或者false,進入死信隊列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } } /** * 死信消費者,自動簽收開啓狀態下,超太重試次數,或者手動簽收,reject或者Nack * @param message */ @RabbitListener(queues = "demo.dead.letter") public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException { //能夠考慮數據庫記錄,每次進來查數量,達到必定的數量,進行預警,人工介入處理 log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation")); //回覆ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
一樣也可以使用java類配置
@Bean public Queue emailQueue() { Map<String, Object> arguments = new HashMap<>(2); // 綁定死信交換機 arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange"); // 綁定死信的路由key arguments.put("x-dead-letter-routing-key", "demo.dead.letter"); arguments.put("x-message-ttl", 3000); return new Queue(emailQueue,true,false,false,arguments); } @Bean TopicExchange emailExchange() { return new TopicExchange(topicExchange); } @Bean Binding bindingEmailQueue() { return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#"); }
5、持久化機制和內存磁盤監控
一、持久化
RabbitMQ的持久化隊列分爲:
1:隊列持久化
2:消息持久化
3:交換機持久化
不管是持久化的消息仍是非持久化的消息均可以寫入到磁盤中,只不過非持久的是等內存不足的狀況下才會被寫入到磁盤中。
二、內存磁盤監控
6、分佈式事務
7、配置詳解
rabbitmq: addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client鏈接到的server的地址,多個以逗號分隔(優先取addresses,而後再取host) # port: ##集羣配置 addresses之間用逗號隔開 # addresses: ip:port,ip:port password: admin username: 123456 virtual-host: / # 鏈接到rabbitMQ的vhost requested-heartbeat: #指定心跳超時,單位秒,0爲不指定;默認60s publisher-confirms: #是否啓用 發佈確認 publisher-reurns: # 是否啓用發佈返回 connection-timeout: #鏈接超時,單位毫秒,0表示無窮大,不超時 cache: channel.size: # 緩存中保持的channel數量 channel.checkout-timeout: # 當緩存數量被設置時,從緩存中獲取一個channel的超時時間,單位毫秒;若是爲0,則老是建立一個新channel connection.size: # 緩存的鏈接數,只有是CONNECTION模式時生效 connection.mode: # 鏈接工廠緩存模式:CHANNEL 和 CONNECTION listener: simple.auto-startup: # 是否啓動時自動啓動容器 simple.acknowledge-mode: # 表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto simple.concurrency: # 最小的消費者數量 simple.max-concurrency: # 最大的消費者數量 simple.prefetch: # 指定一個請求能處理多少個消息,若是有事務的話,必須大於等於transaction數量. simple.transaction-size: # 指定一個事務處理的消息數量,最好是小於等於prefetch的數量. simple.default-requeue-rejected: # 決定被拒絕的消息是否從新入隊;默認是true(與參數acknowledge-mode有關係) simple.idle-event-interval: # 多少長時間發佈空閒容器時間,單位毫秒 simple.retry.enabled: # 監聽重試是否可用 simple.retry.max-attempts: # 最大重試次數 simple.retry.initial-interval: # 第一次和第二次嘗試發佈或傳遞消息之間的間隔 simple.retry.multiplier: # 應用於上一重試間隔的乘數 simple.retry.max-interval: # 最大重試時間間隔 simple.retry.stateless: # 重試是有狀態or無狀態 template: mandatory: # 啓用強制信息;默認false receive-timeout: # receive() 操做的超時時間 reply-timeout: # sendAndReceive() 操做的超時時間 retry.enabled: # 發送重試是否可用 retry.max-attempts: # 最大重試次數 retry.initial-interval: # 第一次和第二次嘗試發佈或傳遞消息之間的間隔 retry.multiplier: # 應用於上一重試間隔的乘數 retry.max-interval: #最大重試時間間隔