RabbitMQ基礎教程之基於配置的消費者實現

RabbitMQ基礎教程之基於配置的消費者實現

相關博文,推薦查看:java

  1. RabbitMq基礎教程之安裝與測試
  2. RabbitMq基礎教程之基本概念
  3. RabbitMQ基礎教程之基本使用篇
  4. RabbitMQ基礎教程之使用進階篇
  5. RabbitMQ基礎教程之Spring&JavaConfig使用篇
  6. RabbitMQ基礎教程之Spring-JavaConfig-FactoryBean使用姿式

前面一篇介紹了使用工廠方式建立消費者,其中一個不太友好的地方就在配置都是硬編碼的方式,不太靈活,那麼是否能夠結合前一篇的FactoryBean來實現從配置中來靈活的建立消費者呢?git

I. 動態配置實現消費者程序

1. 配置文件加載

首先就是須要從配置文件中獲取相應的配置信息,藉助JavaConfig,加一個註解便可github

@Configuration
@PropertySource("classpath:dynamicConfig.properties")
public class DynSpringConfig {

    @Autowired
    private Environment environment;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(environment.getProperty("dyn.mq.host"));
        factory.setPort(Integer.parseInt(environment.getProperty("dyn.mq.port")));
        factory.setUsername(environment.getProperty("dyn.mq.uname"));
        factory.setPassword(environment.getProperty("dyn.mq.pwd"));
        factory.setVirtualHost(environment.getProperty("dyn.mq.vhost"));
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

主要就是 @PropertySource("classpath:dynamicConfig.properties") , 表示從dynamicConfig.properties文件中讀取相應的配置,而這些配置,會存放在 Environment 容器內;spring

獲取配置的方式,就是經過org.springframework.core.env.PropertyResolver#getProperty(java.lang.String)獲取數組

2. 消費者通用實現

實現一個簡單的通用的消費端,主要根據前一篇博文中定義的MQContainerFactory,來生成SimpleMessageListenerContainer,而後注入消費服務,並啓動容器ide

public class DynamicConsumer {
    public DynamicConsumer(MQContainerFactory fac) throws Exception {
        SimpleMessageListenerContainer container = fac.getObject();
        container.setMessageListener(new AbsMQConsumer() {
            @Override
            public boolean process(Message message, Channel channel) {
                System.out.println("DynamicConsumer: " + fac.getQueue() + " | " + new String(message.getBody()));
                return true;
            }
        });

        container.start();
    }
}

上面是一個很是簡單的實現,針對常見的的RabbitMQ消息消費而言,也能夠寫一個泛型類,而後藉助Spring的事件機制,實現一個通用的消費端,一種case以下:學習

public class JsonMsgConsumer {
    public JsonMsgConsumer(ApplicationContext apc, MQContainerFactory fac, Class<?> msgType) throws Exception {
        SimpleMessageListenerContainer container = fac.getObject();
        container.setMessageListener(new AbsMQConsumer() {
            @Override
            public boolean process(Message message, Channel channel) {
                System.out.println("DynamicConsumer: " + fac.getQueue() + " | " + new String(message.getBody()));

                Object type = JSONObject.parseObject(message.getBody(), msgType);
                apc.publishEvent(type);
                return true;
            }
        });

        container.start();
    }
}

若是message中的數據,是經過Json序列化方式存入,則使用方,只須要監聽對應的Event消費數據便可,徹底不用再關係消費端的狀況了測試

3. MQContainerFactory 初始化

根據配置文件中的信息,初始化factory,這個可謂是最關鍵的地方了,實現也和以前大體相似,只不過是將硬編碼改爲配置信息讀取而已,完整的配置文件以下ui

@Configuration
@PropertySource("classpath:dynamicConfig.properties")
public class DynSpringConfig {

    @Autowired
    private Environment environment;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(environment.getProperty("dyn.mq.host"));
        factory.setPort(Integer.parseInt(environment.getProperty("dyn.mq.port")));
        factory.setUsername(environment.getProperty("dyn.mq.uname"));
        factory.setPassword(environment.getProperty("dyn.mq.pwd"));
        factory.setVirtualHost(environment.getProperty("dyn.mq.vhost"));
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public AmqpProducer amqpProducer() {
        return new AmqpProducer();
    }


    @Bean
    public DynamicConsumer dynamicConsumer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin)
            throws Exception {
        MQContainerFactory fac = MQContainerFactory.builder().directExchange(environment.getProperty("dyn.mq.exchange"))
                .queue(environment.getProperty("dyn.mq.queue"))
                .autoDeleted(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoDeleted")))
                .autoAck(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoAck")))
                .durable(Boolean.parseBoolean(environment.getProperty("dyn.mq.durable")))
                .routingKey(environment.getProperty("dyn.mq.routingKey")).rabbitAdmin(rabbitAdmin)
                .connectionFactory(connectionFactory).build();

        return new DynamicConsumer(fac);
    }
}

4. 測試

配置文件內容:編碼

dyn.mq.host=127.0.0.1
dyn.mq.port=5672
dyn.mq.uname=admin
dyn.mq.pwd=admin
dyn.mq.vhost=/
dyn.mq.exchange=fac.direct.exchange
dyn.mq.queue=dyn.queue
dyn.mq.durable=true
dyn.mq.autoDeleted=false
dyn.mq.autoAck=false
dyn.mq.routingKey=fac-routing

測試方法

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = DynSpringConfig.class)
public class DynamicConsumerUnit {
    @Autowired
    private AmqpProducer amqpProducer;

    @Test
    public void testDirectConsumer() throws InterruptedException {
        String[] routingKey = new String[]{"hello.world", "fac-routing", "test1"};
        for (int i = 0; i < 100; i++) {
            amqpProducer.publishMsg("fac.direct.exchange", routingKey[i % 3],
                    ">>> hello " + routingKey[i % 3] + ">>> " + i);
        }
        System.out.println("-------over---------");

        Thread.sleep(1000 * 60 * 10);
    }
}

執行以後,就能夠看到正常的消費了

5. 擴充與小結

看完以後,可能有一個問題,爲何要這樣作,好處是什麼?

大部分的時候,從MQ獲取消息的邏輯都同樣,惟一的區別在於獲取到數據以後作的業務而言,若是把這一塊徹底的抽象出來,經過配置的方式,那麼額外的新增mq的消費,就不須要再改消費端的代碼了,而後就會有一個疑問,上面的配置文件中,生成dynamicConsumer的bean不也是須要額外寫麼?

若是將配置信息,以某種數組的方式定義,遍歷讀取這些配置,而後建立多個DynamicConsuer實例,是否就能支持動態擴展呢?

將配置改爲下面的進行嘗試

@Configuration
@PropertySource("classpath:dynamicConfig.properties")
public class DynSpringConfig {

    @Autowired
    private Environment environment;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(environment.getProperty("dyn.mq.host"));
        factory.setPort(Integer.parseInt(environment.getProperty("dyn.mq.port")));
        factory.setUsername(environment.getProperty("dyn.mq.uname"));
        factory.setPassword(environment.getProperty("dyn.mq.pwd"));
        factory.setVirtualHost(environment.getProperty("dyn.mq.vhost"));
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public AmqpProducer amqpProducer() {
        return new AmqpProducer();
    }


    @Autowired
    private ConnectionFactory connectionFactory;

    @PostConstruct
    public void dynamicConsumer()
            throws Exception {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        MQContainerFactory fac = MQContainerFactory.builder().directExchange(environment.getProperty("dyn.mq.exchange"))
                .queue(environment.getProperty("dyn.mq.queue"))
                .autoDeleted(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoDeleted")))
                .autoAck(Boolean.parseBoolean(environment.getProperty("dyn.mq.autoAck")))
                .durable(Boolean.parseBoolean(environment.getProperty("dyn.mq.durable")))
                .routingKey(environment.getProperty("dyn.mq.routingKey")).rabbitAdmin(rabbitAdmin)
                .connectionFactory(connectionFactory).build();

        new DynamicConsumer(fac);
    }
}

注意以前 dynamicConsumer 是bean的建立,改爲了初始化一個實例,若是配置文件是數組,內部用一個遍歷就能夠所有加載,如今就須要驗證上面的配置改動以後,是否依然能夠消費數據

實測ok,部分輸出以下

-------over---------
DynamicConsumer: dyn.queue | >>> hello fac-routing>>> 1
DynamicConsumer: dyn.queue | >>> hello fac-routing>>> 4
DynamicConsumer: dyn.queue | >>> hello fac-routing>>> 7

II. 其餘

項目地址

一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛

聲明

盡信書則不如,已上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現bug或者有更好的建議,歡迎批評指正,不吝感激

掃描關注

QrCode

相關文章
相關標籤/搜索