RabbitMQ-從基礎到實戰(1)— Hello RabbitMQhtml
RabbitMQ-從基礎到實戰(2)— 防止消息丟失java
RabbitMQ-從基礎到實戰(3)— 消息的交換(上)多線程
RabbitMQ-從基礎到實戰(4)— 消息的交換(中)app
RabbitMQ-從基礎到實戰(6)— 與Spring集成dom
上一章介紹了direct類型的exchange,並用它實現了一個僞廣播(Queue綁定多個routingKey)日誌系統,如今,考慮另外一個問題,咱們的日誌系統不只要分級別級別(error,info)記錄日誌,還須要經過發送日誌的系統來匹配,好比說有一個「核心」系統,它發出的全部級別日誌,都須要記錄到硬盤,其餘系統只須要把error級別的日誌記錄到硬盤。ide
若是用上一章的direct該怎麼作呢?函數
需求實現了,這時,項目經理說,兩個日誌級別太很差管理了,咱們加個debug級別吧!post
你的心裏這樣的學習
是時候學習一下Topic Exchange了測試
topic exchange對routingKey是有要求的,必須是一個關鍵字的列表才能發揮正常做用,用「.」分割每一個關鍵字,你能夠定義任意的層級,惟一的限制是最大長度爲255字節。
上述需求,咱們能夠把routingKey的規則定義爲 「<系統>.<日誌級別>」,這個規則是抽象的,也就是說,是在你腦子裏的,並無地方去具體的把它綁定到exchange上,發送消息和綁定隊列徹底能夠不按這個規則來,只會影響消息是否能分發到對應的隊列上。
用「.」分割一樣不是強制要求,routingKey裏不包含這個字符也不會報錯,「.」只會影響topic中對routingKey的分層解析,果不用它,那麼topic的表現和direct一致
topic與direct的重要區別就是,它有兩個關鍵字
第一條很好理解,第二條有點長,不會是騙人的吧?咱們來實驗一下
首先把把logs的type聲明成topic,注意在控制檯把上一章的direct類型的logs刪除掉
channel.exchangeDeclare("logs","topic");
把Consumer的routingKey提取出來,方便後面測試
1 /** 2 * 獲取一個臨時隊列,並綁定到相應的routingKey上,消費消息 3 */ 4 public void consume(String routingKey) { 5 try { 6 String queueName = channel.queueDeclare().getQueue(); 7 //臨時隊列綁定的routingKey是外部傳過來的 8 channel.queueBind(queueName, "logs", routingKey); 9 Consumer consumer = new DefaultConsumer(channel) { 10 @Override 11 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 12 byte[] body) throws IOException { 13 String message = new String(body, "UTF-8"); 14 logger.debug(" [D] 打印日誌:"+message); 15 } 16 }; 17 //這裏自動確認爲true,接收到消息後該消息就銷燬了 18 channel.basicConsume(queueName, true, consumer); 19 } catch (IOException e) { 20 e.printStackTrace(); 21 } 22 }
Consumer的臨時隊列綁定到logs上,routingKey設置爲 a.#
1 public static void main( String[] args ) 2 { 3 LogConsumer consumer = new LogConsumer(); 4 consumer.consume("a.#"); 5 }
Sender的發送到logs上,routingKey設置爲 a.b
1 public static void main( String[] args ) throws InterruptedException{ 2 LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.b"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7 } 8 }
運行,開始發送消息
成功消費
Consumer的臨時隊列綁定到logs上,routingKey設置爲 a.*
1 public static void main( String[] args )
2 { 3 LogConsumer consumer = new LogConsumer(); 4 consumer.consume("a.*"); 5 }
Sender不變
運行,開始發送消息
消費成功
Consumer繼續綁定到a.*
Sender的發送到logs上,routingKey設置爲 a.b.c
1 public static void main( String[] args ) throws InterruptedException{
2 LogSender sender = new LogSender();
3 while(true){
4 String routingKey = "a.b.c";
5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey);
6 Thread.sleep(1000);
7 }
8 }
運行,開始發送消息
a.*監聽不到消息
Consumer改成監聽 a.#
1 public static void main( String[] args ) 2 { 3 LogConsumer consumer = new LogConsumer(); 4 consumer.consume("a.#"); 5 }
繼續往a.b.c發消息
a.# 消費成功
下面測點特殊的
Consumer綁定到 a.b
public static void main( String[] args ) { LogConsumer consumer = new LogConsumer(); consumer.consume("a.b"); }
Sender發送到a.*
1 public static void main( String[] args ) throws InterruptedException{ 2 LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.*"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7 } 8 }
發送成功
消費不到
Sender發送到a.#,Consumer仍是監聽 a.b
1 public static void main( String[] args ) throws InterruptedException{
2 LogSender sender = new LogSender(); 3 while(true){ 4 String routingKey = "a.#"; 5 sender.sendMessage(" message: "+System.currentTimeMillis(), routingKey); 6 Thread.sleep(1000); 7 } 8 }
發送成功
a.#也收不到消息
經過以上六個測試,咱們發現,topic中的通配符,只有在Queue綁定的時候才能起到通配符的做用,若是在發佈消息的時候使用通配符,將做爲普通的字符處理,發送的routingKey=a.* 並不能把消息發送到routingKey=a.b的Queue上,a.#同理,也不能把消息發送到routingKey=a.b.c的Queue上
爲了體現出#的做用,咱們給一開始的需求增長一點難度,規則定位三層,<系統>.<級別>.<類型>,其中類型有兩種,common和important
需求是,Q1監聽core系統的全部日誌和其餘系統全部全部級別的important類型的日誌以及error級的日誌;Q2監聽全部系統的info日誌
首先改造一下Sender,讓它能夠經過多線程發送不一樣系統的日誌消息
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.Random; 6 7 public class App { 8 9 //聲明一個類型MAP 10 private static Map<Integer, String> typeMap; 11 //聲明一個Random 12 private static Random random; 13 14 //在靜態代碼塊中初始化 15 static { 16 typeMap = new HashMap<>(); 17 typeMap.put(0, "important"); 18 typeMap.put(1, "common"); 19 random = new Random(System.currentTimeMillis()); 20 } 21 22 /** 23 * 獲取一個隨機類型的routingKey 24 * @param system 系統名 25 * @param level 日誌級別 26 * @return routingKey 27 */ 28 public static String getRoutingKey(String system, String level) { 29 return new StringBuilder() 30 .append(system).append(".") 31 .append(level).append(".") 32 .append(typeMap.get(random.nextInt(2))) 33 .toString(); 34 } 35 36 /** 37 * 新建一個線程,發送指定系統的消息 38 * @param system 39 */ 40 public static void createSender(final String system){ 41 new Thread(new Runnable() { 42 //new一個Sender 43 private LogSender sender = new LogSender(); 44 45 @Override 46 public void run() { 47 while(true){ 48 long now = System.currentTimeMillis(); 49 //經過當前時間生成錯誤級別 50 boolean info = now % 2 == 0; 51 //生成routingKey 52 String routingKey = getRoutingKey(system, info?"info":"error"); 53 //發送消息 54 String msg = routingKey+"["+now+"]"; 55 sender.sendMessage(msg, routingKey); 56 try { 57 //隨機睡500-1000毫秒 58 int sleepTime = random.nextInt(1000); 59 Thread.sleep(sleepTime<500?500:sleepTime); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 } 64 } 65 }).start(); 66 } 67 68 public static void main(String[] args) throws InterruptedException { 69 //開始發送core系統消息 70 createSender("core"); 71 //開始發送biz系統消息 72 createSender("biz"); 73 } 74 75 }
上述代碼能夠用兩個線程發送兩個系統的隨機日誌消息
下面實現消費者
Q1監聽core系統的全部日誌和其餘系統全部級別的important類型的日誌以及error級的日誌
把這句話拆分一下
那麼,咱們只要給Q1綁定這三個routingKey就能夠了,綁定多個routingKey咱們在上一張已經驗證過了
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class LogConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(LogConsumer.class); 20 private ConnectionFactory factory; 21 private Connection connection; 22 private Channel channel; 23 24 /** 25 * 在構造函數中獲取鏈接 26 */ 27 public LogConsumer() { 28 super(); 29 try { 30 factory = new ConnectionFactory(); 31 factory.setHost("127.0.0.1"); 32 connection = factory.newConnection(); 33 channel = connection.createChannel(); 34 // 聲明exchange,防止生產者沒啓動,exchange不存在 35 channel.exchangeDeclare("logs","topic"); 36 } catch (Exception e) { 37 logger.error(" [X] INIT ERROR!", e); 38 } 39 } 40 41 /** 42 * 提供個關閉方法,如今並無什麼卵用 43 * 44 * @return 45 */ 46 public boolean closeAll() { 47 try { 48 this.channel.close(); 49 this.connection.close(); 50 } catch (IOException | TimeoutException e) { 51 logger.error(" [X] CLOSE ERROR!", e); 52 return false; 53 } 54 return true; 55 } 56 57 /** 58 * 獲取一個臨時隊列,並綁定到相應的routingKey上,消費消息 59 */ 60 public void consume() { 61 try { 62 String queueName = channel.queueDeclare().getQueue(); 63 //core系統全部的日誌 64 channel.queueBind(queueName, "logs", "core.#"); 65 //其餘系統全部級別的important類型的日誌 66 channel.queueBind(queueName, "logs", "*.*.important"); 67 //其餘系統全部error級的日誌 68 channel.queueBind(queueName, "logs", "*.error.*"); 69 70 Consumer consumer = new DefaultConsumer(channel) { 71 @Override 72 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 73 byte[] body) throws IOException { 74 String message = new String(body, "UTF-8"); 75 logger.debug(" [Q1] 打印日誌:"+message); 76 } 77 }; 78 //這裏自動確認爲true,接收到消息後該消息就銷燬了 79 channel.basicConsume(queueName, true, consumer); 80 } catch (IOException e) { 81 e.printStackTrace(); 82 } 83 } 84 }
Q2監聽全部系統的info日誌
Q2應該綁定 *.info.*
1 //全部系統的info日誌 2 channel.queueBind(queueName, "logs", "*.info.*");
把Q1 Q2打包成可執行jar,運行結果以下
能夠看到需求已經正確實現了
到這裏,RabbitMQ中四中exchagne類型:direct、topic、fanout、headers已經介紹了三種最經常使用的
headers不是很經常使用,放到Alternate Exchanges中一塊兒介紹,後面還會介紹RabbitMQ的其餘特性,如:TTL、Lazy Queue、Exchange To Exchange、Dead Lettering等,敬請期待