Spring Boot 中使用 RabbitMQ

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。java

AMQP,即Advanced message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。git

AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。github

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。spring

經常使用概念

一般咱們談到隊列服務, 會有三個概念: 發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多作了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發消息者和隊列就沒有直接聯繫, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。segmentfault

準備

環境安裝

任選其一安全

CentOs7.3 搭建 RabbitMQ 3.6 單機服務與使用bash

CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集羣服務與使用服務器

Github 代碼

代碼我已放到 Github ,導入spring-boot-rabbitmq 項目app

github github.com/souyunku/sp…分佈式

項目結構

添加依賴

在項目中添加 spring-boot-starter-amqp 依賴

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
複製代碼

參數配置

spring.application.name=ymq-rabbitmq-spring-boot

spring.rabbitmq.host=10.4.98.15
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
複製代碼

交換機(Exchange)

1.Direct Exchange 根據route key 直接找到隊列
2.Topic Exchange 根據route key 匹配隊列
3.Topic Exchange 不處理route key 全網發送,全部綁定的隊列都發送

Direct Exchange

Direct Exchange 圖解

Direct ExchangeRabbitMQ默認的交換機模式,也是最簡單的模式,根據key全文匹配去尋找隊列。

任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue

1.通常狀況能夠使用rabbitMQ自帶的Exchange:""(該Exchange的名字爲空字符串,下文稱其爲default Exchange)。
2.這種模式下不須要將Exchange進行任何綁定(binding)操做
3.消息傳遞時須要一個RouteKey,能夠簡單的理解爲要發送到的隊列名字。
4.若是vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。

配置隊列

@Configuration
public class RabbitDirectConfig {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }

    @Bean
    public Queue directQueue() {
        return new Queue("direct");
    }

    //-------------------配置默認的交換機模式,能夠不須要配置如下-----------------------------------
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    //綁定一個key "direct",當消息匹配到就會放到這個隊列中
    @Bean
    Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("direct");
    }
    // 推薦使用 helloQueue() 方法寫法,這種方式在 Direct Exchange 模式 畫蛇添足,不必這樣寫
    //---------------------------------------------------------------------------------------------
}

複製代碼

監聽隊列

@Component
@RabbitListener(queues = "hello")
public class helloReceiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 helloReceiver," + message);
    }
}
複製代碼
@Component
@RabbitListener(queues = "direct")
public class DirectReceiver {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 DirectReceiver," + message);
    }
}

複製代碼

發送消息

package io.ymq.rabbitmq.test;

import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/** * 描述: 默認的交換機模式 * * @author: yanpenglei * @create: 2017/10/25 1:03 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitDirectTest {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Test
    public void sendHelloTest() {

        String context = "此消息在,默認的交換機模式隊列下,有 helloReceiver 能夠收到";

        String routeKey = "hello";

        context = "routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendHelloTest : " + context);

        this.rabbitTemplate.convertAndSend(routeKey, context);
    }

    @Test
    public void sendDirectTest() {

        String context = "此消息在,默認的交換機模式隊列下,有 DirectReceiver 能夠收到";

        String routeKey = "direct";

        String exchange = "directExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendDirectTest : " + context);

        // 推薦使用 sendHello() 方法寫法,這種方式在 Direct Exchange 畫蛇添足,不必這樣寫
        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }
}
複製代碼

按順序執行:響應

接收者 helloReceiver,routeKey:hello,context:此消息在,默認的交換機模式隊列下,有 helloReceiver 能夠收到

接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默認的交換機模式隊列下,有 DirectReceiver 能夠收到
複製代碼

Fanout Exchange

Fanout Exchange 圖解

任何發送到Fanout Exchange 的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上

1.能夠理解爲路由表的模式
2.這種模式不須要 RouteKey
3.這種模式須要提早將ExchangeQueue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定。
4.若是接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。

配置隊列

@Configuration
public class RabbitFanoutConfig {

    final static String PENGLEI = "fanout.penglei.net";

    final static String SOUYUNKU = "fanout.souyunku.com";
    @Bean
    public Queue queuePenglei() {
        return new Queue(RabbitFanoutConfig.PENGLEI);
    }

    @Bean
    public Queue queueSouyunku() {
        return new Queue(RabbitFanoutConfig.SOUYUNKU);
    }

    /** * 任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部隊列上。 */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queuePenglei).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);
    }

}
複製代碼

監聽隊列

@Component
@RabbitListener(queues = "fanout.penglei.net")
public class FanoutReceiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 FanoutReceiver1," + message);
    }
}
複製代碼
@Component
@RabbitListener(queues = "fanout.souyunku.com")
public class FanoutReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 FanoutReceiver2," + message);
    }
}
複製代碼

發送消息

package io.ymq.rabbitmq.test;

import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/** * 描述: 廣播模式或者訂閱模式隊列 * * @author: yanpenglei * @create: 2017/10/25 1:08 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitFanoutTest {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Test
    public void sendPengleiTest() {

        String context = "此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 能夠收到";

        String routeKey = "topic.penglei.net";

        String exchange = "fanoutExchange";

        System.out.println("sendPengleiTest : " + context);

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }

    @Test
    public void sendSouyunkuTest() {

        String context = "此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 能夠收到";

        String routeKey = "topic.souyunku.com";

        String exchange = "fanoutExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendSouyunkuTest : " + context);

        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }
}

複製代碼

按順序執行:響應

接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 能夠收到
接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 能夠收到


接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 能夠收到
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,廣播模式或者訂閱模式隊列下,有 FanoutReceiver1 FanoutReceiver2 能夠收到

複製代碼

Topic Exchange

Topic Exchange 圖解

任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue

1.這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個標題``(RouteKey)Exchange會將消息轉發到全部關注主題能與RouteKey模糊匹配的隊列。
2.這種模式須要RouteKey,也許要提早綁定ExchangeQueue
3.在進行綁定時,要提供一個該隊列關心的主題,如#.log.#表示該隊列關心全部涉及log的消息(一個RouteKey爲MQ.log.error的消息會被轉發到該隊列)。
4.#表示0個或若干個關鍵字,*表示一個關鍵字。如topic.*能與topic.warn匹配,沒法與topic.warn.timeout匹配;可是topic.#能與上述二者匹配。
5.一樣,若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息。

配置隊列

@Configuration
public class RabbitTopicConfig {

    final static String MESSAGE = "topic.message";

    final static String MESSAGES = "topic.message.s";

    final static String YMQ = "topic.ymq";

    @Bean
    public Queue queueMessage() {
        return new Queue(RabbitTopicConfig.MESSAGE);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(RabbitTopicConfig.MESSAGES);
    }

    @Bean
    public Queue queueYmq() {
        return new Queue(RabbitTopicConfig.YMQ);
    }

    /** * 交換機(Exchange) 描述:接收消息而且轉發到綁定的隊列,交換機不存儲消息 */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    //綁定隊列 queueMessages() 到 topicExchange 交換機,路由鍵只接受徹底匹配 topic.message 的隊列接受者能夠收到消息
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");
    }

    //綁定隊列 queueMessages() 到 topicExchange 交換機,路由鍵只要是以 topic.message 開頭的隊列接受者能夠收到消息
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");
    }

    //綁定隊列 queueYmq() 到 topicExchange 交換機,路由鍵只要是以 topic 開頭的隊列接受者能夠收到消息
    @Bean
    Binding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");
    }

}

複製代碼

監聽隊列

@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 TopicReceiver1," + message);
    }

}
複製代碼
@Component
@RabbitListener(queues = "topic.message.s")
public class TopicReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 TopicReceiver2," + message);
    }

}
複製代碼
@Component
@RabbitListener(queues = "topic.ymq")
public class TopicReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("接收者 TopicReceiver3," + message);
    }

}

複製代碼

發送消息

package io.ymq.rabbitmq.test;

import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/** * 描述: 配置轉發消息模式隊列 * * @author: yanpenglei * @create: 2017/10/25 1:20 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitTopicTest {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Test
    public void sendMessageTest() {

        String context = "此消息在,配置轉發消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能夠收到";

        String routeKey = "topic.message";

        String exchange = "topicExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendMessageTest : " + context);

        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }

    @Test
    public void sendMessagesTest() {


        String context = "此消息在,配置轉發消息模式隊列下,有 TopicReceiver2 TopicReceiver3 能夠收到";

        String routeKey = "topic.message.s";

        String exchange = "topicExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendMessagesTest : " + context);

        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }

    @Test
    public void sendYmqTest() {

        String context = "此消息在,配置轉發消息模式隊列下,有 TopicReceiver3 能夠收到";

        String routeKey = "topic.ymq";

        String exchange = "topicExchange";

        context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;

        System.out.println("sendYmqTest : " + context);

        this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
    }
}

複製代碼

按順序執行:響應

接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置轉發消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能夠收到
接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置轉發消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能夠收到
接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置轉發消息模式隊列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能夠收到


接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置轉發消息模式隊列下,有  TopicReceiver2 TopicReceiver3 能夠收到
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置轉發消息模式隊列下,有  TopicReceiver2 TopicReceiver3 能夠收到


接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置轉發消息模式隊列下,有 TopicReceiver3 能夠收到

複製代碼

代碼我已放到 Github ,導入spring-boot-rabbitmq 項目

github github.com/souyunku/sp…

Contact

  • 做者:鵬磊
  • 出處:www.ymq.io
  • Email:admin@souyunku.com
  • 版權歸做者全部,轉載請註明出處
  • Wechat:關注公衆號,搜雲庫,專一於開發技術的研究與知識分享

關注公衆號-搜雲庫
搜雲庫
相關文章
相關標籤/搜索