分佈式事務是微服務實踐中一個比較棘手的問題,在筆者所實施的微服務實踐方案中,都採用了折中或者規避強一致性的方案。參考Ebay
多年前提出的本地消息表方案,基於RabbitMQ
和MySQL
(JDBC
)作了輕量級的封裝,實現了低入侵性的事務消息模塊。本文的內容就是詳細分析整個方案的設計思路和實施。環境依賴以下:java
JDK1.8+
spring-boot-start-web:2.x.x
、spring-boot-start-jdbc:2.x.x
、spring-boot-start-amqp:2.x.x
HikariCP:3.x.x
(spring-boot-start-jdbc
自帶)、mysql-connector-java:5.1.48
redisson:3.12.1
事務消息原則上只適合弱一致性(或者說最終一致性)的場景,常見的弱一致性場景如:mysql
強一致性的場景通常不該該選用事務消息。git
通常狀況下,要求強一致性說明要嚴格同步,也就是全部操做必須同時成功或者同時失敗,這樣就會引入同步帶來的額外消耗。若是一個事務消息模塊設計合理,補償、查詢、監控等等功能都完畢,因爲系統交互是異步的,總體吞吐要比嚴格同步高。在筆者負責的業務系統中基於事務消息使用還定製了一條基本原則:消息內容正確的前提下,消費方出現異常須要自理。github
簡單來講就是:上游保證了自身的業務正確性,成功推送了正確的消息到RabbitMQ就認爲上游義務已經結束。web
爲了下降代碼的入侵性,事務消息須要藉助Spring
的編程式事務或者聲明式事務。編程式事務通常依賴於TransactionTemplate
,而聲明式事務依託於AOP
模塊,依賴於註解@Transactional
。redis
接着須要自定義一個事務消息功能模塊,新增一個事務消息記錄表(其實就是本地消息表),用於保存每一條須要發送的消息記錄。事務消息功能模塊的主要功能是:算法
RabbitMQ
服務端。在事務執行的邏輯單元裏面,須要進行待推送的事務消息記錄的保存,也就是:本地(業務)邏輯和事務消息記錄保存操做綁定在同一個事務。spring
發送消息到RabbitMQ
服務端這一步須要延後到事務提交以後,這樣才能保證事務提交成功和消息成功發送到RabbitMQ
服務端這兩個操做是一致的。爲了把保存待發送的事務消息和發送消息到RabbitMQ兩個動做從使用者感知角度合併爲一個動做,這裏須要用到Spring
特有的事務同步器TransactionSynchronization
,這裏分析一下事務同步器的主要方法的回調位置,主要參考AbstractPlatformTransactionManager#commit()
或者AbstractPlatformTransactionManager#processCommit()
方法:sql
上圖僅僅演示了事務正確提交的場景(不包含異常的場景)。這裏能夠明確知道,事務同步器TransactionSynchronization
的afterCommit()
和afterCompletion(int status)
方法都在真正的事務提交點AbstractPlatformTransactionManager#doCommit()
以後回調,所以能夠選用這兩個方法其中之一用於執行推送消息到RabbitMQ
服務端,總體的僞代碼以下:shell
@Transactional
public Dto businessMethod(){
business transaction code block ...
// 保存事務消息
[saveTransactionMessageRecord()]
// 註冊事務同步器 - 在afterCommit()方法中推送消息到RabbitMQ
[register TransactionSynchronization,send message in method afterCommit()] business transaction code block ... } 複製代碼
上面僞代碼中,保存事務消息和註冊事務同步器兩個步驟能夠安插在事務方法中的任意位置,也就是說與執行順序無關。
雖然以前提到筆者建議下游服務自理自身服務消費異常的場景,可是有些時候迫於無奈仍是須要上游把對應的消息從新推送,這個算是特殊的場景。另外還有一個場景須要考慮:事務提交以後觸發事務同步器TransactionSynchronization
的afterCommit()
方法失敗。這是一個低機率的場景,可是在生產中必定會出現,一個比較典型的緣由就是:事務提交完成後還沒有來得及觸發TransactionSynchronization#afterCommit()
方法進行推送服務實例就被重啓。以下圖所示:
爲了統一處理補償推送的問題,使用了有限狀態判斷消息是否已經推送成功:
TransactionSynchronization
的afterCommit()
方法的實現中,推送對應的消息到RabbitMQ
,而後更變事務消息記錄的狀態爲推送成功。還有一種極爲特殊的狀況是RabbitMQ
服務端自己出現故障致使消息推送異常,這種狀況下須要進行重試(補償推送),經驗證實短期內的反覆重試是沒有意義的,故障的服務通常不會瞬時恢復,因此能夠考慮使用指數退避算法進行重試,同時須要限制最大重試次數。
指數值、間隔值和最大重試次數上限須要根據實際狀況設定,不然容易出現消息延時過大或者重試過於頻繁等問題。
引入核心依賴:
<properties>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<redisson.version>3.12.1</redisson.version>
<mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
</dependencies>
複製代碼
spring-boot-starter-jdbc
、mysql-connector-java
和spring-boot-starter-aop
是MySQL
事務相關,而spring-boot-starter-amqp
是RabbitMQ
客戶端的封裝,redisson
主要使用其分佈式鎖,用於補償定時任務的加鎖執行(以防止服務多個節點併發執行補償推送)。
事務消息模塊主要涉及兩張表,以MySQL
爲例,建表DDL
以下:
CREATE TABLE `t_transactional_message`
(
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
creator VARCHAR(20) NOT NULL DEFAULT 'admin',
editor VARCHAR(20) NOT NULL DEFAULT 'admin',
deleted TINYINT NOT NULL DEFAULT 0,
current_retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '當前重試次數',
max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT '最大重試次數',
queue_name VARCHAR(255) NOT NULL COMMENT '隊列名',
exchange_name VARCHAR(255) NOT NULL COMMENT '交換器名',
exchange_type VARCHAR(8) NOT NULL COMMENT '交換類型',
routing_key VARCHAR(255) COMMENT '路由鍵',
business_module VARCHAR(32) NOT NULL COMMENT '業務模塊',
business_key VARCHAR(255) NOT NULL COMMENT '業務鍵',
next_schedule_time DATETIME NOT NULL COMMENT '下一次調度時間',
message_status TINYINT NOT NULL DEFAULT 0 COMMENT '消息狀態',
init_backoff BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,單位爲秒',
backoff_factor TINYINT NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指數)',
INDEX idx_queue_name (queue_name),
INDEX idx_create_time (create_time),
INDEX idx_next_schedule_time (next_schedule_time),
INDEX idx_business_key (business_key)
) COMMENT '事務消息表';
CREATE TABLE `t_transactional_message_content`
(
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
message_id BIGINT UNSIGNED NOT NULL COMMENT '事務消息記錄ID',
content TEXT COMMENT '消息內容'
) COMMENT '事務消息內容表';
複製代碼
由於此模塊有可能擴展出一個後臺管理模塊,因此要把消息的管理和狀態相關字段和大致積的消息內容分別存放在兩個表,從而避免大批量查詢消息記錄的時候MySQL
服務IO
使用率太高的問題(這是和上一個公司的DBA
團隊商討後獲得的一個比較合理的方案)。預留了兩個業務字段business_module
和business_key
用於標識業務模塊和業務鍵(通常是惟一識別號,例如訂單號)。
通常狀況下,若是服務經過配置自行提早聲明隊列和交換器的綁定關係,那麼發送RabbitMQ
消息的時候其實只依賴於exchangeName
和routingKey
兩個字段(header
類型的交換器是特殊的,也比較少用,這裏暫時不用考慮),考慮到服務可能會遺漏聲明操做,發送消息的時候會基於隊列進行首次綁定聲明而且緩存相關的信息(RabbitMQ
中的隊列-交換器綁定聲明只要每次聲明綁定關係的參數一致,則不會拋出異常)。
下面的方案設計描述中,暫時忽略了消息事務管理後臺的API
設計,這些能夠在後期補充。
定義貧血模型實體類TransactionalMessage
和TransactionalMessageContent
:
@Data
public class TransactionalMessage {
private Long id;
private LocalDateTime createTime;
private LocalDateTime editTime;
private String creator;
private String editor;
private Integer deleted;
private Integer currentRetryTimes;
private Integer maxRetryTimes;
private String queueName;
private String exchangeName;
private String exchangeType;
private String routingKey;
private String businessModule;
private String businessKey;
private LocalDateTime nextScheduleTime;
private Integer messageStatus;
private Long initBackoff;
private Integer backoffFactor;
}
@Data
public class TransactionalMessageContent {
private Long id;
private Long messageId;
private String content;
}
複製代碼
而後定義dao
接口(這裏暫時不展開實現的細節代碼,存儲使用MySQL
,若是要替換爲其餘類型的數據庫,只須要使用不一樣的實現便可):
public interface TransactionalMessageDao {
void insertSelective(TransactionalMessage record);
void updateStatusSelective(TransactionalMessage record);
List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime, LocalDateTime maxScheduleTime, int limit);
}
public interface TransactionalMessageContentDao {
void insert(TransactionalMessageContent record);
List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}
複製代碼
接着定義事務消息服務接口TransactionalMessageService
:
// 對外提供的服務類接口
public interface TransactionalMessageService {
void sendTransactionalMessage(Destination destination, TxMessage message);
}
@Getter
@RequiredArgsConstructor
public enum ExchangeType {
FANOUT("fanout"),
DIRECT("direct"),
TOPIC("topic"),
DEFAULT(""),
;
private final String type;
}
// 發送消息的目的地
public interface Destination {
ExchangeType exchangeType();
String queueName();
String exchangeName();
String routingKey();
}
@Builder
public class DefaultDestination implements Destination {
private ExchangeType exchangeType;
private String queueName;
private String exchangeName;
private String routingKey;
@Override
public ExchangeType exchangeType() {
return exchangeType;
}
@Override
public String queueName() {
return queueName;
}
@Override
public String exchangeName() {
return exchangeName;
}
@Override
public String routingKey() {
return routingKey;
}
}
// 事務消息
public interface TxMessage {
String businessModule();
String businessKey();
String content();
}
@Builder
public class DefaultTxMessage implements TxMessage {
private String businessModule;
private String businessKey;
private String content;
@Override
public String businessModule() {
return businessModule;
}
@Override
public String businessKey() {
return businessKey;
}
@Override
public String content() {
return content;
}
}
// 消息狀態
@RequiredArgsConstructor
public enum TxMessageStatus {
/** * 成功 */
SUCCESS(1),
/** * 待處理 */
PENDING(0),
/** * 處理失敗 */
FAIL(-1),
;
private final Integer status;
}
複製代碼
TransactionalMessageService
的實現類是事務消息的核心功能實現,代碼以下:
@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {
private final AmqpAdmin amqpAdmin;
private final TransactionalMessageManagementService managementService;
private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();
@Override
public void sendTransactionalMessage(Destination destination, TxMessage message) {
String queueName = destination.queueName();
String exchangeName = destination.exchangeName();
String routingKey = destination.routingKey();
ExchangeType exchangeType = destination.exchangeType();
// 原子性的預聲明
QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
Queue queue = new Queue(queueName);
amqpAdmin.declareQueue(queue);
Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
amqpAdmin.declareExchange(exchange);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
amqpAdmin.declareBinding(binding);
return true;
});
TransactionalMessage record = new TransactionalMessage();
record.setQueueName(queueName);
record.setExchangeName(exchangeName);
record.setExchangeType(exchangeType.getType());
record.setRoutingKey(routingKey);
record.setBusinessModule(message.businessModule());
record.setBusinessKey(message.businessKey());
String content = message.content();
// 保存事務消息記錄
managementService.saveTransactionalMessageRecord(record, content);
// 註冊事務同步器
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
managementService.sendMessageSync(record, content);
}
});
}
}
複製代碼
消息記錄狀態和內容持久化的管理統一放在TransactionalMessageManagementService
中:
@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {
private final TransactionalMessageDao messageDao;
private final TransactionalMessageContentDao contentDao;
private final RabbitTemplate rabbitTemplate;
private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
private static final long DEFAULT_INIT_BACKOFF = 10L;
private static final int DEFAULT_BACKOFF_FACTOR = 2;
private static final int DEFAULT_MAX_RETRY_TIMES = 5;
private static final int LIMIT = 100;
public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
DEFAULT_BACKOFF_FACTOR, 0));
record.setCurrentRetryTimes(0);
record.setInitBackoff(DEFAULT_INIT_BACKOFF);
record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
messageDao.insertSelective(record);
TransactionalMessageContent messageContent = new TransactionalMessageContent();
messageContent.setContent(content);
messageContent.setMessageId(record.getId());
contentDao.insert(messageContent);
}
public void sendMessageSync(TransactionalMessage record, String content) {
try {
rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
if (log.isDebugEnabled()) {
log.debug("發送消息成功,目標隊列:{},消息內容:{}", record.getQueueName(), content);
}
// 標記成功
markSuccess(record);
} catch (Exception e) {
// 標記失敗
markFail(record, e);
}
}
private void markSuccess(TransactionalMessage record) {
// 標記下一次執行時間爲最大值
record.setNextScheduleTime(END);
record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
record.setEditTime(LocalDateTime.now());
messageDao.updateStatusSelective(record);
}
private void markFail(TransactionalMessage record, Exception e) {
log.error("發送消息失敗,目標隊列:{}", record.getQueueName(), e);
record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
// 計算下一次的執行時間
LocalDateTime nextScheduleTime = calculateNextScheduleTime(
record.getNextScheduleTime(),
record.getInitBackoff(),
record.getBackoffFactor(),
record.getCurrentRetryTimes()
);
record.setNextScheduleTime(nextScheduleTime);
record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
record.setEditTime(LocalDateTime.now());
messageDao.updateStatusSelective(record);
}
/** * 計算下一次執行時間 * * @param base 基礎時間 * @param initBackoff 退避基準值 * @param backoffFactor 退避指數 * @param round 輪數 * @return LocalDateTime */
private LocalDateTime calculateNextScheduleTime(LocalDateTime base, long initBackoff, long backoffFactor, long round) {
double delta = initBackoff * Math.pow(backoffFactor, round);
return base.plusSeconds((long) delta);
}
/** * 推送補償 - 裏面的參數應該根據實際場景定製 */
public void processPendingCompensationRecords() {
// 時間的右值爲當前時間減去退避初始值,這裏預防把剛保存的消息也推送了
LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
// 時間的左值爲右值減去1小時
LocalDateTime min = max.plusHours(-1);
Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
.stream()
.collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
if (!collect.isEmpty()) {
StringJoiner joiner = new StringJoiner(",", "(", ")");
collect.keySet().forEach(x -> joiner.add(x.toString()));
contentDao.queryByMessageIds(joiner.toString())
.forEach(item -> {
TransactionalMessage message = collect.get(item.getMessageId());
sendMessageSync(message, item.getContent());
});
}
}
}
複製代碼
這裏有一點尚待優化:更新事務消息記錄狀態的方法能夠優化爲批量更新,在limit
比較大的時候,批量更新的效率會更高。
最後是定時任務的配置類:
@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {
private final TransactionalMessageManagementService managementService;
/** * 這裏用的是本地的Redis,實際上要作成配置 */
private final RedissonClient redisson = Redisson.create();
@Scheduled(fixedDelay = 10000)
public void transactionalMessageCompensationTask() throws Exception {
RLock lock = redisson.getLock("transactionalMessageCompensationTask");
// 等待時間5秒,預期300秒執行完畢,這兩個值須要按照實際場景定製
boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
if (tryLock) {
try {
long start = System.currentTimeMillis();
log.info("開始執行事務消息推送補償定時任務...");
managementService.processPendingCompensationRecords();
long end = System.currentTimeMillis();
long delta = end - start;
// 以防鎖過早釋放
if (delta < 5000) {
Thread.sleep(5000 - delta);
}
log.info("執行事務消息推送補償定時任務完畢,耗時:{} ms...", end - start);
} finally {
lock.unlock();
}
}
}
}
複製代碼
基本代碼編寫完,整個項目的結構以下:
最後添加兩個測試類:
@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {
private final MockBusinessService mockBusinessService;
@Override
public void run(String... args) throws Exception {
mockBusinessService.saveOrder();
}
}
@Slf4j
@RequiredArgsConstructor
@Service
public class MockBusinessService {
private final JdbcTemplate jdbcTemplate;
private final TransactionalMessageService transactionalMessageService;
private final ObjectMapper objectMapper;
@Transactional(rollbackFor = Exception.class)
public void saveOrder() throws Exception {
String orderId = UUID.randomUUID().toString();
BigDecimal amount = BigDecimal.valueOf(100L);
Map<String, Object> message = new HashMap<>();
message.put("orderId", orderId);
message.put("amount", amount);
jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {
p.setString(1, orderId);
p.setBigDecimal(2, amount);
});
String content = objectMapper.writeValueAsString(message);
transactionalMessageService.sendTransactionalMessage(
DefaultDestination.builder()
.exchangeName("tm.test.exchange")
.queueName("tm.test.queue")
.routingKey("tm.test.key")
.exchangeType(ExchangeType.DIRECT)
.build(),
DefaultTxMessage.builder()
.businessKey(orderId)
.businessModule("SAVE_ORDER")
.content(content)
.build()
);
log.info("保存訂單:{}成功...", orderId);
}
}
複製代碼
某次測試結果以下:
2020-02-05 21:10:13.287 INFO 49556 --- [ main] club.throwable.cm.MockBusinessService : 保存訂單:07a75323-460b-42cb-aa63-1a0a45ce19bf成功...
複製代碼
模擬訂單數據成功保存,並且RabbitMQ
消息在事務成功提交後正常發送到RabbitMQ
服務端中,如RabbitMQ
控制檯數據所示。
事務消息模塊的設計僅僅是使異步消息推送這個功能實現趨向於完備,其實一個合理的異步消息交互系統,必定會提供同步查詢接口,這一點是基於異步消息沒有回調或者沒有響應的特性致使的。通常而言,一個系統的吞吐量和系統的異步化處理佔比成正相關(這一點能夠參考Amdahl's Law
),因此在系統架構設計實際中應該儘量使用異步交互,提升系統吞吐量同時減小同步阻塞帶來的無謂等待。事務消息模塊能夠擴展出一個後臺管理,甚至能夠配合Micrometer
、Prometheus
和Grafana
體系作實時數據監控。
本文demo
項目倉庫:rabbit-transactional-message
demo
必須本地安裝MySQL
、Redis
和RabbitMQ
才能正常啓動,本地必須新建一個數據庫命名local
。
(本文完 c-5-d e-a-20200202 疫情嚴重,立刻要開始在家辦公,少出門多看書)