摘要: 原創出處 http://www.iocoder.cn/TCC-Transaction/transaction-repository/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!java
本文主要基於 TCC-Transaction 1.2.3.3 正式版git
🙂🙂🙂關注**微信公衆號:【芋道源碼】**有福利:github
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文分享 事務存儲器。主要涉及以下 Maven 項目:redis
tcc-transaction-core
:tcc-transaction 底層實現。在 TCC 的過程當中,根據應用內存中的事務信息完成整個事務流程。But 實際業務場景中,將事務信息只放在應用內存中是遠遠不夠可靠的。例如:數據庫
所以,TCC-Transaction 將事務信息添加到內存中的同時,會使用外部存儲進行持久化。目前提供四種外部存儲:apache
本文涉及到的類關係以下圖( 打開大圖 ):數組
你行好事會由於獲得讚揚而愉悅
同理,開源項目貢獻者會由於 Star 而更加有動力
爲 TCC-Transaction 點贊!傳送門緩存
ps:筆者假設你已經閱讀過《tcc-transaction 官方文檔 —— 使用指南1.2.x》。安全
在《TCC-Transaction 源碼分析 —— TCC 實現》「4. 事務與參與者」,能夠看到 Transaction 是一個比較複雜的對象,內嵌 Participant 數組,而 Participant 自己也是複雜的對象,內嵌了更多的其餘對象,所以,存儲器在持久化 Transaction 時,須要序列化後才能存儲。服務器
org.mengyun.tcctransaction.serializer.ObjectSerializer
,對象序列化接口。實現代碼以下:
public interface ObjectSerializer<T> {
byte[] serialize(T t);
T deserialize(byte[] bytes);
}
複製代碼
目前提供 JDK自帶序列化 和 Kyro序列化 兩種實現。
org.mengyun.tcctransaction.serializer.JdkSerializationSerializer
,JDK 序列化實現。比較易懂,點擊連接直接查看。
TCC-Transaction 使用的默認的序列化。
org.mengyun.tcctransaction.serializer.KryoTransactionSerializer
,Kyro 序列化實現。比較易懂,點擊連接直接查看。
JDK 和 Kyro 的序列化實現,肉眼沒法直觀具體存儲事務的信息,你能夠經過實現 ObjectSerializer 接口,實現自定義的 JSON 序列化。
org.mengyun.tcctransaction.TransactionRepository
,事務存儲器接口。實現代碼以下:
public interface TransactionRepository {
/** * 新增事務 * * @param transaction 事務 * @return 新增數量 */
int create(Transaction transaction);
/** * 更新事務 * * @param transaction 事務 * @return 更新數量 */
int update(Transaction transaction);
/** * 刪除事務 * * @param transaction 事務 * @return 刪除數量 */
int delete(Transaction transaction);
/** * 獲取事務 * * @param xid 事務編號 * @return 事務 */
Transaction findByXid(TransactionXid xid);
/** * 獲取超過指定時間的事務集合 * * @param date 指定時間 * @return 事務集合 */
List<Transaction> findAllUnmodifiedSince(Date date);
}
複製代碼
不一樣的存儲器經過實現該接口,提供事務的增刪改查功能。
org.mengyun.tcctransaction.repository.CachableTransactionRepository
,可緩存的事務存儲器抽象類,實現增刪改查事務時,同時緩存事務信息。在上面類圖,咱們也能夠看到 TCC-Transaction 自帶的多種存儲器都繼承該抽象類。
CachableTransactionRepository 構造方法實現代碼以下:
public abstract class CachableTransactionRepository implements TransactionRepository {
/** * 緩存過時時間 */
private int expireDuration = 120;
/** * 緩存 */
private Cache<Xid, Transaction> transactionXidCompensableTransactionCache;
public CachableTransactionRepository() {
transactionXidCompensableTransactionCache = CacheBuilder.newBuilder().expireAfterAccess(expireDuration, TimeUnit.SECONDS).maximumSize(1000).build();
}
}
複製代碼
#create(...)
實現代碼以下:
@Override
public int create(Transaction transaction) {
int result = doCreate(transaction);
if (result > 0) {
putToCache(transaction);
}
return result;
}
/** * 添加到緩存 * * @param transaction 事務 */
protected void putToCache(Transaction transaction) {
transactionXidCompensableTransactionCache.put(transaction.getXid(), transaction);
}
/** * 新增事務 * * @param transaction 事務 * @return 新增數量 */
protected abstract int doCreate(Transaction transaction);
複製代碼
#doCreate(...)
方法,新增事務。新增成功後,調用 #putToCache(...)
方法,添加事務到緩存。#doCreate(...)
爲抽象方法,子類實現該方法,提供新增事務功能。#update(...)
實現代碼以下:
@Override
public int update(Transaction transaction) {
int result = 0;
try {
result = doUpdate(transaction);
if (result > 0) {
putToCache(transaction);
} else {
throw new OptimisticLockException();
}
} finally {
if (result <= 0) { // 更新失敗,移除緩存。下次訪問,從存儲器讀取
removeFromCache(transaction);
}
}
return result;
}
/** * 移除事務從緩存 * * @param transaction 事務 */
protected void removeFromCache(Transaction transaction) {
transactionXidCompensableTransactionCache.invalidate(transaction.getXid());
}
/** * 更新事務 * * @param transaction 事務 * @return 更新數量 */
protected abstract int doUpdate(Transaction transaction);
複製代碼
#doUpdate(...)
方法,更新事務。
#putToCache(...)
方法,添加事務到緩存。Transaction.version
)和存儲器裏的事務的版本號不一樣,更新失敗。爲何?在《TCC-Transaction 源碼分析 —— 事務恢復》詳細解析。更新失敗,意味着緩存已經不不一致,調用 #removeFromCache(...)
方法,移除事務從緩存中。#doUpdate(...)
爲抽象方法,子類實現該方法,提供更新事務功能。#delete(...)
實現代碼以下:
@Override
public int delete(Transaction transaction) {
int result;
try {
result = doDelete(transaction);
} finally {
removeFromCache(transaction);
}
return result;
}
/** * 刪除事務 * * @param transaction 事務 * @return 刪除數量 */
protected abstract int doDelete(Transaction transaction);
複製代碼
#doDelete(...)
方法,刪除事務。#removeFromCache(...)
方法,移除事務從緩存中。#doDelete(...)
爲抽象方法,子類實現該方法,提供刪除事務功能。#findByXid(...)
實現代碼以下:
@Override
public Transaction findByXid(TransactionXid transactionXid) {
Transaction transaction = findFromCache(transactionXid);
if (transaction == null) {
transaction = doFindOne(transactionXid);
if (transaction != null) {
putToCache(transaction);
}
}
return transaction;
}
/** * 得到事務從緩存中 * * @param transactionXid 事務編號 * @return 事務 */
protected Transaction findFromCache(TransactionXid transactionXid) {
return transactionXidCompensableTransactionCache.getIfPresent(transactionXid);
}
/** * 查詢事務 * * @param xid 事務編號 * @return 事務 */
protected abstract Transaction doFindOne(Xid xid);
複製代碼
#findFromCache()
方法,優先從緩存中獲取事務。#doFindOne()
方法,緩存中事務不存在,從存儲器中獲取。獲取到後,調用 #putToCache()
方法,添加事務到緩存中。#doFindOne(...)
爲抽象方法,子類實現該方法,提供查詢事務功能。#findAllUnmodifiedSince(...)
實現代碼以下:
@Override
public List<Transaction> findAllUnmodifiedSince(Date date) {
List<Transaction> transactions = doFindAllUnmodifiedSince(date);
// 添加到緩存
for (Transaction transaction : transactions) {
putToCache(transaction);
}
return transactions;
}
/** * 獲取超過指定時間的事務集合 * * @param date 指定時間 * @return 事務集合 */
protected abstract List<Transaction> doFindAllUnmodifiedSince(Date date);
複製代碼
#findAllUnmodifiedSince(...)
方法,從存儲器獲取超過指定時間的事務集合。調用 #putToCache(...)
方法,循環事務集合添加到緩存。#doFindAllUnmodifiedSince(...)
爲抽象方法,子類實現該方法,提供獲取超過指定時間的事務集合功能。org.mengyun.tcctransaction.repository.JdbcTransactionRepository
,JDBC 事務存儲器,經過 JDBC 驅動,將 Transaction 存儲到 MySQL / Oracle / PostgreSQL / SQLServer 等關係數據庫。實現代碼以下:
public class JdbcTransactionRepository extends CachableTransactionRepository {
/** * 領域 */
private String domain;
/** * 表後綴 */
private String tbSuffix;
/** * 數據源 */
private DataSource dataSource;
/** * 序列化 */
private ObjectSerializer serializer = new JdkSerializationSerializer();
}
複製代碼
domain
,領域,或者也能夠稱爲模塊名,應用名,用於惟一標識一個資源。例如,Maven 模塊 xxx-order
,咱們能夠配置該屬性爲 ORDER
。tbSuffix
,表後綴。默認存儲表名爲 TCC_TRANSACTION
,配置表名後,爲 TCC_TRANSACTION${tbSuffix}
。dataSource
,存儲數據的數據源。serializer
,序列化。**當數據庫裏已經有數據的狀況下,不要更換別的序列化,不然會致使反序列化報錯。**建議:TCC-Transaction 存儲時,新增字段,記錄序列化的方式。表結構以下:
CREATE TABLE `TCC_TRANSACTION` (
`TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT,
`DOMAIN` varchar(100) DEFAULT NULL,
`GLOBAL_TX_ID` varbinary(32) NOT NULL,
`BRANCH_QUALIFIER` varbinary(32) NOT NULL,
`CONTENT` varbinary(8000) DEFAULT NULL,
`STATUS` int(11) DEFAULT NULL,
`TRANSACTION_TYPE` int(11) DEFAULT NULL,
`RETRIED_COUNT` int(11) DEFAULT NULL,
`CREATE_TIME` datetime DEFAULT NULL,
`LAST_UPDATE_TIME` datetime DEFAULT NULL,
`VERSION` int(11) DEFAULT NULL,
PRIMARY KEY (`TRANSACTION_ID`), UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
複製代碼
TRANSACTION_ID
,僅僅數據庫自增,無實際用途。CONTENT
,Transaction 序列化。ps:點擊連接查看 JdbcTransactionRepository 代碼實現,已經添加完整中文註釋。
org.mengyun.tcctransaction.repository.RedisTransactionRepository
,Redis 事務存儲器,將 Transaction 存儲到 Redis。實現代碼以下:
public class RedisTransactionRepository extends CachableTransactionRepository {
/** * Jedis Pool */
private JedisPool jedisPool;
/** * key 前綴 */
private String keyPrefix = "TCC:";
/** * 序列化 */
private ObjectSerializer serializer = new JdkSerializationSerializer();
}
複製代碼
keyPrefix
,key 前綴。相似 JdbcTransactionRepository 的 domain
屬性。一個事務存儲到 Reids,使用 Redis 的數據結構爲 HASHES。
key : 使用 keyPrefix
+ xid
,實現代碼以下:
/** * 建立事務的 Redis Key * * @param keyPrefix key 前綴 * @param xid 事務 * @return Redis Key */
public static byte[] getRedisKey(String keyPrefix, Xid xid) {
byte[] prefix = keyPrefix.getBytes();
byte[] globalTransactionId = xid.getGlobalTransactionId();
byte[] branchQualifier = xid.getBranchQualifier();
// 拼接 key
byte[] key = new byte[prefix.length + globalTransactionId.length + branchQualifier.length];
System.arraycopy(prefix, 0, key, 0, prefix.length);
System.arraycopy(globalTransactionId, 0, key, prefix.length, globalTransactionId.length);
System.arraycopy(branchQualifier, 0, key, prefix.length + globalTransactionId.length, branchQualifier.length);
return key;
}
複製代碼
HASHES 的 key :使用 version
。
HASHES 的 value :調用 TransactionSerializer#serialize(...)
方法,序列化 Transaction。實現代碼以下:
public static byte[] serialize(ObjectSerializer serializer, Transaction transaction) {
Map<String, Object> map = new HashMap<String, Object>();
map.put("GLOBAL_TX_ID", transaction.getXid().getGlobalTransactionId());
map.put("BRANCH_QUALIFIER", transaction.getXid().getBranchQualifier());
map.put("STATUS", transaction.getStatus().getId());
map.put("TRANSACTION_TYPE", transaction.getTransactionType().getId());
map.put("RETRIED_COUNT", transaction.getRetriedCount());
map.put("CREATE_TIME", transaction.getCreateTime());
map.put("LAST_UPDATE_TIME", transaction.getLastUpdateTime());
map.put("VERSION", transaction.getVersion());
// 序列化
map.put("CONTENT", serializer.serialize(transaction));
return serializer.serialize(map);
}
複製代碼
在實現 #doFindAllUnmodifiedSince(date)
方法,沒法像數據庫使用時間條件進行過濾,所以,加載全部事務後在內存中過濾。實現代碼以下:
@Override
protected List<Transaction> doFindAllUnmodifiedSince(Date date) {
// 得到全部事務
List<Transaction> allTransactions = doFindAll();
// 過濾時間
List<Transaction> allUnmodifiedSince = new ArrayList<Transaction>();
for (Transaction transaction : allTransactions) {
if (transaction.getLastUpdateTime().compareTo(date) < 0) {
allUnmodifiedSince.add(transaction);
}
}
return allUnmodifiedSince;
}
複製代碼
ps:點擊連接查看 RedisTransactionRepository 代碼實現,已經添加完整中文註釋。
FROM 《TCC-Transaction 官方文檔 —— 使用指南1.2.x》
使用 RedisTransactionRepository 須要配置 Redis 服務器以下:
appendonly yes
appendfsync always
org.mengyun.tcctransaction.repository.ZooKeeperTransactionRepository
,Zookeeper 事務存儲器,將 Transaction 存儲到 Zookeeper。實現代碼以下:
public class ZooKeeperTransactionRepository extends CachableTransactionRepository {
/** * Zookeeper 服務器地址數組 */
private String zkServers;
/** * Zookeeper 超時時間 */
private int zkTimeout;
/** * TCC 存儲 Zookeeper 根目錄 */
private String zkRootPath = "/tcc";
/** * Zookeeper 鏈接 */
private volatile ZooKeeper zk;
/** * 序列化 */
private ObjectSerializer serializer = new JdkSerializationSerializer();
}
複製代碼
zkRootPath
,存儲 Zookeeper 根目錄,相似 JdbcTransactionRepository 的 domain
屬性。一個事務存儲到 Zookeeper,使用 Zookeeper 的持久數據節點。
path:${zkRootPath}
+ /
+ ${xid}
。實現代碼以下:
// ZooKeeperTransactionRepository.java
private String getTxidPath(Xid xid) {
return String.format("%s/%s", zkRootPath, xid);
}
// TransactionXid.java
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("globalTransactionId:").append(UUID.nameUUIDFromBytes(globalTransactionId).toString());
stringBuilder.append(",").append("branchQualifier:").append(UUID.nameUUIDFromBytes(branchQualifier).toString());
return stringBuilder.toString();
}
複製代碼
data:調用 TransactionSerializer#serialize(...)
方法,序列化 Transaction。
version:使用 Zookeeper 數據節點自帶版本功能。這裏要注意下,Transaction 的版本從 1 開始,而 Zookeeper 數據節點版本從 0 開始。
ps:點擊連接查看 ZooKeeperTransactionRepository 代碼實現,已經添加完整中文註釋。
另外,在生產上暫時不建議使用 ZooKeeperTransactionRepository,緣由有兩點:
若是你要使用 Zookeeper 進行事務的存儲,能夠考慮使用 Apache Curator 操做 Zookeeper,重寫 ZooKeeperTransactionRepository 部分代碼。
org.mengyun.tcctransaction.repository.FileSystemTransactionRepository
,File 事務存儲器,將 Transaction 存儲到文件系統。
實現上和 ZooKeeperTransactionRepository,區別主要在於不支持樂觀鎖更新。有興趣的同窗點擊連接查看,這裏就不拓展開來。
另外,在生產上不建議使用 FileSystemTransactionRepository,由於不支持多節點共享。用分佈式存儲掛載文件另說,固然仍是不建議,由於不支持樂觀鎖併發更新。
這篇略( 超 )微( 級 )水更,哈哈哈,爲《TCC-Transaction 源碼分析 —— 事務恢復》作鋪墊啦。
使用 RedisTransactionRepository 和 ZooKeeperTransactionRepository 存儲事務仍是 Get 蠻多點的。
胖友,分享一個朋友圈可好?