paascloud開源項目學習(1) -- 用戶郵箱註冊可靠消息服務流程

用戶註冊

本篇文章目的是理解該項目 可靠消息服務中心(TCP)發送消息、消費消息的流程,用戶註冊發送激活郵箱和激活後發送註冊成功郵箱都是利用可靠消息服務來解決分佈式事務,理解了該流程也就弄懂了該項目中其餘業務流程。

發送激活郵箱過程

  • 消息生產端:UAC
  • 可靠消息服務:TPC
  • 消息服務端:OPC
用戶註冊後,向註冊郵箱發送一封激活郵箱。

消息生產端(UAC)

大體流程爲:java

  1. 本地服務 UAC 先持久化 預發送消息(等待確認消息),表 pc_mq_message_data
  2. 調用遠端可靠消息服務TPC持久化預發送消息,可靠消息表pc_tpc_mq_message
  3. 執行本地事務保存用戶信息
  4. 調用遠端可靠消息服務TPC更新第2步中的等待確認狀態爲發送中sending
  5. 同時建立消費待確認列表,即持久化該Topic類型的消息被哪些消費者訂閱監聽的全部消費待確認列表,狀態爲未確認,表pc_tpc_mq_confirm
  6. 完成上面操做後,發送消息到 RocketMQ

controller層

  • AuthRestController.java
@PostMapping(value = "/register")
@ApiOperation(httpMethod = "POST", value = "註冊用戶")
public Wrapper registerUser(UserRegisterDto user) {
    uacUserService.register(user);
    return WrapMapper.ok();
}

service層

  1. 用戶ID生成:雪花算法生成分佈式惟一 ID
  2. 用戶密碼加密SpringSecurity BCryptPasswordEncoder 強哈希方法加密,每次加密的結果都不同。
bcrypt 能夠有效抵禦彩虹表暴力破解,其原理就是 在加鹽的基礎上屢次 hash,關於密碼參考: https://mp.weixin.qq.com/s/Dk...
  1. Redis存儲激活郵箱token:key(active_token):email:過時時間1天,即激活接口參數:activeUserToken
  2. 生成郵件發送模板(freeMarker):activeUserTemplate.ftl
  3. 根據上面模板和發送郵件參數生成實體:MqMessageData(pc_mq_message_data)
各個子系統消息落地的消息表,好比 用戶服務系統主要就是 郵件消息、短信消息等。
@Override
public void register(UserRegisterDto registerDto) {
    // 校驗註冊信息
    validateRegisterInfo(registerDto);
    String mobileNo = registerDto.getMobileNo();
    String email = registerDto.getEmail();
    Date row = new Date();
    String salt = String.valueOf(generateId());
    // 封裝註冊信息
    long id = generateId();    // id 雪花算法生成
    UacUser uacUser = new UacUser();
    uacUser.setLoginName(registerDto.getLoginName());
    uacUser.setSalt(salt);
    uacUser.setLoginPwd(Md5Util.encrypt(registerDto.getLoginPwd()));
    uacUser.setMobileNo(mobileNo);
    uacUser.setStatus(UacUserStatusEnum.DISABLE.getKey());
    uacUser.setUserSource(UacUserSourceEnum.REGISTER.getKey());
    uacUser.setCreatedTime(row);
    uacUser.setUpdateTime(row);
    uacUser.setEmail(email);
    uacUser.setId(id);
    uacUser.setCreatorId(id);
    uacUser.setCreator(registerDto.getLoginName());
    uacUser.setLastOperatorId(id);
    uacUser.setUserName(registerDto.getLoginName());
    uacUser.setLastOperator(registerDto.getLoginName());

    // 發送激活郵件
    String activeToken = PubUtils.uuid() + super.generateId();
    redisService.setKey(RedisKeyUtil.getActiveUserKey(activeToken), email, 1, TimeUnit.DAYS);

    Map<String, Object> param = Maps.newHashMap();
    param.put("loginName", registerDto.getLoginName());
    param.put("email", registerDto.getEmail());
    param.put("activeUserUrl", activeUserUrl + activeToken);
    param.put("dateTime", DateUtil.formatDateTime(new Date()));

    Set<String> to = Sets.newHashSet();
    to.add(registerDto.getEmail());

    MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER, param);
    // 即下面的第6步
    userManager.register(mqMessageData, uacUser);
}
  1. userManager.register() 經過註解 @MqProducerStore 發送消息服務。
執行該方法前,先進入切面編程
@MqProducerStore
public void register(final MqMessageData mqMessageData, final UacUser uacUser) {
    log.info("註冊用戶. mqMessageData={}, user={}", mqMessageData, uacUser);
    uacUserMapper.insertSelective(uacUser);
}
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface MqProducerStore {
    // WAIT_CONFIRM:等待確認;SAVE_AND_SEND:直接發送;
    MqSendTypeEnum sendType() default MqSendTypeEnum.WAIT_CONFIRM;
    // ORDER(1):有序;DIS_ORDER(0):無序
    MqOrderTypeEnum orderType() default MqOrderTypeEnum.ORDER;
    // Rocketmq 默認延時級別
    // ZERO(0, 不延時);ONE(1, 1秒)....EIGHTEEN(18, 2小時)
    DelayLevelEnum delayLevel() default DelayLevelEnum.ZERO;
}
  1. 切面中,由於郵件激活發送消息類型爲默認的:等待確認mysql

    • 此處本地服務 UAC 消息落地:保存待確認消息 MqMessageData 到 mysql,表pc_mq_message_data
    • 發送待確認消息到可靠消息系統(TPC):發送預發送狀態的消息給消息中心
// 切面
MqMessageData domain = null;
for (Object object : args) {
    if (object instanceof MqMessageData) {
        domain = (MqMessageData) object;
        break;
    }
}
domain.setOrderType(orderType);
domain.setProducerGroup(producerGroup);
// 1. 等待確認
if (type == MqSendTypeEnum.WAIT_CONFIRM) {
    if (delayLevelEnum != DelayLevelEnum.ZERO) {
        domain.setDelayLevel(delayLevelEnum.delayLevel());
    }
    // 1.1 發送待確認消息到可靠消息系統
    // 本地服務消息落地,可靠消息服務中心也持久化預發送消息,可是不發送
    mqMessageService.saveWaitConfirmMessage(domain);
}
result = joinPoint.proceed();    // 返回註解方法,執行業務
@Override
@Transactional(rollbackFor = Exception.class)
public void saveWaitConfirmMessage(final MqMessageData mqMessageData) {
    // 1. 持久化到本地mysql
    this.saveMqProducerMessage(mqMessageData);
    // 2. 發送預發送狀態的消息給消息中心
    TpcMqMessageDto tpcMqMessageDto = mqMessageData.getTpcMqMessageDto();
    // 3. 調用遠端可靠消息服務(tpc),持久化等待確認消息
    tpcMqMessageFeignApi.saveMessageWaitingConfirm(tpcMqMessageDto);
    // 4. mqMessageData 此時爲調用遠端服務返回來的數據
    log.info("<== saveWaitConfirmMessage - 存儲預發送消息成功. messageKey={}", mqMessageData.getMessageKey());
}
  1. 緊接着第7步,調用遠端可靠消息服務(TCP),此時只是持久化預發送消息,可是沒有發送(等執行完本地事務即保存用戶後在發送,即第9步)
持久化 TpcMqMessage,即 pc_tpc_mq_message(可靠消息表)
@Override
public void saveMessageWaitingConfirm(TpcMqMessageDto messageDto) {

    if (StringUtils.isEmpty(messageDto.getMessageTopic())) {
        throw new TpcBizException(ErrorCodeEnum.TPC10050001);
    }

    Date now = new Date();
    TpcMqMessage message = new ModelMapper().map(messageDto, TpcMqMessage.class);
    // 消息狀態:WAIT_SEND(10, "未發送");SENDING(20, "已發送");FINISH(30, "已完成");
    message.setMessageStatus(MqSendStatusEnum.WAIT_SEND.sendStatus());
    message.setUpdateTime(now);
    message.setCreatedTime(now);
    tpcMqMessageMapper.insertSelective(message);
}
  1. 上面執行完後,返回註解 @MqProducerStore所在方法,執行本地事務:保存用戶到 mysql
result = joinPoint.proceed();    // 返回註解方法,執行業務
  1. 第9步執行完後,再次進入切面,發送確認消息給可靠消息服務中心
result = joinPoint.proceed();    // 返回註解方法,執行業務
// 2. 直接發送
if (type == MqSendTypeEnum.SAVE_AND_SEND) {
    mqMessageService.saveAndSendMessage(domain);
// 3. XXX
} else if (type == MqSendTypeEnum.DIRECT_SEND) {
    mqMessageService.directSendMessage(domain);
} else {    // type = WAIT_CONFIRM
    final MqMessageData finalDomain = domain;
    taskExecutor.execute(() -> mqMessageService.confirmAndSendMessage(finalDomain.getMessageKey()));
}
return result;
  1. 緊接着上面,可靠消息服務中心(TCP):根據傳過來的 messageKey 確認併發送以前已經持久化的預發送消息。
// TpcMqMessageFeignClient.java
@Override
@ApiOperation(httpMethod = "POST", value = "確認併發送消息")
public Wrapper confirmAndSendMessage(@RequestParam("messageKey") String messageKey) {
    logger.info("確認併發送消息. messageKey={}", messageKey);
    tpcMqMessageService.confirmAndSendMessage(messageKey);
    return WrapMapper.ok();
}

// TpcMqMessageServiceImpl.java
@Override
public void confirmAndSendMessage(String messageKey) {
    final TpcMqMessage message = tpcMqMessageMapper.getByMessageKey(messageKey);
    if (message == null) {
        throw new TpcBizException(ErrorCodeEnum.TPC10050002);
    }

    TpcMqMessage update = new TpcMqMessage();
    update.setMessageStatus(MqSendStatusEnum.SENDING.sendStatus());
    update.setId(message.getId());
    update.setUpdateTime(new Date());
    // 1. 更新消息狀態爲:SENDING
    tpcMqMessageMapper.updateByPrimaryKeySelective(update);
    // 2. 建立消費待確認列表(此處topic:SEND_EMAIL_TOPIC)
    this.createMqConfirmListByTopic(message.getMessageTopic(), message.getId(), message.getMessageKey());
    // 3. 直接發送消息
    this.directSendMessage(message.getMessageBody(), message.getMessageTopic(),
     message.getMessageTag(), message.getMessageKey(), message.getProducerGroup(), message.getDelayLevel());
}
  1. 第11步中的第2點TCP 服務中,建立消費待確認列表,根據表 pc_tpc_mq_subscribe,查詢出不一樣 topic 下相對應的全部 consumer_code(消費監聽者),即設置該消息被哪些服務(CID)監聽消費;

pc_tpc_mq_subscribe

- ``SEND_EMAIL_TOPIC`` --> ``CID_OPC``:該消息會被 `consumerGroup` 爲 `CID_OPC` 的服務監聽並消費。
- 同時,保存**確認消息**:``TpcMqConfirm`` --> 表 ``pc_tpc_mq_confirm``
@Override
public void createMqConfirmListByTopic(final String topic, final Long messageId, final String messageKey) {
    List<TpcMqConfirm> list = Lists.newArrayList();
    TpcMqConfirm tpcMqConfirm;
    List<String> consumerGroupList = tpcMqConsumerService.listConsumerGroupByTopic(topic);
    if (PublicUtil.isEmpty(consumerGroupList)) {
        throw new TpcBizException(ErrorCodeEnum.TPC100500010, topic);
    }
    for (final String cid : consumerGroupList) {
        tpcMqConfirm = new TpcMqConfirm(UniqueIdGenerator.generateId(), messageId, messageKey, cid);
        list.add(tpcMqConfirm);
    }

    tpcMqConfirmMapper.batchCreateMqConfirm(list);
}
  1. 第11步中的第3點:完成上面操做後,直接發送消息到中間件 RocketMQ 隊列中。
@Override
public void directSendMessage(String body, String topic, String tag, String key,
        String pid, Integer delayLevel) {
    RocketMqProducer.sendSimpleMessage(body, topic, tag, key, pid, delayLevel);
}

// 核心方法:重試發送消息(重試次數3次)
// pid:producerGroup --> 發送郵件服務是 PID_UAC
// cid: consumerGroup --> 監聽郵件消息服務是 CID_OPC
private static SendResult retrySendMessage(String pid, Message msg) {
    int iniCount = 1;
    SendResult result;
    while (true) {
        try {
            // Message中屬性
            result = MqProducerBeanFactory.getBean(pid).send(msg);
            break;
        } catch (Exception e) {
            log.error("發送消息失敗:", e);
            if (iniCount++ >= PRODUCER_RETRY_TIMES) {
                throw new TpcBizException(ErrorCodeEnum.TPC100500014, msg.getTopic(), msg.getKeys());
            }
        }
    }
    log.info("<== 發送MQ SendResult={}", result);
    return result;
}

消息消費端(OPC)

大體流程:git

  1. OPC服務經過配置類AliyunMqConfiguration.java啓動DefaultMQPushConsumer RocketMQ 消費端,並設置消息邏輯處理監聽器OptPushMessageListener
  2. 本地服務OPC持久化消費者確認消息,表 pc_mq_message_data
  3. 調用遠端可靠消息服務TPC,更新以前生產端持久化的消費確認列表狀態,未確認 --> 已確認,表pc_tpc_mq_confirm
  4. 接着就能夠發送激活郵箱
  5. 若是發送成功,調用遠端可靠消息服務TPC,繼續更新第3步表中消費確認消息的狀態爲已消費

消費端 RocketMQ 啓動配置類

  1. DefaultMQPushConsumer 根據配置信息啓動;
  2. subscribe 訂閱 該服務 OPC 全部的 Topictags 消息。
包括短信、郵箱激活、附件更新刪除等全部消息。
@Slf4j
@Configuration
public class AliyunMqConfiguration {

    @Resource
    private PaascloudProperties paascloudProperties;

    @Resource
    private OptPushMessageListener optPushConsumer;

    @Resource
    private TaskExecutor taskExecutor;

    /**
     * Default mq push consumer default mq push consumer.
     *
     * @return the default mq push consumer
     *
     * @throws MQClientException the mq client exception
     */
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        // 1. 新建消費者組
        // RocketMQ實際上都是拉模式,這裏的DefaultMQPushConsumer實現了推模式,
        // 也只是對拉消息服務作了一層封裝,即拉到消息的時候觸發業務消費者註冊到這裏的callback
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(paascloudProperties.getAliyun().getRocketMq().getConsumerGroup());
        // 2. 指定NameServer地址,多個地址以 ; 隔開
        consumer.setNamesrvAddr(paascloudProperties.getAliyun().getRocketMq().getNamesrvAddr());
        // 3. 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費
        // 若是非第一次啓動,那麼按照上次消費的位置繼續消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        String[] strArray = AliyunMqTopicConstants.ConsumerTopics.OPT.split(GlobalConstant.Symbol.COMMA);
        for (String aStrArray : strArray) {
            String[] topicArray = aStrArray.split(GlobalConstant.Symbol.AT);
            String topic = topicArray[0];
            String tags = topicArray[1];
            if (PublicUtil.isEmpty(tags)) {
                tags = "*";
            }
            // 4. 進行Topic訂閱,訂閱PushTopic下Tag爲push的消息
            consumer.subscribe(topic, tags);
            log.info("RocketMq OpcPushConsumer topic = {}, tags={}", topic, tags);
        }

        // 5. 設置消息處理器
        consumer.registerMessageListener(optPushConsumer);
        consumer.setConsumeThreadMax(2);
        consumer.setConsumeThreadMin(2);

        taskExecutor.execute(() -> {
            try {
                Thread.sleep(5000);
                consumer.start();
                log.info("RocketMq OpcPushConsumer OK.");
            } catch (InterruptedException | MQClientException e) {
                log.error("RocketMq OpcPushConsumer, 出現異常={}", e.getMessage(), e);
            }
        });
        return consumer;
    }
}

消息邏輯處理監聽器

  1. consumeMessage() 上有註解 @MqConsumerStore,執行前先進入切面編程;
@Slf4j
@Component
public class OptPushMessageListener implements MessageListenerConcurrently {

    @Resource
    private OptSendSmsTopicConsumer optSendSmsTopicService;
    @Resource
    private OptSendEmailTopicConsumer optSendEmailTopicService;
    @Resource
    private MdcTopicConsumer mdcTopicConsumer;

    @Resource
    private MqMessageService mqMessageService;
    @Resource
    private StringRedisTemplate srt;

    /**
     * Consume message consume concurrently status.
     *
     * @param messageExtList             the message ext list
     * @param consumeConcurrentlyContext the consume concurrently context
     *
     * @return the consume concurrently status
     */
    @Override
    @MqConsumerStore
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MessageExt msg = messageExtList.get(0);
        String body = new String(msg.getBody());
        String topicName = msg.getTopic();
        String tags = msg.getTags();
        String keys = msg.getKeys();
        log.info("MQ消費Topic={},tag={},key={}", topicName, tags, keys);
        ValueOperations<String, String> ops = srt.opsForValue();
        // 控制冪等性使用的key
        try {
            MqMessage.checkMessage(body, topicName, tags, keys);
            String mqKV = null;
            if (srt.hasKey(keys)) {
                mqKV = ops.get(keys);
            }
            if (PublicUtil.isNotEmpty(mqKV)) {
                log.error("MQ消費Topic={},tag={},key={}, 重複消費", topicName, tags, keys);

                // 消費成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            if (AliyunMqTopicConstants.MqTopicEnum.SEND_SMS_TOPIC.getTopic().equals(topicName)) {
                optSendSmsTopicService.handlerSendSmsTopic(body, topicName, tags, keys);
            }
            if (AliyunMqTopicConstants.MqTopicEnum.SEND_EMAIL_TOPIC.getTopic().equals(topicName)) {
                optSendEmailTopicService.handlerSendEmailTopic(body, topicName, tags, keys);
            }
            if (AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic().equals(topicName)) {
                mqMessageService.deleteMessageTopic(body, tags);
            }
            if (AliyunMqTopicConstants.MqTopicEnum.MDC_TOPIC.getTopic().equals(topicName)) {
                mdcTopicConsumer.handlerSendSmsTopic(body, topicName, tags, keys);
            } else {
                log.info("OPC訂單信息消 topicName={} 不存在", topicName);
            }
        } catch (IllegalArgumentException ex) {
            log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex);
        } catch (Exception e) {
            log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e);
            // 若是消息消費失敗,例如數據庫異常等,扣款失敗,發送失敗須要重試的場景,
            // 返回下面代碼,RocketMQ就認爲消費失敗。
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        ops.set(keys, keys, 10, TimeUnit.DAYS);
        // 業務實現消費回調的時候,當且僅當返回下面代碼時,RocketMQ纔會認爲這批消息是消費完成的
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
  1. 執行方法以前先進入切面編程執行,獲取註解方法的參數和消息;
@Around(value = "mqConsumerStoreAnnotationPointcut()")
public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable {
    // ...
    MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0));
    final String messageKey = dto.getMessageKey();
    if (isStorePreStatus) {
        // 執行下面三、4步
        mqMessageService.confirmReceiveMessage(consumerGroup, dto);
    }
    String methodName = joinPoint.getSignature().getName();
    try {
        // 返回註解方法;
        result = joinPoint.proceed();
        log.info("result={}", result);
        if (CONSUME_SUCCESS.equals(result.toString())) {
            mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey);
        }
    } catch (Exception e) {
        log.error("發送可靠消息, 目標方法[{}], 出現異常={}", methodName, e.getMessage(), e);
        throw e;
    } finally {
        log.info("發送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime);
    }
    return result;
}
  1. confirmReceiveMessage消費者確認收到消息;在上面目錄【發送激活郵箱的消息/service層】的第 12 步
持久化 消費者確認消息 MqMessageData 到本地服務 OPC mysql 中,表 pc_mq_message_data
@Override
@Transactional(rollbackFor = Exception.class)
public void confirmReceiveMessage(String cid, MqMessageData messageData) {
    final String messageKey = messageData.getMessageKey();
    log.info("confirmReceiveMessage - 消費者={}, 確認收到key={}的消息", cid, messageKey);
    // 持久化消費者確認消息 MqMessageData 到本地服務 mysql 中,表 pc_mq_message_data
    messageData.setMessageType(MqMessageTypeEnum.CONSUMER_MESSAGE.messageType());
    messageData.setId(UniqueIdGenerator.generateId());
    mqMessageDataMapper.insertSelective(messageData);
    // 調用遠端服務 TPC,更新確認收到消息表狀態爲已確認,TpcMqConfirm,表 pc_tpc_mq_confirm;
    Wrapper wrapper = tpcMqMessageFeignApi.confirmReceiveMessage(cid, messageKey);
    log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper);
    if (wrapper == null) {
        throw new TpcBizException(ErrorCodeEnum.GL99990002);
    }
    if (wrapper.error()) {
        throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey);
    }
}
  1. 緊接着第3步,調用遠端服務 TPC,更新確認收到消息狀態爲已確認TpcMqConfirm,表 pc_tpc_mq_confirmgithub

    • status:狀態, 10 - 未確認 ; 20 - 已確認; 30 已消費;
    • consumeCount:消費的次數,加 1;
@Override
public void confirmReceiveMessage(final String cid, final String messageKey) {
    // 1. 校驗cid
    // 2. 校驗messageKey
    // 3. 校驗cid 和 messageKey
    Long confirmId = tpcMqConfirmMapper.getIdMqConfirm(cid, messageKey);
    // 3. 更新消費信息的狀態
    tpcMqConfirmMapper.confirmReceiveMessage(confirmId);
}
  1. 第三、4步執行後,返回切面,執行下面代碼,再返回註解修飾的方法;
result = joinPoint.proceed();
  1. 註解修飾方法,經過參數 MessageExt 獲取該消息的 topic(主題)tag(標籤)keys(惟一鍵)body(消息體)
  2. 冪等性(避免重複消費)redis 中存儲消費過的該消息的 keys
  3. 根據消息的 topic 執行相應的操做處理該消息
好比此流程的發送激活郵箱,使用 spring 框架的 TaskExecutor執行郵箱發送任務。
@Override
public int sendSimpleMail(String subject, String text, Set<String> to) {
    log.info("sendSimpleMail - 發送簡單郵件. subject={}, text={}, to={}", subject, text, to);
    int result = 1;
    try {
        SimpleMailMessage message = MailEntity.createSimpleMailMessage(subject, text, to);
        message.setFrom(from);
        taskExecutor.execute(() -> mailSender.send(message));
    } catch (Exception e) {
        log.info("sendSimpleMail [FAIL] ex={}", e.getMessage(), e);
        result = 0;
    }
    return result;
}
  1. 第8步 若是消息消費成功,郵件發送成功,redis 中存儲該消息(冪等,過時時間 10 天),同時返回消費成功代碼;
ops.set(keys, keys, 10, TimeUnit.DAYS);
// 業務實現消費回調的時候,當且僅當返回下面代碼時,RocketMQ 纔會認爲這批消息是消費完成的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  1. 第8步 若是消息消費失敗,好比數據庫異常,扣款失敗,郵件發送失敗等須要重試的場景,返回重試消費代碼。
} catch (IllegalArgumentException ex) {
    log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex);
} catch (Exception e) {
    log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e);
    // 若是消息消費失敗,例如數據庫異常等,扣款失敗,發送失敗須要重試的場景,
    // 返回下面代碼,RocketMQ就認爲消費失敗。
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
  1. 執行完註解修飾方法,再次返回切面中,繼續執行,判斷返回結果;
String methodName = joinPoint.getSignature().getName();
    try {
        result = joinPoint.proceed();
        log.info("result={}", result);
        if (CONSUME_SUCCESS.equals(result.toString())) {
            mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey);
        }
    } catch (Exception e) {
        log.error("發送可靠消息, 目標方法[{}], 出現異常={}", methodName, e.getMessage(), e);
        throw e;
    } finally {
        log.info("發送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime);
    }
    return result;
  1. 第11步中,若是返回 CONSUME_SUCCESS,保存並確認消息完成;redis

    • 調用遠端服務 TCP,更新消費確認消息列表 pc_tpc_mq_confirm,狀態爲已消費
@Override
public void saveAndConfirmFinishMessage(String cid, String messageKey) {
    // 1. 調用遠端服務tcp,確認完成消費消息
    Wrapper wrapper = tpcMqMessageFeignApi.confirmConsumedMessage(cid, messageKey);
    log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper);
    if (wrapper == null) {
        throw new TpcBizException(ErrorCodeEnum.GL99990002);
    }
    if (wrapper.error()) {
        throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey);
    }
}

發送激活成功郵箱過程

發送激活成功郵箱同上面 發送激活郵箱同樣利用 可靠消息服務完成分佈式事務操做。

controller層

@GetMapping(value = "/activeUser/{activeUserToken}")
@ApiOperation(httpMethod = "POST", value = "激活用戶")
public Wrapper activeUser(@PathVariable String activeUserToken) {
    uacUserService.activeUser(activeUserToken);
    return WrapMapper.ok("激活成功");
}

service 層

  1. activeuser()
@Override
    public void activeUser(String activeUserToken) {
        Preconditions.checkArgument(!StringUtils.isEmpty(activeUserToken), "激活用戶失敗");

        String activeUserKey = RedisKeyUtil.getActiveUserKey(activeUserToken);

        String email = redisService.getKey(activeUserKey);

        if (StringUtils.isEmpty(email)) {
            throw new UacBizException(ErrorCodeEnum.UAC10011030);
        }
        // 修改用戶狀態, 綁定訪客角色
        UacUser uacUser = new UacUser();
        uacUser.setEmail(email);

        uacUser = uacUserMapper.selectOne(uacUser);
        if (uacUser == null) {
            logger.error("找不到用戶信息. email={}", email);
            throw new UacBizException(ErrorCodeEnum.UAC10011004, email);
        }

        UacUser update = new UacUser();
        update.setId(uacUser.getId());
        update.setStatus(UacUserStatusEnum.ENABLE.getKey());
        LoginAuthDto loginAuthDto = new LoginAuthDto();
        loginAuthDto.setUserId(uacUser.getId());
        loginAuthDto.setUserName(uacUser.getLoginName());
        loginAuthDto.setLoginName(uacUser.getLoginName());
        update.setUpdateInfo(loginAuthDto);

        UacUser user = this.queryByUserId(uacUser.getId());

        Map<String, Object> param = Maps.newHashMap();
        param.put("loginName", user.getLoginName());
        param.put("dateTime", DateUtil.formatDateTime(new Date()));

        Set<String> to = Sets.newHashSet();
        to.add(user.getEmail());

        // 構建激活成功消息體
        MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER_SUCCESS, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER_SUCCESS, param);
        // 1. 可靠消息服務發送郵件
        userManager.activeUser(mqMessageData, update, activeUserKey);
    }
  1. 調用userManager.activeUser():能夠看到該方法也是註解@MqProducerStore修飾;
@MqProducerStore
    public void activeUser(final MqMessageData mqMessageData, final UacUser uacUser, final String activeUserKey) {
        log.info("激活用戶. mqMessageData={}, user={}", mqMessageData, uacUser);
        // 更新用戶信息
        int result = uacUserMapper.updateByPrimaryKeySelective(uacUser);
        if (result < 1) {
            throw new UacBizException(ErrorCodeEnum.UAC10011038, uacUser.getId());
        }

        // 綁定一個訪客角色默認值roleId=10000
        final Long userId = uacUser.getId();
        Preconditions.checkArgument(userId != null, "用戶Id不能爲空");

        final Long roleId = 10000L;

        UacRoleUser roleUser = new UacRoleUser();
        roleUser.setUserId(userId);
        roleUser.setRoleId(roleId);
        uacRoleUserMapper.insertSelective(roleUser);
        // 綁定一個組織
        UacGroupUser groupUser = new UacGroupUser();
        groupUser.setUserId(userId);
        groupUser.setGroupId(GlobalConstant.Sys.SUPER_MANAGER_GROUP_ID);
        uacGroupUserMapper.insertSelective(groupUser);
        // 刪除 activeUserToken
        redisService.deleteKey(activeUserKey);
    }
  1. 本地事務執行用戶信息更新和 redis 郵箱激活token刪除,切面編程發送激活成功郵箱分析過程和上面發送激活郵箱流程是同樣的,這裏再也不贅述。
  2. 這兩個過程根據發送消息的 tag 不一樣,從而處理邏輯不一樣。Topic 都是 SEND_EMAIL_TOPIC
此處具體爲郵箱內容模板不一樣,其他消息生產端和消費端流程同樣。
public enum MqTagEnum {

        /**
         * 激活用戶.
         */
        ACTIVE_USER("ACTIVE_USER", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶"),
        /**
         * 激活用戶成功.
         */
        ACTIVE_USER_SUCCESS("ACTIVE_USER_SUCCESS", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶成功"),
        
        // ...省略其餘tag

        String tag;
        String topic;
        String tagName;

        MqTagEnum(String tag, String topic, String tagName) {
            this.tag = tag;
            this.topic = topic;
            this.tagName = tagName;
        }
        
        public String getTag() {
            return tag;
        }
        public String getTopic() {
            return topic;
        }
    }
相關文章
相關標籤/搜索