Spring StateMachine 狀態機引擎在項目中的應用(二)--持久化

背景

每次用到的時候新建立一個狀態機,太奢侈了,官方文檔裏面也提到過這點。java

並且建立出來的實例,其狀態也跟當前訂單的不符;spring statemachine暫時不支持每次建立時指定當前狀態,因此對狀態機引擎實例的持久化,就成了必需要考慮的問題。(不過在後續版本有直接指定狀態的方式,這個後面會寫)redis

擴展一下

這裏擴展說明一下,狀態機引擎的持久化一直是比較容易引發討論的,由於不少場景並不但願再多存儲一些中間非業務數據,以前在淘寶工做時,淘寶的訂單系統tradeplatform本身實現了一套workflowEngine,其實說白了也就是一套狀態機引擎,全部的配置都放在xml中,每次每一個環節的請求過來,都會從新建立一個狀態機引擎實例,並根據當前的訂單狀態來設置引擎實例的狀態。spring

workflowEngine沒有作持久化,私下裏猜想下這樣實現的緣由:一、淘係數據量太大,一天幾千萬筆訂單,額外的信息存儲就要耗費不少存儲資源;二、徹底自主開發的狀態機引擎,可定製化比較強,根據本身的業務須要能夠按本身的須要處理。apache

而反過來,spring statemachine並不支持隨意指定初始狀態,每次建立都是固定的初始化狀態,其實也只是有好處的,標準版流程,並且能夠保證安全,每一個節點都是按照事先定義好的流程跑下來,而不是隨意指定。因此,狀態機引擎實例的持久化,咱們此次的主題,那就繼續聊下去吧。數組

持久化

spring statemachine 自己支持了內存、redis及db的持久化,內存持久化就不說了,看源碼實現就是放在了hashmap裏,平時也沒誰項目中能夠這麼奢侈,啥啥都放在內存中,並且一旦重啓…..😓。下面詳細說下利用redis進行的持久化操做。緩存

依賴引入

spring statemachine 自己是提供了一個redis存儲的組件的,在1.2.10.RELEASE版本中,這個組件須要經過依賴引入,同時須要引入的還有序列化的組件kyro、data-common:安全

gradle引入依賴 (build.gradle 或者 libraries.gradle,由本身項目的gradle組織方式來定):markdown

compile 'org.springframework.statemachine:spring-statemachine-core:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-data-common:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-kyro:1.2.10.RELEASE'
compile 'org.springframework.statemachine:spring-statemachine-redis:1.2.10.RELEASE'複製代碼

固然若是是maven的話,同樣的,pom.xml以下:mybatis

<dependencies>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-core</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-data-common</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-kyro</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.statemachine</groupId>
        <artifactId>spring-statemachine-redis</artifactId>
        <version>1.2.10.RELEASE</version>
    </dependency>
</dependencies>複製代碼

先把持久化的調用軌跡說明下

spring-statemachine-持久化.png

說明:dom

spring statemachine持久化時,採用了三層結構設計,persister —>persist —>repository。

  • 其中persister中封裝了write和restore兩個方法,分別用於持久化寫及反序列化讀出。
  • persist只是一層皮,主要仍是調用repository中的實際實現;可是在這裏,因爲redis存儲不保證百分百數據安全,因此我實現了一個自定義的persist,其中封裝了數據寫入db、從db中讀取的邏輯。
  • repository中作了兩件事兒
    • 序列化/反序列化數據,將引擎實例與二進制數組互相轉換
    • 讀、寫redis
詳細的實現

Persister

import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.statemachine.redis.RedisStateMachinePersister;

@Configuration
public class BizOrderRedisStateMachinePersisterConfig {

    @Autowired
    private StateMachinePersist bizOrderRedisStateMachineContextPersist;

    @Bean(name = "bizOrderRedisStateMachinePersister",autowire = Autowire.BY_TYPE)
    public StateMachinePersister<BizOrderStatusEnum, BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister() {
        return new RedisStateMachinePersister<>(bizOrderRedisStateMachineContextPersist);
    }

}複製代碼

這裏採用官方samples中初始化的方式,經過@Bean註解來建立一個RedisStateMachinePersister實例,注意其中傳遞進去的Persist爲自定義的bizOrderRedisStateMachineContextPersist

Persist

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageHeaders;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.kryo.MessageHeadersSerializer;
import org.springframework.statemachine.kryo.StateMachineContextSerializer;
import org.springframework.statemachine.kryo.UUIDSerializer;
import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Base64;
import java.util.UUID;

@Component("bizOrderRedisStateMachineContextPersist")
public class BizOrderRedisStateMachineContextPersist implements StateMachinePersist<BizOrderStatusEnum, BizOrderStatusChangeEventEnum, String> {

    @Autowired
    @Qualifier("redisStateMachineContextRepository")
    private RedisStateMachineContextRepository<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> redisStateMachineContextRepository;

    @Autowired
    private BizOrderStateMachineContextRepository bizOrderStateMachineContextRepository;

    //  加入存儲到DB的數據repository, biz_order_state_machine_context表結構:
    //  bizOrderId
    //  contextStr
    //  curStatus
    //  updateTime

    /**
     * Write a {@link StateMachineContext} into a persistent store
     * with a context object {@code T}.
     *
     * @param context    the context
     * @param contextObj the context ojb
     * @throws Exception the exception
     */
    @Override
    @Transactional
    public void write(StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context, String contextObj) throws Exception {

        redisStateMachineContextRepository.save(context, contextObj);
        //  save to db
        BizOrderStateMachineContext queryResult = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);

        if (null == queryResult) {
            BizOrderStateMachineContext bosmContext = new BizOrderStateMachineContext(contextObj,
                    context.getState().getStatus(), serialize(context));
            bizOrderStateMachineContextRepository.insertSelective(bosmContext);
        } else {
            queryResult.setCurOrderStatus(context.getState().getStatus());
            queryResult.setContext(serialize(context));
            bizOrderStateMachineContextRepository.updateByPrimaryKeySelective(queryResult);
        }
    }

    /**
     * Read a {@link StateMachineContext} from a persistent store
     * with a context object {@code T}.
     *
     * @param contextObj the context ojb
     * @return the state machine context
     * @throws Exception the exception
     */
    @Override
    public StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> read(String contextObj) throws Exception {

        StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context = redisStateMachineContextRepository.getContext(contextObj);
        //redis 訪緩存擊穿
        if (null != context && BizOrderConstants.STATE_MACHINE_CONTEXT_ISNULL.equalsIgnoreCase(context.getId())) {
            return null;
        }
        //redis 爲空走db
        if (null == context) {
            BizOrderStateMachineContext boSMContext = bizOrderStateMachineContextRepository.selectByOrderId(contextObj);
            if (null != boSMContext) {
                context = deserialize(boSMContext.getContext());
                redisStateMachineContextRepository.save(context, contextObj);
            } else {
                context = new StateMachineContextIsNull();
                redisStateMachineContextRepository.save(context, contextObj);
            }
        }
        return context;
    }

    private String serialize(StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> context) throws UnsupportedEncodingException {
        Kryo kryo = kryoThreadLocal.get();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Output output = new Output(out);
        kryo.writeObject(output, context);
        output.close();
        return Base64.getEncoder().encodeToString(out.toByteArray());
    }

    @SuppressWarnings("unchecked")
    private StateMachineContext<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> deserialize(String data) throws UnsupportedEncodingException {
        if (StringUtils.isEmpty(data)) {
            return null;
        }
        Kryo kryo = kryoThreadLocal.get();
        ByteArrayInputStream in = new ByteArrayInputStream(Base64.getDecoder().decode(data));
        Input input = new Input(in);
        return kryo.readObject(input, StateMachineContext.class);
    }

    private static final ThreadLocal<Kryo> kryoThreadLocal = new ThreadLocal<Kryo>() {

        @SuppressWarnings("rawtypes")
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(StateMachineContext.class, new StateMachineContextSerializer());
            kryo.addDefaultSerializer(MessageHeaders.class, new MessageHeadersSerializer());
            kryo.addDefaultSerializer(UUID.class, new UUIDSerializer());
            return kryo;
        }
    };
}複製代碼

說明:

  1. 若是隻是持久化到redis中,那麼BizOrderStateMachineContextRepository相關的全部內容都可刪除。不過因爲redis沒法承諾百分百的數據安全,因此我這裏作了兩層持久化,redis+db
  2. 存入redis中的數據默認採用kryo來序列化及反序列化,RedisStateMachineContextRepository中實現了對應代碼。可是spring statemachine默認的db存儲比較複雜,須要建立多張表,參加下圖:

jpa-table.png

這裏須要額外建立5張表,分別存儲ActionGuardStateStateMachineTransition,比較複雜。

  1. 因此這裏建立了一張表bizorderstatemachinecontext,結構很簡單:bizOrderId,contextStr,curStatus,updateTime,其中關鍵是contextStr,用於存儲與redis中相同的內容
Repository

有兩個repository,一個是spring statemachine提供的redisRepo,另外一個則是項目中基於mybatis的repo,先是db-repo:

import org.apache.ibatis.annotations.Param;
   import org.springframework.data.domain.Pageable;
   import org.springframework.stereotype.Repository;
   
   import java.util.List;
   
   @Repository
   public interface BizOrderStateMachineContextRepository {
   
         int deleteByPrimaryKey(Long id);
       
       BizOrderStateMachineContext selectByOrderId(String bizOrderId);
       
       int updateByPrimaryKey(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       int updateByPrimaryKeySelective(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       int insertSelective(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       int selectCount(BizOrderStateMachineContext BizOrderStateMachineContext);
   
       List<BizOrderStateMachineContext> selectPage(@Param("BizOrderStateMachineContext") BizOrderStateMachineContext BizOrderStateMachineContext, @Param("pageable") Pageable pageable);
       
   }複製代碼

而後是redisRepo

import org.springframework.beans.factory.annotation.Autowire;
   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.context.annotation.Bean;
   import org.springframework.context.annotation.Configuration;
   import org.springframework.data.redis.connection.RedisConnectionFactory;
   import org.springframework.statemachine.redis.RedisStateMachineContextRepository;
   
   @Configuration
   public class BizOrderRedisStateMachineRepositoryConfig {
   
       /**
        * 接入asgard後,redis的connectionFactory能夠經過serviceName + InnerConnectionFactory來注入
        */
       @Autowired
       private RedisConnectionFactory finOrderRedisInnerConnectionFactory;
   
       @Bean(name = "redisStateMachineContextRepository", autowire = Autowire.BY_TYPE)
       public RedisStateMachineContextRepository<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> redisStateMachineContextRepository() {
   
           return new RedisStateMachineContextRepository<>(finOrderRedisInnerConnectionFactory);
       }
   }
   複製代碼

使用方式

@Autowired
    @Qualifier("bizOrderRedisStateMachinePersister")
    private StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister;
   
   ......
     bizOrderRedisStateMachinePersister.persist(stateMachine, request.getBizCode());
   ......
     StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
                   =      bizOrderRedisStateMachinePersister.restore(srcStateMachine,statusRequest.getBizCode());
   ......複製代碼

支持,關於spring statemachine的持久化就交代完了,下面就是最關鍵的,怎麼利用狀態機來串聯業務,下一節將會詳細描述。

相關文章
相關標籤/搜索