在上一篇博客《RabbitMQ入門:發佈/訂閱(Publish/Subscribe)》中,咱們認識了fanout類型的exchange,它是一種經過廣播方式發送消息的路由器,全部和exchange創建的綁定關係的隊列都會接收到消息。可是有一些場景只須要訂閱到一部分消息,這個時候就不能使用fanout 類型的exchange了,這個就引出來今天的「豬腳」--Direct Exchange,經過Routing Key來決定須要將消息發送到哪一個或者哪些隊列中。html
接下來請收看詳細內容:react
1、Direct Exchange(直接路由器)ide
在上文中介紹exchange的時候,對direct exchange進行了簡單介紹,它是一種徹底按照routing key(路由關鍵字)進行投遞的:當消息中的routing key和隊列中的binding key徹底匹配時,才進行會將消息投遞到該隊列中。這裏提到了一個routing key和binding key(綁定關鍵字),是什麼東東?ui
在發送消息的時候,basicPublish的第二個參數就是routing key,因爲上次是fanout 類型的exchange 進行廣播方式投遞,這個字段不會影響投遞結果,所以咱們這裏就傳入了「」,可是在direct 類型的exchange中咱們就不能傳入""了,須要指定具體的關鍵字。spa
咱們在前文中創建綁定關係的時候,queueBind的第三個參數就是綁定關鍵字debug
咱們聲明direact exchange的時候使用:code
2、多重綁定htm
多個隊列以相同的綁定鍵綁定到同一個路由器的狀況,咱們稱之爲多重綁定。blog
工做模型爲(P表明生產者,X表明路由器,紅色的Q表明隊列,C表明消費者):rabbitmq
3、代碼實例
預備知識瞭解完了,如今來寫個程序感覺下。
public class LogDirectSender { // exchange名字 public static String EXCHANGE_NAME = "directExchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.爲通道聲明direct類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 3.發送消息到指定的exchange,隊列指定爲空,由exchange根據狀況判斷須要發送到哪些隊列 String routingKey = "debug"; String msg = " hello rabbitmq, I am " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.關閉鏈接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
和上次博客中生產者的區別就是黑字粗體部分:1.路由器類型改成direct 2.消息發佈的時候指定了routing key
public class LogDirectReciver { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.爲通道聲明direct類型的exchange channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 3.建立隨機名字的隊列 String queueName = channel.queueDeclare().getQueue(); // 4.創建exchange和隊列的綁定關係 String[] bindingKeys = { "error", "info", "debug" }; // String[] bindingKeys = { "error" }; for (int i = 0; i < bindingKeys.length; i++) { channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]); System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]); } // 5.經過回調生成消費者並進行監聽 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 獲取消息內容而後處理 String msg = new String(body, "UTF-8"); System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]"); } }; // 6.消費消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
和上次博客中消費者的區別就是黑字粗體部分:1.路由器類型改成direct 2.創建綁定關係的時候指定了binding key
**** LogDirectReciver keep alive ,waiting for error **** LogDirectReciver keep alive ,waiting for info **** LogDirectReciver keep alive ,waiting for debug
這個消費者咱們視爲消費者1,它會接收error,info,debug三個關鍵字的消息。
**** LogDirectReciver keep alive ,waiting for error
這個消費者咱們視爲消費者2,它只會接收error 關鍵字的消息。
第一次執行:
product send a msg: hello rabbitmq, I am debug
第二次執行:
product send a msg: hello rabbitmq, I am info
第三次執行:
product send a msg: hello rabbitmq, I am error
消費者1: **** LogDirectReciver keep alive ,waiting for error **** LogDirectReciver keep alive ,waiting for info **** LogDirectReciver keep alive ,waiting for debug *********** LogDirectReciver get message :[ hello rabbitmq, I am debug] *********** LogDirectReciver get message :[ hello rabbitmq, I am info] *********** LogDirectReciver get message :[ hello rabbitmq, I am error] 消費者2: **** LogDirectReciver keep alive ,waiting for error *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
exchanges標籤頁裏面多了個direct類型的路由器。進入詳細頁面:
有4個綁定關係,其中三個的隊列是同一個。切換到Queues標籤頁:
有兩個臨時隊列。
若是關掉消費者1和消費者2,會發現隊列自動刪除了,綁定關係也不存在了。