一、簡介java
RabbitMQ 即一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩衝,消息分發的做用。spring
二、建立一個springboot的項目express
三、添加RabbitMQ依賴json
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
四、在application.yml中配置RabbitMQ數組
spring緩存
rabbitmq:
host: 127.0.0.1
port: 5672
username: hxj
password: 123hanxujie
publisher-confirms: true
virtual-host: /springboot
五、建立一個rabbitMQ配置類(這個必定要看明白)app
/** * FileName: Application * Author: 韓旭傑 * Date: 2019/2/13 10:42 * Description: 該類初始化建立隊列、轉發器,並把隊列綁定到轉發器 */ package com.example.springboot.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 說明:〈該類初始化建立隊列、轉發器,並把隊列綁定到轉發器〉 * * @author 韓旭傑 * @date 2019/2/13 * @since 1.0.0 */ @Configuration public class Application { private static Logger log = LoggerFactory.getLogger(Application.class); @Autowired private CachingConnectionFactory connectionFactory; final static String queueName = "helloQuery"; @Bean public Queue helloQueue() { return new Queue(queueName); } @Bean public Queue userQueue() { return new Queue("user"); } @Bean public Queue dirQueue() { return new Queue("direct"); } //===============如下是驗證topic Exchange的隊列========== // Bean默認的name是方法名 @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); } //===============以上是驗證topic Exchange的隊列========== //===============如下是驗證Fanout Exchange的隊列========== @Bean(name="AMessage") public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } //===============以上是驗證Fanout Exchange的隊列========== /** * exchange是交換機交換機的主要做用是接收相應的消息而且綁定到指定的隊列.交換機有四種類型,分別爲Direct,topic,headers,Fanout. * * Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即建立消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被髮送到該消息隊列中. * * topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息纔會被髮送到該消息隊列中. * * headers也是根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被髮送到匹配的消息隊列中. * * Fanout是路由廣播的形式,將會把消息發給綁定它的所有隊列,即使設置了key,也會被忽略. */ @Bean DirectExchange directExchange(){ return new DirectExchange("directExchange"); } @Bean TopicExchange exchange() { // 參數1爲交換機的名稱 return new TopicExchange("exchange"); } /** * //配置廣播路由器 * @return FanoutExchange */ @Bean FanoutExchange fanoutExchange() { // 參數1爲交換機的名稱 return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){ return BindingBuilder.bind(dirQueue).to(directExchange).with("direct"); } /** * 將隊列topic.message與exchange綁定,routing_key爲topic.message,就是徹底匹配 * @param queueMessage * @param exchange * @return */ @Bean // 若是參數名和上面用到方法名稱同樣,能夠不用寫@Qualifier Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } /** * 將隊列topic.messages與exchange綁定,routing_key爲topic.#,模糊匹配 * @param queueMessages * @param exchange * @return */ @Bean Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } @Bean Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } @Bean public RabbitTemplate rabbitTemplate(){ //若使用confirm-callback或return-callback,必需要配置publisherConfirms或publisherReturns爲true //每一個rabbitTemplate只能有一個confirm-callback和return-callback,若是這裏配置了,那麼寫生產者的時候不能再寫confirm-callback和return-callback //使用return-callback時必須設置mandatory爲true,或者在配置中設置mandatory-expression的值爲true connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); // /** // * 若是消息沒有到exchange,則confirm回調,ack=false // * 若是消息到達exchange,則confirm回調,ack=true // * exchange到queue成功,則不回調return // * exchange到queue失敗,則回調return(需設置mandatory=true,不然不回回調,消息就丟了) // */ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); }else{ log.info("消息發送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } }
rabbitMQ配置類大約就這些內容,裏面我基本上都作了註釋。dom
下面咱們就開始寫rabbitMQ的用法了異步
六、單生產者和單消費者
6.一、生產者
@Component public class HelloSender1 { /** * AmqpTemplate能夠說是RabbitTemplate父類,RabbitTemplate實現類RabbitOperations接口,RabbitOperations繼承了AmqpTemplate接口 */ @Autowired private AmqpTemplate rabbitTemplate; @Autowired private RabbitTemplate rabbitTemplate1; /** * 用於單生產者-》單消費者測試 */ public void send() { String sendMsg = "hello1 " + new Date(); System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate1.convertAndSend("helloQueue", sendMsg); } }
名爲helloQueue的隊列在配置類建立好了,項目啓動的時候會自動建立
6.二、消費者
@Component @RabbitListener(queues = "helloQueue") public class HelloReceiver1 { @RabbitHandler public void process(String hello) { System.out.println("Receiver1 : " + hello); } }
@RabbitListener註解是監聽隊列的,當隊列有消息的時候,它會自動獲取。
@RabbitListener 標註在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪一個方法處理,根據 MessageConverter 轉換後的參數類型
注意
6.三、controller
/** * 最簡單的hello生產和消費實現(單生產者和單消費者) */ @RequestMapping("/hello") public void hello() { helloSender1.send(); }
6.四、結果
控制檯的結果
Sender1 : hello1 Mon Feb 18 10:13:35 CST 2019
2019-02-18 10:13:35,831 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
Receiver1 : hello1 Mon Feb 18 10:13:35 CST 2019
七、單生產者對多消費者
7.一、生產者
/** * 用於單/多生產者-》多消費者測試 */ public void send(String msg) { String sendMsg = msg + new Date(); System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate.convertAndSend("helloQueue", sendMsg); }
7.二、消費者
消費者1
@Component @RabbitListener(queues = "helloQueue") public class HelloReceiver1 { @RabbitHandler public void process(String hello) { System.out.println("Receiver1 : " + hello); } }
消費者2
@Component @RabbitListener(queues = "helloQueue") public class HelloReceiver2 { @RabbitHandler public void process(String hello) { System.out.println("Receiver2 : " + hello); } }
7.三、controller
/**
* 單生產者-多消費者
*/
@RequestMapping("/oneToMany")
public void oneToMany() {
for(int i=0;i<10;i++){
helloSender1.send("hellomsg:"+i);
}
}
7.四、結果
Sender1 : hellomsg:0Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:2Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:4Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:6Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019
Sender1 : hellomsg:8Mon Feb 18 10:19:10 CST 2019
Sender1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019
Receiver2 : hellomsg:0Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:2Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:4Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:6Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:8Mon Feb 18 10:19:10 CST 2019
Receiver1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019
Receiver1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019
2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,044 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
八、實體類的傳輸,必須格式化
8.一、實體類
public class User implements Serializable { private String name; private String pass; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPass() { return pass; } public void setPass(String pass) { this.pass = pass; } @Override public String toString() { return "User{" + "name='" + name + '\'' + ", pass='" + pass + '\'' + '}'; } }
8.二、生產者
/** * 實體類的傳輸(springboot完美的支持對象的發送和接收,不須要格外的配置。實體類必須序列化) * @param user */ public void send(User user) { System.out.println("user send : " + user.getName()+"/"+user.getPass()); this.rabbitTemplate.convertAndSend("userQueue", user); }
8.三、消費者
@Component @RabbitListener(queues = "userQueue") public class HelloReceiver3 { @RabbitHandler public void process(User user){ System.out.println("user receive : " + user.getName()+"/"+user.getPass()); } }
8.四、controller
/** * 實體列的傳輸 */ @RequestMapping("/userTest") public void userTest(){ User user=new User(); user.setName("韓旭傑"); user.setPass("123456"); userSender.send(user); }
8.五、結果
user send : 韓旭傑/123456
2019-02-18 10:24:24,251 INFO (Application.java:169)- 消息發送成功:correlationData(null),ack(true),cause(null)
user receive : 韓旭傑/123456
九、directExchange
Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即建立消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被髮送到該消息隊列中.
9.一、在rabbitMQ配置類中添加內容
@Bean public Queue dirQueue() { return new Queue("direct"); } @Bean DirectExchange directExchange(){ return new DirectExchange("directExchange"); } /** * 將隊列dirQueue與directExchange交換機綁定,routing_key爲direct * @param dirQueue * @param directExchange * @return */ @Bean Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){ return BindingBuilder.bind(dirQueue).to(directExchange).with("direct"); }
9.二、生產者
@Component public class DirectSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msgString="directSender :hello i am hzb"; System.out.println(msgString); this.rabbitTemplate.convertAndSend("direct", msgString); } }
9.三、消費者
@Component @RabbitListener(queues = "direct") public class DirectReceiver { @RabbitHandler public void process(String msg) { System.out.println("directReceiver : " + msg); } }
9.四、controller
@RequestMapping("/directTest") public void directTest() { directSender.send(); }
9.五、結果
directSender :hello i am hzb
directReceiver : directSender :hello i am hzb
2019-02-18 10:33:25,974 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)
十、topicExchange
topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息纔會被髮送到該消息隊列中.
10.一、在rabbitMQ配置類中添加內容
// Bean默認的name是方法名 @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); }
@Bean TopicExchange exchange() { // 參數1爲交換機的名稱 return new TopicExchange("exchange"); } /** * 將隊列topic.message與exchange綁定,routing_key爲topic.message,就是徹底匹配 * @param queueMessage * @param exchange * @return */ @Bean // 若是參數名和上面用到方法名稱同樣,能夠不用寫@Qualifier Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } /** * 將隊列topic.messages與exchange綁定,routing_key爲topic.#,模糊匹配 * @param queueMessages * @param exchange * @return */ @Bean Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); }
10.二、生產者
@Component public class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msg1 = "I am topic.mesaage msg======"; System.out.println("sender1 : " + msg1); this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1); String msg2 = "I am topic.mesaages msg########"; System.out.println("sender2 : " + msg2); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2); } }
10.三、消費者
消費者1
@Component @RabbitListener(queues = "topic.message") public class TopicMessageReceiver { @RabbitHandler public void process(String msg) { System.out.println("topicMessageReceiver : " +msg); } }
消費者2
@Component @RabbitListener(queues = "topic.messages") public class TopicMessagesReceiver { @RabbitHandler public void process(String msg) { System.out.println("topicMessagesReceiver : " +msg); } }
10.四、controller
/** * topic exchange類型rabbitmq測試 */ @RequestMapping("/topicTest") public void topicTest() { topicSender.send(); }
10.五、結果
sender1 : I am topic.mesaage msg======
sender2 : I am topic.mesaages msg########
topicMessageReceiver : I am topic.mesaage msg======
topicMessagesReceiver : I am topic.mesaage msg======
topicMessagesReceiver : I am topic.mesaages msg########
2019-02-18 10:39:46,150 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:39:46,206 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)
十一、fanoutExchange
Fanout是路由廣播的形式,將會把消息發給綁定它的所有隊列,即使設置了key,也會被忽略.
11.一、在rabbitMQ配置類中添加內容
//===============如下是驗證Fanout Exchange的隊列========== @Bean(name="AMessage") public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { // 參數1爲交換機的名稱 return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); }
11.二、生產者
@Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msgString="fanoutSender :hello i am hzb"; System.out.println(msgString); // 參數2被忽略 this.rabbitTemplate.convertAndSend("fanoutExchange","", msgString); } }
11.三、消費者
消費者A
@Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA { @RabbitHandler public void process(String msg) { System.out.println("FanoutReceiverA : " + msg); } }
消費者B
@Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB { @RabbitHandler public void process(String msg) { System.out.println("FanoutReceiverB : " + msg); } }
消費者C
@Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC { @RabbitHandler public void process(String msg) { System.out.println("FanoutReceiverC : " + msg); } }
11.四、controller
/** * fanout exchange類型rabbitmq測試 */ @RequestMapping("/fanoutTest") public void fanoutTest() { fanoutSender.send(); }
11.五、結果
fanoutSender :hello i am hzb
FanoutReceiverA : fanoutSender :hello i am hzb
FanoutReceiverC : fanoutSender :hello i am hzb
FanoutReceiverB : fanoutSender :hello i am hzb
2019-02-18 10:45:38,760 INFO (Application.java:175)- 消息發送成功:correlationData(null),ack(true),cause(null)
十二、配置類中的rabbitTemplate
@Bean public RabbitTemplate rabbitTemplate() { //若使用confirm-callback或return-callback,必需要配置publisherConfirms或publisherReturns爲true //每一個rabbitTemplate只能有一個confirm-callback和return-callback // 配置文件配置了publisher-confirms: true,那麼這句話能夠省略 connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //使用return-callback時必須設置mandatory爲true,或者在配置中設置mandatory-expression的值爲true rabbitTemplate.setMandatory(true); // /** // * 若是消息沒有到exchange,則confirm回調,ack=false // * 若是消息到達exchange,則confirm回調,ack=true // * exchange到queue成功,則不回調return // * exchange到queue失敗,則回調return(需設置mandatory=true,不然不回回調,消息就丟了) // */ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息發送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } else { log.info("消息發送失敗:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; }
好好看看註釋
1三、不在配置類中配置callback
方法一:
13.一、配置一個接口
/** * 說明:〈定義一個名爲SendMessageService 的接口,這個接口繼承了RabbitTemplate.ConfirmCallback, * ConfirmCallback接口是用來回調消息發送成功後的方法,當一個消息被成功寫入到RabbitMQ服務端時, * 會自動的回調RabbitTemplate.ConfirmCallback接口內的confirm方法完成通知 * * @author 韓旭傑 * @date 2019/2/14 * @since 1.0.0 */ public interface SendMessageService extends RabbitTemplate.ConfirmCallback{ void sendMessage(String exchange,String routekey,Object message); }
13.二、實現這個接口
/** * 說明:〈該類注入了RabbitTemplate,RabbitTemplate封裝了發送消息的方法,咱們直接使用便可。 * 能夠看到咱們構建了一個回調返回的數據,並使用convertAndSend方法發送了消息。同時實現了confirm回調方法, * 經過判斷isSendSuccess能夠知道消息是否發送成功,這樣咱們就能夠進行進一步處理。 * * @author 韓旭傑 * @date 2019/2/14 * @since 1.0.0 */ @Service public class SendMessageServiceImpl implements SendMessageService{ private static Logger logger = LoggerFactory.getLogger(SendMessageServiceImpl.class); @Autowired private RabbitTemplate rabbitTemplate; @Override public void sendMessage(String exchange,String routekey,Object message) { //設置回調對象 //rabbitTemplate.setConfirmCallback(this); //rabbitTemplate.setMandatory(true); //構建回調返回的數據 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData); rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData); logger.info("SendMessageServiceImpl() >>> 發送消息到RabbitMQ, 消息內容: " + message); } /** * 消息回調確認方法 * * @param correlationData 回調數據 * @param isSendSuccess 是否發送成功 * @param */ @Override public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) { logger.info("confirm回調方法>>>>>>>>>>>>>回調消息ID爲: " + correlationData.getId()); if (isSendSuccess) { logger.info("confirm回調方法>>>>>>>>>>>>>消息發送成功"); } else { logger.info("confirm回調方法>>>>>>>>>>>>>消息發送失敗" + s); } } }
方法二:
直接在生產者發送信息的時候修改rabbitTemplate
@Service public class SendMessage1 { private static Logger log = LoggerFactory.getLogger(SendMessage1.class); @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String exchange, String routekey, Object message) { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); }else{ log.info("消息發送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } });
1三、有時候消費者出現錯誤,須要人工處理
//構建回調返回的數據 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); //rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData); //rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData); // 將 CorrelationData的id 與 Message的correlationId綁定,而後關係保存起來,例如放到緩存中,而後人工處理 // 當confirm或return回調時,根據ack類別等,分別處理. 例如return或者ack=false則說明有問題,報警, 若是ack=true則刪除關係 // (由於return在confirm前,因此一條消息在return後又ack=true的狀況也是按return處理) Message message1 = MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(correlationData.getId()).build(); rabbitTemplate.send(exchange, routekey, message1, correlationData);
將 CorrelationData的id 與 Message的correlationId綁定,而後關係保存起來,例如放到緩存中,而後人工處理
咱們能夠看到,這兩條消息關聯起來了。