官網說明支持的事務
柔性事務管理器DEMO
// 1. 配置SoftTransactionConfiguration
SoftTransactionConfiguration transactionConfig = new SoftTransactionConfiguration(dataSource);
transactionConfig.setXXX();
// 2. 初始化SoftTransactionManager
SoftTransactionManager transactionManager = new SoftTransactionManager(transactionConfig);
transactionManager.init();
// 3. 獲取BEDSoftTransaction
BEDSoftTransaction transaction = (BEDSoftTransaction) transactionManager.getTransaction(SoftTransactionType.BestEffortsDelivery);
// 4. 開啓事務
transaction.begin(connection);
// 5. 執行JDBC
// 6.關閉事務
transaction.end();
- 分佈式事務除了強XA,無非都是借住外部存儲,心存這個概念,看代碼順暢不少。
- @1處可發現,事務庫可支持數據庫或內存hashMap存儲,可配置。
- @2處,先準備數據,建立事務庫的sql,本質是個table,記錄相關信息,包括sql,參數,時間,重試次數等。 上下文參數保存傳遞,經過threadlocal包裝後的工具,很通用,不作重點說明 其中關鍵方法SoftTransactionManager 中的getTransaction
柔性事務管理器
@RequiredArgsConstructor
public final class SoftTransactionManager {
private static final String TRANSACTION = "transaction";
private static final String TRANSACTION_CONFIG = "transactionConfig";
@Getter
private final SoftTransactionConfiguration transactionConfig;
public void init() throws SQLException {
//@3
EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
//@1
if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
//@2
createTable();
}
......略
}
private void createTable() throws SQLException {
String dbSchema = "CREATE TABLE IF NOT EXISTS `transaction_log` ("
+ "`id` VARCHAR(40) NOT NULL, "
+ "`transaction_type` VARCHAR(30) NOT NULL, "
+ "`data_source` VARCHAR(255) NOT NULL, "
+ "`sql` TEXT NOT NULL, "
+ "`parameters` TEXT NOT NULL, "
+ "`creation_time` LONG NOT NULL, "
+ "`async_delivery_try_times` INT NOT NULL DEFAULT 0, "
+ "PRIMARY KEY (`id`));";
try (
Connection conn = transactionConfig.getTransactionLogDataSource().getConnection();
PreparedStatement preparedStatement = conn.prepareStatement(dbSchema)) {
preparedStatement.executeUpdate();
}
}
- @3比較關鍵,guava的EventBus工具,沒接觸過的話,可理解爲一個內存隊列,在柔性事務管理器裏註冊消費者的行爲,舉例:如更新10條記錄,執行前會發送這10個sql相關信息,在事務管理器裏收到消息,記錄到事務庫。
- @4 更改表以前,預先插入sql相關信息到事務庫,@5表更新完成後,清除事務庫對應記錄,@6執行sql重試操做,包括處理重試次數,成功後刪除事務庫記錄
最大努力推送監聽器
@Slf4j
public final class BestEffortsDeliveryListener {
@Subscribe
@AllowConcurrentEvents
public void listen(final DMLExecutionEvent event) {
.....略
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
//@4
case BEFORE_EXECUTE:
//TODO for batch SQL need split to 2-level records
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
//@5
case EXECUTE_SUCCESS:
transactionLogStorage.remove(event.getId());
return;
//@6
case EXECUTE_FAILURE:
boolean deliverySuccess = false;
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
.....
preparedStatement.executeUpdate();
.....
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
//2種事務實現
public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
AbstractSoftTransaction result;
switch (type) {
case BestEffortsDelivery:
result = new BEDSoftTransaction();
break;
case TryConfirmCancel:
result = new TCCSoftTransaction();
break;
default:
throw new UnsupportedOperationException(type.toString());
}
//繼續深刻
public class BEDSoftTransaction extends AbstractSoftTransaction {
public void begin(final Connection connection) throws SQLException {
beginInternal(connection, SoftTransactionType.BestEffortsDelivery);
}
- 看到了熟悉的connection.setAutoCommit(true);開啓自動提交,由事務管理器操做Connection完成
- 由於是 單Connection裏異常了數據也不會改變,跨Connection的事務由定時任務+事務庫保證,因此設置自動提交。
開啓事務
public abstract class AbstractSoftTransaction {
private boolean previousAutoCommit;
@Getter
private ShardingConnection connection;
@Getter
private SoftTransactionType transactionType;
@Getter
private String transactionId;
protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {
connection = (ShardingConnection) conn;
previousAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
}
}
- 到此,可大概瞭解事務管理實現分佈式功能,操做要被事務管理的Connection,而且接受來自EventBus的消息,操做事務庫,完成單Connection的事務
具體的Connection的包裝,執行更改發送EventBus消息,下面繼續
- 比較直接的源碼,根據查詢流程一步步進到ShardingPreparedStatement
- @1,route()根據分庫分表生成路由PreoareStatement,到@2去執行
路由,執行
@Override
public boolean execute() throws SQLException {
try {
//@1
Collection<PreparedStatementUnit> preparedStatementUnits = route();
//@2
return new PreparedStatementExecutor(
getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute();
} finally {
clearBatch();
}
}
- 一路跟進到ExecutorEngine執行引擎,發現最後更新表方法最後到execute方法
- @1,無論成不成功,先插入事務表,成功再刪。比較可靠的方法了....
- @2,仍是Guava的異步執行方法,見Guava API,事實上在這裏已經執行完表更改,哪怕有異常,只要沒有get(),一切繼續
- @3,獲取異步執行結果,不懂的話,見JDK Future API
- @4,異常出現了,投遞消息去重試了
- @5,一切正常,投遞消息,清除事務庫。前面已設置自動提交,或交給spring事務管理器等。操做定義在事務器裏,下面繼續
具體執行流程
@Slf4j
public final class ExecutorEngine implements AutoCloseable {
private final ListeningExecutorService executorService;
private <T> List<T> execute(
final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
if (baseStatementUnits.isEmpty()) {
return Collections.emptyList();
}
//@1 執行前插入
OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size());
//投遞到EventBus,事務管理區去處理
EventBusInstance.getInstance().post(event);
Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
BaseStatementUnit firstInput = iterator.next();
//@2
ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
T firstOutput;
List<T> restOutputs;
try {
firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
//@3
restOutputs = restFutures.get();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
event.setException(ex);
event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
//@4
EventBusInstance.getInstance().post(event);
ExecutorExceptionHandler.handleException(ex);
return null;
}
//@5
event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
EventBusInstance.getInstance().post(event);
List<T> result = Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}
- @1,收集Connection的setAutoCommit操做,在準備執行sql前反射調用,更改數據庫的自動提交。代碼比較簡單,一筆帶過
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
@Getter
private final Map<String, Connection> cachedConnections = new HashMap<>();
@Override
public final boolean getAutoCommit() throws SQLException {
return autoCommit;
}
@Override
public final void setAutoCommit(final boolean autoCommit) throws SQLException {
this.autoCommit = autoCommit;
//@1
recordMethodInvocation(Connection.class, "setAutoCommit", new Class[] {boolean.class}, new Object[] {autoCommit});
for (Connection each : cachedConnections.values()) {
each.setAutoCommit(autoCommit);
}
}
@Override
public final void commit() throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
try {
each.commit();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}
@Override
public final void rollback() throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
try {
each.rollback();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}
分佈式事務在Sharding-jdbc的實現
- 3個經典的JDBC操做,Sharding-jdbc,在這裏實現了具體Connection操做的實現,這就比較靈活了,經常使用的如Spring的DataSourceTransactionManager事務管理器,在Connection上調用實現時,交給sharding-jdbc裏的這些實現了。
- 最後總結一下上面沒說清的問題,分佈式事務若XA在Sharding-jdbc中的流程。
- 開啓自動提交,收集Connection到集合
- 執行前插入事務庫
- 執行JDBC代碼
- 異常,遍歷集合中Connection,事務回滾/ 正常,遍歷集合中Connection,事務提交
- 異常會有定時任務輪訓重試事務庫中sql/ 正常,清除事務庫記錄
- 解釋了上面的3個關鍵字
弱XA事務支持
,非跨庫事務
,不支持因網絡、硬件異常致使的跨庫事務
- 弱XA,單Connection事務保證
- 跨庫,仍是由於是基於Connection
- 硬件啊,網絡異常,可能致使事務庫記錄有問題,沒辦法恢復。不像Mysql會有binlog崩潰備份恢復。