RabbitMQ 基本使用

引入依賴

pom.xml 中添加:spring

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- or '*-stream-kafka' --> 
            <!-- 採用 spring cloud 大版本 -->
        </dependency>
複製代碼

配置RabbitMQ

application.ymlbootstrap.yml 亦或是在 config 配置中心庫中添加配置:bootstrap

spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
    port: 5672
複製代碼

消息生產/消費

生產者

@Component
public class MySender extends OrderApplicationTests {  // 這裏繼承的測試類

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void send() {
        amqpTemplate.convertAndSend("myQueue",new Date().toString());
    }

    /*
    測試服務1.發送
     */
    @Test
    public void send1() {
        amqpTemplate.convertAndSend("server","server1",new Date().toString());
    }

    /*
    測試服務2.發送
     */
    @Test
    public void send2() {
        amqpTemplate.convertAndSend("server","server2",new Date().toString());
    }
}
複製代碼

消費者

@Component
public class MyReceiver {
    private final Logger logger = LoggerFactory.getLogger(MyReceiver.class);

    // @RabbitListener(queues = "myQueue")  這裏須要在 rabbitmq 中手動添加 myQueue 隊列,否則就會報錯
    // @RabbitListener(queuesToDeclare = @Queue("myQueue"))  // 自動聲明 myQueue 隊列
    @RabbitListener(bindings = @QueueBinding( // 自動建立,Exchange和Queue綁定,具體綁定還應該添加 key 參數。消息分組
            value = @Queue("myQueue"),
            exchange = @Exchange("myExchange")
    )
    )
    public void receive(String msg) {
        logger.info("receive:{}",msg);
    }

    /**
     * 服務1,Exchange 綁定測試
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("server1"),
            key = "server1",
            exchange = @Exchange("server")
    ))
    public void recevice1(String msg) {
        logger.info("receive server1:{}",msg);
    }

    /**
     * 服務2,Exchange 綁定測試
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("server2"),
            key = "server2",
            exchange = @Exchange("server")
    ))
    public void recevice2(String msg) {
        logger.info("receive server2:{}",msg);
    }
}
複製代碼

這裏的服務1和服務2的不一樣綁定,主要是爲了實現業務上面多個服務發送消息到一個服務時,對不一樣服務消息的區分bash

相關文章
相關標籤/搜索