本服務須要提供一個sdk和數據庫初始語句建立數據庫表,而且對外提供可掃描的domain、mapper、service,使用的技術框架zk + mapper3 + pagehelper + feign(edas) , 使用者(上游系統、下游系統) 只須要在對應的接口上寫上響應註解便可實現可靠消息, 若是不熟悉上述框架,可選擇對應框架替換,好比redis替換zk,放棄mapper3和pagehelper使用傳統的mybatis,使用http接口替換fein(eads)的解決辦法,本文不提供替換的解決方案前端
https://segmentfault.com/a/1190000011479826
複製代碼
本文爲分佈式系統解決方案,此方案涉及 3 個模塊:vue
暫時未設計java
上游應用將本地業務執行和消息發送綁定在同一個本地事務中,保證要麼本地操做成功併發送 MQ 消息,要麼兩步操做都失敗並回滾。這裏能夠採用自定義切面完成,後續會有介紹。git
以上每一步均可能出現失敗狀況,分析一下這 5 步出現異常後上遊業務和消息發送是否一致:github
失敗步驟 | 現象 | 一致性 |
---|---|---|
第1步 | 上游應用業務未執行,MQ消息未發送 | 一致 |
第2步 | 上游應用業務未執行,MQ消息未發送 | 一致 |
第3步 | 上游應用事物回滾,MQ消息未發送 | 一致 |
第4步 | 上游應用業務執行,MQ消息未發送 | 不一致 |
第5步 | 上游應用業務執行,MQ消息未發送 | 不一致 |
上游應用執行完成,下游應用還沒有執行或執行失敗時,此事務即處於 BASE 理論的 Soft State 狀態。web
下游應用監聽 MQ 消息並執行業務,而且將消息的消費結果通知可靠消息服務。(本地消息落地)redis
可靠消息的狀態須要和下游應用的業務執行保持一致,可靠消息狀態不是已完成時,確保下游應用未執行,可靠消息狀態是已完成時,確保下游應用已執行。 下游應用和可靠消息服務之間的交互圖以下: spring
下游應用監聽 MQ 消息組件並獲取消息, 並存儲本地消息數據庫
下游系統通知可靠消息服務已接收到消息segmentfault
可靠消息把消息更新爲已接收狀態
下游應用根據 MQ 消息體信息處理本地業務
下游應用向 MQ 組件自動發送 ACK 確認消息被消費
下游應用通知可靠消息系統消息被成功消費,可靠消息將該消息狀態更改成以消費,任務表狀態修改成已完成。
失敗步驟 | 現象 | 一致性 |
---|---|---|
第1步 | 下游應用業務未接收MQ消息,MQ消息爲已發送未接收 | 不一致 |
第2步 | 通知可靠消息服務,接收到消息 | 不一致 |
第3步 | 下游應用異步通知 | 不一致 |
第4步 | 下游應用數據回滾,本地消息存儲成功,消息狀態爲已接收未成功消費 | 一致 |
第5步 | MQ未收到ack確認 | 一致 |
第6步 | 下游應用異步通知 | 不一致 |
可靠消息服務定時監聽消息的狀態,若是存在狀態爲待確認而且超時的消息,則表示上游應用和可靠消息交互中的步驟 4 或者 5 出現異常。
可靠消息則攜帶消息體內的信息向上遊應用發起請求查詢該業務是否已執行。上游應用提供一個可查詢接口供可靠消息追溯業務執行狀態,若是業務執行成功則更改消息狀態爲已發送,不然刪除此消息確保數據一致。具體流程以下:
下游消費MQ服務異步通知可靠消息的過程當中可能出現異常,在此可能致使兩個現象1、消息已接到但可靠消息沒有確認接到2、消息已成功消費但可靠消息沒有確認接到,爲此下游系統須要提供消費者消息狀態查詢接口,從而可靠消息從新確認.在確認過程當中若是是可靠消息爲已消費而下游消費系統爲已接收則不進行更新操做. 具體流程以下:
消息已發送則表示上游應用已經執行,接下來則確保下游應用也能正常執行。 可靠消息服務發現可靠消息服務中存在消息狀態爲已發送而且超時的消息,則表示可靠消息服務和下游應用中存在異常的步驟,不管哪一個步驟出現異常,可靠消息服務都將此消息從新投遞到 MQ 組件中供下游應用監聽。 下游應用監聽到此消息後,在保證冪等性的狀況下從新執行業務並通知可靠消息服務此消息已經成功消費,最終確保上游應用、下游應用的數據最終一致性。具體流程以下:
在預發送執行MQ消息的時候本地消息若是落庫則須要刪除消息,不然業務系統須要額外提供查詢消息發送狀態接口, 這裏介紹兩種方法
第一種,RPC服務接口來實現, 在生產者和消費者註冊到可靠消息的時候把生產者和消費者存儲到BeanFactory的Map裏在定時清理任務的時候去處理在線的RPC服務
第二種,發可靠消息來實現, 確保100%到達
在消費MQ消息的時候本地消息若是落庫則須要刪除消息,不然業務系統須要額外提供查詢消息發送狀態接口,刪除實現同3.6
天天將成功消息刪除並備份到對應數據庫提供歷史消息查詢功能,固然若是你選擇mongo能夠不考慮備份消息
這裏作一個說明,由於項目採用的是rocketmq,一個topic對應一個生產者,而可靠消息採用的是中間件負責發送消息,又不能採用中間件的生產者爲全部上游系統發送消息,這裏引入了zookeeper作註冊中心,因此依賴可靠消息的服務,在啓動項目的時候會像中間件去註冊生產者,而中間件的watch機制會及時的更新生產者和消費者狀態,而中間件會爲使用中間件的系統提供sdk,使用者無需關注實現,只須要引入中間件的sdk和對應的註解便可完成可靠消息的發送和消費,詳見下圖: 普通消息發送流程:
可靠消息發送流程: 可靠消息發送和消費流程:public static void startup(PaascloudProperties paascloudProperties, String host, String app) {
CoordinatorRegistryCenter coordinatorRegistryCenter = createCoordinatorRegistryCenter(paascloudProperties.getZk());
RegisterDto dto = new RegisterDto(app, host, coordinatorRegistryCenter);
Long serviceId = new IncrementIdGenerator(dto).nextId();
IncrementIdGenerator.setServiceId(serviceId);
registerMq(paascloudProperties, host, app);
}
private static void registerMq(PaascloudProperties paascloudProperties, String host, String app) {
CoordinatorRegistryCenter coordinatorRegistryCenter = createCoordinatorRegistryCenter(paascloudProperties.getZk());
AliyunProperties.RocketMqProperties rocketMq = paascloudProperties.getAliyun().getRocketMq();
String consumerGroup = rocketMq.isReliableMessageConsumer() ? rocketMq.getConsumerGroup() : null;
String namesrvAddr = rocketMq.getNamesrvAddr();
String producerGroup = rocketMq.isReliableMessageProducer() ? rocketMq.getProducerGroup() : null;
coordinatorRegistryCenter.registerMq(app, host, producerGroup, consumerGroup, namesrvAddr);
}
@Override
public void registerMq(final String app, final String host, final String producerGroup, final String consumerGroup, String namesrvAddr) {
// 註冊生產者
final String producerRootPath = GlobalConstant.ZK_REGISTRY_PRODUCER_ROOT_PATH + GlobalConstant.Symbol.SLASH + app;
final String consumerRootPath = GlobalConstant.ZK_REGISTRY_CONSUMER_ROOT_PATH + GlobalConstant.Symbol.SLASH + app;
ReliableMessageRegisterDto dto;
if (StringUtils.isNotEmpty(producerGroup)) {
dto = new ReliableMessageRegisterDto().setProducerGroup(producerGroup).setNamesrvAddr(namesrvAddr);
String producerJson = JSON.toJSONString(dto);
this.persist(producerRootPath, producerJson);
this.persistEphemeral(producerRootPath + GlobalConstant.Symbol.SLASH + host, DateUtil.now());
}
// 註冊消費者
if (StringUtils.isNotEmpty(consumerGroup)) {
dto = new ReliableMessageRegisterDto().setConsumerGroup(consumerGroup).setNamesrvAddr(namesrvAddr);
String producerJson = JSON.toJSONString(dto);
this.persist(consumerRootPath, producerJson);
this.persistEphemeral(consumerRootPath + GlobalConstant.Symbol.SLASH + host, DateUtil.now());
}
}
複製代碼
@Around(value = "mqProducerStoreAnnotationPointcut()")
public Object processMqProducerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("processMqProducerStoreJoinPoint - 線程id={}", Thread.currentThread().getId());
Object result;
Object[] args = joinPoint.getArgs();
MqProducerStore annotation = getAnnotation(joinPoint);
MqSendTypeEnum type = annotation.sendType();
int orderType = annotation.orderType().orderType();
DelayLevelEnum delayLevelEnum = annotation.delayLevel();
if (args.length == 0) {
throw new TpcBizException(ErrorCodeEnum.TPC10050005);
}
MqMessageData domain = null;
for (Object object : args) {
if (object instanceof MqMessageData) {
domain = (MqMessageData) object;
break;
}
}
if (domain == null) {
throw new TpcBizException(ErrorCodeEnum.TPC10050005);
}
domain.setOrderType(orderType);
domain.setProducerGroup(producerGroup);
if (type == MqSendTypeEnum.WAIT_CONFIRM) {
if (delayLevelEnum != DelayLevelEnum.ZERO) {
domain.setDelayLevel(delayLevelEnum.delayLevel());
}
mqMessageService.saveWaitConfirmMessage(domain);
}
result = joinPoint.proceed();
if (type == MqSendTypeEnum.SAVE_AND_SEND) {
mqMessageService.saveAndSendMessage(domain);
} else if (type == MqSendTypeEnum.DIRECT_SEND) {
mqMessageService.directSendMessage(domain);
} else {
mqMessageService.confirmAndSendMessage(domain.getMessageKey());
}
return result;
}
複製代碼
@Around(value = "mqConsumerStoreAnnotationPointcut()")
public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable {
log.info("processMqConsumerStoreJoinPoint - 線程id={}", Thread.currentThread().getId());
Object result;
long startTime = System.currentTimeMillis();
Object[] args = joinPoint.getArgs();
MqConsumerStore annotation = getAnnotation(joinPoint);
boolean isStorePreStatus = annotation.storePreStatus();
List<MessageExt> messageExtList;
if (args == null || args.length == 0) {
throw new TpcBizException(ErrorCodeEnum.TPC10050005);
}
if (!(args[0] instanceof List)) {
throw new TpcBizException(ErrorCodeEnum.GL99990001);
}
try {
messageExtList = (List<MessageExt>) args[0];
} catch (Exception e) {
log.error("processMqConsumerStoreJoinPoint={}", e.getMessage(), e);
throw new TpcBizException(ErrorCodeEnum.GL99990001);
}
MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0));
final String messageKey = dto.getMessageKey();
if (isStorePreStatus) {
mqMessageService.confirmReceiveMessage(consumerGroup, dto);
}
String methodName = joinPoint.getSignature().getName();
try {
result = joinPoint.proceed();
log.info("result={}", result);
if (CONSUME_SUCCESS.equals(result.toString())) {
mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey);
}
} catch (Exception e) {
log.error("發送可靠消息, 目標方法[{}], 出現異常={}", methodName, e.getMessage(), e);
throw e;
} finally {
log.info("發送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime);
}
return result;
}
複製代碼
@Slf4j
@ElasticJobConfig(cron = "0 0 0 1/1 * ?")
public class DeleteRpcConsumerMessageJob implements SimpleJob {
@Resource
private PaascloudProperties paascloudProperties;
@Resource
private TpcMqMessageService tpcMqMessageService;
/** * Execute. * * @param shardingContext the sharding context */
@Override
public void execute(final ShardingContext shardingContext) {
ShardingContextDto shardingContextDto = new ShardingContextDto(shardingContext.getShardingTotalCount(), shardingContext.getShardingItem());
final TpcMqMessageDto message = new TpcMqMessageDto();
message.setMessageBody(JSON.toJSONString(shardingContextDto));
message.setMessageTag(AliyunMqTopicConstants.MqTagEnum.DELETE_CONSUMER_MESSAGE.getTag());
message.setMessageTopic(AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic());
message.setProducerGroup(paascloudProperties.getAliyun().getRocketMq().getProducerGroup());
String refNo = Long.toString(UniqueIdGenerator.generateId());
message.setRefNo(refNo);
message.setMessageKey(refNo);
tpcMqMessageService.saveAndSendMessage(message);
}
}
複製代碼
@Slf4j
@ElasticJobConfig(cron = "0 0 0 1/1 * ?")
public class DeleteRpcExpireFileJob implements SimpleJob {
@Resource
private OpcRpcService opcRpcService;
/** * Execute. * * @param shardingContext the sharding context */
@Override
public void execute(final ShardingContext shardingContext) {
opcRpcService.deleteExpireFile();
}
}
複製代碼
@Slf4j
@ElasticJobConfig(cron = "0 0 1 1/1 * ?")
public class DeleteRpcProducerMessageJob implements SimpleJob {
@Resource
private PaascloudProperties paascloudProperties;
@Resource
private TpcMqMessageService tpcMqMessageService;
/** * Execute. * * @param shardingContext the sharding context */
@Override
public void execute(final ShardingContext shardingContext) {
final TpcMqMessageDto message = new TpcMqMessageDto();
message.setMessageBody(JSON.toJSONString(shardingContext));
message.setMessageTag(AliyunMqTopicConstants.MqTagEnum.DELETE_PRODUCER_MESSAGE.getTag());
message.setMessageTopic(AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic());
message.setProducerGroup(paascloudProperties.getAliyun().getRocketMq().getProducerGroup());
String refNo = Long.toString(UniqueIdGenerator.generateId());
message.setRefNo(refNo);
message.setMessageKey(refNo);
tpcMqMessageService.saveAndSendMessage(message);
}
}
複製代碼
@Component
@Slf4j
@ElasticJobConfig(cron = "0/30 * * * * ?", jobParameter = "fetchNum=200")
public class HandleSendingMessageJob extends AbstractBaseDataflowJob<TpcMqMessage> {
@Resource
private TpcMqMessageService tpcMqMessageService;
@Value("${paascloud.message.handleTimeout}")
private int timeOutMinute;
@Value("${paascloud.message.maxSendTimes}")
private int messageMaxSendTimes;
@Value("${paascloud.message.resendMultiplier}")
private int messageResendMultiplier;
@Resource
private TpcMqConfirmMapper tpcMqConfirmMapper;
/** * Fetch job data list. * * @param jobParameter the job parameter * * @return the list */
@Override
protected List<TpcMqMessage> fetchJobData(JobParameter jobParameter) {
MessageTaskQueryDto query = new MessageTaskQueryDto();
query.setCreateTimeBefore(DateUtil.getBeforeTime(timeOutMinute));
query.setMessageStatus(MqSendStatusEnum.SENDING.sendStatus());
query.setFetchNum(jobParameter.getFetchNum());
query.setShardingItem(jobParameter.getShardingItem());
query.setShardingTotalCount(jobParameter.getShardingTotalCount());
query.setTaskStatus(JobTaskStatusEnum.TASK_CREATE.status());
return tpcMqMessageService.listMessageForWaitingProcess(query);
}
/** * Process job data. * * @param taskList the task list */
@Override
@Transactional(rollbackFor = Exception.class)
protected void processJobData(List<TpcMqMessage> taskList) {
for (TpcMqMessage message : taskList) {
Integer resendTimes = message.getResendTimes();
if (resendTimes >= messageMaxSendTimes) {
tpcMqMessageService.setMessageToAlreadyDead(message.getId());
continue;
}
int times = (resendTimes == 0 ? 1 : resendTimes) * messageResendMultiplier;
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
long needTime = currentTimeInMillis - times * 60 * 1000;
long hasTime = message.getUpdateTime().getTime();
// 判斷是否達到了能夠再次發送的時間條件
if (hasTime > needTime) {
log.debug("currentTime[" + com.xiaoleilu.hutool.date.DateUtil.formatDateTime(new Date()) + "],[SENDING]消息上次發送時間[" + com.xiaoleilu.hutool.date.DateUtil.formatDateTime(message.getUpdateTime()) + "],必須過了[" + times + "]分鐘才能夠再發送。");
continue;
}
// 前置狀態
List<Integer> preStatusList = Lists.newArrayList(JobTaskStatusEnum.TASK_CREATE.status());
// 設置任務狀態爲執行中
message.setPreStatusList(preStatusList);
message.setTaskStatus(JobTaskStatusEnum.TASK_EXETING.status());
int updateRes = tpcMqMessageService.updateMqMessageTaskStatus(message);
if (updateRes > 0) {
try {
// 查詢是否所有訂閱者都確認了消息 是 則更新消息狀態完成, 不然重發消息
int count = tpcMqConfirmMapper.selectUnConsumedCount(message.getMessageKey());
int status = JobTaskStatusEnum.TASK_CREATE.status();
if (count < 1) {
TpcMqMessage update = new TpcMqMessage();
update.setMessageStatus(MqSendStatusEnum.FINISH.sendStatus());
update.setId(message.getId());
tpcMqMessageService.updateMqMessageStatus(update);
status = JobTaskStatusEnum.TASK_SUCCESS.status();
} else {
tpcMqMessageService.resendMessageByMessageId(message.getId());
}
// 前置狀態
preStatusList = Lists.newArrayList(JobTaskStatusEnum.TASK_EXETING.status());
// 設置任務狀態爲執行中
message.setPreStatusList(preStatusList);
message.setTaskStatus(status);
tpcMqMessageService.updateMqMessageTaskStatus(message);
} catch (Exception e) {
log.error("重發失敗 ex={}", e.getMessage(), e);
// 設置任務狀態爲執行中
preStatusList = Lists.newArrayList(JobTaskStatusEnum.TASK_EXETING.status());
message.setPreStatusList(preStatusList);
message.setTaskStatus(JobTaskStatusEnum.TASK_SUCCESS.status());
tpcMqMessageService.updateMqMessageTaskStatus(message);
}
}
}
}
}
複製代碼
@Slf4j
@Component
@ElasticJobConfig(cron = "0 0/10 * * * ?", jobParameter = "fetchNum=1000")
public class HandleWaitingConfirmMessageJob extends AbstractBaseDataflowJob<String> {
@Resource
private TpcMqMessageService tpcMqMessageService;
@Resource
private UacRpcService uacRpcService;
@Value("${paascloud.message.handleTimeout}")
private int timeOutMinute;
private static final String PID_UAC = "PID_UAC";
/** * Fetch job data list. * * @param jobParameter the job parameter * * @return the list */
@Override
protected List<String> fetchJobData(JobParameter jobParameter) {
MessageTaskQueryDto query = new MessageTaskQueryDto();
query.setCreateTimeBefore(DateUtil.getBeforeTime(timeOutMinute));
query.setMessageStatus(MqSendStatusEnum.WAIT_SEND.sendStatus());
query.setFetchNum(jobParameter.getFetchNum());
query.setShardingItem(jobParameter.getShardingItem());
query.setShardingTotalCount(jobParameter.getShardingTotalCount());
query.setTaskStatus(JobTaskStatusEnum.TASK_CREATE.status());
query.setProducerGroup(PID_UAC);
return tpcMqMessageService.queryWaitingConfirmMessageKeyList(query);
}
/** * */
@Override
protected void processJobData(List<String> messageKeyList) {
if (messageKeyList == null) {
return;
}
List<String> resendMessageList = uacRpcService.queryWaitingConfirmMessageKeyList(messageKeyList);
if (resendMessageList == null) {
resendMessageList = Lists.newArrayList();
}
messageKeyList.removeAll(resendMessageList);
tpcMqMessageService.handleWaitingConfirmMessage(messageKeyList, resendMessageList);
}
}
複製代碼
例子
@MqProducerStore
public void resetLoginPwd(final MqMessageData mqMessageData, final UacUser update) {
log.info("重置密碼. mqMessageData={}, user={}", mqMessageData, update);
int updateResult = uacUserMapper.updateByPrimaryKeySelective(update);
if (updateResult < 1) {
log.error("用戶【 {} 】重置密碼失敗", update.getLoginName());
} else {
log.info("用戶【 {} 】重置密碼失敗", update.getLoginName());
}
}
複製代碼
強制: 須要使用的使用加上述兩個註解,方法參數須要加入 MqMessageData
若是對本文感興趣,或者本文對您有所幫助,可靠參考github代碼,本套代碼是spring cloud E版本 + vue兩套全家桶實現
後端項目:https://github.com/paascloud/paascloud-master
https://gitee.com/passcloud/paascloud-master
登陸入口:https://github.com/paascloud/paascloud-login-web
https://gitee.com/passcloud/paascloud-login-web
後端入口:https://github.com/paascloud/paascloud-admin-web
https://gitee.com/passcloud/paascloud-admin-web
前端入口:https://github.com/paascloud/paascloud-mall-web
https://gitee.com/passcloud/paascloud-mall-web
複製代碼
若是有時間最好能給點加個星或者follow一下,筆者在這裏先謝過了。對不知道怎麼加星的朋友,請用電腦登陸github或者碼雲,這裏兩個截圖
更多內容請參考paascloud 建站文檔
https://document.paascloud.net/
複製代碼