自定義可記錄偏移量的TailFileSource

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.xp.cn.TailFileSource
a1.sources.r1.filePath = /usr/local/devtools/cdhbigdata/cdhflume/xpdata/data.txt
a1.sources.r1.posiFile = /usr/local/devtools/cdhbigdata/cdhflume/xpdata/posi.txt
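# Poll interval in milliseconds (passed to Thread.sleep when no new line is available); 2 means 2 ms
a1.sources.r1.interval = 2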
a1.sources.r1.charset = UTF-8


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory=/usr/local/devtools/cdhbigdata/cdhflume/xpdata/file_roll



# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


# Start the agent with:
# bin/flume-ng agent -n a1 -f /usr/local/devtools/cdhbigdata/cdhflume/apache-flume-1.5.0-cdh5.3.6-bin/conf/flume-myflume.properties -Dflume.root.logger=INFO,console
package com.xp.cn;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.channel.ChannelProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Flume source that tails a single text file and records how far it has read, so that a
 * restarted agent resumes from the last delivered line instead of re-reading the whole file.
 *
 * <p>Each complete line (terminated by {@code \n}, {@code \r} or {@code \r\n}) becomes one
 * event. A line is acknowledged only after the channel has accepted its event. At that point
 * the byte offset just past the line is written to {@code posiFile}. Delivery is therefore
 * at-least-once: a line may be re-sent if the agent dies between those two steps.
 *
 * <p>Flume lifecycle: constructor -> {@link #configure} -> {@link #start} -> background
 * polling thread -> {@link #stop}.
 *
 * <p>Created by xupan on 2017/12/26.
 */
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    /** Poll interval in milliseconds used when {@code interval} is not configured. */
    private static final long DEFAULT_INTERVAL_MS = 1000L;

    private String filePath;
    private String charset;
    private String posiFile;
    private Long interval;

    private ExecutorService executor;
    FileRunnable fileRunnable;

    /**
     * Reads this source's properties.
     *
     * <ul>
     *   <li>{@code filePath} - the file to tail (required)</li>
     *   <li>{@code charset} - encoding of the tailed file, default {@code UTF-8}</li>
     *   <li>{@code posiFile} - file in which the read offset is persisted (required)</li>
     *   <li>{@code interval} - milliseconds to sleep when no new line is available,
     *       default {@value #DEFAULT_INTERVAL_MS}</li>
     * </ul>
     *
     * @param context Flume context holding this source's properties
     * @throws IllegalArgumentException if a required property is missing or a value is invalid
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        charset = context.getString("charset", "UTF-8");
        posiFile = context.getString("posiFile");
        interval = context.getLong("interval", DEFAULT_INTERVAL_MS);

        // Fail fast at configuration time instead of with an NPE when the source starts.
        if (StringUtils.isBlank(filePath)) {
            throw new IllegalArgumentException("filePath must be configured");
        }
        if (StringUtils.isBlank(posiFile)) {
            throw new IllegalArgumentException("posiFile must be configured");
        }
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive (milliseconds): " + interval);
        }
        if (!Charset.isSupported(charset)) {
            throw new IllegalArgumentException("Unsupported charset: " + charset);
        }
    }

    /**
     * Starts a single background thread that tails {@code filePath}.
     */
    @Override
    public synchronized void start() {
        executor = Executors.newSingleThreadExecutor();
        fileRunnable = new FileRunnable(filePath, posiFile, interval, charset, this.getChannelProcessor());
        executor.submit(fileRunnable);
        // Mirror super.stop() in stop() so the source reports its lifecycle state correctly.
        super.start();
    }

    /**
     * Signals the tailing thread to finish and waits for it to exit. If this thread is
     * interrupted while waiting, the worker is interrupted too and the wait is abandoned.
     */
    @Override
    public synchronized void stop() {
        if (fileRunnable != null) {
            fileRunnable.setFlag(Boolean.FALSE);
        }
        if (executor != null) {
            executor.shutdown();
            while (!executor.isTerminated()) {
                logger.debug("Waiting for exec executor service to stop");
                try {
                    executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
                    // Wake the worker out of its sleep, restore our interrupt status and stop
                    // waiting. Looping again would make awaitTermination throw immediately, forever.
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        super.stop();
    }

    /**
     * Polling loop run on the source's single worker thread. The tailed file is only touched
     * from that thread; {@link #setFlag} is the only method called from outside it.
     */
    private static class FileRunnable implements Runnable {

        /**
         * {@link RandomAccessFile#readLine()} maps every byte to one char, so encoding the
         * returned string as ISO-8859-1 recovers the line's raw bytes.
         */
        private static final Charset LATIN_1 = Charset.forName("ISO-8859-1");

        private final String filePath;
        private final long interval;
        private final Charset fileCharset;
        private final ChannelProcessor channelProcessor;
        private final File positionFile;

        /** Byte offset just past the last line accepted by the channel. */
        private long offset;
        private RandomAccessFile raf;
        /** Avoids logging the same "file not found" error on every poll. */
        private boolean openErrorLogged;
        /** Written by the thread calling stop(), read by the worker: volatile for visibility. */
        private volatile boolean flag = true;

        public void setFlag(Boolean flag) {
            this.flag = flag;
        }

        /**
         * @param filePath         file to tail
         * @param posiFile         file holding the persisted offset (created if missing)
         * @param interval         milliseconds to sleep when there is nothing new to read
         * @param charset          encoding of the tailed file
         * @param channelProcessor destination for the events
         */
        private FileRunnable(String filePath, String posiFile, Long interval, String charset,
                             ChannelProcessor channelProcessor) {
            this.filePath = filePath;
            this.interval = interval == null ? DEFAULT_INTERVAL_MS : interval;
            this.fileCharset = Charset.forName(charset);
            this.channelProcessor = channelProcessor;
            this.positionFile = new File(posiFile);
            // Resume from the saved offset if there is one, otherwise read from the start.
            this.offset = readSavedOffset();
        }

        @Override
        public void run() {
            try {
                while (flag) {
                    try {
                        if (!pollOnce()) {
                            Thread.sleep(interval);
                        }
                    } catch (IOException e) {
                        logger.error("read log error", e);
                        // Drop the handle; the next poll reopens at the last committed offset.
                        closeFile();
                        Thread.sleep(interval);
                    } catch (RuntimeException e) {
                        // E.g. the channel is full. Letting this escape would silently kill the
                        // worker (the executor swallows it), so back off and retry the same
                        // line from the last committed offset.
                        logger.error("Failed to deliver event, will retry", e);
                        closeFile();
                        Thread.sleep(interval);
                    }
                }
            } catch (InterruptedException e) {
                // Only expected from stop() -> shutdownNow(): restore the status and exit.
                Thread.currentThread().interrupt();
            } finally {
                closeFile();
            }
        }

        /**
         * Delivers at most one line to the channel.
         *
         * @return {@code true} if the read position advanced (poll again immediately),
         *         {@code false} if there is nothing new yet (caller should sleep)
         * @throws IOException if reading the file or writing the position file fails
         */
        private boolean pollOnce() throws IOException {
            if (raf == null && !openFile()) {
                return false;
            }
            long lineStart = raf.getFilePointer();
            if (raf.length() < lineStart) {
                // The file shrank under us (e.g. truncated by log rotation): start over.
                logger.warn("{} was truncated, reading from the start", filePath);
                raf.seek(0L);
                commitOffset(0L);
                return true;
            }
            String line = raf.readLine();
            if (line == null) {
                return false;
            }
            if (!endsWithLineTerminator()) {
                // The writer has not finished this line yet; read it again once it is complete.
                raf.seek(lineStart);
                return false;
            }
            if (!line.isEmpty()) {
                String text = new String(line.getBytes(LATIN_1), fileCharset);
                // Encode with the configured charset, not the platform default.
                channelProcessor.processEvent(EventBuilder.withBody(text.getBytes(fileCharset)));
            }
            commitOffset(raf.getFilePointer());
            return true;
        }

        /**
         * Opens the tailed file and positions it at {@link #offset}. If the saved offset is past
         * the end of the file, it resets to 0.
         *
         * @return {@code false} if the file cannot be opened yet (it will be retried)
         */
        private boolean openFile() throws IOException {
            try {
                raf = new RandomAccessFile(filePath, "r");
            } catch (IOException e) {
                if (!openErrorLogged) {
                    logger.error("open file error: " + filePath + " (will keep retrying)", e);
                    openErrorLogged = true;
                }
                return false;
            }
            openErrorLogged = false;
            long length = raf.length();
            if (offset > length) {
                logger.warn("Saved offset {} is past the end of the file ({} bytes); reading from the start",
                        offset, length);
                commitOffset(0L);
            }
            raf.seek(offset);
            return true;
        }

        /** Returns whether the byte just before the current file pointer is {@code \n} or {@code \r}. */
        private boolean endsWithLineTerminator() throws IOException {
            long end = raf.getFilePointer();
            raf.seek(end - 1);
            int last = raf.read(); // leaves the pointer back at `end`
            return last == '\n' || last == '\r';
        }

        /** Remembers {@code newOffset} and persists it, overwriting the position file. */
        private void commitOffset(long newOffset) throws IOException {
            offset = newOffset;
            FileUtils.writeStringToFile(positionFile, String.valueOf(newOffset));
        }

        /**
         * Loads the persisted offset, creating an empty position file if none exists yet.
         * Returns 0 (read from the start) when the file is new, empty, unreadable, or does not
         * hold a non-negative number.
         */
        private long readSavedOffset() {
            if (!positionFile.exists()) {
                try {
                    File parent = positionFile.getParentFile();
                    if (parent != null) {
                        parent.mkdirs();
                    }
                    positionFile.createNewFile();
                } catch (IOException e) {
                    logger.error("create positionFile file error", e);
                }
                return 0L;
            }
            try {
                // trim(): tolerate a trailing newline, e.g. if the file was edited by hand.
                String saved = StringUtils.trim(FileUtils.readFileToString(positionFile));
                if (StringUtils.isEmpty(saved)) {
                    return 0L; // nothing recorded yet
                }
                long parsed = Long.parseLong(saved);
                if (parsed >= 0) {
                    return parsed;
                }
                logger.warn("Ignoring negative offset {} in positionFile, reading from the start", parsed);
            } catch (IOException e) {
                logger.error("read positionFile file error", e);
            } catch (NumberFormatException e) {
                logger.error("positionFile does not contain a valid offset, reading from the start", e);
            }
            return 0L;
        }

        /** Closes the tailed file if it is open; errors are logged, not thrown. */
        private void closeFile() {
            if (raf != null) {
                try {
                    raf.close();
                } catch (IOException e) {
                    logger.warn("close file error: " + filePath, e);
                }
                raf = null;
            }
        }
    }


}

 

 

 

相關文章
相關標籤/搜索