RabbitMQ Basics Tutorial: Using Spring & JavaConfig

Related posts, recommended reading:

  1. RabbitMq Basics Tutorial: Installation and Testing
  2. RabbitMq Basics Tutorial: Basic Concepts
  3. RabbitMQ Basics Tutorial: Basic Usage
  4. RabbitMQ Basics Tutorial: Advanced Usage

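In real-world applications, RabbitMQ is probably used together with Spring more often than on its own. Most posts online about the Spring integration use XML configuration; this post mainly shows what the integration looks like with JavaConfig.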


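I. Basic usage of RabbitMQ in Spring

1. Preparation

Before starting, add the required dependencies. The main one is spring-rabbit, which itself depends on several Spring modules; below, those modules are all pinned to version 5.0.4.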

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.7.3.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

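Flow analysis

The implementation has two parts: a publishing service and a consuming service. Following the flow from the earlier post on basic RabbitMQ usage, even with Spring we cannot avoid the following steps:

  • Create the connection
  • Declare the Exchange and declare the Queue
  • Create the binding between the Queue and the Exchange
  • Send messages
  • Consume messages (ack/nack)

2. Basic case

First, with Spring's help, here is the simplest, most basic implementation: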

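import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

/**
 * Created by yihui in 19:53 18/5/30.
 */
public class SimpleProducer {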
    public static void main(String[] args) throws InterruptedException {
        CachingConnectionFactory factory = new CachingConnectionFactory("127.0.0.1", 5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        RabbitAdmin admin = new RabbitAdmin(factory);

        // Declare a durable, non-exclusive, non-auto-delete queue
        Queue queue = new Queue("hello", true, false, false, null);
        admin.declareQueue(queue);

        // Declare a topic exchange
        TopicExchange exchange = new TopicExchange("topic.exchange");
        admin.declareExchange(exchange);

        // Bind the queue to the exchange; "foo.*" matches routing keys made of "foo." plus exactly one more word
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));


        // Set up the listener
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        Object listener = new Object() {
            public void handleMessage(String foo) {
                System.out.println(" [x] Received '" + foo + "'");
            }
        };
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
        container.setMessageListener(adapter);
        container.setQueues(queue);
        container.start();

        // Send a message
        RabbitTemplate template = new RabbitTemplate(factory);
        template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
        Thread.sleep(1000);

        // Stop the listener container and release the cached connection
        container.stop();
        factory.destroy();
    }
}

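3. Logic analysis

The code above covers both publishing and consuming. In terms of implementation, the logic is essentially the same as in the earlier basic-usage post. The steps are:

  1. Create the connection: new CachingConnectionFactory("127.0.0.1", 5672)
  2. Declare the Queue: new Queue("hello", true, false, false, null)
  3. Declare the Exchange: new TopicExchange("topic.exchange")
  4. Bind the Queue to the Exchange: admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));
  5. Publish a message: template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
  6. Consume messages: set up a MessageListenerAdapter

A few classes deserve extra attention:

  • RabbitTemplate: Spring's template for sending messages; it can send messages directly
  • SimpleMessageListenerContainer: the container in which message listeners are registered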
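
The binding key "foo.*" used in step 4 follows the AMQP topic rules: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. The following is a minimal plain-Java sketch of that rule, useful for checking which routing keys a binding would accept. It is only an illustrative model, not the broker's actual implementation, and the names `TopicPatternSketch` and `matches` are made up for this example.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative model of AMQP topic matching (hypothetical helper, not RabbitMQ code). */
class TopicPatternSketch {

    /** Returns true when the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        // limit -1 keeps empty trailing words so the word count stays exact
        List<String> p = Arrays.asList(pattern.split("\\.", -1));
        List<String> k = Arrays.asList(routingKey.split("\\.", -1));
        return match(p, 0, k, 0);
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            // pattern exhausted: succeed only if the key is exhausted too
            return j == k.size();
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.size(); next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            // pattern still expects a word but the key has none left
            return false;
        }
        if (word.equals("*") || word.equals(k.get(j))) {
            // '*' consumes exactly one word; a literal must be equal
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("foo.*", "foo.bar"));
        System.out.println(matches("foo.*", "foo.bar.baz"));
        System.out.println(matches("foo.#", "foo.bar.baz"));
    }
}
```

Under this model "foo.bar" is accepted by "foo.*", while "foo" and "foo.bar.baz" are not; a binding that should accept everything under "foo" would use "foo.#" instead.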

II. Using RabbitMQ with Spring and JavaConfig

1. Common configuration

This mainly extracts the shared ConnectionFactory and RabbitAdmin:

@Configuration
@ComponentScan("com.git.hui.rabbit.spring")
public class SpringConfig {

    private Environment environment;

    @Autowired
    public void setEnvironment(Environment environment) {
        this.environment = environment;
        System.out.println("then env: " + environment);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

2. Message publishing

The publishing component is fairly simple; it just uses AmqpTemplate:

@Component
public class AmqpProducer {

    private AmqpTemplate amqpTemplate;

    @Autowired
    public void amqpTemplate(ConnectionFactory connectionFactory) {
        amqpTemplate = new RabbitTemplate(connectionFactory);
    }

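    /**
     * Publish a message to the given exchange.
     *
     * @param exchange   name of the target exchange
     * @param routingKey routing key the exchange uses to select queues
     * @param msg        message payload
     */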
    public void publishMsg(String exchange, String routingKey, Object msg) {
        amqpTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

3. DirectExchange message consumption

The implementation differs by Exchange type, as follows.

DirectExchange mode

@Configuration
public class DirectConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange("direct.exchange");
        directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return directExchange;
    }

    @Bean
    public Queue directQueue() {
        Queue queue = new Queue("aaa");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding directQueueBinding() {
        Binding binding = BindingBuilder.bind(directQueue()).to(directExchange()).with("test1");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener directConsumer() {
        return new BasicConsumer("direct");
    }

    @Bean(name = "directMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(directQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(directConsumer());
        return container;
    }
}

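As the implementation above shows, we essentially define a Queue, an Exchange, a Binding, and a MessageListenerContainer (which listens for messages), and factor the message-consuming logic out into a shared class: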

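import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

@Slf4j
public class BasicConsumer implements ChannelAwareMessageListener {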
    private String name;

    public BasicConsumer(String name) {
        this.name = name;
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            byte[] bytes = message.getBody();
            String data = new String(bytes, "utf-8");
            System.out.println(name + " data: " + data + " tagId: " + message.getMessageProperties().getDeliveryTag());
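        } catch (Exception e) {
            // The exception is swallowed here, so under AcknowledgeMode.AUTO
            // the container still acknowledges the message after a failure.
            log.error("local cache rabbit mq localQueue error!", e);
        }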
    }
}

4. Test

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfig.class)
public class SprintUnit {
    @Autowired
    private AmqpProducer amqpProducer;

    @Test
    public void testDirectConsumer() throws InterruptedException {
        String[] routingKey = new String[]{"hello.world", "world", "test1"};
        for (int i = 0; i < 10; i++) {
            amqpProducer
                    .publishMsg("direct.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
        }
        System.out.println("-------over---------");

        Thread.sleep(1000 * 60 * 10);
    }
}

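Although this test class mainly publishes messages to MQ, once the Spring container has started, the actual work of receiving and consuming messages is delegated to the Spring container through the MessageListenerContainer defined earlier. Only the messages published with routing key test1 reach queue aaa, because that is the only key it is bound with. Running the test above prints: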

direct data: >>> hello test1>>> 2 tagId: 1
direct data: >>> hello test1>>> 5 tagId: 2
direct data: >>> hello test1>>> 8 tagId: 3
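
Only three of the ten messages show up because a direct exchange delivers a message only when its routing key equals the binding key exactly. The sketch below replays the test loop in plain Java, without a broker, to show which loop indexes survive that comparison. `DirectRoutingSketch` and `deliveredIndexes` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java replay of the direct-exchange test loop (illustrative only). */
class DirectRoutingSketch {

    /**
     * Returns the loop indexes whose routing key equals the binding key,
     * cycling through routingKeys the same way the test does (i % length).
     */
    static List<Integer> deliveredIndexes(String bindingKey, String[] routingKeys, int count) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String routingKey = routingKeys[i % routingKeys.length];
            // direct exchange rule: exact string equality, no wildcards
            if (bindingKey.equals(routingKey)) {
                delivered.add(i);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        String[] routingKeys = {"hello.world", "world", "test1"};
        // prints [2, 5, 8], the same indexes seen in the consumer output
        System.out.println(deliveredIndexes("test1", routingKeys, 10));
    }
}
```

The messages sent with "hello.world" and "world" are dropped by the exchange, since no queue is bound with those keys.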

5. Topic & Fanout strategies

With the one above written, these two look quite similar.

@Configuration
public class TopicConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public TopicExchange topicExchange() {
        TopicExchange topicExchange = new TopicExchange("topic.exchange");
        topicExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return topicExchange;
    }

    @Bean
    public Queue topicQueue() {
        Queue queue = new Queue("bbb");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding topicQueueBinding() {
        Binding binding = BindingBuilder.bind(topicQueue()).to(topicExchange()).with("*.queue");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener topicConsumer() {
        return new BasicConsumer("topic");
    }

    @Bean(name = "topicMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(topicQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(topicConsumer());
        return container;
    }
}

Corresponding test case

@Test
public void testTopicConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue"};
    for (int i = 0; i < 20; i++) {
        amqpProducer.publishMsg("topic.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}

Broadcast (fanout) mode

@Configuration
public class FanoutConsumerConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
        fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return fanoutExchange;
    }

    @Bean
    public Queue fanoutQueue() {
        Queue queue = new Queue("ccc");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding fanoutQueueBinding() {
        Binding binding = BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener fanoutConsumer() {
        return new BasicConsumer("fanout");
    }

    @Bean(name = "FanoutMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(fanoutQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(fanoutConsumer());
        return container;
    }
}

Corresponding test case

@Test
public void testFanoutConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"};
    for (int i = 0; i < 20; i++) {
        amqpProducer
                .publishMsg("fanout.exchange", routingKey[i % 6], ">>> hello " + routingKey[i % 6] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}
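
A fanout exchange ignores the routing key and copies every message to every bound queue, so all 20 messages from this test should reach queue ccc regardless of which of the six keys is used. A tiny in-memory model of that behaviour might look like the following; `FanoutSketch` and its methods are invented for this illustration and are not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of fanout delivery (hypothetical, for illustration only). */
class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange; fanout bindings carry no routing key. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Delivers the body to every bound queue; the routing key is deliberately unused. */
    void publish(String routingKey, String body) {
        for (List<String> queue : queues.values()) {
            queue.add(body);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("ccc");
        String[] routingKey = {"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"};
        for (int i = 0; i < 20; i++) {
            exchange.publish(routingKey[i % 6], ">>> hello " + routingKey[i % 6] + ">>> " + i);
        }
        // every message arrives, whatever its routing key
        System.out.println(exchange.messagesIn("ccc").size());
    }
}
```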

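III. Other

Project address

一灰灰Blog: https://liuyueyi.github.io/hexblog

一灰灰's personal blog, recording all posts from study and work. You are welcome to drop by.

Statement

Believing everything in books is worse than having no books at all. The content above is just one person's view; because my ability is limited, omissions and mistakes are inevitable. If you find a bug or have a better suggestion, criticism and corrections are welcome and much appreciated.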
