Myth源碼解析系列之七- 訂單下單流程源碼解析(參與者)

前面一章咱們走完了訂單下單流程發起者部分的源碼,此次咱們進入參與者部分源碼解析~java

訂單下單流程源碼解析(參與者)

前面order服務中已經發起了對account服務的調用,接下來進入account服務扣款接口的實現部分spring

//order服務調用端
  @PostMapping("/account-service/account/payment")
  @Myth(destination = "account", target = AccountService.class)
  Boolean payment(@RequestBody AccountDTO accountDO);

 //account服務接口實現 AccountServiceImpl.payment(AccountDTO accountDTO)
 @Override
    @Myth(destination = "account")
    public boolean payment(AccountDTO accountDTO) {
        LOGGER.info("============springcloud執行付款接口===============");
        final AccountDO accountDO = accountMapper.findByUserId(accountDTO.getUserId());
        if (accountDO.getBalance().compareTo(accountDTO.getAmount()) <= 0) {
            throw new MythRuntimeException("spring cloud account-service 資金不足!");
        }
        accountDO.setBalance(accountDO.getBalance().subtract(accountDTO.getAmount()));
        accountDO.setUpdateTime(new Date());
        final int update = accountMapper.update(accountDO);
        if (update != 1) {
            throw new MythRuntimeException("spring cloud account-service 資金不足!");
        }
        return Boolean.TRUE;
    }

複製代碼

咱們發如今實現類方法頭部也進行了@Myth 註解的標記,AccountServiceImpl 是一個實現類,所以這裏必然也會走aop切面,aop切面流程入口同order服務相同,區別在於order爲發起方,而account,inventory爲參與者,咱們是否還記得角色判斷代碼實現部分?MythTransactionFactoryServiceImpl.factoryOf 咱們再來回顧下代碼併發

public Class factoryOf(MythTransactionContext context) throws Throwable {
        //若是事務還沒開啓或者 myth事務上下文是空, 那麼應該進入發起調用
        if (!mythTransactionManager.isBegin() && Objects.isNull(context)) {
            return StartMythTransactionHandler.class;
        } else {
            if (context.getRole() == MythRoleEnum.LOCAL.getCode()) {
                return LocalMythTransactionHandler.class;
            }
            return ActorMythTransactionHandler.class;
        }
    }

複製代碼

判斷條件要想進入參與者角色分支,這裏事務必須開啓狀態 或者 myth事務上下文必須有值 ,這兩個條件又是在哪裏進行了設值呢? 咱們往回看看調用處,找到 SpringCloudMythTransactionInterceptor.interceptor(ProceedingJoinPoint pjp) 方法app

@Override
    public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
        MythTransactionContext mythTransactionContext = TransactionContextLocal.getInstance().get();
        if (Objects.nonNull(mythTransactionContext) &&
                mythTransactionContext.getRole() == MythRoleEnum.LOCAL.getCode()) {
            mythTransactionContext = TransactionContextLocal.getInstance().get();
        } else {
            RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
            HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
            String context = request == null ? null : request.getHeader(CommonConstant.MYTH_TRANSACTION_CONTEXT);
            if (StringUtils.isNoneBlank(context)) {
                mythTransactionContext =
                        GsonUtils.getInstance().fromJson(context, MythTransactionContext.class);
            }
        }
        return mythTransactionAspectService.invoke(mythTransactionContext, pjp);
    }
複製代碼

由於第一次進來,顯然mythTransactionContext值爲空,進入else分支,這裏咱們發現是從request請求頭中獲取的事務上下文信息的。 既然是從請求頭信息中拿到數據, 那必然在調用端要先設置對不對, 咱們找到myth-springcloud工程下MythRestTemplateInterceptor框架

//springcloud
@Configuration
public class MythRestTemplateInterceptor implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate requestTemplate) {
        final MythTransactionContext mythTransactionContext =
                TransactionContextLocal.getInstance().get();
        requestTemplate.header(CommonConstant.MYTH_TRANSACTION_CONTEXT,
                GsonUtils.getInstance().toJson(mythTransactionContext));
    }

}

// motan
@Component
public class MotanMythTransactionInterceptor implements MythTransactionInterceptor {

    private final MythTransactionAspectService mythTransactionAspectService;

    @Autowired
    public MotanMythTransactionInterceptor(MythTransactionAspectService mythTransactionAspectService) {
        this.mythTransactionAspectService = mythTransactionAspectService;
    }

    @Override
    public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
        MythTransactionContext mythTransactionContext = null;

        final Request request = RpcContext.getContext().getRequest();
        if (Objects.nonNull(request)) {
            final Map<String, String> attachments = request.getAttachments();
            if (attachments != null && !attachments.isEmpty()) {
                String context = attachments.get(CommonConstant.MYTH_TRANSACTION_CONTEXT);
                mythTransactionContext =
                        GsonUtils.getInstance().fromJson(context, MythTransactionContext.class);
            }
        } else {
            mythTransactionContext = TransactionContextLocal.getInstance().get();
        }

        return mythTransactionAspectService.invoke(mythTransactionContext, pjp);
    }
}

//dubbo
@Component
public class DubboMythTransactionInterceptor implements MythTransactionInterceptor {

    private final MythTransactionAspectService mythTransactionAspectService;

    @Autowired
    public DubboMythTransactionInterceptor(MythTransactionAspectService mythTransactionAspectService) {
        this.mythTransactionAspectService = mythTransactionAspectService;
    }

    @Override
    public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
        final String context = RpcContext.getContext().getAttachment(CommonConstant.MYTH_TRANSACTION_CONTEXT);
        MythTransactionContext mythTransactionContext;
        if (StringUtils.isNoneBlank(context)) {
            mythTransactionContext =
                    GsonUtils.getInstance().fromJson(context, MythTransactionContext.class);
        }else{
            mythTransactionContext= TransactionContextLocal.getInstance().get();
        }
        return mythTransactionAspectService.invoke(mythTransactionContext, pjp);
    }
}
複製代碼

咱們發現是經過實現feign的RequestInterceptor接口來實現mythTransactionContext設置到頭信息中的,這裏dubbo,motan也相似,只是實現方式不一樣。這裏也是實現分佈式事務的最關鍵一部分,經過同一個事務上下文來關聯多子系統之間事務關係,是分佈式事務實現的核心所在。分佈式

接下來咱們進入參與者角色ActorMythTransactionHandler.handleride

public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) throws Throwable {

        try {
            //處理併發問題
            LOCK.lock();
            //先保存事務日誌
            mythTransactionManager.actorTransaction(point, mythTransactionContext);

            //發起調用 執行try方法
            final Object proceed = point.proceed();

            //執行成功 更新狀態爲commit
            mythTransactionManager.updateStatus(mythTransactionContext.getTransId(),
                    MythStatusEnum.COMMIT.getCode());

            return proceed;

        } catch (Throwable throwable) {
            LogUtil.error(LOGGER, "執行分佈式事務接口失敗,事務id:{}", mythTransactionContext::getTransId);
            mythTransactionManager.updateStatus(mythTransactionContext.getTransId(),
                    MythStatusEnum.FAILURE.getCode());
            throw throwable;
        } finally {
            LOCK.unlock();
            TransactionContextLocal.getInstance().remove();
        }
    }
複製代碼

參與者實現比較簡單, 執行業務方法前主要封裝MythTransaction消息(狀態爲:開始,角色爲:參與者),而後進行持久化操做,再執行業務方法,若是成功更新MythTransaction狀態爲:COMMIT,反之狀態爲:FAILURE,到這裏咱們參與者也是走完了 ~~ 那咱們這個流程是否是完了呢? 其實尚未,上一章最後咱們留了一小塊,咱們再來回顧下this

/** * Myth分佈式事務處理接口 * * @param point point 切點 * @param mythTransactionContext myth事務上下文 * @return Object * @throws Throwable 異常 */
   @Override
   public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) throws Throwable {

       try {

           //主要防止併發問題,對事務日誌的寫形成壓力,加了鎖進行處理
           try {
               LOCK.lock();
               mythTransactionManager.begin(point);
           } finally {
               LOCK.unlock();
           }

          return  point.proceed();

       } finally {
           //發送消息
           mythTransactionManager.sendMessage();
           mythTransactionManager.cleanThreadLocal();
           TransactionContextLocal.getInstance().remove();
       }
   }

複製代碼

在走account流程時,其實發起者一直在 point.proceed(); 這裏等待返回結果呢,這裏須要等待orderService.orderPay業務方法所有執行完纔會返回,然而咱們上面才走account一個扣款接口,還有inventory扣減庫存接口,這裏inventory接口與account接口角色都是參與者,流程上是同樣的,只是業務不同而已,這裏也就不作過多介紹了,童鞋們本身過一遍便可。spa

到這裏有童鞋可能就要說了,myth打着是一個基於消息隊列解決分佈式事務框架,可是前面講了這麼多,貌似都未涉及到消息隊列啊, 好了,咱們這就帶大家飛進mq,咱們來看 mythTransactionManager.sendMessage(); 直接進入關鍵代碼部分 CoordinatorServiceImpl.sendMessage 方法線程

public Boolean sendMessage(MythTransaction mythTransaction) {
        final List<MythParticipant> mythParticipants = mythTransaction.getMythParticipants();
            /* * 這裏的這個判斷很重要,不爲空,表示本地的方法執行成功,須要執行遠端的rpc方法 * 爲何呢,由於我會在切面的finally裏面發送消息,意思是切面不管如何都須要發送mq消息 * 那麼考慮問題,若是本地執行成功,調用rpc的時候才須要發 * 若是本地異常,則不須要發送mq ,此時mythParticipants爲空 */
        if (CollectionUtils.isNotEmpty(mythParticipants)) {

            for (MythParticipant mythParticipant : mythParticipants) {
                MessageEntity messageEntity =
                        new MessageEntity(mythParticipant.getTransId(),
                                mythParticipant.getMythInvocation());
                try {
                    final byte[] message = serializer.serialize(messageEntity);
                    getMythMqSendService().sendMessage(mythParticipant.getDestination(),
                            mythParticipant.getPattern(),
                            message);
                } catch (Exception e) {
                    e.printStackTrace();
                    return Boolean.FALSE;
                }
            }
            //這裏爲何要這麼作呢? 主要是爲了防止在極端狀況下,發起者執行過程當中,忽然自身down 機
            //形成消息未發送,新增一個狀態標記,若是出現這種狀況,經過定時任務發送消息
            this.updateStatus(mythTransaction.getTransId(), MythStatusEnum.COMMIT.getCode());
        }
        return Boolean.TRUE;
    }


    private synchronized MythMqSendService getMythMqSendService() {
       if (mythMqSendService == null) {
           synchronized (CoordinatorServiceImpl.class) {
               if (mythMqSendService == null) {
                   mythMqSendService = SpringBeanUtils.getInstance().getBean(MythMqSendService.class);
               }
           }
       }
       return mythMqSendService;
   }
複製代碼

根據代碼咱們知道,這裏主要是將分佈式消息封裝至MessageEntity中,而後進行序列化發送至mq消息隊列,這裏有兩點要注意:

  1. serializer.serialize(messageEntity); serializer對象爲服務啓動時經過spi機制加載注入
  2. mythMqSendService 爲applicationContext.xml 配置的rocketmq

既然產生了消息,必然會有消費者去消費,咱們回到 myth-demo-springcloud-account工程下的RocketmqConsumer類 , account服務對應topic=「account」, Inventory服務對應的topic=「inventory」, 咱們進入關鍵代碼部分: mythMqReceiveService.processMessage(message);

public Boolean processMessage(byte[] message) {
        try {
            MessageEntity entity;
            try {
                entity = serializer.deSerialize(message, MessageEntity.class);
            } catch (MythException e) {
                e.printStackTrace();
                throw new MythRuntimeException(e.getMessage());
            }
            /* * 1 檢查該事務有沒被處理過,已經處理過的 則不處理 * 2 發起發射調用,調用接口,進行處理 * 3 記錄本地日誌 */
            LOCK.lock();

            final String transId = entity.getTransId();
            final MythTransaction mythTransaction = findByTransId(transId);

            //若是是空或者是失敗的
            if (Objects.isNull(mythTransaction)
                    || mythTransaction.getStatus() == MythStatusEnum.FAILURE.getCode()) {
                try {

                    //設置事務上下文,這個類會傳遞給遠端
                    MythTransactionContext context = new MythTransactionContext();

                    //設置事務id
                    context.setTransId(transId);

                    //設置爲發起者角色
                    context.setRole(MythRoleEnum.LOCAL.getCode());

                    TransactionContextLocal.getInstance().set(context);
                    executeLocalTransaction(entity.getMythInvocation());

                    //會進入LocalMythTransactionHandler 那裏有保存

                } catch (Exception e) {
                    e.printStackTrace();
                    throw new MythRuntimeException(e.getMessage());
                } finally {
                    TransactionContextLocal.getInstance().remove();
                }
            }


        } finally {
            LOCK.unlock();
        }
        return Boolean.TRUE;

    }

    @SuppressWarnings("unchecked")
    private void executeLocalTransaction(MythInvocation mythInvocation) throws Exception {
        if (Objects.nonNull(mythInvocation)) {
            final Class clazz = mythInvocation.getTargetClass();
            final String method = mythInvocation.getMethodName();
            final Object[] args = mythInvocation.getArgs();
            final Class[] parameterTypes = mythInvocation.getParameterTypes();
            final Object bean = SpringBeanUtils.getInstance().getBean(clazz);
            MethodUtils.invokeMethod(bean, method, args, parameterTypes);
            LogUtil.debug(LOGGER, "Myth執行本地協調事務:{}", () -> mythInvocation.getTargetClass()
                    + ":" + mythInvocation.getMethodName());
        }
    }
複製代碼

消費者在接收到消息後,進行反序列化,拿到transId查詢分佈式事務消息MythTransaction,這裏能查到數據嗎? 答案是確定的,由於前面咱們走服務調用時就已經對事務消息進行了持久化操做,咱們發現這裏須要進行事務狀態判斷, mythTransaction 爲空或者事務狀態爲FAILURE才執行本地協調事務,由於正常接口調用會走一次,因此這裏須要避免重複執行,致使數據不一致。

好了,到此爲止咱們源碼解析部分就所有講解完畢, myth實現是沒有回滾機制的,這裏有別於tcc,也不一樣於2pc, 只要發起者本地事務執行成功,那麼認爲這個事務就必須一直執行下去,直到成功爲止,即便在調用其餘子系統接口出現超時或者本地宕機這種異常狀況,待服務恢復後便會經過調度線程藉助mq把事務消息傳輸給參與者,來達到最終的一致性!

異常狀況處理機制介紹

  1. order服務異常(此時還未涉及調用account和Inventory服務),order本地事務回滾,account及Inventory服務無需處理。
  2. order服務調用account或Inventory服務超時,account及Inventory服務未接受到請求,此時order會經過MQ將分佈式事務消息投遞給消費者即(account及Inventory服務),account及Inventory消費消息後查詢本地事務消息(此時事務狀態爲開始),並執行本地協調事務,以保證數據一致性。
  3. order服務調用account或Inventory服務超時,account及Inventory服務已接受到請求並處理,此時order仍是會經過MQ將分佈式事務消息投遞給消費者即(account及Inventory服務),account及Inventory消費消息查詢本地事務消息,判斷事務狀態,因前面服務以接收到請求並處理,因此此時事務狀態爲提交,固不會再次執行本地協調事務,所以這裏是支持冪等的。
  4. 如account及Inventory服務已接受到請求處理出現異常,此種狀況會修改事務消息狀態爲:FAILURE,此時用戶可登錄管理後臺查看到異常事務信息,這裏須要用戶自行決定後續處理邏輯,其目的是要保證數據一致性。

若是你們有任何問題或者建議歡迎溝通 ,歡迎加入QQ羣:162614487 進行交流。

相關文章
相關標籤/搜索