最近在重構項目中,須要兼容多數據源,故此實現下多數據源事務。java
此次重構項目中,爲了支持後續龐大的數據量接入,更迭了數據庫,可是爲了要兼容老版本,也不能直接拿掉老的數據庫。因此就有了兼容多數據源的需求,尤爲是要保證事務。git
其實這個需求就是要實現分佈式事務,可是咱們的這個場景是在一個服務內,因此能夠利用AOP來輕量的實現這個需求,如果多個服務的話,就須要實現一個管理器。github
用過spring的都知道,咱們通常都是使用@Transactional
註解,可是這個註解在多數據源下,只能支持指定的數據源(不指定就是默認的)。spring
因此咱們新建個自定義註解:sql
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactional {
String[] values() default "";
}
複製代碼
再定義一個切面:數據庫
/**
* 多數據源事務
*
* @author 7le
*/
@Slf4j
@Aspect
@Order(-7)
@Component
public class MultiTransactionalAspect {
@Autowired
private ApplicationContext applicationContext;
@Around(value = "@annotation(multiTransactional)")
public Object around(ProceedingJoinPoint pjp, MultiTransactional multiTransactional) throws Throwable {
Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack = new Stack<>();
Stack<TransactionStatus> transactionStatusStack = new Stack<>();
try {
if (!openTransaction(dataSourceTransactionManagerStack, transactionStatusStack, multiTransactional)) {
return null;
}
Object ret = pjp.proceed();
commit(dataSourceTransactionManagerStack, transactionStatusStack);
return ret;
} catch (Throwable e) {
rollback(dataSourceTransactionManagerStack, transactionStatusStack);
log.error(String.format(
"MultiTransactionalAspect catch exception class: %s method: %s detail:", pjp.getTarget().getClass().getSimpleName(),
pjp.getSignature().getName()), e);
throw e;
}
}
private boolean openTransaction(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatusStack, MultiTransactional multiTransactional) {
String[] transactionMangerNames = multiTransactional.values();
if (ArrayUtils.isEmpty(multiTransactional.values())) {
return false;
}
for (String beanName : transactionMangerNames) {
DataSourceTransactionManager dataSourceTransactionManager = (DataSourceTransactionManager) applicationContext
.getBean(beanName);
TransactionStatus transactionStatus = dataSourceTransactionManager
.getTransaction(new DefaultTransactionDefinition());
transactionStatusStack.push(transactionStatus);
dataSourceTransactionManagerStack.push(dataSourceTransactionManager);
}
return true;
}
private void commit(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatusStack) {
while (!dataSourceTransactionManagerStack.isEmpty()) {
dataSourceTransactionManagerStack.pop().commit(transactionStatusStack.pop());
}
}
private void rollback(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatusStack) {
while (!dataSourceTransactionManagerStack.isEmpty()) {
dataSourceTransactionManagerStack.pop().rollback(transactionStatusStack.pop());
}
}
}
複製代碼
這樣就大功告成,只須要在須要的方法上,加上@MultiTransactional({"xxxx","xxxxx"})
bash
實現的代碼在 springcloud-gateway架構
在使用的時候,須要注意一些細節,要加上@EnableTransactionManagement
。app
以及@MultiTransactional({"xxxx","xxxxx"})
是AOP的方式,那就意味着是動態代理的,那下面的方式就會失效:分佈式
上面也提到過這個方案比較輕量,也是針對一些對數據一致性要求不高的場景,由於會存在數據不一致的可能。
咱們用僞代碼來描述下,假設2個數據源
begin1
begin2
sql1
sql2
commit1
commit2
複製代碼
這種方案是能夠實如今sql1 sql2之間的異常回滾。若是出現commit1提交成功,commit2提交失敗(或者超時)這種狀況,就會形成數據不一致,雖然這種狀況機率很低,但也是一個隱患。
這個實現很相似於2PC,都會有在一個參與者執行任務提交後,另外一個參與者出現異常而致使數據不一致的問題。
其實通常狀況下,系統的需求只是要達到最終一致性,那就能夠考慮使用TCC,對每一個事物進行Try,若是執行沒有問題,再執行Confirm,若是執行過程當中出了問題,則執行操做的逆操Cancel(自動化補償手段)。
可是TCC對每一個事務都須要Try,再執行Confirm,略微顯得臃腫,根據不一樣的業務場景能夠有更好的方案(好比補償模式,按期校對模式之類),具體的能夠看分佈式服務化系統一致性的「最佳實幹」
抽了點時間,本身實現了一個分佈式事務的中間件,在一些對數據一致性要求高的場景可使用。shine-mq 相應的設計思路在 分佈式事務:基於可靠消息服務
李豔鵬. 著. 分佈式服務架構:原理、設計與實戰[M].