Myth源碼解析系列之六- 訂單下單流程源碼解析(發起者)

前面一章咱們走完了服務啓動的源碼,此次咱們進入下單流程的源碼解析~html

訂單下單流程源碼解析(發起者)

首先保證myth-demo-springcloud-order、myth-demo-springcloud-inventory、myth-demo-springcloud-eureka、myth-demo-springcloud-account 服務以正常啓動java

進入源碼分析前,這裏先給你們預熱下,介紹下幾個關鍵部分git

  1. 事務角色
public enum MythRoleEnum {
    /** * Start myth role enum. * 這裏主要爲: orderServer */
    START(1, "發起者"),
    /** * Consumer myth role enum. */
    LOCAL(2, "本地執行"),
    /** * Provider myth role enum. * 這裏主要爲: accountServer, inventoryServer */
    PROVIDER(3, "提供者")
  }
複製代碼
  1. 事務狀態
public enum MythStatusEnum {

    /** * Commit myth status enum. */
    COMMIT(1, "已經提交"),
    /** * Begin myth status enum. */
    BEGIN(2, "開始"),
    /** * Failure myth status enum. */
    FAILURE(4, "失敗")
  }
  這裏主要列了咱們所使用的部分,沒用的忽略
複製代碼

這個東西爲何在這裏先講,主要是爲了讓你們先了解下有這個東西,這樣有助於後續代碼理解 ~~ 正所謂擒賊先擒王,抓住重點部位你就離成功不遠鳥 O(∩_∩)Ogithub

時序圖 spring

時序圖

訂單下單接口入口:http://localhost:8884/swagger-ui.htmlapi

swagger api

輸入: 下單數量count: 1, 金額amount: 100 ,狠狠點 【Try it out!】,發起下單請求, 咱們會進入OrderController.orderPay方法併發

@PostMapping(value = "/orderPay")
@ApiOperation(value = "訂單下單接口(注意這裏模擬的是建立訂單並進行下單扣減庫存等操做)")
public String orderPay(@RequestParam(value = "count") Integer count, @RequestParam(value = "amount") BigDecimal amount) {

    return orderService.orderPay(count, amount);

}

接着進入orderServiceImpl.orderPay 方法
@Override
    public String orderPay(Integer count, BigDecimal amount) {
        final Order order = buildOrder(count, amount);
        final int rows = orderMapper.save(order);

        if (rows > 0) {
            paymentService.makePayment(order);
        }


        return "success";
    }

複製代碼

這裏咱們發現封裝了order對象而後進行了持久化操做,成功後,咱們調用paymentService.makePayment(order); 重點來了,咱們先來瞅瞅paymentService.makePayment方法體的代碼app

@Override
    @Myth(destination = "")
    public void makePayment(Order order) {


        //檢查數據 這裏只是demo 只是demo 只是demo

        final AccountDO accountDO =
                accountClient.findByUserId(order.getUserId());

        if(accountDO.getBalance().compareTo(order.getTotalAmount())<= 0){
            throw new MythRuntimeException("餘額不足!");
        }

        final InventoryDO inventoryDO =
                inventoryClient.findByProductId(order.getProductId());

        if(inventoryDO.getTotalInventory() < order.getCount()){
            throw new MythRuntimeException("庫存不足!");
        }

        order.setStatus(OrderStatusEnum.PAY_SUCCESS.getCode());
        orderMapper.update(order);
        //扣除用戶餘額
        AccountDTO accountDTO = new AccountDTO();
        accountDTO.setAmount(order.getTotalAmount());
        accountDTO.setUserId(order.getUserId());

        accountClient.payment(accountDTO);

        //進入扣減庫存操做
        InventoryDTO inventoryDTO = new InventoryDTO();
        inventoryDTO.setCount(order.getCount());
        inventoryDTO.setProductId(order.getProductId());
        inventoryClient.decrease(inventoryDTO);
    }
複製代碼

方法具體業務咱們暫且先不看,重點咱們關注方法頭部是否是有個@Myth註解,這就是實現分佈式事務的關鍵,分佈式事務主要經過@Myth註解來關聯實現,這是基於aop切面思想,既然這樣那麼一定會有一個切入點,下面咱們找到myth-core工程AbstractMythTransactionAspect類,原來這就是定義@Myth切入點的地方啊~框架

@Aspect
public abstract class AbstractMythTransactionAspect {

    private MythTransactionInterceptor mythTransactionInterceptor;

    public void setMythTransactionInterceptor(MythTransactionInterceptor mythTransactionInterceptor) {
        this.mythTransactionInterceptor = mythTransactionInterceptor;
    }

    @Pointcut("@annotation(com.github.myth.annotation.Myth)")
    public void mythTransactionInterceptor() {

    }

    @Around("mythTransactionInterceptor()")
    public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return mythTransactionInterceptor.interceptor(proceedingJoinPoint);
    }

    /** * spring Order 接口,該值的返回直接會影響springBean的加載順序 * * @return int 類型 */
    public abstract int getOrder();
}
複製代碼

能夠知道Spring實現類的方法凡是加了**@Myth**註解的,在調用的時候,都會進行 mythTransactionInterceptor.interceptor調用。也就是說一個調用鏈中,只要有標記該註解的業務方法,都會被加入到同一組分佈式事務當中來, 其目的就是保證這麼些個業務方法,要麼所有執行成功,反之所有不執行~分佈式

AbstractMythTransactionAspect是一個抽象類,下面咱們看看它的實現

圖片發自簡書App

咱們發現針對每種rpc都對應着有一個實現類,下面咱們來看

@Aspect
@Component
                                    public class SpringCloudMythTransactionAspect extends AbstractMythTransactionAspect implements Ordered {

    @Autowired
    public SpringCloudMythTransactionAspect(SpringCloudMythTransactionInterceptor springCloudMythTransactionInterceptor) {
        this.setMythTransactionInterceptor(springCloudMythTransactionInterceptor);
    }

    public void init() {
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
複製代碼

咱們注意到它實現了Spring的Ordered接口,並重寫了 getOrder 方法,返回了 Ordered.HIGHEST_PRECEDENCE 那麼能夠知道,他是優先級最高的切面,這裏dubbo,motan也是同樣。 咱們再來看MythTransactionInterceptor 接口,也是相似,springcloud、dubbo、motan都對應一個實現類

圖片發自簡書App

看完這裏咱們知道,咱們知道此時代碼不會直接進入paymentService.makePayment方法,而是先進入切面SpringCloudMythTransactionInterceptor,代碼以下

@Override
    public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
        MythTransactionContext mythTransactionContext = TransactionContextLocal.getInstance().get();
        if (Objects.nonNull(mythTransactionContext) &&
                mythTransactionContext.getRole() == MythRoleEnum.LOCAL.getCode()) {
            mythTransactionContext = TransactionContextLocal.getInstance().get();
        } else {
            RequestAttributes requestAttributes = RequestContextHolder.currentRequestAttributes();
            HttpServletRequest request = requestAttributes == null ? null : ((ServletRequestAttributes) requestAttributes).getRequest();
            String context = request == null ? null : request.getHeader(CommonConstant.MYTH_TRANSACTION_CONTEXT);
            if (StringUtils.isNoneBlank(context)) {
                mythTransactionContext =
                        GsonUtils.getInstance().fromJson(context, MythTransactionContext.class);
            }
        }
        return mythTransactionAspectService.invoke(mythTransactionContext, pjp);
    }
複製代碼

走到這裏,由於第一次進來這些變量都沒有值(按理說第一次通常都記憶深入啊~~),因此咱們會直接進入mythTransactionAspectService.invoke(mythTransactionContext, pjp), 此時mythTransactionContext爲null, 接下來咱們進入MythTransactionAspectServiceImpl.invoke方法

public Object invoke(MythTransactionContext mythTransactionContext, ProceedingJoinPoint point) throws Throwable {
        final Class clazz = mythTransactionFactoryService.factoryOf(mythTransactionContext);
        final MythTransactionHandler mythTransactionHandler =
                (MythTransactionHandler) SpringBeanUtils.getInstance().getBean(clazz);
        return mythTransactionHandler.handler(point, mythTransactionContext);
    }
複製代碼

而後再進入MythTransactionFactoryServiceImpl.factoryOf

/** * 返回 實現TxTransactionHandler類的名稱 * * @param context 事務上下文 * @return Class<T> * @throws Throwable 拋出異常 */
    @Override
    public Class factoryOf(MythTransactionContext context) throws Throwable {
        //若是事務還沒開啓或者 myth事務上下文是空, 那麼應該進入發起調用
        if (!mythTransactionManager.isBegin() && Objects.isNull(context)) {
            return StartMythTransactionHandler.class;
        } else {
            if (context.getRole() == MythRoleEnum.LOCAL.getCode()) {
                return LocalMythTransactionHandler.class;
            }
            return ActorMythTransactionHandler.class;
        }
    }
複製代碼

重點來了,前面咱們介紹了分佈式事務角色,這裏就是判斷分佈式事務角色的入口,根據判斷咱們進入發起者角色,也就是StartMythTransactionHandler.handler方法

/** * Myth分佈式事務處理接口 * * @param point point 切點 * @param mythTransactionContext myth事務上下文 * @return Object * @throws Throwable 異常 */
    @Override
    public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) throws Throwable {
        try {
            //主要防止併發問題,對事務日誌的寫形成壓力,加了鎖進行處理
            try {
                LOCK.lock();
                mythTransactionManager.begin(point);
            } finally {
                LOCK.unlock();
            }
           return  point.proceed();
        } finally {
            //發送消息
            mythTransactionManager.sendMessage();
            mythTransactionManager.cleanThreadLocal();
            TransactionContextLocal.getInstance().remove();
        }
    }
複製代碼

接下來進入mythTransactionManager.begin(point);

public MythTransaction begin(ProceedingJoinPoint point) {
        LogUtil.debug(LOGGER, () -> "開始執行Myth分佈式事務!start");
        MythTransaction mythTransaction = getCurrentTransaction();
        if (Objects.isNull(mythTransaction)) {

            MethodSignature signature = (MethodSignature) point.getSignature();
            Method method = signature.getMethod();

            Class<?> clazz = point.getTarget().getClass();

            mythTransaction = new MythTransaction();
            mythTransaction.setStatus(MythStatusEnum.BEGIN.getCode());
            mythTransaction.setRole(MythRoleEnum.START.getCode());
            mythTransaction.setTargetClass(clazz.getName());
            mythTransaction.setTargetMethod(method.getName());
        }
        //保存當前事務信息
        coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, mythTransaction));

        //當前事務保存到ThreadLocal
        CURRENT.set(mythTransaction);

        //設置myth事務上下文,這個類會傳遞給遠端
        MythTransactionContext context = new MythTransactionContext();

        //設置事務id
        context.setTransId(mythTransaction.getTransId());

        //設置爲發起者角色
        context.setRole(MythRoleEnum.START.getCode());

        TransactionContextLocal.getInstance().set(context);

        return mythTransaction;

    }
複製代碼

這個方法主要作兩件事,首先封裝分佈式事務消息(MythTransaction)進行持久化(這裏重點關注下mythTransaction.setStatus(MythStatusEnum.BEGIN.getCode事務狀態爲開始, mythTransaction.setRole(MythRoleEnum.START.getCode());事務角色爲發起者,後續會用到),而後再設置MythTransactionContext事務上下文,這個主要用來傳輸給遠端服務,這裏遠端服務能夠理解爲事務參與方。(注意這裏兩個消息對象都分別放到了ThreadLocal變量中)

關注下持久化操做是如何作的?

//保存當前事務信息
        coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, mythTransaction));

//跟蹤進去最後執行的是下面這段代碼
        @Override
           public Boolean submit(CoordinatorAction coordinatorAction) {
               try {
                   QUEUE.put(coordinatorAction);
               } catch (InterruptedException e) {
                   e.printStackTrace();
                   return Boolean.FALSE;
               }
               return Boolean.TRUE;
           }

複製代碼

咱們發現消息是發送給一個QUEUE隊列,你們還記得以前講服務啓動源碼解析,專門開了一個線程池任務來消費QUEUE隊列作消息持久化操做,對的,消息就是在這裏放進去的。

好了,到此咱們begin方法已經走完, 下面會調用point.proceed就正式進入到業務方法paymentService.makePayment中,這個方法裏主要也是作兩件事,一個調用Account服務進行帳戶餘額扣減,另外一個調用Inventory服務進行庫存扣減,這兩個是經過調用接口來實現, 關鍵代碼以下

accountClient.payment(accountDTO);
//AccountClient.payment方法定義
  /** * 用戶帳戶付款 * * @param accountDO 實體類 * @return true 成功 */
   @PostMapping("/account-service/account/payment")
   @Myth(destination = "account", target = AccountService.class)
   Boolean payment(@RequestBody AccountDTO accountDO);

    inventoryClient.decrease(inventoryDTO);
//InventoryClient.decrease方法定義
/** * 庫存扣減 * * @param inventoryDTO 實體對象 * @return true 成功 */
@Myth(destination = "inventory",target = InventoryService.class)
@RequestMapping("/inventory-service/inventory/decrease")
Boolean decrease(@RequestBody InventoryDTO inventoryDTO);
複製代碼

注意到麼,帳戶扣款和庫存扣減方法都標記有@Myth註解,而另外兩個查詢方法是沒有的。咱們知道springAop的特性,在接口上加註解,是沒法進入切面的,因此咱們在這裏,要採用rpc框架的某些特性來幫助咱們獲取到 @Myth註解信息, 這一步很重要。這裏咱們演示的是springcloud,因此進入myth-springcloud工程的MythFeignHandler類。(其中dubbo,與motan這部分實現分別對應:DubboMythTransactionFilter 和 MotanMythTransactionFilter 類, 都是經過框架自身過濾器特性來實現, 邏輯與springcloud同樣,只是實現上有少量差異~
這裏重點提一下dubbo,dubbo是增長了動態代理方式來處理的,由於當(參與者)服務宕機後,zk心跳檢測失敗,所以調用者不會走到filter就會返回,因此須要增長代理方式在發起遠程調用以前來獲取事務消息並進行存儲,避免事務信息丟失而致使後續不能成功向參與者發起(MQ)本地事務執行動做。dubbo代理類實現:MythInvokerInvocationHandler ,其餘rpc實現童鞋們一看便知, 再也不過於贅述~)

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class.equals(method.getDeclaringClass())) {
            return method.invoke(this, args);
        } else {

            final Myth myth = method.getAnnotation(Myth.class);
            if (Objects.isNull(myth)) {
                return this.handlers.get(method).invoke(args);
            }
            try {
                final MythTransactionManager mythTransactionManager =
                        SpringBeanUtils.getInstance().getBean(MythTransactionManager.class);

                final MythParticipant participant = buildParticipant(myth, method, args);
                if (Objects.nonNull(participant)) {
                    mythTransactionManager.registerParticipant(participant);
                }
                return this.handlers.get(method).invoke(args);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
                return new Object();
            }


        }
    }


    private MythParticipant buildParticipant(Myth myth, Method method, Object[] args) {

        final MythTransactionContext mythTransactionContext =
                TransactionContextLocal.getInstance().get();
        MythParticipant participant;
        if (Objects.nonNull(mythTransactionContext)) {

            final Class declaringClass = myth.target();

            MythInvocation mythInvocation = new MythInvocation(declaringClass,
                    method.getName(),
                    method.getParameterTypes(), args);

            final Integer pattern = myth.pattern().getCode();

            //封裝調用點
            participant = new MythParticipant(
                    mythTransactionContext.getTransId(),
                    myth.destination(),
                    pattern,
                    mythInvocation);

            return participant;
        }
        return null;
    }
複製代碼

從源碼得知,只有方法上打了@Myth註解的纔會進入後續邏輯,不然直接執行返回,咱們詳細看下buildParticipant(myth, method, args);方法,首先從TransactionContextLocal.getInstance().get獲取事務上下文對象MythTransactionContext,這裏有值嗎? 是的由於前面咱們已經設置值了,因此這裏能拿到值,這裏你們注意下destination,target這兩個屬性, destination這是就是消息中間件使用的隊列名稱,不一樣服務定義不一樣隊列來發送消息,target爲目標業務class,獲取到相關值後對MythParticipant進行封裝並返回,接下來咱們繼續往下走,進入MythTransactionManager.registerParticipant

public void registerParticipant(MythParticipant participant) {
       final MythTransaction transaction = this.getCurrentTransaction();
       transaction.registerParticipant(participant);
       coordinatorService.updateParticipant(transaction);
}
複製代碼

咱們發現這裏其實就是對前面MythTransaction持久化的一個更新操做,主要更新調用其餘子系統接口須要執行的class,method等信息,這個消息後面會經過mq進行投遞,其餘子系統經過消費來進行本地執行。

好了,到此咱們已經走完了發起者大部分了,緊接着就是調用執行account,Inventory服務了,也就是參與者部分,這裏咱們發現還有finally快部分代碼未走完, 稍稍休息下,詳情請見下回分解~~

你們有任何問題或者建議歡迎溝通 ,歡迎加入QQ羣:162614487 進行交流

相關文章
相關標籤/搜索