RabbitMQ入門:主題路由器(Topic Exchange)

上一篇博文中,咱們使用direct exchange 代替了fanout exchange,此次咱們來看下topic exchange。html

1、Topic Exchange介紹sql

topic exchange和direct exchange相似,都是經過routing key和binding key進行匹配,不一樣的是topic exchange能夠爲routing key設置多重標準。ide

direct路由器相似於sql語句中的精確查詢;topic 路由器有點相似於sql語句中的模糊查詢。ui

還記得嗎?咱們在《RabbitMQ入門:發佈/訂閱(Publish/Subscribe)》中對exchange的分類進行過介紹:spa

Direct:徹底根據key進行投遞的,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。
Topic:對key進行模式匹配後進行投遞,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。
Fanout:不須要key,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。
Headers:咱們能夠不考慮它。

下面是官網給出的工做模型(P表明生產者,X表明exhange,紅色的Q表明隊列,C表明消費者):code

咱們來分析下這個模型。htm

它發送的消息是用來描述動物的。路由鍵有三個單詞:<speed>.<color>.<species>,第一個單詞描述了速度,第二個描述了顏色,第三個描述了物種。
有三個綁定鍵,Q1綁定鍵爲*.orange.*(關注全部顏色爲orange的動物); Q2的綁定鍵有兩個,分別是*.*.rabbit(關注全部的兔子)和lazy.#(關注全部速度爲lazy的動物)。blog

所以,路由鍵爲quick.orange.rabbit的消息將發送到Q1和Q2,路由鍵爲quick.orange.fox的消息將發送到Q1,路由鍵爲lazy.brown.fox的消息將發送到Q2。路由鍵爲lazy.pink.rabbit的消息將發送到Q2,可是注意,它只會到達Q2一次,儘管它匹配了兩個綁定鍵。路由鍵爲quick.brown.fox的消息由於不和任意的綁定鍵匹配,因此將會被丟棄。rabbitmq

若是有人手一抖發了個lazy.orange.male.rabbit這種四個單詞的,這個怎麼辦呢? 因爲它和lazy.#匹配,所以將發送到Q2。隊列

2、代碼示例

接下來咱們看下代碼

  1. 生產者
    public class LogTopicSender {
        // exchange名字
        public static String EXCHANGE_NAME = "topicExchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立鏈接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.爲通道聲明topic類型的exchange
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                
                // 3.發送消息到指定的exchange,隊列指定爲空,由exchange根據狀況判斷須要發送到哪些隊列
                String routingKey = "info";
    //            String routingKey = "log4j.error";
    //            String routingKey = "logback.error";
    //            String routingKey = "log4j.warn";
                String msg = " hello rabbitmq, I am " + routingKey;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.關閉鏈接
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }

     

  2. 消費者
    public class LogTopicReciver {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立鏈接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.爲通道聲明topic類型的exchange
                channel.exchangeDeclare(LogTopicSender.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                // 3.建立隨機名字的隊列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.創建exchange和隊列的綁定關係
                String[] bindingKeys = { "#" };
    //            String[] bindingKeys = { "log4j.*", "#.error" };
    //            String[] bindingKeys = { "*.error" };
    //            String[] bindingKeys = { "log4j.warn" };
                for (int i = 0; i < bindingKeys.length; i++) {
                    channel.queueBind(queueName, LogTopicSender.EXCHANGE_NAME, bindingKeys[i]);
                    System.out.println(" **** LogTopicReciver keep alive ,waiting for " + bindingKeys[i]);
                }
    
                // 5.經過回調生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取消息內容而後處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** LogTopicReciver" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費消息
                channel.basicConsume(queueName, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

     

  3. 啓動消費者,做爲消費者1
  4. 分別將String[] bindingKeys = { "#" };改成String[] bindingKeys = { "log4j.*", "#.error" };/String[] bindingKeys = { "*.error" };/String[] bindingKeys = { "log4j.warn" };,而後啓動做爲消費者二、消費者三、消費者4
  5. 啓動4次生產者,routing key分別爲String routingKey = "info";、String routingKey = "log4j.error";、String routingKey = "logback.error";、String routingKey = "log4j.warn";
  6. 觀察控制檯log
    生產者:
    product send a msg:  hello rabbitmq, I am info
    product send a msg:  hello rabbitmq, I am log4j.error
    product send a msg:  hello rabbitmq, I am logback.error
    product send a msg:  hello rabbitmq, I am log4j.warn
    
    消費者1:
     **** LogTopicReciver keep alive ,waiting for #
    *********** LogTopicReciver get message :[ hello rabbitmq, I am info]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]
    
    消費者2:
     **** LogTopicReciver keep alive ,waiting for log4j.*
     **** LogTopicReciver keep alive ,waiting for #.error
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消費者3: **** LogTopicReciver keep alive ,waiting for *.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] 消費者4: **** LogTopicReciver keep alive ,waiting for log4j.warn *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]

     

  7. 觀察RabbitMQ管理頁面

相關文章
相關標籤/搜索