聊聊canal的BinLogFileQueue

本文主要研究一下canal的BinLogFileQueue

BinLogFileQueue

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

public class BinLogFileQueue {

    // File-name prefix a file must start with to be treated as a binlog.
    private String              baseName       = "mysql-bin.";
    // Binlog files known so far, in the order they were appended by offer().
    private List<File>          binlogs        = new ArrayList<File>();
    // Directory that is scanned for binlog files.
    private File                directory;
    // Lock taken by offer(), destory() and the lookup methods around their access to binlogs.
    private ReentrantLock       lock           = new ReentrantLock();
    // Condition signalled by offer() and destory() to wake threads waiting for a new file.
    private Condition           nextCondition  = lock.newCondition();
    // Daemon timer (isDaemon = true) that runs the periodic directory rescan.
    private Timer               timer          = new Timer(true);
    private long                reloadInterval = 10 * 1000L;           // 10 seconds
    // Failure recorded by the periodic rescan; rethrown by the lookup methods once set.
    private CanalParseException exception      = null;

    /**
     * Creates a queue over the binlog files found in the given directory path.
     *
     * @param directory path of the directory to scan; it is wrapped in a {@link File} and handed
     *                  to the {@code File}-based constructor
     */
    public BinLogFileQueue(String directory){
        this(new File(directory));
    }

    /**
     * Creates a queue over the binlog files found in {@code directory}.
     * <p>
     * The directory is scanned once immediately and every file found is offered to the queue.
     * A timer task then repeats the scan every {@code reloadInterval} milliseconds, starting one
     * interval after construction.
     *
     * @param directory directory that holds the binlog files
     * @throws CanalParseException if the directory cannot be read
     */
    public BinLogFileQueue(File directory){
        this.directory = directory;

        if (!directory.canRead()) {
            throw new CanalParseException("Binlog index missing or unreadable;  " + directory.getAbsolutePath());
        }

        // Initial load of whatever is already on disk.
        for (File binlog : listBinlogFiles()) {
            offer(binlog);
        }

        TimerTask reloadTask = new TimerTask() {

            public void run() {
                // An earlier, disabled variant of this task also read an error file from the
                // directory and surfaced its text as the recorded exception.
                try {
                    for (File binlog : listBinlogFiles()) {
                        offer(binlog);
                    }
                } catch (Throwable e) {
                    // Remember the failure so the lookup methods can rethrow it.
                    exception = new CanalParseException(e);
                }

                if (exception == null) {
                    return;
                }
                // offer(null) adds nothing; it only signals waiters so they can see the failure.
                offer(null);
            }
        };
        timer.scheduleAtFixedRate(reloadTask, reloadInterval, reloadInterval);
    }

    /**
     * Lists the binlog files in {@code directory}, sorted by file name.
     * <p>
     * A file qualifies when its name starts with {@code baseName} and ends with one or more
     * digits. The directory filter passed to {@code FileUtils.listFiles} is {@code null}, which
     * per the commons-io documentation means sub-directories are not searched.
     *
     * @return a new, name-sorted list of matching files
     */
    private List<File> listBinlogFiles() {
        // Compiled once per scan rather than once per candidate file.
        final Pattern numericSuffix = Pattern.compile("\\d+$");
        List<File> files = new ArrayList<File>(FileUtils.listFiles(directory, new IOFileFilter() {

            public boolean accept(File file) {
                String name = file.getName();
                return name.startsWith(baseName) && numericSuffix.matcher(name).find();
            }

            public boolean accept(File dir, String name) {
                return true;
            }
        }, null));
        // Sort by name using plain string comparison.
        // NOTE(review): this orders files correctly only while the numeric suffixes all have
        // the same width - confirm against the server's binlog naming.
        Collections.sort(files, new Comparator<File>() {

            public int compare(File o1, File o2) {
                return o1.getName().compareTo(o2.getName());
            }

        });
        return files;
    }

    /**
     * Appends {@code file} to the known binlogs if it is not already present and wakes all
     * threads waiting on {@code nextCondition}.
     * <p>
     * Waiters are signalled on every call, including when nothing was added; calling with
     * {@code null} is therefore a pure wake-up.
     *
     * @param file the binlog file to add, or {@code null} to only signal waiters
     * @return {@code true} if the file was added; {@code false} if it was {@code null}, already
     *         known, or the calling thread was interrupted while acquiring the lock
     */
    private boolean offer(File file) {
        // Acquire outside the guarded try: if acquisition is interrupted the lock is not held,
        // and unlocking it in a finally block would throw IllegalMonitorStateException.
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        try {
            boolean added = file != null && !binlogs.contains(file);
            if (added) {
                binlogs.add(file);
            }
            nextCondition.signalAll(); // wake up waiters
            return added;
        } finally {
            lock.unlock();
        }
    }

    //......

    /**
     * Returns a snapshot of all binlog files currently known to the queue.
     * <p>
     * The copy is taken while holding {@code lock}, the same lock the other accessors of
     * {@code binlogs} use, so it cannot observe the list in the middle of an update by the
     * reload timer.
     *
     * @return a new list; changing it does not affect the queue
     */
    public List<File> currentBinlogs() {
        // Plain lock(): this method does not declare InterruptedException.
        lock.lock();
        try {
            return new ArrayList<File>(binlogs);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Shuts the queue down: cancels the reload timer, clears the known binlogs and wakes all
     * waiting threads so they can notice the shutdown.
     * <p>
     * If the calling thread is interrupted while acquiring the lock, the timer is still
     * cancelled, the interrupt flag is restored, and the list is left untouched.
     */
    public void destory() {
        // Timer.cancel() needs no lock; do it first so the timer is stopped even if the lock
        // acquisition below is interrupted.
        timer.cancel();

        // Acquire outside the guarded try so an interrupted acquisition never reaches unlock().
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        try {
            binlogs.clear();
            nextCondition.signalAll(); // wake up waiting threads so they can exit
        } finally {
            lock.unlock();
        }
    }

    //......
}
  • BinLogFileQueue的構造器通過listBinlogFiles加載directory目錄下以baseName開頭且以數字結尾的文件並按文件名排序,然後逐個執行offer方法,最後使用timer定時調度執行listBinlogFiles及offer方法;其currentBinlogs返回binlogs文件列表的副本;其destory方法取消timer,然後清空binlogs並喚醒等待的線程

waitForNextFile

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java

public class BinLogFileQueue {

    //......

    /**
     * Returns the binlog file that follows {@code pre}, blocking until one is available.
     * <p>
     * The wait runs in a loop instead of recursing, so repeated wake-ups (every {@code offer}
     * call signals, whether or not a file was added) do not grow the call stack. The condition
     * is re-checked after every wake-up, so a wake-up with a still-empty list goes back to
     * waiting instead of indexing into the empty list.
     *
     * @param pre the file consumed last, or {@code null} to request the first known file; a file
     *            that is not in the list is treated like "before the first file"
     * @return the next binlog file; never {@code null}
     * @throws InterruptedException if the thread is interrupted while acquiring the lock or
     *                              while waiting
     * @throws CanalParseException  if the periodic reload recorded a failure
     */
    public File waitForNextFile(File pre) throws InterruptedException {
        // Acquire outside the guarded try: if this throws, the lock is not held and must not be
        // unlocked.
        lock.lockInterruptibly();
        try {
            while (true) {
                if (exception != null) {
                    throw exception;
                }

                if (!binlogs.isEmpty()) {
                    if (pre == null) { // first call
                        return binlogs.get(0);
                    }

                    // seek() yields -1 for an unknown file, which selects index 0 below.
                    int index = seek(pre);
                    if (index < binlogs.size() - 1) {
                        return binlogs.get(index + 1);
                    }
                }

                nextCondition.await(); // wait for a new file, then re-check everything
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Finds the position of {@code file} in {@code binlogs}, comparing by file name only.
     *
     * @param file the file to look for
     * @return index of the first entry with the same name, or {@code -1} if there is none
     */
    private int seek(File file) {
        int position = 0;
        while (position < binlogs.size()) {
            String candidateName = binlogs.get(position).getName();
            if (candidateName.equals(file.getName())) {
                return position;
            }
            position++;
        }

        return -1;
    }

    /**
     * Returns the binlog file that follows {@code pre} without waiting.
     *
     * @param pre the file consumed last, or {@code null} to request the first known file; a file
     *            that is not in the list is treated like "before the first file"
     * @return the next binlog file, or {@code null} if no file is known, {@code pre} is the last
     *         known file, or the thread was interrupted while acquiring the lock (the interrupt
     *         flag is restored in that case)
     * @throws CanalParseException if the periodic reload recorded a failure
     */
    public File getNextFile(File pre) {
        // Acquire outside the guarded try so an interrupted acquisition never reaches unlock(),
        // which would throw IllegalMonitorStateException instead of returning null.
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }

        try {
            if (exception != null) {
                throw exception;
            }

            if (binlogs.size() == 0) {
                return null;
            }
            if (pre == null) { // first call
                return binlogs.get(0);
            }

            int index = seek(pre);
            if (index < binlogs.size() - 1) {
                return binlogs.get(index + 1);
            }
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the binlog file that precedes {@code file} without waiting.
     *
     * @param file the reference file, or {@code null} to request the last known file
     * @return the preceding binlog file, or {@code null} if no file is known, {@code file} is
     *         the first known file or is not in the list, or the thread was interrupted while
     *         acquiring the lock (the interrupt flag is restored in that case)
     * @throws CanalParseException if the periodic reload recorded a failure
     */
    public File getBefore(File file) {
        // Acquire outside the guarded try so an interrupted acquisition never reaches unlock(),
        // which would throw IllegalMonitorStateException instead of returning null.
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }

        try {
            if (exception != null) {
                throw exception;
            }

            if (binlogs.size() == 0) {
                return null;
            }
            if (file == null) { // first call: start from the newest file
                return binlogs.get(binlogs.size() - 1);
            }

            int index = seek(file);
            if (index > 0) {
                return binlogs.get(index - 1);
            }
            return null;
        } finally {
            lock.unlock();
        }
    }

    //......

}
  • BinLogFileQueue還提供了waitForNextFile方法,它根據指定文件的index找下一個binlog文件,找不到則通過nextCondition.await()等待新文件;它還提供了getNextFile方法,該方法根據指定文件找下一個binlog文件,找不到則返回null,不等待;它還提供了getBefore方法,該方法根據指定文件找上一個binlog文件,找不到則返回null

LocalBinLogConnection

canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java

public class LocalBinLogConnection implements ErosaConnection {

    private static final Logger logger     = LoggerFactory.getLogger(LocalBinLogConnection.class);
    // Queue of local binlog files; created lazily by connect().
    private BinLogFileQueue     binlogs    = null;
    // When true, dump() blocks for the next file (waitForNextFile); otherwise it uses getNextFile.
    private boolean             needWait;
    // Directory containing the binlog files.
    private String              directory;
    // Size handed to the FileLogFetcher constructor in dump() - presumably bytes; confirm.
    private int                 bufferSize = 16 * 1024;
    // Loop flag checked by dump(); set to true by connect().
    private boolean             running    = false;
    // When non-zero, dump() rejects events whose server id differs from this value.
    private long                serverId;
    // NOTE(review): not used in the visible code; presumably notified from parserFinish - confirm.
    private FileParserListener  parserListener;

    /**
     * Creates a connection with no directory configured and {@code needWait} left at
     * {@code false}.
     */
    public LocalBinLogConnection(){
    }

    /**
     * Creates a connection that reads binlog files from {@code directory}.
     *
     * @param directory path of the directory that holds the binlog files
     * @param needWait  {@code true} to make {@code dump} block for the next file instead of
     *                  stopping after the last known file
     */
    public LocalBinLogConnection(String directory, boolean needWait){
        this.directory = directory;
        this.needWait = needWait;
    }

    /**
     * Marks the connection as running, creating the binlog file queue on first use.
     *
     * @throws IOException declared by the interface; the visible body throws no checked
     *                     exception itself
     */
    @Override
    public void connect() throws IOException {
        // Created only once: later connect() calls reuse the existing queue.
        if (binlogs == null) {
            binlogs = new BinLogFileQueue(directory);
        }
        running = true;
    }

    /**
     * Reads events from local binlog files, starting at {@code binlogfilename} /
     * {@code binlogPosition}, and feeds them to {@code func}.
     * <p>
     * When a file is exhausted the next one is taken from the file queue: blocking via
     * {@code waitForNextFile} when {@code needWait} is set, otherwise via {@code getNextFile}.
     * The dump ends when {@code running} becomes false, when {@code func.sink} returns false,
     * when no next file is available, or when the thread is interrupted while waiting.
     *
     * @param binlogfilename name of the first binlog file, resolved against {@code directory}
     * @param binlogPosition position in the first file at which to start reading
     * @param func           consumer of decoded events; returning {@code false} stops the dump
     * @throws IOException               propagated from the fetcher or decoder
     * @throws ServerIdNotMatchException if {@code serverId} is non-zero and an event carries a
     *                                   different server id
     */
    public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
        File current = new File(directory, binlogfilename);

        FileLogFetcher fetcher = new FileLogFetcher(bufferSize);
        LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
        LogContext context = new LogContext();
        try {
            fetcher.open(current, binlogPosition);
            context.setLogPosition(new LogPosition(binlogfilename, binlogPosition));
            while (running) {
                boolean needContinue = true;
                while (fetcher.fetch()) {
                    LogEvent event = decoder.decode(fetcher, context);
                    if (event == null) {
                        continue;
                    }
                    if (serverId != 0 && event.getServerId() != serverId) {
                        // Report the id actually found in the file alongside the expected one.
                        throw new ServerIdNotMatchException("unexpected serverId " + event.getServerId()
                                                            + " in binlog file, expected " + serverId + " !");
                    }

                    if (!func.sink(event)) {
                        needContinue = false;
                        break;
                    }
                }

                fetcher.close(); // close the file that was just read
                parserFinish(current.getName());
                if (!needContinue) {
                    break; // the sink asked to stop
                }

                // Move on to the next file.
                File nextFile;
                if (needWait) {
                    nextFile = binlogs.waitForNextFile(current);
                } else {
                    nextFile = binlogs.getNextFile(current);
                }

                if (nextFile == null) {
                    break;
                }

                current = nextFile;
                fetcher.open(current);
                context.setLogPosition(new LogPosition(nextFile.getName()));
            }
        } catch (InterruptedException e) {
            logger.warn("LocalBinLogConnection dump interrupted");
        } finally {
            // fetcher is assigned once before the try block and is never null here.
            fetcher.close();
        }
    }

    //......

}
  • LocalBinLogConnection的connect方法創建BinLogFileQueue;其dump方法創建FileLogFetcher,然後使用while循環執行fetcher.fetch(),通過LogDecoder解析數據,再通過SinkFunction的sink方法消費事件;fetch完之後執行fetcher.close(),之後根據needWait通過binlogs.waitForNextFile(current)或binlogs.getNextFile(current)獲取下一個binlog文件,替換current,然後執行fetcher.open(current)及context.setLogPosition(new LogPosition(nextFile.getName())),繼續while循環fetcher.fetch()消費事件

小結

  • BinLogFileQueue的構造器通過listBinlogFiles加載directory目錄下以baseName開頭且以數字結尾的文件並按文件名排序,然後逐個執行offer方法,最後使用timer定時調度執行listBinlogFiles及offer方法;其currentBinlogs返回binlogs文件列表的副本;其destory方法取消timer,然後清空binlogs並喚醒等待的線程

doc

相關文章
相關標籤/搜索