人生終將是場單人旅途,孤獨以前是迷茫,孤獨事後是成長。
本篇是消息隊列RabbitMQ
的第三彈。java
RabbitMQ的入門和RabbitMQ+SpringBoot的整合能夠點此連接進去回顧,今天要講的是RabbitMQ
的交換機。git
本篇是理解RabbitMQ
很重要的一篇,交換機是消息的第一站,只有理解了交換機的分發模式,咱們才能知道不一樣交換機根據什麼規則分發消息,才能明白在面對不一樣業務需求的時候應採用哪一種交換機。程序員
祝有好收穫,先贊後看,快樂無限。github
先來放上幾乎每篇都要出現一遍的我畫了很久的RabbitMQ
架構圖。spring
前兩篇文中咱們一直沒有顯式的去使用Exchange
,都是使用的默認Exchange
,其實Exchange
是一個很是關鍵的組件,有了它纔有了各類消息分發模式。編程
我先簡單說說Exchange
有哪幾種類型:segmentfault
Fanout-Exchange
會將它接收到的消息發往全部與他綁定的Queue中。Direct-Exchange
會把它接收到的消息發往與它有綁定關係且Routingkey
徹底匹配的Queue中(默認)。Topic-Exchange
與Direct-Exchange類似,不過Topic-Exchange不須要全匹配,能夠部分匹配,它約定:Routingkey
爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞)。Header-Exchange
不依賴於RoutingKey或綁定關係來分發消息,而是根據發送的消息內容中的headers屬性進行匹配。此模式已經再也不使用,本文中也不會去講,你們知道便可。本文中咱們主要講前三種Exchange
方式,相信憑藉着我簡練的文字和靈魂的畫技給你們好好講講,爭取老嫗能解。架構
Tip:本文的代碼演示直接使用SpringBoot+RabbitMQ的模式。spring-boot
先來看看Fanout-Exchange
,Fanout-Exchange
又稱扇形交換機,這個交換機應該是最容易理解的。
Exchange
和Queue
創建一個綁定關係,Exchange
會分發給全部和它有綁定關係的Queue
中,綁定了十個Queue
就把消息複製十份進行分發。
這種綁定關係爲了效率確定都會維護一張表,從算法效率上來講通常是O(1),因此Fanout-Exchange
是這幾個交換機中查找須要被分發隊列最快的交換機。
下面是一段代碼演示:
@Bean public Queue fanout1() { return new Queue("fanout1"); } @Bean public Queue fanout2() { return new Queue("fanout2"); } @Bean public FanoutExchange fanoutExchange() { // 三個構造參數:name durable autoDelete return new FanoutExchange("fanoutExchange", false, false); } @Bean public Binding binding1() { return BindingBuilder.bind(fanout1()).to(fanoutExchange()); } @Bean public Binding binding2() { return BindingBuilder.bind(fanout2()).to(fanoutExchange()); }
爲了清晰明瞭,我新建了兩個演示用的隊列,而後建了一個FanoutExchange
,最後給他們都設置上綁定關係,這樣一組隊列和交換機的綁定設置就算完成了。
緊接着編寫一下生產者和消費者:
public void sendFanout() { Client client = new Client(); // 應讀者要求,之後代碼打印的地方都會改爲log方式,這是一種良好的編程習慣,用System.out.println通常是不推薦的。 log.info("Message content : " + client); rabbitTemplate.convertAndSend("fanoutExchange",null,client); System.out.println("消息發送完畢。"); } @Test public void sendFanoutMessage() { rabbitProduce.sendFanout(); }
@Slf4j @Component("rabbitFanoutConsumer") public class RabbitFanoutConsumer { @RabbitListener(queues = "fanout1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已確認"); } @RabbitListener(queues = "fanout2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已確認"); } }
這兩段代碼都很好理解,再也不贅述,有遺忘的能夠去看RabbitMQ第一彈的內容。
其中發送消息的代碼有三個參數,第一個參數是Exchange
的名稱,第二個參數是routingKey
的名稱,這個參數在扇形交換機裏面用不到,在其餘兩個交換機類型裏面會用到。
代碼的準備到此結束,咱們能夠運行發送方法以後run一下了~
項目啓動後,咱們能夠先來觀察一下隊列與交換機的綁定關係有沒有生效,咱們在RabbitMQ控制檯使用rabbitmqctl list_bindings
命令查看綁定關係。
關鍵部分我用紅框標記了起來,這就表明着名叫fanoutExchange
的交換機綁定着兩個隊列,一個叫fanout1
,另外一個叫fanout2
。
緊接着,咱們來看控制檯的打印狀況:
能夠看到,一條信息發送出去以後,兩個隊列都接收到了這條消息,緊接着由咱們的兩個消費者消費。
Tip: 若是你的演示應用啓動以後沒有消費信息,能夠嘗試從新運行一次生產者的方法發送消息。
Direct-Exchange
是一種精準匹配的交換機,咱們以前一直使用默認的交換機,其實默認的交換機就是Direct類型。
若是將Direct交換機都比做一所公寓的管理員,那麼隊列就是裏面的住戶。(綁定關係)
管理員天天都會收到各類各樣的信件(消息),這些信件的地址不光要標明地址(ExchangeKey)還須要標明要送往哪一戶(routingKey),否則消息沒法投遞。
以上圖爲例,準備一條消息發往名爲SendService
的直接交換機中去,這個交換機主要是用來作發送服務,因此其綁定了兩個隊列,SMS隊列和MAIL隊列,用於發送短信和郵件。
咱們的消息除了指定ExchangeKey
還須要指定routingKey
,routingKey
對應着最終要發送的是哪一個隊列,咱們的示例中的routingKey
是sms,這裏這條消息就會交給SMS隊列。
聽了上面這段,可能你們對routingKey
還不是很理解,咱們上段代碼實踐一下,你們應該就明白了。
準備工做:
@Bean public Queue directQueue1() { return new Queue("directQueue1"); } @Bean public Queue directQueue2() { return new Queue("directQueue2"); } @Bean public DirectExchange directExchange() { // 三個構造參數:name durable autoDelete return new DirectExchange("directExchange", false, false); } @Bean public Binding directBinding1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms"); } @Bean public Binding directBinding2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail"); }
新建兩個隊列,新建了一個直接交換機,並設置了綁定關係。
這裏的示例代碼和上面扇形交換機的代碼很像,惟一能夠說不一樣的就是綁定的時候多調用了一個with
將routingKey
設置了上去。
因此是交換機和隊列創建綁定關係的時候設置的routingKey
,一個消息到達交換機以後,交換機經過消息上帶來的routingKey
找到本身與隊列創建綁定關係時設置的routingKey
,而後將消息分發到這個隊列去。
生產者:
public void sendDirect() { Client client = new Client(); log.info("Message content : " + client); rabbitTemplate.convertAndSend("directExchange","sms",client); System.out.println("消息發送完畢。"); }
消費者:
@Slf4j @Component("rabbitDirectConsumer") public class RabbitDirectConsumer { @RabbitListener(queues = "directQueue1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已確認"); } @RabbitListener(queues = "directQueue2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已確認"); } }
效果圖以下:
只有一個消費者進行了消息,符合咱們的預期。
Topic-Exchange
是直接交換機的模糊匹配版本,Topic類型的交換器,支持使用"*"和"#"通配符定義模糊bindingKey,而後按照routingKey
進行模糊匹配隊列進行分發。
*
:可以模糊匹配一個單詞。#
:可以模糊匹配零個或多個單詞。由於加入了兩個通配定義符,因此Topic交換機的routingKey
也有些變化,routingKey
可使用.
將單詞分開。
這裏咱們直接來用一個例子說明會更加的清晰:
準備工做:
// 主題交換機示例 @Bean public Queue topicQueue1() { return new Queue("topicQueue1"); } @Bean public Queue topicQueue2() { return new Queue("topicQueue2"); } @Bean public TopicExchange topicExchange() { // 三個構造參數:name durable autoDelete return new TopicExchange("topicExchange", false, false); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#"); }
新建兩個隊列,新建了一個Topic交換機,並設置了綁定關係。
這裏的示例代碼咱們主要看設置routingKey
,這裏的routingKey
用上了通配符,且中間用.
隔開,這就表明topicQueue1
消費sms
開頭的消息,topicQueue2
消費mail
開頭的消息,具體不一樣往下看。
生產者:
public void sendTopic() { Client client = new Client(); log.info("Message content : " + client); rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client); System.out.println("消息發送完畢。"); }
消費者:
@Slf4j @Component("rabbitTopicConsumer") public class RabbitTopicConsumer { @RabbitListener(queues = "topicQueue1") public void onMessage1(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已確認"); } @RabbitListener(queues = "topicQueue2") public void onMessage2(Message message, Channel channel) throws Exception { log.info("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("消息已確認"); } }
這裏咱們的生產者發送的消息routingKey
是sms.liantong
,它就會被髮到topicQueue1
隊列中去,這裏消息的routingKey
也須要用.
隔離開,用其餘符號沒法正確識別。
若是咱們的routingKey
是sms.123.liantong
,那麼它將沒法找到對應的隊列,由於topicQueue1
的模糊匹配用的通配符是*
而不是#
,只有#
是能夠匹配多個單詞的。
Topic-Exchange
和Direct-Exchange
很類似,我就再也不贅述了,通配符*
和#
的區別也很簡單,你們能夠本身試一下。
週一沒更文實在慚愧,去醫院抽血了,抽了三管~,吃多少才能補回來~
RabbitMQ已經更新了三篇了,這三篇的內容有些偏基礎,下一篇將會更新高級部份內容:包括防止消息丟失,防止消息重複消費等等內容,但願你們持續關注。
最近這段時間壓力挺大,優狐令我八月底以前升級到三級,因此各位讀者的贊對我很重要,但願你們可以高擡貴手,幫我一哈~
好了,以上就是本期的所有內容,感謝你能看到這裏,歡迎對本文點贊收藏與評論,👍大家的每一個點贊都是我創做的最大動力。
我是耳朵,一個一直想作知識輸出的僞文藝程序員,咱們下期見。