上手了RabbitMQ?再來看看它的交換機(Exchange)吧

人生終將是場單人旅途,孤獨以前是迷茫,孤獨事後是成長。

楔子

本篇是消息隊列RabbitMQ的第三彈。java

RabbitMQ的入門RabbitMQ+SpringBoot的整合能夠點此連接進去回顧,今天要講的是RabbitMQ的交換機。git

本篇是理解RabbitMQ很重要的一篇,交換機是消息的第一站,只有理解了交換機的分發模式,咱們才能知道不一樣交換機根據什麼規則分發消息,才能明白在面對不一樣業務需求的時候應採用哪一種交換機。程序員


祝有好收穫,先贊後看,快樂無限。github

本文代碼: 碼雲地址GitHub地址算法

1. 🔍Exchange

rabbit架構圖

先來放上幾乎每篇都要出現一遍的我畫了很久的RabbitMQ架構圖。spring

前兩篇文中咱們一直沒有顯式的去使用Exchange,都是使用的默認Exchange,其實Exchange是一個很是關鍵的組件,有了它纔有了各類消息分發模式。編程

我先簡單說說Exchange有哪幾種類型:segmentfault

  1. fanoutFanout-Exchange會將它接收到的消息發往全部與他綁定的Queue中。
  2. directDirect-Exchange會把它接收到的消息發往與它有綁定關係且Routingkey徹底匹配的Queue中(默認)。
  3. topicTopic-Exchange與Direct-Exchange類似,不過Topic-Exchange不須要全匹配,能夠部分匹配,它約定:Routingkey爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞)。
  4. headerHeader-Exchange不依賴於RoutingKey或綁定關係來分發消息,而是根據發送的消息內容中的headers屬性進行匹配。此模式已經再也不使用,本文中也不會去講,你們知道便可。

本文中咱們主要講前三種Exchange方式,相信憑藉着我簡練的文字和靈魂的畫技給你們好好講講,爭取老嫗能解。架構

Tip:本文的代碼演示直接使用SpringBoot+RabbitMQ的模式。spring-boot

2. 📕Fanout-Exchange

先來看看Fanout-ExchangeFanout-Exchange又稱扇形交換機,這個交換機應該是最容易理解的。

扇形交換機

ExchangeQueue創建一個綁定關係,Exchange會分發給全部和它有綁定關係的Queue中,綁定了十個Queue就把消息複製十份進行分發。

這種綁定關係爲了效率確定都會維護一張表,從算法效率上來講通常是O(1),因此Fanout-Exchange是這幾個交換機中查找須要被分發隊列最快的交換機。


下面是一段代碼演示:

@Bean
    public Queue fanout1() {
        return new Queue("fanout1");
    }

    @Bean
    public Queue fanout2() {
        return new Queue("fanout2");
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        // 三個構造參數:name durable autoDelete
        return new FanoutExchange("fanoutExchange", false, false);
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(fanout1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(fanout2()).to(fanoutExchange());
    }

爲了清晰明瞭,我新建了兩個演示用的隊列,而後建了一個FanoutExchange,最後給他們都設置上綁定關係,這樣一組隊列和交換機的綁定設置就算完成了。

緊接着編寫一下生產者和消費者:

public void sendFanout() {
        Client client = new Client();

        // 應讀者要求,之後代碼打印的地方都會改爲log方式,這是一種良好的編程習慣,用System.out.println通常是不推薦的。
        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("fanoutExchange",null,client);
        System.out.println("消息發送完畢。");
    }

    @Test
    public void sendFanoutMessage() {
        rabbitProduce.sendFanout();
    }
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {
    @RabbitListener(queues = "fanout1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已確認");
    }

    @RabbitListener(queues = "fanout2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已確認");
    }

}

這兩段代碼都很好理解,再也不贅述,有遺忘的能夠去看RabbitMQ第一彈的內容。

其中發送消息的代碼有三個參數,第一個參數是Exchange的名稱,第二個參數是routingKey的名稱,這個參數在扇形交換機裏面用不到,在其餘兩個交換機類型裏面會用到。

代碼的準備到此結束,咱們能夠運行發送方法以後run一下了~

項目啓動後,咱們能夠先來觀察一下隊列與交換機的綁定關係有沒有生效,咱們在RabbitMQ控制檯使用rabbitmqctl list_bindings命令查看綁定關係。

扇形交換機綁定關係

關鍵部分我用紅框標記了起來,這就表明着名叫fanoutExchange的交換機綁定着兩個隊列,一個叫fanout1,另外一個叫fanout2

緊接着,咱們來看控制檯的打印狀況:

扇形交換機確認消息

能夠看到,一條信息發送出去以後,兩個隊列都接收到了這條消息,緊接着由咱們的兩個消費者消費。

Tip: 若是你的演示應用啓動以後沒有消費信息,能夠嘗試從新運行一次生產者的方法發送消息。

3. 📗Direct-Exchange

Direct-Exchange是一種精準匹配的交換機,咱們以前一直使用默認的交換機,其實默認的交換機就是Direct類型。

若是將Direct交換機都比做一所公寓的管理員,那麼隊列就是裏面的住戶。(綁定關係)

管理員天天都會收到各類各樣的信件(消息),這些信件的地址不光要標明地址(ExchangeKey)還須要標明要送往哪一戶(routingKey),否則消息沒法投遞。

扇形交換機

以上圖爲例,準備一條消息發往名爲SendService的直接交換機中去,這個交換機主要是用來作發送服務,因此其綁定了兩個隊列,SMS隊列和MAIL隊列,用於發送短信和郵件。

咱們的消息除了指定ExchangeKey還須要指定routingKeyroutingKey對應着最終要發送的是哪一個隊列,咱們的示例中的routingKey是sms,這裏這條消息就會交給SMS隊列。


聽了上面這段,可能你們對routingKey還不是很理解,咱們上段代碼實踐一下,你們應該就明白了。

準備工做:

@Bean
    public Queue directQueue1() {
        return new Queue("directQueue1");
    }

    @Bean
    public Queue directQueue2() {
        return new Queue("directQueue2");
    }

    @Bean
    public DirectExchange directExchange() {
        // 三個構造參數:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }

    @Bean
    public Binding directBinding1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding directBinding2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
    }

新建兩個隊列,新建了一個直接交換機,並設置了綁定關係。

這裏的示例代碼和上面扇形交換機的代碼很像,惟一能夠說不一樣的就是綁定的時候多調用了一個withroutingKey設置了上去。

因此是交換機和隊列創建綁定關係的時候設置的routingKey,一個消息到達交換機以後,交換機經過消息上帶來的routingKey找到本身與隊列創建綁定關係時設置的routingKey,而後將消息分發到這個隊列去。

生產者:

public void sendDirect() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("directExchange","sms",client);
        System.out.println("消息發送完畢。");
    }

消費者:

@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {
    @RabbitListener(queues = "directQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已確認");
    }

    @RabbitListener(queues = "directQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已確認");
    }

}

效果圖以下:

扇形交換機

只有一個消費者進行了消息,符合咱們的預期。

4. 📙Topic-Exchange

Topic-Exchange是直接交換機的模糊匹配版本,Topic類型的交換器,支持使用"*"和"#"通配符定義模糊bindingKey,而後按照routingKey進行模糊匹配隊列進行分發。

  • *:可以模糊匹配一個單詞。
  • #:可以模糊匹配零個或多個單詞。

由於加入了兩個通配定義符,因此Topic交換機的routingKey也有些變化,routingKey可使用.將單詞分開。


這裏咱們直接來用一個例子說明會更加的清晰:

準備工做:

// 主題交換機示例
    @Bean
    public Queue topicQueue1() {
        return new Queue("topicQueue1");
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topicQueue2");
    }

    @Bean
    public TopicExchange topicExchange() {
        // 三個構造參數:name durable autoDelete
        return new TopicExchange("topicExchange", false, false);
    }

    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
    }

    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
    }

新建兩個隊列,新建了一個Topic交換機,並設置了綁定關係。

這裏的示例代碼咱們主要看設置routingKey,這裏的routingKey用上了通配符,且中間用.隔開,這就表明topicQueue1消費sms開頭的消息,topicQueue2消費mail開頭的消息,具體不一樣往下看。

生產者:

public void sendTopic() {
        Client client = new Client();

        log.info("Message content : " + client);

        rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
        System.out.println("消息發送完畢。");
    }

消費者:

@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {
    @RabbitListener(queues = "topicQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已確認");
    }

    @RabbitListener(queues = "topicQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已確認");
    }

}

這裏咱們的生產者發送的消息routingKeysms.liantong,它就會被髮到topicQueue1隊列中去,這裏消息的routingKey也須要用.隔離開,用其餘符號沒法正確識別。

若是咱們的routingKeysms.123.liantong,那麼它將沒法找到對應的隊列,由於topicQueue1的模糊匹配用的通配符是*而不是#,只有#是能夠匹配多個單詞的。

Topic-ExchangeDirect-Exchange很類似,我就再也不贅述了,通配符*#的區別也很簡單,你們能夠本身試一下。

後記

週一沒更文實在慚愧,去醫院抽血了,抽了三管~,吃多少才能補回來~

RabbitMQ已經更新了三篇了,這三篇的內容有些偏基礎,下一篇將會更新高級部份內容:包括防止消息丟失,防止消息重複消費等等內容,但願你們持續關注。


最近這段時間壓力挺大,優狐令我八月底以前升級到三級,因此各位讀者的贊對我很重要,但願你們可以高擡貴手,幫我一哈~

好了,以上就是本期的所有內容,感謝你能看到這裏,歡迎對本文點贊收藏與評論,👍大家的每一個點贊都是我創做的最大動力。

我是耳朵,一個一直想作知識輸出的僞文藝程序員,咱們下期見。

本文代碼:碼雲地址GitHub地址

相關文章
相關標籤/搜索