高德打車通用可編排訂單狀態機引擎設計

簡介: 訂單狀態流轉是交易系統的最爲核心的工做,訂單系統每每都會存在狀態多、鏈路長、邏輯複雜的特色,還存在多場景、多類型、多業務維度等業務特性。在保證訂單狀態流轉穩定性的前提下、可擴展性和可維護性是咱們須要重點關注和解決的問題。
image.pngredis

做者 | 亮言
來源 | 阿里技術公衆號spring

一 背景數據庫

訂單狀態流轉是交易系統的最爲核心的工做,訂單系統每每都會存在狀態多、鏈路長、邏輯複雜的特色,還存在多場景、多類型、多業務維度等業務特性。在保證訂單狀態流轉穩定性的前提下、可擴展性和可維護性是咱們須要重點關注和解決的問題。小程序

以高德打車業務的訂單狀態爲例,訂單狀態就有乘客下單、司機接單、司機已到達乘車點、開始行程、行程結束、確認費用、支付成功、訂單取消、訂單關閉等;訂單車型有專車、快車、出租車等幾種車型,而專車又分溫馨型、豪華型、商務型等;業務場景接送機、企業用車、城際拼車等等場景。設計模式

當訂單狀態、類型、場景、以及其餘一些維度組合時,每一種組合均可能會有不一樣的處理邏輯、也可能會存在共性的業務邏輯,這種狀況下代碼中各類if-else確定是不敢想象的。怎麼處理這種"多狀態+多類型+多場景+多維度"的複雜訂單狀態流轉業務,又要保證整個系統的可擴展性和可維護性,本文的解決思路和方案同你們一塊兒探討。服務器

二 實現方案多線程

要解決"多狀態+多類型+多場景+多維度"的複雜訂單狀態流轉業務,咱們從縱向和橫向兩個維度進行設計。縱向主要從業務隔離和流程編排的角度出發解決問題、而橫向主要從邏輯複用和業務擴展的角度解決問題。架構

1 縱向解決業務隔離和流程編排
狀態模式的應用併發

一般咱們處理一個多狀態或者多維度的業務邏輯,都會採用狀態模式或者策略模式來解決,咱們這裏不討論兩種設計模式的異同,其核心其實能夠歸納爲一個詞"分而治之",抽象一個基礎邏輯接口、每個狀態或者類型都實現該接口,業務處理時根據不一樣的狀態或者類型調用對應的業務實現,以到達邏輯相互獨立互不干擾、代碼隔離的目的。框架

這不只僅是從可擴展性和可維護性的角度出發,其實咱們作架構作穩定性、隔離是一種減小影響面的基本手段,相似的隔離環境作灰度、分批發布等,這裏不作擴展。
image.png

/**
 * 狀態機處理器接口
 */
public interface StateProcessor {
    /**
     * 執行狀態遷移的入口
     */
    void action(StateContext context) throws Exception;
}
/**
 * 狀態A對應的狀態處理器
 */
public class StateAProcessor interface StateProcessor {
    /**
     * 執行狀態遷移的入口
     */
    @Override
    public void action(StateContext context) throws Exception {
    }
}

單一狀態或類型能夠經過上面的方法解決,那麼"多狀態+多類型+多場景+多維度"這種組合業務呢,固然也能夠採用這種模式或思路來解決。首先在開發階段經過一個註解@OrderPorcessor將不一樣的維度予以組合、開發出多個對應的具體實現類,在系統運行階段,經過判斷上下文來動態選擇具體使用哪個實現類執行。@OrderPorcessor中分別定義state表明當前處理器要處理的狀態,bizCode和sceneId分別表明業務類型和場景,這兩個字段留給業務進行擴展,好比能夠用bizCode表明產品或訂單類型、sceneId表明業務形態或來源場景等等,若是要擴展多個維度的組合、也能夠用多個維度拼接後的字符串賦值到bizCode和sceneId上。

受限於Java枚舉不能繼承的規範,若是要開發通用的功能、註解中就不能使用枚舉、因此此處只好使用String。

image.png

/**
 * 狀態機引擎的處理器註解標識
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Component
public @interface OrderProcessor {
    /**
     * 指定狀態,state不能同時存在
     */
    String[] state() default {};
    /**
     * 業務
     */
    String[] bizCode() default {};
    /**
     * 場景
     */
    String[] sceneId() default {};
}
/**
* 建立訂單狀態對應的狀態處理器
 */
@OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5")
public class StateCreateProcessor interface StateProcessor {
}

再想一下,由於涉及到狀態流轉,不可能會是一個狀態A只能流轉到狀態B、狀態A可能在不一樣的場景下流轉到狀態B、狀態C、狀態D;還有雖然都是由狀態A流轉到狀態B、可是不一樣的場景處理流程也可能不同,好比都是將訂單從從待支付狀態進行支付、用戶主動發起支付和系統免密支付的流程可能就不同。針對上面這兩種狀況、咱們把這裏的"場景"統一封裝爲"事件(event)",以"事件驅動"的方式來控制狀態的流向,一個狀態遇到一個特定的處理事件來決定該狀態的業務處理流程和最終狀態流向。咱們能夠總結下,其實狀態機模式簡單說就是:基於某些特定業務和場景下,根據源狀態和發生的事件,來執行下一步的流程處理邏輯,並設置一個目標狀態。

這裏有人可能有一些疑問,這個"事件"和上面說的"多場景"、"多維度"有什麼不同。解釋一下,咱們這裏說的是"事件"是一個具體的業務要執行的動做,好比用戶下單是一個業務事件、用戶取消訂單是一個業務事件、用戶支付訂單也是一個業務事件。而"多場景"、"多維度"則是可交由業務自行進行擴展的維度,好比自有標準模式來源的訂單、經過開放平臺API來的訂單、經過第三方標準來源的訂單,某某小程序、某某APP來源能夠定義爲不一樣場景,而接送機、企業用車、拼車等能夠定義爲維度。

image.png

public @interface OrderProcessor {
    /**
     * 指定狀態
     */
    String[] state() default {};
    /**
     * 訂單操做事件
     */
    String event();
    ......
}
/**
 * 訂單狀態遷移事件
 */
public interface OrderStateEvent {
    /**
     * 訂單狀態事件
     */
    String getEventType();
    /**
     * 訂單ID
     */
    String getOrderId();
    /**
     * 若是orderState不爲空,則表明只有訂單是當前狀態才進行遷移
     */
    default String orderState() {
        return null;
    }
    /**
     * 是否要新建立訂單
     */
    boolean newCreate();
}

狀態遷移流程的封裝

在知足了上面說的多維度組合的業務場景、開發多個實現類來執行的狀況,咱們思考執行這些實現類在流程上是否有再次抽象和封裝的地方、以減小研發工做量和儘可能的實現通用流程。咱們通過觀察和抽象,發現每個訂單狀態流轉的流程中,都會有三個流程:校驗、業務邏輯執行、數據更新持久化;因而再次抽象,能夠將一個狀態流轉分爲數據準備(prepare)——>校驗(check)——>獲取下一個狀態(getNextState)——>業務邏輯執行(action)——>數據持久化(save)——>後續處理(after)這六個階段;而後經過一個模板方法將六個階段方法串聯在一塊兒、造成一個有順序的執行邏輯。這樣一來整個狀image.png
清晰和簡單了、可維護性上也獲得的必定的提高。

![上傳中...]()

/**
 * 狀態遷移動做處理步驟
 */
public interface StateActionStep<T, C> {
    /**
     * 準備數據
     */
    default void prepare(StateContext<C> context) {
    }
    /**
     * 校驗
     */
    ServiceResult<T> check(StateContext<C> context);
    /**
     * 獲取當前狀態處理器處理完畢後,所處於的下一個狀態
     */
    String getNextState(StateContext<C> context);
    /**
     * 狀態動做方法,主要狀態遷移邏輯
     */
    ServiceResult<T> action(String nextState, StateContext<C> context) throws Exception;
    /**
     * 狀態數據持久化
     */
    ServiceResult<T> save(String nextState, StateContext<C> context) throws Exception;
    /**
     * 狀態遷移成功,持久化後執行的後續處理
     */
    void after(StateContext<C> context);
}
/**
 * 狀態機處理器模板類
 */
@Component
public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T, C>, StateActionStep<T, C> {
    @Override
    public final ServiceResult<T> action(StateContext<C> context) throws Exception {
        ServiceResult<T> result = null;
        try {
            // 數據準備
            this.prepare(context);
            // 串行校驗器
            result = this.check(context);
            if (!result.isSuccess()) {
                return result;
            }
            // getNextState不能在prepare前,由於有的nextState是根據prepare中的數據轉換而來
            String nextState = this.getNextState(context);
            // 業務邏輯
            result = this.action(nextState, context);
            if (!result.isSuccess()) {
                return result;
            }
            // 持久化
            result = this.save(nextState, context);
            if (!result.isSuccess()) {
                return result;
            }
            // after
            this.after(context);
            return result;
        } catch (Exception e) {
            throw e;
        }
    }
/**
 * 狀態A對應的狀態處理器
 */
@OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5")
public class StateCreateProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
    ......
}

(1)校驗器

上面提到了校驗(check),咱們都知道任何一個狀態的流轉甚至接口的調用其實都少不了一些校驗規則,尤爲是對於複雜的業務、其校驗規則和校驗邏輯也會更加複雜。那麼對於這些校驗規則怎麼解耦呢,既要將校驗邏輯從複雜的業務流程中解耦出來、同時又須要把複雜的校驗規則簡單化,使整個校驗邏輯更具備可擴展性和可維護性。其實作法也比較簡單、參考上面的邏輯,只須要抽象一個校驗器接口checker、把複雜的校驗邏輯拆開、造成多個單一邏輯的校驗器實現類,狀態處理器在調用check時只須要調用一個接口、由校驗器執行多個checker的集合就能夠了。將校驗器checker進行封裝以後,發現要加入一個新的校驗邏輯就十分簡單了,只須要寫一個新的checker實現類加入校驗器就行、對其餘代碼基本沒有改動。

/**
 * 狀態機校驗器
 */
public interface Checker<T, C> {
    ServiceResult<T> check(StateContext<C> context);
    /**
     * 多個checker時的執行順序
     */
    default int order() {
        return 0;
    }
}

邏輯簡單了、擴展性和維護性解決了、性能問題就會顯現出來。多個校驗器checker串行執行性能確定性能比較差,此時很簡單的能夠想到使用並行執行,是的、此處使用多線程並行執行多個校驗器checker能顯著提升執行效率。可是也應該意識到,有些校驗器邏輯多是有先後依賴的(其實不該該出現),還有寫業務流程中要求某些校驗器的執行必須有先後順序,還有些流程不要求校驗器的執行順序可是要求錯誤時的返回順序、那麼怎麼在並行的前提下保證順序呢、此處就能夠用order+Future實現了。通過一系列的思考和總結,咱們把校驗器分爲參數校驗(paramChecker)、同步校驗(syncChecker)、異步校驗(asyncChecker)三種類型,其中參數校驗paramChecker是須要在狀態處理器最開始處執行的,爲何這麼作、由於參數都不合法了確定沒有繼續向下執行的必要了。

image.png

/**
 * 狀態機校驗器
 */
public interface Checkable {
    /**
     * 參數校驗
     */
    default List<Checker> getParamChecker() {
        return Collections.EMPTY_LIST;
    }
    /**
     * 需同步執行的狀態檢查器
     */
    default List<Checker> getSyncChecker() {
        return Collections.EMPTY_LIST;
    }
    /**
     * 可異步執行的校驗器
     */
    default List<Checker> getAsyncChecker() {
        return Collections.EMPTY_LIST;
    }
}
/**
 * 校驗器的執行器
 */
public class CheckerExecutor {
    /**
     * 執行並行校驗器,
     * 按照任務投遞的順序判斷返回。
     */
    public ServiceResult<T, C> parallelCheck(List<Checker> checkers, StateContext<C> context) {
        if (!CollectionUtils.isEmpty(checkers)) {
            if (checkers.size() == 1) {
                return checkers.get(0).check(context);
            }
            List<Future<ServiceResult>> resultList = Collections.synchronizedList(new ArrayList<>(checkers.size()));
            checkers.sort(Comparator.comparingInt(Checker::order));
            for (Checker c : checkers) {
                Future<ServiceResult> future = executor.submit(() -> c.check(context));
                resultList.add(future);
            }
            for (Future<ServiceResult> future : resultList) {
                try {
                    ServiceResult sr = future.get();
                    if (!sr.isSuccess()) {
                        return sr;
                    }
                } catch (Exception e) {
                    log.error("parallelCheck executor.submit error.", e);
                    throw new RuntimeException(e);
                }
            }
        }
        return new ServiceResult<>();
    }
}
checkable在模板方法中的使用。

public interface StateActionStep<T, C> {
    Checkable getCheckable(StateContext<C> context);
    ....
}
public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T>, StateActionStep<T, C> {
    @Resource
    private CheckerExecutor checkerExecutor;
    @Override
    public final ServiceResult<T> action(StateContext<C> context) throws Exception {
        ServiceResult<T> result = null;
        Checkable checkable = this.getCheckable(context);
        try {
            // 參數校驗器
            result = checkerExecutor.serialCheck(checkable.getParamChecker(), context);
            if (!result.isSuccess()) {
                return result;
            }
            // 數據準備
            this.prepare(context);
            // 串行校驗器
            result = checkerExecutor.serialCheck(checkable.getSyncChecker(), context);
            if (!result.isSuccess()) {
                return result;
            }
            // 並行校驗器
            result = checkerExecutor.parallelCheck(checkable.getAsyncChecker(), context);
            if (!result.isSuccess()) {
                return result;
            }
        ......
}
checkable在具體狀態處理器中的代碼應用舉例。

@OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
    @Resource
    private CreateParamChecker createParamChecker;
    @Resource
    private UserChecker userChecker;
    @Resource
    private UnfinshChecker unfinshChecker;
    @Override
    public Checkable getCheckable(StateContext<CreateOrderContext> context) {
        return new Checkable() {
            @Override
            public List<Checker> getParamChecker() {
                return Arrays.asList(createParamChecker);
            }
            @Override
            public List<Checker> getSyncChecker() {
                return Collections.EMPTY_LIST;
            }
            @Override
            public List<Checker> getAsyncChecker() {
                return Arrays.asList(userChecker, unfinshChecker);
            }
        };
    }
......
checker的定位是校驗器,負責校驗參數或業務的合法性,但實際編碼過程當中、checker中可能會有一些臨時狀態類操做,好比在校驗以前進行計數或者加鎖操做、在校驗完成後根據結果進行釋放,這裏就須要支持統一的釋放功能。

public interface Checker<T, C> {
    ......
    /**
     * 是否需求release
     */
    default boolean needRelease() {
        return false;
    }
    /**
     * 業務執行完成後的釋放方法,
     * 好比有些業務會在checker中加一些狀態操做,等業務執行完成後根據結果選擇處理這些狀態操做,
     * 最典型的就是checker中加一把鎖,release根據結果釋放鎖.
     */
    default void release(StateContext<C> context, ServiceResult<T> result) {
    }
}
public class CheckerExecutor {
    /**
     * 執行checker的釋放操做
     */
    public <T, C> void releaseCheck(Checkable checkable, StateContext<C> context, ServiceResult<T> result) {
        List<Checker> checkers = new ArrayList<>();
        checkers.addAll(checkable.getParamChecker());
        checkers.addAll(checkable.getSyncChecker());
        checkers.addAll(checkable.getAsyncChecker());
        checkers.removeIf(Checker::needRelease);
        if (!CollectionUtils.isEmpty(checkers)) {
            if (checkers.size() == 1) {
                checkers.get(0).release(context, result);
                return;
            }
            CountDownLatch latch = new CountDownLatch(checkers.size());
            for (Checker c : checkers) {
                executor.execute(() -> {
                    try {
                        c.release(context, result);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

2)上下文

從上面代碼能夠發現,整個狀態遷移的幾個方法都是使用上下文Context對象串聯的。Context對象中一共有三類對象,(1)訂單的基本信息(訂單ID、狀態、業務屬性、場景屬性)、(2)事件對象(其參數基本就是狀態遷移行爲的入參)、(3)具體處理器決定的泛型類。通常要將數據在多個方法中進行傳遞有兩種方案:一個是包裝使用ThreadLocal、每一個方法均可以對當前ThreadLocal進行賦值和取值;另外一種是使用一個上下文Context對象作爲每一個方法的入參傳遞。這種方案都有一些優缺點,使用ThreadLocal實際上是一種"隱式調用",雖然能夠在"隨處"進行調用、可是對使用方其實不明顯的、在中間件中會大量使用、在開發業務代碼中是須要儘可能避免的;而使用Context作爲參數在方法中進行傳遞、能夠有效的減小"不可知"的問題。

無論是使用ThreadLocal仍是Context作爲參數傳遞,對於實際承載的數據載體有兩種方案,常見的是使用Map作爲載體,業務在使用的時候能夠根據須要隨意的設置任何kv,可是這種狀況對代碼的可維護性和可讀性是極大的挑戰,因此這裏使用泛型類來固定數據格式,一個具體的狀態處理流程到底須要對哪些數據作傳遞須要明肯定義好。其實原則是同樣的,業務開發儘可能用用可見性避免不可知。

public class StateContext<C> {
    /**
     * 訂單操做事件
     */
    private OrderStateEvent orderStateEvent;
    /**
     * 狀態機須要的訂單基本信息
     */
    private FsmOrder fsmOrder;
    /**
     * 業務可定義的上下文泛型對象
     */
    private C context;
    public StateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) {
        this.orderStateEvent = orderStateEvent;
        this.fsmOrder = fsmOrder;
    }
    ......
/**
 * 狀態機引擎所需的訂單信息基類信息
 */
public interface FsmOrder {
    /**
     * 訂單ID
     */
    String getOrderId();
    /**
     * 訂單狀態
     */
    String getOrderState();
    /**
     * 訂單的業務屬性
     */
    String bizCode();
    /**
     * 訂單的場景屬性
     */
    String sceneId();
}

(3)遷移到的狀態斷定

爲何要把下一個狀態(getNextState)抽象爲單獨一個步驟、而不是交由業務本身進行設置呢?是由於要遷移到的下一個狀態不必定是固定的,就是說根據當前狀態和發生的事件、再遇到更加細節的邏輯時也可能會流轉到不一樣的狀態。舉個例子,當前狀態是用戶已下單完成、要發生的事件是用戶取消訂單,此時根據不一樣的邏輯,訂單有可能流轉到取消狀態、也有可能流轉到取消待審覈狀態、甚至有可能流轉到取消待支付費用狀態。固然這裏要取決於業務系統對狀態和事件定義的粗細和狀態機的複雜程度,作爲狀態機引擎、這裏把下一個狀態的斷定交由業務根據上下文對象本身來判斷。

getNextState()使用及狀態遷移持久化舉例:

@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
    
    ........
    
    @Override
    public String getNextState(StateContext<CreateOrderContext> context) {
    // if (context.getOrderStateEvent().getEventType().equals("xxx")) {
    //     return OrderStateEnum.INIT;
    //  }
        return OrderStateEnum.NEW;
    }
    @Override
    public ServiceResult<String> save(String nextState, StateContext<CreateOrderContext> context) throws Exception {
        OrderInfo orderInfo = context.getContext().getOrderInfo();
        // 更新狀態
        orderInfo.setOrderState(nextState);
        // 持久化
//        this.updateOrderInfo(orderInfo);
        log.info("save BUSINESS order success, userId:{}, orderId:{}", orderInfo.getUserId(), orderInfo.getOrderId());
        return new ServiceResult<>(orderInfo.getOrderId(), "business下單成功");
    }
}

狀態消息

通常來講,全部的狀態遷移都應該發出對應的消息,由下游消費方訂閱進行相應的業務處理。

(1)狀態消息內容

對於狀態遷移消息的發送內容一般有兩種形式,一個是隻髮狀態發生遷移這個通知、舉例子就是隻發送"訂單ID、變動前狀態、變動後狀態"等幾個關鍵字段,具體下游業務須要哪些具體內容在調用相應的接口進行反查;還有一種是發送全部字段出去、相似於發一個狀態變動後的訂單內容快照,下游接到消息後幾乎不須要在調用接口進行反查。

(2)狀態消息的時序

狀態遷移是有時序的,所以不少下游依賴方也須要判斷消息的順序。一種實現方案是使用順序消息(rocketmq、kafka等),但基於併發吞吐量的考慮不多采用這種方案;通常都是在消息體中加入"消息發送時間"或者"狀態變動時間"字段,有消費方本身進行處理。

(3)數據庫狀態變動和消息的一致性

狀態變動須要和消息保持一致嗎?

不少時候是須要的,若是數據庫狀態變動成功了、可是狀態消息沒有發送出去、則會致使一些下游依賴方處理邏輯的缺失。而咱們知道,數據庫和消息系統是沒法保證100%一致的,咱們要保證的是主要數據庫狀態變動了、消息就要儘可能接近100%的發送成功。

那麼怎麼保證呢?

其實一般確實有幾種方案:

a)使用rocketmq等支持的兩階段式消息提交方式:

先向消息服務器發送一條預處理消息
當本地數據庫變動提交以後、再向消息服務器發送一條確認發送的消息
若是本地數據庫變動失敗、則向消息服務器發送一條取消發送的消息
若是長時間沒有向消息服務器發生確認發送的消息,消息系統則會回調一個提早約定的接口、來查看本地業務是否成功,以此決定是否真正發生消息
image.png

b)使用數據庫事務方案保證:

建立一個消息發送表,將要發送的消息插入到該表中,同本地業務在一個數據庫事務中進行提交
以後在由一個定時任務來輪詢發送、直到發送成功後在刪除當前表記錄
c)仍是使用數據庫事務方案保證:

建立一個消息發送表,將要發送的消息插入到該表中,同本地業務在一個數據庫事務中進行提交
向消息服務器發送消息
發送成功則刪除掉當前表記錄
對於沒有發送成功的消息(也就是表裏面沒有被刪除的記錄),再由定時任務來輪詢發送
image.png

還有其餘方案嗎?有的。

d)數據對帳、發現不一致時進行補償處理、以此保證數據的最終一致。其實無論使用哪一種方案來保證數據庫狀態變動和消息的一致,數據對帳的方案都是"必須"要有的一種兜底方案。

那麼、還有其餘方案嗎?仍是有的,對於數據庫狀態變動和消息的一致性的問題,細節比較多,每種方案又都有相應的優缺點,本文主要是介紹狀態機引擎的設計,對於消息一致性的問題就不過多介紹,後面也許會有單獨的文章對數據庫變動和消息的一致性的問題進行介紹和討論。

2 橫向解決邏輯複用和實現業務擴展
實現基於"多類型+多場景+多維度"的代碼分離治理、以及標準處理流程模板的狀態機模型以後,其實在真正編碼的時候會發現不一樣類型不一樣維度對於同一個狀態的流程處理過程,有時多個處理邏輯中的一部分流程同樣的或者是類似的,好比支付環節無論是採用免密仍是其餘方式、其中核銷優惠券的處理邏輯、設置發票金額的處理邏輯等都是同樣的;甚至有些時候多個類型間的處理邏輯大部分是相同的而差別是小部分,好比下單流程的處理邏輯基本邏輯都差很少,而出租車對比網約車可能就多了出租車紅包、無預估價等個別流程的差別。

對於上面這種狀況、其實就是要實如今縱向解決業務隔離和流程編排的基礎上,須要支持小部分邏輯或代碼段的複用、或者大部分流程的複用,減小重複建設和開發。對此咱們在狀態機引擎中支持兩種解決方案:

基於插件化的解決方案

插件的主要邏輯是:能夠在業務邏輯執行(action)、數據持久化(save)這兩個節點前加載對應到的插件類進行執行,主要是對上下文Context對象進行操做、或者根據Context參數發起不一樣的流程調用,已到達改變業務數據或流程的目的。

(1)標準流程+差別化插件

上面講到同一個狀態模型下、不一樣的類型或維度有些邏輯或處理流程是同樣的小部分邏輯是不一樣的。因而咱們能夠把一種處理流程定義爲標準的或默認的處理邏輯,把差別化的代碼寫成插件,當業務執行到具體差別化邏輯時會調用到不一樣的插件進行處理,這樣只須要爲不一樣的類型或維度編寫對應有差別邏輯的插件便可、標準的處理流程由默認的處理器執行就行。

image.png

(2)差別流程+公用插件

固然對於小部分邏輯和代碼能夠公用的場景,也能夠用插件化的方案解決。好比對於同一個狀態下多個維修下不一樣處理器中、咱們能夠把相同的邏輯或代碼封裝成一個插件,多個處理器中均可以識別加載該插件進行執行,從而實現多個差別的流程使用想用插件的形式。

/**
 * 插件註解
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Component
public @interface ProcessorPlugin {
    /**
     * 指定狀態,state不能同時存在
     */
    String[] state() default {};
    /**
     * 訂單操做事件
     */
    String event();
    /**
     * 業務
     */
    String[] bizCode() default {};
    /**
     * 場景
     */
    String[] sceneId() default {};
}
 * 插件處理器
 */
public interface PluginHandler<T, C> extends StateProcessor<T, C> {
}
Plug在處理器模板中的執行邏輯。

public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T>, StateActionStep<T, C> {
    @Override
    public final ServiceResult<T> action(StateContext<C> context) throws Exception {
        ServiceResult<T> result = null;
        try {
            ......
            // 業務邏輯
            result = this.action(nextState, context);
            if (!result.isSuccess()) {
                return result;
            }
            
            // 在action和save之間執行插件邏輯
            this.pluginExecutor.parallelExecutor(context);
            // 持久化
            result = this.save(nextState, context));
            if (!result.isSuccess()) {
                return result;
            }
            ......
        } catch (Exception e) {
            throw e;
        }
    }
插件使用的例子:

/**
 * 預估價插件
 */
@ProcessorPlugin(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS")
public class EstimatePricePlugin implements PluginHandler<String, CreateOrderContext> {
    @Override
    public ServiceResult action(StateContext<CreateOrderContext> context) throws Exception {
//        String price = priceSerive.getPrice();
        String price = "";
        context.getContext().setEstimatePriceInfo(price);
        return new ServiceResult();
    }
}

基於代碼繼承方式的解決方案

當發現新增一個狀態不一樣維度的處理流程,和當前已存在的一個處理器大部分邏輯是相同的,此時就可使新寫的這個處理器B繼承已存在的處理器A,只須要讓處理器B覆寫A中不一樣方法邏輯、實現差別邏輯的替換。這種方案比較好理解,可是須要處理器A已經規劃好一些能夠擴展的點、其餘處理器能夠基於這些擴展點進行覆寫替換。固然更好的方案實際上是,先實現一個默認的處理器,把全部的標準處理流程和可擴展點進行封裝實現、其餘處理器進行繼承、覆寫、替換就好。

@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "CHEAP")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
    @Override
    public ServiceResult action(String nextState, StateContext<CreateOrderContext> context) throws Exception {
        CreateEvent createEvent = (CreateEvent) context.getOrderStateEvent();
        // 促銷信息信息
        String promtionInfo = this.doPromotion();
        ......
    }
    
    /**
     * 促銷相關擴展點
     */
    protected String doPromotion() {
        return "1";
    }
}
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "TAXI")
public class OrderCreatedProcessor4Taxi extends OrderCreatedProcessor<String, CreateOrderContext>  {
    @Override
    protected String doPromotion() {
        return "taxt1";
    }
}

3 狀態遷移流程的執行流程
狀態機引擎的執行過程

經過上面的介紹,大致明白了怎麼實現狀態流程編排、業務隔離和擴展等等,可是狀態機引擎具體是怎麼把這個過程串聯起來的呢?簡單說、分爲兩個階段:初始化階段和運行時階段。

(1)狀態機引擎初始化階段

首先在代碼編寫階段、根據上面的分析,業務經過實現AbstractStateProcessor模板類、並添加@OrderProcessor註解來實現本身的多個須要的特定狀態處理器。

那麼在系統初始化階段,全部添加了@OrderProcessor註解的實現類都會被spring所管理成爲spring bean,狀態機引擎在經過監聽spring bean的註冊(BeanPostProcessor)來將這些狀態處理器processor裝載到本身管理的容器中。直白來講、這個狀態處理器容器其實就是一個多層map實現的,第一層map的key是狀態(state),第二層map的key是狀態對應的事件(event)、一個狀態能夠有多個要處理的事件,第三層map的key是具體的場景code(也就是bizCode和sceneId的組合),最後的value是AbstractStateProcessor集合。

public class DefaultStateProcessRegistry implements BeanPostProcessor {

/**
 * 第一層key是訂單狀態。
 * 第二層key是訂單狀態對應的事件,一個狀態能夠有多個事件。
 * 第三層key是具體場景code,場景下對應的多個處理器,須要後續進行過濾選擇出一個具體的執行。
*/
    private static Map<String, Map<String, Map<String, List<AbstractStateProcessor>>>> stateProcessMap = new ConcurrentHashMap<>();
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof AbstractStateProcessor && bean.getClass().isAnnotationPresent(OrderProcessor.class)) {
            OrderProcessor annotation = bean.getClass().getAnnotation(OrderProcessor.class);
            String[] states = annotation.state();
            String event = annotation.event();
            String[] bizCodes = annotation.bizCode().length == 0 ? new String[]{"#"} : annotation.bizCode();
            String[] sceneIds = annotation.sceneId().length == 0 ? new String[]{"#"} : annotation.sceneId();
            initProcessMap(states, event, bizCodes, sceneIds, stateProcessMap, (AbstractStateProcessor) bean);
        }
        return bean;
    }
    private <E extends StateProcessor> void initProcessMap(String[] states, String event, String[] bizCodes, String[] sceneIds,
            Map<String, Map<String, Map<String, List<E>>>> map, E processor) {
        for (String bizCode : bizCodes) {
            for (String sceneId : sceneIds) {
                Arrays.asList(states).parallelStream().forEach(orderStateEnum -> {
                    registerStateHandlers(orderStateEnum, event, bizCode, sceneId, map, processor);
                });
            }
        }
    }
    /**
     * 初始化狀態機處理器
     */
    public <E extends StateProcessor> void registerStateHandlers(String orderStateEnum, String event, String bizCode, String sceneId,
                                      Map<String, Map<String, Map<String, List<E>>>> map, E processor) {
        // state維度
        if (!map.containsKey(orderStateEnum)) {
            map.put(orderStateEnum, new ConcurrentHashMap<>());
        }
        Map<String, Map<String, List<E>>> stateTransformEventEnumMap = map.get(orderStateEnum);
        // event維度
        if (!stateTransformEventEnumMap.containsKey(event)) {
            stateTransformEventEnumMap.put(event, new ConcurrentHashMap<>());
        }
        // bizCode and sceneId
        Map<String, List<E>> processorMap = stateTransformEventEnumMap.get(event);
        String bizCodeAndSceneId = bizCode + "@" + sceneId;
        if (!processorMap.containsKey(bizCodeAndSceneId)) {
            processorMap.put(bizCodeAndSceneId, new CopyOnWriteArrayList<>());
        }
        processorMap.get(bizCodeAndSceneId).add(processor);
    }
}

(2)狀態機引擎運行時階段

通過初始化以後,全部的狀態處理器processor都被裝載到容器。在運行時,經過一個入口來發起對狀態機的調用,方法的主要參數是操做事件(event)和業務入參,若是是新建立訂單請求須要攜帶業務(bizCode)和場景(sceneId)信息、若是是已存在訂單的更新狀態機引擎會根據oderId自動獲取業務(bizCode)、場景(sceneId)和當前狀態(state)。以後引擎根據state+event+bizCode+sceneId從狀態處理器容器中獲取到對應的具體處理器processor,從而進行狀態遷移處理。

/**
 * 狀態機執行引擎
 */
public interface OrderFsmEngine {
    /**
     * 執行狀態遷移事件,不傳FsmOrder默認會根據orderId從FsmOrderService接口獲取
     */
    <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent) throws Exception;
    /**
     * 執行狀態遷移事件,可攜帶FsmOrder參數
     */
    <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) throws Exception;
}
@Component
public class DefaultOrderFsmEngine implements OrderFsmEngine {
    @Override
    public <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent) throws Exception {
        FsmOrder fsmOrder = null;
        if (orderStateEvent.newCreate()) {
            fsmOrder = this.fsmOrderService.getFsmOrder(orderStateEvent.getOrderId());
            if (fsmOrder == null) {
                throw new FsmException(ErrorCodeEnum.ORDER_NOT_FOUND);
            }
        }
        return sendEvent(orderStateEvent, fsmOrder);
    }
    @Override
    public <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) throws Exception {
        // 構造當前事件上下文
        StateContext context = this.getStateContext(orderStateEvent, fsmOrder);
        // 獲取當前事件處理器
        StateProcessor<T> stateProcessor = this.getStateProcessor(context);
        // 執行處理邏輯
        return stateProcessor.action(context);
    }
    private <T> StateProcessor<T, ?> getStateProcessor(StateContext<?> context) {
        OrderStateEvent stateEvent = context.getOrderStateEvent();
        FsmOrder fsmOrder = context.getFsmOrder();
        // 根據狀態+事件對象獲取所對應的業務處理器集合
        List<AbstractStateProcessor> processorList = stateProcessorRegistry.acquireStateProcess(fsmOrder.getOrderState(),
                stateEvent.getEventType(), fsmOrder.bizCode(), fsmOrder.sceneId());
        if (processorList == null) {
            // 訂單狀態發生改變
            if (!Objects.isNull(stateEvent.orderState()) && !stateEvent.orderState().equals(fsmOrder.getOrderState())) {
                throw new FsmException(ErrorCodeEnum.ORDER_STATE_NOT_MATCH);
            }
            throw new FsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR);
        }
        if (CollectionUtils.isEmpty(processorResult)) {
            throw new FsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR);
        }
        if (processorResult.size() > 1) {
            throw new FsmException(ErrorCodeEnum.FOUND_MORE_PROCESSOR);
        }
        return processorResult.get(0);
    }
    private StateContext<?> getStateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) {
        StateContext<?> context = new StateContext(orderStateEvent, fsmOrder);
        return context;
    }
}

檢測到多個狀態執行器怎麼處理

有一點要說明,有可能根據state+event+bizCode+sceneId信息獲取到的是多個狀態處理器processor,有可能確實業務須要單純依賴bizCode和sceneId兩個屬性沒法有效識別和定位惟一processor,那麼咱們這裏給業務開一個口、由業務決定從多個處理器中選一個適合當前上下文的,具體作法是業務processor經過filter方法根據當前context來判斷是否符合調用條件。

private <T> StateProcessor<T, ?> getStateProcessor(StateContext<?> context) {
    // 根據狀態+事件對象獲取所對應的業務處理器集合
    List<AbstractStateProcessor> processorList = ...
    ......
    
    List<AbstractStateProcessor> processorResult = new ArrayList<>(processorList.size());
    // 根據上下文獲取惟一的業務處理器
    for (AbstractStateProcessor processor : processorList) {
        if (processor.filter(context)) {
            processorResult.add(processor);
        }
    }
    ......
}

filter在具體狀態處理器processor中的使用舉例:

@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS")
public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> {
    ......
    @Override
    public boolean filter(StateContext<CreateOrderContext> context) {
        OrderInfo orderInfo = (OrderInfo) context.getFsmOrder();
        if (orderInfo.getServiceType() == ServiceType.TAKEOFF_CAR) {
            return true;
        }
        return false;
    }
    ......
}

固然,若是最終通過業務filter以後,仍是有多個狀態處理器符合條件,那麼這裏只能拋異常處理了。這個須要在開發時,對狀態和多維度處理器有詳細規劃。

4 狀態機引擎執行總結

狀態機引擎處理流程

簡易的狀態機引擎的執行流程整理,主要介紹運行時的狀態機執行過程。

image.png

狀態處理器的原理

簡易的狀態機處理器的原理和依賴整理,主要介紹狀態處理器的流程和細節。

image.png

三 其餘

還有其餘問題麼,想一下。

1 狀態流轉併發問題怎麼處理?

若是一個訂單當前是狀態A、此刻從不一樣的維度或入口分別發起了不一樣的事件請求,此時怎麼處理?

好比當前訂單是新建立完成狀態,用戶發起了取消同時客服也發起了取消,在或者訂單是待支付狀態、系統發起了免密支付同時客服或者用戶發起了改價。這些場景無論是系統照成的併發仍是業務操做形成的併發,併發是真實存在的。對於這種狀況、原則是同一時刻一個訂單隻能有一個狀態變動事件可進行,其餘的請求要麼排隊、要麼返回由上游進行處理或重試等。

咱們的作法是:

在狀態機OrderFsmEngine的sendEvent入口處,針對同一個訂單維度加鎖(redis分佈式鎖)、同一時間只容許有一個狀態變動操做進行,其餘請求則進行排隊等待。

在數據庫層對當前state作校驗、相似與樂觀鎖方式。最終是將其餘請求拋錯、由上游業務進行處理。

2 能不能動態實現狀態流程的切換和編排?

最開始咱們有一個版本,狀態處理器的定義不是由註解方式實現、而是將state、event、bizCode、sceneId、processor這些經過數據庫表來保存,初始化時從數據庫加載後進行處理器的裝載。同時經過一個後臺能夠動態的調整state、event、bizCode、sceneId、processor對應關係、以此來達到動態靈活配置流程的效果,可是隨着業務的上線,基本歷來沒有進行動態變動過,其實也不敢操做,畢竟狀態流轉事十分核心的業務、一旦因變動致使故障是不可想象的。

3 通用性的問題

其實不只僅訂單系統、甚至不只是狀態機邏輯能夠用上面講的這些思路處理,不少平常中其餘一些多維度的業務均可以採起這些方案進行處理。

4 與TMF的結合

其實這套狀態機引擎仍是比較簡單的、對於業務擴展點處的定義也不是十分友好,目前咱們也正在結合TMF框架來定製擴展點,TMF是從執行具體擴展點實現的角度出發,達到標準流程和具體業務邏輯分離的效果。

固然無論那種方案,擴展點的定義是業務須要核心關心和友好封裝的事情。

原文連接本文爲阿里雲原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索