SpringBoot整合RabbitMQ,實現消息發送和消費以及多個消費者的狀況

下載安裝Erlang和RabbitMQ

Erlang和RabbitMQ:http://www.javashuo.com/article/p-wenfljnx-mo.htmlhtml

 

AMQP協議詳解與RABBITMQ,MQ消息隊列的應用場景,如何避免消息丟失

http://www.javashuo.com/article/p-minjciwm-mk.htmljava

 

項目建立和依賴

推薦SpringCloud項目在線建立:https://start.spring.io/spring

不用上面這個也行,下面有代碼和依賴;app

gradle的依賴,和maven差很少:eclipse

buildscript {
    ext {
        springBootVersion = '2.1.1.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'xy.study'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}


dependencies {
    implementation('org.springframework.boot:spring-boot-starter-amqp')
    implementation('org.projectlombok:lombok:1.16.+')
    runtimeOnly('org.springframework.boot:spring-boot-devtools')
    testImplementation('org.springframework.boot:spring-boot-starter-test')
}

 

代碼

配置文件application.propertiesmaven

spring.application.name=spring-boot-rabbitmq

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest



server.port = 5678

 

RabbitMQ配置文件類(註釋的代碼能夠直接刪掉):函數

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * topic 是RabbitMQ中最靈活的一種方式,能夠根據routing_key自由的綁定不一樣的隊列
 * 首先對topic規則配置
 */
//@Configuration
public class TopicRabbitConfig {

    final public static String QUEUE_NAME = "queue.name";
    final public static String TEST_TOPIC_ROUTINGKEY = "test.topic.routingKey";
    final public static String TEST_EXCHANGE_HAHA = "test.exchange.haha";



    /**
     * 設置交換器的名稱
     * @return
     *//*
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TopicRabbitConfig.TEST_EXCHANGE_HAHA);
    }

    *//**
     * 隊列名稱
     * @return
     *//*
    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.QUEUE_NAME);
    }

    *//**
     * 將指定routing key的名稱綁定交換器的隊列
     * @param queueMessage
     * @param exchange
     * @return
     *//*
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with(TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY);
    }*/

    /**
     * 匹配以topic開頭的路由鍵routing key
     * 交換機綁定多個隊列
     */

    /*@Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }*/
}

 

 生產者,這裏根據Exchange和Routing Key,直接發送一個字符串:spring-boot

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.TopicRabbitConfig;

@Component
@Slf4j
public class HelloSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 經過exchange和routingKey的方式
     * rabbitTemplate.convertAndSend(String exchange, String routingKey, Object object)
     * @param i
     */
    public void send(int i) {
        String context = "hello " + i;
        log.info("Sender : {}", context);
        this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TEST_EXCHANGE_HAHA,TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY, context);
    }
}

 

消費者,綁定對應的Exchange,Queue和Routing Key,直接打印獲取的信息:post

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.TopicRabbitConfig;

@Component
@Slf4j
public class HelloReceiver {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void onOrgDeleted(@Payload String msg) {
        log.info("HelloReceiver msg : {}",msg);
    }
}

 

測試類,調用生產者發送信息的函數send,消費者會監聽消費:測試

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import xy.study.rabbitmq.producer.HelloSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    private HelloSender sender;

    @Test
    public void testSend() {
        sender.send(666);
    }

}

 

如圖,控制檯日誌,能生成消息,而且能被對應的消費者消費。

 

topic exchange 通配路由中多個消費者的狀況

修改消費者的代碼以下:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import xy.study.rabbitmq.conf.TopicRabbitConfig;

@Component
@Slf4j
public class HelloReceiver {

    /**
     * 下面四個消費者,exchange和RoutingKey都相同,最後兩個消費者隊列名都相同
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueName(@Payload String msg) {
        log.info("{}-----HelloReceiver msg : {}",TopicRabbitConfig.QUEUE_NAME,msg);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = TopicRabbitConfig.QUEUE_NAME+".test", durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueNameTest(@Payload String msg) {
        log.info("{}-----HelloReceiver msg : {}",TopicRabbitConfig.QUEUE_NAME+".test",msg);
    }

    /**
     * 這裏個人消費者隊列名"123445",是亂寫的,也可以接受
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = 123445+"", durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueNameNumber(@Payload String msg) {
        log.info("{}-----HelloReceiver msg : {}",123445+""+".test",msg);
    }

    /**
     * 因爲這個和上面的Exchange、RoutingKey、queue徹底相同,因此這兩個消費者,一條消息,只有一個能消費(隨機)
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = 123445+"", durable = "true"),
            exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
            key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
    )
    public void queueNameNumberSame(@Payload String msg) {
        log.info("same+{}-----HelloReceiver msg : {}",123445+""+".test",msg);
    }
}

再次執行測試,測試結果以下:

上面四個消費者代碼,Exchange和RoutingKey都相同,最後兩個消費者隊列名都相同。

根據結果可知,當Exchange和RoutingKey相同、queue不一樣時全部消費者都能消費一樣的信息;

Exchange和RoutingKey、queue都相同時(最後兩個消費者),消費者中只有一個能消費信息,其餘消費者都不能消費該信息。

相關文章
相關標籤/搜索