springboot整合rabbitmq

概述

  • RabbitMQ是一個開源的消息代理和隊列服務器,用來經過普通協議在徹底不一樣的應用之間共享數據,或者簡單地將做業隊列以便讓分佈式服務器進行處理。java

  • 它現實了AMQP協議,而且遵循Mozilla Public License開源協議,它支持多種語言,能夠方便的和spring集成。spring

  • 消息隊列使用消息將應用程序鏈接起來,這些消息經過像RabbitMQ這樣的消息代理服務器在應用程序之間路由。springboot

基本概念

  • Broker
    用來處理數據的消息隊列服務器實體服務器

  • vhost
    由RabbitMQ服務器建立的虛擬消息主機,擁有本身的權限機制,一個broker裏能夠開設多個vhost,用於不一樣用戶的權限隔離,vhost之間是也徹底隔離的。架構

  • productor
    產生用於消息通訊的數據分佈式

  • channel
    消息通道,在AMQP中能夠創建多個channel,每一個channel表明一個會話任務。spring-boot

  • exchange
    1. direct
      轉發消息到routing-key指定的隊列
      direct
    2. fanout
      轉發消息到全部綁定的隊列,相似於一種廣播發送的方式。
      fanout
    3. topic
      按照規則轉發消息,這種規則多爲模式匹配,也顯得更加靈活
      topic
  • queue
    1. 隊列是RabbitMQ的內部對象,存儲消息
    2. 以動態的增長消費者,隊列將接受到的消息以輪詢(round-robin)的方式均勻的分配給多個消費者。
  • binding
    表示交換機和隊列之間的關係,在進行綁定時,帶有一個額外的參數binding-key,來和routing-key相匹配。測試

  • consumer
    監聽消息隊列來進行消息數據的讀取網站

springboot下三種Exchange模式(fanout,direct,topic)實現

pom.xml中引用spring-boot-starter-amqpui

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

增長rabbitmq配置

spring:  
 rabbitmq:  
  host: localhost 
  port: 5672  
  username: guest  
  password: guest

direct

direct模式通常狀況下只須要定義queue 使用自帶交換機(defaultExchange)無需綁定交換機

@Configuration
public class RabbitP2PConfigure {
    
 public static final String QUEUE_NAME = "p2p-queue";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

}
@RunWith(SpringRunner.class)
@SpringBootTest(classes = BootCoreTestApplication.class)
@Slf4j
public class RabbitTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
    * 發送
    */
    @Test
    public void sendLazy() throws InterruptedException {
        City city = new City(234556666L, "direct_name", "direct_code");
        amqpTemplate.convertAndSend(RabbitLazyConfigure.QUEUE_NAME, city);
    }
    
    /**
    * 領取
    */
    @Test
    public void receive() throws InterruptedException {
        Object obj = amqpTemplate.receiveAndConvert(RabbitLazyConfigure.QUEUE_NAME);
        Assert.notNull(obj, "");
        log.debug(obj.toString());
    }
}

適用場景:點對點

fanout

fanout則模式須要將多個queue綁定在同一個交換機上

@Configuration
public class RabbitFanoutConfigure {

    public static final String EXCHANGE_NAME = "fanout-exchange";

    public static final String FANOUT_A = "fanout.A";
    public static final String FANOUT_B = "fanout.B";
    public static final String FANOUT_C = "fanout.C";

    @Bean
    public Queue AMessage() {
        return new Queue(FANOUT_A);
    }

    @Bean
    public Queue BMessage() {
        return new Queue(FANOUT_B);
    }

    @Bean
    public Queue CMessage() {
        return new Queue(FANOUT_C);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

}

發送者

@Slf4j
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendFanout(Object message) {
        log.debug("begin send fanout message<" + message + ">");
        rabbitTemplate.convertAndSend(RabbitFanoutConfigure.EXCHANGE_NAME, "", message);
    }

}

咱們能夠經過@RabbitListener監聽多個queue來進行消費

@Slf4j
@RabbitListener(queues = {
        RabbitFanoutConfigure.FANOUT_A,
        RabbitFanoutConfigure.FANOUT_B,
        RabbitFanoutConfigure.FANOUT_C
})
public class Receiver {

    @RabbitHandler
    public void receiveMessage(String message) {
        log.debug("Received <" + message + ">");
    }
}

適用場景

- 大規模多用戶在線(MMO)遊戲可使用它來處理排行榜更新等全局事件
- 體育新聞網站能夠用它來近乎實時地將比分更新分發給移動客戶端
- 分發系統使用它來廣播各類狀態和配置更新
- 在羣聊的時候,它被用來分發消息給參與羣聊的用戶

topic

這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」,Exchange會將消息轉發到全部關注主題能與RouteKey模糊匹配的隊列。

在進行綁定時,要提供一個該隊列關心的主題,如「topic.# (「#」表示0個或若干個關鍵字,「*」表示一個關鍵字。 )

@Configuration
public class RabbitTopicConfigure {

    public static final String EXCHANGE_NAME = "topic-exchange";

    public static final String TOPIC = "topic";
    public static final String TOPIC_A = "topic.A";
    public static final String TOPIC_B = "topic.B";

    @Bean
    public Queue queueTopic() {
        return new Queue(RabbitTopicConfigure.TOPIC);
    }

    @Bean
    public Queue queueTopicA() {
        return new Queue(RabbitTopicConfigure.TOPIC_A);
    }

    @Bean
    public Queue queueTopicB() {
        return new Queue(RabbitTopicConfigure.TOPIC_B);
    }

    @Bean
    public TopicExchange exchange() {
        TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
        topicExchange.setDelayed(true);
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding bindingExchangeTopic(Queue queueTopic, TopicExchange exchange) {
        return BindingBuilder.bind(queueTopic).to(exchange).with(RabbitTopicConfigure.TOPIC);
    }

    @Bean
    public Binding bindingExchangeTopics(Queue queueTopicA, TopicExchange exchange) {
        return BindingBuilder.bind(queueTopicA).to(exchange).with("topic.#");
    }
}

同時去監聽三個queue

@Slf4j
@RabbitListener(queues = {
        RabbitTopicConfigure.TOPIC,
        RabbitTopicConfigure.TOPIC_A,
        RabbitTopicConfigure.TOPIC_B
})
public class Receiver {

    @RabbitHandler
    public void receiveMessage(String message) {
        log.debug("Received <" + message + ">");
    }
}

經過測試咱們能夠發現

@RunWith(SpringRunner.class)
@SpringBootTest(classes = BootCoreTestApplication.class)
public class RabbitTest {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Test
    public void sendAll() {
        rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, "topic.test", "send All");
    }

    @Test
    public void sendTopic() {
        rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC, "send Topic");
    }

    @Test
    public void sendTopicA() {
        rabbitTemplate.convertAndSend(RabbitTopicConfigure.EXCHANGE_NAME, RabbitTopicConfigure.TOPIC_A, "send TopicA");
    }

}

適用場景

- 分發有關於特定地理位置的數據,例如銷售點
- 由多個工做者(workers)完成的後臺任務,每一個工做者負責處理某些特定的任務
- 股票價格更新(以及其餘類型的金融數據更新)
- 涉及到分類或者標籤的新聞更新(例如,針對特定的運動項目或者隊伍)
- 雲端的不一樣種類服務的協調
- 分佈式架構/基於系統的軟件封裝,其中每一個構建者僅能處理一個特定的架構或者系統。

延遲隊列

  • 延遲消費:
    • 如用戶生成訂單以後,須要過一段時間校驗訂單的支付狀態,若是訂單仍未支付則須要及時地關閉訂單。

    • 用戶註冊成功以後,須要過一段時間好比一週後校驗用戶的使用狀況,若是發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。

  • 延遲重試:
    • 如消費者從隊列裏消費消息時失敗了,可是想要延遲一段時間後自動重試。

    • 若是不使用延遲隊列,那麼咱們只能經過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便作成統一的服務便於開發人員使用。可是使用延遲隊列的話,咱們就能夠垂手可得地完成。

設置交換機延遲屬性爲true

@Configuration
public class RabbitLazyConfigure {

    public static final String QUEUE_NAME = "lazy-queue-t";
    public static final String EXCHANGE_NAME = "lazy-exchange-t";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    @Bean
    public DirectExchange defaultExchange() {
        DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME, true, false);
        directExchange.setDelayed(true);
        return directExchange;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(QUEUE_NAME);
    }

}

發送時設置延遲時間便可

@Slf4j
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;


    public void sendLazy(Object msg) {
        log.debug("begin send lazy message<" + msg + ">");

        rabbitTemplate.convertAndSend(RabbitLazyConfigure.EXCHANGE_NAME,
                RabbitLazyConfigure.QUEUE_NAME, msg, message -> {
                    message.getMessageProperties().setHeader("x-delay", 10000);
                    return message;
                }
        );
    }
}

結束

各類使用案例請直接查看官方文檔

相關文章
相關標籤/搜索