RabbitMQ-從基礎到實戰(5)— 消息的交換(下)

轉載請註明出處

0.目錄

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQhtml

RabbitMQ-從基礎到實戰(2)— 防止消息丟失java

RabbitMQ-從基礎到實戰(3)— 消息的交換(上)多線程

RabbitMQ-從基礎到實戰(4)— 消息的交換(中)app

RabbitMQ-從基礎到實戰(6)— 與Spring集成dom

1.簡介

上一章介紹了direct類型的exchange,並用它實現了一個僞廣播(Queue綁定多個routingKey)日誌系統,如今,考慮另外一個問題,咱們的日誌系統不只要分級別級別(error,info)記錄日誌,還須要經過發送日誌的系統來匹配,好比說有一個「核心」系統,它發出的全部級別日誌,都須要記錄到硬盤,其餘系統只須要把error級別的日誌記錄到硬盤。ide

若是用上一章的direct該怎麼作呢?函數

  • 給routingKey分層,變成相似這樣的字符串:核心.info,核心.error,其餘.info,其餘.error
  • Q1綁定routingKey:核心.info,核心.error,其餘.error,記錄全部核心日誌,記錄其餘error日誌
  • Q2綁定routingKey:核心.info,其餘.info,打印全部info日誌

需求實現了,這時,項目經理說,兩個日誌級別太很差管理了,咱們加個debug級別吧!post

你的心裏這樣的學習

image

是時候學習一下Topic Exchange了測試

2.Topic Exchange

topic exchange對routingKey是有要求的,必須是一個關鍵字的列表才能發揮正常做用,用「.」分割每一個關鍵字,你能夠定義任意的層級,惟一的限制是最大長度爲255字節。

上述需求,咱們能夠把routingKey的規則定義爲 「<系統>.<日誌級別>」,這個規則是抽象的,也就是說,是在你腦子裏的,並無地方去具體的把它綁定到exchange上,發送消息和綁定隊列徹底能夠不按這個規則來,只會影響消息是否能分發到對應的隊列上。

用「.」分割一樣不是強制要求,routingKey裏不包含這個字符也不會報錯,「.」只會影響topic中對routingKey的分層解析,果不用它,那麼topic的表現和direct一致

topic與direct的重要區別就是,它有兩個關鍵字

  1. 「*」星號,表明一個詞,好比上述規則:*.error 表示全部系統的error級別的日誌
  2. 「#」井號,表明零個或多個詞,好比上述規則: *.# 表示全部系統的全部消息,與單獨一個#是等效的,core.# 表示核心系統的全部日誌,它和 core.* 的區別是,即便之後規則改成 <系統>.<日誌級別>.<其餘條件>.<其餘條件>.……,core.# 依然能夠完成匹配,而 core.* 則沒法匹配 core.info.xxx.xxx

第一條很好理解,第二條有點長,不會是騙人的吧?咱們來實驗一下

首先把把logs的type聲明成topic,注意在控制檯把上一章的direct類型的logs刪除掉

channel.exchangeDeclare("logs","topic");

把Consumer的routingKey提取出來,方便後面測試

 1 /**
 2  * 獲取一個臨時隊列,並綁定到相應的routingKey上,消費消息
 3  */
 4 public void consume(String routingKey) {
 5     try {
 6         String queueName = channel.queueDeclare().getQueue();
 7         //臨時隊列綁定的routingKey是外部傳過來的
 8         channel.queueBind(queueName, "logs", routingKey);  9         Consumer consumer = new DefaultConsumer(channel) {
10             @Override
11             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
12                     byte[] body) throws IOException {
13                 String message = new String(body, "UTF-8");
14                 logger.debug(" [D] 打印日誌:"+message);
15             }
16         };
17         //這裏自動確認爲true,接收到消息後該消息就銷燬了
18         channel.basicConsume(queueName, true, consumer);
19     } catch (IOException e) {
20         e.printStackTrace();
21     }
22 }

測試1

Consumer的臨時隊列綁定到logs上,routingKey設置爲 a.#

1 public static void main( String[] args )
2 {
3     LogConsumer consumer = new LogConsumer();
4     consumer.consume("a.#"); 5 }

Sender的發送到logs上,routingKey設置爲 a.b

1 public static void main( String[] args ) throws InterruptedException{
2     LogSender sender = new LogSender();
3     while(true){
4         String routingKey = "a.b"; 5         sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
6         Thread.sleep(1000);
7     }
8 }

運行,開始發送消息

成功消費

測試2

Consumer的臨時隊列綁定到logs上,routingKey設置爲 a.*

1 public static void main( String[] args )
2 { 3 LogConsumer consumer = new LogConsumer(); 4 consumer.consume("a.*"); 5 }

Sender不變

運行,開始發送消息

消費成功

測試3

Consumer繼續綁定到a.*

Sender的發送到logs上,routingKey設置爲 a.b.c

1 public static void main( String[] args ) throws InterruptedException{
2     LogSender sender = new LogSender();
3     while(true){
4         String routingKey = "a.b.c";
5         sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
6         Thread.sleep(1000);
7     }
8 }

運行,開始發送消息

a.*監聽不到消息

測試4

Consumer改成監聽 a.#

1 public static void main( String[] args )
2 {
3     LogConsumer consumer = new LogConsumer();
4     consumer.consume("a.#");
5 }

繼續往a.b.c發消息

a.# 消費成功

測試5

下面測點特殊的

Consumer綁定到 a.b

public static void main( String[] args )
{
    LogConsumer consumer = new LogConsumer();
    consumer.consume("a.b");
}

Sender發送到a.*

1 public static void main( String[] args ) throws InterruptedException{
2     LogSender sender = new LogSender();
3     while(true){
4         String routingKey = "a.*"; 5         sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
6         Thread.sleep(1000);
7     }
8 }

發送成功

消費不到

測試6

Sender發送到a.#,Consumer仍是監聽 a.b

1 public static void main( String[] args ) throws InterruptedException{
2     LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.#"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7  } 8 }

發送成功

 

a.#也收不到消息

測試結果

經過以上六個測試,咱們發現,topic中的通配符,只有在Queue綁定的時候才能起到通配符的做用,若是在發佈消息的時候使用通配符,將做爲普通的字符處理,發送的routingKey=a.* 並不能把消息發送到routingKey=a.b的Queue上,a.#同理,也不能把消息發送到routingKey=a.b.c的Queue上

3.實戰

爲了體現出#的做用,咱們給一開始的需求增長一點難度,規則定位三層,<系統>.<級別>.<類型>,其中類型有兩種,common和important

需求是,Q1監聽core系統的全部日誌和其餘系統全部全部級別的important類型的日誌以及error級的日誌;Q2監聽全部系統的info日誌

首先改造一下Sender,讓它能夠經過多線程發送不一樣系統的日誌消息

 1 package com.liyang.ticktock.rabbitmq;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.Random;
 6 
 7 public class App {
 8 
 9     //聲明一個類型MAP
10     private static Map<Integer, String> typeMap;
11     //聲明一個Random
12     private static Random random;
13 
14     //在靜態代碼塊中初始化
15     static {
16         typeMap = new HashMap<>();
17         typeMap.put(0, "important");
18         typeMap.put(1, "common");
19         random = new Random(System.currentTimeMillis());
20     }
21 
22     /**
23      * 獲取一個隨機類型的routingKey
24      * @param system 系統名
25      * @param level 日誌級別
26      * @return routingKey
27      */
28     public static String getRoutingKey(String system, String level) {
29         return new StringBuilder()
30                 .append(system).append(".")
31                 .append(level).append(".")
32                 .append(typeMap.get(random.nextInt(2)))
33                 .toString();
34     }
35     
36     /**
37      * 新建一個線程,發送指定系統的消息
38      * @param system
39      */
40     public static void createSender(final String system){
41         new Thread(new Runnable() {
42             //new一個Sender
43             private  LogSender sender = new LogSender();
44             
45             @Override
46             public void run() {
47                 while(true){
48                     long now = System.currentTimeMillis();
49                     //經過當前時間生成錯誤級別
50                     boolean info = now % 2 == 0;
51                     //生成routingKey
52                     String routingKey = getRoutingKey(system, info?"info":"error");
53                     //發送消息
54                     String msg = routingKey+"["+now+"]";
55                     sender.sendMessage(msg, routingKey);
56                     try {
57                         //隨機睡500-1000毫秒
58                         int sleepTime = random.nextInt(1000);
59                         Thread.sleep(sleepTime<500?500:sleepTime);
60                     } catch (InterruptedException e) {
61                         e.printStackTrace();
62                     }
63                 }
64             }
65         }).start();
66     }
67 
68     public static void main(String[] args) throws InterruptedException {
69         //開始發送core系統消息
70         createSender("core");
71         //開始發送biz系統消息
72         createSender("biz");
73     }
74     
75 }

上述代碼能夠用兩個線程發送兩個系統的隨機日誌消息

下面實現消費者

Q1監聽core系統的全部日誌和其餘系統全部級別的important類型的日誌以及error級的日誌

 把這句話拆分一下

  • core系統全部的日誌,core.#
  • 其餘系統全部級別的important類型的日誌,*.*.important
  • 其餘系統全部error級的日誌,*.error.*

那麼,咱們只要給Q1綁定這三個routingKey就能夠了,綁定多個routingKey咱們在上一張已經驗證過了

 1 package com.liyang.ticktock.rabbitmq;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16 
17 public class LogConsumer {
18 
19     private Logger logger = LoggerFactory.getLogger(LogConsumer.class);
20     private ConnectionFactory factory;
21     private Connection connection;
22     private Channel channel;
23 
24     /**
25      * 在構造函數中獲取鏈接
26      */
27     public LogConsumer() {
28         super();
29         try {
30             factory = new ConnectionFactory();
31             factory.setHost("127.0.0.1");
32             connection = factory.newConnection();
33             channel = connection.createChannel();
34             // 聲明exchange,防止生產者沒啓動,exchange不存在
35             channel.exchangeDeclare("logs","topic");
36         } catch (Exception e) {
37             logger.error(" [X] INIT ERROR!", e);
38         }
39     }
40 
41     /**
42      * 提供個關閉方法,如今並無什麼卵用
43      * 
44      * @return
45      */
46     public boolean closeAll() {
47         try {
48             this.channel.close();
49             this.connection.close();
50         } catch (IOException | TimeoutException e) {
51             logger.error(" [X] CLOSE ERROR!", e);
52             return false;
53         }
54         return true;
55     }
56 
57     /**
58      * 獲取一個臨時隊列,並綁定到相應的routingKey上,消費消息
59      */
60     public void consume() {
61         try {
62             String queueName = channel.queueDeclare().getQueue();
63             //core系統全部的日誌
64             channel.queueBind(queueName, "logs", "core.#"); 65             //其餘系統全部級別的important類型的日誌
66             channel.queueBind(queueName, "logs", "*.*.important"); 67             //其餘系統全部error級的日誌
68             channel.queueBind(queueName, "logs", "*.error.*"); 69             
70             Consumer consumer = new DefaultConsumer(channel) {
71                 @Override
72                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
73                         byte[] body) throws IOException {
74                     String message = new String(body, "UTF-8");
75                     logger.debug(" [Q1] 打印日誌:"+message);
76                 }
77             };
78             //這裏自動確認爲true,接收到消息後該消息就銷燬了
79             channel.basicConsume(queueName, true, consumer);
80         } catch (IOException e) {
81             e.printStackTrace();
82         }
83     }
84 }

Q2監聽全部系統的info日誌

 Q2應該綁定 *.info.*

1 //全部系統的info日誌
2 channel.queueBind(queueName, "logs", "*.info.*");

把Q1 Q2打包成可執行jar,運行結果以下

能夠看到需求已經正確實現了

4.結束語

到這裏,RabbitMQ中四中exchagne類型:direct、topic、fanout、headers已經介紹了三種最經常使用的

headers不是很經常使用,放到Alternate Exchanges中一塊兒介紹,後面還會介紹RabbitMQ的其餘特性,如:TTL、Lazy Queue、Exchange To Exchange、Dead Lettering等,敬請期待

相關文章
相關標籤/搜索