RabbitMQ入門:路由(Routing)

在上一篇博客《RabbitMQ入門:發佈/訂閱(Publish/Subscribe)》中,咱們認識了fanout類型的exchange,它是一種經過廣播方式發送消息的路由器,全部和exchange創建的綁定關係的隊列都會接收到消息。可是有一些場景只須要訂閱到一部分消息,這個時候就不能使用fanout 類型的exchange了,這個就引出來今天的「豬腳」--Direct Exchange,經過Routing Key來決定須要將消息發送到哪一個或者哪些隊列中。html

接下來請收看詳細內容:react

  1. Direct Exchange(直接路由器)
  2. 多重綁定
  3. 代碼實例

1、Direct Exchange(直接路由器)ide

在上文中介紹exchange的時候,對direct exchange進行了簡單介紹,它是一種徹底按照routing key(路由關鍵字)進行投遞的:當消息中的routing key和隊列中的binding key徹底匹配時,才進行會將消息投遞到該隊列中。這裏提到了一個routing key和binding key(綁定關鍵字),是什麼東東?ui

  1. routing key:

     在發送消息的時候,basicPublish的第二個參數就是routing key,因爲上次是fanout 類型的exchange 進行廣播方式投遞,這個字段不會影響投遞結果,所以咱們這裏就傳入了「」,可是在direct 類型的exchange中咱們就不能傳入""了,須要指定具體的關鍵字。spa

  2. binding key:

    咱們在前文中創建綁定關係的時候,queueBind的第三個參數就是綁定關鍵字debug

咱們聲明direact exchange的時候使用:code


2、多重綁定htm

多個隊列相同的綁定鍵綁定到同一個路由器的狀況,咱們稱之爲多重綁定blog

工做模型爲(P表明生產者,X表明路由器,紅色的Q表明隊列,C表明消費者):rabbitmq

 

3、代碼實例

 預備知識瞭解完了,如今來寫個程序感覺下。

  1. 生產者
    public class LogDirectSender {
        // exchange名字
        public static String EXCHANGE_NAME = "directExchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立鏈接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.爲通道聲明direct類型的exchange
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
                
                // 3.發送消息到指定的exchange,隊列指定爲空,由exchange根據狀況判斷須要發送到哪些隊列
                String routingKey = "debug";
                String msg = " hello rabbitmq, I am " + routingKey;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.關閉鏈接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }

    和上次博客中生產者的區別就是黑字粗體部分:1.路由器類型改成direct 2.消息發佈的時候指定了routing key

  2. 消費者
    public class LogDirectReciver {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立鏈接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.爲通道聲明direct類型的exchange
                channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                // 3.建立隨機名字的隊列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.創建exchange和隊列的綁定關係
                 String[] bindingKeys = { "error", "info", "debug" };
    //            String[] bindingKeys = { "error" };
                for (int i = 0; i < bindingKeys.length; i++) {
                    channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]);
                    System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]);
                }
    
                // 5.經過回調生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取消息內容而後處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費消息
                channel.basicConsume(queueName, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

    和上次博客中消費者的區別就是黑字粗體部分:1.路由器類型改成direct 2.創建綁定關係的時候指定了binding key

  3. 執行消費者,控制檯log打印以下:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug

    這個消費者咱們視爲消費者1,它會接收error,info,debug三個關鍵字的消息。

  4. 將String[] bindingKeys = { "error", "info", "debug" };改成String[] bindingKeys = { "error" };,而後再運行一次消費者。控制檯log打印以下:
     **** LogDirectReciver keep alive ,waiting for error

    這個消費者咱們視爲消費者2,它只會接收error 關鍵字的消息。

  5. 執行生產者,而後將String routingKey = "debug";的值分別改成「info"和"error",而後分別執行,這樣一共執行了三次生產者
    第一次執行:
    product send a msg:  hello rabbitmq, I am debug
    
    第二次執行:
    product send a msg:  hello rabbitmq, I am info
    
    第三次執行:
    product send a msg:  hello rabbitmq, I am error
  6. 再次查看兩個消費者的控制檯log:
    消費者1:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug
    *********** LogDirectReciver get message :[ hello rabbitmq, I am debug]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am info]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
    
    消費者2:
     **** LogDirectReciver keep alive ,waiting for error
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]

     

  7. 查看RabbitMQ管理頁面

    exchanges標籤頁裏面多了個direct類型的路由器。進入詳細頁面:

    有4個綁定關係,其中三個的隊列是同一個。切換到Queues標籤頁:

    有兩個臨時隊列。

  8. 若是關掉消費者1和消費者2,會發現隊列自動刪除了,綁定關係也不存在了。

相關文章
相關標籤/搜索