RabbitMQ :主題(Topic)

一、 主題轉發(Topic Exchange)

發往主題類型的轉發器的消息不能隨意的設置選擇鍵(routing_key),必須是由點隔開的一系列的標識符組成。標識符能夠是任何東西,可是通常都與消息的某些特性相關。一些合法的選擇鍵的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你能夠定義任何數量的標識符,上限爲255個字節。
綁定鍵和選擇鍵的形式同樣。主題類型的轉發器背後的邏輯和直接類型的轉發器很相似:一個附帶特殊的選擇鍵將會被轉發到綁定鍵與之匹配的隊列中。須要注意的是:關於綁定鍵有兩種特殊的狀況。
*能夠匹配一個標識符。
#能夠匹配0個或多個標識符。
java

二、 圖解:

咱們準備發送關於動物的消息。消息會附加一個選擇鍵包含3個標識符(兩個點隔開)。第一個標識符描述動物的速度,第二個標識符描述動物的顏色,第三個標識符描述動物的物種:<speed>.<color>.<species>。
咱們建立3個綁定鍵:Q1與*.orange.*綁定Q2與*.*.rabbit和lazy.#綁定。
能夠簡單的認爲:
Q1對全部的橙色動物感興趣。
Q2想要知道關於兔子的一切以及關於懶洋洋的動物的一切。
一個附帶quick.orange.rabbit的選擇鍵的消息將會被轉發到兩個隊列。附帶lazy.orange.elephant的消息也會被轉發到兩個隊列。另外一方面quick.orange.fox只會被轉發到Q1,lazy.brown.fox將會被轉發到Q2。lazy.pink.rabbit雖然與兩個綁定鍵匹配,可是也只會被轉發到Q2一次。quick.brown.fox不能與任何綁定鍵匹配,因此會被丟棄。
若是咱們違法咱們的約定,發送一個或者四個標識符的選擇鍵,相似:orange,quick.orange.male.rabbit,這些選擇鍵不能與任何綁定鍵匹配,因此消息將會被丟棄。
另外一方面,lazy.orange.male.rabbit,雖然是四個標識符,也能夠與lazy.#匹配,從而轉發至Q2。
注:主題類型的轉發器很是強大,能夠實現其餘類型的轉發器。
當一個隊列與綁定鍵#綁定,將會收到全部的消息,相似fanout類型轉發器。
當綁定鍵中不包含任何#與*時,相似direct類型轉發器。web

發送端:dom

 

package event;ui

import java.util.UUID; spa

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
 
public class EmitLogTopic 

 
    private static final String EXCHANGE_NAME = "topic_logs"; 
 
    public static void main(String[] argv) throws Exception 
    { 
        // 建立鏈接和頻道 
        ConnectionFactory factory = new ConnectionFactory(); 
        factory.setHost("localhost"); 
        Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
        //與direct模式有相似之處,都使用routing key做爲路由
        //不一樣之處在於direct模式只能指定固定的字符串,而topic能夠指定一個字符串模式
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
 
        String[] routing_keys = new String[] { "kernal.info", "cron.warning", 
                "auth.info", "kernel.critical" }; 
        for (String routing_key : routing_keys) 
        { 
            String msg = UUID.randomUUID().toString(); 
            channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg 
                    .getBytes()); 
            System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + "."); 
        } 
 
        channel.close(); 
        connection.close(); 
    } 
orm

接收端1:rabbitmq

package event;隊列

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.QueueingConsumer; 
 
public class ReceiveLogsTopicForKernel 

 
    private static final String EXCHANGE_NAME = "topic_logs"; 
 
    public static void main(String[] argv) throws Exception 
    { 
        // 建立鏈接和頻道 
        ConnectionFactory factory = new ConnectionFactory(); 
        factory.setHost("localhost"); 
        Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
        // 聲明轉發器 
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 
        // 隨機生成一個隊列 
        String queueName = channel.queueDeclare().getQueue(); 
         
        //接收全部與kernel相關的消息 
        channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*"); 
 
        System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C"); 
 
        QueueingConsumer consumer = new QueueingConsumer(channel); 
        channel.basicConsume(queueName, true, consumer); 
 
        while (true) 
        { 
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
            String message = new String(delivery.getBody()); 
            String routingKey = delivery.getEnvelope().getRoutingKey(); 
 
            System.out.println(" [x] Received routingKey = " + routingKey 
                    + ",msg = " + message + "."); 
        } 
    } 
ci

接收端2:路由

 

package event;

import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.QueueingConsumer;    public class ReceiveLogsTopicForCritical  {        private static final String EXCHANGE_NAME = "topic_logs";        public static void main(String[] argv) throws Exception      {          // 建立鏈接和頻道          ConnectionFactory factory = new ConnectionFactory();          factory.setHost("localhost");          Connection connection = factory.newConnection();          Channel channel = connection.createChannel();          // 聲明轉發器          channel.exchangeDeclare(EXCHANGE_NAME, "topic");          // 隨機生成一個隊列          String queueName = channel.queueDeclare().getQueue();            // 接收全部與kernel相關的消息          channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");            System.out                  .println(" [*] Waiting for critical messages. To exit press CTRL+C");            QueueingConsumer consumer = new QueueingConsumer(channel);          channel.basicConsume(queueName, true, consumer);            while (true)          {              QueueingConsumer.Delivery delivery = consumer.nextDelivery();              String message = new String(delivery.getBody());              String routingKey = delivery.getEnvelope().getRoutingKey();                System.out.println(" [x] Received routingKey = " + routingKey                      + ",msg = " + message + ".");          }      }  } 

相關文章
相關標籤/搜索