上一篇博文中,咱們使用direct exchange 代替了fanout exchange,此次咱們來看下topic exchange。html
1、Topic Exchange介紹sql
topic exchange和direct exchange相似,都是經過routing key和binding key進行匹配,不一樣的是topic exchange能夠爲routing key設置多重標準。ide
direct路由器相似於sql語句中的精確查詢;topic 路由器有點相似於sql語句中的模糊查詢。ui
還記得嗎?咱們在《RabbitMQ入門:發佈/訂閱(Publish/Subscribe)》中對exchange的分類進行過介紹:spa
Direct:徹底根據key進行投遞的,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。 Topic:對key進行模式匹配後進行投遞,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。 Fanout:不須要key,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。 Headers:咱們能夠不考慮它。
下面是官網給出的工做模型(P表明生產者,X表明exhange,紅色的Q表明隊列,C表明消費者):code
咱們來分析下這個模型。htm
它發送的消息是用來描述動物的。路由鍵有三個單詞:<speed>.<color>.<species>,第一個單詞描述了速度,第二個描述了顏色,第三個描述了物種。
有三個綁定鍵,Q1綁定鍵爲*.orange.*(關注全部顏色爲orange的動物); Q2的綁定鍵有兩個,分別是*.*.rabbit(關注全部的兔子)和lazy.#(關注全部速度爲lazy的動物)。blog
所以,路由鍵爲quick.orange.rabbit的消息將發送到Q1和Q2,路由鍵爲quick.orange.fox的消息將發送到Q1,路由鍵爲lazy.brown.fox的消息將發送到Q2。路由鍵爲lazy.pink.rabbit的消息將發送到Q2,可是注意,它只會到達Q2一次,儘管它匹配了兩個綁定鍵。路由鍵爲quick.brown.fox的消息由於不和任意的綁定鍵匹配,因此將會被丟棄。rabbitmq
若是有人手一抖發了個lazy.orange.male.rabbit這種四個單詞的,這個怎麼辦呢? 因爲它和lazy.#匹配,所以將發送到Q2。隊列
2、代碼示例
接下來咱們看下代碼
public class LogTopicSender { // exchange名字 public static String EXCHANGE_NAME = "topicExchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.爲通道聲明topic類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 3.發送消息到指定的exchange,隊列指定爲空,由exchange根據狀況判斷須要發送到哪些隊列 String routingKey = "info"; // String routingKey = "log4j.error"; // String routingKey = "logback.error"; // String routingKey = "log4j.warn"; String msg = " hello rabbitmq, I am " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.關閉鏈接 if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class LogTopicReciver { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.爲通道聲明topic類型的exchange channel.exchangeDeclare(LogTopicSender.EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 3.建立隨機名字的隊列 String queueName = channel.queueDeclare().getQueue(); // 4.創建exchange和隊列的綁定關係 String[] bindingKeys = { "#" }; // String[] bindingKeys = { "log4j.*", "#.error" }; // String[] bindingKeys = { "*.error" }; // String[] bindingKeys = { "log4j.warn" }; for (int i = 0; i < bindingKeys.length; i++) { channel.queueBind(queueName, LogTopicSender.EXCHANGE_NAME, bindingKeys[i]); System.out.println(" **** LogTopicReciver keep alive ,waiting for " + bindingKeys[i]); } // 5.經過回調生成消費者並進行監聽 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 獲取消息內容而後處理 String msg = new String(body, "UTF-8"); System.out.println("*********** LogTopicReciver" + " get message :[" + msg + "]"); } }; // 6.消費消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
生產者: product send a msg: hello rabbitmq, I am info product send a msg: hello rabbitmq, I am log4j.error product send a msg: hello rabbitmq, I am logback.error product send a msg: hello rabbitmq, I am log4j.warn 消費者1: **** LogTopicReciver keep alive ,waiting for # *********** LogTopicReciver get message :[ hello rabbitmq, I am info] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消費者2: **** LogTopicReciver keep alive ,waiting for log4j.* **** LogTopicReciver keep alive ,waiting for #.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
*********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消費者3: **** LogTopicReciver keep alive ,waiting for *.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] 消費者4: **** LogTopicReciver keep alive ,waiting for log4j.warn *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]