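Introduction to RabbitMQ
RabbitMQ is an open-source message queue system written in Erlang that implements the AMQP protocol (AMQP's main characteristics are message orientation, queuing, routing, reliability and security). It supports many clients and protocols, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP and STOMP, and also supports AJAX. It is used to store and forward messages in distributed systems and performs well in ease of use, scalability and high availability.
Key concepts
A message queue generally involves three concepts: the sender of messages (producer), the queue, and the receiver of messages (consumer). RabbitMQ adds one more layer of abstraction on top of this basic model: an exchange sits between the producer and the queue. The producer therefore has no direct relationship with the queue; it publishes to an exchange, and the exchange forwards each message to queues according to its routing strategy.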
Figure 1 (basic MQ model):
P is the producer, Q is the message queue, C is the consumer.
Figure 2 (RabbitMQ model):
P is the producer, X is the exchange, Q is the message queue, C is the consumer.
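The most important RabbitMQ concepts:
Virtual host: RabbitMQ supports permission control, and permissions are granted per virtual host. A virtual host can contain multiple exchanges, queues and bindings.
Exchange: RabbitMQ's message router; it distributes messages to the relevant queues according to its type and its bindings.
Queue: the container that buffers messages.
Binding: the rule that links an exchange to a queue.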
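To make the relationship between these concepts concrete, here is a minimal in-memory sketch. It is not the RabbitMQ client API: the class `MiniBroker` and all of its methods are hypothetical names invented for illustration, and it only models exact-match routing from an exchange through bindings into queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy model of exchange -> binding -> queue (not a real broker).
final class MiniBroker {
    // queue name -> buffered messages
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // exchange name -> (binding key -> names of bound queues)
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();

    void declareQueue(String queue) {
        queues.computeIfAbsent(queue, q -> new ArrayDeque<>());
    }

    // A binding links an exchange to a queue under a binding key.
    void bind(String exchange, String bindingKey, String queue) {
        declareQueue(queue);
        bindings.computeIfAbsent(exchange, e -> new HashMap<>())
                .computeIfAbsent(bindingKey, k -> new ArrayList<>())
                .add(queue);
    }

    // The producer names only an exchange and a routing key, never a queue.
    // Returns how many queues received the message.
    int publish(String exchange, String routingKey, String message) {
        List<String> targets = bindings
                .getOrDefault(exchange, Collections.emptyMap())
                .getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    // The consumer reads from a queue; returns null when the queue is empty or unknown.
    String poll(String queue) {
        Queue<String> q = queues.get(queue);
        return q == null ? null : q.poll();
    }
}
```

The point of the indirection is that changing which queues receive a message only requires changing bindings; the producer code stays the same.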
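Time to show some real skills
To make the demo easier, we create two Spring Boot projects:
spring-boot-rabbitmq-producer (producer)
spring-boot-rabbitmq-consumer (consumer)
Note: in a real project, a single system may act as both producer and consumer.
1. Add the basic configuration
The basic configuration is the same for the producer and the consumer.
1.1) Integrate RabbitMQ by adding the Maven dependencies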
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
1.2) Add the RabbitMQ server settings (application.properties)
# RabbitMQ settings
spring.rabbitmq.host=192.168.15.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
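2. Exchange: DirectExchange
A direct exchange delivers a message to the queues whose binding key exactly equals the message's routingKey. RabbitMQ's default exchange is a direct exchange with an empty name to which every queue is automatically bound under its own queue name, so a message sent to it with routingKey notify.payment lands in the queue named notify.payment.
2.1) Add a configuration class (consumer)
Declare a queue named notify.payment (on the default exchange, the queue name doubles as the routingKey)
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
    // Declares the queue; no explicit binding is needed for the default exchange.
    @Bean
    public Queue paymentNotifyQueue() {
        return new Queue("notify.payment");
    }
}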
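2.2) Add a message listener class (consumer)
Listen for messages on the notify.payment queue
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
@RabbitListener(queues = "notify.payment")
public class PaymentNotifyReceive {
    // Invoked for every String payload that arrives on the queue.
    @RabbitHandler
    public void receive(String msg) {
        LogUtil.info("notify.payment receive message: " + msg);
    }
}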
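2.3) Add a message sender class (producer)
Send the message to the default exchange with routingKey notify.payment
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class PaymentNotifySender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sender(String msg) {
        LogUtil.info("notify.payment send message: " + msg);
        // No exchange is named, so the template's default exchange is used.
        rabbitTemplate.convertAndSend("notify.payment", msg);
    }
}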
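2.4) Add a test class (producer)
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class PaymentNotifySenderTests {
    @Autowired
    private PaymentNotifySender sender;

    @Test
    public void test_sender() {
        // "支付訂單號" means "payment order number".
        sender.sender("支付訂單號:" + System.currentTimeMillis());
    }
}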
2.5) Run the test_sender() method
Producer log:
2018-05-14 16:28:53.264 INFO 10624 --- [ main] c.l.sender.PaymentNotifySenderTests : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 10624 (started by lianjinsoft...
2018-05-14 16:28:53.265 INFO 10624 --- [ main] c.l.sender.PaymentNotifySenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:28:53.305 INFO 10624 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@...
2018-05-14 16:28:54.133 INFO 10624 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:28:55.104 INFO 10624 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:28:55.114 INFO 10624 --- [ main] c.l.sender.PaymentNotifySenderTests : Started PaymentNotifySenderTests in 2.246 seconds (JVM running for 3.199)
2018-05-14 16:28:55.343 INFO 10624 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:1526286535342
2018-05-14 16:28:55.383 INFO 10624 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:28:55.444 INFO 10624 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
2018-05-14 16:28:55.483 INFO 10624 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:28:55.485 INFO 10624 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
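Consumer log:
2018-05-14 16:28:55.490 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:1526286535342
Log analysis:
The seventh entry of the producer log shows that the message was sent.
The consumer log shows that the same message was received.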
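3. Exchange: TopicExchange
A topic exchange forwards messages by pattern matching on the routingKey. It is the most flexible exchange type and one of the most commonly used.
3.1) Add a configuration class (consumer)
Declare a queue named api.core and bind it to the coreExchange exchange with the binding pattern api.core.*
Declare a queue named api.payment and bind it to the paymentExchange exchange with the binding pattern api.payment.#
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {
    @Bean
    public Queue coreQueue() {
        return new Queue("api.core");
    }

    @Bean
    public Queue paymentQueue() {
        return new Queue("api.payment");
    }

    @Bean
    public TopicExchange coreExchange() {
        return new TopicExchange("coreExchange");
    }

    @Bean
    public TopicExchange paymentExchange() {
        return new TopicExchange("paymentExchange");
    }

    // "*" stands for exactly one word after "api.core."
    @Bean
    public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) {
        return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*");
    }

    // "#" stands for zero or more words after "api.payment"
    @Bean
    public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) {
        return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#");
    }
}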
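3.2) Add two message listener classes (consumer)
Listen for messages on the api.core queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiCoreReceive {
    // A method-level @RabbitListener is sufficient; @RabbitHandler is only needed with a class-level listener.
    @RabbitListener(queues = "api.core")
    public void user(String msg) {
        LogUtil.info("api.core receive message: " + msg);
    }
}
Listen for messages on the api.payment queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiPaymentReceive {
    @RabbitListener(queues = "api.payment")
    public void order(String msg) {
        LogUtil.info("api.payment.order receive message: " + msg);
    }
}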
3.3) Add two message sender classes (producer)
Add a user() method that sends a message to the coreExchange exchange with routingKey api.core.user
Add a userQuery() method that sends a message to the coreExchange exchange with routingKey api.core.user.query
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiCoreSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void user(String msg) {
        LogUtil.info("api.core.user send message: " + msg);
        rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg);
    }

    public void userQuery(String msg) {
        LogUtil.info("api.core.user.query send message: " + msg);
        rabbitTemplate.convertAndSend("coreExchange", "api.core.user.query", msg);
    }
}
Add an order() method that sends a message to the paymentExchange exchange with routingKey api.payment.order
Add an orderQuery() method that sends a message to the paymentExchange exchange with routingKey api.payment.order.query
Add an orderDetailQuery() method that sends a message to the paymentExchange exchange with routingKey api.payment.order.detail.query
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiPaymentSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void order(String msg) {
        LogUtil.info("api.payment.order send message: " + msg);
        rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg);
    }

    public void orderQuery(String msg) {
        LogUtil.info("api.payment.order.query send message: " + msg);
        rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg);
    }

    public void orderDetailQuery(String msg) {
        LogUtil.info("api.payment.order.detail.query send message: " + msg);
        rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg);
    }
}
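3.4) Add two test classes (producer)
Test the methods of the ApiCoreSender class
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCoreSenderTests {
    @Autowired
    private ApiCoreSender sender;

    @Test
    public void test_user() {
        sender.user("用戶管理!"); // "User management!"
    }

    @Test
    public void test_userQuery() {
        sender.userQuery("查詢用戶信息!"); // "Query user information!"
    }
}
Test the methods of the ApiPaymentSender class
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiPaymentSenderTests {
    @Autowired
    private ApiPaymentSender sender;

    @Test
    public void test_order() {
        sender.order("訂單管理!"); // "Order management!"
    }

    @Test
    public void test_orderQuery() {
        sender.orderQuery("查詢訂單信息!"); // "Query order information!"
    }

    @Test
    public void test_orderDetailQuery() {
        sender.orderDetailQuery("查詢訂單詳情信息!"); // "Query order detail information!"
    }
}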
3.5) Verification
3.5.1) Run the ApiCoreSenderTests test class
Producer log:
2018-05-14 16:30:05.804 INFO 7340 --- [ main] c.lianjinsoft.sender.ApiCoreSenderTests : Starting ApiCoreSenderTests on LAPTOP-1DF7S904 with PID 7340 (started by lianjinsoft in ...
2018-05-14 16:30:05.805 INFO 7340 --- [ main] c.lianjinsoft.sender.ApiCoreSenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:30:05.851 INFO 7340 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:30:06.553 INFO 7340 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:30:07.375 INFO 7340 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:30:07.385 INFO 7340 --- [ main] c.lianjinsoft.sender.ApiCoreSenderTests : Started ApiCoreSenderTests in 1.922 seconds (JVM running for 2.846)
2018-05-14 16:30:07.431 INFO 7340 --- [ main] com.lianjinsoft.util.LogUtil : api.core.user send message: 用戶管理!
2018-05-14 16:30:07.463 INFO 7340 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:30:07.578 INFO 7340 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:30:07.647 INFO 7340 --- [ main] com.lianjinsoft.util.LogUtil : api.core.user.query send message: 查詢用戶信息!
2018-05-14 16:30:07.716 INFO 7340 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:30:07.728 INFO 7340 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
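Consumer log:
2018-05-14 16:30:07.609 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : api.core receive message: 用戶管理!
Log analysis:
Entries 7 and 10 of the producer log show that both the api.core.user and the api.core.user.query messages were sent.
The consumer log shows that only the message sent with api.core.user was received.
Question:
Why was the message sent with api.core.user.query not consumed by the listener on the api.core queue?
Answer: in the TopicConfig class the api.core queue is bound with the pattern api.core.*, and the wildcard "*" matches exactly one word. The routingKey api.core.user.query has two words after api.core, so it does not match.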
3.5.2) Run the ApiPaymentSenderTests test class
Producer log:
2018-05-14 16:31:12.823 INFO 6460 --- [ main] c.l.sender.ApiPaymentSenderTests : Starting ApiPaymentSenderTests on LAPTOP-1DF7S904 with PID 6460 (started by lianjinsoft in ...
2018-05-14 16:31:12.823 INFO 6460 --- [ main] c.l.sender.ApiPaymentSenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:31:12.857 INFO 6460 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:31:13.718 INFO 6460 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:31:14.530 INFO 6460 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:31:14.539 INFO 6460 --- [ main] c.l.sender.ApiPaymentSenderTests : Started ApiPaymentSenderTests in 2.05 seconds (JVM running for 2.945)
2018-05-14 16:31:14.592 INFO 6460 --- [ main] com.lianjinsoft.util.LogUtil : api.payment.order.query send message: 查詢訂單信息!
2018-05-14 16:31:14.638 INFO 6460 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:31:14.762 INFO 6460 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#47404bea:0/SimpleConnection@6b54655f ...
2018-05-14 16:31:14.819 INFO 6460 --- [ main] com.lianjinsoft.util.LogUtil : api.payment.order.detail.query send message: 查詢訂單詳情信息!
2018-05-14 16:31:14.825 INFO 6460 --- [ main] com.lianjinsoft.util.LogUtil : api.payment.order send message: 訂單管理!
2018-05-14 16:31:14.836 INFO 6460 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:31:14.840 INFO 6460 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
Consumer log:
2018-05-14 16:31:14.809 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : api.payment.order receive message: 查詢訂單信息!
2018-05-14 16:31:14.821 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : api.payment.order receive message: 查詢訂單詳情信息!
2018-05-14 16:31:14.829 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : api.payment.order receive message: 訂單管理!
Log analysis:
Entries 7, 10 and 11 of the producer log show that the api.payment.order.query, api.payment.order.detail.query and api.payment.order messages were all sent.
The consumer log shows that the listener on the api.payment queue received and processed all three messages.
Key points:
A topic exchange supports the wildcards * and # in binding patterns.
* matches exactly one word (one dot-separated segment).
# matches zero or more words.
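The wildcard rules can be checked without a broker. The following is a minimal sketch of topic-pattern matching in plain Java; `TopicMatcher` is a hypothetical helper written for this article, not part of RabbitMQ or Spring AMQP, and the broker's real implementation is different and more efficient.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mimics topic-exchange pattern matching.
final class TopicMatcher {
    private TopicMatcher() {
    }

    // Returns true if the routing key satisfies the binding pattern.
    static boolean matches(String bindingPattern, String routingKey) {
        List<String> pattern = Arrays.asList(bindingPattern.split("\\.", -1));
        List<String> words = Arrays.asList(routingKey.split("\\.", -1));
        return match(pattern, 0, words, 0);
    }

    private static boolean match(List<String> pattern, int i, List<String> words, int j) {
        if (i == pattern.size()) {
            // Pattern exhausted: succeed only if every word was consumed too.
            return j == words.size();
        }
        String token = pattern.get(i);
        if (token.equals("#")) {
            // "#" may absorb zero or more words, so try every possible length.
            for (int k = j; k <= words.size(); k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.size()) {
            // A literal or "*" needs one word, but none is left.
            return false;
        }
        if (token.equals("*") || token.equals(words.get(j))) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }
}
```

With the bindings used in this section, `TopicMatcher.matches("api.core.*", "api.core.user.query")` is false while `TopicMatcher.matches("api.payment.#", "api.payment.order.detail.query")` is true, which mirrors what the consumer logs show.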
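4. Exchange: HeadersExchange
A headers exchange routes on the header attributes set on the message, not on the routingKey.
4.1) Add a configuration class (consumer)
Declare a queue named credit.bank and bind it to the creditBankExchange exchange (whereAll: every listed header must match)
Declare a queue named credit.finance and bind it to the creditFinanceExchange exchange (whereAny: at least one listed header must match)
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HeadersConfig {
    @Bean
    public Queue creditBankQueue() {
        return new Queue("credit.bank");
    }

    @Bean
    public Queue creditFinanceQueue() {
        return new Queue("credit.finance");
    }

    @Bean
    public HeadersExchange creditBankExchange() {
        return new HeadersExchange("creditBankExchange");
    }

    @Bean
    public HeadersExchange creditFinanceExchange() {
        return new HeadersExchange("creditFinanceExchange");
    }

    // whereAll: the message must carry both type=cash and aging=fast.
    @Bean
    public Binding bindingCreditAExchange(Queue creditBankQueue, HeadersExchange creditBankExchange) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("type", "cash");
        headerValues.put("aging", "fast");
        return BindingBuilder.bind(creditBankQueue).to(creditBankExchange).whereAll(headerValues).match();
    }

    // whereAny: type=cash or aging=fast is enough.
    @Bean
    public Binding bindingCreditBExchange(Queue creditFinanceQueue, HeadersExchange creditFinanceExchange) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("type", "cash");
        headerValues.put("aging", "fast");
        return BindingBuilder.bind(creditFinanceQueue).to(creditFinanceExchange).whereAny(headerValues).match();
    }
}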
4.2) Add a message listener class (consumer)
Add a creditBank() method that listens on the credit.bank queue
Add a creditFinance() method that listens on the credit.finance queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiCreditReceive {
    @RabbitListener(queues = "credit.bank")
    public void creditBank(String msg) {
        LogUtil.info("credit.bank receive message: " + msg);
    }

    @RabbitListener(queues = "credit.finance")
    public void creditFinance(String msg) {
        LogUtil.info("credit.finance receive message: " + msg);
    }
}
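4.3) Add a message sender class (producer)
Add a creditBank() method that sends a message to the creditBankExchange exchange (the routingKey credit.bank is passed but ignored by a headers exchange)
Add a creditFinance() method that sends a message to the creditFinanceExchange exchange (the routingKey credit.finance is likewise ignored)
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiCreditSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void creditBank(Map<String, Object> head, String msg) {
        LogUtil.info("credit.bank send message: " + msg);
        rabbitTemplate.convertAndSend("creditBankExchange", "credit.bank", getMessage(head, msg));
    }

    public void creditFinance(Map<String, Object> head, String msg) {
        LogUtil.info("credit.finance send message: " + msg);
        rabbitTemplate.convertAndSend("creditFinanceExchange", "credit.finance", getMessage(head, msg));
    }

    // Assumption: the original post does not show getMessage(); this is one plausible
    // reconstruction that copies the headers onto a plain-text message.
    private Message getMessage(Map<String, Object> head, String msg) {
        MessageProperties properties = new MessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        head.forEach(properties::setHeader);
        return new Message(msg.getBytes(StandardCharsets.UTF_8), properties);
    }
}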
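4.4) Add a test class (producer)
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCreditSenderTests {
    @Autowired
    private ApiCreditSender sender;

    @Test
    public void test_creditBank_type() {
        Map<String, Object> head = new HashMap<>();
        head.put("type", "cash");
        sender.creditBank(head, "銀行授信(部分匹配)"); // "Bank credit (partial match)"
    }

    @Test
    public void test_creditBank_all() {
        Map<String, Object> head = new HashMap<>();
        head.put("type", "cash");
        head.put("aging", "fast");
        sender.creditBank(head, "銀行授信(所有匹配)"); // "Bank credit (full match)"
    }

    @Test
    public void test_creditFinance_type() {
        Map<String, Object> head = new HashMap<>();
        head.put("type", "cash");
        sender.creditFinance(head, "金融公司授信(部分匹配)"); // "Finance company credit (partial match)"
    }

    @Test
    public void test_creditFinance_all() {
        Map<String, Object> head = new HashMap<>();
        head.put("type", "cash");
        head.put("aging", "fast");
        sender.creditFinance(head, "金融公司授信(所有匹配)"); // "Finance company credit (full match)"
    }
}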
4.5) Run the ApiCreditSenderTests test class
Producer log:
2018-05-14 16:32:18.954 INFO 5204 --- [ main] c.l.sender.ApiCreditSenderTests : Starting ApiCreditSenderTests on LAPTOP-1DF7S904 with PID 5204 (started by lianjinsoft in...
2018-05-14 16:32:18.964 INFO 5204 --- [ main] c.l.sender.ApiCreditSenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:32:19.007 INFO 5204 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:32:19.609 INFO 5204 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:32:20.437 INFO 5204 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:32:20.446 INFO 5204 --- [ main] c.l.sender.ApiCreditSenderTests : Started ApiCreditSenderTests in 1.839 seconds (JVM running for 2.759)
2018-05-14 16:32:20.566 INFO 5204 --- [ main] com.lianjinsoft.util.LogUtil : credit.bank send message: 銀行授信(部分匹配)
2018-05-14 16:32:20.574 INFO 5204 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:32:20.666 INFO 5204 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:32:21.064 INFO 5204 --- [ main] com.lianjinsoft.util.LogUtil : credit.finance send message: 金融公司授信(所有匹配)
2018-05-14 16:32:21.070 INFO 5204 --- [ main] com.lianjinsoft.util.LogUtil : credit.bank send message: 銀行授信(所有匹配)
2018-05-14 16:32:21.077 INFO 5204 --- [ main] com.lianjinsoft.util.LogUtil : credit.finance send message: 金融公司授信(部分匹配)
2018-05-14 16:32:21.109 INFO 5204 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:32:21.114 INFO 5204 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
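Consumer log:
2018-05-14 16:32:21.093 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : credit.finance receive message: 金融公司授信(所有匹配)
2018-05-14 16:32:21.094 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : credit.bank receive message: 銀行授信(所有匹配)
2018-05-14 16:32:21.097 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : credit.finance receive message: 金融公司授信(部分匹配)
Log analysis:
Entries 7, 10, 11 and 12 of the producer log show that all four test methods sent their messages.
The consumer log shows that the credit.bank queue did not receive one of the two messages sent to it.
Question:
Why was the message sent by ApiCreditSenderTests.test_creditBank_type() not processed?
Answer: in the HeadersConfig class the binding on creditBankExchange uses whereAll, so every header listed in the binding (type=cash and aging=fast) must be present on the message with the same value. That message carries only type=cash, so it is not routed to the queue.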
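The difference between whereAll and whereAny can be sketched in a few lines of plain Java. `HeadersMatcher` below is a hypothetical helper for illustration only, not a RabbitMQ or Spring AMQP class, and it simplifies the broker's rules to plain value equality.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper that mimics headers-exchange matching by value equality.
final class HeadersMatcher {
    private HeadersMatcher() {
    }

    // "all" semantics: every header named in the binding must be present on the
    // message with an equal value. Extra message headers are ignored.
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            if (!headers.containsKey(entry.getKey())
                    || !Objects.equals(entry.getValue(), headers.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // "any" semantics: one matching header is enough.
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> headers) {
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            if (headers.containsKey(entry.getKey())
                    && Objects.equals(entry.getValue(), headers.get(entry.getKey()))) {
                return true;
            }
        }
        return false;
    }
}
```

For the binding {type=cash, aging=fast} and a message that carries only type=cash, `matchesAll` returns false and `matchesAny` returns true, which is the behaviour seen in the consumer log.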
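5. Exchange: FanoutExchange
A fanout exchange forwards every message to all queues bound to it (broadcast mode; the routingKey is ignored).
5.1) Add a configuration class (consumer)
Declare a queue named api.report.payment and bind it to the reportExchange exchange
Declare a queue named api.report.refund and bind it to the reportExchange exchange
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    @Bean
    public Queue reportPaymentQueue() {
        return new Queue("api.report.payment");
    }

    @Bean
    public Queue reportRefundQueue() {
        return new Queue("api.report.refund");
    }

    @Bean
    public FanoutExchange reportExchange() {
        return new FanoutExchange("reportExchange");
    }

    // Fanout bindings take no routing key.
    @Bean
    public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) {
        return BindingBuilder.bind(reportPaymentQueue).to(reportExchange);
    }

    @Bean
    public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) {
        return BindingBuilder.bind(reportRefundQueue).to(reportExchange);
    }
}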
5.2) Add a message listener class (consumer)
Add a payment() method that listens on the api.report.payment queue
Add a refund() method that listens on the api.report.refund queue
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiReportReceive {
    @RabbitListener(queues = "api.report.payment")
    public void payment(String msg) {
        LogUtil.info("api.report.payment receive message: " + msg);
    }

    @RabbitListener(queues = "api.report.refund")
    public void refund(String msg) {
        LogUtil.info("api.report.refund receive message: " + msg);
    }
}
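5.3) Add a message sender class (producer)
Add a generateReports() method that sends a message to the reportExchange exchange
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lianjinsoft.util.LogUtil;

@Component
public class ApiReportSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void generateReports(String msg) {
        LogUtil.info("api.generate.reports send message: " + msg);
        // The routing key is passed but has no effect on a fanout exchange.
        rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg);
    }
}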
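5.4) Add a test class (producer)
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiReportSenderTests {
    @Autowired
    private ApiReportSender sender;

    @Test
    public void test_generateReports() {
        sender.generateReports("開始生成報表!"); // "Start generating reports!"
    }
}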
5.5) Run the ApiReportSenderTests test class
Producer log:
2018-05-14 16:33:41.453 INFO 14356 --- [ main] c.l.sender.ApiReportSenderTests : Starting ApiReportSenderTests on LAPTOP-1DF7S904 with PID 14356 (started by lianjinsoft in ...
2018-05-14 16:33:41.454 INFO 14356 --- [ main] c.l.sender.ApiReportSenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:33:41.490 INFO 14356 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:33:42.094 INFO 14356 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
2018-05-14 16:33:42.960 INFO 14356 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:33:42.972 INFO 14356 --- [ main] c.l.sender.ApiReportSenderTests : Started ApiReportSenderTests in 1.939 seconds (JVM running for 2.843)
2018-05-14 16:33:43.037 INFO 14356 --- [ main] com.lianjinsoft.util.LogUtil : api.generate.reports send message: 開始生成報表!
2018-05-14 16:33:43.054 INFO 14356 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:33:43.174 INFO 14356 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:33:43.237 INFO 14356 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
2018-05-14 16:33:43.240 INFO 14356 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
Consumer log:
2018-05-14 16:33:43.205 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : api.report.payment receive message: 開始生成報表!
2018-05-14 16:33:43.207 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : api.report.refund receive message: 開始生成報表!
Log analysis:
The seventh entry of the producer log shows that the message was sent.
The consumer log shows that both the api.report.payment and the api.report.refund queues received the same message.
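The broadcast behaviour is simple enough to model directly. The sketch below is a hypothetical in-memory stand-in (`FanoutSketch` is an invented name, not a RabbitMQ or Spring AMQP class) that copies each published message into every bound queue and deliberately ignores the routing key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange.
final class FanoutSketch {
    // bound queue name -> messages delivered to that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binding a queue needs no routing key.
    void bind(String queue) {
        boundQueues.putIfAbsent(queue, new ArrayList<>());
    }

    // Every bound queue gets its own copy; the routing key plays no role.
    // Returns the number of queues that received the message.
    int publish(String ignoredRoutingKey, String body) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
        return boundQueues.size();
    }

    // Read-only view of what a queue has received so far.
    List<String> messages(String queue) {
        return Collections.unmodifiableList(
                boundQueues.getOrDefault(queue, Collections.emptyList()));
    }
}
```

Binding api.report.payment and api.report.refund and publishing once leaves one copy of the message in each queue, regardless of the routing key supplied.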
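6. Many-to-one
In real projects a system is usually deployed as a cluster, in a distributed fashion, or with disaster-recovery replicas. This leads to one-to-many, many-to-one and many-to-many scenarios.
How can we simulate many-to-one locally?
6.1) For convenience we reuse the PaymentNotifyReceive example and add two test methods to the PaymentNotifySenderTests class
test_sender_many2one_1: sends even-numbered order ids
test_sender_many2one_2: sends odd-numbered order ids
@Test
public void test_sender_many2one_1() throws Exception {
    for (int i = 0; i < 20; i += 2) {
        sender.sender("支付訂單號:" + i);
        Thread.sleep(1000); // pause so the two producers interleave
    }
}

@Test
public void test_sender_many2one_2() throws Exception {
    for (int i = 1; i < 20; i += 2) {
        sender.sender("支付訂單號:" + i);
        Thread.sleep(1000);
    }
}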
6.2) Run test_sender_many2one_1() and test_sender_many2one_2() at the same time
Producer 1 log:
2018-05-14 16:34:49.249 INFO 5064 --- [ main] c.l.sender.PaymentNotifySenderTests : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 5064 (started by lianjinsoft in...
2018-05-14 16:34:49.250 INFO 5064 --- [ main] c.l.sender.PaymentNotifySenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:34:49.297 INFO 5064 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:34:49.989 INFO 5064 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:34:51.267 INFO 5064 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:34:51.293 INFO 5064 --- [ main] c.l.sender.PaymentNotifySenderTests : Started PaymentNotifySenderTests in 2.449 seconds (JVM running for 3.366)
2018-05-14 16:34:51.357 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:0
2018-05-14 16:34:51.370 INFO 5064 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:34:51.817 INFO 5064 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f ...
2018-05-14 16:34:52.866 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:2
2018-05-14 16:34:53.870 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:4
2018-05-14 16:34:54.870 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:6
2018-05-14 16:34:55.871 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:8
2018-05-14 16:34:56.872 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:10
2018-05-14 16:34:57.872 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:12
2018-05-14 16:34:58.873 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:14
2018-05-14 16:34:59.875 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:16
2018-05-14 16:35:00.876 INFO 5064 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:18
2018-05-14 16:35:01.882 INFO 5064 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:35:01.883 INFO 5064 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
Producer 2 log:
2018-05-14 16:34:52.689 INFO 13988 --- [ main] c.l.sender.PaymentNotifySenderTests : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 13988 (started by lianjinsoft in...
2018-05-14 16:34:52.690 INFO 13988 --- [ main] c.l.sender.PaymentNotifySenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:34:52.738 INFO 13988 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:34:53.444 INFO 13988 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:34:54.567 INFO 13988 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:34:54.575 INFO 13988 --- [ main] c.l.sender.PaymentNotifySenderTests : Started PaymentNotifySenderTests in 2.237 seconds (JVM running for 4.18)
2018-05-14 16:34:54.788 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:1
2018-05-14 16:34:54.796 INFO 13988 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:34:54.870 INFO 13988 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#305f7627:0/SimpleConnection@665e9289...
2018-05-14 16:34:55.903 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:3
2018-05-14 16:34:56.903 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:5
2018-05-14 16:34:57.904 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:7
2018-05-14 16:34:58.904 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:9
2018-05-14 16:34:59.905 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:11
2018-05-14 16:35:00.906 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:13
2018-05-14 16:35:01.907 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:15
2018-05-14 16:35:02.907 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:17
2018-05-14 16:35:03.910 INFO 13988 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:19
2018-05-14 16:35:04.928 INFO 13988 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:35:04.932 INFO 13988 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
Consumer log:
2018-05-14 16:34:51.853 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:0
2018-05-14 16:34:52.871 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:2
2018-05-14 16:34:53.871 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:4
2018-05-14 16:34:54.871 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:6
2018-05-14 16:34:54.902 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:1
2018-05-14 16:34:55.872 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:8
2018-05-14 16:34:55.904 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:3
2018-05-14 16:34:56.873 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:10
2018-05-14 16:34:56.905 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:5
2018-05-14 16:34:57.873 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:12
2018-05-14 16:34:57.905 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:7
2018-05-14 16:34:58.878 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:14
2018-05-14 16:34:58.906 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:9
2018-05-14 16:34:59.877 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:16
2018-05-14 16:34:59.906 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:11
2018-05-14 16:35:00.877 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:18
2018-05-14 16:35:00.909 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:13
2018-05-14 16:35:01.909 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:15
2018-05-14 16:35:02.911 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:17
2018-05-14 16:35:03.914 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:19
Log analysis:
The logs of producer 1 and producer 2 show that all messages were sent.
The consumer log shows that all 20 messages were received and processed by the single consumer.
7. One-to-many
How can we simulate one-to-many?
7.1) For convenience we again reuse the PaymentNotifyReceive example and add a test method to the PaymentNotifySenderTests class
test_sender_one2many: sends 20 messages in a loop.
@Test
public void test_sender_one2many() {
    for (int i = 0; i < 20; i++) {
        sender.sender("支付訂單號:" + i);
    }
}
7.2) Test
To get a one-to-many setup, we need to start one (or more) additional consumer instances and then run the test_sender_one2many() test method.
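When several consumers listen on the same queue, each message goes to only one of them, and RabbitMQ by default hands messages out to the consumers in turn (round-robin). The sketch below models that idealised distribution in plain Java. `RoundRobinDispatch` is a hypothetical name, and a real run can deviate from this pattern depending on prefetch settings, acknowledgement timing and when each consumer connects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of idealised round-robin dispatch to competing consumers.
final class RoundRobinDispatch {
    private RoundRobinDispatch() {
    }

    // Returns one list per consumer containing the messages that consumer would get.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        // Message i goes to consumer i mod consumerCount; nobody gets a duplicate.
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }
}
```

Under this idealised model, 20 messages and two consumers give each consumer 10 messages, one receiving the even-numbered orders and the other the odd-numbered ones.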
生產者日誌:
2018-05-14 16:36:27.703 INFO 7508 --- [ main] c.l.sender.PaymentNotifySenderTests : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 7508 (started by lianjinsoft in...
2018-05-14 16:36:27.704 INFO 7508 --- [ main] c.l.sender.PaymentNotifySenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:36:27.729 INFO 7508 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:36:28.391 INFO 7508 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
2018-05-14 16:36:29.285 INFO 7508 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:36:29.303 INFO 7508 --- [ main] c.l.sender.PaymentNotifySenderTests : Started PaymentNotifySenderTests in 2.197 seconds (JVM running for 3.097)
2018-05-14 16:36:29.504 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:0
2018-05-14 16:36:29.516 INFO 7508 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:36:29.635 INFO 7508 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#4f071df8:0/SimpleConnection@42a9e5d1...
2018-05-14 16:36:29.672 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:1
2018-05-14 16:36:29.679 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:2
2018-05-14 16:36:29.705 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:3
2018-05-14 16:36:29.707 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:4
2018-05-14 16:36:29.710 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:5
2018-05-14 16:36:29.710 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:6
2018-05-14 16:36:29.710 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:7
2018-05-14 16:36:29.713 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:8
2018-05-14 16:36:29.719 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:9
2018-05-14 16:36:29.728 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:10
2018-05-14 16:36:29.729 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:11
2018-05-14 16:36:29.733 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:12
2018-05-14 16:36:29.734 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:13
2018-05-14 16:36:29.734 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:14
2018-05-14 16:36:29.734 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:15
2018-05-14 16:36:29.741 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:16
2018-05-14 16:36:29.742 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:17
2018-05-14 16:36:29.742 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:18
2018-05-14 16:36:29.744 INFO 7508 --- [ main] com.lianjinsoft.util.LogUtil : notify.payment send message: 支付訂單號:19
2018-05-14 16:36:29.762 INFO 7508 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:36:29.767 INFO 7508 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
Consumer 1 log:
2018-05-14 16:36:29.675 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:0
2018-05-14 16:36:29.690 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:2
2018-05-14 16:36:29.712 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:4
2018-05-14 16:36:29.714 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:6
2018-05-14 16:36:29.728 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:9
2018-05-14 16:36:29.733 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:11
2018-05-14 16:36:29.737 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:13
2018-05-14 16:36:29.740 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:15
2018-05-14 16:36:29.743 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:16
2018-05-14 16:36:29.745 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:18
Consumer 2 log:
2018-05-14 16:36:29.705 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:1
2018-05-14 16:36:29.709 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:3
2018-05-14 16:36:29.712 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:5
2018-05-14 16:36:29.718 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:7
2018-05-14 16:36:29.722 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:8
2018-05-14 16:36:29.731 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:10
2018-05-14 16:36:29.736 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:12
2018-05-14 16:36:29.738 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:14
2018-05-14 16:36:29.744 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:17
2018-05-14 16:36:29.747 INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.payment receive message: 支付訂單號:19
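Log analysis:
The producer log shows that all 20 messages were sent successfully.
The logs of consumer 1 and consumer 2 show that the messages were shared evenly: each consumer handled 10 of the 20 messages, and no message was delivered to both.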
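The behaviour seen in the logs (each message goes to exactly one of the consumers on the same queue) can be sketched without a broker. The following is only an in-memory illustration using a BlockingQueue; the class name CompetingConsumersDemo and the consumer names are made up for this sketch, and it does not use the RabbitMQ or Spring AMQP API. Here the split between consumers depends on thread scheduling, so it is not guaranteed to be as even as in the logs above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for one queue shared by several consumers (hypothetical, not the RabbitMQ API).
class CompetingConsumersDemo {

    // Puts messageCount messages on one queue and lets consumerCount threads drain it.
    // Returns, per consumer name, the message numbers that consumer handled.
    static Map<String, List<Integer>> consume(int messageCount, int consumerCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i); // same numbering as test_sender_one2many
        }

        Map<String, List<Integer>> handled = new ConcurrentHashMap<>();
        List<Thread> consumers = new ArrayList<>();
        for (int c = 1; c <= consumerCount; c++) {
            String name = "consumer-" + c;
            List<Integer> mine = new ArrayList<>(); // written only by this consumer's thread
            handled.put(name, mine);
            consumers.add(new Thread(() -> {
                Integer msg;
                // poll() removes the message, so no other consumer can receive it
                while ((msg = queue.poll()) != null) {
                    mine.add(msg);
                }
            }, name));
        }

        consumers.forEach(Thread::start);
        for (Thread t : consumers) {
            try {
                t.join(); // wait until every consumer has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        consume(20, 2).forEach((name, msgs) -> System.out.println(name + " handled " + msgs));
    }
}
```

Whatever the split turns out to be, the two lists together contain every message exactly once, which is the property the consumer logs demonstrate.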
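8. Sending objects
In real projects a request often carries several fields. To keep the fields consistent between producer and consumer, the usual approach is to send an object.
8.1) For convenience, we add another message queue to DirectConfig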
// Queue that receives refund notifications
@Bean
public Queue refundNotifyQueue() {
    return new Queue("notify.refund");
}
8.2) Add a message listener class (consumer)
Listens on the notify.refund queue (with the default exchange, the routingKey is the queue name)
@Component
@RabbitListener(queues = "notify.refund")
public class RefundNotifyReceive {

    // Called when the message payload converts to an Order
    @RabbitHandler
    public void receive(Order order) {
        LogUtil.info("notify.refund receive message: " + order);
    }
}
8.3) Add a message sender class (producer)
@Component
public class RefundNotifySender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sender(Order order) {
        LogUtil.info("notify.refund send message: " + order);
        // Default exchange: the routingKey is the queue name
        rabbitTemplate.convertAndSend("notify.refund", order);
    }
}
8.4) Add a test class
@RunWith(SpringRunner.class)
@SpringBootTest
public class RefundNotifySenderTests {

    @Autowired
    private RefundNotifySender sender;

    @Test
    public void test_sender() {
        // Build a sample order and send it to notify.refund
        Order order = new Order();
        order.setId(100001);
        order.setOrderId(String.valueOf(System.currentTimeMillis()));
        order.setAmount(new BigDecimal("1999.99"));
        order.setCreateTime(new Date());
        sender.sender(order);
    }
}
8.5) Run the RefundNotifySenderTests test class
Producer log:
2018-05-14 16:37:47.038 INFO 13672 --- [ main] c.l.sender.RefundNotifySenderTests : Starting RefundNotifySenderTests on LAPTOP-1DF7S904 with PID 13672 (started by lianjinsoft in...
2018-05-14 16:37:47.041 INFO 13672 --- [ main] c.l.sender.RefundNotifySenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:37:47.070 INFO 13672 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
2018-05-14 16:37:47.715 INFO 13672 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:37:48.779 INFO 13672 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:37:48.802 INFO 13672 --- [ main] c.l.sender.RefundNotifySenderTests : Started RefundNotifySenderTests in 2.082 seconds (JVM running for 2.967)
2018-05-14 16:37:49.085 INFO 13672 --- [ main] com.lianjinsoft.util.LogUtil : notify.refund send message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...
2018-05-14 16:37:49.104 INFO 13672 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:37:49.170 INFO 13672 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#6b5894c8:0/SimpleConnection@38f57b3d [delegate=...
2018-05-14 16:37:49.265 INFO 13672 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:37:49.266 INFO 13672 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
Consumer log:
2018-05-14 16:37:49.242 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.refund receive message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...
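Log analysis:
The producer log shows that the order object was sent successfully.
The consumer log shows that the order object was received successfully and can be used directly.
Note:
With the default message converter, the object being sent must be serializable (its class must implement the Serializable interface).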
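To see why the Serializable requirement exists, here is a minimal sketch of a Java-serialization round trip, which is roughly what happens to the payload when Spring AMQP's default SimpleMessageConverter handles a Serializable object. The Order class below is an assumption: only its field names are taken from the log output above, and the helper names toBytes and fromBytes are invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.Date;

class OrderSerializationDemo {

    // Hypothetical Order: field names follow the log output, everything else is assumed.
    static class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        final int id;
        final String orderId;
        final BigDecimal amount;
        final Date createTime;

        Order(int id, String orderId, BigDecimal amount, Date createTime) {
            this.id = id;
            this.orderId = orderId;
            this.amount = amount;
            this.createTime = createTime;
        }

        @Override
        public String toString() {
            return "Order [id=" + id + ", orderId=" + orderId
                    + ", amount=" + amount + ", createTime=" + createTime + "]";
        }
    }

    // Producer side: turn the object into the bytes that would travel as the message body.
    static byte[] toBytes(Object payload) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload); // throws NotSerializableException for non-Serializable classes
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Consumer side: rebuild the object from the message body.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Order sent = new Order(100001, "1526287069081", new BigDecimal("1999.99"), new Date());
        Order received = (Order) fromBytes(toBytes(sent));
        System.out.println(received);
    }
}
```

Because the consumer has to load the same class to rebuild the object, producer and consumer need a compatible Order class on their classpaths.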
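9. RPC
RabbitMQ supports RPC-style remote calls, where the caller gets the result back synchronously.
9.1) For convenience, we add another message queue to DirectConfig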
// Queue that receives order queries
@Bean
public Queue queryOrderQueue() {
    return new Queue("query.order");
}
9.2) Add a message listener class (consumer)
Listens on the query.order queue (with the default exchange, the routingKey is the queue name)
@Component
@RabbitListener(queues = "query.order")
public class QueryOrderReceive {

    // The returned Order is sent back to the caller as the reply
    @RabbitHandler
    public Order receive(String orderId) {
        // The log prefix still reads notify.refund although this listener handles query.order
        LogUtil.info("notify.refund receive message: " + orderId);
        Order order = new Order();
        order.setId(100001);
        order.setOrderId(orderId);
        order.setAmount(new BigDecimal("2999.99"));
        order.setCreateTime(new Date());
        return order;
    }
}
9.3) Add a message sender class (producer)
@Component
public class QueryOrderSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sender(String orderId) {
        LogUtil.info("query.order send message: " + orderId);
        // Blocks until the consumer replies; the reply is null if none arrives in time
        Order order = (Order) rabbitTemplate.convertSendAndReceive("query.order", orderId);
        LogUtil.info("query.order return message: " + order);
    }
}
9.4) Add a test class
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueryOrderSenderTests {

    @Autowired
    private QueryOrderSender sender;

    @Test
    public void test_sender() {
        sender.sender("900000001");
    }
}
9.5) Run the QueryOrderSenderTests test class
Producer log:
2018-05-14 16:38:14.163 INFO 2024 --- [ main] c.l.sender.QueryOrderSenderTests : Starting QueryOrderSenderTests on LAPTOP-1DF7S904 with PID 2024 (started by lianjinsoft in...
2018-05-14 16:38:14.164 INFO 2024 --- [ main] c.l.sender.QueryOrderSenderTests : No active profile set, falling back to default profiles: default
2018-05-14 16:38:14.197 INFO 2024 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:38:14.848 INFO 2024 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:38:15.705 INFO 2024 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
2018-05-14 16:38:15.715 INFO 2024 --- [ main] c.l.sender.QueryOrderSenderTests : Started QueryOrderSenderTests in 1.927 seconds (JVM running for 3.079)
2018-05-14 16:38:15.793 INFO 2024 --- [ main] com.lianjinsoft.util.LogUtil : query.order send message: 900000001
2018-05-14 16:38:15.812 INFO 2024 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:38:15.988 INFO 2024 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
2018-05-14 16:38:16.057 INFO 2024 --- [ main] com.lianjinsoft.util.LogUtil : query.order return message: Order [id=100001, orderId=900000001, amount=2999.99, createTime=...
2018-05-14 16:38:16.079 INFO 2024 --- [ Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:38:16.097 INFO 2024 --- [ Thread-2] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647
Consumer log:
2018-05-14 16:38:16.028 INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil : notify.refund receive message: 900000001
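Log analysis:
The 7th entry of the producer log shows that the message was sent successfully. The 10th entry shows that the reply was received and converted into an Order object.
The consumer log shows that the message was received and processed.
Although RabbitMQ supports RPC-style calls, using them is not recommended.
Reasons:
1) By default the RPC call is single-threaded and blocking (the caller waits for the reply), so throughput is very low.
2) Multi-threaded consumption has to be implemented by hand.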
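The blocking behaviour behind reason 1 can be illustrated with a small in-memory sketch of request/reply over a queue. It only mimics the idea (a request carries a correlation id, the caller waits for the matching reply or gives up after a timeout); the class RpcOverQueueDemo, its methods and the reply text are hypothetical and do not come from Spring AMQP or the RabbitMQ client.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// In-memory sketch of RPC over a queue (hypothetical, not the RabbitMQ API).
class RpcOverQueueDemo {

    // Request envelope: the payload plus the correlation id used to match the reply.
    static final class Request {
        final String correlationId;
        final String orderId;

        Request(String correlationId, String orderId) {
            this.correlationId = correlationId;
            this.orderId = orderId;
        }
    }

    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Starts a daemon thread that plays the role of the query.order listener.
    void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request req = requestQueue.take();
                    String reply = "Order [orderId=" + req.orderId + "]";
                    CompletableFuture<String> waiter = pending.remove(req.correlationId);
                    if (waiter != null) {
                        waiter.complete(reply); // wakes up the blocked caller
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "rpc-server");
        server.setDaemon(true);
        server.start();
    }

    // Blocking call: returns the reply, or null if none arrives within the timeout.
    String sendAndReceive(String orderId, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        requestQueue.add(new Request(correlationId, orderId));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS); // calling thread waits here
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(correlationId);
        }
    }

    public static void main(String[] args) {
        RpcOverQueueDemo rpc = new RpcOverQueueDemo();
        rpc.startServer();
        System.out.println(rpc.sendAndReceive("900000001", 1000));
    }
}
```

While sendAndReceive is waiting, the calling thread can do nothing else, so the number of requests in flight is limited by the number of caller threads. A plain convertAndSend with a separate reply listener avoids that wait.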
To install RabbitMQ, see: Installing RabbitMQ 3.7 online on CentOS
Source code for this post: https://gitee.com/skychenjiajun/spring-boot