基於可靠消息方案的分佈式事務(四):接入Lottor服務

上一篇文章中,經過Lottor Sample介紹了快速體驗分佈式事務Lottor。本文將會介紹如何將微服務中的生產方和消費方服務接入Lottor。java

場景描述

  • 生產方:User服務
  • 消費方:Auth服務
  • 事務管理方:Lottor Server

Lottor-Samples中的場景爲:客戶端調用User服務建立一個用戶,用戶服務的user表中增長了一條用戶記錄。除此以外,還會調用Auth服務建立該用戶對應的角色和權限信息。mysql

咱們經過上面的請求流程圖入手,介紹接入Lottor服務。當您啓動好docker-compose中的組件時,會建立好兩個服務對應的user和auth數據庫。其中User和Auth服務所須要的初始化數據已經準備好,放在各自的classpath下,服務在啓動時會自動初始化數據庫,所須要的預置數據(如角色、權限信息)也放在sql文件中。git

Lottor客戶端API

Lottor Client中提供了一個ExternalNettyService接口,用以發送三類消息到Lottor Server:github

  • 預提交消息
  • 確認提交消息
  • 消費完成消息
public interface ExternalNettyService {

    /**
     * pre-commit msgs
     *
     * @param preCommitMsgs
     */
    public Boolean preSend(List<TransactionMsg> preCommitMsgs);

    /**
     * confirm msgs
     *
     * @param success
     */
    public void postSend(Boolean success, Object message);

    /**
     * msgs after consuming
     *
     * @param msg
     * @param success
     */
    public void consumedSend(TransactionMsg msg, Boolean success);
}
複製代碼

預發送#preSend的入參爲預提交的消息列表,一個生產者可能有對應的多個消費者;確認提交#postSend的入參爲生產方本地事務執行的狀態,若是失敗,第二個參數記錄異常信息;#consumedSend爲消費方消費成功的發送的異步消息,第一個入參爲其接收到的事務消息,第二個爲消費的狀態。redis

事務消息TransactionMsg

public class TransactionMsg implements Serializable {
    /** * 用於消息的追溯 */
    private String groupId;

    /** * 事務消息id */
    @NonNull
    private String subTaskId;

    /** * 源服務,即調用發起方 */
    private String source;

    /** * 目標方服務 */
    private String target;

    /** * 執行的方法,適配成枚舉 */
    private String method;

    /** * 參數,即要傳遞的內容,能夠爲null */
    private Object args;

    /** * 建立時間 */
    private Long createTime = Timestamp.valueOf(DateUtils.getCurrentDateTime()).getTime();

    /** * 操做結果信息 */
    private String message;

    /** * 更新時間 */
    private Long updateTime;

    /** * 是否消費,默認爲否 * * {@linkplain com.blueskykong.lottor.common.enums.ConsumedStatus} */
    private int consumed = ConsumedStatus.UNCONSUMED.getStatus();
	 
	 ...
}
複製代碼

在構建事務消息時,事務消息id、源服務、目標服務、目標方法和目標方法的傳參args都是必不可少的。消費方消費完以後,將會設置consumed的狀態,出現異常將會設置異常message信息。spring

生產方-User服務

建立用戶時,須要建立對應的角色。生產方接入分爲三步:sql

  • 發送預提交消息
  • 執行本地事務
  • 發送確認提交的消息

引入依賴

首先,須要引入Lottor客戶端的依賴:docker

<dependency>
        <groupId>com.blueskykong</groupId>
        <artifactId>lottor-starter</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </dependency>
複製代碼

發起調用

UserService中定義了建立用戶的方法,咱們須要在執行本地事務以前,構造事務消息並預發送到Lottor Server(對應流程圖中的步驟1)。若是遇到預發送失敗,則直接中止本地事務的執行。若是本地事務執行成功(對應步驟3),則發送confirm消息,不然發送回滾消息到Lottor Server(對應步驟4)。數據庫

@Service
public class UserServiceImpl implements UserService {

    private static final Logger LOGGER = LoggerFactory.getLogger(UserServiceImpl.class);

	 //注入ExternalNettyService
    @Autowired
    private ExternalNettyService nettyService;

    @Autowired
    private UserMapper userMapper;

    @Override
    @Transactional
    public Boolean createUser(UserEntity userEntity, StateEnum flag) {
        UserRoleDTO userRoleDTO = new UserRoleDTO(RoleEnum.ADMIN, userEntity.getId());
		 //構造消費方的TransactionMsg
        TransactionMsg transactionMsg = new TransactionMsg.Builder()
                .setSource(ServiceNameEnum.TEST_USER.getServiceName())
                .setTarget(ServiceNameEnum.TEST_AUTH.getServiceName())
                .setMethod(MethodNameEnum.AUTH_ROLE.getMethod())
                .setSubTaskId(IdWorkerUtils.getInstance().createUUID())
                .setArgs(userRoleDTO)
                .build();

        if (flag == StateEnum.CONSUME_FAIL) {
            userRoleDTO.setUserId(null);
            transactionMsg.setArgs(userRoleDTO);
        }

        //發送預處理消息
        if (!nettyService.preSend(Collections.singletonList(transactionMsg))) {
            return false;//預發送失敗,本地事務中止執行
        }

        //local transaction本地事務
        try {
            LOGGER.debug("執行本地事務!");
            if (flag != StateEnum.PRODUCE_FAIL) {
                userMapper.saveUser(userEntity);
            } else {
                userMapper.saveUserFailure(userEntity);
            }
        } catch (Exception e) {
        	  //本地事務異常,發送回滾消息
            nettyService.postSend(false, e.getMessage());
            LOGGER.error("執行本地事務失敗,cause is 【{}】", e.getLocalizedMessage());
            return false;
        }
        //發送確認消息
        nettyService.postSend(true, null);
        return true;
    }

}
複製代碼

代碼如上所示,實現不是很複雜。本地事務執行前,必然已經成功發送了預提交消息,當本地事務執行成功,Lottor Client將會記錄本地事務執行的狀態,避免異步發送的確認消息的丟失,便於後續的Lottor Server回查。緩存

配置文件

lottor:
 enabled: true
 core:
 cache: true  
 cache-type: redis
 tx-redis-config:
 host-name: localhost
 port: 6379
 serializer: kryo
 netty-serializer: kryo
 tx-manager-id: lottor

spring:
 datasource:
 url: jdbc:mysql://localhost:3306/user?autoReconnect=true&useSSL=false
 continue-on-error: false
 initialize: true
 max-active: 50
 max-idle: 10
 max-wait: 10000
 min-evictable-idle-time-millis: 60000
 min-idle: 8
 name: dbcp1
 test-on-borrow: false
 test-on-return: false
 test-while-idle: false
 time-between-eviction-runs-millis: 5000
 username: root
 password: _123456_
    schema[0]: classpath:/user.sql
複製代碼

如上爲User服務的部分配置文件,lottor.enabled: true開啓Lottor 客戶端服務。cache 開啓本地緩存記錄。cache-type指定了本地事務記錄的緩存方式,能夠爲redis或者MongoDB。serializer爲序列化和反序列化方式。tx-manager-id爲對應的Lottor Server的服務名。

Lottor Server

多個微服務的接入,對Lottor Server其實沒什麼侵入性。這裏須要注意的是,TransactionMsg中設置的sourcetarget字段來源於lottor-common中的com.blueskykong.lottor.common.enums.ServiceNameEnum

public enum ServiceNameEnum {
    TEST_USER("user", "tx-user"),
    TEST_AUTH("auth", "tx-auth");
	//服務名
    String serviceName;
	//消息中間件的topic
    String topic;
    
    ...
}
複製代碼

消息中間件的topic是在服務名的基礎上,加上tx-前綴。消費方在設置訂閱的topic時,須要按照這樣的規則命名。Lottor Server完成的步驟爲上面流程圖中的2(成功收到預提交消息)和5(發送事務消息到指定的消費方),除此以外,還會定時輪詢異常狀態的事務組和事務消息。

消費方-Auth服務

引入依賴

<dependency>
        <groupId>com.blueskykong</groupId>
        <artifactId>lottor-starter</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
複製代碼

引入了Lottor客戶端starter,spring-cloud-stream用於消費方接收來自Lottor Server的事務消息。

topic監聽

@Component
@EnableBinding({TestSink.class})
public class ListenerStream extends InitStreamHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerStream.class);

    @Autowired
    private RoleUserService roleUserService;

    @Autowired
    public ListenerStream(ExternalNettyService nettyService, ObjectSerializer objectSerializer) {
        super(nettyService, objectSerializer);
    }

    @StreamListener(TestSink.INPUT)
    public void processSMS(Message message) {
        //解析接收到的TransactionMsg
        process(init(message));
    }

    @Transactional
    public void process(TransactionMsg message) {
        try {
            if (Objects.nonNull(message)) {
                LOGGER.info("===============consume notification message: =======================" + message.toString());
                if (StringUtils.isNotBlank(message.getMethod())) {
                    MethodNameEnum method = MethodNameEnum.fromString(message.getMethod());
                    LOGGER.info(message.getMethod());
                    //根據目標方法進行處理,由於一個服務能夠對應多個生產方,有多個目標方法
                    switch (method) {
                        case AUTH_ROLE:
                            UserRoleDTO userRoleDTO = (UserRoleDTO) message.getArgs();
                            RoleEntity roleEntity = roleUserService.getRole(userRoleDTO.getRoleEnum().getName());
                            String roleId = "";
                            if (Objects.nonNull(roleEntity)) {
                                roleId = roleEntity.getId();
                            }
                            roleUserService.saveRoleUser(new UserRole(UUID.randomUUID().toString(), userRoleDTO.getUserId(), roleId));
                            LOGGER.info("matched case {}", MethodNameEnum.AUTH_ROLE);

                            break;
                        default:
                            LOGGER.warn("no matched consumer case!");
                            message.setMessage("no matched consumer case!");
                            nettyService.consumedSend(message, false);
                            return;
                    }
                }
            }
        } catch (Exception e) {
        	  //處理異常,發送消費失敗的消息
            LOGGER.error(e.getLocalizedMessage());
            message.setMessage(e.getLocalizedMessage());
            nettyService.consumedSend(message, false);
            return;
        }
        //成功消費
        nettyService.consumedSend(message, true);
        return;
    }
}
複製代碼

消費方監聽指定的topic(如上實現中,爲test-input中指定的topic,spring-cloud-stream更加簡便調用的接口),解析接收到的TransactionMsg。根據目標方法進行處理,由於一個服務能夠對應多個生產方,有多個目標方法。執行本地事務時,Auth會根據TransactionMsg中提供的args做爲入參執行指定的方法(對應步驟7),最後向Lottor Server發送消費的結果(對應步驟8)。

配置文件

---
spring:
 cloud:
 stream:
 bindings:
 test-input:
 group: testGroup
 content-type: application/x-java-object;type=com.blueskykong.lottor.common.entity.TransactionMsgAdapter
 destination: tx-auth
 binder: rabbit1
 binders:
 rabbit1:
 type: rabbit
 environment:
 spring:
 rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest
 virtual-host: /

---
lottor:
 enabled: true
 core:
 cache: true
 cache-type: redis
 tx-redis-config:
 host-name: localhost
 port: 6379
 serializer: kryo
 netty-serializer: kryo
 tx-manager-id: lottor
複製代碼

配置和User服務的差異在於增長了spring-cloud-stream的配置,配置了rabbitmq的相關信息,監聽的topic爲tx-auth。

小結

本文主要經過User和Auth的示例服務講解了如何接入Lottor客戶端。生產方構造涉及的事務消息,首先預發送事務消息到Lottor Server,成功預提交以後便執行本地事務;本地事務執行完則異步發送確認消息(可能成功,也可能失敗)。Lottor Server根據接收到的確認消息決定是否將對應的事務組消息發送到對應的消費方。Lottor Server還會定時輪詢異常狀態的事務組和事務消息,以防由於異步的確認消息發送失敗。消費方收到事務消息以後,將會根據目標方法執行對應的處理操做,最後將消費結果異步回寫到Lottor Server。

推薦閱讀

基於可靠消息方案的分佈式事務

Lottor項目地址:https://github.com/keets2012/Lottor

訂閱最新文章,歡迎關注個人公衆號

相關文章
相關標籤/搜索