聊聊canal的ApplicationConfigMonitor

本文主要研究一下canal的ApplicationConfigMonitorjava

ApplicationConfigMonitor

canal-1.1.4/client-adapter/launcher/src/main/java/com/alibaba/otter/canal/adapter/launcher/monitor/ApplicationConfigMonitor.javac++

@Component
public class ApplicationConfigMonitor {

    private static final Logger   logger = LoggerFactory.getLogger(ApplicationConfigMonitor.class);

    @Resource
    private ContextRefresher      contextRefresher;

    @Resource
    private CanalAdapterService   canalAdapterService;

    private FileAlterationMonitor fileMonitor;

    @PostConstruct
    public void init() {
        File confDir = Util.getConfDirPath();
        try {
            FileAlterationObserver observer = new FileAlterationObserver(confDir,
                FileFilterUtils.and(FileFilterUtils.fileFileFilter(),
                    FileFilterUtils.prefixFileFilter("application"),
                    FileFilterUtils.suffixFileFilter("yml")));
            FileListener listener = new FileListener();
            observer.addListener(listener);
            fileMonitor = new FileAlterationMonitor(3000, observer);
            fileMonitor.start();

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    @PreDestroy
    public void destroy() {
        try {
            fileMonitor.stop();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    private class FileListener extends FileAlterationListenerAdaptor {

        @Override
        public void onFileChange(File file) {
            super.onFileChange(file);
            try {
                // 檢查yml格式
                new Yaml().loadAs(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8), Map.class);

                canalAdapterService.destroy();

                // refresh context
                contextRefresher.refresh();

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // ignore
                }
                canalAdapterService.init();
                logger.info("## adapter application config reloaded.");
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
  • ApplicationConfigMonitor在PostConstruct的時候建立FileAlterationObserver,添加FileListener,而後使用FileAlterationObserver建立FileAlterationMonitor,而後執行其start方法;在PreDestroy的時候執行fileMonitor.stop();FileListener繼承了FileAlterationListenerAdaptor,其onFileChange方法會檢查yml格式,執行canalAdapterService.destroy()、contextRefresher.refresh()、canalAdapterService.init()

FileAlterationMonitor

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationMonitor.javagit

public final class FileAlterationMonitor implements Runnable {

    private final long interval;
    private final List<FileAlterationObserver> observers = new CopyOnWriteArrayList<FileAlterationObserver>();
    private Thread thread = null;
    private ThreadFactory threadFactory;
    private volatile boolean running = false;

    public FileAlterationMonitor() {
        this(10000);
    }

    public FileAlterationMonitor(long interval) {
        this.interval = interval;
    }

    public FileAlterationMonitor(long interval, FileAlterationObserver... observers) {
        this(interval);
        if (observers != null) {
            for (FileAlterationObserver observer : observers) {
                addObserver(observer);
            }
        }
    }

    //......

    public synchronized void start() throws Exception {
        if (running) {
            throw new IllegalStateException("Monitor is already running");
        }
        for (FileAlterationObserver observer : observers) {
            observer.initialize();
        }
        running = true;
        if (threadFactory != null) {
            thread = threadFactory.newThread(this);
        } else {
            thread = new Thread(this);
        }
        thread.start();
    }

    /**
     * Stop monitoring.
     *
     * @throws Exception if an error occurs initializing the observer
     */
    public synchronized void stop() throws Exception {
        stop(interval);
    }

    /**
     * Stop monitoring.
     *
     * @param stopInterval the amount of time in milliseconds to wait for the thread to finish.
     * A value of zero will wait until the thread is finished (see {@link Thread#join(long)}).
     * @throws Exception if an error occurs initializing the observer
     * @since 2.1
     */
    public synchronized void stop(long stopInterval) throws Exception {
        if (running == false) {
            throw new IllegalStateException("Monitor is not running");
        }
        running = false;
        try {
            thread.join(stopInterval);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (FileAlterationObserver observer : observers) {
            observer.destroy();
        }
    }

    public void run() {
        while (running) {
            for (FileAlterationObserver observer : observers) {
                observer.checkAndNotify();
            }
            if (!running) {
                break;
            }
            try {
                Thread.sleep(interval);
            } catch (final InterruptedException ignored) {
            }
        }
    }    
}
  • FileAlterationMonitor的start方法會使用本身的runnable建立Thread,而後執行thread.start();其stop方法則執行thread.join(stopInterval),而後遍歷observers,執行observer.destroy();其run方法則遍歷observers,執行observer.checkAndNotify()

FileAlterationObserver

commons-io-2.4-sources.jar!/org/apache/commons/io/monitor/FileAlterationObserver.javagithub

public class FileAlterationObserver implements Serializable {

    private final List<FileAlterationListener> listeners = new CopyOnWriteArrayList<FileAlterationListener>();
    private final FileEntry rootEntry;
    private final FileFilter fileFilter;
    private final Comparator<File> comparator;

    //......

    public void checkAndNotify() {

        /* fire onStart() */
        for (FileAlterationListener listener : listeners) {
            listener.onStart(this);
        }

        /* fire directory/file events */
        File rootFile = rootEntry.getFile();
        if (rootFile.exists()) {
            checkAndNotify(rootEntry, rootEntry.getChildren(), listFiles(rootFile));
        } else if (rootEntry.isExists()) {
            checkAndNotify(rootEntry, rootEntry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
        } else {
            // Didn't exist and still doesn't
        }

        /* fire onStop() */
        for (FileAlterationListener listener : listeners) {
            listener.onStop(this);
        }
    }

    private void checkAndNotify(FileEntry parent, FileEntry[] previous, File[] files) {
        int c = 0;
        FileEntry[] current = files.length > 0 ? new FileEntry[files.length] : FileEntry.EMPTY_ENTRIES;
        for (FileEntry entry : previous) {
            while (c < files.length && comparator.compare(entry.getFile(), files[c]) > 0) {
                current[c] = createFileEntry(parent, files[c]);
                doCreate(current[c]);
                c++;
            }
            if (c < files.length && comparator.compare(entry.getFile(), files[c]) == 0) {
                doMatch(entry, files[c]);
                checkAndNotify(entry, entry.getChildren(), listFiles(files[c]));
                current[c] = entry;
                c++;
            } else {
                checkAndNotify(entry, entry.getChildren(), FileUtils.EMPTY_FILE_ARRAY);
                doDelete(entry);
            }
        }
        for (; c < files.length; c++) {
            current[c] = createFileEntry(parent, files[c]);
            doCreate(current[c]);
        }
        parent.setChildren(current);
    }

    //......
}
  • FileAlterationObserver的checkAndNotify方法會遍歷以前的FileEntry,而後使用NameFileComparator遞歸遍歷對比文件變化,分別觸發doCreate、doMatch、doDelete,他們會回調FileAlterationListener的對應方法

小結

ApplicationConfigMonitor在PostConstruct的時候建立FileAlterationObserver,添加FileListener,而後使用FileAlterationObserver建立FileAlterationMonitor,而後執行其start方法;在PreDestroy的時候執行fileMonitor.stop();FileListener繼承了FileAlterationListenerAdaptor,其onFileChange方法會檢查yml格式,執行canalAdapterService.destroy()、contextRefresher.refresh()、canalAdapterService.init()apache

doc

相關文章
相關標籤/搜索