Integrating RabbitMQ with Spring Boot (Hands-On)

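RabbitMQ overview
RabbitMQ is an open-source message queue system written in Erlang that implements the AMQP protocol (AMQP's main characteristics are message orientation, queuing, routing, reliability, and security). It supports many clients, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP, and also supports AJAX. It stores and forwards messages in distributed systems and performs well in ease of use, scalability, and high availability.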

 

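Related concepts

A message queue generally involves three concepts: the sender (producer), the queue, and the receiver (consumer). RabbitMQ adds one more layer of abstraction on top of this basic model by placing an exchange between the producer and the queue. The producer therefore has no direct relationship with the queue; it publishes to an exchange, which forwards the message to queues according to its routing strategy.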

Figure 1 (basic MQ model): P is the producer, Q is the message queue, and C is the consumer.

Figure 2 (RabbitMQ model): P is the producer, X is the exchange, Q is the message queue, and C is the consumer.

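Key RabbitMQ concepts:

Virtual host: RabbitMQ supports access control, and permissions are scoped to a virtual host. A virtual host can contain multiple exchanges, queues, and bindings.

Exchange: RabbitMQ's router, which distributes messages to the relevant queues according to its routing strategy.

Queue: the container that buffers messages.

Binding: the relationship configured between an exchange and a queue.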
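To make the four concepts concrete, here is a minimal in-memory sketch of a single virtual host. It is a toy model written for this article, not RabbitMQ's implementation: the class name VirtualHostSketch, the exchange name demoExchange, and the exact-match routing rule are all assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model of one virtual host: it owns queues, exchanges, and bindings. */
final class VirtualHostSketch {
    // queue name -> buffered messages
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // exchange name -> (binding key -> names of the queues bound under that key)
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();

    void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayDeque<String>());
    }

    void declareExchange(String exchange) {
        bindings.putIfAbsent(exchange, new HashMap<String, List<String>>());
    }

    /** A binding ties a queue to an exchange under a binding key. */
    void bind(String queue, String exchange, String bindingKey) {
        if (!queues.containsKey(queue) || !bindings.containsKey(exchange)) {
            throw new IllegalArgumentException("declare the queue and the exchange first");
        }
        bindings.get(exchange)
                .computeIfAbsent(bindingKey, k -> new ArrayList<String>())
                .add(queue);
    }

    /** Producers publish to an exchange, never to a queue. Returns the number of queues reached. */
    int publish(String exchange, String routingKey, String message) {
        Map<String, List<String>> byKey = bindings.get(exchange);
        if (byKey == null) {
            throw new IllegalArgumentException("unknown exchange: " + exchange);
        }
        List<String> targets = byKey.getOrDefault(routingKey, Collections.<String>emptyList());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    /** Consumers read from a queue. Returns null when the queue is empty or unknown. */
    String consume(String queue) {
        Queue<String> buffered = queues.get(queue);
        return buffered == null ? null : buffered.poll();
    }
}
```

In this sketch a message published with a routing key that no binding matches reaches zero queues, which mirrors the point that the exchange, not the producer, decides where a message ends up.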

 

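Time for the hands-on part

For the demonstration we create two Spring Boot projects:

spring-boot-rabbitmq-producer (producer)

spring-boot-rabbitmq-consumer (consumer)

Note: in a real project a single system may act as both producer and consumer.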

 

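1. Add the basic configuration

The producer and the consumer share the same basic configuration.

1.1) Integrate RabbitMQ by adding the Maven dependencies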

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

 

1.2) Add the RabbitMQ server configuration (application.properties)

# RabbitMQ settings
spring.rabbitmq.host=192.168.15.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

 

 

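2. Exchange: DirectExchange

A direct exchange delivers a message to the queues whose binding key equals the message's routingKey. RabbitMQ's default exchange is a direct exchange with an empty name, and every queue is automatically bound to it with the queue name as the binding key, which is why this example needs no explicit exchange or binding.

2.1) Add a configuration class (consumer)

Declare a message queue named notify.payment (on the default exchange, the queue name doubles as the routingKey)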

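import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
	// Declares the notify.payment queue; the broker binds it to the default exchange under its own name.
	@Bean
	public Queue paymentNotifyQueue() {
		return new Queue("notify.payment");
	}
}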

 

2.2) Add a message listener class (consumer)

Listen for messages on the notify.payment queue

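import com.lianjinsoft.util.LogUtil;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Class-level @RabbitListener selects the queue; @RabbitHandler marks the method that handles String payloads.
@Component
@RabbitListener(queues = "notify.payment")
public class PaymentNotifyReceive {
	@RabbitHandler
	public void receive(String msg) {
		LogUtil.info("notify.payment receive message: "+msg);
	}
}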

  

2.3) Add a message sender class (producer)

Send the message to the default exchange with the routingKey notify.payment

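import com.lianjinsoft.util.LogUtil;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PaymentNotifySender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void sender(String msg){
		LogUtil.info("notify.payment send message: "+msg);
		// Two-argument form: first argument is the routingKey; the template's default exchange is used.
		rabbitTemplate.convertAndSend("notify.payment", msg);
	}
}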

  

2.4) Add a test class (producer)

@RunWith(SpringRunner.class)
@SpringBootTest
public class PaymentNotifySenderTests {
	@Autowired
	private PaymentNotifySender sender;
	
	@Test
	public void test_sender() {
		sender.sender("支付訂單號:"+System.currentTimeMillis());
	}
}

 

2.5) Run the test_sender() method

Producer log:

2018-05-14 16:28:53.264  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 10624 (started by lianjinsoft...
2018-05-14 16:28:53.265  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:28:53.305  INFO 10624 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@...
2018-05-14 16:28:54.133  INFO 10624 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:28:55.104  INFO 10624 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:28:55.114  INFO 10624 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.246 seconds (JVM running for 3.199)
2018-05-14 16:28:55.343  INFO 10624 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:1526286535342
2018-05-14 16:28:55.383  INFO 10624 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:28:55.444  INFO 10624 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
2018-05-14 16:28:55.483  INFO 10624 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:28:55.485  INFO 10624 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:28:55.490  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:1526286535342

Analysis:

Line 7 of the producer log shows that the message was sent.

The consumer log shows that the message was received.

 

 

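3. Exchange: TopicExchange

A topic exchange forwards messages by matching the routingKey against binding patterns. It is the most flexible exchange type and also the most commonly used.

3.1) Add a configuration class (consumer)

Declare a queue named api.core and bind it to the coreExchange exchange with the binding pattern api.core.*

Declare a queue named api.payment and bind it to the paymentExchange exchange with the binding pattern api.payment.#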

@Configuration
public class TopicConfig {
	@Bean
	public Queue coreQueue() {
		return new Queue("api.core");
	}
	
	@Bean
	public Queue paymentQueue() {
		return new Queue("api.payment");
	}
	
	@Bean
	public TopicExchange coreExchange() {
		return new TopicExchange("coreExchange");
	}
	
	@Bean
	public TopicExchange paymentExchange() {
		return new TopicExchange("paymentExchange");
	}
	
	@Bean
	public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) {
		return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*");
	}
	
	@Bean
	public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) {
		return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#");
	}
}

  

3.2) Add two message listener classes (consumer)

Listen for messages on the api.core queue

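import com.lianjinsoft.util.LogUtil;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiCoreReceive {
	// Method-level @RabbitListener is enough here; @RabbitHandler only applies under a class-level @RabbitListener.
	@RabbitListener(queues = "api.core")
	public void user(String msg) {
		LogUtil.info("api.core receive message: "+msg);
	}
}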

Listen for messages on the api.payment queue

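import com.lianjinsoft.util.LogUtil;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiPaymentReceive {
	// Method-level @RabbitListener is enough here; @RabbitHandler only applies under a class-level @RabbitListener.
	// The log label reads api.payment.order, but the queue being consumed is api.payment.
	@RabbitListener(queues = "api.payment")
	public void order(String msg) {
		LogUtil.info("api.payment.order receive message: "+msg);
	}
}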

  

3.3) Add two message sender classes (producer)

Add a user() method that sends to the coreExchange exchange with the routingKey api.core.user

Add a userQuery() method that sends to the coreExchange exchange with the routingKey api.core.user.query

@Component
public class ApiCoreSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void user(String msg){
		LogUtil.info("api.core.user send message: "+msg);
		rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg);
	}
	
	public void userQuery(String msg){
		LogUtil.info("api.core.user.query send message: "+msg);
		rabbitTemplate.convertAndSend("coreExchange", "api.core.user.query", msg);
	}
}

Add an order() method that sends to the paymentExchange exchange with the routingKey api.payment.order

Add an orderQuery() method that sends to the paymentExchange exchange with the routingKey api.payment.order.query

Add an orderDetailQuery() method that sends to the paymentExchange exchange with the routingKey api.payment.order.detail.query

@Component
public class ApiPaymentSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void order(String msg){
		LogUtil.info("api.payment.order send message: "+msg);
		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg);
	}
	
	public void orderQuery(String msg){
		LogUtil.info("api.payment.order.query send message: "+msg);
		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg);
	}
	
	public void orderDetailQuery(String msg){
		LogUtil.info("api.payment.order.detail.query send message: "+msg);
		rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg);
	}
}

  

3.4) Add two test classes (producer)

Test the methods of the ApiCoreSender class

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCoreSenderTests {
	@Autowired
	private ApiCoreSender sender;
	
	@Test
	public void test_user() {
		sender.user("用戶管理!");
	}
	
	@Test
	public void test_userQuery() {
		sender.userQuery("查詢用戶信息!");
	}
}

Test the methods of the ApiPaymentSender class

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiPaymentSenderTests {
	@Autowired
	private ApiPaymentSender sender;
	
	@Test
	public void test_order() {
		sender.order("訂單管理!");
	}
	
	@Test
	public void test_orderQuery() {
		sender.orderQuery("查詢訂單信息!");
	}
	
	@Test
	public void test_orderDetailQuery() {
		sender.orderDetailQuery("查詢訂單詳情信息!");
	}
}

 

3.5) Verification

3.5.1) Run the ApiCoreSenderTests test class

Producer log:

2018-05-14 16:30:05.804  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : Starting ApiCoreSenderTests on LAPTOP-1DF7S904 with PID 7340 (started by lianjinsoft in ...
2018-05-14 16:30:05.805  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : No active profile set, falling back to default profiles: default
2018-05-14 16:30:05.851  INFO 7340 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:30:06.553  INFO 7340 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:30:07.375  INFO 7340 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:30:07.385  INFO 7340 --- [           main] c.lianjinsoft.sender.ApiCoreSenderTests  : Started ApiCoreSenderTests in 1.922 seconds (JVM running for 2.846)
2018-05-14 16:30:07.431  INFO 7340 --- [           main] com.lianjinsoft.util.LogUtil             : api.core.user send message: 用戶管理!
2018-05-14 16:30:07.463  INFO 7340 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:30:07.578  INFO 7340 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:30:07.647  INFO 7340 --- [           main] com.lianjinsoft.util.LogUtil             : api.core.user.query send message: 查詢用戶信息!
2018-05-14 16:30:07.716  INFO 7340 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:30:07.728  INFO 7340 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:30:07.609  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.core receive message: 用戶管理!

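Analysis:

Lines 7 and 10 of the producer log show that both the api.core.user and the api.core.user.query messages were sent.

The consumer log shows that only the message sent with api.core.user was received.

Question:

Why was the message sent with api.core.user.query not consumed by the listener on the api.core queue?

Answer: in the TopicConfig class the api.core queue is bound to the exchange with the pattern api.core.*, and the wildcard "*" matches exactly one word. It therefore covers api.core.user but not api.core.user.query, which has two words after api.core.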

 

3.5.2) Run the ApiPaymentSenderTests test class

Producer log:

2018-05-14 16:31:12.823  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : Starting ApiPaymentSenderTests on LAPTOP-1DF7S904 with PID 6460 (started by lianjinsoft in ...
2018-05-14 16:31:12.823  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : No active profile set, falling back to default profiles: default
2018-05-14 16:31:12.857  INFO 6460 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:31:13.718  INFO 6460 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:31:14.530  INFO 6460 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:31:14.539  INFO 6460 --- [           main] c.l.sender.ApiPaymentSenderTests         : Started ApiPaymentSenderTests in 2.05 seconds (JVM running for 2.945)
2018-05-14 16:31:14.592  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order.query send message: 查詢訂單信息!
2018-05-14 16:31:14.638  INFO 6460 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:31:14.762  INFO 6460 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#47404bea:0/SimpleConnection@6b54655f ...
2018-05-14 16:31:14.819  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order.detail.query send message: 查詢訂單詳情信息!
2018-05-14 16:31:14.825  INFO 6460 --- [           main] com.lianjinsoft.util.LogUtil             : api.payment.order send message: 訂單管理!
2018-05-14 16:31:14.836  INFO 6460 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:31:14.840  INFO 6460 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:31:14.809  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 查詢訂單信息!
2018-05-14 16:31:14.821  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 查詢訂單詳情信息!
2018-05-14 16:31:14.829  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.payment.order receive message: 訂單管理!

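Analysis:

Lines 7, 10, and 11 of the producer log show that the api.payment.order.query, api.payment.order.detail.query, and api.payment.order messages were all sent.

The consumer log shows that the listener on the api.payment queue (whose log label reads api.payment.order) received and handled all three messages.

 

Key points:

A TopicExchange supports the wildcards * and # in binding patterns, where words are separated by dots.

* matches exactly one word.

# matches zero or more words.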
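The two wildcard rules can be checked with a small matcher. This is only a sketch of the matching semantics described above, written in plain Java for illustration; the class name TopicPatternSketch is an assumption, and the code is not how RabbitMQ or Spring AMQP implement topic routing internally.

```java
/** Illustrative matcher for topic binding patterns: words are separated by dots. */
final class TopicPatternSketch {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String bindingPattern, String routingKey) {
        return matches(bindingPattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it matches only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation point.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Words exhausted but the pattern still expects a word.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // '*' consumes exactly one word; a literal must equal the current word.
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

With the bindings from TopicConfig, matches("api.core.*", "api.core.user.query") is false while matches("api.payment.#", "api.payment.order.detail.query") is true, which is consistent with the consumer logs in 3.5.1 and 3.5.2.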

 

 

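4. Exchange: HeadersExchange

A headers exchange routes on the header attributes set on the message, comparing them with the header values given in the binding; the routingKey plays no part.

4.1) Add a configuration class (consumer)

Declare a queue named credit.bank and bind it to the creditBankExchange exchange (whereAll: every listed header must match)

Declare a queue named credit.finance and bind it to the creditFinanceExchange exchange (whereAny: one matching header is enough)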

@Configuration
public class HeadersConfig {
	@Bean
	public Queue creditBankQueue() {
		return new Queue("credit.bank");
	}
	
	@Bean
	public Queue creditFinanceQueue() {
		return new Queue("credit.finance");
	}
	
	@Bean
	public HeadersExchange creditBankExchange() {
		 return new HeadersExchange("creditBankExchange");
	}
	
	@Bean
	public HeadersExchange creditFinanceExchange() {
		 return new HeadersExchange("creditFinanceExchange");
	}
	
	@Bean
	public Binding bindingCreditAExchange(Queue creditBankQueue, HeadersExchange creditBankExchange) {
		Map<String,Object> headerValues = new HashMap<>();
		headerValues.put("type", "cash");
		headerValues.put("aging", "fast");
		return BindingBuilder.bind(creditBankQueue).to(creditBankExchange).whereAll(headerValues).match();
	}
	
	@Bean
	public Binding bindingCreditBExchange(Queue creditFinanceQueue, HeadersExchange creditFinanceExchange) {
		Map<String,Object> headerValues = new HashMap<>();
		headerValues.put("type", "cash");
		headerValues.put("aging", "fast");
		return BindingBuilder.bind(creditFinanceQueue).to(creditFinanceExchange).whereAny(headerValues).match();
	}
}

  

4.2) Add a message listener class (consumer)

Add a creditBank() method that listens on the credit.bank queue

Add a creditFinance() method that listens on the credit.finance queue

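import com.lianjinsoft.util.LogUtil;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiCreditReceive {
	// Method-level @RabbitListener is enough here; @RabbitHandler only applies under a class-level @RabbitListener.
	@RabbitListener(queues = "credit.bank")
	public void creditBank(String msg) {
		LogUtil.info("credit.bank receive message: "+msg);
	}
	
	@RabbitListener(queues = "credit.finance")
	public void creditFinance(String msg) {
		LogUtil.info("credit.finance receive message: "+msg);
	}
}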

  

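4.3) Add a message sender class (producer)

Add a creditBank() method that sends to the creditBankExchange exchange with the routingKey credit.bank (a headers exchange ignores the routingKey)

Add a creditFinance() method that sends to the creditFinanceExchange exchange with the routingKey credit.finance (likewise ignored)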

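import java.util.Map;
import com.lianjinsoft.util.LogUtil;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ApiCreditSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void creditBank(Map<String, Object> head, String msg){
		LogUtil.info("credit.bank send message: "+msg);
		rabbitTemplate.convertAndSend("creditBankExchange", "credit.bank", getMessage(head, msg));
	}
	
	public void creditFinance(Map<String, Object> head, String msg){
		LogUtil.info("credit.finance send message: "+msg);
		rabbitTemplate.convertAndSend("creditFinanceExchange", "credit.finance", getMessage(head, msg));
	}
	
	private Message getMessage(Map<String, Object> head, String msg){ // assumed helper, absent from the original: text body plus headers
		MessageProperties properties = new MessageProperties();
		head.forEach(properties::setHeader);
		return new SimpleMessageConverter().toMessage(msg, properties);
	}
}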

  

4.4) Add a test class (producer)

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiCreditSenderTests {
	@Autowired
	private ApiCreditSender sender;
	
	@Test
	public void test_creditBank_type() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		sender.creditBank(head, "銀行授信(部分匹配)");
	}
	
	@Test
	public void test_creditBank_all() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		head.put("aging", "fast");
		sender.creditBank(head, "銀行授信(所有匹配)");
	}
	
	@Test
	public void test_creditFinance_type() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		sender.creditFinance(head, "金融公司授信(部分匹配)");
	}
	
	@Test
	public void test_creditFinance_all() {
		Map<String,Object> head = new HashMap<>();
		head.put("type", "cash");
		head.put("aging", "fast");
		sender.creditFinance(head, "金融公司授信(所有匹配)");
	}
}

  

4.5) Run the ApiCreditSenderTests test class

Producer log:

2018-05-14 16:32:18.954  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : Starting ApiCreditSenderTests on LAPTOP-1DF7S904 with PID 5204 (started by lianjinsoft in...
2018-05-14 16:32:18.964  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : No active profile set, falling back to default profiles: default
2018-05-14 16:32:19.007  INFO 5204 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:32:19.609  INFO 5204 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:32:20.437  INFO 5204 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:32:20.446  INFO 5204 --- [           main] c.l.sender.ApiCreditSenderTests          : Started ApiCreditSenderTests in 1.839 seconds (JVM running for 2.759)
2018-05-14 16:32:20.566  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.bank send message: 銀行授信(部分匹配)
2018-05-14 16:32:20.574  INFO 5204 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:32:20.666  INFO 5204 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:32:21.064  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.finance send message: 金融公司授信(所有匹配)
2018-05-14 16:32:21.070  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.bank send message: 銀行授信(所有匹配)
2018-05-14 16:32:21.077  INFO 5204 --- [           main] com.lianjinsoft.util.LogUtil             : credit.finance send message: 金融公司授信(部分匹配)
2018-05-14 16:32:21.109  INFO 5204 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:32:21.114  INFO 5204 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:32:21.093  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.finance receive message: 金融公司授信(所有匹配)
2018-05-14 16:32:21.094  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.bank receive message: 銀行授信(所有匹配)
2018-05-14 16:32:21.097  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : credit.finance receive message: 金融公司授信(部分匹配)

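Analysis:

Lines 7, 10, 11, and 12 of the producer log show that all four test methods sent their messages.

The consumer log shows that one message sent to creditBankExchange never reached the credit.bank queue.

Question:

Why was the message sent by ApiCreditSenderTests.test_creditBank_type() not handled?

Answer: in the HeadersConfig class the credit.bank queue is bound to creditBankExchange with whereAll, so every listed header (type=cash and aging=fast) must be present with the same value, and this message carried only type=cash. The credit.finance queue is bound with whereAny, so a single matching header is enough.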
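The difference between the two binding modes can be sketched in a few lines of plain Java. This is an illustration of the matching rule only, under the assumption that headers are compared by simple equality; the class name HeadersMatchSketch and its method names are made up for this example and are not part of Spring AMQP.

```java
import java.util.Map;

/** Illustrative header matching for the whereAll and whereAny binding modes. */
final class HeadersMatchSketch {

    /** whereAll: every header listed in the binding must be present with an equal value. */
    static boolean matchesAll(Map<String, Object> binding, Map<String, Object> messageHeaders) {
        for (Map.Entry<String, Object> required : binding.entrySet()) {
            if (!required.getValue().equals(messageHeaders.get(required.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** whereAny: at least one header listed in the binding must be present with an equal value. */
    static boolean matchesAny(Map<String, Object> binding, Map<String, Object> messageHeaders) {
        for (Map.Entry<String, Object> required : binding.entrySet()) {
            if (required.getValue().equals(messageHeaders.get(required.getKey()))) {
                return true;
            }
        }
        return false;
    }
}
```

A message carrying only type=cash fails matchesAll against {type=cash, aging=fast} but passes matchesAny, which is the behavior seen for test_creditBank_type() and test_creditFinance_type().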

 

 

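5. Exchange: FanoutExchange

A fanout exchange forwards every message to all queues bound to it (broadcast mode; the routingKey is ignored).

5.1) Add a configuration class (consumer)

Declare a queue named api.report.payment and bind it to the reportExchange exchange

Declare a queue named api.report.refund and bind it to the reportExchange exchange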

@Configuration
public class FanoutConfig {
	@Bean
	public Queue reportPaymentQueue() {
		return new Queue("api.report.payment");
	}
	
	@Bean
	public Queue reportRefundQueue() {
		return new Queue("api.report.refund");
	}
	
	@Bean
	public FanoutExchange reportExchange() {
		 return new FanoutExchange("reportExchange");
	}
	
	@Bean
	public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) {
		return BindingBuilder.bind(reportPaymentQueue).to(reportExchange);
	}
	
	@Bean
	public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) {
		return BindingBuilder.bind(reportRefundQueue).to(reportExchange);
	}
}

  

5.2) Add a message listener class (consumer)

Add a payment() method that listens on the api.report.payment queue

Add a refund() method that listens on the api.report.refund queue

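import com.lianjinsoft.util.LogUtil;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ApiReportReceive {
	// Method-level @RabbitListener is enough here; @RabbitHandler only applies under a class-level @RabbitListener.
	@RabbitListener(queues = "api.report.payment")
	public void payment(String msg) {
		LogUtil.info("api.report.payment receive message: "+msg);
	}
	
	@RabbitListener(queues = "api.report.refund")
	public void refund(String msg) {
		LogUtil.info("api.report.refund receive message: "+msg);
	}
}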

  

5.3) Add a message sender class (producer)

Add a generateReports() method that sends to the reportExchange exchange

@Component
public class ApiReportSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void generateReports(String msg){
		LogUtil.info("api.generate.reports send message: "+msg);
		rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg);
	}
}

  

5.4) Add a test class (producer)

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiReportSenderTests {
	@Autowired
	private ApiReportSender sender;
	
	@Test
	public void test_generateReports() {
		sender.generateReports("開始生成報表!");
	}
}

  

5.5) Run the ApiReportSenderTests test class

Producer log:

2018-05-14 16:33:41.453  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : Starting ApiReportSenderTests on LAPTOP-1DF7S904 with PID 14356 (started by lianjinsoft in ...
2018-05-14 16:33:41.454  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : No active profile set, falling back to default profiles: default
2018-05-14 16:33:41.490  INFO 14356 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:33:42.094  INFO 14356 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
2018-05-14 16:33:42.960  INFO 14356 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:33:42.972  INFO 14356 --- [           main] c.l.sender.ApiReportSenderTests          : Started ApiReportSenderTests in 1.939 seconds (JVM running for 2.843)
2018-05-14 16:33:43.037  INFO 14356 --- [           main] com.lianjinsoft.util.LogUtil             : api.generate.reports send message: 開始生成報表!
2018-05-14 16:33:43.054  INFO 14356 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:33:43.174  INFO 14356 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f...
2018-05-14 16:33:43.237  INFO 14356 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
2018-05-14 16:33:43.240  INFO 14356 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:33:43.205  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.report.payment receive message: 開始生成報表!
2018-05-14 16:33:43.207  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : api.report.refund receive message: 開始生成報表!

Analysis:

Line 7 of the producer log shows that the message was sent.

The consumer log shows that both the api.report.payment and api.report.refund queues received the same message.
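The broadcast behavior can be pictured with a small in-memory sketch. It is a toy model for illustration, not RabbitMQ's implementation; the class name FanoutSketch is an assumption, and the routing key parameter exists only to show that it is accepted and then ignored.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy fanout exchange: every bound queue receives its own copy of each message. */
final class FanoutSketch {
    // queue name -> buffered messages, in binding order
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<String>());
    }

    /** The routing key is accepted but plays no part in routing. Returns the number of queues reached. */
    int publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    /** Returns the next message of the given queue, or null if it is empty or not bound. */
    String consume(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

Binding api.report.payment and api.report.refund and then publishing once with the key api.generate.reports leaves one copy in each queue, even though neither queue name resembles the key.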

 

 

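6. Many-to-one

In real projects, systems are usually deployed as clusters, as distributed services, or with disaster-recovery replicas, which leads to one-to-many, many-to-one, and many-to-many scenarios.

So how can we simulate many-to-one locally?

6.1) For convenience we reuse the PaymentNotifyReceive example and add two test methods to the PaymentNotifySenderTests test class

test_sender_many2one_1: sends the even order numbers

test_sender_many2one_2: sends the odd order numbers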

@Test
public void test_sender_many2one_1() throws Exception {
	for (int i = 0; i < 20; i+=2) {
		sender.sender("支付訂單號:"+i);
		Thread.sleep(1000);
	}
}

@Test
public void test_sender_many2one_2() throws Exception {
	for (int i = 1; i < 20; i+=2) {
		sender.sender("支付訂單號:"+i);
		Thread.sleep(1000);
	}
}

  

6.2) Run the test_sender_many2one_1() and test_sender_many2one_2() methods

Producer 1 log:

2018-05-14 16:34:49.249  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 5064 (started by lianjinsoft in...
2018-05-14 16:34:49.250  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:34:49.297  INFO 5064 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:34:49.989  INFO 5064 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:34:51.267  INFO 5064 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:34:51.293  INFO 5064 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.449 seconds (JVM running for 3.366)
2018-05-14 16:34:51.357  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:0
2018-05-14 16:34:51.370  INFO 5064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:34:51.817  INFO 5064 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#56ace400:0/SimpleConnection@773cbf4f ...
2018-05-14 16:34:52.866  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:2
2018-05-14 16:34:53.870  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:4
2018-05-14 16:34:54.870  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:6
2018-05-14 16:34:55.871  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:8
2018-05-14 16:34:56.872  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:10
2018-05-14 16:34:57.872  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:12
2018-05-14 16:34:58.873  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:14
2018-05-14 16:34:59.875  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:16
2018-05-14 16:35:00.876  INFO 5064 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:18
2018-05-14 16:35:01.882  INFO 5064 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:35:01.883  INFO 5064 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Producer 2 log:

2018-05-14 16:34:52.689  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 13988 (started by lianjinsoft in...
2018-05-14 16:34:52.690  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:34:52.738  INFO 13988 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:34:53.444  INFO 13988 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:34:54.567  INFO 13988 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:34:54.575  INFO 13988 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.237 seconds (JVM running for 4.18)
2018-05-14 16:34:54.788  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:1
2018-05-14 16:34:54.796  INFO 13988 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:34:54.870  INFO 13988 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#305f7627:0/SimpleConnection@665e9289...
2018-05-14 16:34:55.903  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:3
2018-05-14 16:34:56.903  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:5
2018-05-14 16:34:57.904  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:7
2018-05-14 16:34:58.904  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:9
2018-05-14 16:34:59.905  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:11
2018-05-14 16:35:00.906  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:13
2018-05-14 16:35:01.907  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:15
2018-05-14 16:35:02.907  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:17
2018-05-14 16:35:03.910  INFO 13988 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:19
2018-05-14 16:35:04.928  INFO 13988 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:35:04.932  INFO 13988 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:34:51.853  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:0
2018-05-14 16:34:52.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:2
2018-05-14 16:34:53.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:4
2018-05-14 16:34:54.871  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:6
2018-05-14 16:34:54.902  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:1
2018-05-14 16:34:55.872  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:8
2018-05-14 16:34:55.904  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:3
2018-05-14 16:34:56.873  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:10
2018-05-14 16:34:56.905  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:5
2018-05-14 16:34:57.873  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:12
2018-05-14 16:34:57.905  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:7
2018-05-14 16:34:58.878  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:14
2018-05-14 16:34:58.906  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:9
2018-05-14 16:34:59.877  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:16
2018-05-14 16:34:59.906  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:11
2018-05-14 16:35:00.877  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:18
2018-05-14 16:35:00.909  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:13
2018-05-14 16:35:01.909  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:15
2018-05-14 16:35:02.911  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:17
2018-05-14 16:35:03.914  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:19

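Log analysis:

The logs of producer 1 and producer 2 show that all messages were sent successfully.

The consumer log shows that every message was received and processed.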

 

 

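7. One-to-many

How do we simulate one producer with several consumers?

7.1) For convenience, we keep reusing the PaymentNotifyReceive example and add a test method to the PaymentNotifySenderTests test class

test_sender_one2many: sends 20 messages in a loop.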

@Test
public void test_sender_one2many() {
	for (int i = 0; i < 20; i++) {
		sender.sender("支付訂單號:"+i);
	}
}

  

7.2) Test

To get a one-to-many setup, we need to start one (or more) additional consumer instances, then run the test_sender_one2many() test method.

Producer log:

2018-05-14 16:36:27.703  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : Starting PaymentNotifySenderTests on LAPTOP-1DF7S904 with PID 7508 (started by lianjinsoft in...
2018-05-14 16:36:27.704  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : No active profile set, falling back to default profiles: default
2018-05-14 16:36:27.729  INFO 7508 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:36:28.391  INFO 7508 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type...
2018-05-14 16:36:29.285  INFO 7508 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:36:29.303  INFO 7508 --- [           main] c.l.sender.PaymentNotifySenderTests      : Started PaymentNotifySenderTests in 2.197 seconds (JVM running for 3.097)
2018-05-14 16:36:29.504  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:0
2018-05-14 16:36:29.516  INFO 7508 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:36:29.635  INFO 7508 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#4f071df8:0/SimpleConnection@42a9e5d1...
2018-05-14 16:36:29.672  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:1
2018-05-14 16:36:29.679  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:2
2018-05-14 16:36:29.705  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:3
2018-05-14 16:36:29.707  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:4
2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:5
2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:6
2018-05-14 16:36:29.710  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:7
2018-05-14 16:36:29.713  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:8
2018-05-14 16:36:29.719  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:9
2018-05-14 16:36:29.728  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:10
2018-05-14 16:36:29.729  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:11
2018-05-14 16:36:29.733  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:12
2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:13
2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:14
2018-05-14 16:36:29.734  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:15
2018-05-14 16:36:29.741  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:16
2018-05-14 16:36:29.742  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:17
2018-05-14 16:36:29.742  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:18
2018-05-14 16:36:29.744  INFO 7508 --- [           main] com.lianjinsoft.util.LogUtil             : notify.payment send message: 支付訂單號:19
2018-05-14 16:36:29.762  INFO 7508 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008:...
2018-05-14 16:36:29.767  INFO 7508 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer 1 log:

2018-05-14 16:36:29.675  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:0
2018-05-14 16:36:29.690  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:2
2018-05-14 16:36:29.712  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:4
2018-05-14 16:36:29.714  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:6
2018-05-14 16:36:29.728  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:9
2018-05-14 16:36:29.733  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:11
2018-05-14 16:36:29.737  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:13
2018-05-14 16:36:29.740  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:15
2018-05-14 16:36:29.743  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:16
2018-05-14 16:36:29.745  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:18

Consumer 2 log:

2018-05-14 16:36:29.705  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:1
2018-05-14 16:36:29.709  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:3
2018-05-14 16:36:29.712  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:5
2018-05-14 16:36:29.718  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:7
2018-05-14 16:36:29.722  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:8
2018-05-14 16:36:29.731  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:10
2018-05-14 16:36:29.736  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:12
2018-05-14 16:36:29.738  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:14
2018-05-14 16:36:29.744  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:17
2018-05-14 16:36:29.747  INFO 13448 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.payment receive message: 支付訂單號:19

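Log analysis:

The producer log shows that all 20 messages were sent successfully.

The logs of consumer 1 and consumer 2 show that the messages were shared evenly between the two consumers: each one handled 10 messages, and no message was delivered to both.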
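The behaviour above (several consumers on one queue, each message handled by exactly one of them) can be sketched without a broker. The following is only an in-memory simulation using a BlockingQueue, not RabbitMQ itself; the class name CompetingConsumersDemo and the method run are made up for illustration. Unlike the broker, this sketch makes no promise that the split between consumers is even.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: simulates competing consumers on a single queue.
public class CompetingConsumersDemo {

    /**
     * Puts messageCount messages on one in-memory queue and lets consumerCount
     * threads compete for them. Returns, for each message, the consumer that took it.
     */
    public static Map<String, String> run(int messageCount, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(messageCount);
        List<Thread> consumers = new ArrayList<>();

        for (int c = 1; c <= consumerCount; c++) {
            String name = "consumer-" + c;
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        // take() removes the message, so no other consumer can see it
                        String msg = queue.take();
                        handledBy.put(msg, name);
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop consuming
                }
            }, name);
            t.setDaemon(true);
            t.start();
            consumers.add(t);
        }

        // The single producer: same loop shape as test_sender_one2many
        for (int i = 0; i < messageCount; i++) {
            queue.add("order-" + i);
        }

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        consumers.forEach(Thread::interrupt);
        return handledBy;
    }

    public static void main(String[] args) {
        Map<String, String> result = run(20, 2);
        result.forEach((msg, consumer) -> System.out.println(msg + " -> " + consumer));
    }
}
```

Running main prints which simulated consumer handled each message. Adding more consumers spreads the work further, which is the reason for starting extra consumer instances in step 7.2.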

 

 

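8. Sending objects

In a real project, a request usually carries several fields. To keep those fields consistent between producer and consumer, we normally send an object.

8.1) For convenience, we add another message queue to DirectConfig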

@Bean
public Queue refundNotifyQueue() {
	return new Queue("notify.refund");
}

  

8.2) Add a message listener class (consumer)

It listens for messages on the notify.refund queue (the routing key is the same as the queue name).

@Component
@RabbitListener(queues = "notify.refund")
public class RefundNotifyReceive {
	@RabbitHandler
	public void receive(Order order) {
		LogUtil.info("notify.refund receive message: "+order);
	}
}

  

8.3) Add a message sender class (producer)

@Component
public class RefundNotifySender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void sender(Order order){
		LogUtil.info("notify.refund send message: "+order);
		rabbitTemplate.convertAndSend("notify.refund", order);
	}
}

  

8.4) Add a test class

@RunWith(SpringRunner.class)
@SpringBootTest
public class RefundNotifySenderTests {
	@Autowired
	private RefundNotifySender sender;
	
	@Test
	public void test_sender() {
		Order order = new Order();
		order.setId(100001);
		order.setOrderId(String.valueOf(System.currentTimeMillis()));
		order.setAmount(new BigDecimal("1999.99"));
		order.setCreateTime(new Date());
		sender.sender(order);
	}
}

  

8.5) Run the RefundNotifySenderTests test class

Producer log:

2018-05-14 16:37:47.038  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : Starting RefundNotifySenderTests on LAPTOP-1DF7S904 with PID 13672 (started by lianjinsoft in...
2018-05-14 16:37:47.041  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : No active profile set, falling back to default profiles: default
2018-05-14 16:37:47.070  INFO 13672 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa:...
2018-05-14 16:37:47.715  INFO 13672 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:37:48.779  INFO 13672 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:37:48.802  INFO 13672 --- [           main] c.l.sender.RefundNotifySenderTests       : Started RefundNotifySenderTests in 2.082 seconds (JVM running for 2.967)
2018-05-14 16:37:49.085  INFO 13672 --- [           main] com.lianjinsoft.util.LogUtil             : notify.refund send message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...
2018-05-14 16:37:49.104  INFO 13672 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:37:49.170  INFO 13672 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#6b5894c8:0/SimpleConnection@38f57b3d [delegate=...
2018-05-14 16:37:49.265  INFO 13672 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@1e800aaa: ...
2018-05-14 16:37:49.266  INFO 13672 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:37:49.242  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.refund receive message: Order [id=100001, orderId=1526287069081, amount=1999.99, createTime=...

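Log analysis:

The producer log shows that the order object was sent successfully.

The consumer log shows that the order object was received successfully and can be used directly.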

 

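Note:

The object being sent must be serializable (its class must implement the Serializable interface). This applies to the default message converter, which relies on Java serialization, and the same class must be available on both the producer and the consumer side.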
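To make the serialization requirement concrete, here is a minimal sketch of an Order class and a plain Java serialization round trip. The Order class is not shown in this post, so the field types below are assumptions inferred from the setters used in RefundNotifySenderTests. The helper names toBytes and fromBytes are made up for illustration and only mimic what happens to the message body between producer and consumer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

// Hypothetical demo class: Java serialization round trip for an Order.
public class OrderSerializationDemo {

    /** Assumed shape of Order; field types are guesses based on the setters in the test. */
    public static class Order implements Serializable {
        private static final long serialVersionUID = 1L;

        private int id;
        private String orderId;
        private BigDecimal amount;
        private Date createTime;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getOrderId() { return orderId; }
        public void setOrderId(String orderId) { this.orderId = orderId; }
        public BigDecimal getAmount() { return amount; }
        public void setAmount(BigDecimal amount) { this.amount = amount; }
        public Date getCreateTime() { return createTime; }
        public void setCreateTime(Date createTime) { this.createTime = createTime; }

        @Override
        public String toString() {
            return "Order [id=" + id + ", orderId=" + orderId
                    + ", amount=" + amount + ", createTime=" + createTime + "]";
        }
    }

    /** Producer side: turn the object into bytes (the message body). */
    public static byte[] toBytes(Serializable obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("object could not be serialized", e);
        }
    }

    /** Consumer side: rebuild the object; the class must be on the classpath here too. */
    public static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("object could not be deserialized", e);
        }
    }

    public static void main(String[] args) {
        Order order = new Order();
        order.setId(100001);
        order.setOrderId(String.valueOf(System.currentTimeMillis()));
        order.setAmount(new BigDecimal("1999.99"));
        order.setCreateTime(new Date());

        Order copy = (Order) fromBytes(toBytes(order));
        System.out.println(copy);
    }
}
```

If Order did not implement Serializable, writeObject would fail with a NotSerializableException, which is the kind of failure the note above warns about.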

 

 

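9. RPC

RabbitMQ supports RPC-style remote calls: the sender publishes a request and waits synchronously for the result.

9.1) For convenience, we add another message queue to DirectConfig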

@Bean
public Queue queryOrderQueue() {
	return new Queue("query.order");
}

  

9.2) Add a message listener class (consumer)

It listens for messages on the query.order queue (the routing key is the same as the queue name).

@Component
@RabbitListener(queues = "query.order")
public class QueryOrderReceive {
	@RabbitHandler
	public Order receive(String orderId) {
		LogUtil.info("notify.refund receive message: "+orderId);
		
		Order order = new Order();
		order.setId(100001);
		order.setOrderId(orderId);
		order.setAmount(new BigDecimal("2999.99"));
		order.setCreateTime(new Date());
		return order;
	}
}

  

9.3) Add a message sender class (producer)

@Component
public class QueryOrderSender {
	@Autowired
	private AmqpTemplate rabbitTemplate;
	
	public void sender(String orderId){
		LogUtil.info("query.order send message: "+orderId);
		Order order = (Order) rabbitTemplate.convertSendAndReceive("query.order", orderId);
		LogUtil.info("query.order return message: "+order);
	}
}

  

9.4) Add a test class

@RunWith(SpringRunner.class)
@SpringBootTest
public class QueryOrderSenderTests {
	@Autowired
	private QueryOrderSender sender;
	
	@Test
	public void test_sender() {
		sender.sender("900000001");
	}
}

  

9.5) Run the QueryOrderSenderTests test class

Producer log:

2018-05-14 16:38:14.163  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : Starting QueryOrderSenderTests on LAPTOP-1DF7S904 with PID 2024 (started by lianjinsoft in...
2018-05-14 16:38:14.164  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : No active profile set, falling back to default profiles: default
2018-05-14 16:38:14.197  INFO 2024 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:38:14.848  INFO 2024 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type ...
2018-05-14 16:38:15.705  INFO 2024 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-05-14 16:38:15.715  INFO 2024 --- [           main] c.l.sender.QueryOrderSenderTests         : Started QueryOrderSenderTests in 1.927 seconds (JVM running for 3.079)
2018-05-14 16:38:15.793  INFO 2024 --- [           main] com.lianjinsoft.util.LogUtil             : query.order send message: 900000001
2018-05-14 16:38:15.812  INFO 2024 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.15.131:5672]
2018-05-14 16:38:15.988  INFO 2024 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#38be305c:0/SimpleConnection@71984c3 ...
2018-05-14 16:38:16.057  INFO 2024 --- [           main] com.lianjinsoft.util.LogUtil             : query.order return message: Order [id=100001, orderId=900000001, amount=2999.99, createTime=...
2018-05-14 16:38:16.079  INFO 2024 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@7eac9008: ...
2018-05-14 16:38:16.097  INFO 2024 --- [       Thread-2] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647

Consumer log:

2018-05-14 16:38:16.028  INFO 14876 --- [cTaskExecutor-1] com.lianjinsoft.util.LogUtil             : notify.refund receive message: 900000001

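Log analysis:

Line 7 of the producer log shows that the request was sent successfully. Line 10 shows that the reply was received and converted into an Order object.

The consumer log shows that the request was received and processed. Its "notify.refund" prefix comes from the log statement in QueryOrderReceive, which uses that text; the message itself was consumed from the query.order queue.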

 

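Although RabbitMQ supports RPC-style calls, using them is not recommended.

Reasons:

1) By default, RPC follows a single-threaded blocking model: the sending thread waits for the reply before it can do anything else, so throughput is very low.

2) Multi-threaded consumption has to be implemented by hand.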
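To illustrate the blocking model described in reason 1, here is a broker-free sketch of request/reply over a queue. It is an in-memory simulation under stated assumptions, not how Spring AMQP is implemented: the class BlockingRpcDemo, its Request type and the call method are all made up for illustration. Returning null on timeout mirrors, as far as I know, what convertSendAndReceive does when no reply arrives in time.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: simulates a blocking RPC call over an in-memory queue.
public class BlockingRpcDemo {

    /** A request carries a correlation id so the reply can be matched to the waiting caller. */
    static final class Request {
        final String correlationId;
        final String orderId;

        Request(String correlationId, String orderId) {
            this.correlationId = correlationId;
            this.orderId = orderId;
        }
    }

    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Starts one "server" thread that answers requests strictly one at a time. */
    public BlockingRpcDemo(long handlingMillis) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request req = requestQueue.take();
                    Thread.sleep(handlingMillis); // simulated processing time
                    CompletableFuture<String> reply = pending.remove(req.correlationId);
                    if (reply != null) { // null means the caller already gave up
                        reply.complete("Order[orderId=" + req.orderId + "]");
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "rpc-server");
        server.setDaemon(true);
        server.start();
    }

    /** Sends a request and blocks the calling thread until the reply arrives; null on timeout. */
    public String call(String orderId, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        requestQueue.add(new Request(correlationId, orderId));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.remove(correlationId); // stop waiting for a late reply
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        BlockingRpcDemo rpc = new BlockingRpcDemo(50);
        System.out.println(rpc.call("900000001", 1000)); // waits for the reply
        System.out.println(rpc.call("900000002", 10));   // gives up before the reply is ready
    }
}
```

The caller cannot do anything else while call is waiting, and the single server thread handles requests one after another. Serving more requests in parallel would mean adding server threads by hand, which corresponds to reason 2.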

 

 

To install RabbitMQ, see: Installing RabbitMQ 3.7 online on CentOS

Source code for this post: https://gitee.com/skychenjiajun/spring-boot
