本文主要研究一下servicecomb-saga的alpha-serverjava
alpha-server是servicecomb-saga的分佈式事務協調中心,採用spring boot開發,能夠直接從jar包啓動,須要依賴mysql或pg數據庫,同時初始化數據。啓動命令以下:mysql
java -jar -Dspring.profiles.active=prd \ alpha-server-0.2.0-exec.jar \ --spring.datasource.url=jdbc:postgresql://localhost:5432/postgres \ --spring.datasource.username=postgres \ --spring.datasource.password=postgres
@EntityScan(basePackages = "org.apache.servicecomb.saga.alpha") @Configuration class AlphaConfig { private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @Value("${alpha.compensation.retry.delay:3000}") private int delay; @Bean Map<String, Map<String, OmegaCallback>> omegaCallbacks() { return new ConcurrentHashMap<>(); } @Bean OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) { return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks)); } @Bean TxEventRepository springTxEventRepository(TxEventEnvelopeRepository eventRepo) { return new SpringTxEventRepository(eventRepo); } @Bean CommandRepository springCommandRepository(TxEventEnvelopeRepository eventRepo, CommandEntityRepository commandRepository) { return new SpringCommandRepository(eventRepo, commandRepository); } @Bean TxTimeoutRepository springTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) { return new SpringTxTimeoutRepository(timeoutRepo); } @Bean ScheduledExecutorService compensationScheduler() { return scheduler; } @Bean GrpcServerConfig grpcServerConfig() { return new GrpcServerConfig(); } @Bean TxConsistentService txConsistentService( @Value("${alpha.event.pollingInterval:500}") int eventPollingInterval, GrpcServerConfig serverConfig, ScheduledExecutorService scheduler, TxEventRepository eventRepository, CommandRepository commandRepository, TxTimeoutRepository timeoutRepository, OmegaCallback omegaCallback, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { new EventScanner(scheduler, eventRepository, commandRepository, timeoutRepository, omegaCallback, eventPollingInterval).run(); TxConsistentService consistentService = new TxConsistentService(eventRepository); ServerStartable startable = buildGrpc(serverConfig, consistentService, omegaCallbacks); new Thread(startable::start).start(); return consistentService; } private ServerStartable buildGrpc(GrpcServerConfig serverConfig, TxConsistentService txConsistentService, Map<String, Map<String, OmegaCallback>> omegaCallbacks) { return new GrpcStartable(serverConfig, new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks)); } @PostConstruct void init() { new PendingTaskRunner(pendingCompensations, delay).run(); } @PreDestroy void shutdown() { scheduler.shutdownNow(); } }