springboot集成RabbitMQ

1、安裝

安裝我是按照這個哥們安裝的,cdns 過客幽影星風java

2、在 spring boot 中使用

  1. 在pom.xml配置文件中加入以下依賴。spring

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 在application.yml文件中配置rabbitmqapp

    spring:
      rabbitmq:
        host: 127.0.0.1   # 主機ip
        port: 5672         # 主機端口
        username: guest    # 訪問用戶名
        password: guest   # 訪問密碼
        publisher-confirms: true # 啓用消息確認,發送失敗會異常
        virtual-host: /   # 定義虛擬主機的路徑,默認爲根目錄
  3. rabbitmq默認使用字節方式進行發送數據,爲了在管理界面方便查看,咱們能夠使用JSON字符串的方式傳輸spring-boot

    // 添加配置文件 RabbitConfig.java
    
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 默認狀況下RabbitMQ發送的消息是轉換爲字節碼,這裏改成發送JSON數據。
     *
     * @author lixingwu
     */
    @Configuration
    public class RabbitConfig {
        @Bean
        public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    }

    注意:若是使用JSON數據發送數據,就只能使用convertAndSend方法,不能使用send方法了,具體怎麼解決還不知道。測試

  4. 建立測試文件RabbitMQTest.javaui

    import cn.hutool.core.lang.Console;
    import org.junit.Test;
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RabbitMQ 測試
     */
    public class RabbitMQTest extends BaseSpringBootTest {
        @Autowired
        private AmqpTemplate mq;
        @Autowired
        AmqpAdmin amqpAdmin;
    
        //建立交換機
        @Test
        public void declareExchange() {
            // 建立 Direct 一對一的交換器,一個交換器對應一個隊列
            // 該交換機裏面的三個參數分別爲: 名字,持久化,是否自動刪除
            DirectExchange directExchange = new DirectExchange("direct-exchange-001", false, false);
            amqpAdmin.declareExchange(directExchange);
    
            // 建立FANOUT,發佈訂閱模式(不存在路由鍵 將被投放到exchange對應的隊列中)
            // 可綁定多個隊列,消息會給所有的隊列每人發送一份
            FanoutExchange fanoutExchange = new FanoutExchange("fanout-exchange-001", false, false);
            amqpAdmin.declareExchange(fanoutExchange);
    
            // 建立Topic,能夠使得不一樣源頭的數據投放到一個隊列中
            // 經過路由鍵的命名分類來進行篩選,其中
            // * 表示:能夠(只能)匹配一個單詞,
            // # 表示:能夠匹配多個單詞(或者零個)
            TopicExchange topicExchange = new TopicExchange("topic-exchange-001", false, false);
            amqpAdmin.declareExchange(topicExchange);
        }
    
        //建立隊列
        @Test
        public void declareQueue() {
            amqpAdmin.declareQueue(new Queue("direct-queue"));
            amqpAdmin.declareQueue(new Queue("fanout-queue-001"));
            amqpAdmin.declareQueue(new Queue("fanout-queue-002"));
            amqpAdmin.declareQueue(new Queue("fanout-queue-003"));
            amqpAdmin.declareQueue(new Queue("topic-queue-001"));
            amqpAdmin.declareQueue(new Queue("topic-queue-002"));
            amqpAdmin.declareQueue(new Queue("topic-queue-003"));
        }
    
        //綁定交換機隊列
        @Test
        public void binding() {
            // 綁定Direct
            Binding directBinding = BindingBuilder.bind(new Queue("direct-queue")).to(new DirectExchange("direct-exchange-001", false, false)).with("direct-rout-key");
            amqpAdmin.declareBinding(directBinding);
    
            // 綁定FANOUT
            Binding fanoutBinding1 = BindingBuilder.bind(new Queue("fanout-queue-001")).to(new FanoutExchange("fanout-exchange-001", false, false));
            Binding fanoutBinding2 = BindingBuilder.bind(new Queue("fanout-queue-002")).to(new FanoutExchange("fanout-exchange-001", false, false));
            Binding fanoutBinding3 = BindingBuilder.bind(new Queue("fanout-queue-003")).to(new FanoutExchange("fanout-exchange-001", false, false));
            amqpAdmin.declareBinding(fanoutBinding1);
            amqpAdmin.declareBinding(fanoutBinding2);
            amqpAdmin.declareBinding(fanoutBinding3);
    
            // 綁定Topic
            Binding topicBinding1 = BindingBuilder.bind(new Queue("topic-queue-001")).to(new TopicExchange("topic-exchange-001", false, false)).with("*.to");
            Binding topicBinding2 = BindingBuilder.bind(new Queue("topic-queue-002")).to(new TopicExchange("topic-exchange-001", false, false)).with("log.*");
            Binding topicBinding3 = BindingBuilder.bind(new Queue("topic-queue-003")).to(new TopicExchange("topic-exchange-001", false, false)).with("log1.*");
            amqpAdmin.declareBinding(topicBinding1);
            amqpAdmin.declareBinding(topicBinding2);
            amqpAdmin.declareBinding(topicBinding3);
        }
    
        //發送者消息
        @Test
        public void send() {
            Map<String, Object> param = new HashMap<>(3);
            param.put("name", "李興武");
            param.put("age", 24);
            param.put("sex", false);
    
            // 發送自動轉換後的消息,test爲消息隊列的名字
            // mq.convertAndSend("test", param);
    
            // 根據路由鍵發送
            // mq.convertAndSend("direct-rout-key", param);
    
            // 根據交換器+路由鍵發送
            // mq.convertAndSend("topic-exchange-001", "log.bak", param);
    
            // 自定義消息頭
            mq.convertAndSend("direct-exchange-001", "direct-rout-key", param, message -> {
                message.getMessageProperties().getHeaders().put("token", "123456789");
                return message;
            });
        }
    
        // 接收消息
        @Test
        public void process() {
            Message message1 = mq.receive("topic-queue-001");
            Console.log(message1);
            Message message2 = mq.receive("topic-queue-002");
            Console.log(message2);
            // 獲取消息頭
            Message message3 = mq.receive("direct-queue");
            MessageProperties messageProperties = message3.getMessageProperties();
            Console.log(messageProperties.getHeaders().get("token"));
        }
    }
  5. 未完待續....net

相關文章
相關標籤/搜索