即向多個消費者傳遞同一條信息
算法
RabbitMQ消息傳遞模型的核心思想是,生產者永遠不會將任何消息直接發送到隊列。安全
相反,生產者只能向交換機(Exchange)發送消息。交換機是一個很是簡單的東西。一邊接收來自生產者的消息,另外一邊將消息推送到隊列。交換器必須確切地知道如何處理它接收到的消息。它應該被添加到一個特定的隊列中嗎?它應該添加到多個隊列中嗎?或者它應該被丟棄。這些規則由exchange的類型定義。dom
有幾種可用的交換類型:direct、topic、header和fanout。
建立fanout交換機logs: c.exchangeDeclare("logs", "fanout");
或c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
ide
fanout交換機很是簡單。它只是將接收到的全部消息廣播給它所知道的全部隊列。ui
建立了一個fanout交換機和一個隊列。如今咱們須要告訴exchange向指定隊列發送消息。exchange和隊列之間的關係稱爲綁定。spa
//指定的隊列,與指定的交換機關聯起來 //稱爲綁定 -- binding //第三個參數時 routingKey, 因爲是fanout交換機, 這裏忽略 routingKey ch.queueBind(queueName, "logs", "");
1.生產者
最重要的更改是,咱們如今但願將消息發佈到logs交換機,而不是無名的日誌交換機。咱們須要在發送時提供一個routingKey,可是對於fanout交換機類型,該值會被忽略。日誌
public class Producer { public static void main(String[] args) throws Exception { //創建鏈接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定義fanout類型交換機:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //向交換機發送信息 while (true){ System.out.println("輸入消息:"); String msg = new Scanner(System.in).nextLine(); c.basicPublish("logs", "", null, msg.getBytes()); } } }
2.消費者
若是尚未隊列綁定到交換器,消息就會丟失,但這對咱們來講沒有問題;若是尚未消費者在聽,咱們能夠安全地丟棄這些信息。code
public class Consumer { public static void main(String[] args) throws Exception { //創建鏈接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //1.定義隨機隊列 2.定義交換機 3.綁定 //隨機命名,非持久,獨佔,自動刪除 String queue = UUID.randomUUID().toString(); c.queueDeclare(queue, false, true, true, null); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //第三個參數對發佈訂閱模式fanout交換機無效 c.queueBind(queue, "logs", ""); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody()); System.out.println("收到:"+msg); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //正常的消費數據 c.basicConsume(queue, true, deliverCallback, cancelCallback); } }
路由模式與訂閱模式不一樣之處在於,咱們將向其添加一個特性—咱們將只訂閱全部消息中的一部分.本文中已添加err/info/warning等報錯提示來示範.
blog
綁定是交換機和隊列之間的關係。這能夠簡單地理解爲:隊列對來自此交換的消息感興趣。rabbitmq
綁定可使用額外的routingKey參數。爲了不與basic_publish參數混淆,咱們將其稱爲bindingKey。這是咱們如何建立一個鍵綁定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey的含義取決於交換機類型。咱們前面使用的fanout交換機徹底忽略它。
上一節中的日誌系統向全部消費者廣播全部消息。咱們但願擴展它,容許根據消息的嚴重性過濾消息。
前面咱們使用的是fanout交換機,這並無給咱們太多的靈活性——它只能進行簡單的廣播。
咱們將用直連交換機(Direct exchange)代替。它背後的路由算法很簡單——消息傳遞到bindingKey與routingKey徹底匹配的隊列。
使用相同的bindingKey綁定多個隊列是徹底容許的。可使用binding key "black"將X與Q1和Q2綁定。在這種狀況下,直連交換機的行爲相似於fanout,並將消息廣播給全部匹配的隊列。一條路由鍵爲black的消息將同時發送到Q1和Q2。
1.發送消息
咱們將提供日誌級別做爲routingKey,這樣,接收程序將可以選擇它但願接收的級別
//參數1: 交換機名 //參數2: routingKey, 路由鍵,這裏咱們用日誌級別,如"error","info","warning" //參數3: 其餘配置屬性 //參數4: 發佈的消息數據 ch.basicPublish("direct_logs", "error", null, message.getBytes());
2.接收消息
咱們將爲感興趣的每一個日誌級別建立一個新的綁定
ch.queueBind(queueName, "logs", "info"); ch.queueBind(queueName, "logs", "warning");
1.生產者
public class Producer { public static void main(String[] args) throws Exception { //創建鏈接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定義fanout類型交換機:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //向交換機發送信息 while (true){ System.out.println("輸入消息:"); String msg = new Scanner(System.in).nextLine(); System.out.println("輸入路由鍵:"); String key = new Scanner(System.in).nextLine(); c.basicPublish("direct_logs", key, //路由鍵關鍵詞 null, msg.getBytes()); } } }
2.消費者
public class Consumer { public static void main(String[] args) throws Exception { //創建鏈接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //1.定義隨機隊列 2.定義交換機 3.綁定 //隨機命名,非持久,獨佔,自動刪除 String queue = UUID.randomUUID().toString(); c.queueDeclare(queue, false, true, true, null); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //用輸入綁定鍵進行綁定 System.out.println("輸入綁定鍵,用空格隔開:"); String s = new Scanner(System.in).nextLine(); String[] a = s.split(" "); //["aaa","bbb","ccc"] for (String key:a){ c.queueBind(queue, "direct_logs", key); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody()); String key = message.getEnvelope().getRoutingKey(); System.out.println("收到:"+msg+" - "+key); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //正常的消費數據 c.basicConsume(queue, true, deliverCallback, cancelCallback); } }