這篇博客介紹訂閱、路由和通配符模式,之因此放在一塊兒介紹,是由於這三種模式都是用了Exchange交換機,消息沒有直接發送到隊列,而是發送到了交換機,通過隊列綁定交換機到達隊列。java
性能排序:fanout > direct >> topic。比例大約爲11:10:6
服務器
一個生產者,多個消費者,每個消費者都有本身的一個隊列,生產者沒有將消息直接發送到隊列,而是發送到了交換機,每一個隊列綁定交換機,生產者發送的消息通過交換機,到達隊列,實現一個消息被多個消費者獲取的目的。須要注意的是,若是將消息發送到一個沒有隊列綁定的exchange上面,那麼該消息將會丟失,這是由於在rabbitMQ中exchange不具有存儲消息的能力,只有隊列具有存儲消息的能力。ide
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上。性能
1.能夠理解爲路由表的模式url
2.這種模式不須要RouteKeyspa
3.這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。.net
4.若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。3d
示例代碼:
orm
生產者:blog
[java] view plain copy
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//從鏈接中建立通道
Channel channel = connection.createChannel();
// 聲明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息內容
String message = "商品已經新增,id = 1000";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1:
[java] view plain copy
public class Recv {
private final static String QUEUE_NAME = "test_queue_fanout_1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前臺系統: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者2的代碼和消費者1的代碼大體相同,只是隊列的名稱不同,這樣兩個消費者有本身的隊列,均可以接收到生產者發送的消息
可是若是生產者有新增商品,修改商品,刪除商品的消息,消費者包快前臺系統和搜索系統,要求前臺系統接收修改和刪除商品的消息,搜索系統接收新增商品、修改商品和刪除商品的消息。因此使用這種訂閱模式實現商品數據的同步並不合理。所以咱們介紹下一種模式:路由模式。
這種模式添加了一個路由鍵,生產者發佈消息的時候添加路由鍵,消費者綁定隊列到交換機時添加鍵值,這樣就能夠接收到須要接收的消息。
任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue。
1.通常狀況可使用rabbitMQ自帶的Exchange:」"(該Exchange的名字爲空字符串,下文稱其爲default Exchange)。
2.這種模式下不須要將Exchange進行任何綁定(binding)操做
3.消息傳遞時須要一個「RouteKey」,能夠簡單的理解爲要發送到的隊列名字。
4.若是vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。
示例代碼:
生產者:
[java] view plain copy
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息內容
String message = "刪除商品, id = 1001";
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1:接收更新和刪除消息
[java] view plain copy
public class Recv {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前臺系統: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者2:接收insert,update,delete的消息
[java] view plain copy
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 搜索系統: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
若是生產者發佈了insert消息,那麼消費者2能夠收到,消費者 1收不到,若是發佈了update或者delete消息,兩個消費者均可以收到。若是發佈ABC消息兩個消費者都收不到,由於沒有綁定這個鍵值。這種模式基本知足了咱們的需求,可是還不夠靈活,下面介紹另一個模式。
基本思想和路由模式是同樣的,只不過路由鍵支持模糊匹配,符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞
任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue上
1.這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」(RouteKey),Exchange會將消息轉發到全部關注主題能與RouteKey模糊匹配的隊列。
2.這種模式須要RouteKey,也許要提早綁定Exchange與Queue。
3.在進行綁定時,要提供一個該隊列關心的主題,如「#.log.#」表示該隊列關心全部涉及log的消息(一個RouteKey爲」MQ.log.error」的消息會被轉發到該隊列)。
4.「#」表示0個或若干個關鍵字,「*」表示一個關鍵字。如「log.*」能與「log.warn」匹配,沒法與「log.warn.timeout」匹配;可是「log.#」能與上述二者匹配。
5.一樣,若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息。
生產者:
[java] view plain copy
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息內容
String message = "刪除商品,id = 1001";
channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1:
[java] view plain copy
public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前臺系統: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者2:
[java] view plain copy
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到鏈接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);
// 定義隊列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽隊列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 搜索系統: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者1是按需索取,並無使用通配符模式,而是用的徹底匹配,消費者2使用通配符模式,這樣以item.開頭的消息都會所有接收。
1.與簡單模式和work模式對比,前面兩種同一個消息只能被一個消費者獲取,而今天的這三種模式,能夠實現一個消息被多個消費者 獲取。
2.fanout這種模式沒有加入路由器,隊列與exchange綁定後,就會接收到全部的消息,其他兩種增長了路由鍵,而且第三種增長通配符,更加便利。
本文出自https://blog.csdn.net/ww130929/article/details/72842234