SpringBoot鏈接多RabbitMQ源

SpringBoot系列教程

在實際開發中,不少場景須要異步處理,這時就須要用到RabbitMQ,並且隨着場景的增多程序可能須要鏈接多個RabbitMQ。SpringBoot自己提供了默認的配置能夠快速配置鏈接RabbitMQ,可是隻能鏈接一個RabbitMQ,當須要鏈接多個RabbitMQ時,默認的配置就不太適用了,須要單獨編寫每一個鏈接。java

在SpringBoot框架中,咱們經常使用的兩個類通常是:git

  • RabbitTemplate:做爲生產、消費消息使用;
  • RabbitAdmin:做爲申明、刪除交換機和隊列,綁定和解綁隊列和交換機的綁定關係使用。

因此咱們鏈接多個RabbitMQ就須要從新創建鏈接、從新實現這兩個類。 代碼以下:github

配置

application.properties配置文件須要配置兩個鏈接:spring

server.port=8080

# rabbitmq
v2.spring.rabbitmq.host=host
v2.spring.rabbitmq.port=5672
v2.spring.rabbitmq.username=username
v2.spring.rabbitmq.password=password
v2.spring.rabbitmq.virtual-host=virtual-host
#consume 手動 ack
v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.當mandatory標誌位設置爲true時,
#   若是exchange根據自身類型和消息routingKey沒法找到一個合適的queue存儲消息,
#   那麼broker會調用basic.return方法將消息返還給生產者;
#2.當mandatory設置爲false時,出現上述狀況broker會直接將消息丟棄;通俗的講,
#   mandatory標誌告訴broker代理服務器至少將消息route到一個隊列中,
#   不然就將消息return給發送者;
v2.spring.rabbitmq.template.mandatory=true
#publisher confirms 發送確認
v2.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送達exchange
#   2.送達exchange卻未送道queue的消息 回調returnCallback.(注意)出現2狀況時,publisher-confirms 回調的是true
v2.spring.rabbitmq.publisher-returns=true
v2.spring.rabbitmq.listener.simple.prefetch=5

# rabbitmq
v1.spring.rabbitmq.host=host
v1.spring.rabbitmq.port=5672
v1.spring.rabbitmq.username=username
v1.spring.rabbitmq.password=password
v1.spring.rabbitmq.virtual-host=virtual-host
#consume 手動 ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.當mandatory標誌位設置爲true時,
#   若是exchange根據自身類型和消息routingKey沒法找到一個合適的queue存儲消息,
#   那麼broker會調用basic.return方法將消息返還給生產者;
#2.當mandatory設置爲false時,出現上述狀況broker會直接將消息丟棄;通俗的講,
#   mandatory標誌告訴broker代理服務器至少將消息route到一個隊列中,
#   不然就將消息return給發送者;
v1.spring.rabbitmq.template.mandatory=true
#publisher confirms 發送確認
v1.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送達exchange
#   2.送達exchange卻未送道queue的消息 回調returnCallback.(注意)出現2狀況時,publisher-confirms 回調的是true
v1.spring.rabbitmq.publisher-returns=true
v1.spring.rabbitmq.listener.simple.prefetch=5

重寫鏈接工廠

須要注意的是,在多源的狀況下,須要在某個鏈接加上@Primary註解,表示主鏈接,默認使用這個鏈接json

package com.example.config.rabbitmq;

import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Created by shuai on 2019/4/23.
 */
@Configuration
public class MultipleRabbitMQConfig {

    @Bean(name = "v2ConnectionFactory")
    public CachingConnectionFactory hospSyncConnectionFactory(
            @Value("${v2.spring.rabbitmq.host}") String host,
            @Value("${v2.spring.rabbitmq.port}") int port,
            @Value("${v2.spring.rabbitmq.username}") String username,
            @Value("${v2.spring.rabbitmq.password}") String password,
            @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v2RabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
        v2RabbitTemplate.setMandatory(mandatory);
        v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
//                    LOGGER.info("{} 發送RabbitMQ消息 ack確認 失敗: [{}]", this.name, JSON.toJSONString(object));
            } else {
//                    LOGGER.info("{} 發送RabbitMQ消息 ack確認 成功: [{}]", this.name, JSON.toJSONString(object));
            }
        });
        v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 發送RabbitMQ消息returnedMessage,出現異常,Exchange不存在或發送至Exchange卻沒有發送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
        });
        return v2RabbitTemplate;
    }

    @Bean(name = "v2ContainerFactory")
    public SimpleRabbitListenerContainerFactory hospSyncFactory(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch
    ) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v2RabbitAdmin")
    public RabbitAdmin iqianzhanRabbitAdmin(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }


    // mq主鏈接
    @Bean(name = "v1ConnectionFactory")
    @Primary
    public CachingConnectionFactory publicConnectionFactory(
            @Value("${v1.spring.rabbitmq.host}") String host,
            @Value("${v1.spring.rabbitmq.port}") int port,
            @Value("${v1.spring.rabbitmq.username}") String username,
            @Value("${v1.spring.rabbitmq.password}") String password,
            @Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v1RabbitTemplate")
    @Primary
    public RabbitTemplate publicRabbitTemplate(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);
        v1RabbitTemplate.setMandatory(mandatory);
        v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
//                    LOGGER.info("{} 發送RabbitMQ消息 ack確認 失敗: [{}]", this.name, JSON.toJSONString(object));
            } else {
//                    LOGGER.info("{} 發送RabbitMQ消息 ack確認 成功: [{}]", this.name, JSON.toJSONString(object));
            }
        });
        v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 發送RabbitMQ消息returnedMessage,出現異常,Exchange不存在或發送至Exchange卻沒有發送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
        });
        return v1RabbitTemplate;
    }

    @Bean(name = "v1ContainerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v1RabbitAdmin")
    @Primary
    public RabbitAdmin publicRabbitAdmin(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

建立Exchange、Queue並綁定

再實現RabbitAdmin後,咱們就須要根據RabbitAdmin建立對應的交換機和隊列,並創建綁定關係服務器

package com.example.config.rabbitmq;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * 建立Queue、Exchange並創建綁定關係
 * Created by shuai on 2019/5/16.
 */
@Configuration
public class MyRabbitMQCreateConfig {

    @Resource(name = "v2RabbitAdmin")
    private RabbitAdmin v2RabbitAdmin;

    @Resource(name = "v1RabbitAdmin")
    private RabbitAdmin v1RabbitAdmin;

    @PostConstruct
    public void RabbitInit() {
        v2RabbitAdmin.declareExchange(new TopicExchange("exchange.topic.example.new", true, false));
        v2RabbitAdmin.declareQueue(new Queue("queue.example.topic.new", true));
        v2RabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("queue.example.topic.new", true))        //直接建立隊列
                        .to(new TopicExchange("exchange.topic.example.new", true, false))    //直接建立交換機 創建關聯關係
                        .with("routing.key.example.new"));    //指定路由Key
    }
}

生產者

爲了後續驗證每一個鏈接都創建成功,而且都能生產消息,生產者這裏分別使用新生成的RabbitTemplate發送一條消息。app

package com.example.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class TopicProducer {

    @Resource(name = "v1RabbitTemplate")
    private RabbitTemplate v1RabbitTemplate;

    @Resource(name = "v2RabbitTemplate")
    private RabbitTemplate v2RabbitTemplate;

    public void sendMessageByTopic() {
        String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";
        v1RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content1);

        String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";
        v2RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content2);
    }
}

消費者

這裏須要注意在配置消費隊列時,須要標識ContainerFactory框架

package com.example.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
public class TopicConsumer {

    @RabbitHandler
    public void consumer(String message) {
        System.out.println(message);
    }
}

這樣就完成了SpringBoot鏈接多個RabbitMQ源的示例了,再寫一段測試代碼驗證下。異步

測試驗證

package com.example.test;

import com.example.topic.TopicProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQMultipleTest {

    @Autowired
    private TopicProducer topicProducer;


    @Test
    public void topicProducerTest() {
        topicProducer.sendMessageByTopic();
    }
}

執行測試代碼,驗證結果爲:spring-boot

驗證結果

驗證SpringBoot鏈接多RabbitMQ源成功!

github地址:Spring Boot 教程、技術棧、示例代碼

聯繫我

掃碼關注公衆號:java之旅

相關文章
相關標籤/搜索