相關博文,推薦查看:java
前面一篇介紹了使用工廠方式建立消費者,其中一個不太友好的地方就在配置都是硬編碼的方式,不太靈活,那麼是否能夠結合前一篇的FactoryBean來實現從配置中來靈活的建立消費者呢?git
首先就是須要從配置文件中獲取相應的配置信息,藉助JavaConfig,加一個註解便可github
@Configuration @PropertySource("classpath:dynamicConfig.properties") public class DynSpringConfig { @Autowired private Environment environment; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost(environment.getProperty("dyn.mq.host")); factory.setPort(Integer.parseInt(environment.getProperty("dyn.mq.port"))); factory.setUsername(environment.getProperty("dyn.mq.uname")); factory.setPassword(environment.getProperty("dyn.mq.pwd")); factory.setVirtualHost(environment.getProperty("dyn.mq.vhost")); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
主要就是 @PropertySource("classpath:dynamicConfig.properties")
, 表示從dynamicConfig.properties文件中讀取相應的配置,而這些配置,會存放在 Environment
容器內;spring
獲取配置的方式,就是經過org.springframework.core.env.PropertyResolver#getProperty(java.lang.String)
獲取數組
實現一個簡單的通用的消費端,主要根據前一篇博文中定義的MQContainerFactory
,來生成SimpleMessageListenerContainer
,而後注入消費服務,並啓動容器ide
public class DynamicConsumer { public DynamicConsumer(MQContainerFactory fac) throws Exception { SimpleMessageListenerContainer container = fac.getObject(); container.setMessageListener(new AbsMQConsumer() { @Override public boolean process(Message message, Channel channel) { System.out.println("DynamicConsumer: " + fac.getQueue() + " | " + new String(message.getBody())); return true; } }); container.start(); } }
上面是一個很是簡單的實現,針對常見的的RabbitMQ消息消費而言,也能夠寫一個泛型類,而後藉助Spring的事件機制,實現一個通用的消費端,一種case以下:學習
public class JsonMsgConsumer { public JsonMsgConsumer(ApplicationContext apc, MQContainerFactory fac, Class<?> msgType) throws Exception { SimpleMessageListenerContainer container = fac.getObject(); container.setMessageListener(new AbsMQConsumer() { @Override public boolean process(Message message, Channel channel) { System.out.println("DynamicConsumer: " + fac.getQueue() + " | " + new String(message.getBody())); Object type = JSONObject.parseObject(message.getBody(), msgType); apc.publishEvent(type); return true; } }); container.start(); } }
若是message中的數據,是經過Json序列化方式存入,則使用方,只須要監聽對應的Event消費數據便可,徹底不用再關係消費端的狀況了測試
根據配置文件中的信息,初始化factory,這個可謂是最關鍵的地方了,實現也和以前大體相似,只不過是將硬編碼改爲配置信息讀取而已,完整的配置文件以下ui
@Configuration @PropertySource("classpath:dynamicConfig.properties") public class DynSpringConfig { @Autowired private Environment environment; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost(environment.getProperty("dyn.mq.host")); factory.setPort(Integer.parseInt(environment.getProperty("dyn.mq.port"))); factory.setUsername(environment.getProperty("dyn.mq.uname")); factory.setPassword(environment.getProperty("dyn.mq.pwd")); factory.setVirtualHost(environment.getProperty("dyn.mq.vhost")); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } @Bean public AmqpProducer amqpProducer() { return new AmqpProducer(); } @Bean public DynamicConsumer dynamicConsumer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) throws Exception { MQContainerFactory fac = MQContainerFactory.builder().directExchange(environment.getProperty("dyn.mq.exchange")) .queue(environment.getProperty("dyn.mq.queue")) .autoDeleted(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoDeleted"))) .autoAck(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoAck"))) .durable(Boolean.parseBoolean(environment.getProperty("dyn.mq.durable"))) .routingKey(environment.getProperty("dyn.mq.routingKey")).rabbitAdmin(rabbitAdmin) .connectionFactory(connectionFactory).build(); return new DynamicConsumer(fac); } }
配置文件內容:編碼
dyn.mq.host=127.0.0.1 dyn.mq.port=5672 dyn.mq.uname=admin dyn.mq.pwd=admin dyn.mq.vhost=/ dyn.mq.exchange=fac.direct.exchange dyn.mq.queue=dyn.queue dyn.mq.durable=true dyn.mq.autoDeleted=false dyn.mq.autoAck=false dyn.mq.routingKey=fac-routing
測試方法
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = DynSpringConfig.class) public class DynamicConsumerUnit { @Autowired private AmqpProducer amqpProducer; @Test public void testDirectConsumer() throws InterruptedException { String[] routingKey = new String[]{"hello.world", "fac-routing", "test1"}; for (int i = 0; i < 100; i++) { amqpProducer.publishMsg("fac.direct.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i); } System.out.println("-------over---------"); Thread.sleep(1000 * 60 * 10); } }
執行以後,就能夠看到正常的消費了
看完以後,可能有一個問題,爲何要這樣作,好處是什麼?
大部分的時候,從MQ獲取消息的邏輯都同樣,惟一的區別在於獲取到數據以後作的業務而言,若是把這一塊徹底的抽象出來,經過配置的方式,那麼額外的新增mq的消費,就不須要再改消費端的代碼了,而後就會有一個疑問,上面的配置文件中,生成dynamicConsumer的bean不也是須要額外寫麼?
若是將配置信息,以某種數組的方式定義,遍歷讀取這些配置,而後建立多個DynamicConsuer實例,是否就能支持動態擴展呢?
將配置改爲下面的進行嘗試
@Configuration @PropertySource("classpath:dynamicConfig.properties") public class DynSpringConfig { @Autowired private Environment environment; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost(environment.getProperty("dyn.mq.host")); factory.setPort(Integer.parseInt(environment.getProperty("dyn.mq.port"))); factory.setUsername(environment.getProperty("dyn.mq.uname")); factory.setPassword(environment.getProperty("dyn.mq.pwd")); factory.setVirtualHost(environment.getProperty("dyn.mq.vhost")); return factory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } @Bean public AmqpProducer amqpProducer() { return new AmqpProducer(); } @Autowired private ConnectionFactory connectionFactory; @PostConstruct public void dynamicConsumer() throws Exception { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); MQContainerFactory fac = MQContainerFactory.builder().directExchange(environment.getProperty("dyn.mq.exchange")) .queue(environment.getProperty("dyn.mq.queue")) .autoDeleted(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoDeleted"))) .autoAck(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoAck"))) .durable(Boolean.parseBoolean(environment.getProperty("dyn.mq.durable"))) .routingKey(environment.getProperty("dyn.mq.routingKey")).rabbitAdmin(rabbitAdmin) .connectionFactory(connectionFactory).build(); new DynamicConsumer(fac); } }
注意以前 dynamicConsumer
是bean的建立,改爲了初始化一個實例,若是配置文件是數組,內部用一個遍歷就能夠所有加載,如今就須要驗證上面的配置改動以後,是否依然能夠消費數據
實測ok,部分輸出以下
-------over--------- DynamicConsumer: dyn.queue | >>> hello fac-routing>>> 1 DynamicConsumer: dyn.queue | >>> hello fac-routing>>> 4 DynamicConsumer: dyn.queue | >>> hello fac-routing>>> 7
一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛
盡信書則不如,已上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激