RabbitMQ傳輸原理、五種模式

本文代碼基於SpringBoot,文末有代碼鏈接 。首先是一些在Spring Boot的一些配置和概念,而後跟隨代碼看下五種模式 java

MQ兩種消息傳輸方式,點對點(代碼中的簡單傳遞模式),發佈/訂閱(代碼中路由模式)。要是你熟悉RabbitMQ SpringBoot配置的話,就是simple和direct。git

MQ安裝指南:https://blog.csdn.net/qq_19006223/article/details/89421050github

 

0.消息隊列運轉過程異步

生產者生產過程:
(1)生產者鏈接到 RabbitMQ Broker 創建一個鏈接( Connection) ,開啓 個信道 (Channel)
(2) 生產者聲明一個交換器 ,並設置相關屬性,好比交換機類型、是否持久化等
(3)生產者聲明 個隊列井設置相關屬性,好比是否排他、是否持久化、是否自動刪除等
(4)生產者經過路由鍵將交換器和隊列綁定起來。
(5)生產者發送消息至 RabbitMQ Broker ,其中包含路由鍵、交換器等信息。
(6) 相應的交換器根據接收到的路由鍵查找相匹配的隊列 若是找到 ,則將從生產者發送過來的消息存入相應的隊列中。
(7) 若是沒有找到 ,則根據生產者配置的屬性選擇丟棄仍是回退給生產者
(8) 關閉信道。
(9) 關閉鏈接。 ide

消費者接收消息的過程:
(1)消費者鏈接到 RabbitMQ Broker ,創建一個鏈接(Connection ,開啓 個信道(Channel)
(2) 消費者向 RabbitMQ Broker 請求消費相應隊列中的消息,可能會設置相應的回調函數, 以及作 些準備工做。
(3)等待 RabbitMQ Broker 迴應並投遞相應隊列中的消息, 消費者接收消息。
(4) 消費者確認 ack) 接收到的消息
(5) RabbitMQ 從隊列中刪除相應己經被確認的消息
(6) 關閉信道。
(7)關閉鏈接。函數

1.項目結構工具

 common是工具,receiver是消費者,sender是生產者測試

具體各自的pom.xml文件請看項目,都有註釋。fetch

2.sender(生產者的配置)this

 

   

#確認機制
publisher-confirms: true 消息有沒有到達MQ(會返回一個ack確認碼)
publisher-returns: true 消息有沒有找到合適的隊列
主要是爲了生產者和mq之間的一個確認機制,當消息到沒到mq,會提供相應的回調,在項目中 RabbitSender 這個類中進行了相應的配置
 1     private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, s) -> {
 2         if (ack) {
 3             System.out.println(correlationData.getId());
 4         } else {
 5             log.error("ConfirmCallback消息發送失敗: {}", s);
 6         }
 7     };
 8 
 9     private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routingKey)
10             -> log.error("ReturnCallback消息發送失敗: {}", new String(message.getBody(), StandardCharsets.UTF_8));
11 
12 
13     public <T> void sendMsg(String exchangeName, String routingKeyName, T content) {
14         // 設置每一個消息都返回一個確認消息
15         this.rabbitTemplate.setMandatory(true);
16         // 消息確認機制
17         this.rabbitTemplate.setConfirmCallback(confirmCallback);
18         // 消息發送失敗機制
19         this.rabbitTemplate.setReturnCallback(returnCallback);
20         // 異步發送消息
21         CorrelationData correlationData = new CorrelationData();
22         correlationData.setId("123");
23         this.rabbitTemplate.convertAndSend(exchangeName, routingKeyName, content, correlationData);
24     }
View Code

 還能夠根據需求設置發送時CorrelationData 的值

    #mandatory

參數設爲 true 時,交換器沒法根據自身的類型和路由鍵找到一個符合條件 的隊列,那麼 RabbitM 會調用 Basic.Return 命令將消息返回給生產者。
默認爲false,直接丟棄

3.receiver(消費者配置)

 

這裏主要說一下 listerner 的相關配置

一共有兩種模式:simple和direct模式

simple主要包括兩種工做模式,direct主要包括四種,待會代碼會詳解。

先說主要配置(以direct爲例)

#acknowledge-mode: manual 

手動確認模式,推薦使用這種。就是說當消息被消費者消費時,須要手動返回信息告訴mq。若是是自動的話,mq會自動確認,無論你消費者是否完成消費(好比說拋出異常)

#prefetch: 1

一個消費者一次拉取幾條消息,本demo一條一條來。

#consumers-per-queue: 2

一個隊列能夠被多少消費者消費(這個配置,我測試的時候沒測試出來,若是有朋友瞭解的話,能夠評論下。)

還有其餘配置,看下源碼,兩種模式共有的

 simple特有的

direct特有的

4.各類模式詳解

---------simple方式下的兩種

打開上面的listener配置

 

4.1 simple

一個生產者,一個消費者

生產者發送消息都在SenderTest裏面

 

生產者:

    /**簡單模式*/
    @Test
    public void senderSimple() throws Exception {
        String context = "simple---> " + new Date();
        this.rabbitTemplate.convertAndSend("simple", context);
    }

消費者

    @RabbitListener(queues = "simple")
    public void simple(Message message, Channel channel){
        String messageRec = new String(message.getBody());
        System.out.println("simple模式接收到了消息:"+messageRec);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("報錯了------------------"+e.getMessage());
        }
    }

輸出

  

simple模式接收到了消息:simple---> Sat Apr 20 20:40:16 CST 2019

4.2 work 模式

一個生產者,多個消費者

生產者

    private static final List<Integer> ints = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    /**work模式*/
    @Test
    public void senderWork() throws Exception {
        ints.forEach((i)->{
            String context = "work---> " + i;
            this.rabbitTemplate.convertAndSend("work", context);
        });
    }

消費者

 

    @RabbitListener(queues = "work")
    public void work1(Message message, Channel channel){
        try{
            Thread.sleep(500);
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
        String messageRec = new String(message.getBody());
        System.out.println("work1接收到了消息:"+messageRec);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("work1報錯了------------------"+e.getMessage());
        }
    }


    @RabbitListener(queues = "work")
    public void work2(Message message, Channel channel){
        try{
            Thread.sleep(1000);
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
        String messageRec = new String(message.getBody());
        System.out.println("work2接收到了消息:"+messageRec);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            System.out.println("work2報錯了------------------"+e.getMessage());
        }
    }

輸出

 

work1接收到了消息:work---> 2
work2接收到了消息:work---> 0
work1接收到了消息:work---> 1
work1接收到了消息:work---> 4
work2接收到了消息:work---> 3
work1接收到了消息:work---> 6
work1接收到了消息:work---> 7
work2接收到了消息:work---> 5
work1接收到了消息:work---> 8
work1接收到了消息:work---> 10
work2接收到了消息:work---> 9

-----direct方式下的

切換listener配置

4.3direct交換機

生產者發送消息給指定交換機,綁定的某個隊列。

消費者經過監聽某交換機綁定的某個隊列接受消息。

生產者

    /**direct交換機*/
    @Test
    public void senderDirect() throws Exception {
        rabbitSender.sendMsg("direct","directKey1","directContent1");
        rabbitSender.sendMsg("direct","directKey2","directContent2");
    }

消費者

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange("direct"), key = "directKey1"
            , value = @Queue(value = "directQueue1", durable = "true", exclusive = "false", autoDelete = "false")))
    public void direct1(String str, Channel channel, Message message) throws IOException {
        try {
            System.out.println("directQueue1接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange("direct"), key = "directKey2"
            , value = @Queue(value = "directQueue2", durable = "true", exclusive = "false", autoDelete = "false")))
    public void direct2(String str, Channel channel, Message message) throws IOException {
        try {
            System.out.println("directQueue2接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

輸出

directQueue1接收到了:directContent1
directQueue2接收到了:directContent2

4.4 topic交換機

指定主題

# :匹配一個或者多級路徑

*: 匹配一級路徑

生產者

    @Test
    public void senderTopic() throws Exception {
        String contexta = "topic.a";
        rabbitSender.sendMsg("topic","topicKey.a",contexta);
        String contextb = "topic.b";
        rabbitSender.sendMsg("topic","topicKey.b",contextb);
        String contextc = "topic.c";
        rabbitSender.sendMsg("topic","topicKey.c",contextc);
        String contextz = "topic.z";
        rabbitSender.sendMsg("topic","topicKey.c.z",contextz);
    }

消費者

    /**
     * topic交換機
     * */
    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "topic",type = "topic"), key = "topicKey.#"
            , value = @Queue(value = "topicQueue", durable = "true", exclusive = "false", autoDelete = "false")))
    public void topicQueue(String str, Channel channel, Message message) throws Exception {
        try {
            System.out.println("topicQueue接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

輸出

topicQueue接收到了:topic.a

4.5 Fanout 交換機

廣播模式,一個消息能夠給多個消費者消費

生產者

    /**Fanout 交換機*/
    @Test
    public void senderFanout() throws Exception {
        String contexta = "Fanout";
        rabbitSender.sendMsg("fanout","fanoutKey1",contexta);
        //寫不寫KEY都無所謂
    }

消費者

    /**
     * Fanout 交換機
     * */
    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "fanout",type = "fanout"), key = "fanoutKey1"
            , value = @Queue(value = "fanoutQueue1", durable = "true", exclusive = "false", autoDelete = "false")))
    public void fanoutQueue1(String str, Channel channel, Message message) throws Exception {
        try {
            System.out.println("fanoutQueue1接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

    @RabbitListener(bindings = @QueueBinding(exchange = @Exchange(name = "fanout",type = "fanout"), key = "fanoutKey2"
            , value = @Queue(value = "fanoutQueue2", durable = "true", exclusive = "false", autoDelete = "false")))
    public void fanoutQueue2(String str, Channel channel, Message message) throws Exception {
        try {
            System.out.println("fanoutQueue2接收到了:"+str);
        }catch (Exception e){
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
    }

輸出

fanoutQueue2接收到了:Fanout
fanoutQueue1接收到了:Fanout

4.6 Headers 交換機

 

代碼:https://github.com/majian1994/rabbitMQ_Study

相關文章
相關標籤/搜索