SpringBoot2.0 整合 RocketMQ ,實現請求異步處理

1、RocketMQ

一、架構圖片

二、角色分類

(1)、Brokerjava

RocketMQ 的核心,接收 Producer 發過來的消息、處理 Consumer 的消費消息請求、消息的持 久化存儲、服務端過濾功能等 。spring

(2)、NameServerapache

消息隊列中的狀態服務器,集羣的各個組件經過它來了解全局的信息 。相似微服務中註冊中心的服務註冊,發現,下線,上線的概念。緩存

熱備份: NamServer能夠部署多個,相互之間獨立,其餘角色同時向多個NameServer 機器上報狀態信息。服務器

心跳機制: NameServer 中的 Broker、 Topic等狀態信息不會持久存儲,都是由各個角色定時上報並存儲到內存中,超時不上報的話, NameServer會認爲某個機器出故障不可用。微信

(3)、Producer架構

消息的生成者,最經常使用的producer類就是DefaultMQProducer。負載均衡

(4)、Consumerjvm

消息的消費者,經常使用Consumer類 DefaultMQPushConsumer 收到消息後自動調用傳入的處理方法來處理,實時性高 DefaultMQPullConsumer 用戶自主控制 ,靈活性更高。ide

三、通訊機制

(1)、Broker啓動後須要完成一次將本身註冊至NameServer的操做;隨後每隔30s時間定時向NameServer更新Topic路由信息。

(2)、Producer發送消息時候,須要根據消息的Topic從本地緩存的獲取路由信息。若是沒有則更新路由信息會從NameServer從新拉取,同時Producer會默認每隔30s向NameServer拉取一次路由信息。

(3)、Consumer消費消息時候,從NameServer獲取的路由信息,並再完成客戶端的負載均衡後,監聽指定消息隊列獲取消息並進行消費。

2、代碼實現案例

一、項目結構圖

版本描述

<spring-boot.version>2.1.3.RELEASE</spring-boot.version>
<rocketmq.version>4.3.0</rocketmq.version>

二、配置文件

rocketmq:
  # 生產者配置
  producer:
    isOnOff: on
    # 發送同一類消息的設置爲同一個group,保證惟一
    groupName: FeePlatGroup
    # 服務地址
    namesrvAddr: 10.1.1.207:9876
    # 消息最大長度 默認1024*4(4M)
    maxMessageSize: 4096
    # 發送消息超時時間,默認3000
    sendMsgTimeout: 3000
    # 發送消息失敗重試次數,默認2
    retryTimesWhenSendFailed: 2
  # 消費者配置
  consumer:
    isOnOff: on
    # 官方建議:確保同一組中的每一個消費者訂閱相同的主題。
    groupName: FeePlatGroup
    # 服務地址
    namesrvAddr: 10.1.1.207:9876
    # 接收該 Topic 下全部 Tag
    topics: FeePlatTopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    # 設置一次消費消息的條數,默認爲1條
    consumeMessageBatchMaxSize: 1

# 配置 Group  Topic  Tag
fee-plat:
  fee-plat-group: FeePlatGroup
  fee-plat-topic: FeePlatTopic
  fee-account-tag: FeeAccountTag

三、生產者配置

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RocketMQ 生產者配置
 */
@Configuration
public class ProducerConfig {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize ;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;
    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;
    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        //若是須要同一個jvm中不一樣的producer往不一樣的mq集羣發送消息,須要設置不一樣的instanceName
        if(this.maxMessageSize!=null){
            producer.setMaxMessageSize(this.maxMessageSize);
        }
        if(this.sendMsgTimeout!=null){
            producer.setSendMsgTimeout(this.sendMsgTimeout);
        }
        //若是發送消息失敗,設置重試次數,默認爲2次
        if(this.retryTimesWhenSendFailed!=null){
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        }
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return producer;
    }
}

四、消費者配置

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * RocketMQ 消費者配置
 */
@Configuration
public class ConsumerConfig {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.topics}")
    private String topics;
    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int consumeMessageBatchMaxSize;
    @Resource
    private RocketMsgListener msgListener;
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(msgListener);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        try {
            String[] topicTagsArr = topics.split(";");
            for (String topicTags : topicTagsArr) {
                String[] topicTag = topicTags.split("~");
                consumer.subscribe(topicTag[0],topicTag[1]);
            }
            consumer.start();
        }catch (MQClientException e){
            e.printStackTrace();
        }
        return consumer;
    }
}

五、消息監聽配置

import com.rocket.queue.service.impl.ParamConfigService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
/**
 * 消息消費監聽
 */
@Component
public class RocketMsgListener implements MessageListenerConcurrently {
    private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;
    @Resource
    private ParamConfigService paramConfigService ;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(list)){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        LOG.info("接受到的消息爲:"+new String(messageExt.getBody()));
        int reConsume = messageExt.getReconsumeTimes();
        // 消息已經重試了3次,若是不須要再次消費,則返回成功
        if(reConsume ==3){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        if(messageExt.getTopic().equals(paramConfigService.feePlatTopic)){
            String tags = messageExt.getTags() ;
            switch (tags){
                case "FeeAccountTag":
                    LOG.info("開戶 tag == >>"+tags);
                    break ;
                default:
                    LOG.info("未匹配到Tag == >>"+tags);
                    break;
            }
        }
        // 消息消費成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

六、配置參數綁定

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class ParamConfigService {
    @Value("${fee-plat.fee-plat-group}")
    public String feePlatGroup ;
    @Value("${fee-plat.fee-plat-topic}")
    public String feePlatTopic ;
    @Value("${fee-plat.fee-account-tag}")
    public String feeAccountTag ;
}

七、消息發送測試

import com.rocket.queue.service.FeePlatMqService;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class FeePlatMqServiceImpl implements FeePlatMqService {
    @Resource
    private DefaultMQProducer defaultMQProducer;
    @Resource
    private ParamConfigService paramConfigService ;
    @Override
    public SendResult openAccountMsg(String msgInfo) {
        // 能夠不使用Config中的Group
        defaultMQProducer.setProducerGroup(paramConfigService.feePlatGroup);
        SendResult sendResult = null;
        try {
            Message sendMsg = new Message(paramConfigService.feePlatTopic,
                                          paramConfigService.feeAccountTag,
                                         "fee_open_account_key", msgInfo.getBytes());
            sendResult = defaultMQProducer.send(sendMsg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sendResult ;
    }
}

歡迎你們關注個人我的微信公衆號,裏面不只有技術分享,還有行業趣事,讓您的生活變得更加有趣,點關注,不迷路!

相關文章
相關標籤/搜索