# 1. Run `tail -F` on a file with an exec source, then write the output to HDFS

```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
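With all three roll criteria set at once (rollInterval, rollSize, rollCount), whichever threshold is crossed first closes the current HDFS file, so this demo churns out lots of tiny files. As a sketch of a more production-like policy, a criterion can be disabled by setting it to 0 and the file rolled by size alone (the 128 MB figure is an assumption, not from the original config):

```
# Roll only by size: 0 disables the time- and count-based criteria
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
```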
# 2. Watch a directory for new files, upload every new file to HDFS, then rename the file

```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## NOTE: never drop a file with an already-used name into the monitored directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type; the default is SequenceFile, DataStream writes plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
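The renaming mentioned in the title is something the spooling directory source does by itself: once a file has been fully ingested it is renamed with a completion suffix. A sketch of the related knobs, written out with their documented default values:

```
# A fully ingested file is renamed with this suffix (default shown)
a1.sources.r1.fileSuffix = .COMPLETED
# Or delete ingested files instead of renaming them: never | immediate
a1.sources.r1.deletePolicy = never
```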
# 3. Two chained agents: the first tier uses tail to pick up data in real time, then sinks it into the source of a second agent

First-tier configuration:
```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 44444
a1.sinks.k1.batch-size = 2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
Second-tier configuration (start this agent first, so the first tier's avro sink has a listener to connect to):
```
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
# Must match the port the first tier's avro sink sends to
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
# 4. Multiple files on multiple servers: collect them all and store them merged but sorted by category

Approach: use two tiers of Flume.

The first tier reads each file with an exec source running tail and, using Flume's built-in static interceptor, sets a key/value pair in the header of every event; an avro sink then forwards the events to the avro source of a second Flume agent.

Whenever the second tier's avro source receives an event, it sinks it to HDFS under a path such as /flume/tmp/%Y-%m-%d/%{key}/, where %{key} expands to the value of that header.

Since events with different keys resolve to different directories, they are appended to different files, which gives us the per-category separation, as the sketch below illustrates.
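In effect the static interceptor does nothing more than stamp a fixed header onto every event, and the HDFS sink's %{...} escape later reads that header back. A minimal sketch of the mechanism using Flume's event API directly (the body text and header values are made-up samples):

```java
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import com.google.common.base.Charsets;

public class StaticHeaderSketch {
    public static void main(String[] args) {
        // What the static interceptor does on tier one: attach a fixed header
        Event event = EventBuilder.withBody("192.168.1.1 GET /index.html", Charsets.UTF_8);
        event.getHeaders().put("type", "access");

        // What the HDFS sink does on tier two: expand %{type} from the header,
        // so this event would land under .../source/logs/access/...
        System.out.println(event.getHeaders().get("type")); // prints "access"
    }
}
```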
First-tier Flume configuration:
```
# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/logs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /root/logs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000000
a1.channels.c1.transactionCapacity = 100000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
```
Second-tier Flume configuration:

```
# Name the agent's source, channel and sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Define the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 41414
# Add a timestamp interceptor (fills in the timestamp header used by %Y%m%d below)
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Not needed here: the timestamp interceptor above already sets the timestamp header
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Do not roll files by event count
a1.sinks.k1.hdfs.rollCount = 0
# Roll files every 30 seconds
a1.sinks.k1.hdfs.rollInterval = 30
# Roll files when they reach 10 MB
a1.sinks.k1.hdfs.rollSize = 10485760
#a1.sinks.k1.hdfs.rollSize = 0
# Number of events written to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 10000
# Number of threads Flume uses against HDFS (open, write, etc.)
a1.sinks.k1.hdfs.threadsPoolSize = 10
# Timeout for HDFS operations
a1.sinks.k1.hdfs.callTimeout = 30000

# Wire source, channel and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
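With this pair of configurations in place, an event tailed from /root/logs/nginx.log on tier one arrives on tier two carrying the header type=nginx, so the three logs end up neatly separated on HDFS (the date below is an assumed example):

```
hdfs://hadoop01:9000/source/logs/access/20150630/events...
hdfs://hadoop01:9000/source/logs/nginx/20150630/events...
hdfs://hadoop01:9000/source/logs/web/20150630/events...
```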
# 5. A custom interceptor

We often need to perform operations such as field extraction or field encryption on our data, and Flume's built-in interceptors offer none of that, so we have to write the functionality ourselves as a custom interceptor.

The custom interceptor:
```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;

/**
 * @author lishas
 */
public class CustomParameterInterceptor implements Interceptor {

    /** Separator between the fields of each line. */
    private final String fields_separator;
    /** Indexes of the columns to keep after splitting. */
    private final String indexs;
    /** Separator between the index values themselves. */
    private final String indexs_separator;
    /** Index of the field to encrypt. */
    private final String encrypted_field_index;

    /**
     * @param fields_separator
     * @param indexs
     * @param indexs_separator
     * @param encrypted_field_index
     */
    public CustomParameterInterceptor(String fields_separator, String indexs,
            String indexs_separator, String encrypted_field_index) {
        String f = fields_separator.trim();
        String i = indexs_separator.trim();
        this.indexs = indexs;
        this.encrypted_field_index = encrypted_field_index.trim();
        if (!f.equals("")) {
            f = UnicodeToString(f);
        }
        this.fields_separator = f;
        if (!i.equals("")) {
            i = UnicodeToString(i);
        }
        this.indexs_separator = i;
    }

    /*
     * Convert literal unicode escapes written in the config (backslash + uXXXX)
     * into the real characters, e.g. tab (u0009), newline (u000A),
     * carriage return (u000D), form feed (u000C), bell (u0007),
     * escape (u001B), space (u0020).
     *
     * @param str
     * @return
     * @date 2015-6-30
     */
    public static String UnicodeToString(String str) {
        Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
        Matcher matcher = pattern.matcher(str);
        char ch;
        while (matcher.find()) {
            ch = (char) Integer.parseInt(matcher.group(2), 16);
            str = str.replace(matcher.group(1), ch + "");
        }
        return str;
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
     */
    public Event intercept(Event event) {
        if (event == null) {
            return null;
        }
        try {
            String line = new String(event.getBody(), Charsets.UTF_8);
            String[] fields_splits = line.split(fields_separator);
            String[] indexs_split = indexs.split(indexs_separator);
            String newLine = "";
            for (int i = 0; i < indexs_split.length; i++) {
                int parseInt = Integer.parseInt(indexs_split[i]);
                // Encrypt the configured field; copy the others through as-is
                if (!"".equals(encrypted_field_index)
                        && encrypted_field_index.equals(indexs_split[i])) {
                    newLine += StringUtils.GetMD5Code(fields_splits[parseInt]);
                } else {
                    newLine += fields_splits[parseInt];
                }
                if (i != indexs_split.length - 1) {
                    newLine += fields_separator;
                }
            }
            event.setBody(newLine.getBytes(Charsets.UTF_8));
            return event;
        } catch (Exception e) {
            return event;
        }
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
     */
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#initialize()
     */
    public void initialize() {
        // nothing to initialize
    }

    /*
     * @see org.apache.flume.interceptor.Interceptor#close()
     */
    public void close() {
        // nothing to release
    }

    public static class Builder implements Interceptor.Builder {
        /** Separator between the fields of each line. */
        private String fields_separator;
        /** Indexes of the columns to keep after splitting. */
        private String indexs;
        /** Separator between the index values themselves. */
        private String indexs_separator;
        /** Index of the field to encrypt. */
        private String encrypted_field_index;

        /*
         * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
         */
        public void configure(Context context) {
            fields_separator = context.getString(Constants.FIELD_SEPARATOR,
                    Constants.DEFAULT_FIELD_SEPARATOR);
            indexs = context.getString(Constants.INDEXS, Constants.DEFAULT_INDEXS);
            indexs_separator = context.getString(Constants.INDEXS_SEPARATOR,
                    Constants.DEFAULT_INDEXS_SEPARATOR);
            encrypted_field_index = context.getString(Constants.ENCRYPTED_FIELD_INDEX,
                    Constants.DEFAULT_ENCRYPTED_FIELD_INDEX);
        }

        /*
         * @see org.apache.flume.interceptor.Interceptor.Builder#build()
         */
        public Interceptor build() {
            return new CustomParameterInterceptor(fields_separator, indexs,
                    indexs_separator, encrypted_field_index);
        }
    }

    /**
     * Configuration keys and their default values.
     *
     * @author lishas
     */
    public static class Constants {
        public static final String FIELD_SEPARATOR = "fields_separator";
        public static final String DEFAULT_FIELD_SEPARATOR = " ";
        public static final String INDEXS = "indexs";
        public static final String DEFAULT_INDEXS = "0";
        public static final String INDEXS_SEPARATOR = "indexs_separator";
        public static final String DEFAULT_INDEXS_SEPARATOR = ",";
        public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";
        public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";
        public static final String PROCESSTIME = "processTime";
        public static final String DEFAULT_PROCESSTIME = "a";
    }

    /**
     * MD5 hashing of a string.
     */
    public static class StringUtils {
        // Hex digit lookup table
        private final static String[] strDigits = { "0", "1", "2", "3", "4",
                "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };

        // Render one byte as two hex characters
        private static String byteToArrayString(byte bByte) {
            int iRet = bByte;
            if (iRet < 0) {
                iRet += 256;
            }
            int iD1 = iRet / 16;
            int iD2 = iRet % 16;
            return strDigits[iD1] + strDigits[iD2];
        }

        // Render one byte as a decimal string (helper kept from the original; unused)
        private static String byteToNum(byte bByte) {
            int iRet = bByte;
            if (iRet < 0) {
                iRet += 256;
            }
            return String.valueOf(iRet);
        }

        // Render a byte array as a hex string
        private static String byteToString(byte[] bByte) {
            StringBuffer sBuffer = new StringBuffer();
            for (int i = 0; i < bByte.length; i++) {
                sBuffer.append(byteToArrayString(bByte[i]));
            }
            return sBuffer.toString();
        }

        public static String GetMD5Code(String strObj) {
            String resultString = null;
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                // md.digest() returns the hash as a byte array
                resultString = byteToString(md.digest(strObj.getBytes()));
            } catch (NoSuchAlgorithmException ex) {
                ex.printStackTrace();
            }
            return resultString;
        }
    }
}
```
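A minimal sketch of exercising the interceptor outside an agent, assuming the class above and Flume's `flume-ng-core` (with its bundled Guava) are on the classpath; the sample log line and parameter values are made up for illustration:

```java
import java.util.HashMap;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import com.google.common.base.Charsets;

public class CustomParameterInterceptorDemo {
    public static void main(String[] args) {
        // Sample settings: tab-separated fields, keep columns 0, 1 and 3, MD5 column 0
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("fields_separator", "\\u0009");
        params.put("indexs", "0,1,3");
        params.put("indexs_separator", "\\u002c");
        params.put("encrypted_field_index", "0");

        Interceptor.Builder builder = new CustomParameterInterceptor.Builder();
        builder.configure(new Context(params));
        Interceptor interceptor = builder.build();

        // A made-up tab-separated input line: phone, name, age, city, city
        Event in = EventBuilder.withBody("13601234567\tlishas\t30\tbeijing\thangzhou",
                Charsets.UTF_8);
        Event out = interceptor.intercept(in);
        // Prints: md5(13601234567) <TAB> lishas <TAB> beijing
        System.out.println(new String(out.getBody(), Charsets.UTF_8));
    }
}
```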
Package this class into a jar and copy the jar into the lib folder under the Flume installation directory.
It can then be used in the first-tier Flume configuration:

```
# Name the components on this agent (these declarations are assumed; the original excerpt starts at the source properties)
a1.sources = r1
a1.sinks = s1
a1.channels = c1

# Source: a spooling directory (the type line is assumed; spoolDir/batchSize/inputCharset are spooldir properties)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs/
a1.sources.r1.batchSize = 50
a1.sources.r1.inputCharset = UTF-8
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator = \\u0009
a1.sources.r1.interceptors.i1.indexs = 0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator = \\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index = 0
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Channel (assumed, matching the memory channels used above)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path = hdfs://mini1:9000/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval = 20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 2
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType = DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout = 60

# Bind the source to the channel (assumed)
a1.sources.r1.channels = c1
```
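Finally, the agent can be launched with the standard flume-ng command line (the config file name here is an assumption):

```
bin/flume-ng agent --conf conf --conf-file conf/spooldir-interceptor.conf --name a1 -Dflume.root.logger=INFO,console
```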