摘要: 原創出處 www.iocoder.cn/Sharding-JD… 「芋道源碼」歡迎轉載,保留摘要,謝謝!java
本文主要基於 Sharding-JDBC 1.5.0 正式版 git
🙂🙂🙂關注微信公衆號:【芋道源碼】有福利: github
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
數據庫表分庫後,業務場景下的單庫本地事務可能變成跨庫分佈式事務。雖然咱們能夠經過合適的分庫規則讓操做的數據在同庫下,繼續保證單庫本地事務,這也是很是推崇的,但不是全部場景下都能適用。若是這些場景對事務的一致性有要求,咱們就不得不解決分佈式事務的「麻煩」。sql
分佈式事務是個很大的話題,咱們來看看 Sharding-JDBC 對她的權衡:數據庫
Sharding-JDBC因爲性能方面的考量,決定不支持強一致性分佈式事務。咱們已明確規劃線路圖,將來會支持最終一致性的柔性事務。bash
Sharding-JDBC 提供了兩種 柔性事務:微信
本文分享 最大努力送達型 的實現。建議前置閱讀:《Sharding-JDBC 源碼分析 —— SQL 執行》。架構
Sharding-JDBC 正在收集使用公司名單:傳送門。
🙂 你的登記,會讓更多人蔘與和使用 Sharding-JDBC。傳送門
Sharding-JDBC 也會所以,可以覆蓋更多的業務場景。傳送門
登記吧,騷年!傳送門運維
概念dom
在分佈式數據庫的場景下,相信對於該數據庫的操做最終必定能夠成功,因此經過最大努力反覆嘗試送達操做。
從概念看,可能不是很直白的理解是什麼意思,本文會最大努力讓你乾淨理解。
架構圖
執行過程有 四種 狀況:
總體成漏斗倒三角,上一個階段失敗,交給下一個階段重試:
整個過程經過以下 組件 完成:
下面,咱們逐節分享每一個組件。
柔性事務管理器,SoftTransactionManager 實現,負責對柔性事務配置( SoftTransactionConfiguration ) 、柔性事務( AbstractSoftTransaction )的管理。
調用 #init()
初始化柔性管理器:
// SoftTransactionManager.java
/** * 柔性事務配置對象 */
@Getter
private final SoftTransactionConfiguration transactionConfig;
// SoftTransactionManager.java
/** * 初始化事務管理器. */
public void init() throws SQLException {
// 初始化 最大努力送達型事務監聽器
EventBusInstance.getInstance().register(new BestEffortsDeliveryListener());
// 初始化 事務日誌數據庫存儲表
if (TransactionLogDataSourceType.RDB == transactionConfig.getStorageType()) {
Preconditions.checkNotNull(transactionConfig.getTransactionLogDataSource());
createTable();
}
// 初始化 內嵌的最大努力送達型異步做業
if (transactionConfig.getBestEffortsDeliveryJobConfiguration().isPresent()) {
new NestedBestEffortsDeliveryJobFactory(transactionConfig).init();
}
}複製代碼
transaction_log
)不存在則進行建立。在『事務日誌存儲器』小節會詳細分享SoftTransactionConfiguration
SoftTransactionConfiguration,柔性事務配置對象。
public class SoftTransactionConfiguration {
/** * 事務管理器管理的數據源. */
@Getter(AccessLevel.NONE)
private final DataSource targetDataSource;
/** * 同步的事務送達的最大嘗試次數. */
private int syncMaxDeliveryTryTimes = 3;
/** * 事務日誌存儲類型. */
private TransactionLogDataSourceType storageType = RDB;
/** * 存儲事務日誌的數據源. */
private DataSource transactionLogDataSource;
/** * 內嵌的最大努力送達型異步做業配置對象. */
private Optional<NestedBestEffortsDeliveryJobConfiguration> bestEffortsDeliveryJobConfiguration = Optional.absent();
}複製代碼
在 Sharding-JDBC 裏,目前柔性事務分紅兩種:
繼承 AbstractSoftTransaction
public abstract class AbstractSoftTransaction {
/** * 分片鏈接原自動提交狀態 */
private boolean previousAutoCommit;
/** * 分片鏈接 */
@Getter
private ShardingConnection connection;
/** * 事務類型 */
@Getter
private SoftTransactionType transactionType;
/** * 事務編號 */
@Getter
private String transactionId;
}複製代碼
AbstractSoftTransaction 實現了開啓柔性事務、關閉柔性事務兩個方法提供給子類調用:
#beginInternal()
/** * 開啓柔性 * * @param conn 分片鏈接 * @param type 事務類型 * @throws SQLException */
protected final void beginInternal(final Connection conn, final SoftTransactionType type) throws SQLException {
// TODO 判斷若是在傳統事務中,則拋異常
Preconditions.checkArgument(conn instanceof ShardingConnection, "Only ShardingConnection can support eventual consistency transaction.");
// 設置執行錯誤,不拋出異常
ExecutorExceptionHandler.setExceptionThrown(false);
connection = (ShardingConnection) conn;
transactionType = type;
// 設置自動提交狀態
previousAutoCommit = connection.getAutoCommit();
connection.setAutoCommit(true);
// 生成事務編號
// TODO 替換UUID爲更有效率的id生成器
transactionId = UUID.randomUUID().toString();
}複製代碼
* 調用 `ExecutorExceptionHandler.setExceptionThrown(false)` 設置執行 SQL 錯誤時,也不拋出異常。
* 對異常處理的代碼:[ExecutorExceptionHandler#setExceptionThrown()](https://github.com/dangdangdotcom/sharding-jdbc/blob/884b38f4c2402e31464d15b444f4b405e07fe211/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/threadlocal/ExecutorExceptionHandler.java#L59)
* 對於其餘 SQL,不會由於 SQL 錯誤不執行,會繼續執行
* 對於上層業務,不會由於 SQL 錯誤終止邏輯,會繼續執行。這裏有一點要注意下,上層業務不能對該 SQL 執行結果有強依賴,由於 SQL 錯誤須要重試達到數據最終一致性
* 對於**最大努力型事務**( TCC暫未實現 ),會對執行錯誤的 SQL 進行重試
* 調用 `connection.setAutoCommit(true);`,設置執行自動提交。**使用最大努力型事務時,上層業務執行 SQL 會立刻提交,即便調用 `Connection#rollback()` 也是沒法回滾的,這點必定要注意。**複製代碼
#end()
/** * 結束柔性事務. */
public final void end() throws SQLException {
if (connection != null) {
ExecutorExceptionHandler.setExceptionThrown(true);
connection.setAutoCommit(previousAutoCommit);
SoftTransactionManager.closeCurrentTransactionManager();
}
}
// SoftTransactionManager.java
/** * 關閉當前的柔性事務管理器. */
static void closeCurrentTransactionManager() {
ExecutorDataMap.getDataMap().put(TRANSACTION, null);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, null);
}複製代碼
#end()
清理線程變量。不然,下次請求使用到該線程,會繼續在這個柔性事務內。BEDSoftTransaction
BEDSoftTransaction,最大努力送達型柔性事務。
public class BEDSoftTransaction extends AbstractSoftTransaction {
/** * 開啓柔性事務. * * @param connection 數據庫鏈接對象 */
public void begin(final Connection connection) throws SQLException {
beginInternal(connection, SoftTransactionType.BestEffortsDelivery);
}
}複製代碼
TCCSoftTransaction
TCCSoftTransaction,TCC 型柔性事務,暫未實現。實現後,會更新到 《Sharding-JDBC 源碼分析 —— 分佈式事務(二)之事務補償型》。
經過調用 SoftTransactionManager#getTransaction()
建立柔性事務對象:
/** * {@link ExecutorDataMap#dataMap} 柔性事務對象 key */
private static final String TRANSACTION = "transaction";
/** * {@link ExecutorDataMap#dataMap} 柔性事務配置 key */
private static final String TRANSACTION_CONFIG = "transactionConfig";
// SoftTransactionManager.java
/** * 建立柔性事務. * * @param type 柔性事務類型 * @return 柔性事務 */
public AbstractSoftTransaction getTransaction(final SoftTransactionType type) {
AbstractSoftTransaction result;
switch (type) {
case BestEffortsDelivery:
result = new BEDSoftTransaction();
break;
case TryConfirmCancel:
result = new TCCSoftTransaction();
break;
default:
throw new UnsupportedOperationException(type.toString());
}
// TODO 目前使用不支持嵌套事務,之後這裏須要可配置
if (getCurrentTransaction().isPresent()) {
throw new UnsupportedOperationException("Cannot support nested transaction.");
}
ExecutorDataMap.getDataMap().put(TRANSACTION, result);
ExecutorDataMap.getDataMap().put(TRANSACTION_CONFIG, transactionConfig);
return result;
}複製代碼
後續能夠從 ExecutorDataMap 中獲取當前線程的柔性事務和柔性事務配置:
// SoftTransactionManager.java
/** * 獲取當前線程的柔性事務配置. * * @return 當前線程的柔性事務配置 */
public static Optional<SoftTransactionConfiguration> getCurrentTransactionConfiguration() {
Object transactionConfig = ExecutorDataMap.getDataMap().get(TRANSACTION_CONFIG);
return (null == transactionConfig)
? Optional.<SoftTransactionConfiguration>absent()
: Optional.of((SoftTransactionConfiguration) transactionConfig);
}
/** * 獲取當前的柔性事務. * * @return 當前的柔性事務 */
public static Optional<AbstractSoftTransaction> getCurrentTransaction() {
Object transaction = ExecutorDataMap.getDataMap().get(TRANSACTION);
return (null == transaction)
? Optional.<AbstractSoftTransaction>absent()
: Optional.of((AbstractSoftTransaction) transaction);
}複製代碼
柔性事務執行過程當中,會經過事務日誌( TransactionLog ) 記錄每條 SQL 執行狀態:
經過實現事務日誌存儲器接口( TransactionLogStorage ),提供存儲功能。目前有兩種實現:
本節只分析 RdbTransactionLogStorage。對 MemoryTransactionLogStorage 感興趣的同窗能夠點擊連接傳送到達。
TransactionLogStorage 有五個接口方法,下文每一個小標題都是一個方法。
// TransactionLogStorage.java
/** * 存儲事務日誌. * * @param transactionLog 事務日誌 */
void add(TransactionLog transactionLog);
// RdbTransactionLogStorage.java
@Override
public void add(final TransactionLog transactionLog) {
String sql = "INSERT INTO `transaction_log` (`id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`) VALUES (?, ?, ?, ?, ?, ?);";
try (
// ... 省略你熟悉的代碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}複製代碼
#add()
和下文的 #remove()
異常時,都打印下異常日誌都文件系統TransactionLog (transaction_log) 數據庫表結構以下:
字段 | 名字 | 數據庫類型 | 備註 |
---|---|---|---|
id | 事件編號 | VARCHAR(40) | EventBus 事件編號,非事務編號 |
transaction_type | 柔性事務類型 | VARCHAR(30) | |
data_source | 真實數據源名 | VARCHAR(255) | |
sql | 執行 SQL | TEXT | 已經改寫過的 SQL |
parameters | 佔位符參數 | TEXT | JSON 字符串存儲 |
creation_time | 記錄時間 | LONG | |
async_delivery_try_times | 已異步重試次數 | INT |
// TransactionLogStorage.java
/** * 根據主鍵刪除事務日誌. * * @param id 事務日誌主鍵 */
void remove(String id);
// RdbTransactionLogStorage.java
@Override
public void remove(final String id) {
String sql = "DELETE FROM `transaction_log` WHERE `id`=?;";
try (
// ... 省略你熟悉的代碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}複製代碼
// TransactionLogStorage.java
/** * 讀取須要處理的事務日誌. * * <p>須要處理的事務日誌爲: </p> * <p>1. 異步處理次數小於最大處理次數.</p> * <p>2. 異步處理的事務日誌早於異步處理的間隔時間.</p> * * @param size 獲取日誌的數量 * @param maxDeliveryTryTimes 事務送達的最大嘗試次數 * @param maxDeliveryTryDelayMillis 執行送達事務的延遲毫秒數. */
List<TransactionLog> findEligibleTransactionLogs(int size, int maxDeliveryTryTimes, long maxDeliveryTryDelayMillis);
// RdbTransactionLogStorage.java
@Override
public List<TransactionLog> findEligibleTransactionLogs(final int size, final int maxDeliveryTryTimes, final long maxDeliveryTryDelayMillis) {
List<TransactionLog> result = new ArrayList<>(size);
String sql = "SELECT `id`, `transaction_type`, `data_source`, `sql`, `parameters`, `creation_time`, `async_delivery_try_times` "
+ "FROM `transaction_log` WHERE `async_delivery_try_times`<? AND `transaction_type`=? AND `creation_time`<? LIMIT ?;";
try (Connection conn = dataSource.getConnection()) {
// ... 省略你熟悉的代碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
return result;
}複製代碼
// TransactionLogStorage.java
/** * 增長事務日誌異步重試次數. * * @param id 事務主鍵 */
void increaseAsyncDeliveryTryTimes(String id);
// RdbTransactionLogStorage.java
@Override
public void increaseAsyncDeliveryTryTimes(final String id) {
String sql = "UPDATE `transaction_log` SET `async_delivery_try_times`=`async_delivery_try_times`+1 WHERE `id`=?;";
try (
// ... 省略你熟悉的代碼
} catch (final SQLException ex) {
throw new TransactionLogStorageException(ex);
}
}複製代碼
// TransactionLogStorage.java
/** * 處理事務數據. * * @param connection 業務數據庫鏈接 * @param transactionLog 事務日誌 * @param maxDeliveryTryTimes 事務送達的最大嘗試次數 */
boolean processData(Connection connection, TransactionLog transactionLog, int maxDeliveryTryTimes);
// RdbTransactionLogStorage.java
@Override
public boolean processData(final Connection connection, final TransactionLog transactionLog, final int maxDeliveryTryTimes) {
// 重試執行失敗 SQL
try (
Connection conn = connection;
PreparedStatement preparedStatement = conn.prepareStatement(transactionLog.getSql())) {
for (int parameterIndex = 0; parameterIndex < transactionLog.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, transactionLog.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
} catch (final SQLException ex) {
// 重試失敗,更新事務日誌,增長已異步重試次數
increaseAsyncDeliveryTryTimes(transactionLog.getId());
throw new TransactionCompensationException(ex);
}
// 移除重試執行成功 SQL 對應的事務日誌
remove(transactionLog.getId());
return true;
}複製代碼
#processData()
是帶有一些邏輯的。根據事務日誌( TransactionLog )重試執行失敗的 SQL,若成功,移除事務日誌;若失敗,更新事務日誌,增長已異步重試次數最大努力送達型事務監聽器,BestEffortsDeliveryListener,負責記錄事務日誌、同步重試執行失敗 SQL。
// BestEffortsDeliveryListener.java
@Subscribe
@AllowConcurrentEvents
public void listen(final DMLExecutionEvent event) {
if (!isProcessContinuously()) {
return;
}
SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
switch (event.getEventExecutionType()) {
case BEFORE_EXECUTE: // 執行前,插入事務日誌
//TODO 對於批量執行的SQL須要解析成兩層列表
transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
return;
case EXECUTE_SUCCESS: // 執行成功,移除事務日誌
transactionLogStorage.remove(event.getId());
return;
case EXECUTE_FAILURE: // 執行失敗,同步重試
boolean deliverySuccess = false;
for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) { // 同步【屢次】重試
if (deliverySuccess) {
return;
}
boolean isNewConnection = false;
Connection conn = null;
PreparedStatement preparedStatement = null;
try {
// 得到數據庫鏈接
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
if (!isValidConnection(conn)) { // 由於可能執行失敗是數據庫鏈接異常,因此判斷一次,若是無效,從新獲取數據庫鏈接
bedSoftTransaction.getConnection().release(conn);
conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.DML);
isNewConnection = true;
}
preparedStatement = conn.prepareStatement(event.getSql());
// 同步重試
//TODO 對於批量事件須要解析成兩層列表
for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
}
preparedStatement.executeUpdate();
deliverySuccess = true;
// 同步重試成功,移除事務日誌
transactionLogStorage.remove(event.getId());
} catch (final SQLException ex) {
log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
} finally {
close(isNewConnection, conn, preparedStatement);
}
}
return;
default:
throw new UnsupportedOperationException(event.getEventExecutionType().toString());
}
}複製代碼
#isProcessContinuously()
方法判斷是否處於最大努力送達型事務中,當且僅當處於該狀態才進行監聽事件處理SQL 執行失敗,根據柔性事務配置( SoftTransactionConfiguration )同步的事務送達的最大嘗試次數( syncMaxDeliveryTryTimes
)進行屢次重試直到成功。整體邏輯和 RdbTransactionLogStorage#processData()
方法邏輯相似,區別在於獲取分片數據庫鏈接的特殊處理:此處調用失敗,數據庫鏈接多是異常無效的,所以調用了 #isValidConnection()
判斷鏈接的有效性。若無效,則從新獲取分片數據庫鏈接。另外,如果從新獲取分片數據庫鏈接,須要進行關閉釋放 (Connection#close()
):
// BestEffortsDeliveryListener.java
/** * 經過 SELECT 1 校驗數據庫鏈接是否有效 * * @param conn 數據庫鏈接 * @return 是否有效 */
private boolean isValidConnection(final Connection conn) {
try (PreparedStatement preparedStatement = conn.prepareStatement("SELECT 1")) {
try (ResultSet rs = preparedStatement.executeQuery()) {
return rs.next() && 1 == rs.getInt("1");
}
} catch (final SQLException ex) {
return false;
}
}
/** * 關閉釋放預編譯SQL對象和數據庫鏈接 * * @param isNewConnection 是否新建立的數據庫鏈接,是的狀況下才釋放 * @param conn 數據庫鏈接 * @param preparedStatement 預編譯SQL */
private void close(final boolean isNewConnection, final Connection conn, final PreparedStatement preparedStatement) {
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (final SQLException ex) {
log.error("PreparedStatement closed error:", ex);
}
}
if (isNewConnection && null != conn) {
try {
conn.close();
} catch (final SQLException ex) {
log.error("Connection closed error:", ex);
}
}
}複製代碼
當最大努力送達型事務監聽器( BestEffortsDeliveryListener )屢次同步重試失敗後,交給最大努力送達型異步做業進行屢次異步重試,而且屢次執行有固定間隔。
Sharding-JDBC 提供了兩個最大努力送達型異步做業實現:
二者實現代碼邏輯基本一致。前者相比後者,用於開發測試,去除對 Zookeeper 依賴,沒法實現高可用,所以生產環境下不適合使用。
BestEffortsDeliveryJob 所在 Maven 項目爲 sharding-jdbc-transaction-async-job
,基於噹噹開源的 Elastic-Job 實現。以下是官方對該 Maven 項目的簡要說明:
因爲柔性事務採用異步嘗試,須要部署獨立的做業和Zookeeper。sharding-jdbc-transaction採用elastic-job實現的sharding-jdbc-transaction-async-job,經過簡單配置便可啓動高可用做業異步送達柔性事務,啓動腳本爲start.sh。
BestEffortsDeliveryJob
public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {
/** * 最大努力送達型異步做業配置對象 */
@Setter
private BestEffortsDeliveryConfiguration bedConfig;
/** * 事務日誌存儲器對象 */
@Setter
private TransactionLogStorage transactionLogStorage;
@Override
public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {
return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(),
bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
}
@Override
public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
try (
Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {
transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
} catch (final SQLException | TransactionCompensationException ex) {
log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1,
bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
return false;
}
return true;
}
@Override
public boolean isStreamingProcess() {
return false;
}
}複製代碼
#fetchData()
方法獲取須要處理的事務日誌 (TransactionLog),內部調用了 TransactionLogStorage#findEligibleTransactionLogs()
方法#processData()
方法處理事務日誌,重試執行失敗的 SQL,內部調用了 TransactionLogStorage#processData()
#fetchData()
和 #processData()
調用是 Elastic-Job 控制的。每一輪定時調度,每條事務日誌只執行一次。當超過最大異步調用次數後,該條事務日誌再也不處理,因此生產使用時,最好增長下相應監控超過最大異步重試次數的事務日誌。AsyncSoftTransactionJobConfiguration,異步柔性事務做業配置對象。
public class AsyncSoftTransactionJobConfiguration {
/** * 做業名稱. */
private String name = "bestEffortsDeliveryJob";
/** * 觸發做業的cron表達式. */
private String cron = "0/5 * * * * ?";
/** * 每次做業獲取的事務日誌最大數量. */
private int transactionLogFetchDataCount = 100;
/** * 事務送達的最大嘗試次數. */
private int maxDeliveryTryTimes = 3;
/** * 執行事務的延遲毫秒數. * * <p>早於此間隔時間的入庫事務纔會被做業執行.</p> */
private long maxDeliveryTryDelayMillis = 60 * 1000L;
}複製代碼
Sharding-JDBC 提供的最大努力送達型異步做業實現( BestEffortsDeliveryJob ),經過與 Elastic-Job 集成,能夠很便捷而且有質量保證的高可用、高性能使用。一部分團隊,可能已經引入或自研了相似 Elastic-Job 的分佈式做業中間件解決方案,每多一箇中間件,就是多一個學習與運維成本。那麼是否可使用本身的分佈式做業解決方案?答案是,能夠的。參考 BestEffortsDeliveryJob 的實現,經過調用 TransactionLogStorage 來實現:
// 僞代碼(不考慮性能、異常)
List<TransactionLog> transactionLogs = transactionLogStorage.findEligibleTransactionLogs(....);
for (TransactionLog transactionLog : transactionLogs) {
transactionLogStorage.processData(conn, log, maxDeliveryTryTimes);
}複製代碼
固然,我的仍是很推薦 Elastic-Job。
😈 筆者要開始寫《Elastic-Job 源碼分析》。
另外,若是有支持事務消息的分佈式隊列系統,能夠經過 TransactionLogStorage 實現存儲事務消息存儲成消息。爲何要支持事務消息?若是 SQL 執行是成功的,須要回滾(刪除)事務消息。
哈哈哈
算是堅持把這個系列寫完了,給本身 32 個贊。
知足!
《Elastic-Job 源碼分析》 走起!不 High 不結束!