MQ的發佈訂閱模式(fanout)| RabbitMQ系列(七)

這是我參與8月更文挑戰的第12天,活動詳情查看:8月更文挑戰markdown


相關文章

RabbitMQ系列彙總:RabbitMQ系列工具


前言

  • 回顧一下簡單模式和工做模式post

    • 簡單模式:一個生產者對應一個消費者。
    • 工做模式:一個生產者對應多個消費者。
  • 如今來介紹一下交換機是幹嗎的測試

  • RabbitMQ 消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發送到隊列。實際上,一般生產 者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。spa

  • 相反,生產者只能將消息發送到交換機(exchange),交換機工做的內容很是簡單,一方面它接收來 自生產者的消息,另外一方面將它們推入隊列。交換機必須確切知道如何處理收到的消息。code

  • 是應該把這些消 息放到特定隊列仍是說把他們到許多隊列中仍是說應該丟棄它們。這就的由交換機的類型來決定。orm

  • image-20210804112052503.png

  • 交換機共有四種類型隊列

    • 直接(direct)
    • 主題(topic)
    • 標題(headers)
    • 扇出(fanout)
    • 通常MQ會幫咱們建立一些默認的交換機,能夠直接拿來使用,也能夠本身建立不一樣類型的交換機
    • image-20210804112606519.png
  • 這裏須要講下無名交換機,也就是默認交換機,先前的幾篇文章咱們都沒有指定交換機路由

    • image-20210804112458681.png
    • 第一個參數是交換機的名稱。空字符串表示默認或無名稱交換機:消息能路由發送到隊列中其實 是由 routingKey(bindingkey)綁定 key 指定的,若是它存在的話。
    • 那麼這種狀況下,MQ會走默認的交換機(AMQP default)
    • image-20210804112542337.png
  • 還有東西叫臨時隊列字符串

    • //建立臨時隊列
      String queueName = channel.queueDeclare().getQueue();
      複製代碼
    • image-20210804113925048.png
    • 直接執行確定報錯的:明顯告訴咱們隊列名不要用amq來命名,因此能夠將代碼稍微改造下
    • image-20210804114330969.png
    • channel.queueDeclare("dy"+queueName,false,false,false,null);
      複製代碼
    • image-20210804114444954.png
  • 瞭解了交換機以後,咱們就能夠正式開始發佈訂閱模式啦~

1、生產者

  • 儘可能在生產者指定交換機,畢竟按照正常邏輯來看,確定是先發送,才能接受。不然的話接受了個寂寞哦~

  •    public static void publishMessageIndividually() throws Exception {
            //工具類獲取通道
            Channel channel = RabbitMqUtils.getChannel();
            /**
             * 指定交換機和模式
             * 參數一:指定的交換機名稱
             * 參數二:指定的交換機模式
             */
            channel.exchangeDeclare(ChangeNameConstant.FANOUT_MODEL,"fanout");
    ​
            Scanner sc = new Scanner(System.in);
            System.out.println("請輸入信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish(ChangeNameConstant.FANOUT_MODEL, "20210804", null, message.getBytes("UTF-8"));
                System.out.println("生產者發出消息" + message);
            }
        }
    複製代碼
  • image-20210804140950379.png

  • 因而可知

    • 建立了一個生產者
    • 創建了一個交換機Name= fanout_pattern
    • 交換機的類型是 fanout(扇型)

2、消費者

  • 消費者A

  • /**
     * 這是一個測試的消費者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyConsumerTest_Fanout01 {
    ​
        public static void main(String[] args) throws Exception{
            //使用工具類來建立通道
            Channel channel = RabbitMqUtils.getChannel();
    ​
            /**
             * 生成一個臨時的隊列 隊列的名稱是隨機的
             * 當消費者斷開和該隊列的鏈接時 隊列自動刪除
             */
            String queueName = channel.queueDeclare().getQueue();
            //把該臨時隊列綁定咱們的 exchange 其中 routingkey(也稱之爲 binding key)爲空字符串
            channel.queueBind(queueName, ChangeNameConstant.FANOUT_MODEL, "20210804");
            System.out.println("交換機A等待接收消息,把接收到的消息打印在屏幕.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("控制檯打印接收到的消息"+message);
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
                System.out.println("消息中斷了~");
            });
        }
    }
    複製代碼
  • 消費者B代碼同樣,只是改爲消費者B等待接受消息~

  • image-20210804141203493.png

  • 兩個消費者的申明的臨時隊列成功綁定到了交換機上!

  • 執行結果

    • 生產者
    • image-20210804141255986.png
    • 消費者A
    • image-20210804141318178.png
    • 消費者B
    • image-20210804141330836.png
  • 生產者給交換機發了一條消息。

  • 兩個消費者都收到了消息。

3、總結

  • image-20210804141425353.png
  • 很明顯,這個地方咱們不會將消息直接發到隊列了
  • 而是將消息發送給交換機
  • 交換機尋找哪些隊列綁定了本交換機
  • 而後將消息發送至全部被綁定的隊列
  • 最終隊列發送消息給消費者

路漫漫其修遠兮,吾必將上下求索~

若是你認爲i博主寫的不錯!寫做不易,請點贊、關注、評論給博主一個鼓勵吧~hahah

相關文章
相關標籤/搜索