name: 交換機名稱
type: 交換機類型 direct,topic,fanout,headers
durability: 是否須要持久化,true 爲持久化
auto delete: 當最後一個綁定到 exchange 上的隊列被刪除後,exchange 沒有綁定的隊列了,自動刪除該 exchange
internal: 當前 exchange 是否用於 rabbitMQ 內部使用,默認爲 false
arguments: 擴展參數,用於擴展 AMQP 協議自制定化使用
複製代碼
direct exchange: 全部發送到 direct exchange 的消息被轉發到 routing key 中指定的queue
複製代碼
注意:direct模式可使用 rabbitMQ 自帶的 exchange:default exchange,因此不須要將 exchange 進行任何綁定(binding)操做,消息傳遞時,routingkey 必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。 流轉示意圖以下 git
代碼地址: https://github.com/hmilyos/rabbitmq-api-demo
複製代碼
消費端代碼:github
public class ConsumerDirectExchange {
private static final Logger log = LoggerFactory.getLogger(ConsumerDirectExchange.class);
// 聲明
public final static String EXCHANGE_NAME = "test_direct_exchange";
public final static String EXCHANGE_TYPE = "direct";
public final static String QUEUE_NAME = "test_direct_queue";
public final static String ROUTING_KEY = "test.direct";
public final static String ROUTING_KEY_ERROR = "test.direct.error";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
ConsumerCancelledException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 表示聲明瞭一個交換機
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
// 表示聲明瞭一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 創建一個綁定關係:
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
// 獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("收到消息:{}", msg);
}
}
}
複製代碼
啓動消費端api
上管控臺查看交換機和隊列是否成功建立bash
點擊進去查看綁定狀況服務器
生產端代碼app
public class ProducerDirectExchange {
private final static Logger log = LoggerFactory.getLogger(ProducerDirectExchange.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello World RabbitMQ Direct Exchange test.direct Message ... ";
log.info("生產端發送了:{}", msg);
channel.basicPublish(ConsumerDirectExchange.EXCHANGE_NAME, ConsumerDirectExchange.ROUTING_KEY, null, msg.getBytes());
// channel.basicPublish(ConsumerDirectExchange.EXCHANGE_NAME, ConsumerDirectExchange.ROUTING_KEY_ERROR, null, msg.getBytes());
channel.close();
connection.close();
}
}
複製代碼
而後把生產端run一下ui
再查看消費端的日誌spa
該消費端只接收 routingkey 爲 test.direct 的消息,證實 direct exchange 類型的,routingkey 必須徹底匹配纔會被隊列接收,不然該消息會被拋棄。3d
topic exchange: 全部發送到 topic exchange 的消息被轉發到全部關心 routingkey 中 topic 的 queue 上 exchange 將 routingkey 和某 topic 進行模糊匹配,此時隊列須要綁定一個 topic。 注意: topic 可使用通配符進行模糊匹配 # 匹配一個或多個詞,注意是詞 * 只能匹配一個詞 例如 「log.#」 能匹配到 「log.info.oa」 「log.*」 只能匹配到 「log.erro」 這種格式 具體示例圖以下圖,usa.news 能被 usa.#,#.news 所消費,usa.weather 能被 usa.#,#.weather 所消費...日誌
代碼示例: 消費端:
public class ConsumerTopicExchange {
private final static Logger log = LoggerFactory.getLogger(ConsumerTopicExchange.class);
// 聲明
public static final String EXCHANGE_NAME = "test_topic_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String QUEUE_NAME = "test_topic_queue";
public static final String ROUTING_KEY_one = "user.#";
public static final String ROUTING_KEY = "user.*";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,
ConsumerCancelledException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 1 聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
// 2 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 3 創建交換機和隊列的綁定關係:
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// durable 是否持久化消息
QueueingConsumer consumer = new QueueingConsumer(channel);
// 參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);
// 循環獲取消息
while (true) {
// 獲取消息,若是沒有消息,這一步將會一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消費端收到消息:{}", msg);
}
}
}
複製代碼
啓動消費端,上管控臺查看建立、綁定是否成功
確認成功後,編寫生產端代碼
public class ProducerTopicExchange {
private final static Logger log = LoggerFactory.getLogger(ProducerTopicExchange.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 聲明
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
String msg1 = "Hello World RabbitMQ Topic Exchange Message ..." + routingKey1;
String msg2 = "Hello World RabbitMQ Topic Exchange Message ..." + routingKey2;
String msg3 = "Hello World RabbitMQ Topic Exchange Message ..." + routingKey3;
log.info("生產端, {} :{}", routingKey1, msg1);
channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey1, null, msg1.getBytes());
log.info("生產端, {} :{}", routingKey2, msg2);
channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey2, null, msg2.getBytes());
log.info("生產端, {} :{}", routingKey3, msg3);
channel.basicPublish(ConsumerTopicExchange.EXCHANGE_NAME, routingKey3, null, msg3.getBytes());
channel.close();
connection.close();
}
}
複製代碼
啓動生產端
消費端接收到的
routingKey3 ="user.delete.abc" 的未被接收,符合 user.* 的規則 這時候在消費端把 routingKey 修改一下, routingKey ="user.#",重啓消費端,上管控臺
發現以前 * 的並無解綁,須要咱們手動解綁一下,而後再啓動生產端的代碼
發現三條都能接收到了,符合 # 的規則。
fanout exchange: 不處理路由鍵,只須要簡單的將隊列綁定到交換機上,發送到該交換機的消息都會被轉發到於該交換機綁定的全部隊列上,fanout 交換機因爲不須要進行routingkey 的對比 直接發送因此綁定的 queue,因此轉發消息是最快的 示意圖以下圖所示
代碼實現:
public class ConsumerFanoutExchange {
private static final Logger log = LoggerFactory.getLogger(ConsumerFanoutExchange.class);
public static final String EXCHANGE_NAME = "test_fanout_exchange";
public static final String EXCHANGE_TYPE = "fanout";
public static final String QUEUE_NAME = "test_fanout_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(3000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, false, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 不設置路由鍵
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
//參數:隊列名稱、是否自動ACK、Consumer
channel.basicConsume(QUEUE_NAME, true, consumer);
log.info("消費端啓動。。。");
//循環獲取消息
while (true) {
//獲取消息,若是沒有消息,這一步將會一直阻塞
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消費端收到消息:{}", msg);
}
}
}
複製代碼
生產端代碼:
public class ProducerFanoutExchange {
private static final Logger log = LoggerFactory.getLogger(ProducerFanoutExchange.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 5 發送
for (int i = 0; i < 10; i++) {
String msg = "Hello World RabbitMQ FANOUT Exchange Message ...";
log.info("生產端,routingKey{}: {}", i, msg);
channel.basicPublish(ConsumerFanoutExchange.EXCHANGE_NAME, "" + i, null, (msg + i).getBytes());
}
channel.close();
connection.close();
}
}
複製代碼
先啓動消費端,再啓動生產端
查看消費端的日誌
routingkey0-9 的都能被就收,也就至關於該交換機上全部的隊列都能接收來到該交換機的消息。 headers 類型的不經常使用,就不介紹了
binding: 綁定 exchange 和 exchange/queue 之間的鏈接關心。binding 中能夠包含 routingkey 或者參數
queue: 消息隊列,實際存儲消息數據,durability 表示是否持久化,durable 表示是,transient 表示否。auto delete: 如選擇 yes,表示當最後一個監聽被移除後,該 queue 會被自動刪除。
message: 服務器和應用程序之間傳送的數據 本質上就是一段數據,由 properties 和 payload(body) 組成 經常使用屬性: delivery mode,headersheaders(自定義屬性),content_type,content_encoding,priority,correlation_id,reply_to,expiration,message_id,timestamp,type,user_id,app_id,cluster_id 代碼實現: 消費端:
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) throws IOException, TimeoutException,
ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 經過鏈接工廠建立鏈接
Connection connection = connectionFactory.newConnection();
//3 經過connection建立一個Channel
Channel channel = connection.createChannel();
//4 聲明(建立)一個隊列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//5 建立消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 設置Channel
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
//7 獲取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
log.info("消費端: " + msg);
Map<String, Object> headers = delivery.getProperties().getHeaders();
log.info("headers get myHeaders1 value: " + headers.get("myHeaders1"));
log.info("headers get myHeaders2value: " + headers.get("myHeaders2"));
//Envelope envelope = delivery.getEnvelope();
}
}
}
複製代碼
生產端:
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
// 2 經過鏈接工廠建立鏈接
Connection connection = connectionFactory.newConnection();
// 3 經過connection建立一個Channel
Channel channel = connection.createChannel();
Map<String, Object> headers = new HashMap<>();
headers.put("myHeaders1", "111");
headers.put("myHeaders2", "222");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8")
.expiration("10000").headers(headers).build();
// 4 經過Channel發送數據
for (int i = 0; i < 5; i++) {
String msg = "Hello RabbitMQ!";
// 1 exchange 2 routingKey
log.info("生產端,test001: {}", msg);
channel.basicPublish("", "test001", properties, msg.getBytes());
}
// 5 記得要關閉相關的鏈接
channel.close();
connection.close();
}
}
複製代碼
先啓動消費端,上管控臺確認交換機和隊列是否建立和綁定成功,再啓動生產端,消費端接收到以下的信息
virtual host 虛擬主機 虛擬地址,用於進行邏輯隔離,最上層的消息路由,一個 virtual host 裏面能夠有若干個 exchange 和 queue,可是裏面不能有相同名稱的 exchange 或 queue