在上一篇文章中,經過Lottor Sample介紹了快速體驗分佈式事務Lottor。本文將會介紹如何將微服務中的生產方和消費方服務接入Lottor。java
Lottor-Samples中的場景爲:客戶端調用User服務建立一個用戶,用戶服務的user表中增長了一條用戶記錄。除此以外,還會調用Auth服務建立該用戶對應的角色和權限信息。mysql
咱們經過上面的請求流程圖入手,介紹接入Lottor服務。當您啓動好docker-compose中的組件時,會建立好兩個服務對應的user和auth數據庫。其中User和Auth服務所須要的初始化數據已經準備好,放在各自的classpath下,服務在啓動時會自動初始化數據庫,所須要的預置數據(如角色、權限信息)也放在sql文件中。git
Lottor Client中提供了一個ExternalNettyService
接口,用以發送三類消息到Lottor Server:github
public interface ExternalNettyService {
/**
* pre-commit msgs
*
* @param preCommitMsgs
*/
public Boolean preSend(List<TransactionMsg> preCommitMsgs);
/**
* confirm msgs
*
* @param success
*/
public void postSend(Boolean success, Object message);
/**
* msgs after consuming
*
* @param msg
* @param success
*/
public void consumedSend(TransactionMsg msg, Boolean success);
}
複製代碼
預發送#preSend
的入參爲預提交的消息列表,一個生產者可能有對應的多個消費者;確認提交#postSend
的入參爲生產方本地事務執行的狀態,若是失敗,第二個參數記錄異常信息;#consumedSend
爲消費方消費成功的發送的異步消息,第一個入參爲其接收到的事務消息,第二個爲消費的狀態。redis
public class TransactionMsg implements Serializable {
/** * 用於消息的追溯 */
private String groupId;
/** * 事務消息id */
@NonNull
private String subTaskId;
/** * 源服務,即調用發起方 */
private String source;
/** * 目標方服務 */
private String target;
/** * 執行的方法,適配成枚舉 */
private String method;
/** * 參數,即要傳遞的內容,能夠爲null */
private Object args;
/** * 建立時間 */
private Long createTime = Timestamp.valueOf(DateUtils.getCurrentDateTime()).getTime();
/** * 操做結果信息 */
private String message;
/** * 更新時間 */
private Long updateTime;
/** * 是否消費,默認爲否 * * {@linkplain com.blueskykong.lottor.common.enums.ConsumedStatus} */
private int consumed = ConsumedStatus.UNCONSUMED.getStatus();
...
}
複製代碼
在構建事務消息時,事務消息id、源服務、目標服務、目標方法和目標方法的傳參args都是必不可少的。消費方消費完以後,將會設置consumed的狀態,出現異常將會設置異常message信息。spring
建立用戶時,須要建立對應的角色。生產方接入分爲三步:sql
首先,須要引入Lottor客戶端的依賴:docker
<dependency>
<groupId>com.blueskykong</groupId>
<artifactId>lottor-starter</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
複製代碼
在UserService
中定義了建立用戶的方法,咱們須要在執行本地事務以前,構造事務消息並預發送到Lottor Server(對應流程圖中的步驟1)。若是遇到預發送失敗,則直接中止本地事務的執行。若是本地事務執行成功(對應步驟3),則發送confirm消息,不然發送回滾消息到Lottor Server(對應步驟4)。數據庫
@Service
public class UserServiceImpl implements UserService {
private static final Logger LOGGER = LoggerFactory.getLogger(UserServiceImpl.class);
//注入ExternalNettyService
@Autowired
private ExternalNettyService nettyService;
@Autowired
private UserMapper userMapper;
@Override
@Transactional
public Boolean createUser(UserEntity userEntity, StateEnum flag) {
UserRoleDTO userRoleDTO = new UserRoleDTO(RoleEnum.ADMIN, userEntity.getId());
//構造消費方的TransactionMsg
TransactionMsg transactionMsg = new TransactionMsg.Builder()
.setSource(ServiceNameEnum.TEST_USER.getServiceName())
.setTarget(ServiceNameEnum.TEST_AUTH.getServiceName())
.setMethod(MethodNameEnum.AUTH_ROLE.getMethod())
.setSubTaskId(IdWorkerUtils.getInstance().createUUID())
.setArgs(userRoleDTO)
.build();
if (flag == StateEnum.CONSUME_FAIL) {
userRoleDTO.setUserId(null);
transactionMsg.setArgs(userRoleDTO);
}
//發送預處理消息
if (!nettyService.preSend(Collections.singletonList(transactionMsg))) {
return false;//預發送失敗,本地事務中止執行
}
//local transaction本地事務
try {
LOGGER.debug("執行本地事務!");
if (flag != StateEnum.PRODUCE_FAIL) {
userMapper.saveUser(userEntity);
} else {
userMapper.saveUserFailure(userEntity);
}
} catch (Exception e) {
//本地事務異常,發送回滾消息
nettyService.postSend(false, e.getMessage());
LOGGER.error("執行本地事務失敗,cause is 【{}】", e.getLocalizedMessage());
return false;
}
//發送確認消息
nettyService.postSend(true, null);
return true;
}
}
複製代碼
代碼如上所示,實現不是很複雜。本地事務執行前,必然已經成功發送了預提交消息,當本地事務執行成功,Lottor Client將會記錄本地事務執行的狀態,避免異步發送的確認消息的丟失,便於後續的Lottor Server回查。緩存
lottor:
enabled: true
core:
cache: true
cache-type: redis
tx-redis-config:
host-name: localhost
port: 6379
serializer: kryo
netty-serializer: kryo
tx-manager-id: lottor
spring:
datasource:
url: jdbc:mysql://localhost:3306/user?autoReconnect=true&useSSL=false
continue-on-error: false
initialize: true
max-active: 50
max-idle: 10
max-wait: 10000
min-evictable-idle-time-millis: 60000
min-idle: 8
name: dbcp1
test-on-borrow: false
test-on-return: false
test-while-idle: false
time-between-eviction-runs-millis: 5000
username: root
password: _123456_
schema[0]: classpath:/user.sql
複製代碼
如上爲User服務的部分配置文件,lottor.enabled: true
開啓Lottor 客戶端服務。cache 開啓本地緩存記錄。cache-type指定了本地事務記錄的緩存方式,能夠爲redis或者MongoDB。serializer爲序列化和反序列化方式。tx-manager-id爲對應的Lottor Server的服務名。
多個微服務的接入,對Lottor Server其實沒什麼侵入性。這裏須要注意的是,TransactionMsg
中設置的source
和target
字段來源於lottor-common中的com.blueskykong.lottor.common.enums.ServiceNameEnum
:
public enum ServiceNameEnum {
TEST_USER("user", "tx-user"),
TEST_AUTH("auth", "tx-auth");
//服務名
String serviceName;
//消息中間件的topic
String topic;
...
}
複製代碼
消息中間件的topic是在服務名的基礎上,加上tx-
前綴。消費方在設置訂閱的topic時,須要按照這樣的規則命名。Lottor Server完成的步驟爲上面流程圖中的2(成功收到預提交消息)和5(發送事務消息到指定的消費方),除此以外,還會定時輪詢異常狀態的事務組和事務消息。
<dependency>
<groupId>com.blueskykong</groupId>
<artifactId>lottor-starter</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
複製代碼
引入了Lottor客戶端starter,spring-cloud-stream用於消費方接收來自Lottor Server的事務消息。
@Component
@EnableBinding({TestSink.class})
public class ListenerStream extends InitStreamHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ListenerStream.class);
@Autowired
private RoleUserService roleUserService;
@Autowired
public ListenerStream(ExternalNettyService nettyService, ObjectSerializer objectSerializer) {
super(nettyService, objectSerializer);
}
@StreamListener(TestSink.INPUT)
public void processSMS(Message message) {
//解析接收到的TransactionMsg
process(init(message));
}
@Transactional
public void process(TransactionMsg message) {
try {
if (Objects.nonNull(message)) {
LOGGER.info("===============consume notification message: =======================" + message.toString());
if (StringUtils.isNotBlank(message.getMethod())) {
MethodNameEnum method = MethodNameEnum.fromString(message.getMethod());
LOGGER.info(message.getMethod());
//根據目標方法進行處理,由於一個服務能夠對應多個生產方,有多個目標方法
switch (method) {
case AUTH_ROLE:
UserRoleDTO userRoleDTO = (UserRoleDTO) message.getArgs();
RoleEntity roleEntity = roleUserService.getRole(userRoleDTO.getRoleEnum().getName());
String roleId = "";
if (Objects.nonNull(roleEntity)) {
roleId = roleEntity.getId();
}
roleUserService.saveRoleUser(new UserRole(UUID.randomUUID().toString(), userRoleDTO.getUserId(), roleId));
LOGGER.info("matched case {}", MethodNameEnum.AUTH_ROLE);
break;
default:
LOGGER.warn("no matched consumer case!");
message.setMessage("no matched consumer case!");
nettyService.consumedSend(message, false);
return;
}
}
}
} catch (Exception e) {
//處理異常,發送消費失敗的消息
LOGGER.error(e.getLocalizedMessage());
message.setMessage(e.getLocalizedMessage());
nettyService.consumedSend(message, false);
return;
}
//成功消費
nettyService.consumedSend(message, true);
return;
}
}
複製代碼
消費方監聽指定的topic(如上實現中,爲test-input中指定的topic,spring-cloud-stream更加簡便調用的接口),解析接收到的TransactionMsg。根據目標方法進行處理,由於一個服務能夠對應多個生產方,有多個目標方法。執行本地事務時,Auth會根據TransactionMsg中提供的args做爲入參執行指定的方法(對應步驟7),最後向Lottor Server發送消費的結果(對應步驟8)。
---
spring:
cloud:
stream:
bindings:
test-input:
group: testGroup
content-type: application/x-java-object;type=com.blueskykong.lottor.common.entity.TransactionMsgAdapter
destination: tx-auth
binder: rabbit1
binders:
rabbit1:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
---
lottor:
enabled: true
core:
cache: true
cache-type: redis
tx-redis-config:
host-name: localhost
port: 6379
serializer: kryo
netty-serializer: kryo
tx-manager-id: lottor
複製代碼
配置和User服務的差異在於增長了spring-cloud-stream的配置,配置了rabbitmq的相關信息,監聽的topic爲tx-auth。
本文主要經過User和Auth的示例服務講解了如何接入Lottor客戶端。生產方構造涉及的事務消息,首先預發送事務消息到Lottor Server,成功預提交以後便執行本地事務;本地事務執行完則異步發送確認消息(可能成功,也可能失敗)。Lottor Server根據接收到的確認消息決定是否將對應的事務組消息發送到對應的消費方。Lottor Server還會定時輪詢異常狀態的事務組和事務消息,以防由於異步的確認消息發送失敗。消費方收到事務消息以後,將會根據目標方法執行對應的處理操做,最後將消費結果異步回寫到Lottor Server。
Lottor項目地址:https://github.com/keets2012/Lottor