前面一章咱們走完了訂單下單流程發起者部分的源碼,此次咱們進入參與者部分源碼解析~java
前面order服務中已經發起了對account服務的調用,接下來進入account服務扣款接口的實現部分spring
//order服務調用端
@PostMapping("/account-service/account/payment")
@Myth(destination = "account", target = AccountService.class)
Boolean payment(@RequestBody AccountDTO accountDO);
//account服務接口實現 AccountServiceImpl.payment(AccountDTO accountDTO)
@Override
@Myth(destination = "account")
public boolean payment(AccountDTO accountDTO) {
LOGGER.info("============springcloud執行付款接口===============");
final AccountDO accountDO = accountMapper.findByUserId(accountDTO.getUserId());
if (accountDO.getBalance().compareTo(accountDTO.getAmount()) <= 0) {
throw new MythRuntimeException("spring cloud account-service 資金不足!");
}
accountDO.setBalance(accountDO.getBalance().subtract(accountDTO.getAmount()));
accountDO.setUpdateTime(new Date());
final int update = accountMapper.update(accountDO);
if (update != 1) {
throw new MythRuntimeException("spring cloud account-service 資金不足!");
}
return Boolean.TRUE;
}
複製代碼
咱們發如今實現類方法頭部也進行了@Myth 註解的標記,AccountServiceImpl 是一個實現類,所以這裏必然也會走aop切面,aop切面流程入口同order服務相同,區別在於order爲發起方,而account,inventory爲參與者,咱們是否還記得角色判斷代碼實現部分?MythTransactionFactoryServiceImpl.factoryOf 咱們再來回顧下代碼併發
public Class factoryOf(MythTransactionContext context) throws Throwable {
//若是事務還沒開啓或者 myth事務上下文是空, 那麼應該進入發起調用
if (!mythTransactionManager.isBegin() && Objects.isNull(context)) {
return StartMythTransactionHandler.class;
} else {
if (context.getRole() == MythRoleEnum.LOCAL.getCode()) {
return LocalMythTransactionHandler.class;
}
return ActorMythTransactionHandler.class;
}
}
複製代碼
判斷條件要想進入參與者角色分支,這裏事務必須開啓狀態 或者 myth事務上下文必須有值 ,這兩個條件又是在哪裏進行了設值呢? 咱們往回看看調用處,找到 SpringCloudMythTransactionInterceptor.interceptor(ProceedingJoinPoint pjp) 方法app
@Override
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
MythTransactionContext mythTransactionContext = TransactionContextLocal.getInstance().get();
if (Objects.nonNull(mythTransactionContext) &&
mythTransactionContext.getRole() == MythRoleEnum.LOCAL.getCode()) {
mythTransactionContext = TransactionContextLocal.getInstance().get();
} else {
RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
String context = request == null ? null : request.getHeader(CommonConstant.MYTH_TRANSACTION_CONTEXT);
if (StringUtils.isNoneBlank(context)) {
mythTransactionContext =
GsonUtils.getInstance().fromJson(context, MythTransactionContext.class);
}
}
return mythTransactionAspectService.invoke(mythTransactionContext, pjp);
}
複製代碼
由於第一次進來,顯然mythTransactionContext值爲空,進入else分支,這裏咱們發現是從request請求頭中獲取的事務上下文信息的。 既然是從請求頭信息中拿到數據, 那必然在調用端要先設置對不對, 咱們找到myth-springcloud工程下MythRestTemplateInterceptor類框架
//springcloud
@Configuration
public class MythRestTemplateInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
final MythTransactionContext mythTransactionContext =
TransactionContextLocal.getInstance().get();
requestTemplate.header(CommonConstant.MYTH_TRANSACTION_CONTEXT,
GsonUtils.getInstance().toJson(mythTransactionContext));
}
}
// motan
@Component
public class MotanMythTransactionInterceptor implements MythTransactionInterceptor {
private final MythTransactionAspectService mythTransactionAspectService;
@Autowired
public MotanMythTransactionInterceptor(MythTransactionAspectService mythTransactionAspectService) {
this.mythTransactionAspectService = mythTransactionAspectService;
}
@Override
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
MythTransactionContext mythTransactionContext = null;
final Request request = RpcContext.getContext().getRequest();
if (Objects.nonNull(request)) {
final Map<String, String> attachments = request.getAttachments();
if (attachments != null && !attachments.isEmpty()) {
String context = attachments.get(CommonConstant.MYTH_TRANSACTION_CONTEXT);
mythTransactionContext =
GsonUtils.getInstance().fromJson(context, MythTransactionContext.class);
}
} else {
mythTransactionContext = TransactionContextLocal.getInstance().get();
}
return mythTransactionAspectService.invoke(mythTransactionContext, pjp);
}
}
//dubbo
@Component
public class DubboMythTransactionInterceptor implements MythTransactionInterceptor {
private final MythTransactionAspectService mythTransactionAspectService;
@Autowired
public DubboMythTransactionInterceptor(MythTransactionAspectService mythTransactionAspectService) {
this.mythTransactionAspectService = mythTransactionAspectService;
}
@Override
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
final String context = RpcContext.getContext().getAttachment(CommonConstant.MYTH_TRANSACTION_CONTEXT);
MythTransactionContext mythTransactionContext;
if (StringUtils.isNoneBlank(context)) {
mythTransactionContext =
GsonUtils.getInstance().fromJson(context, MythTransactionContext.class);
}else{
mythTransactionContext= TransactionContextLocal.getInstance().get();
}
return mythTransactionAspectService.invoke(mythTransactionContext, pjp);
}
}
複製代碼
咱們發現是經過實現feign的RequestInterceptor接口來實現mythTransactionContext設置到頭信息中的,這裏dubbo,motan也相似,只是實現方式不一樣。這裏也是實現分佈式事務的最關鍵一部分,經過同一個事務上下文來關聯多子系統之間事務關係,是分佈式事務實現的核心所在。分佈式
接下來咱們進入參與者角色ActorMythTransactionHandler.handleride
public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) throws Throwable {
try {
//處理併發問題
LOCK.lock();
//先保存事務日誌
mythTransactionManager.actorTransaction(point, mythTransactionContext);
//發起調用 執行try方法
final Object proceed = point.proceed();
//執行成功 更新狀態爲commit
mythTransactionManager.updateStatus(mythTransactionContext.getTransId(),
MythStatusEnum.COMMIT.getCode());
return proceed;
} catch (Throwable throwable) {
LogUtil.error(LOGGER, "執行分佈式事務接口失敗,事務id:{}", mythTransactionContext::getTransId);
mythTransactionManager.updateStatus(mythTransactionContext.getTransId(),
MythStatusEnum.FAILURE.getCode());
throw throwable;
} finally {
LOCK.unlock();
TransactionContextLocal.getInstance().remove();
}
}
複製代碼
參與者實現比較簡單, 執行業務方法前主要封裝MythTransaction消息(狀態爲:開始,角色爲:參與者),而後進行持久化操做,再執行業務方法,若是成功更新MythTransaction狀態爲:COMMIT,反之狀態爲:FAILURE,到這裏咱們參與者也是走完了 ~~ 那咱們這個流程是否是完了呢? 其實尚未,上一章最後咱們留了一小塊,咱們再來回顧下this
/** * Myth分佈式事務處理接口 * * @param point point 切點 * @param mythTransactionContext myth事務上下文 * @return Object * @throws Throwable 異常 */
@Override
public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) throws Throwable {
try {
//主要防止併發問題,對事務日誌的寫形成壓力,加了鎖進行處理
try {
LOCK.lock();
mythTransactionManager.begin(point);
} finally {
LOCK.unlock();
}
return point.proceed();
} finally {
//發送消息
mythTransactionManager.sendMessage();
mythTransactionManager.cleanThreadLocal();
TransactionContextLocal.getInstance().remove();
}
}
複製代碼
在走account流程時,其實發起者一直在 point.proceed(); 這裏等待返回結果呢,這裏須要等待orderService.orderPay業務方法所有執行完纔會返回,然而咱們上面才走account一個扣款接口,還有inventory扣減庫存接口,這裏inventory接口與account接口角色都是參與者,流程上是同樣的,只是業務不同而已,這裏也就不作過多介紹了,童鞋們本身過一遍便可。spa
到這裏有童鞋可能就要說了,myth打着是一個基於消息隊列解決分佈式事務框架,可是前面講了這麼多,貌似都未涉及到消息隊列啊, 好了,咱們這就帶大家飛進mq,咱們來看 mythTransactionManager.sendMessage(); 直接進入關鍵代碼部分 CoordinatorServiceImpl.sendMessage 方法線程
public Boolean sendMessage(MythTransaction mythTransaction) {
final List<MythParticipant> mythParticipants = mythTransaction.getMythParticipants();
/* * 這裏的這個判斷很重要,不爲空,表示本地的方法執行成功,須要執行遠端的rpc方法 * 爲何呢,由於我會在切面的finally裏面發送消息,意思是切面不管如何都須要發送mq消息 * 那麼考慮問題,若是本地執行成功,調用rpc的時候才須要發 * 若是本地異常,則不須要發送mq ,此時mythParticipants爲空 */
if (CollectionUtils.isNotEmpty(mythParticipants)) {
for (MythParticipant mythParticipant : mythParticipants) {
MessageEntity messageEntity =
new MessageEntity(mythParticipant.getTransId(),
mythParticipant.getMythInvocation());
try {
final byte[] message = serializer.serialize(messageEntity);
getMythMqSendService().sendMessage(mythParticipant.getDestination(),
mythParticipant.getPattern(),
message);
} catch (Exception e) {
e.printStackTrace();
return Boolean.FALSE;
}
}
//這裏爲何要這麼作呢? 主要是爲了防止在極端狀況下,發起者執行過程當中,忽然自身down 機
//形成消息未發送,新增一個狀態標記,若是出現這種狀況,經過定時任務發送消息
this.updateStatus(mythTransaction.getTransId(), MythStatusEnum.COMMIT.getCode());
}
return Boolean.TRUE;
}
private synchronized MythMqSendService getMythMqSendService() {
if (mythMqSendService == null) {
synchronized (CoordinatorServiceImpl.class) {
if (mythMqSendService == null) {
mythMqSendService = SpringBeanUtils.getInstance().getBean(MythMqSendService.class);
}
}
}
return mythMqSendService;
}
複製代碼
根據代碼咱們知道,這裏主要是將分佈式消息封裝至MessageEntity中,而後進行序列化發送至mq消息隊列,這裏有兩點要注意:
既然產生了消息,必然會有消費者去消費,咱們回到 myth-demo-springcloud-account工程下的RocketmqConsumer類 , account服務對應topic=「account」, Inventory服務對應的topic=「inventory」, 咱們進入關鍵代碼部分: mythMqReceiveService.processMessage(message);
public Boolean processMessage(byte[] message) {
try {
MessageEntity entity;
try {
entity = serializer.deSerialize(message, MessageEntity.class);
} catch (MythException e) {
e.printStackTrace();
throw new MythRuntimeException(e.getMessage());
}
/* * 1 檢查該事務有沒被處理過,已經處理過的 則不處理 * 2 發起發射調用,調用接口,進行處理 * 3 記錄本地日誌 */
LOCK.lock();
final String transId = entity.getTransId();
final MythTransaction mythTransaction = findByTransId(transId);
//若是是空或者是失敗的
if (Objects.isNull(mythTransaction)
|| mythTransaction.getStatus() == MythStatusEnum.FAILURE.getCode()) {
try {
//設置事務上下文,這個類會傳遞給遠端
MythTransactionContext context = new MythTransactionContext();
//設置事務id
context.setTransId(transId);
//設置爲發起者角色
context.setRole(MythRoleEnum.LOCAL.getCode());
TransactionContextLocal.getInstance().set(context);
executeLocalTransaction(entity.getMythInvocation());
//會進入LocalMythTransactionHandler 那裏有保存
} catch (Exception e) {
e.printStackTrace();
throw new MythRuntimeException(e.getMessage());
} finally {
TransactionContextLocal.getInstance().remove();
}
}
} finally {
LOCK.unlock();
}
return Boolean.TRUE;
}
@SuppressWarnings("unchecked")
private void executeLocalTransaction(MythInvocation mythInvocation) throws Exception {
if (Objects.nonNull(mythInvocation)) {
final Class clazz = mythInvocation.getTargetClass();
final String method = mythInvocation.getMethodName();
final Object[] args = mythInvocation.getArgs();
final Class[] parameterTypes = mythInvocation.getParameterTypes();
final Object bean = SpringBeanUtils.getInstance().getBean(clazz);
MethodUtils.invokeMethod(bean, method, args, parameterTypes);
LogUtil.debug(LOGGER, "Myth執行本地協調事務:{}", () -> mythInvocation.getTargetClass()
+ ":" + mythInvocation.getMethodName());
}
}
複製代碼
消費者在接收到消息後,進行反序列化,拿到transId查詢分佈式事務消息MythTransaction,這裏能查到數據嗎? 答案是確定的,由於前面咱們走服務調用時就已經對事務消息進行了持久化操做,咱們發現這裏須要進行事務狀態判斷, mythTransaction 爲空或者事務狀態爲FAILURE才執行本地協調事務,由於正常接口調用會走一次,因此這裏須要避免重複執行,致使數據不一致。
好了,到此爲止咱們源碼解析部分就所有講解完畢, myth實現是沒有回滾機制的,這裏有別於tcc,也不一樣於2pc, 只要發起者本地事務執行成功,那麼認爲這個事務就必須一直執行下去,直到成功爲止,即便在調用其餘子系統接口出現超時或者本地宕機這種異常狀況,待服務恢復後便會經過調度線程藉助mq把事務消息傳輸給參與者,來達到最終的一致性!
若是你們有任何問題或者建議歡迎溝通 ,歡迎加入QQ羣:162614487 進行交流。