public class CanalMQStarter { private static final Logger logger = LoggerFactory.getLogger(CanalMQStarter.class); private volatile boolean running = false; private ExecutorService executorService; private CanalMQProducer canalMQProducer; private MQProperties properties; private CanalServerWithEmbedded canalServer; private Map<String, CanalMQRunnable> canalMQWorks = new ConcurrentHashMap<>(); private static Thread shutdownThread = null; public CanalMQStarter(CanalMQProducer canalMQProducer){ this.canalMQProducer = canalMQProducer; } public synchronized void start(MQProperties properties, String destinations) { try { if (running) { return; } this.properties = properties; canalMQProducer.init(properties); // set filterTransactionEntry if (properties.isFilterTransactionEntry()) { System.setProperty("canal.instance.filter.transaction.entry", "true"); } canalServer = CanalServerWithEmbedded.instance(); // 對應每一個instance啓動一個worker線程 executorService = Executors.newCachedThreadPool(); logger.info("## start the MQ workers."); String[] dsts = StringUtils.split(destinations, ","); for (String destination : dsts) { destination = destination.trim(); CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination); canalMQWorks.put(destination, canalMQRunnable); executorService.execute(canalMQRunnable); } running = true; logger.info("## the MQ workers is running now ......"); shutdownThread = new Thread() { public void run() { try { logger.info("## stop the MQ workers"); running = false; executorService.shutdown(); canalMQProducer.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping MQ workers:", e); } finally { logger.info("## canal MQ is down."); } } }; Runtime.getRuntime().addShutdownHook(shutdownThread); } catch (Throwable e) { logger.error("## Something goes wrong when starting up the canal MQ workers:", e); } } public synchronized void destroy() { running = false; if (executorService != null) { executorService.shutdown(); } if (canalMQProducer 
!= null) { canalMQProducer.stop(); } if (shutdownThread != null) { Runtime.getRuntime().removeShutdownHook(shutdownThread); shutdownThread = null; } } //...... }
/**
 * Task that runs the enclosing starter's {@code worker} loop for a single
 * destination and can be switched off through {@link #stop()}.
 */
private class CanalMQRunnable implements Runnable {

    /** Per-destination switch handed to the worker; starts out {@code true}. */
    private AtomicBoolean running = new AtomicBoolean(true);

    /** Name of the destination this task serves. */
    private String destination;

    /**
     * @param destinationName destination passed on to the worker loop
     */
    CanalMQRunnable(String destinationName) {
        destination = destinationName;
    }

    /** Sets the per-destination switch to {@code false}. */
    public void stop() {
        running.set(false);
    }

    /** Delegates to the worker loop with this task's destination and switch. */
    @Override
    public void run() {
        worker(destination, running);
    }
}
public class CanalMQStarter { //...... private void worker(String destination, AtomicBoolean destinationRunning) { while (!running || !destinationRunning.get()) { try { Thread.sleep(100); } catch (InterruptedException e) { // ignore } } logger.info("## start the MQ producer: {}.", destination); MDC.put("destination", destination); final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, ""); while (running && destinationRunning.get()) { try { CanalInstance canalInstance = canalServer.getCanalInstances().get(destination); if (canalInstance == null) { try { Thread.sleep(3000); } catch (InterruptedException e) { // ignore } continue; } MQProperties.CanalDestination canalDestination = new MQProperties.CanalDestination(); canalDestination.setCanalDestination(destination); CanalMQConfig mqConfig = canalInstance.getMqConfig(); canalDestination.setTopic(mqConfig.getTopic()); canalDestination.setPartition(mqConfig.getPartition()); canalDestination.setDynamicTopic(mqConfig.getDynamicTopic()); canalDestination.setPartitionsNum(mqConfig.getPartitionsNum()); canalDestination.setPartitionHash(mqConfig.getPartitionHash()); canalServer.subscribe(clientIdentity); logger.info("## the MQ producer: {} is running now ......", destination); Long getTimeout = properties.getCanalGetTimeout(); int getBatchSize = properties.getCanalBatchSize(); while (running && destinationRunning.get()) { Message message; if (getTimeout != null && getTimeout > 0) { message = canalServer.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS); } else { message = canalServer.getWithoutAck(clientIdentity, getBatchSize); } final long batchId = message.getId(); try { int size = message.isRaw() ? 
message.getRawEntries().size() : message.getEntries().size(); if (batchId != -1 && size != 0) { canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() { @Override public void commit() { canalServer.ack(clientIdentity, batchId); // 提交確認 } @Override public void rollback() { canalServer.rollback(clientIdentity, batchId); } }); // 發送message到topic } else { try { Thread.sleep(100); } catch (InterruptedException e) { // ignore } } } catch (Exception e) { logger.error(e.getMessage(), e); } } } catch (Exception e) { logger.error("process error!", e); } } } //...... }