/*
 * Sample Flume agent configuration for this source (save as a .properties file).
 *
 * # example.conf: A single-node Flume configuration
 *
 * # Name the components on this agent
 * a1.sources = r1
 * a1.sinks = k1
 * a1.channels = c1
 *
 * # Describe/configure the source
 * a1.sources.r1.type = com.xp.cn.TailFileSource
 * a1.sources.r1.filePath = /usr/local/devtools/cdhbigdata/cdhflume/xpdata/data.txt
 * a1.sources.r1.posiFile = /usr/local/devtools/cdhbigdata/cdhflume/xpdata/posi.txt
 * # interval is in milliseconds (the source passes it to Thread.sleep)
 * a1.sources.r1.interval = 2
 * a1.sources.r1.charset = UTF-8
 *
 * # Use a channel which buffers events in memory
 * a1.channels.c1.type = memory
 * a1.channels.c1.capacity = 1000
 * a1.channels.c1.transactionCapacity = 100
 *
 * # Describe the sink
 * a1.sinks.k1.type = file_roll
 * a1.sinks.k1.sink.directory=/usr/local/devtools/cdhbigdata/cdhflume/xpdata/file_roll
 *
 * # Bind the source and sink to the channel
 * a1.sources.r1.channels = c1
 * a1.sinks.k1.channel = c1
 *
 * # Start the agent with:
 * #bin/flume-ng agent -n a1 -f /usr/local/devtools/cdhbigdata/cdhflume/apache-flume-1.5.0-cdh5.3.6-bin/conf/flume-myflume.properties -Dflume.root.logger=INFO,console
 */
package com.xp.cn; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.EventDrivenSource; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurable; import org.apache.flume.event.EventBuilder; import org.apache.flume.source.AbstractSource; import org.apache.flume.channel.ChannelProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Created by xupan on 2017/12/26. * flume source的生命週期,構造器>configure>start>processor.process>stop */ public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable { private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class); private String filePath; private String charset; private String posiFile; private Long interval; private ExecutorService executor; FileRunnable fileRunnable; /** * 讀取配置文件 * <p> * 配置文件內容: * 1.讀取哪一個配置文件 * 2.編碼集 * 3.偏移量寫到哪一個配置文件 * 4.多長時間檢測一下文件是否有新內容 * * @param context */ @Override public void configure(Context context) { filePath = context.getString("filePath");//指定監聽的文件 charset = context.getString("charset", "UTF-8"); posiFile = context.getString("posiFile"); interval = context.getLong("interval"); } /** * 1.建立一個線程監聽文件 */ @Override public synchronized void start() { //建立一個單線程的線程池 executor = Executors.newSingleThreadExecutor(); //定義一個實現Runnable接口類 fileRunnable = new FileRunnable(filePath, posiFile, interval, charset, this.getChannelProcessor()); //實現Runable接口類提交到線程池 executor.submit(fileRunnable); } @Override public synchronized void stop() { fileRunnable.setFlag(Boolean.FALSE); executor.shutdown(); while (!this.executor.isTerminated()) { 
logger.debug("Waiting for exec executor service to stop"); try { this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS); } catch (InterruptedException var2) { logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting."); Thread.currentThread().interrupt(); } } super.stop(); } private static class FileRunnable implements Runnable { private Long interval; private String charset; private ChannelProcessor channelProcessor; private Long offset = 0L; private RandomAccessFile raf; private Boolean flag = true; private File positionFile; public void setFlag(Boolean flag) { this.flag = flag; } /** * @param filePath * @param posiFile * @param interval * @param charset * @param channelProcessor */ private FileRunnable(String filePath, String posiFile, Long interval, String charset, ChannelProcessor channelProcessor) { this.interval = interval; this.charset = charset; this.channelProcessor = channelProcessor; //讀取偏移量,有就接着讀,沒有就從頭讀 positionFile = new File(posiFile); if (!positionFile.exists()) { try { positionFile.createNewFile(); } catch (IOException e) { e.printStackTrace(); logger.error("create positionFile file error"); } } //讀取偏移量 try { String offsetString = FileUtils.readFileToString(positionFile); //第一次可能時空 if (StringUtils.isNotEmpty(offsetString)) { offset = Long.parseLong(offsetString); } //從指定的位置讀取數據 r:只讀 raf = new RandomAccessFile(filePath, "r"); raf.seek(offset); } catch (IOException e) { e.printStackTrace(); logger.error("read positionFile file error"); } } @Override public void run() { while (flag) { //每隔一段時間讀取文件新數據 try { String line = raf.readLine(); if (StringUtils.isNotEmpty(line)) { line = new String(line.getBytes("ISO-8859-1"), charset); //將數據發送給Channel channelProcessor.processEvent(EventBuilder.withBody(line.getBytes())); //獲取最新偏移量 offset = raf.getFilePointer(); //將offset寫入文件中,覆蓋 FileUtils.writeStringToFile(positionFile, offset + ""); } else { Thread.sleep(interval); } } catch (IOException e) { e.printStackTrace(); 
logger.error("read log error"); } catch (InterruptedException e) { e.printStackTrace(); logger.error("Thread sleep error"); } } } } }