本文主要研究一下canal的BinLogFileQueue。
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java
public class BinLogFileQueue { private String baseName = "mysql-bin."; private List<File> binlogs = new ArrayList<File>(); private File directory; private ReentrantLock lock = new ReentrantLock(); private Condition nextCondition = lock.newCondition(); private Timer timer = new Timer(true); private long reloadInterval = 10 * 1000L; // 10秒 private CanalParseException exception = null; public BinLogFileQueue(String directory){ this(new File(directory)); } public BinLogFileQueue(File directory){ this.directory = directory; if (!directory.canRead()) { throw new CanalParseException("Binlog index missing or unreadable; " + directory.getAbsolutePath()); } List<File> files = listBinlogFiles(); for (File file : files) { offer(file); } timer.scheduleAtFixedRate(new TimerTask() { public void run() { try { // File errorFile = new File(BinLogFileQueue.this.directory, // errorFileName); // if (errorFile.isFile() && errorFile.exists()) { // String text = StringUtils.join(IOUtils.readLines(new // FileInputStream(errorFile)), "\n"); // exception = new CanalParseException(text); // } List<File> files = listBinlogFiles(); for (File file : files) { offer(file); } } catch (Throwable e) { exception = new CanalParseException(e); } if (exception != null) { offer(null); } } }, reloadInterval, reloadInterval); } private List<File> listBinlogFiles() { List<File> files = new ArrayList<File>(); files.addAll(FileUtils.listFiles(directory, new IOFileFilter() { public boolean accept(File file) { Pattern pattern = Pattern.compile("\\d+$"); Matcher matcher = pattern.matcher(file.getName()); return file.getName().startsWith(baseName) && matcher.find(); } public boolean accept(File dir, String name) { return true; } }, null)); // 排一下序列 Collections.sort(files, new Comparator<File>() { public int compare(File o1, File o2) { return o1.getName().compareTo(o2.getName()); } }); return files; } private boolean offer(File file) { try { lock.lockInterruptibly(); if (file != null) { if (!binlogs.contains(file)) 
{ binlogs.add(file); nextCondition.signalAll();// 喚醒 return true; } } nextCondition.signalAll();// 喚醒 return false; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } finally { lock.unlock(); } } //...... /** * 獲取當前全部binlog文件 */ public List<File> currentBinlogs() { return new ArrayList<File>(binlogs); } public void destory() { try { lock.lockInterruptibly(); timer.cancel(); binlogs.clear(); nextCondition.signalAll();// 喚醒線程,通知退出 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { lock.unlock(); } } //...... }
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/local/BinLogFileQueue.java
public class BinLogFileQueue { //...... public File waitForNextFile(File pre) throws InterruptedException { try { lock.lockInterruptibly(); if (binlogs.size() == 0) { nextCondition.await();// 等待新文件 } if (exception != null) { throw exception; } if (pre == null) {// 第一次 return binlogs.get(0); } else { int index = seek(pre); if (index < binlogs.size() - 1) { return binlogs.get(index + 1); } else { nextCondition.await();// 等待新文件 return waitForNextFile(pre);// 喚醒以後遞歸調用一下 } } } finally { lock.unlock(); } } private int seek(File file) { for (int i = 0; i < binlogs.size(); i++) { File binlog = binlogs.get(i); if (binlog.getName().equals(file.getName())) { return i; } } return -1; } public File getNextFile(File pre) { try { lock.lockInterruptibly(); if (exception != null) { throw exception; } if (binlogs.size() == 0) { return null; } else { if (pre == null) {// 第一次 return binlogs.get(0); } else { int index = seek(pre); if (index < binlogs.size() - 1) { return binlogs.get(index + 1); } else { return null; } } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { lock.unlock(); } } public File getBefore(File file) { try { lock.lockInterruptibly(); if (exception != null) { throw exception; } if (binlogs.size() == 0) { return null; } else { if (file == null) {// 第一次 return binlogs.get(binlogs.size() - 1); } else { int index = seek(file); if (index > 0) { return binlogs.get(index - 1); } else { return null; } } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { lock.unlock(); } } //...... }
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/LocalBinLogConnection.java
public class LocalBinLogConnection implements ErosaConnection { private static final Logger logger = LoggerFactory.getLogger(LocalBinLogConnection.class); private BinLogFileQueue binlogs = null; private boolean needWait; private String directory; private int bufferSize = 16 * 1024; private boolean running = false; private long serverId; private FileParserListener parserListener; public LocalBinLogConnection(){ } public LocalBinLogConnection(String directory, boolean needWait){ this.needWait = needWait; this.directory = directory; } @Override public void connect() throws IOException { if (this.binlogs == null) { this.binlogs = new BinLogFileQueue(this.directory); } this.running = true; } public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException { File current = new File(directory, binlogfilename); FileLogFetcher fetcher = new FileLogFetcher(bufferSize); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); try { fetcher.open(current, binlogPosition); context.setLogPosition(new LogPosition(binlogfilename, binlogPosition)); while (running) { boolean needContinue = true; LogEvent event = null; while (fetcher.fetch()) { event = decoder.decode(fetcher, context); if (event == null) { continue; } if (serverId != 0 && event.getServerId() != serverId) { throw new ServerIdNotMatchException("unexpected serverId " + serverId + " in binlog file !"); } if (!func.sink(event)) { needContinue = false; break; } } fetcher.close(); // 關閉上一個文件 parserFinish(current.getName()); if (needContinue) {// 讀取下一個 File nextFile; if (needWait) { nextFile = binlogs.waitForNextFile(current); } else { nextFile = binlogs.getNextFile(current); } if (nextFile == null) { break; } current = nextFile; fetcher.open(current); context.setLogPosition(new LogPosition(nextFile.getName())); } else { break;// 跳出 } } } catch (InterruptedException e) { logger.warn("LocalBinLogConnection dump interrupted"); 
} finally { if (fetcher != null) { fetcher.close(); } } } //...... }