本文主要研究一下canal的CanalAdapterService。
canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterService.java
@Component @RefreshScope public class CanalAdapterService { private static final Logger logger = LoggerFactory.getLogger(CanalAdapterService.class); private CanalAdapterLoader adapterLoader; @Resource private ContextRefresher contextRefresher; @Resource private AdapterCanalConfig adapterCanalConfig; @Resource private Environment env; // 注入bean保證優先註冊 @Resource private SpringContext springContext; @Resource private SyncSwitch syncSwitch; private volatile boolean running = false; @PostConstruct public synchronized void init() { if (running) { return; } try { logger.info("## start the canal client adapters."); adapterLoader = new CanalAdapterLoader(adapterCanalConfig); adapterLoader.init(); running = true; logger.info("## the canal client adapters are running now ......"); } catch (Exception e) { logger.error("## something goes wrong when starting up the canal client adapters:", e); } } @PreDestroy public synchronized void destroy() { if (!running) { return; } try { running = false; logger.info("## stop the canal client adapters"); if (adapterLoader != null) { adapterLoader.destroy(); adapterLoader = null; } for (DruidDataSource druidDataSource : DatasourceConfig.DATA_SOURCES.values()) { try { druidDataSource.close(); } catch (Exception e) { logger.error(e.getMessage(), e); } } DatasourceConfig.DATA_SOURCES.clear(); } catch (Throwable e) { logger.warn("## something goes wrong when stopping canal client adapters:", e); } finally { logger.info("## canal client adapters are down."); } } }
canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/loader/CanalAdapterLoader.java
public class CanalAdapterLoader { private static final Logger logger = LoggerFactory.getLogger(CanalAdapterLoader.class); private CanalClientConfig canalClientConfig; private Map<String, CanalAdapterWorker> canalWorkers = new HashMap<>(); private Map<String, AbstractCanalAdapterWorker> canalMQWorker = new HashMap<>(); private ExtensionLoader<OuterAdapter> loader; public CanalAdapterLoader(CanalClientConfig canalClientConfig){ this.canalClientConfig = canalClientConfig; } /** * 初始化canal-client */ public void init() { loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class); String canalServerHost = this.canalClientConfig.getCanalServerHost(); SocketAddress sa = null; if (canalServerHost != null) { String[] ipPort = canalServerHost.split(":"); sa = new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1])); } String zkHosts = this.canalClientConfig.getZookeeperHosts(); if ("tcp".equalsIgnoreCase(canalClientConfig.getMode())) { // 初始化canal-client的適配器 for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) { List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>(); for (CanalClientConfig.Group connectorGroup : canalAdapter.getGroups()) { List<OuterAdapter> canalOutConnectors = new ArrayList<>(); for (OuterAdapterConfig c : connectorGroup.getOuterAdapters()) { loadAdapter(c, canalOutConnectors); } canalOuterAdapterGroups.add(canalOutConnectors); } CanalAdapterWorker worker; if (sa != null) { worker = new CanalAdapterWorker(canalClientConfig, canalAdapter.getInstance(), sa, canalOuterAdapterGroups); } else if (zkHosts != null) { worker = new CanalAdapterWorker(canalClientConfig, canalAdapter.getInstance(), zkHosts, canalOuterAdapterGroups); } else { throw new RuntimeException("No canal server connector found"); } canalWorkers.put(canalAdapter.getInstance(), worker); worker.start(); logger.info("Start adapter for canal instance: {} succeed", canalAdapter.getInstance()); } } else if 
("kafka".equalsIgnoreCase(canalClientConfig.getMode())) { // 初始化canal-client-kafka的適配器 for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) { for (CanalClientConfig.Group group : canalAdapter.getGroups()) { List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>(); List<OuterAdapter> canalOuterAdapters = new ArrayList<>(); for (OuterAdapterConfig config : group.getOuterAdapters()) { loadAdapter(config, canalOuterAdapters); } canalOuterAdapterGroups.add(canalOuterAdapters); CanalAdapterKafkaWorker canalKafkaWorker = new CanalAdapterKafkaWorker(canalClientConfig, canalClientConfig.getMqServers(), canalAdapter.getInstance(), group.getGroupId(), canalOuterAdapterGroups, canalClientConfig.getFlatMessage()); canalMQWorker.put(canalAdapter.getInstance() + "-kafka-" + group.getGroupId(), canalKafkaWorker); canalKafkaWorker.start(); logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-" + group.getGroupId()); } } } else if ("rocketMQ".equalsIgnoreCase(canalClientConfig.getMode())) { // 初始化canal-client-rocketMQ的適配器 for (CanalClientConfig.CanalAdapter canalAdapter : canalClientConfig.getCanalAdapters()) { for (CanalClientConfig.Group group : canalAdapter.getGroups()) { List<List<OuterAdapter>> canalOuterAdapterGroups = new ArrayList<>(); List<OuterAdapter> canalOuterAdapters = new ArrayList<>(); for (OuterAdapterConfig config : group.getOuterAdapters()) { loadAdapter(config, canalOuterAdapters); } canalOuterAdapterGroups.add(canalOuterAdapters); CanalAdapterRocketMQWorker rocketMQWorker = new CanalAdapterRocketMQWorker(canalClientConfig, canalClientConfig.getMqServers(), canalAdapter.getInstance(), group.getGroupId(), canalOuterAdapterGroups, canalClientConfig.getAccessKey(), canalClientConfig.getSecretKey(), canalClientConfig.getFlatMessage(), canalClientConfig.isEnableMessageTrace(), canalClientConfig.getCustomizedTraceTopic(), canalClientConfig.getAccessChannel(), 
canalClientConfig.getNamespace()); canalMQWorker.put(canalAdapter.getInstance() + "-rocketmq-" + group.getGroupId(), rocketMQWorker); rocketMQWorker.start(); logger.info("Start adapter for canal-client mq topic: {} succeed", canalAdapter.getInstance() + "-" + group.getGroupId()); } } } } private void loadAdapter(OuterAdapterConfig config, List<OuterAdapter> canalOutConnectors) { try { OuterAdapter adapter; adapter = loader.getExtension(config.getName(), StringUtils.trimToEmpty(config.getKey())); ClassLoader cl = Thread.currentThread().getContextClassLoader(); // 替換ClassLoader Thread.currentThread().setContextClassLoader(adapter.getClass().getClassLoader()); Environment env = (Environment) SpringContext.getBean(Environment.class); Properties evnProperties = null; if (env instanceof StandardEnvironment) { evnProperties = new Properties(); for (PropertySource<?> propertySource : ((StandardEnvironment) env).getPropertySources()) { if (propertySource instanceof EnumerablePropertySource) { String[] names = ((EnumerablePropertySource<?>) propertySource).getPropertyNames(); for (String name : names) { Object val = propertySource.getProperty(name); if (val != null) { evnProperties.put(name, val); } } } } } adapter.init(config, evnProperties); Thread.currentThread().setContextClassLoader(cl); canalOutConnectors.add(adapter); logger.info("Load canal adapter: {} succeed", config.getName()); } catch (Exception e) { logger.error("Load canal adapter: {} failed", config.getName(), e); } } /** * 銷燬全部適配器 爲防止canal實例太多形成銷燬阻塞, 並行銷燬 */ public void destroy() { if (!canalWorkers.isEmpty()) { ExecutorService stopExecutorService = Executors.newFixedThreadPool(canalWorkers.size()); for (CanalAdapterWorker canalAdapterWorker : canalWorkers.values()) { stopExecutorService.execute(canalAdapterWorker::stop); } stopExecutorService.shutdown(); try { while (!stopExecutorService.awaitTermination(1, TimeUnit.SECONDS)) { // ignore } } catch (InterruptedException e) { // ignore } } if 
(!canalMQWorker.isEmpty()) { ExecutorService stopMQWorkerService = Executors.newFixedThreadPool(canalMQWorker.size()); for (AbstractCanalAdapterWorker canalAdapterMQWorker : canalMQWorker.values()) { stopMQWorkerService.execute(canalAdapterMQWorker::stop); } stopMQWorkerService.shutdown(); try { while (!stopMQWorkerService.awaitTermination(1, TimeUnit.SECONDS)) { // ignore } } catch (InterruptedException e) { // ignore } } logger.info("All canal adapters destroyed"); } }
CanalAdapterService提供了init及destroy方法;其init方法會使用adapterCanalConfig建立CanalAdapterLoader,而後執行adapterLoader.init();其destroy方法會執行adapterLoader.destroy(),而後遍歷DatasourceConfig.DATA_SOURCES執行druidDataSource.close(),最後清空DatasourceConfig.DATA_SOURCES。