springboot 集成rabbitmq 實例

springboot 集成rabbitmq 實例

我的在學習rabbitmq時發現網上不多有系統性介紹springboot和rabbitmq如何集成的,其餘人總結的都片斷化,因此結合我的調研過程,整理此篇文章。java

本文章共分爲如下部分:redis

  • rabbitmq簡介
  • springboot配置
  • rabbitmq生產者配置
  • rabbitmq消費者配置
  • 問題補充

1、rabbitmq簡介

目前流程的消息隊列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的應用場景,關於各個框架的介紹,你們可自行百度,網上不少文章介紹~其中rabbit由於其ack特性以及還算不錯的性能被大多數公司採用。spring

概念:

  • 生產者 消息的產生方,負責將消息推送到消息隊列
  • 消費者 消息的最終接受方,負責監聽隊列中的對應消息,消費消息
  • 隊列 消息的寄存器,負責存放生產者發送的消息
  • 交換機 負責根據必定規則分發生產者產生的消息
  • 綁定 完成交換機和隊列之間的綁定

模式:

  • direct
    直連模式,用於實例間的任務分發
  • topic
    話題模式,經過可配置的規則分發給綁定在該exchange上的隊列
  • headers
    適用規則複雜的分發,用headers裏的參數表達規則
  • fanout
    分發給全部綁定到該exchange上的隊列,忽略routing key

安裝

單機版安裝很簡單,大概步驟以下:segmentfault

# 安裝erlang包
    yum install erlang
# 安裝socat
    yum install socat
# 安裝rabbit    
    rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm 
# 啓動服務
  rabbitmq-server start
# 增長管理控制功能
  rabbitmq-plugins enable rabbitmq_management
# 增長用戶:
    sudo rabbitmqctl add_user root password
    rabbitmqctl set_user_tags root administrator 
    rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集羣安裝,可參考如下博客:
     
rabbitmq集羣安裝springboot

2、springboot配置

廢話少說直接上代碼:
配置參數
application.yml:app

spring:
   rabbitmq:
    addresses: 192.168.1.1:5672
    username: username
    password: password
    publisher-confirms: true
    virtual-host: /

java config讀取參數框架

/**
 * RabbitMq配置文件讀取類
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.addresses}")
    private String addresses;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.publisher-confirms}")
    private Boolean publisherConfirms;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    // 構建mq實例工廠
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
}

3、rabbitmq生產者配置

主要配置了直連和話題模式,其中話題模式設置兩個隊列(queueTopicTest一、queueTopicTest2),此兩個隊列在和交換機綁定時分別設置不一樣的routingkey(.TEST.以及lazy.#)來驗證匹配模式。dom

/**
 * 用於配置交換機和隊列對應關係
 * 新增消息隊列應該按照以下步驟
 * 一、增長queue bean,參見queueXXXX方法
 * 二、增長queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

    /**
     * @Author:chenhf
     * @Description: 主題型交換機
     * @Date:下午5:49 2017/10/23
     * @param
     * @return
     */
    @Bean
    TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
        TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
        rabbitAdmin.declareExchange(contractTopicExchange);
        logger.debug("完成主題型交換機bean實例化");
        return contractTopicExchange;
    }
    /**
     * 直連型交換機
     */
    @Bean
    DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
        DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
        rabbitAdmin.declareExchange(contractDirectExchange);
        logger.debug("完成直連型交換機bean實例化");
        return contractDirectExchange;
    }

    //在此能夠定義隊列

    @Bean
    Queue queueTest(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("測試隊列實例化完成");
        return queue;
    }

    //topic 1
    @Bean
    Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("話題測試隊列1實例化完成");
        return queue;
    }
    //topic 2
    @Bean
    Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("話題測試隊列2實例化完成");
        return queue;
    }


    //在此處完成隊列和交換機綁定
    @Bean
    Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("測試隊列與直連型交換機綁定完成");
        return binding;
    }
    //topic binding1
    @Bean
    Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("測試隊列與話題交換機1綁定完成");
        return binding;
    }

    //topic binding2
    @Bean
    Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("測試隊列與話題交換機2綁定完成");
        return binding;
    }

}

在這裏用到枚舉類:RabbitMqEnum ide

/**
 * 定義rabbitMq須要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {

    /**
     * @param
     * @Author:chenhf
     * @Description:定義數據交換方式
     * @Date:下午4:08 2017/10/23
     * @return
     */
    public enum Exchange {
        CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發"),
        CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),
        CONTRACT_DIRECT("CONTRACT_DIRECT", "點對點");

        private String code;
        private String name;

        Exchange(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }
    }

    /**
     * describe: 定義隊列名稱
     * creat_user: chenhf
     * creat_date: 2017/10/31
     **/
    public enum QueueName {
        TESTQUEUE("TESTQUEUE", "測試隊列"),
        TOPICTEST1("TOPICTEST1", "topic測試隊列"),
        TOPICTEST2("TOPICTEST2", "topic測試隊列");

        private String code;
        private String name;

        QueueName(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }

    }

    /**
     * describe: 定義routing_key
     * creat_user: chenhf
     * creat_date: 2017/10/31
     **/
    public enum QueueEnum {
        TESTQUEUE("TESTQUEUE1", "測試隊列key"),
        TESTTOPICQUEUE1("*.TEST.*", "topic測試隊列key"),
        TESTTOPICQUEUE2("lazy.#", "topic測試隊列key");


        private String code;
        private String name;

        QueueEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }

        public String getCode() {
            return code;
        }

        public String getName() {
            return name;
        }
    }

}

以上完成消息生產者的定義,下面封裝調用接口
測試時直接調用此工具類,testUser類需本身實現工具

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/**
 * rabbitmq發送消息工具類
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/

@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMqSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        logger.info("confirm: " + correlationData.getId());
    }

    /**
     * 發送到 指定routekey的指定queue
     * @param routeKey
     * @param obj
     */
    public void sendRabbitmqDirect(String routeKey,Object obj) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
    }

    /**
     * 全部發送到Topic Exchange的消息被轉發到全部關心RouteKey中指定Topic的Queue上
     * @param routeKey
     * @param obj
     */
    public void sendRabbitmqTopic(String routeKey,Object obj) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
    }
}

4、rabbitmq消費者配置

springboot註解方式監聽隊列,沒法手動指定回調,因此採用了實現ChannelAwareMessageListener接口,重寫onMessage來進行手動回調,詳見如下代碼,詳細介紹能夠在spring的官網上找amqp相關章節閱讀

直連消費者
經過設置TestUser的name來測試回調,分別發兩條消息,一條UserName爲1,一條爲2,查看控制檯中隊列中消息是否被消費

/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
    @Bean("testQueueContainer")
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("TESTQUEUE");
        container.setMessageListener(exampleListener());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }


    @Bean("testQueueListener")
    public ChannelAwareMessageListener exampleListener() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
                //經過設置TestUser的name來測試回調,分別發兩條消息,一條UserName爲1,一條爲2,查看控制檯中隊列中消息是否被消費
                if ("2".equals(testUser.getUserName())){
                    System.out.println(testUser.toString());
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                }

                if ("1".equals(testUser.getUserName())){
                    System.out.println(testUser.toString());
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                }

            }
        };
    }

}

topic消費者1

/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
    @Bean("topicTest1Container")
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("TOPICTEST1");
        container.setMessageListener(exampleListener1());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }


    @Bean("topicTest1Listener")
    public ChannelAwareMessageListener exampleListener1(){
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
                System.out.println("TOPICTEST1:"+testUser.toString());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            }
        };
    }




}

topic消費者2

/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
    @Bean("topicTest2Container")
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("TOPICTEST2");
        container.setMessageListener(exampleListener());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }


    @Bean("topicTest2Listener")
    public ChannelAwareMessageListener exampleListener() {
        return new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
                System.out.println("TOPICTEST2:"+testUser.toString());
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

            }
        };
    }

}

問題補充

使用過程當中可能出現的坑參考此篇文章
https://segmentfault.com/a/11...

相關文章
相關標籤/搜索