SpringBoot 中使用RabbitMQ(一)

Exchange

交換機:Exchange 用於轉發消息,可是它不會作存儲 ,若是沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。 這裏有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麼究竟轉發到哪一個隊列,就要根據該路由鍵。spring

交換機的功能主要是接收消息而且轉發到綁定的隊列,交換機不存儲消息,在啓用ack模式後,交換機找不到隊列會返回錯誤。bash

交換機有四種類型:Direct, topic, Headers and Fanoutapp

* Direct:direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.
  * Topic:按規則轉發消息(最靈活)
  * Headers:設置 header attribute 參數類型的交換機
  * Fanout:轉發消息到全部綁定隊列(廣播模式)
複製代碼

下面介紹經常使用的三種模式的基礎用法。函數

SpringBoot 整合

Pom 依賴spring-boot

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
複製代碼

application.properties 配置文件測試

# rabbitmq鏈接參數
spring.rabbitmq.host=  # mq ip地址
spring.rabbitmq.port=5672 # 端口 默認5672
spring.rabbitmq.username=admin # 用戶名
spring.rabbitmq.password=admin # 密碼
# 開啓發送確認(開啓此模式,生產者成功發送到交換機後執行相應的回調函數)
#spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退(開啓此模式,交換機路由不到隊列時執行相應的回調函數)
#spring.rabbitmq.publisher-returns=true
# 開啓消費者手動確認 ACK 默認auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
複製代碼

Direct Exchange

direct類型的Exchange路由規則很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中ui

  • 定義配置類,註冊交換機和隊列並進行綁定
/**
 * Rabbit 配置類
 * @author peng
 */
@Configuration
public class DirectRabbitConfig {

    @Bean
    DirectExchange directExchange(){
        // 註冊一個 Direct 類型的交換機 默認持久化、非自動刪除
        return new DirectExchange("directExchange");
    }

    @Bean
    Queue infoQueue(){
        // 註冊隊列
        return new Queue("infoMsgQueue");
    }
    
    @Bean
    Queue warnQueue(){
        return new Queue("warnMsgQueue");
    }
    
    @Bean
    Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
        // 將隊列以 info-msg 爲綁定鍵綁定到交換機
        return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
    }
    
    @Bean
    Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg");
    }
}
複製代碼
  • 定義生產者
/**
 * 生產者
 * @author peng
 */
@Component
public class DirectSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendInfo() {
        String content = "I am Info msg!";
        // 將消息以info-msg綁定鍵發送到directExchange交換機
        this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content);
        System.out.println("########### SendInfoMsg : " + content);
    }
    
    public void sendWarn() {
        String content = "I am Warn msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i;
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendError() {
        String content = "I am Error msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content);
        System.out.println("########### SendErrorMsg : " + content);
    }
}

複製代碼
  • 定義消費者
消費者1
/**
 * @author peng
 */
@Component
// 標記此類爲Rabbit消息監聽類,監聽隊列infoMsgQueue
@RabbitListener(queues = "infoMsgQueue")
public class DirectReceiver1 {

    // 定義處理消息的方法
    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message);
    }
}

消費者2 
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver2 Receive warnMsg:" + message);
    }
}
複製代碼
  • 基礎用法測試
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DirectTest {
    @Autowired
    private DirectSender directSender;

    @Test
    public void send() {
        directSender.sendInfo();
        directSender.sendWarn();
        directSender.sendError();
    }
}

結果

    ########### SendInfoMsg : I am Info msg!
    ########### SendWarnMsg : I am Warn msg!
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg!
    ########### DirectReceiver1 Receive InfoMsg:I am Info msg!
    
    InfoMsg 以info-msg綁定鍵發送到directExchange交換機,交換機路由到infoMsgQueue隊列,DirectReceiver1監聽此隊列接受消息。
    WarnMsg 同理
    ErrorMsg 因爲沒有隊列的綁定鍵爲 error-msg 因此消息會被丟棄
複製代碼
  • 一對多測試
消費者3
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver3 Receive warnMsg:" + message);
    }
}

// 一對多
@Test
public void oneToMany() {
    for (int i = 0; i< 100 ; i++){
        directSender.sendWarn(i);
    }
}

結果
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7
    
    消費者2 和 消費者3 均勻(條數上)的消費了消息
複製代碼
  • 多對多測試
/**
 * 生產者3
 * @author peng
 */
@Component
public class DirectSender2 {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i +" fromSend2";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
}

// 多對多
@Test
public void manyToMany() {
    for (int i = 0; i< 10 ; i++){
        directSender.sendWarn(i);
        directSender2.sendWarn(i);
    }
}

結果

    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1
   
    消費者2和消費者3分別接受了生產者1 和生產者2的消息
複製代碼

Fanout Exchang

fanout類型的Exchange路由規則很是簡單,會發送給全部綁定到該交換機的隊列上。會忽略路由鍵this

  • 配置類
/**
 * @author peng
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Queue queue1(){
        return new Queue("fanout.1");
    }
    @Bean
    Queue queue2(){
        return new Queue("fanout.2");
    }
    @Bean
    Queue queue3(){
        return new Queue("fanout.3");
    }

    @Bean
    Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue3).to(fanoutExchange);
    }
}
複製代碼
  • 生產者
@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        System.out.println("######## Sender : " + context);
    }
}

複製代碼
  • 消費者
消費者1
/**
 * @author peng
 */
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 1 : " + message);
    }
}

消費者2
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 2 : " + message);
    }
}

消費者3
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 3 : " + message);
    }
}

複製代碼
  • 測試
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void send() {
        fanoutSender.send();
    }
}

結果

    ######## Sender : hi, fanout msg 
    fanout Receiver 1  : hi, fanout msg 
    fanout Receiver 2  : hi, fanout msg 
    fanout Receiver 3  : hi, fanout msg
複製代碼

Topic Exchange

Topic 類型匹配最爲普遍,Routing Key必須與Binding Key相匹配(可經過通配符模糊匹配)的時候纔將消息傳送給Queuespa

*匹配一個單詞, #匹配多個單詞,單詞之間以.分隔。如 *.male.#可匹配dog.male.four、rabbit.male.four.white等3d

  • 配置類
@Configuration
public class TopicRabbitConfig {

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    @Bean
    Queue topicQueue1(){
        return new Queue("topicQueue1");
    }
    @Bean
    Queue topicQueue2(){
        return new Queue("topicQueue2");
    }
    @Bean
    Queue topicQueue3(){
        return new Queue("topicQueue3");
    }

    @Bean
    Binding topicQueue1Binding(Queue topicQueue1, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue1).to(exchange).with("*.male.four");
    }
    @Bean
    Binding topicQueue2Binding(Queue topicQueue2, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue2).to(exchange).with("#.four");
    }
    
    @Bean
    Binding topicQueue3Binding(Queue topicQueue3, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue3).to(exchange).with("hen.female.two");
    }
}
複製代碼
  • 生產者
@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    public void send1() {
        String context = "##### i am message 1";
        System.out.println("Sender 1 : " + context);
        // 路由鍵 rabbit.male.four 會被綁定鍵 *.male.four 和 #.four匹配
        this.rabbitTemplate.convertAndSend("topicExchange", "rabbit.male.four", context);
    }

    public void send2() {
        String context = "##### i am message 2";
        System.out.println("Sender 2: " + context);
        // 路由鍵 dog.male.four 會被綁定鍵 #.four 匹配
        this.rabbitTemplate.convertAndSend("topicExchange", "dog.female.four", context);
    }

    public void send3() {
        String context = "##### i am messages 3";
        System.out.println("Sender 3: " + context);
        路由鍵 hen.female.two 會被綁定鍵 hen.female.two 匹配
        this.rabbitTemplate.convertAndSend("topicExchange", "hen.female.two", context);
    }
}
複製代碼
  • 消費者
消費者1
@Component
@RabbitListener(queues = "topicQueue1")
public class TopicReceiver1 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Topic Receiver1 : " + msg);
    }
}

消費者2
@Component
@RabbitListener(queues = "topicQueue2")
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Topic Receiver2 : " + msg);
    }
}

消費者3
@Component
@RabbitListener(queues = "topicQueue3")
public class TopicReceiver3 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Topic Receiver3 : " + msg);
    }
}

複製代碼
  • 測試
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TopicTest {
    @Autowired
    private TopicSender topicSender;

    @Test
    public void send() {
        topicSender.send1();
        topicSender.send2();
        topicSender.send3();
    }
}

結果

    Sender 1: ##### i am message 1
    Sender 2: ##### i am message 2
    Sender 3: ##### i am messages 3
    Topic Receiver1 : ##### i am message 1
    Topic Receiver2 : ##### i am message 1
    Topic Receiver3 : ##### i am messages 3
    Topic Receiver2 : ##### i am message 2

消息1 被消費者1和2消費 路由鍵 rabbit.male.four 會被綁定鍵 *.male.four 和 #.four匹配
消息2 被消費者2消費  路由鍵 dog.male.four 會被綁定鍵 #.four 匹配
消息3 被消費者3消費  至關於 direct徹底匹配
複製代碼
相關文章
相關標籤/搜索