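In real-world development many scenarios call for asynchronous processing, which is where RabbitMQ comes in, and as those scenarios multiply an application may need to connect to more than one RabbitMQ instance. Spring Boot's default auto-configuration makes it quick to connect to RabbitMQ, but only to a single instance. When several are needed, the default configuration no longer fits and each connection has to be written by hand.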
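In Spring Boot, the two classes we use most often are:
RabbitTemplate: used to produce and consume messages;
RabbitAdmin: used to declare and delete exchanges and queues, and to bind and unbind queues and exchanges.
So to connect to multiple RabbitMQ instances, we need to create each connection ourselves and define these two beans again for every connection. The code is as follows: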
Configuration
application.properties
The configuration file needs to define two connections:
server.port=8080

# rabbitmq
v2.spring.rabbitmq.host=host
v2.spring.rabbitmq.port=5672
v2.spring.rabbitmq.username=username
v2.spring.rabbitmq.password=password
v2.spring.rabbitmq.virtual-host=virtual-host
# consumer: manual ack
v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 1. When the mandatory flag is true and the exchange cannot find a suitable queue
#    for the message based on its type and the routing key, the broker calls
#    basic.return to hand the message back to the producer.
# 2. When mandatory is false, the broker simply drops the message in that situation.
# Put simply, mandatory tells the broker to route the message to at least one queue,
# or else return it to the sender.
v2.spring.rabbitmq.template.mandatory=true
# publisher confirms
v2.spring.rabbitmq.publisher-confirms=true
# Publisher returns: the return callback is invoked for messages that reach the
# exchange but cannot be routed to a queue. Note that in this case the
# publisher-confirm callback still reports ack=true. A message that never reaches
# the exchange shows up as a failed confirm instead.
v2.spring.rabbitmq.publisher-returns=true
v2.spring.rabbitmq.listener.simple.prefetch=5

# rabbitmq
v1.spring.rabbitmq.host=host
v1.spring.rabbitmq.port=5672
v1.spring.rabbitmq.username=username
v1.spring.rabbitmq.password=password
v1.spring.rabbitmq.virtual-host=virtual-host
# consumer: manual ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
# mandatory: same semantics as described for v2 above
v1.spring.rabbitmq.template.mandatory=true
# publisher confirms
v1.spring.rabbitmq.publisher-confirms=true
# publisher returns: same semantics as described for v2 above
v1.spring.rabbitmq.publisher-returns=true
v1.spring.rabbitmq.listener.simple.prefetch=5
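The comments on `mandatory`, publisher confirms and publisher returns boil down to a small decision table. The sketch below is a toy model in plain Java, not RabbitMQ or Spring AMQP code: `PublishOutcomeDemo`, `PublishOutcome` and `simulatePublish` are hypothetical names introduced only for illustration, and the model assumes that publishing to a missing exchange is reported as a negative confirm.

```java
/** Toy model of what a publisher observes. Hypothetical code, not a RabbitMQ API. */
final class PublishOutcomeDemo {

    /** The confirm result plus whether the message came back via the return callback. */
    static final class PublishOutcome {
        final boolean confirmAck;
        final boolean returned;

        PublishOutcome(boolean confirmAck, boolean returned) {
            this.confirmAck = confirmAck;
            this.returned = returned;
        }

        @Override
        public String toString() {
            return "confirmAck=" + confirmAck + ", returned=" + returned;
        }
    }

    static PublishOutcome simulatePublish(boolean exchangeExists, boolean routable, boolean mandatory) {
        if (!exchangeExists) {
            // Assumption of this model: the publish fails, so the confirm is negative.
            return new PublishOutcome(false, false);
        }
        if (routable) {
            // Normal case: the message lands in a queue and is confirmed.
            return new PublishOutcome(true, false);
        }
        // Unroutable: returned only if mandatory is set, otherwise dropped.
        // Either way the confirm is still positive.
        return new PublishOutcome(true, mandatory);
    }

    public static void main(String[] args) {
        System.out.println(simulatePublish(true, false, true));  // confirmAck=true, returned=true
        System.out.println(simulatePublish(true, false, false)); // confirmAck=true, returned=false
        System.out.println(simulatePublish(false, false, true)); // confirmAck=false, returned=false
    }
}
```

The practical consequence is that a positive confirm alone does not prove a message reached a queue, which is why the configuration enables returns and `mandatory` together with confirms.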
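Redefining the connection factories
Note that with multiple sources, one of the connections must be annotated with @Primary to mark it as the main connection, which is the one used by default.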
package com.example.config.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Created by shuai on 2019/4/23.
 *
 * Note: setPublisherConfirms and setReturnCallback are deprecated in newer
 * Spring AMQP releases (2.2 and 2.3 respectively) in favour of
 * setPublisherConfirmType and setReturnsCallback.
 */
@Configuration
public class MultipleRabbitMQConfig {

    // ---- v2 connection ----

    @Bean(name = "v2ConnectionFactory")
    public CachingConnectionFactory hospSyncConnectionFactory(
            @Value("${v2.spring.rabbitmq.host}") String host,
            @Value("${v2.spring.rabbitmq.port}") int port,
            @Value("${v2.spring.rabbitmq.username}") String username,
            @Value("${v2.spring.rabbitmq.password}") String password,
            @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v2RabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
        v2RabbitTemplate.setMandatory(mandatory);
        // Invoked for every publisher confirm; ack is false when the publish failed.
        v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
                // LOGGER.info("{} RabbitMQ send, ack confirmation failed: [{}]", this.name, JSON.toJSONString(object));
            } else {
                // LOGGER.info("{} RabbitMQ send, ack confirmation succeeded: [{}]", this.name, JSON.toJSONString(object));
            }
        });
        // Invoked when a mandatory message could not be routed to any queue.
        v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
            // LOGGER.error("{} RabbitMQ message returned: it reached the exchange but was not routed to a queue, message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
        });
        return v2RabbitTemplate;
    }

    @Bean(name = "v2ContainerFactory")
    public SimpleRabbitListenerContainerFactory hospSyncFactory(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // The property value "manual" maps to AcknowledgeMode.MANUAL.
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v2RabbitAdmin")
    public RabbitAdmin iqianzhanRabbitAdmin(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // ---- v1 connection: the primary MQ connection ----

    @Bean(name = "v1ConnectionFactory")
    @Primary
    public CachingConnectionFactory publicConnectionFactory(
            @Value("${v1.spring.rabbitmq.host}") String host,
            @Value("${v1.spring.rabbitmq.port}") int port,
            @Value("${v1.spring.rabbitmq.username}") String username,
            @Value("${v1.spring.rabbitmq.password}") String password,
            @Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v1RabbitTemplate")
    @Primary
    public RabbitTemplate publicRabbitTemplate(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);
        v1RabbitTemplate.setMandatory(mandatory);
        v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
                // LOGGER.info("{} RabbitMQ send, ack confirmation failed: [{}]", this.name, JSON.toJSONString(object));
            } else {
                // LOGGER.info("{} RabbitMQ send, ack confirmation succeeded: [{}]", this.name, JSON.toJSONString(object));
            }
        });
        v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
            // LOGGER.error("{} RabbitMQ message returned: it reached the exchange but was not routed to a queue, message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
        });
        return v1RabbitTemplate;
    }

    @Bean(name = "v1ContainerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v1RabbitAdmin")
    @Primary
    public RabbitAdmin publicRabbitAdmin(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
Creating the exchange and queue and binding them
With RabbitAdmin in place, we use it to create the corresponding exchange and queue and to set up the binding between them.
package com.example.config.rabbitmq;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * Creates the queue and exchange and binds them together.
 * Created by shuai on 2019/5/16.
 */
@Configuration
public class MyRabbitMQCreateConfig {

    @Resource(name = "v2RabbitAdmin")
    private RabbitAdmin v2RabbitAdmin;

    // Injected but not used below: this example only declares resources on the v2 broker.
    @Resource(name = "v1RabbitAdmin")
    private RabbitAdmin v1RabbitAdmin;

    @PostConstruct
    public void rabbitInit() {
        // Durable, non-auto-delete topic exchange and a durable queue.
        TopicExchange exchange = new TopicExchange("exchange.topic.example.new", true, false);
        Queue queue = new Queue("queue.example.topic.new", true);

        v2RabbitAdmin.declareExchange(exchange);
        v2RabbitAdmin.declareQueue(queue);
        v2RabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(queue)                        // the queue to bind
                        .to(exchange)                       // the exchange to bind it to
                        .with("routing.key.example.new"));  // the routing key
    }
}
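The binding above uses a literal key on a topic exchange, so only messages published with exactly `routing.key.example.new` reach the queue. Topic bindings can also contain the wildcards `*` (exactly one word) and `#` (zero or more words). The following plain-Java sketch illustrates those matching rules; it is a hypothetical helper written for this explanation (`TopicMatchDemo` and `matches` are made-up names), not the broker's actual implementation.

```java
/** Illustrative topic-pattern matcher. Hypothetical code, not RabbitMQ's implementation. */
final class TopicMatchDemo {

    /** Returns true if the routing key satisfies the binding key under topic-exchange rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            // Pattern still has words (or '*') left, but the key has none.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            // '*' consumes exactly one word; a literal must match that word exactly.
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String routingKey = "routing.key.example.new";
        System.out.println(matches("routing.key.example.new", routingKey)); // true
        System.out.println(matches("routing.key.*.new", routingKey));       // true
        System.out.println(matches("routing.#", routingKey));               // true
        System.out.println(matches("routing.*", routingKey));               // false
    }
}
```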
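Producer
To verify later that each connection is established and can publish messages, the producer sends one message with each of the newly created RabbitTemplate beans.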
package com.example.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class TopicProducer {

    @Resource(name = "v1RabbitTemplate")
    private RabbitTemplate v1RabbitTemplate;

    @Resource(name = "v2RabbitTemplate")
    private RabbitTemplate v2RabbitTemplate;

    public void sendMessageByTopic() {
        // Publish one message through each connection.
        String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";
        v1RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content1);

        String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";
        v2RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content2);
    }
}
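Consumer
Note that when configuring the consumer for a queue, the containerFactory must be specified so that the listener is attached to the intended connection.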
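package com.example.topic;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
public class TopicConsumer {

    @RabbitHandler
    public void consumer(@Payload String message,
                         Channel channel,
                         @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        System.out.println(message);
        // v2ContainerFactory uses manual acknowledge mode, so each delivery
        // must be acknowledged explicitly.
        channel.basicAck(deliveryTag, false);
    }
}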
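The `v2ContainerFactory` used by this listener is configured with manual acknowledgement and a prefetch of 5, and the two settings interact: the broker stops delivering to a consumer once that many messages are outstanding without an ack. The sketch below is a toy simulation of that rule in plain Java, with hypothetical names (`PrefetchDemo`, `ToyChannel`, `consume`); it does not talk to RabbitMQ and only models the counting.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of prefetch with manual acks. Hypothetical code, not a RabbitMQ client. */
final class PrefetchDemo {

    static final class ToyChannel {
        private final int prefetch;
        private final Deque<String> queue = new ArrayDeque<>();
        private int unacked = 0;

        ToyChannel(int prefetch) {
            this.prefetch = prefetch;
        }

        void publish(String message) {
            queue.addLast(message);
        }

        /** Returns the next message, or null if the queue is empty or the prefetch window is full. */
        String deliver() {
            if (unacked >= prefetch || queue.isEmpty()) {
                return null;
            }
            unacked++;
            return queue.pollFirst();
        }

        /** Acknowledges one outstanding delivery, freeing a slot in the prefetch window. */
        void ack() {
            if (unacked > 0) {
                unacked--;
            }
        }
    }

    /** Publishes the given number of messages and returns how many the consumer received. */
    static int consume(int messages, int prefetch, boolean ackEach) {
        ToyChannel channel = new ToyChannel(prefetch);
        for (int i = 0; i < messages; i++) {
            channel.publish("message-" + i);
        }
        int delivered = 0;
        while (channel.deliver() != null) {
            delivered++;
            if (ackEach) {
                channel.ack();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(consume(8, 5, false)); // 5: delivery stalls once 5 messages are unacked
        System.out.println(consume(8, 5, true));  // 8: acking frees the window each time
    }
}
```

In this model a consumer that never acks receives only the first five of eight messages, which is the behaviour to expect from a manual-ack listener that forgets to acknowledge.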
That completes the example of connecting Spring Boot to multiple RabbitMQ sources. Let's write a short test to verify it.
Test and verification
package com.example.test;

import com.example.topic.TopicProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQMultipleTest {

    @Autowired
    private TopicProducer topicProducer;

    @Test
    public void topicProducerTest() {
        topicProducer.sendMessageByTopic();
    }
}
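Running the test gives the following result:
Connecting Spring Boot to multiple RabbitMQ sources is verified!
GitHub repository: Spring Boot tutorials, technology stack, and sample code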