paascloud-master
:https://github.com/paascloud/... 基於可靠消息的最終一致性
:https://github.com/paascloud/... 本篇文章目的是理解該項目 可靠消息服務中心(TCP)發送消息、消費消息的流程,用戶註冊發送激活郵箱和激活後發送註冊成功郵箱都是利用可靠消息服務來解決分佈式事務,理解了該流程也就弄懂了該項目中其餘業務流程。
UAC
TPC
OPC
用戶註冊後,向註冊郵箱發送一封激活郵箱。
大體流程爲:java
UAC
先持久化 預發送消息
(等待確認消息),表 pc_mq_message_data
;TPC
持久化預發送消息
,可靠消息表pc_tpc_mq_message
;保存用戶信息
;TPC
更新第2步中的等待確認
狀態爲發送中sending
;Topic
類型的消息被哪些消費者訂閱監聽的全部消費待確認列表,狀態爲未確認
,表pc_tpc_mq_confirm
;RocketMQ
。@PostMapping(value = "/register") @ApiOperation(httpMethod = "POST", value = "註冊用戶") public Wrapper registerUser(UserRegisterDto user) { uacUserService.register(user); return WrapMapper.ok(); }
SpringSecurity
BCryptPasswordEncoder
強哈希方法加密,每次加密的結果都不同。
bcrypt
能夠有效抵禦彩虹表暴力破解,其原理就是
在加鹽的基礎上屢次 hash,關於密碼參考:
https://mp.weixin.qq.com/s/Dk...
key(active_token):email:過時時間1天
,即激活接口參數:activeUserToken
;activeUserTemplate.ftl
MqMessageData(pc_mq_message_data)
各個子系統消息落地的消息表,好比 用戶服務系統主要就是 郵件消息、短信消息等。
@Override public void register(UserRegisterDto registerDto) { // 校驗註冊信息 validateRegisterInfo(registerDto); String mobileNo = registerDto.getMobileNo(); String email = registerDto.getEmail(); Date row = new Date(); String salt = String.valueOf(generateId()); // 封裝註冊信息 long id = generateId(); // id 雪花算法生成 UacUser uacUser = new UacUser(); uacUser.setLoginName(registerDto.getLoginName()); uacUser.setSalt(salt); uacUser.setLoginPwd(Md5Util.encrypt(registerDto.getLoginPwd())); uacUser.setMobileNo(mobileNo); uacUser.setStatus(UacUserStatusEnum.DISABLE.getKey()); uacUser.setUserSource(UacUserSourceEnum.REGISTER.getKey()); uacUser.setCreatedTime(row); uacUser.setUpdateTime(row); uacUser.setEmail(email); uacUser.setId(id); uacUser.setCreatorId(id); uacUser.setCreator(registerDto.getLoginName()); uacUser.setLastOperatorId(id); uacUser.setUserName(registerDto.getLoginName()); uacUser.setLastOperator(registerDto.getLoginName()); // 發送激活郵件 String activeToken = PubUtils.uuid() + super.generateId(); redisService.setKey(RedisKeyUtil.getActiveUserKey(activeToken), email, 1, TimeUnit.DAYS); Map<String, Object> param = Maps.newHashMap(); param.put("loginName", registerDto.getLoginName()); param.put("email", registerDto.getEmail()); param.put("activeUserUrl", activeUserUrl + activeToken); param.put("dateTime", DateUtil.formatDateTime(new Date())); Set<String> to = Sets.newHashSet(); to.add(registerDto.getEmail()); MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER, param); // 即下面的第6步 userManager.register(mqMessageData, uacUser); }
userManager.register()
經過註解 @MqProducerStore
發送消息服務。執行該方法前,先進入切面編程
@MqProducerStore public void register(final MqMessageData mqMessageData, final UacUser uacUser) { log.info("註冊用戶. mqMessageData={}, user={}", mqMessageData, uacUser); uacUserMapper.insertSelective(uacUser); }
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface MqProducerStore { // WAIT_CONFIRM:等待確認;SAVE_AND_SEND:直接發送; MqSendTypeEnum sendType() default MqSendTypeEnum.WAIT_CONFIRM; // ORDER(1):有序;DIS_ORDER(0):無序 MqOrderTypeEnum orderType() default MqOrderTypeEnum.ORDER; // Rocketmq 默認延時級別 // ZERO(0, 不延時);ONE(1, 1秒)....EIGHTEEN(18, 2小時) DelayLevelEnum delayLevel() default DelayLevelEnum.ZERO; }
切面中,由於郵件激活發送消息類型爲默認的:等待確認
。mysql
UAC
消息落地:保存待確認消息 MqMessageData 到 mysql
,表pc_mq_message_data
;// 切面 MqMessageData domain = null; for (Object object : args) { if (object instanceof MqMessageData) { domain = (MqMessageData) object; break; } } domain.setOrderType(orderType); domain.setProducerGroup(producerGroup); // 1. 等待確認 if (type == MqSendTypeEnum.WAIT_CONFIRM) { if (delayLevelEnum != DelayLevelEnum.ZERO) { domain.setDelayLevel(delayLevelEnum.delayLevel()); } // 1.1 發送待確認消息到可靠消息系統 // 本地服務消息落地,可靠消息服務中心也持久化預發送消息,可是不發送 mqMessageService.saveWaitConfirmMessage(domain); } result = joinPoint.proceed(); // 返回註解方法,執行業務
@Override @Transactional(rollbackFor = Exception.class) public void saveWaitConfirmMessage(final MqMessageData mqMessageData) { // 1. 持久化到本地mysql this.saveMqProducerMessage(mqMessageData); // 2. 發送預發送狀態的消息給消息中心 TpcMqMessageDto tpcMqMessageDto = mqMessageData.getTpcMqMessageDto(); // 3. 調用遠端可靠消息服務(tpc),持久化等待確認消息 tpcMqMessageFeignApi.saveMessageWaitingConfirm(tpcMqMessageDto); // 4. mqMessageData 此時爲調用遠端服務返回來的數據 log.info("<== saveWaitConfirmMessage - 存儲預發送消息成功. messageKey={}", mqMessageData.getMessageKey()); }
持久化TpcMqMessage
,即pc_tpc_mq_message
(可靠消息表)
@Override public void saveMessageWaitingConfirm(TpcMqMessageDto messageDto) { if (StringUtils.isEmpty(messageDto.getMessageTopic())) { throw new TpcBizException(ErrorCodeEnum.TPC10050001); } Date now = new Date(); TpcMqMessage message = new ModelMapper().map(messageDto, TpcMqMessage.class); // 消息狀態:WAIT_SEND(10, "未發送");SENDING(20, "已發送");FINISH(30, "已完成"); message.setMessageStatus(MqSendStatusEnum.WAIT_SEND.sendStatus()); message.setUpdateTime(now); message.setCreatedTime(now); tpcMqMessageMapper.insertSelective(message); }
@MqProducerStore
所在方法,執行本地事務:保存用戶到 mysql。result = joinPoint.proceed(); // 返回註解方法,執行業務
result = joinPoint.proceed(); // 返回註解方法,執行業務 // 2. 直接發送 if (type == MqSendTypeEnum.SAVE_AND_SEND) { mqMessageService.saveAndSendMessage(domain); // 3. XXX } else if (type == MqSendTypeEnum.DIRECT_SEND) { mqMessageService.directSendMessage(domain); } else { // type = WAIT_CONFIRM final MqMessageData finalDomain = domain; taskExecutor.execute(() -> mqMessageService.confirmAndSendMessage(finalDomain.getMessageKey())); } return result;
messageKey
確認併發送以前已經持久化的預發送消息。// TpcMqMessageFeignClient.java @Override @ApiOperation(httpMethod = "POST", value = "確認併發送消息") public Wrapper confirmAndSendMessage(@RequestParam("messageKey") String messageKey) { logger.info("確認併發送消息. messageKey={}", messageKey); tpcMqMessageService.confirmAndSendMessage(messageKey); return WrapMapper.ok(); } // TpcMqMessageServiceImpl.java @Override public void confirmAndSendMessage(String messageKey) { final TpcMqMessage message = tpcMqMessageMapper.getByMessageKey(messageKey); if (message == null) { throw new TpcBizException(ErrorCodeEnum.TPC10050002); } TpcMqMessage update = new TpcMqMessage(); update.setMessageStatus(MqSendStatusEnum.SENDING.sendStatus()); update.setId(message.getId()); update.setUpdateTime(new Date()); // 1. 更新消息狀態爲:SENDING tpcMqMessageMapper.updateByPrimaryKeySelective(update); // 2. 建立消費待確認列表(此處topic:SEND_EMAIL_TOPIC) this.createMqConfirmListByTopic(message.getMessageTopic(), message.getId(), message.getMessageKey()); // 3. 直接發送消息 this.directSendMessage(message.getMessageBody(), message.getMessageTopic(), message.getMessageTag(), message.getMessageKey(), message.getProducerGroup(), message.getDelayLevel()); }
TCP
服務中,建立消費待確認列表,根據表 pc_tpc_mq_subscribe
,查詢出不一樣 topic
下相對應的全部 consumer_code
(消費監聽者),即設置該消息被哪些服務(CID)監聽消費;
- ``SEND_EMAIL_TOPIC`` --> ``CID_OPC``:該消息會被 `consumerGroup` 爲 `CID_OPC` 的服務監聽並消費。 - 同時,保存**確認消息**:``TpcMqConfirm`` --> 表 ``pc_tpc_mq_confirm``
@Override public void createMqConfirmListByTopic(final String topic, final Long messageId, final String messageKey) { List<TpcMqConfirm> list = Lists.newArrayList(); TpcMqConfirm tpcMqConfirm; List<String> consumerGroupList = tpcMqConsumerService.listConsumerGroupByTopic(topic); if (PublicUtil.isEmpty(consumerGroupList)) { throw new TpcBizException(ErrorCodeEnum.TPC100500010, topic); } for (final String cid : consumerGroupList) { tpcMqConfirm = new TpcMqConfirm(UniqueIdGenerator.generateId(), messageId, messageKey, cid); list.add(tpcMqConfirm); } tpcMqConfirmMapper.batchCreateMqConfirm(list); }
RocketMQ
隊列中。@Override public void directSendMessage(String body, String topic, String tag, String key, String pid, Integer delayLevel) { RocketMqProducer.sendSimpleMessage(body, topic, tag, key, pid, delayLevel); } // 核心方法:重試發送消息(重試次數3次) // pid:producerGroup --> 發送郵件服務是 PID_UAC // cid: consumerGroup --> 監聽郵件消息服務是 CID_OPC private static SendResult retrySendMessage(String pid, Message msg) { int iniCount = 1; SendResult result; while (true) { try { // Message中屬性 result = MqProducerBeanFactory.getBean(pid).send(msg); break; } catch (Exception e) { log.error("發送消息失敗:", e); if (iniCount++ >= PRODUCER_RETRY_TIMES) { throw new TpcBizException(ErrorCodeEnum.TPC100500014, msg.getTopic(), msg.getKeys()); } } } log.info("<== 發送MQ SendResult={}", result); return result; }
大體流程:git
AliyunMqConfiguration.java
啓動DefaultMQPushConsumer
RocketMQ 消費端,並設置消息邏輯處理監聽器OptPushMessageListener
;OPC
持久化消費者確認消息,表 pc_mq_message_data
;TPC
,更新以前生產端持久化的消費確認列表狀態,未確認
--> 已確認
,表pc_tpc_mq_confirm
;TPC
,繼續更新第3步表中消費確認消息的狀態爲已消費
;DefaultMQPushConsumer
根據配置信息啓動;subscribe
訂閱 該服務 OPC
全部的 Topic
和 tags
消息。包括短信、郵箱激活、附件更新刪除等全部消息。
@Slf4j @Configuration public class AliyunMqConfiguration { @Resource private PaascloudProperties paascloudProperties; @Resource private OptPushMessageListener optPushConsumer; @Resource private TaskExecutor taskExecutor; /** * Default mq push consumer default mq push consumer. * * @return the default mq push consumer * * @throws MQClientException the mq client exception */ @Bean public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException { // 1. 新建消費者組 // RocketMQ實際上都是拉模式,這裏的DefaultMQPushConsumer實現了推模式, // 也只是對拉消息服務作了一層封裝,即拉到消息的時候觸發業務消費者註冊到這裏的callback DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(paascloudProperties.getAliyun().getRocketMq().getConsumerGroup()); // 2. 指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr(paascloudProperties.getAliyun().getRocketMq().getNamesrvAddr()); // 3. 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費 // 若是非第一次啓動,那麼按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); String[] strArray = AliyunMqTopicConstants.ConsumerTopics.OPT.split(GlobalConstant.Symbol.COMMA); for (String aStrArray : strArray) { String[] topicArray = aStrArray.split(GlobalConstant.Symbol.AT); String topic = topicArray[0]; String tags = topicArray[1]; if (PublicUtil.isEmpty(tags)) { tags = "*"; } // 4. 進行Topic訂閱,訂閱PushTopic下Tag爲push的消息 consumer.subscribe(topic, tags); log.info("RocketMq OpcPushConsumer topic = {}, tags={}", topic, tags); } // 5. 設置消息處理器 consumer.registerMessageListener(optPushConsumer); consumer.setConsumeThreadMax(2); consumer.setConsumeThreadMin(2); taskExecutor.execute(() -> { try { Thread.sleep(5000); consumer.start(); log.info("RocketMq OpcPushConsumer OK."); } catch (InterruptedException | MQClientException e) { log.error("RocketMq OpcPushConsumer, 出現異常={}", e.getMessage(), e); } }); return consumer; } }
consumeMessage()
上有註解 @MqConsumerStore
,執行前先進入切面編程;@Slf4j @Component public class OptPushMessageListener implements MessageListenerConcurrently { @Resource private OptSendSmsTopicConsumer optSendSmsTopicService; @Resource private OptSendEmailTopicConsumer optSendEmailTopicService; @Resource private MdcTopicConsumer mdcTopicConsumer; @Resource private MqMessageService mqMessageService; @Resource private StringRedisTemplate srt; /** * Consume message consume concurrently status. * * @param messageExtList the message ext list * @param consumeConcurrentlyContext the consume concurrently context * * @return the consume concurrently status */ @Override @MqConsumerStore public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt msg = messageExtList.get(0); String body = new String(msg.getBody()); String topicName = msg.getTopic(); String tags = msg.getTags(); String keys = msg.getKeys(); log.info("MQ消費Topic={},tag={},key={}", topicName, tags, keys); ValueOperations<String, String> ops = srt.opsForValue(); // 控制冪等性使用的key try { MqMessage.checkMessage(body, topicName, tags, keys); String mqKV = null; if (srt.hasKey(keys)) { mqKV = ops.get(keys); } if (PublicUtil.isNotEmpty(mqKV)) { log.error("MQ消費Topic={},tag={},key={}, 重複消費", topicName, tags, keys); // 消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if (AliyunMqTopicConstants.MqTopicEnum.SEND_SMS_TOPIC.getTopic().equals(topicName)) { optSendSmsTopicService.handlerSendSmsTopic(body, topicName, tags, keys); } if (AliyunMqTopicConstants.MqTopicEnum.SEND_EMAIL_TOPIC.getTopic().equals(topicName)) { optSendEmailTopicService.handlerSendEmailTopic(body, topicName, tags, keys); } if (AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic().equals(topicName)) { mqMessageService.deleteMessageTopic(body, tags); } if (AliyunMqTopicConstants.MqTopicEnum.MDC_TOPIC.getTopic().equals(topicName)) { mdcTopicConsumer.handlerSendSmsTopic(body, topicName, tags, keys); } else { log.info("OPC訂單信息消 topicName={} 不存在", topicName); } } catch (IllegalArgumentException ex) { log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex); } catch (Exception e) { log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e); // 若是消息消費失敗,例如數據庫異常等,扣款失敗,發送失敗須要重試的場景, // 返回下面代碼,RocketMQ就認爲消費失敗。 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } ops.set(keys, keys, 10, TimeUnit.DAYS); // 業務實現消費回調的時候,當且僅當返回下面代碼時,RocketMQ纔會認爲這批消息是消費完成的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
@Around(value = "mqConsumerStoreAnnotationPointcut()") public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable { // ... MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0)); final String messageKey = dto.getMessageKey(); if (isStorePreStatus) { // 執行下面三、4步 mqMessageService.confirmReceiveMessage(consumerGroup, dto); } String methodName = joinPoint.getSignature().getName(); try { // 返回註解方法; result = joinPoint.proceed(); log.info("result={}", result); if (CONSUME_SUCCESS.equals(result.toString())) { mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey); } } catch (Exception e) { log.error("發送可靠消息, 目標方法[{}], 出現異常={}", methodName, e.getMessage(), e); throw e; } finally { log.info("發送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime); } return result; }
confirmReceiveMessage
:消費者確認收到消息;在上面目錄【發送激活郵箱的消息/service層】的第 12 步持久化 消費者確認消息MqMessageData
到本地服務OPC
mysql 中,表pc_mq_message_data
;
@Override @Transactional(rollbackFor = Exception.class) public void confirmReceiveMessage(String cid, MqMessageData messageData) { final String messageKey = messageData.getMessageKey(); log.info("confirmReceiveMessage - 消費者={}, 確認收到key={}的消息", cid, messageKey); // 持久化消費者確認消息 MqMessageData 到本地服務 mysql 中,表 pc_mq_message_data messageData.setMessageType(MqMessageTypeEnum.CONSUMER_MESSAGE.messageType()); messageData.setId(UniqueIdGenerator.generateId()); mqMessageDataMapper.insertSelective(messageData); // 調用遠端服務 TPC,更新確認收到消息表狀態爲已確認,TpcMqConfirm,表 pc_tpc_mq_confirm; Wrapper wrapper = tpcMqMessageFeignApi.confirmReceiveMessage(cid, messageKey); log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper); if (wrapper == null) { throw new TpcBizException(ErrorCodeEnum.GL99990002); } if (wrapper.error()) { throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey); } }
緊接着第3步,調用遠端服務 TPC
,更新確認收到消息狀態爲已確認,TpcMqConfirm
,表 pc_tpc_mq_confirm
;github
@Override public void confirmReceiveMessage(final String cid, final String messageKey) { // 1. 校驗cid // 2. 校驗messageKey // 3. 校驗cid 和 messageKey Long confirmId = tpcMqConfirmMapper.getIdMqConfirm(cid, messageKey); // 3. 更新消費信息的狀態 tpcMqConfirmMapper.confirmReceiveMessage(confirmId); }
result = joinPoint.proceed();
MessageExt
獲取該消息的 topic(主題)
、tag(標籤)
、keys(惟一鍵)
、body(消息體)
;redis
中存儲消費過的該消息的 keys
;topic
執行相應的操做處理該消息
好比此流程的發送激活郵箱,使用 spring 框架的
TaskExecutor
執行郵箱發送任務。
@Override public int sendSimpleMail(String subject, String text, Set<String> to) { log.info("sendSimpleMail - 發送簡單郵件. subject={}, text={}, to={}", subject, text, to); int result = 1; try { SimpleMailMessage message = MailEntity.createSimpleMailMessage(subject, text, to); message.setFrom(from); taskExecutor.execute(() -> mailSender.send(message)); } catch (Exception e) { log.info("sendSimpleMail [FAIL] ex={}", e.getMessage(), e); result = 0; } return result; }
第8步
若是消息消費成功,郵件發送成功,redis
中存儲該消息(冪等,過時時間 10 天),同時返回消費成功代碼;ops.set(keys, keys, 10, TimeUnit.DAYS); // 業務實現消費回調的時候,當且僅當返回下面代碼時,RocketMQ 纔會認爲這批消息是消費完成的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
第8步
若是消息消費失敗,好比數據庫異常,扣款失敗,郵件發送失敗等須要重試的場景,返回重試消費代碼。} catch (IllegalArgumentException ex) { log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex); } catch (Exception e) { log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e); // 若是消息消費失敗,例如數據庫異常等,扣款失敗,發送失敗須要重試的場景, // 返回下面代碼,RocketMQ就認爲消費失敗。 return ConsumeConcurrentlyStatus.RECONSUME_LATER; }
String methodName = joinPoint.getSignature().getName(); try { result = joinPoint.proceed(); log.info("result={}", result); if (CONSUME_SUCCESS.equals(result.toString())) { mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey); } } catch (Exception e) { log.error("發送可靠消息, 目標方法[{}], 出現異常={}", methodName, e.getMessage(), e); throw e; } finally { log.info("發送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime); } return result;
第11步中,若是返回 CONSUME_SUCCESS
,保存並確認消息完成;redis
TCP
,更新消費確認消息列表 pc_tpc_mq_confirm
,狀態爲已消費
;@Override public void saveAndConfirmFinishMessage(String cid, String messageKey) { // 1. 調用遠端服務tcp,確認完成消費消息 Wrapper wrapper = tpcMqMessageFeignApi.confirmConsumedMessage(cid, messageKey); log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper); if (wrapper == null) { throw new TpcBizException(ErrorCodeEnum.GL99990002); } if (wrapper.error()) { throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey); } }
發送激活成功郵箱同上面 發送激活郵箱同樣利用 可靠消息服務完成分佈式事務操做。
@GetMapping(value = "/activeUser/{activeUserToken}") @ApiOperation(httpMethod = "POST", value = "激活用戶") public Wrapper activeUser(@PathVariable String activeUserToken) { uacUserService.activeUser(activeUserToken); return WrapMapper.ok("激活成功"); }
activeuser()
:@Override public void activeUser(String activeUserToken) { Preconditions.checkArgument(!StringUtils.isEmpty(activeUserToken), "激活用戶失敗"); String activeUserKey = RedisKeyUtil.getActiveUserKey(activeUserToken); String email = redisService.getKey(activeUserKey); if (StringUtils.isEmpty(email)) { throw new UacBizException(ErrorCodeEnum.UAC10011030); } // 修改用戶狀態, 綁定訪客角色 UacUser uacUser = new UacUser(); uacUser.setEmail(email); uacUser = uacUserMapper.selectOne(uacUser); if (uacUser == null) { logger.error("找不到用戶信息. email={}", email); throw new UacBizException(ErrorCodeEnum.UAC10011004, email); } UacUser update = new UacUser(); update.setId(uacUser.getId()); update.setStatus(UacUserStatusEnum.ENABLE.getKey()); LoginAuthDto loginAuthDto = new LoginAuthDto(); loginAuthDto.setUserId(uacUser.getId()); loginAuthDto.setUserName(uacUser.getLoginName()); loginAuthDto.setLoginName(uacUser.getLoginName()); update.setUpdateInfo(loginAuthDto); UacUser user = this.queryByUserId(uacUser.getId()); Map<String, Object> param = Maps.newHashMap(); param.put("loginName", user.getLoginName()); param.put("dateTime", DateUtil.formatDateTime(new Date())); Set<String> to = Sets.newHashSet(); to.add(user.getEmail()); // 構建激活成功消息體 MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER_SUCCESS, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER_SUCCESS, param); // 1. 可靠消息服務發送郵件 userManager.activeUser(mqMessageData, update, activeUserKey); }
userManager.activeUser()
:能夠看到該方法也是註解@MqProducerStore
修飾;@MqProducerStore public void activeUser(final MqMessageData mqMessageData, final UacUser uacUser, final String activeUserKey) { log.info("激活用戶. mqMessageData={}, user={}", mqMessageData, uacUser); // 更新用戶信息 int result = uacUserMapper.updateByPrimaryKeySelective(uacUser); if (result < 1) { throw new UacBizException(ErrorCodeEnum.UAC10011038, uacUser.getId()); } // 綁定一個訪客角色默認值roleId=10000 final Long userId = uacUser.getId(); Preconditions.checkArgument(userId != null, "用戶Id不能爲空"); final Long roleId = 10000L; UacRoleUser roleUser = new UacRoleUser(); roleUser.setUserId(userId); roleUser.setRoleId(roleId); uacRoleUserMapper.insertSelective(roleUser); // 綁定一個組織 UacGroupUser groupUser = new UacGroupUser(); groupUser.setUserId(userId); groupUser.setGroupId(GlobalConstant.Sys.SUPER_MANAGER_GROUP_ID); uacGroupUserMapper.insertSelective(groupUser); // 刪除 activeUserToken redisService.deleteKey(activeUserKey); }
redis
郵箱激活token刪除,切面編程發送激活成功郵箱分析過程和上面發送激活郵箱流程是同樣的,這裏再也不贅述。tag
不一樣,從而處理邏輯不一樣。Topic
都是 SEND_EMAIL_TOPIC
;此處具體爲郵箱內容模板不一樣,其他消息生產端和消費端流程同樣。
public enum MqTagEnum { /** * 激活用戶. */ ACTIVE_USER("ACTIVE_USER", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶"), /** * 激活用戶成功. */ ACTIVE_USER_SUCCESS("ACTIVE_USER_SUCCESS", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶成功"), // ...省略其餘tag String tag; String topic; String tagName; MqTagEnum(String tag, String topic, String tagName) { this.tag = tag; this.topic = topic; this.tagName = tagName; } public String getTag() { return tag; } public String getTopic() { return topic; } }