交換機:Exchange 用於轉發消息,可是它不會作存儲 ,若是沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。 這裏有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麼究竟轉發到哪一個隊列,就要根據該路由鍵。spring
交換機的功能主要是接收消息而且轉發到綁定的隊列,交換機不存儲消息,在啓用ack模式後,交換機找不到隊列會返回錯誤。bash
交換機有四種類型:Direct, topic, Headers and Fanoutapp
* Direct:direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.
* Topic:按規則轉發消息(最靈活)
* Headers:設置 header attribute 參數類型的交換機
* Fanout:轉發消息到全部綁定隊列(廣播模式)
複製代碼
下面介紹經常使用的三種模式的基礎用法。函數
Pom 依賴spring-boot
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
複製代碼
application.properties 配置文件測試
# rabbitmq鏈接參數
spring.rabbitmq.host= # mq ip地址
spring.rabbitmq.port=5672 # 端口 默認5672
spring.rabbitmq.username=admin # 用戶名
spring.rabbitmq.password=admin # 密碼
# 開啓發送確認(開啓此模式,生產者成功發送到交換機後執行相應的回調函數)
#spring.rabbitmq.publisher-confirms=true
# 開啓發送失敗退(開啓此模式,交換機路由不到隊列時執行相應的回調函數)
#spring.rabbitmq.publisher-returns=true
# 開啓消費者手動確認 ACK 默認auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
複製代碼
direct類型的Exchange路由規則很簡單,它會把消息路由到那些binding key與routing key徹底匹配的Queue中ui
/**
* Rabbit 配置類
* @author peng
*/
@Configuration
public class DirectRabbitConfig {
@Bean
DirectExchange directExchange(){
// 註冊一個 Direct 類型的交換機 默認持久化、非自動刪除
return new DirectExchange("directExchange");
}
@Bean
Queue infoQueue(){
// 註冊隊列
return new Queue("infoMsgQueue");
}
@Bean
Queue warnQueue(){
return new Queue("warnMsgQueue");
}
@Bean
Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
// 將隊列以 info-msg 爲綁定鍵綁定到交換機
return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
}
@Bean
Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) {
return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg");
}
}
複製代碼
/**
* 生產者
* @author peng
*/
@Component
public class DirectSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendInfo() {
String content = "I am Info msg!";
// 將消息以info-msg綁定鍵發送到directExchange交換機
this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content);
System.out.println("########### SendInfoMsg : " + content);
}
public void sendWarn() {
String content = "I am Warn msg!";
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
public void sendWarn(int i) {
String content = "I am Warn msg! " + i;
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
public void sendError() {
String content = "I am Error msg!";
this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content);
System.out.println("########### SendErrorMsg : " + content);
}
}
複製代碼
消費者1
/**
* @author peng
*/
@Component
// 標記此類爲Rabbit消息監聽類,監聽隊列infoMsgQueue
@RabbitListener(queues = "infoMsgQueue")
public class DirectReceiver1 {
// 定義處理消息的方法
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message);
}
}
消費者2
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver2 Receive warnMsg:" + message);
}
}
複製代碼
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DirectTest {
@Autowired
private DirectSender directSender;
@Test
public void send() {
directSender.sendInfo();
directSender.sendWarn();
directSender.sendError();
}
}
結果
########### SendInfoMsg : I am Info msg!
########### SendWarnMsg : I am Warn msg!
########### DirectReceiver2 Receive warnMsg:I am Warn msg!
########### DirectReceiver1 Receive InfoMsg:I am Info msg!
InfoMsg 以info-msg綁定鍵發送到directExchange交換機,交換機路由到infoMsgQueue隊列,DirectReceiver1監聽此隊列接受消息。
WarnMsg 同理
ErrorMsg 因爲沒有隊列的綁定鍵爲 error-msg 因此消息會被丟棄
複製代碼
消費者3
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver3 Receive warnMsg:" + message);
}
}
// 一對多
@Test
public void oneToMany() {
for (int i = 0; i< 100 ; i++){
directSender.sendWarn(i);
}
}
結果
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7
消費者2 和 消費者3 均勻(條數上)的消費了消息
複製代碼
/**
* 生產者3
* @author peng
*/
@Component
public class DirectSender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendWarn(int i) {
String content = "I am Warn msg! " + i +" fromSend2";
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
}
// 多對多
@Test
public void manyToMany() {
for (int i = 0; i< 10 ; i++){
directSender.sendWarn(i);
directSender2.sendWarn(i);
}
}
結果
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1
消費者2和消費者3分別接受了生產者1 和生產者2的消息
複製代碼
fanout類型的Exchange路由規則很是簡單,會發送給全部綁定到該交換機的隊列上。會忽略路由鍵this
/**
* @author peng
*/
@Configuration
public class FanoutRabbitConfig {
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
Queue queue1(){
return new Queue("fanout.1");
}
@Bean
Queue queue2(){
return new Queue("fanout.2");
}
@Bean
Queue queue3(){
return new Queue("fanout.3");
}
@Bean
Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
@Bean
Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue3).to(fanoutExchange);
}
}
複製代碼
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
System.out.println("######## Sender : " + context);
}
}
複製代碼
消費者1
/**
* @author peng
*/
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 1 : " + message);
}
}
消費者2
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 2 : " + message);
}
}
消費者3
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 3 : " + message);
}
}
複製代碼
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void send() {
fanoutSender.send();
}
}
結果
######## Sender : hi, fanout msg
fanout Receiver 1 : hi, fanout msg
fanout Receiver 2 : hi, fanout msg
fanout Receiver 3 : hi, fanout msg
複製代碼
Topic 類型匹配最爲普遍,Routing Key必須與Binding Key相匹配(可經過通配符模糊匹配)的時候纔將消息傳送給Queuespa
*匹配一個單詞, #匹配多個單詞,單詞之間以.分隔。如 *.male.#可匹配dog.male.four、rabbit.male.four.white等3d
@Configuration
public class TopicRabbitConfig {
@Bean
TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
Queue topicQueue1(){
return new Queue("topicQueue1");
}
@Bean
Queue topicQueue2(){
return new Queue("topicQueue2");
}
@Bean
Queue topicQueue3(){
return new Queue("topicQueue3");
}
@Bean
Binding topicQueue1Binding(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue1).to(exchange).with("*.male.four");
}
@Bean
Binding topicQueue2Binding(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2).to(exchange).with("#.four");
}
@Bean
Binding topicQueue3Binding(Queue topicQueue3, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue3).to(exchange).with("hen.female.two");
}
}
複製代碼
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "##### i am message 1";
System.out.println("Sender 1 : " + context);
// 路由鍵 rabbit.male.four 會被綁定鍵 *.male.four 和 #.four匹配
this.rabbitTemplate.convertAndSend("topicExchange", "rabbit.male.four", context);
}
public void send2() {
String context = "##### i am message 2";
System.out.println("Sender 2: " + context);
// 路由鍵 dog.male.four 會被綁定鍵 #.four 匹配
this.rabbitTemplate.convertAndSend("topicExchange", "dog.female.four", context);
}
public void send3() {
String context = "##### i am messages 3";
System.out.println("Sender 3: " + context);
路由鍵 hen.female.two 會被綁定鍵 hen.female.two 匹配
this.rabbitTemplate.convertAndSend("topicExchange", "hen.female.two", context);
}
}
複製代碼
消費者1
@Component
@RabbitListener(queues = "topicQueue1")
public class TopicReceiver1 {
@RabbitHandler
public void process(String msg) {
System.out.println("Topic Receiver1 : " + msg);
}
}
消費者2
@Component
@RabbitListener(queues = "topicQueue2")
public class TopicReceiver2 {
@RabbitHandler
public void process(String msg) {
System.out.println("Topic Receiver2 : " + msg);
}
}
消費者3
@Component
@RabbitListener(queues = "topicQueue3")
public class TopicReceiver3 {
@RabbitHandler
public void process(String msg) {
System.out.println("Topic Receiver3 : " + msg);
}
}
複製代碼
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TopicTest {
@Autowired
private TopicSender topicSender;
@Test
public void send() {
topicSender.send1();
topicSender.send2();
topicSender.send3();
}
}
結果
Sender 1: ##### i am message 1
Sender 2: ##### i am message 2
Sender 3: ##### i am messages 3
Topic Receiver1 : ##### i am message 1
Topic Receiver2 : ##### i am message 1
Topic Receiver3 : ##### i am messages 3
Topic Receiver2 : ##### i am message 2
消息1 被消費者1和2消費 路由鍵 rabbit.male.four 會被綁定鍵 *.male.four 和 #.four匹配
消息2 被消費者2消費 路由鍵 dog.male.four 會被綁定鍵 #.four 匹配
消息3 被消費者3消費 至關於 direct徹底匹配
複製代碼