經過前面幾篇文章,咱們搭建了環境,也進行了分佈式事務服務的體驗,相信你們對myth也有了一個大致直觀的瞭解,接下來咱們將正式步入源碼解析之旅~~java
咱們首先找到訂單demo( myth-demo-springcloud-order),第一步先找到程序入口,咱們框架是基於spring,很天然咱們會想到applicationContext.xml,請看一下配置,相信你們還有印象~mysql
<!--開啓掃描Myth分佈式框架包-->
<context:component-scan base-package="com.github.myth.*"/>
<!--開啓動態代理-->
<aop:aspectj-autoproxy expose-proxy="true"/>
<!--配置啓動類-->
<bean id="mythTransactionBootstrap" class="com.github.myth.core.bootstrap.MythTransactionBootstrap">
<property name="repositorySuffix" value="order-service"/>
<property name="serializer" value="kryo"/>
<property name="coordinatorQueueMax" value="5000"/>
<property name="coordinatorThreadMax" value="8"/>
<property name="rejectPolicy" value="Abort"/>
<property name="blockingQueueType" value="Linked"/>
<property name="needRecover" value="true"/>
<property name="scheduledDelay" value="120"/>
<property name="scheduledThreadMax" value="4"/>
<property name="recoverDelayTime" value="120"/>
<property name="retryMax" value="30"/>
<property name="repositorySupport" value="db"/>
<property name="mythDbConfig">
<bean class="com.github.myth.common.config.MythDbConfig">
<property name="url" value="jdbc:mysql://127.0.0.1:3306/myth?useUnicode=true&characterEncoding=utf8"/>
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="password" value="123456"/>
<property name="username" value="root"/>
</bean>
</property>
</bean>
複製代碼
經過以上配置咱們知道首先須要開啓Aop切面,再掃描框架的包,重點咱們來關注 MythTransactionBootstrapgit
咱們先來看看序列圖,從圖中咱們得知主要涉及MythTransactionBootstrap 、MythInitServiceImpl、CoordinatorServiceImpl三個類,後面咱們逐步來走 github
MythTransactionBootstrap 簡單類圖 spring
廢話很少說,直接上菜(精品四菜一湯O(∩_∩)O哈哈~):sql
@Component
public class MythTransactionBootstrap extends MythConfig implements ApplicationContextAware {
private final MythInitService mythInitService;
@Autowired
public MythTransactionBootstrap(MythInitService mythInitService) {
this.mythInitService = mythInitService;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringBeanUtils.getInstance().setCfgContext((ConfigurableApplicationContext) applicationContext);
start(this);
}
private void start(MythConfig tccConfig) {
mythInitService.initialization(tccConfig);
}
}
複製代碼
咱們發現MythTransactionBootstrap繼承 MythConfig, 因此能獲取在xml配置的屬性信息,它還實現了 ApplicationContextAware接口, 所以當spring容器初始化的時候,會自動的將ApplicationContext注入進來bootstrap
咱們繼續跟蹤,進入mythInitService.initialization方法api
/** * Myth分佈式事務初始化方法 * * @param mythConfig TCC配置 */
@Override
public void initialization(MythConfig mythConfig) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> LOGGER.error("系統關閉")));
try {
loadSpiSupport(mythConfig);
coordinatorService.start(mythConfig);
} catch (Exception ex) {
LogUtil.error(LOGGER, "Myth事務初始化異常:{}", ex::getMessage);
//非正常關閉
System.exit(1);
}
LogUtil.info(LOGGER, () -> "Myth事務初始化成功!");
}
複製代碼
根據註釋咱們知道,這裏主要進行相關初始化操做,代碼比較簡潔,裏面主要包含兩個方法,咱們先來看第一個loadSpiSupport(mythConfig), LoadSpiSupport 中主要採用jdk自帶的spi加載機制,spi機制很常見,好比開源框架spring, 阿里的HSF,dubbo等開源框架都有使用該機制,若是有不明白的小夥伴,能夠自行googleapp
spi 實現的相關類 框架
在LoadSpiSupport方法裏其實主要經過spi機制作了幾件事:
<property name="serializer" value="kryo"/><!-- 這裏默認是 kryo -->
//關鍵代碼以下
//加載完後的serializer,進行設置併入住到spring上下文中
serializer.ifPresent(coordinatorService::setSerializer);
serializer.ifPresent(s-> SpringBeanUtils.getInstance().registerBean(ObjectSerializer.class.getName(), s));
複製代碼
<property name="repositorySupport" value="db"/><!-- 這裏默認是 db -->
//關鍵代碼以下
//將CoordinatorRepository實現注入到spring容器中
repositoryOptional.ifPresent(repository -> {
serializer.ifPresent(repository::setSerializer);
SpringBeanUtils.getInstance().registerBean(CoordinatorRepository.class.getName(), repository);
});
複製代碼
在這裏咱們就能夠發現spi的好處,能夠在不改任何代碼的狀況下,只需作少量配置就能夠靈活設置本身想要的序列化及持久化方式~~ 有木有~~
接下來咱們來看第二個方法,coordinatorService.start(mythConfig)
/** * 保存本地事務日誌 * * @param mythConfig 配置信息 * @throws MythException 異常 */
@Override
public void start(MythConfig mythConfig) throws MythException {
this.mythConfig = mythConfig;
//在前面咱們已經作了注入操做,注入對象爲JdbcCoordinatorRepository
coordinatorRepository = SpringBeanUtils.getInstance().getBean(CoordinatorRepository.class);
// 根據配置咱們這裏的值爲:order-service
final String repositorySuffix = buildRepositorySuffix(mythConfig.getRepositorySuffix());
//初始化spi 協調資源存儲
coordinatorRepository.init(repositorySuffix, mythConfig);
//初始化 協調資源線程池
initCoordinatorPool();
//若是須要自動恢復 開啓線程 調度線程池,進行恢復
if (mythConfig.getNeedRecover()) {
scheduledAutoRecover();
}
}
複製代碼
緊接着咱們進入,JdbcCoordinatorRepository.init(repositorySuffix, mythConfig), 詳見代碼
/** * 初始化操做 * * @param modelName 模塊名稱 * @param mythConfig 配置信息 */
@Override
public void init(String modelName, MythConfig mythConfig) {
dataSource = new DruidDataSource();
final MythDbConfig tccDbConfig = mythConfig.getMythDbConfig();
dataSource.setUrl(tccDbConfig.getUrl());
dataSource.setDriverClassName(tccDbConfig.getDriverClassName());
dataSource.setUsername(tccDbConfig.getUsername());
dataSource.setPassword(tccDbConfig.getPassword());
dataSource.setInitialSize(tccDbConfig.getInitialSize());
dataSource.setMaxActive(tccDbConfig.getMaxActive());
dataSource.setMinIdle(tccDbConfig.getMinIdle());
dataSource.setMaxWait(tccDbConfig.getMaxWait());
dataSource.setValidationQuery(tccDbConfig.getValidationQuery());
dataSource.setTestOnBorrow(tccDbConfig.getTestOnBorrow());
dataSource.setTestOnReturn(tccDbConfig.getTestOnReturn());
dataSource.setTestWhileIdle(tccDbConfig.getTestWhileIdle());
dataSource.setPoolPreparedStatements(tccDbConfig.getPoolPreparedStatements());
dataSource.setMaxPoolPreparedStatementPerConnectionSize(tccDbConfig.getMaxPoolPreparedStatementPerConnectionSize());
this.tableName = RepositoryPathUtils.buildDbTableName(modelName);
executeUpdate(SqlHelper.buildCreateTableSql(tccDbConfig.getDriverClassName(), tableName));
}
複製代碼
這裏主要初始化數據源,而後建立order服務對應的一張分佈式事務消息表,用來存儲分佈式事務消息,走完代碼咱們會建立一張表:myth_order_service。 到此init代碼已走完, 接下來咱們來看 initCoordinatorPool()
private void initCoordinatorPool() {
synchronized (LOGGER) {
QUEUE = new LinkedBlockingQueue<>(mythConfig.getCoordinatorQueueMax());
final int coordinatorThreadMax = mythConfig.getCoordinatorThreadMax();
final MythTransactionThreadPool threadPool = SpringBeanUtils.getInstance().getBean(MythTransactionThreadPool.class);
final ExecutorService executorService = threadPool.newCustomFixedThreadPool(coordinatorThreadMax);
LogUtil.info(LOGGER, "啓動協調資源操做線程數量爲:{}", () -> coordinatorThreadMax);
for (int i = 0; i < coordinatorThreadMax; i++) {
executorService.execute(new Worker());
}
}
}
/** * 線程執行器 */
class Worker implements Runnable {
@Override
public void run() {
execute();
}
private void execute() {
while (true) {
try {
final CoordinatorAction coordinatorAction = QUEUE.take();
if (coordinatorAction != null) {
final int code = coordinatorAction.getAction().getCode();
if (CoordinatorActionEnum.SAVE.getCode() == code) {
save(coordinatorAction.getMythTransaction());
} else if (CoordinatorActionEnum.DELETE.getCode() == code) {
remove(coordinatorAction.getMythTransaction().getTransId());
} else if (CoordinatorActionEnum.UPDATE.getCode() == code) {
update(coordinatorAction.getMythTransaction());
}
}
} catch (Exception e) {
e.printStackTrace();
LogUtil.error(LOGGER, "執行協調命令失敗:{}", e::getMessage);
}
}
}
}
複製代碼
這個方法裏首先初始化一個LinkedBlockingQueue隊列QUEUE,該隊列做用主要用於存放分佈式消息內容,其次建立了一個線程池,線程池中執行的任務Worker,主要消費QUEUE隊列消息進行分佈式消息的持久化操做,細心的童鞋發現這裏用到了命令模式,咱們這裏的持久化爲mysql。
下一步咱們來看 scheduledAutoRecover方法
new ScheduledThreadPoolExecutor(1,
MythTransactionThreadFactory.create("MythAutoRecoverService",
true))
.scheduleWithFixedDelay(() -> {
LogUtil.debug(LOGGER, "auto recover execute delayTime:{}",
() -> mythConfig.getScheduledDelay());
try {
final List<MythTransaction> mythTransactionList =
coordinatorRepository.listAllByDelay(acquireData());
if (CollectionUtils.isNotEmpty(mythTransactionList)) {
mythTransactionList
.forEach(mythTransaction -> {
final Boolean success = sendMessage(mythTransaction);
//發送成功 ,更改狀態
if (success) {
coordinatorRepository.updateStatus(mythTransaction.getTransId(),
MythStatusEnum.COMMIT.getCode());
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
}, 30, mythConfig.getScheduledDelay(), TimeUnit.SECONDS);
複製代碼
前面咱們建立了一個線程池進行分佈式消息的持久化操做,這裏就是如何使用這些數據,建立一個調度線程,定時取出指定有效時間範圍內且消息狀態爲開始的數據,而後再往mq中投遞消息,注意這裏有個開關needRecover, 根據註釋得知只須要在事務發起方咱們才須要開啓,默認關閉狀態,咱們這裏是order服務,即爲事務發起方,因此須要開啓
<property name="needRecover" value="true"/>
複製代碼
最後咱們再來看下mq消息發送部分,經過applicationContext.xml,咱們發現咱們只放開了rocketmq的聲明,固咱們消息發送使用的是rocketmq
到此咱們order服務啓動源碼部分已走完,啓動成功後控制檯輸出內容:
account ,inventory 啓動流程與order服務大致類似,主要有如下區別
好了,這一章咱們完成了服務啓動的源碼解析,後面咱們將進入下單流程部分,感受怎麼樣 很簡單有木有 是否是你的菜 O(∩_∩)O~
你們有任何問題或者建議歡迎溝通 ,歡迎加入QQ羣:162614487 進行交流