修改Flume源碼使taildir source支持遞歸(可配置)

Flume的source選哪一個?
taildir source首選!
1.斷點還原 positionFile能夠記錄偏移量
2.可配置文件組,裏面使用正則表達式配置多個要監控的文件
就憑第一點其餘的source都被比下去了!
這麼好的taildir source有一點不完美,不能支持遞歸監控文件夾。
因此就只能修改源代碼了……好玩,我喜歡~node

改源碼,先讀源碼

Flume的taildir source啓動會調用start()方法做初始化,裏面建立一個ReliableTaildirEventReader,這裏用到了建造者模式git

@Override
public synchronized void start() {
    logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
    try {
        reader = new ReliableTaildirEventReader.Builder()
                .filePaths(filePaths)
                .headerTable(headerTable)
                .positionFilePath(positionFilePath)
                .skipToEnd(skipToEnd)
                .addByteOffset(byteOffsetHeader)
                .cachePatternMatching(cachePatternMatching)
                .recursive(isRecursive)
                .annotateFileName(fileHeader)
                .fileNameHeader(fileHeaderKey)
                .build();
    } catch (IOException e) {
        throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
    }
    idleFileChecker = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
    idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(),
            idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);

    positionWriter = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
    positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
            writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);

    super.start();
    logger.debug("TaildirSource started");
    sourceCounter.start();
}

taildir source屬於PollableSourcegithub

/**
 * A {@link Source} that requires an external driver to poll to determine
 * whether there are {@linkplain Event events} that are available to ingest
 * from the source.
 *
 * @see org.apache.flume.source.EventDrivenSourceRunner
 */
public interface PollableSource extends Source {
    ...

這段註釋的意思是PollableSource是須要一個外部驅動去查看有沒有須要消費的事件,從而拉取事件,講白了就是定時拉取。因此flume也不必定是真正實時的,只是隔一下子不停地來查看事件而已。(與之相應的是另外一種EventDrivenSourceRunner)
那麼taildir source在定時拉取事件的時候是調用的process方法正則表達式

@Override
public Status process() {
    Status status = Status.READY;
    try {
        existingInodes.clear();
        existingInodes.addAll(reader.updateTailFiles());
        for (long inode : existingInodes) {
            TailFile tf = reader.getTailFiles().get(inode);
            if (tf.needTail()) {
                tailFileProcess(tf, true);
            }
        }
        closeTailFiles();
        try {
            TimeUnit.MILLISECONDS.sleep(retryInterval);
        } catch (InterruptedException e) {
            logger.info("Interrupted while sleeping");
        }
    } catch (Throwable t) {
        logger.error("Unable to tail files", t);
        status = Status.BACKOFF;
    }
    return status;
}

重點就是下面這幾行apache

existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
tailFileProcess(tf, true);
} }
reader.updateTailFiles()獲取須要監控的文件,而後對每個進行處理,查看最後修改時間,斷定是否須要 tail,須要 tailtail
那麼進入 reader.updateTailFiles()
for (TaildirMatcher taildir : taildirCache) {
      Map<String, String> headers = headerTable.row(taildir.getFileGroup());

      for (File f : taildir.getMatchingFiles()) {
        long inode = getInode(f);
        TailFile tf = tailFiles.get(inode);
        if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
          long startPos = skipToEnd ? f.length() : 0;
          tf = openFile(f, headers, inode, startPos);

遍歷每個正則表達式匹配對應的匹配器,每一個匹配器去獲取匹配的文件!taildir.getMatchingFiles()maven

List<File> getMatchingFiles() {
    long now = TimeUnit.SECONDS.toMillis(
            TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
    long currentParentDirMTime = parentDir.lastModified();
    List<File> result;

    // calculate matched files if
    // - we don't want to use cache (recalculate every time) OR
    // - directory was clearly updated after the last check OR
    // - last mtime change wasn't already checked for sure
    //   (system clock hasn't passed that second yet)
    if (!cachePatternMatching ||
            lastSeenParentDirMTime < currentParentDirMTime ||
            !(currentParentDirMTime < lastCheckedTime)) {
        lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache(isRecursive));
        lastSeenParentDirMTime = currentParentDirMTime;
        lastCheckedTime = now;
    }

    return lastMatchedFiles;
}

能夠看到getMatchingFilesNoCache(isRecursive)就是獲取匹配的文件的方法,也就是須要修改的方法了!
ps:這裏的isRecursive是我加的~
點進去:ide

private List<File> getMatchingFilesNoCache() {
    List<File> result = Lists.newArrayList();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
        for (Path entry : stream) {
            result.add(entry.toFile());
        }
    } catch (IOException e) {
        logger.error("I/O exception occurred while listing parent directory. " +
                "Files already matched will be returned. " + parentDir.toPath(), e);
    }
    return result;
}

源碼是用了Files.newDirectoryStream(parentDir.toPath(), fileFilter)),將父目錄下符合正則表達式的文件都添加到一個迭代器裏。(這裏還用了try (...)的語法糖)測試


找到地方了,開始改!

我在這個getMatchingFilesNoCache()方法下面下了一個重載的方法, 可增長擴展性:ui

private List<File> getMatchingFilesNoCache(boolean recursion) {
    if (!recursion) {
        return getMatchingFilesNoCache();
    }
    List<File> result = Lists.newArrayList();
    // 使用非遞歸的方式遍歷文件夾
    Queue<File> dirs = new ArrayBlockingQueue<>(10);
    dirs.offer(parentDir);
    while (dirs.size() > 0) {
        File dir = dirs.poll();
        try {
            DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath(), fileFilter);
            stream.forEach(path -> result.add(path.toFile()));
        } catch (IOException e) {
            logger.error("I/O exception occurred while listing parent directory. " +
                    "Files already matched will be returned. (recursion)" + parentDir.toPath(), e);
        }
        File[] dirList = dir.listFiles();
        assert dirList != null;
        for (File f : dirList) {
            if (f.isDirectory()) {
                dirs.add(f);
            }
        }
    }
    return result;
}

我使用了非遞歸的方式遍歷文件夾,就是樹到隊列的轉換。
到這裏,核心部分就改完了。接下來要處理這個recursion的參數debug


華麗的分割線後,順騰摸瓜!

一路改構造方法,添加這個參數,最終參數從哪來呢?
flume的source啓動時會調用configure方法,將Context中的內容配置進reader等對象中。
isRecursive = context.getBoolean(RECURSIVE, DEFAULT_RECURSIVE);
contextTaildirSourceConfigurationConstants中獲取配置名和默認值

/**
   * Whether to support recursion. */
  public static final String RECURSIVE = "recursive";
  public static final boolean DEFAULT_RECURSIVE = false;

這裏的recursive也就是flume配置文件裏配置項了

# Whether to support recusion
a1.sources.r1.recursive = true

大功告成,打包試試!

用maven只對這一個module打包。我把這個module的pom改了下artifactId,加上了本身名字做個記念,哈哈
惋惜pom裏面不能寫中文……

<groupId>org.apache.flume.flume-ng-sources</groupId>
<artifactId>flume-taildir-source-recursive-by-Wish000</artifactId>
<name>Flume Taildir Source</name>

執行package將其放在flume的lib下,替換原來的flume-taildir-source***.jar
啓動,測試,成功!

具體代碼見GitHub地址:https://github.com/Wish000/me...

相關文章
相關標籤/搜索