Java秒殺系統實戰系列~整合RabbitMQ實現消息異步發送

摘要:

本篇博文是「Java秒殺系統實戰系列文章」的第八篇,在這篇文章中咱們將整合消息中間件RabbitMQ,包括添加依賴、加入配置信息以及自定義注入相關操做組件,好比RabbitTemplate等等,最終初步實現消息的發送和接收,並在下一篇章將其與郵件服務整合,實現「用戶秒殺成功發送郵件通知消息」的功能!git

內容:

對於消息中間件RabbitMQ,想必各位小夥伴沒有用過、也該有聽過,它是一款目前市面上應用至關普遍的消息中間件,能夠實現消息異步通訊、業務服務模塊解耦、接口限流、消息分發等功能,在微服務、分佈式系統架構中能夠說是充當着一名了不得的角色!(詳細的介紹,Debug在這裏就不贅述了,各位小夥伴能夠上官網看看其更多的介紹及其典型的應用場景)!spring

在本篇博文中,咱們將使用RabbitMQ充當消息發送的組件,將它與後面篇章介紹的「郵件服務」結合實現「用戶秒殺成功後異步發送郵件通知消息,告知用戶秒殺已經成功!」,下面咱們一塊兒進入代碼實戰吧。數據庫

(1)要使用RabbitMQ,前提得在本地開發環境或者服務器安裝RabbitMQ服務,以下圖所示爲Debug在本地安裝RabbitMQ服務成功後訪問其後端控制檯應用的首頁:後端


以後咱們開始將其與SpringBoot進行整合。首先須要加入其依賴,其版本號跟SpringBoot的版本一致,版本號爲1.5.7.RELEASE:bash

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
複製代碼

而後須要在配置文件application.properties中加入RabbitMQ服務相關的配置,好比其服務所在的Host、端口Port等等:服務器

#rabbitmq
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.prefetch=10
複製代碼

(2)緊接着,咱們藉助SpringBoot自然具備的一些特性,自動注入RabbitMQ一些組件的配置,包括其「單一實例消費者」配置、「多實例消費者」配置以及用於發送消息的操做組件實例「RabbitTemplate」的配置:微信

//通用化 Rabbitmq 配置
@Configuration
public class RabbitmqConfig {
  private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class);

  @Autowired
  private Environment env;

  @Autowired
  private CachingConnectionFactory connectionFactory;

  @Autowired
  private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

  //單一消費者
  @Bean(name = "singleListenerContainer")
  public SimpleRabbitListenerContainerFactory listenerContainer(){
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory);
      factory.setMessageConverter(new Jackson2JsonMessageConverter());
      factory.setConcurrentConsumers(1);
      factory.setMaxConcurrentConsumers(1);
      factory.setPrefetchCount(1);
      factory.setTxSize(1);
      return factory;
  }

  //多個消費者
  @Bean(name = "multiListenerContainer")
  public SimpleRabbitListenerContainerFactory multiListenerContainer(){
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factoryConfigurer.configure(factory,connectionFactory);
      factory.setMessageConverter(new Jackson2JsonMessageConverter());
      //確認消費模式-NONE
      factory.setAcknowledgeMode(AcknowledgeMode.NONE);
      factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class));
      factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class));
      factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class));
      return factory;
  }

  @Bean
  public RabbitTemplate rabbitTemplate(){
      connectionFactory.setPublisherConfirms(true);
      connectionFactory.setPublisherReturns(true);
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setMandatory(true);
      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
          @Override
          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
              log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
          }
      });
      rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
          @Override
          public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
              log.warn("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
          }
      });
      return rabbitTemplate;
  }
}
複製代碼

在RabbitMQ的消息發送組件RabbitTemplate的配置中,咱們還特地加入了「消息發送確認」、「消息丟失回調」的輸出配置,即當消息正確進入到隊列後,即表明消息發送成功;當消息找不到對應的隊列(在某種程度上,其實也就是找不到交換機和路由)時,會輸出消息丟失。架構

(3)完了以後,咱們準備開始使用RabbitMQ實現消息的發送和接收。首先,咱們須要在RabbitmqConfig配置類中建立隊列、交換機、路由以及綁定等Bean組件,以下所示:併發

//構建異步發送郵箱通知的消息模型
    @Bean
    public Queue successEmailQueue(){
        return new Queue(env.getProperty("mq.kill.item.success.email.queue"),true);
    }

    @Bean
    public TopicExchange successEmailExchange(){
        return new TopicExchange(env.getProperty("mq.kill.item.success.email.exchange"),true,false);
    }

    @Bean
    public Binding successEmailBinding(){
        return BindingBuilder.bind(successEmailQueue()).to(successEmailExchange()).with(env.getProperty("mq.kill.item.success.email.routing.key"));
    }
複製代碼

其中,環境變量實例env讀取的那些屬性咱們是配置在application.properties文件中的,以下所示:app

mq.env=test

#秒殺成功異步發送郵件的消息模型
mq.kill.item.success.email.queue=${mq.env}.kill.item.success.email.queue
mq.kill.item.success.email.exchange=${mq.env}.kill.item.success.email.exchange
mq.kill.item.success.email.routing.key=${mq.env}.kill.item.success.email.routing.key
複製代碼

緊接着,咱們須要在通用的消息發送服務類 RabbitSenderService 中寫一段發送消息的方法,該方法用於接收「訂單編號」參數,而後在數據庫中查詢其對應的詳細訂單記錄,將該記錄充當「消息」併發送至RabbitMQ的隊列中,等待被監聽消費:

/**
* RabbitMQ通用的消息發送服務
* @Author:debug (SteadyJack)
* @Date: 2019/6/21 21:47
**/
@Service
public class RabbitSenderService {

  public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Autowired
  private Environment env;

  @Autowired
  private ItemKillSuccessMapper itemKillSuccessMapper;

  //秒殺成功異步發送郵件通知消息
  public void sendKillSuccessEmailMsg(String orderNo){
      log.info("秒殺成功異步發送郵件通知消息-準備發送消息:{}",orderNo);

      try {
          if (StringUtils.isNotBlank(orderNo)){
              KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderNo);
              if (info!=null){
                  //TODO:rabbitmq發送消息的邏輯
                  rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                  rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.email.exchange"));
                  rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.email.routing.key"));

                  //TODO:將info充當消息發送至隊列
                  rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                      @Override
                      public Message postProcessMessage(Message message) throws AmqpException {
                          MessageProperties messageProperties=message.getMessageProperties();
                          messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                          messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
                          return message;
                      }
                  });
              }
          }
      }catch (Exception e){
          log.error("秒殺成功異步發送郵件通知消息-發生異常,消息爲:{}",orderNo,e.fillInStackTrace());
      }
  }
}
複製代碼

(4)最後,是在通用的消息接收服務類RabbitReceiverService中實現消息的接收,其完整的源代碼以下所示:

/**
 * RabbitMQ通用的消息接收服務
 * @Author:debug (SteadyJack)
 * @Date: 2019/6/21 21:47
 **/
@Service
public class RabbitReceiverService {

  public static final Logger log= LoggerFactory.getLogger(RabbitReceiverService.class);

  @Autowired
  private MailService mailService;

  @Autowired
  private Environment env;

  @Autowired
  private ItemKillSuccessMapper itemKillSuccessMapper;

  //秒殺異步郵件通知-接收消息
  @RabbitListener(queues = {"${mq.kill.item.success.email.queue}"},containerFactory = "singleListenerContainer")
  public void consumeEmailMsg(KillSuccessUserInfo info){
      try {
          log.info("秒殺異步郵件通知-接收消息:{}",info);
      //到時候這裏將整合郵件服務發送郵件通知消息的邏輯

      }catch (Exception e){
          log.error("秒殺異步郵件通知-接收消息-發生異常:",e.fillInStackTrace());
      }
  }
}
複製代碼

至此,關於SpringBoot整合消息中間件RabbitMQ的代碼實戰,本篇文章就介紹到這裏了。

最後一點,咱們須要進行測試,即用戶在界面發起「搶購」的請求操做以後,若是能秒殺成功,則RabbitMQ會發送、接收一條消息,以下所示:


好了,關於RabbitMQ的使用,本文到此就暫且告一段落了,在下一篇文章中咱們將把它與郵件服務進行整合,實現「用戶秒殺成功後異步發送郵件通知消息給到用戶郵箱」的功能!除此以外,咱們還將在後面的篇章介紹「如何使用RabbitMQ的死信隊列,處理用戶下單成功後卻超時未支付的訂單~在那裏咱們將採起失效的操做」。

補充:

一、目前,這一秒殺系統的總體構建與代碼實戰已經所有完成了,完整的源代碼數據庫地址能夠來這裏下載:gitee.com/steadyjack/… 記得Fork跟Star啊!!

二、最後,不要忘記了關注一下Debug的技術微信公衆號:

相關文章
相關標籤/搜索