MQ
是開發中很常見的中間件,本文講述的是怎麼在一個Spring Boot
項目中配置多源的RabbitMQ
,這裏不過多地講解RabbitMQ
的相關知識點。如果你也遇到需要往多個RabbitMQ
中發送消息的需求,希望本文能夠幫助到你。
當然軟件的版本不是硬性要求,只是我使用的環境而已,唯一的要求是需要啓動兩個RabbitMQ
,我這邊是在kubernetes
集羣中使用helm
官方提供的charts
包快速啓動的兩個rabbitmq-ha
高可用rabbitmq
集羣。
想要了解 kubernetes
或者helm
,可以參看以下 github倉庫:
在springboot 中配置單個RabbitMQ是極其簡單的,我們只需要使用Springboot爲我們自動裝配的RabbitMQ相關的配置就可以了。但是需要配置多個源時,第二個及其以上的就需要單獨配置了,這裏我使用的都是單獨配置的。
/**
 * Base class for one RabbitMQ source: holds the host, port, username and
 * password of a broker and turns them into a connection factory.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Data
public abstract class AbstractRabbitConfiguration {

    protected String host;
    protected int port;
    protected String username;
    protected String password;

    /**
     * Creates a new {@code CachingConnectionFactory} populated from the
     * current values of {@code host}, {@code port}, {@code username} and
     * {@code password}.
     *
     * @return a freshly constructed connection factory (a new instance on every call)
     */
    protected ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        // Credentials first, then the broker address; the setters are independent.
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setHost(host);
        factory.setPort(port);
        return factory;
    }
}
第一個源的配置代碼
package com.zhw.study.springbootmultirabbitmq.config; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @author innerpeacez * @since 2019/3/8 */ @Configuration @ConfigurationProperties("spring.rabbitmq.first") public class FirstRabbitConfiguration extends AbstractRabbitConfiguration { @Bean(name = "firstConnectionFactory") @Primary public ConnectionFactory firstConnectionFactory() { return super.connectionFactory(); } @Bean(name = "firstRabbitTemplate") @Primary public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean(name = "firstFactory") public SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(value = "firstRabbitAdmin") public RabbitAdmin firstRabbitAdmin(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
第二個源的配置代碼
package com.zhw.study.springbootmultirabbitmq.config; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author innerpeacez * @since 2019/3/8 */ @Configuration @ConfigurationProperties("spring.rabbitmq.second") public class SecondRabbitConfiguration extends AbstractRabbitConfiguration { @Bean(name = "secondConnectionFactory") public ConnectionFactory secondConnectionFactory() { return super.connectionFactory(); } @Bean(name = "secondRabbitTemplate") public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } @Bean(name = "secondFactory") public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; } @Bean(value = "secondRabbitAdmin") public RabbitAdmin secondRabbitAdmin(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
配置信息
# Application settings for the multi-source RabbitMQ example.
spring:
  application:
    name: multi-rabbitmq
  rabbitmq:
    # Settings bound by the configuration class that uses the
    # "spring.rabbitmq.first" prefix.
    first:
      host: 192.168.10.76
      port: 30509
      username: admin
      password: 123456
    # Settings bound by the configuration class that uses the
    # "spring.rabbitmq.second" prefix. Same host as the first source,
    # different port.
    second:
      host: 192.168.10.76
      port: 31938
      username: admin
      password: 123456
這樣我們的兩個RabbitMQ源就配置好了,接下來我們進行測試使用,爲了方便使用,我寫了一個MultiRabbitTemplate.class 方便我們使用不同的源。
/**
 * Base class that gives subclasses access to the templates of both RabbitMQ
 * sources. The two fields are injected by qualifier name, so a concrete
 * subclass that is itself a Spring-managed bean (or a Spring test instance)
 * can publish to either source.
 *
 * <p>This class is abstract and is therefore not annotated as a component:
 * it is never instantiated on its own, only through its subclasses.
 *
 * @author innerpeacez
 * @since 2019/3/8
 */
public abstract class MultiRabbitTemplate {

    /** Template bound to the bean named {@code firstRabbitTemplate}. */
    @Autowired
    @Qualifier(value = "firstRabbitTemplate")
    public AmqpTemplate firstRabbitTemplate;

    /** Template bound to the bean named {@code secondRabbitTemplate}. */
    @Autowired
    @Qualifier(value = "secondRabbitTemplate")
    public AmqpTemplate secondRabbitTemplate;
}
第一個消息發送者類 TestFirstSender.class
/**
 * Sender that publishes through the first RabbitMQ source.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Slf4j
@Component
public class TestFirstSender extends MultiRabbitTemplate implements MessageSender {

    /** First argument handed to {@code convertAndSend} for every message. */
    private static final String ROUTING_KEY = "rabbitmq1";

    /**
     * Logs the message and publishes it via {@code firstRabbitTemplate}.
     *
     * @param msg the payload to convert and send
     */
    @Override
    public void send(Object msg) {
        log.info("rabbitmq1 , msg: {}", msg);
        firstRabbitTemplate.convertAndSend(ROUTING_KEY, msg);
    }

    /** Sends the fixed sample payload {@code "innerpeacez1"}. */
    public void rabbitmq1sender() {
        send("innerpeacez1");
    }
}
第二個消息發送者類 TestSecondSender.class
/**
 * Sender that publishes through the second RabbitMQ source.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Slf4j
@Component
public class TestSecondSender extends MultiRabbitTemplate implements MessageSender {

    /** First argument handed to {@code convertAndSend} for every message. */
    private static final String ROUTING_KEY = "rabbitmq2";

    /**
     * Logs the message and publishes it via {@code secondRabbitTemplate}.
     *
     * @param msg the payload to convert and send
     */
    @Override
    public void send(Object msg) {
        log.info("rabbitmq2 , msg: {}", msg);
        secondRabbitTemplate.convertAndSend(ROUTING_KEY, msg);
    }

    /** Sends the fixed sample payload {@code "innerpeacez2"}. */
    public void rabbitmq2sender() {
        send("innerpeacez2");
    }
}
動態建立Queue的消費者
/**
 * Listener for the {@code rabbitmq1} queue, attached through the
 * {@code firstFactory} listener container factory.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Component
@Slf4j
public class TestFirstConsumer implements MessageConsumer {

    /**
     * Logs every received object. The annotation declares queue
     * {@code rabbitmq1}, exchange {@code rabbitmq1} and a binding between
     * them with key {@code rabbitmq1}.
     *
     * <p>NOTE(review): the declaration is not restricted to one RabbitAdmin
     * here; with two RabbitAdmin beans in the context it may be declared on
     * both brokers - confirm, and consider the {@code admins} attribute if
     * the Spring AMQP version in use supports it.
     *
     * @param obj the received object
     */
    @RabbitListener(
            containerFactory = "firstFactory",
            bindings = @QueueBinding(
                    value = @Queue("rabbitmq1"),
                    exchange = @Exchange("rabbitmq1"),
                    key = "rabbitmq1"))
    @Override
    public void receive(Object obj) {
        log.info("rabbitmq1 , {}", obj);
    }
}
/**
 * Listener for the {@code rabbitmq2} queue, attached through the
 * {@code secondFactory} listener container factory.
 *
 * @author innerpeacez
 * @since 2019/3/11
 */
@Component
@Slf4j
public class TestSecondConsumer implements MessageConsumer {

    /**
     * Logs every received object. The annotation declares queue
     * {@code rabbitmq2}, exchange {@code rabbitmq2} and a binding between
     * them with key {@code rabbitmq2}.
     *
     * <p>NOTE(review): the declaration is not restricted to one RabbitAdmin
     * here; with two RabbitAdmin beans in the context it may be declared on
     * both brokers - confirm, and consider the {@code admins} attribute if
     * the Spring AMQP version in use supports it.
     *
     * @param obj the received object
     */
    @RabbitListener(
            containerFactory = "secondFactory",
            bindings = @QueueBinding(
                    value = @Queue("rabbitmq2"),
                    exchange = @Exchange("rabbitmq2"),
                    key = "rabbitmq2"))
    @Override
    public void receive(Object obj) {
        log.info("rabbitmq2 , {}", obj);
    }
}
測試類
/**
 * Integration tests that publish messages to both RabbitMQ sources from many
 * threads and then wait so the listeners have time to log what they receive.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringBootMultiRabbitmqApplicationTests extends MultiRabbitTemplate {

    /** Number of sender threads started per test; each thread sends one message. */
    private static final int SENDER_THREADS = 100;

    /** How long each test waits after starting the senders, in milliseconds. */
    private static final long SETTLE_MILLIS = 1000 * 10;

    @Autowired
    private TestFirstSender firstSender;

    @Autowired
    private TestSecondSender secondSender;

    /**
     * One hundred threads each send one message to the rabbitmq1 queue of the
     * first RabbitMQ source.
     */
    @Test
    public void testFirstSender() {
        sendConcurrently(firstSender::rabbitmq1sender);
    }

    /**
     * One hundred threads each send one message to the rabbitmq2 queue of the
     * second RabbitMQ source.
     */
    @Test
    public void testSecondSender() {
        sendConcurrently(secondSender::rabbitmq2sender);
    }

    /**
     * Starts {@link #SENDER_THREADS} threads that each run {@code sendOnce},
     * then sleeps for {@link #SETTLE_MILLIS} before returning.
     *
     * @param sendOnce the action every sender thread performs once
     */
    private void sendConcurrently(Runnable sendOnce) {
        for (int i = 0; i < SENDER_THREADS; i++) {
            new Thread(sendOnce).start();
        }
        try {
            Thread.sleep(SETTLE_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe
            // the interruption instead of it being silently discarded.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for messages to be processed", e);
        }
    }
}
測試結果:
這樣配置好之後我們就可向兩個RabbitMQ中發送消息啦。這裏只配置了兩個源,當然如果你需要更多的源,僅僅只需要配置*RabbitConfiguration.class
就可以啦。本文沒有多說關於RabbitMQ的相關知識,如果未使用過需要自己瞭解一下相關知識。
- 源碼:https://github.com/innerpeace...
- Github: https://github.com/innerpeacez
- 個人Blog: https://ipzgo.top
- 日拱一卒,不期速成