5.rabbitmq--通配符模式Topics

rabbitmq--通配符模式Topics

topic模式也稱爲主題模式,其實他相對於routing模式最大的好處就是他多了一種匹配模式的路由,怎麼理解匹配呢,其實就至關於咱們以前正則的.*這種,不過他的匹配機制可能不是這種(其實除了匹配規則外,他的做用就和routing模式同樣 ),而他的工做流程圖以下:java

OK! 先說一下他的匹配規則:web

綁定鍵binding key也必須是這種形式。以特定路由鍵發送的消息將會發送到全部綁定鍵與之匹配的隊列中。但綁定鍵有兩種特殊的狀況: 
①*(星號)僅表明一個單詞 
②#(井號)表明任意個單詞spring

示例:apache

以上圖爲例:app

*.orange.* :  匹配以 任意一個單詞字符開頭中間包含 .orange. 以任意一個單詞字符結尾 的字符串。好比 a.orange.b, sdfsd.orange.fdsfsdf 等(注意是一個單詞)。ide

lay.# :只要一lay.開頭的都匹配,他能夠匹配lay.a, lay.a.b, lay.b.c等。fetch

這樣是否是很方便,好比咱們想將log的發給q1隊列,其餘的發給q2,那麼咱們只須要定義log.#、或者log.*,那麼你發送給q1隊列的數據就是log日誌的消息。 spa

 1 package com.maozw.mq.topic;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.Channel;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.rabbit.connection.Connection;
 8 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.web.bind.annotation.RequestMapping;
11 import org.springframework.web.bind.annotation.RestController;
12 
13 import java.io.IOException;
14 import java.util.concurrent.TimeoutException;
15 
16 import static org.apache.log4j.varia.ExternallyRolledFileAppender.OK;
17 
18 /**
19  * work 模式
20  * 兩種分發: 輪詢分發 + 公平分發
21  * 輪詢分發:消費端:自動確認消息;boolean autoAck = true;
22  * 公平分發: 消費端:手動確認消息 boolean autoAck = false; channel.basicAck(envelope.getDeliveryTag(),false);
23  *
24  * @author MAOZW
25  * @Description: 商品
26  * @date 2018/11/26 15:06
27  */
28 @RestController
29 @RequestMapping("/topic")
30 public class TopicProducer {
31     private static final Logger LOGGER = LoggerFactory.getLogger(TopicProducer.class);
32     @Autowired
33     RabbitConfig rabbitConfig;
34 
35     public static final String EXCHANGE_TOPIC = "EXCHANGE_TOPIC";
36     private static final String[] routingKeys = {"commodity.add", "commodity.update", "commodity.select", "commodity.delete"};
37 
38     @RequestMapping("/send")
39     public String send() throws IOException, TimeoutException {
40         Connection connection = null;
41         Channel channel = null;
42         try {
43             ConnectionFactory connectionFactory = rabbitConfig.connectionFactory();
44             connection = connectionFactory.createConnection();
45             channel = connection.createChannel(false);
46 
47             /**
48              * 申明交換機 以及type
49              */
50             channel.exchangeDeclare(EXCHANGE_TOPIC, "topic");
51             /**
52              * 發送消息
53              * 每一個消費者 發送確認消息以前,消息隊列不會發送下一個消息給消費者,一次只處理一個消息
54              * 自動模式無需設置下面設置
55              */
56             int prefetchCount = 1;
57             channel.basicQos(prefetchCount);
58 
59             String Hello = ">>>> Hello EXCHANGE_TOPIC <<<<";
60             String message = Hello;
61             for (String routingKey : routingKeys) {
62                 channel.basicPublish(EXCHANGE_TOPIC, routingKey, null, message.getBytes());
63             }
64             LOGGER.info("生產消息: " + message);
65             return "OK";
66         } catch (Exception e) {
67 
68         } finally {
69             connection.close();
70             channel.close();
71             return OK;
72         }
73     }
74 }
 1 package com.maozw.mq.topic;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class TopicConsumer {
22     private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumer.class);
23     public static final String EXCHANGE_TOPIC = "EXCHANGE_TOPIC";
24     private static final String[] routingKeys = {"commodity.add", "commodity.update", "commodity.delete"};
25 
26     public static void main(String[] args) throws IOException {
27         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
28         Connection connection = connectionFactory.createConnection();
29         Channel channel = connection.createChannel(false);
30         /**
31          * 建立隊列申明
32          */
33         boolean durable = true;
34         channel.queueDeclare(RabbitConfig.QUEUE_TOPIC, durable, false, false, null);
35         /**
36          * 綁定隊列到交換機
37          */
38         channel.queueBind(RabbitConfig.QUEUE_TOPIC, EXCHANGE_TOPIC, "commodity.add");
39 
40 
41         /**
42          * 改變分發規則
43          */
44         channel.basicQos(1);
45         DefaultConsumer consumer = new DefaultConsumer(channel) {
46             @Override
47             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
48                 super.handleDelivery(consumerTag, envelope, properties, body);
49                 System.out.println("[1] 接口數據 : " + new String(body, "utf-8"));
50                 try {
51                     Thread.sleep(300);
52                 } catch (InterruptedException e) {
53                     e.printStackTrace();
54                 } finally {
55                     System.out.println("[1] done!");
56                     //消息應答:手動回執,手動確認消息
57                     channel.basicAck(envelope.getDeliveryTag(), false);
58                 }
59             }
60         };
61         //監聽隊列
62         /**
63          * autoAck 消息應答
64          *  默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。
65          *  使用公平分發須要關閉autoAck:false  須要手動發送回執
66          */
67         boolean autoAck = false;
68         channel.basicConsume(RabbitConfig.QUEUE_TOPIC, autoAck, consumer);
69     }
70 
71 }
 1 package com.maozw.mq.topic;
 2 
 3 import com.maozw.mq.config.RabbitConfig;
 4 import com.rabbitmq.client.AMQP;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.Envelope;
 8 import org.slf4j.Logger;
 9 import org.slf4j.LoggerFactory;
10 import org.springframework.amqp.rabbit.connection.Connection;
11 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
12 
13 import java.io.IOException;
14 
15 /**
16  * @author MAOZW
17  * @Description: ${todo}
18  * @date 2018/11/26 15:06
19  */
20 
21 public class TopicConsumer2 {
22     private static final Logger LOGGER = LoggerFactory.getLogger(TopicConsumer2.class);
23     public static final String EXCHANGE_TOPIC = "EXCHANGE_TOPIC";
24     private static final String[] routingKeys = {"commodity.add", "commodity.update", "commodity.delete", "commodity.select"};
25 
26     public static void main(String[] args) throws IOException {
27         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
28         Connection connection = connectionFactory.createConnection();
29         Channel channel = connection.createChannel(false);
30         /**
31          * 建立隊列申明
32          */
33         boolean durable = true;
34         channel.queueDeclare(RabbitConfig.QUEUE_TOPIC2, durable, false, false, null);
35 
36         /**
37          * 綁定隊列到交換機
38          */
39         channel.queueBind(RabbitConfig.QUEUE_TOPIC2, EXCHANGE_TOPIC, "commodity.*");
40 
41 
42         /**
43          * 改變分發規則
44          */
45         channel.basicQos(1);
46         DefaultConsumer consumer = new DefaultConsumer(channel) {
47             @Override
48             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
49                 super.handleDelivery(consumerTag, envelope, properties, body);
50                 System.out.println("[2] 接口數據 : " + new String(body, "utf-8"));
51                 try {
52                     Thread.sleep(200);
53                 } catch (InterruptedException e) {
54                     e.printStackTrace();
55                 } finally {
56                     System.out.println("[2] done! ");
57                     //消息應答:手動回執,手動確認消息
58                     channel.basicAck(envelope.getDeliveryTag(), false);
59                 }
60             }
61         };
62         //監聽隊列
63         /**
64          * autoAck 消息應答
65          *  默認輪詢分發打開:true :這種模式一旦rabbitmq將消息發送給消費者,就會從內存中刪除該消息,不關心客戶端是否消費正常。
66          *  使用公平分發須要關閉autoAck:false  須要手動發送回執
67          */
68         boolean autoAck = false;
69         channel.basicConsume(RabbitConfig.QUEUE_TOPIC2, autoAck, consumer);
70     }
71 }
相關文章
相關標籤/搜索