本文主要研究一下canal的ApplicationConfigMonitor
canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.java
@Component public class ApplicationConfigMonitor { private static final Logger logger = LoggerFactory.getLogger(ApplicationConfigMonitor.class); @Resource private ContextRefresher contextRefresher; @Resource private CanalAdapterService canalAdapterService; private FileAlterationMonitor fileMonitor; @PostConstruct public void init() { File confDir = Util.getConfDirPath(); try { FileAlterationObserver observer = new FileAlterationObserver(confDir, FileFilterUtils.and(FileFilterUtils.fileFileFilter(), FileFilterUtils.prefixFileFilter("application"), FileFilterUtils.suffixFileFilter("yml"))); FileListener listener = new FileListener(); observer.addListener(listener); fileMonitor = new FileAlterationMonitor(3000, observer); fileMonitor.start(); } catch (Exception e) { logger.error(e.getMessage(), e); } } @PreDestroy public void destroy() { try { fileMonitor.stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } } private class FileListener extends FileAlterationListenerAdaptor { @Override public void onFileChange(File file) { super.onFileChange(file); try { // 檢查yml格式 new Yaml().loadAs(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), Map.class); canalAdapterService.destroy(); // refresh context contextRefresher.refresh(); try { Thread.sleep(2000); } catch (InterruptedException e) { // ignore } canalAdapterService.init(); logger.info("## adapter application config reloaded."); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }
commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationMonitor.java
/**
 * A {@link Runnable} that repeatedly calls {@code checkAndNotify()} on a list of
 * {@link FileAlterationObserver}s, sleeping {@code interval} milliseconds between passes.
 *
 * <p>This is an excerpt: members hidden behind the {@code //......} marker (for example
 * {@code addObserver}) are not shown here.
 */
public final class FileAlterationMonitor implements Runnable {

    /** Sleep time between polling passes, in milliseconds; also the default join timeout used by {@link #stop()}. */
    private final long interval;
    /** Observers polled on every pass. */
    private final List<FileAlterationObserver> observers = new CopyOnWriteArrayList<FileAlterationObserver>();
    /** The polling thread created by {@link #start()}; {@code null} until then. */
    private Thread thread = null;
    /** Optional factory for the polling thread; when {@code null} a plain {@code new Thread(this)} is used. */
    private ThreadFactory threadFactory;
    /** Loop flag read by {@link #run()} and flipped by {@link #start()} / {@link #stop(long)}. */
    private volatile boolean running = false;

    /** Creates a monitor with an interval of 10000 milliseconds. */
    public FileAlterationMonitor() {
        this(10000);
    }

    /**
     * Creates a monitor with the given interval.
     *
     * @param interval sleep time between polling passes, in milliseconds
     */
    public FileAlterationMonitor(long interval) {
        this.interval = interval;
    }

    /**
     * Creates a monitor with the given interval and registers the given observers.
     *
     * @param interval sleep time between polling passes, in milliseconds
     * @param observers observers to register; {@code null} is accepted and registers none
     */
    public FileAlterationMonitor(long interval, FileAlterationObserver... observers) {
        this(interval);
        if (observers != null) {
            for (FileAlterationObserver observer : observers) {
                addObserver(observer);
            }
        }
    }

    //...... (further members elided in this excerpt)

    /**
     * Initializes every observer, marks the monitor as running, and starts the polling thread.
     *
     * @throws IllegalStateException if the monitor is already running
     * @throws Exception propagated from {@code observer.initialize()}
     */
    public synchronized void start() throws Exception {
        if (running) {
            throw new IllegalStateException("Monitor is already running");
        }
        for (FileAlterationObserver observer : observers) {
            observer.initialize();
        }
        // The flag is raised before the thread starts so run() sees it on its first check.
        running = true;
        if (threadFactory != null) {
            thread = threadFactory.newThread(this);
        } else {
            thread = new Thread(this);
        }
        thread.start();
    }

    /**
     * Stop monitoring, waiting up to {@code interval} milliseconds for the polling thread.
     *
     * @throws Exception if an error occurs while stopping; see {@link #stop(long)}
     */
    public synchronized void stop() throws Exception {
        stop(interval);
    }

    /**
     * Stop monitoring.
     *
     * @param stopInterval the amount of time in milliseconds to wait for the thread to finish.
     * A value of zero will wait until the thread is finished (see {@link Thread#join(long)}).
     * @throws IllegalStateException if the monitor is not running
     * @throws Exception propagated from {@code observer.destroy()}
     * @since 2.1
     */
    public synchronized void stop(long stopInterval) throws Exception {
        if (running == false) {
            throw new IllegalStateException("Monitor is not running");
        }
        running = false;
        try {
            // NOTE(review): nothing here interrupts the polling thread, so if it is inside
            // Thread.sleep(interval) this join can block for up to stopInterval before returning.
            thread.join(stopInterval);
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and continue with observer cleanup.
            Thread.currentThread().interrupt();
        }
        for (FileAlterationObserver observer : observers) {
            observer.destroy();
        }
    }

    /**
     * Polling loop: while {@code running} is set, asks every observer to check for changes,
     * then sleeps for {@code interval} milliseconds.
     */
    public void run() {
        while (running) {
            for (FileAlterationObserver observer : observers) {
                observer.checkAndNotify();
            }
            // Re-check after the pass so a stop() issued meanwhile skips the sleep.
            if (!running) {
                break;
            }
            try {
                Thread.sleep(interval);
            } catch (final InterruptedException ignored) {
                // An interrupt only cuts the sleep short; the loop condition decides whether to continue.
            }
        }
    }
}
commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationObserver.java
/**
 * Tracks the state of a directory tree as {@link FileEntry} objects and, on each
 * {@link #checkAndNotify()} call, compares that state with the files currently on disk.
 *
 * <p>This is an excerpt: members hidden behind the {@code //......} markers
 * ({@code createFileEntry}, {@code doCreate}, {@code doMatch}, {@code doDelete},
 * {@code listFiles}, constructors, ...) are not shown here.
 */
public class FileAlterationObserver implements Serializable {

    /** Listeners that receive {@code onStart}/{@code onStop} around every check. */
    private final List<FileAlterationListener> listeners = new CopyOnWriteArrayList<FileAlterationListener>();
    /** Entry for the observed root; its children hold the previously seen state. */
    private final FileEntry rootEntry;
    /** Filter for observed files — presumably applied inside the elided {@code listFiles}; not visible here. */
    private final FileFilter fileFilter;
    /** Ordering used to line up previously seen entries with the current file listing. */
    private final Comparator<File> comparator;

    //...... (further members elided in this excerpt)

    /**
     * Runs one check pass: calls {@code onStart} on every listener, compares the root
     * entry's remembered children with the current listing, then calls {@code onStop}
     * on every listener.
     */
    public void checkAndNotify() {

        /* fire onStart() */
        for (FileAlterationListener listener : listeners) {
            listener.onStart(this);
        }

        /* fire directory/file events */
        File rootFile = rootEntry.getFile();
        if (rootFile.exists()) {
            // Root is present: diff remembered children against what is on disk now.
            checkAndNotify(rootEntry, rootEntry.getChildren(), listFiles(rootFile));
        } else if (rootEntry.isExists()) {
            // Root was recorded as existing but is gone: diff against an empty listing.
            checkAndNotify(rootEntry, rootEntry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
        } else {
            // Didn't exist and still doesn't
        }

        /* fire onStop() */
        for (FileAlterationListener listener : listeners) {
            listener.onStop(this);
        }
    }

    /**
     * Compares the remembered entries of one directory with its current listing in a single
     * forward pass over both arrays, then stores the resulting entries on {@code parent}.
     *
     * <p>NOTE(review): the single-pass walk assumes {@code previous} and {@code files} are
     * both ordered by {@code comparator}; the sorting itself is presumably done in the
     * elided {@code listFiles} — confirm.
     *
     * @param parent the entry whose children are being compared and replaced
     * @param previous the entries remembered from the last pass
     * @param files the files currently present
     */
    private void checkAndNotify(FileEntry parent, FileEntry[] previous, File[] files) {
        // c is the cursor into files / current.
        int c = 0;
        FileEntry[] current = files.length > 0 ? new FileEntry[files.length] : FileEntry.EMPTY_ENTRIES;
        for (FileEntry entry : previous) {
            // Files that order before the remembered entry have no counterpart: treat as created.
            while (c < files.length && comparator.compare(entry.getFile(), files[c]) > 0) {
                current[c] = createFileEntry(parent, files[c]);
                doCreate(current[c]);
                c++;
            }
            if (c < files.length && comparator.compare(entry.getFile(), files[c]) == 0) {
                // Same file in both: hand the pair to doMatch, then recurse into its children.
                doMatch(entry, files[c]);
                checkAndNotify(entry, entry.getChildren(), listFiles(files[c]));
                current[c] = entry;
                c++;
            } else {
                // Remembered entry has no counterpart: process its children against an empty
                // listing first, then the entry itself.
                checkAndNotify(entry, entry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
                doDelete(entry);
            }
        }
        // Anything left in files after all remembered entries are consumed is treated as created.
        for (; c < files.length; c++) {
            current[c] = createFileEntry(parent, files[c]);
            doCreate(current[c]);
        }
        parent.setChildren(current);
    }

    //...... (further members elided in this excerpt)
}
ApplicationConfigMonitor在PostConstruct的時候建立FileAlterationObserver,添加FileListener,而後使用FileAlterationObserver建立FileAlterationMonitor,而後執行其start方法;在PreDestroy的時候執行fileMonitor.stop();FileListener繼承了FileAlterationListenerAdaptor,其onFileChange方法會檢查yml格式,執行canalAdapterService.destroy()、contextRefresher.refresh()、canalAdapterService.init()。