3. Simple Flume examples

# 1. Use an exec source to run tail -F /root/logs/test.log, then write the output to HDFS

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type. The default is SequenceFile; set DataStream for plain text
a1.sinks.k1.hdfs.fileType = DataStream



# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
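
To exercise this agent you need data flowing into the tailed file. A minimal generator might look like the sketch below (the class name and line format are illustrative; any shell loop that appends to the file works just as well):

    import java.io.FileWriter;
    import java.io.IOException;

    public class TestLogWriter {
        public static void main(String[] args) throws IOException, InterruptedException {
            // Open /root/logs/test.log in append mode so tail -F picks up new lines
            try (FileWriter w = new FileWriter("/root/logs/test.log", true)) {
                for (int i = 0; ; i++) {
                    w.write("event-" + i + " " + System.currentTimeMillis() + "\n");
                    w.flush();              // flush each line so it is visible immediately
                    Thread.sleep(1000);     // one event per second
                }
            }
        }
    }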

# 2. Monitor a directory for new files, upload each new file to HDFS, then rename the file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
## Note: never drop two files with the same name into the monitored directory
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path =/flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Output file type. The default is SequenceFile; set DataStream for plain text
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
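
Files must appear in the spool directory atomically and must not be modified afterwards, hence the note above about duplicate names. A common pattern is to write the finished file elsewhere and then rename it into place; a sketch (the paths and file names are illustrative, and the atomic move assumes both directories are on the same filesystem):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class SpoolDirDropper {
        public static void main(String[] args) throws IOException {
            // Write the finished file outside the watched directory first
            Path tmp = Paths.get("/root/staging/app-20150630.log");
            Files.write(tmp, "line1\nline2\n".getBytes("UTF-8"));

            // Atomic rename into the spool dir; the target name must be unique
            Path target = Paths.get("/root/logs/app-20150630.log");
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        }
    }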

# 3. Two-tier chaining: the first tier uses tail to pick up data in real time, then sinks it to the source of a second agent

Start the second-tier agent first (e.g. bin/flume-ng agent -c conf -f avro-logger.conf -n a1 -Dflume.root.logger=INFO,console), otherwise the first tier's Avro sink has nothing to connect to.

  • First-tier configuration

    ##################
      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
    
      # Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /root/logs/test.log
      a1.sources.r1.channels = c1
    
      # Describe the sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.channel = c1
      a1.sinks.k1.hostname = hadoop1
      a1.sinks.k1.port = 44444
      a1.sinks.k1.batch-size = 2
    
    
    
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
    
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
  • Second-tier configuration

    # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
    
      # Describe/configure the source
      a1.sources.r1.type = avro
      a1.sources.r1.channels = c1
      a1.sources.r1.bind = 0.0.0.0
      a1.sources.r1.port = 44444
    
      # Describe the sink
      a1.sinks.k1.type = logger
    
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
    
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1

# 4. Multiple files on multiple servers: collect the files from every server, then merge and store them grouped by category
# Approach: use two tiers of Flume agents.

  • The first tier uses an exec source running tail to read each file, and Flume's built-in static interceptor to set a key/value pair in each event's headers; an Avro sink then forwards the events to the second agent's Avro source.

  • The second tier's Avro source receives the events and sinks them to HDFS under the path /flume/tmp/%Y-%m-%d/%{key}/.
    Events carrying different key values are appended under different directories, which is what classifies the files.

  • First-tier Flume configuration

    # Name the components on this agent
      a1.sources = r1 r2 r3
      a1.sinks = k1
      a1.channels = c1
    
      # Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /root/logs/access.log
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = static
      a1.sources.r1.interceptors.i1.key = type
      a1.sources.r1.interceptors.i1.value = access
    
      a1.sources.r2.type = exec
      a1.sources.r2.command = tail -F /root/logs/nginx.log
      a1.sources.r2.interceptors = i2
      a1.sources.r2.interceptors.i2.type = static
      a1.sources.r2.interceptors.i2.key = type
      a1.sources.r2.interceptors.i2.value = nginx
    
      a1.sources.r3.type = exec
      a1.sources.r3.command = tail -F /root/logs/web.log
      a1.sources.r3.interceptors = i3
      a1.sources.r3.interceptors.i3.type = static
      a1.sources.r3.interceptors.i3.key = type
      a1.sources.r3.interceptors.i3.value = web
    
      # Describe the sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop03
      a1.sinks.k1.port = 41414
    
      # Use a channel which buffers events in memory
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 2000000
      a1.channels.c1.transactionCapacity = 100000
    
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sources.r2.channels = c1
      a1.sources.r3.channels = c1
      a1.sinks.k1.channel = c1
  • Second-tier Flume configuration

    # Define the agent name and the names of the source, channel, and sink
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1

    # Define the source
      a1.sources.r1.type = avro
      a1.sources.r1.bind = hadoop03
      a1.sources.r1.port =41414
    
      # Add a timestamp interceptor so the %Y%m%d escape in the HDFS path can be resolved
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    
    
      # Define the channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 20000
      a1.channels.c1.transactionCapacity = 10000
    
      # Define the sink
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path=hdfs://hadoop01:9000/source/logs/%{type}/%Y%m%d
      a1.sinks.k1.hdfs.filePrefix =events
      a1.sinks.k1.hdfs.fileType = DataStream
      a1.sinks.k1.hdfs.writeFormat = Text
      # Timestamp handling (not needed here; the timestamp interceptor already sets the header)
      #a1.sinks.k1.hdfs.useLocalTimeStamp = true
      # Do not roll files by event count
      a1.sinks.k1.hdfs.rollCount = 0
      # Roll files every 30 seconds
      a1.sinks.k1.hdfs.rollInterval = 30
      # Roll files when they reach 10 MB
      a1.sinks.k1.hdfs.rollSize = 10485760
      #a1.sinks.k1.hdfs.rollSize =0
      # Number of events written to HDFS per batch
      a1.sinks.k1.hdfs.batchSize = 10000
      # Number of threads Flume uses for HDFS operations (create, write, etc.)
      a1.sinks.k1.hdfs.threadsPoolSize=10
      # Timeout for HDFS operations, in milliseconds
      a1.sinks.k1.hdfs.callTimeout=30000
    
      # Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1

# 5. Custom interceptors

Often we need to transform our data: extract certain fields, encrypt certain fields, and so on. Flume's built-in interceptors do not provide these capabilities, so we have to implement them ourselves as a custom interceptor.

  • The custom interceptor

  package cn.itcast.interceptor;

  import java.security.MessageDigest;
  import java.security.NoSuchAlgorithmException;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.interceptor.Interceptor;

  import com.google.common.base.Charsets;

  import static cn.itcast.interceptor.CustomParameterInterceptor.Constants.*;

  /**
   * @author lishas
   */
  public class CustomParameterInterceptor implements Interceptor {
    
    
   /** The field separator for each line of input. */
   private final String fields_separator;

   /** The indexes of the fields to keep, after splitting each line on the separator. */
   private final String indexs;

   /** The separator between the multiple indexes. */
   private final String indexs_separator;

   /** The index of the field that must be encrypted. */
   private final String encrypted_field_index;
    
    
    
   /**
    * @param fields_separator
    * @param indexs
    * @param indexs_separator
    * @param encrypted_field_index
    */
       public CustomParameterInterceptor( String fields_separator,
         String indexs, String indexs_separator,String encrypted_field_index) {
        String f = fields_separator.trim();
        String i = indexs_separator.trim();
        this.indexs = indexs;
        this.encrypted_field_index=encrypted_field_index.trim();
        if (!f.equals("")) {
         f = UnicodeToString(f);
        }
        this.fields_separator =f;
        if (!i.equals("")) {
         i = UnicodeToString(i);
        }
        this.indexs_separator = i;
       }
    
   /*
    * Converts Unicode escape sequences embedded in the given string, such as
    * \u0009 (tab), \u000A (newline), \u000D (carriage return) and \u000C
    * (form feed), into the literal characters they represent.
    *
    * @param str the string possibly containing \uXXXX escape sequences
    * @return the string with every escape sequence replaced
    */
    
       public static String UnicodeToString(String str) {
        Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
        Matcher matcher = pattern.matcher(str);
        char ch;
        while (matcher.find()) {
         ch = (char) Integer.parseInt(matcher.group(2), 16);
         str = str.replace(matcher.group(1), ch + "");
        }
        return str;
       }
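
   // Note: UnicodeToString("\\u0009"), for example, returns a string containing a
   // literal tab character; this is what lets the agent configuration further below
   // pass the separator as fields_separator = \\u0009.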
    
       /*
        * @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
        */
       public Event intercept(Event event) {
        if (event == null) {
         return null;
        }
        try {
         String line = new String(event.getBody(), Charsets.UTF_8);
     String[] fields_splits = line.split(fields_separator);
     String[] indexs_split = indexs.split(indexs_separator);
     String newLine = "";
     for (int i = 0; i < indexs_split.length; i++) {
      int parseInt = Integer.parseInt(indexs_split[i]);
      // If this is the field designated for encryption, replace it with its MD5 digest
      if (!"".equals(encrypted_field_index) && encrypted_field_index.equals(indexs_split[i])) {
       newLine += StringUtils.GetMD5Code(fields_splits[parseInt]);
      } else {
       newLine += fields_splits[parseInt];
      }
    
          if(i!=indexs_split.length-1){
           newLine+=fields_separator;
          }
         }
         event.setBody(newLine.getBytes(Charsets.UTF_8));
         return event;
        } catch (Exception e) {
         return event;
        }
       }
    
       /* 
        * @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
        */
       public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>();
        for (Event event : events) {
         Event outEvent = intercept(event);
         if (outEvent != null) {
          out.add(outEvent);
         }
        }
        return out;
       }
    
       /* 
        * @see org.apache.flume.interceptor.Interceptor#initialize()
        */
   public void initialize() {
    // nothing to initialize
   }
    
       /*
        * @see org.apache.flume.interceptor.Interceptor#close()
        */
   public void close() {
    // no resources to release
   }
    
    
    
       public static class Builder implements Interceptor.Builder {
    
    /** The field separator for each line of input. */
    private String fields_separator;

    /** The indexes of the fields to keep, after splitting each line on the separator. */
    private String indexs;

    /** The separator between the multiple indexes. */
    private String indexs_separator;

    /** The index of the field that must be encrypted. */
    private String encrypted_field_index;
    
        /* 
         * @see org.apache.flume.conf.Configurable#configure(org.apache.flume.Context)
         */
        public void configure(Context context) {
         fields_separator = context.getString(FIELD_SEPARATOR, DEFAULT_FIELD_SEPARATOR);
         indexs = context.getString(INDEXS, DEFAULT_INDEXS);
         indexs_separator = context.getString(INDEXS_SEPARATOR, DEFAULT_INDEXS_SEPARATOR);
         encrypted_field_index= context.getString(ENCRYPTED_FIELD_INDEX, DEFAULT_ENCRYPTED_FIELD_INDEX);
    
        }
    
        /* 
         * @see org.apache.flume.interceptor.Interceptor.Builder#build()
         */
        public Interceptor build() {
    
         return new CustomParameterInterceptor(fields_separator, indexs, indexs_separator,encrypted_field_index);
        }
       }
    
    
    
    
       /**
        * The Class Constants.
        *
        * @author lishas
        */
       public static class Constants {
        /** The Constant FIELD_SEPARATOR. */
        public static final String FIELD_SEPARATOR = "fields_separator";
    
        /** The Constant DEFAULT_FIELD_SEPARATOR. */
        public static final String DEFAULT_FIELD_SEPARATOR =" ";
    
        /** The Constant INDEXS. */
        public static final String INDEXS = "indexs";
    
        /** The Constant DEFAULT_INDEXS. */
        public static final String DEFAULT_INDEXS = "0";
    
        /** The Constant INDEXS_SEPARATOR. */
        public static final String INDEXS_SEPARATOR = "indexs_separator";
    
        /** The Constant DEFAULT_INDEXS_SEPARATOR. */
        public static final String DEFAULT_INDEXS_SEPARATOR = ",";
    
        /** The Constant ENCRYPTED_FIELD_INDEX. */
        public static final String ENCRYPTED_FIELD_INDEX = "encrypted_field_index";
    
    /** The Constant DEFAULT_ENCRYPTED_FIELD_INDEX. */
        public static final String DEFAULT_ENCRYPTED_FIELD_INDEX = "";
    
        /** The Constant PROCESSTIME. */
        public static final String PROCESSTIME = "processTime";
    /** The Constant DEFAULT_PROCESSTIME. */
        public static final String DEFAULT_PROCESSTIME = "a";
    
       }
    
    
    
   /**
    * MD5 hashing utility for strings.
    */
       public static class StringUtils {
        // Lookup table of hex digits
           private final static String[] strDigits = { "0", "1", "2", "3", "4", "5",
                   "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
    
        // Convert one byte to a two-character hex string
        private static String byteToArrayString(byte bByte) {
            int iRet = bByte;
            if (iRet < 0) {
                iRet += 256;    // map the signed byte into 0..255
            }
               int iD1 = iRet / 16;
               int iD2 = iRet % 16;
               return strDigits[iD1] + strDigits[iD2];
           }
    
        // Convert one byte to its unsigned decimal string (helper, unused in this class)
        private static String byteToNum(byte bByte) {
            int iRet = bByte;
               if (iRet < 0) {
                   iRet += 256;
               }
               return String.valueOf(iRet);
           }
    
        // Convert a byte array to its hex string representation
           private static String byteToString(byte[] bByte) {
               StringBuffer sBuffer = new StringBuffer();
               for (int i = 0; i < bByte.length; i++) {
                   sBuffer.append(byteToArrayString(bByte[i]));
               }
               return sBuffer.toString();
           }
    
        public static String GetMD5Code(String strObj) {
            String resultString = strObj;
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                // md.digest() returns the MD5 hash as a byte array
                resultString = byteToString(md.digest(strObj.getBytes()));
               } catch (NoSuchAlgorithmException ex) {
                   ex.printStackTrace();
               }
               return resultString;
           }
       }
    
      }
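
A quick way to sanity-check the interceptor before deploying it is a small local test. This is a sketch, assuming flume-ng-core and Guava are on the classpath; the test class name and the sample line are made up for illustration, and the settings mirror the agent configuration shown further below:

    package cn.itcast.interceptor;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.interceptor.Interceptor;

    import com.google.common.base.Charsets;

    public class CustomParameterInterceptorTest {
        public static void main(String[] args) {
            Context ctx = new Context();
            ctx.put("fields_separator", "\\u0009");      // fields are tab-separated
            ctx.put("indexs", "0,1,3");                  // keep fields 0, 1 and 3
            ctx.put("indexs_separator", "\\u002c");      // indexes are comma-separated
            ctx.put("encrypted_field_index", "0");       // MD5-encrypt field 0

            Interceptor.Builder builder = new CustomParameterInterceptor.Builder();
            builder.configure(ctx);
            Interceptor interceptor = builder.build();

            Event in = EventBuilder.withBody("13901007610\tzhangsan\t30\tbeijing", Charsets.UTF_8);
            Event out = interceptor.intercept(in);
            // Expected output: md5("13901007610") <TAB> zhangsan <TAB> beijing
            System.out.println(new String(out.getBody(), Charsets.UTF_8));
        }
    }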

# Package this class into a jar and copy it into the lib directory under the Flume installation.

Then it can be used in the first-tier Flume configuration:

a1.sources = r1
a1.sinks = s1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /root/logs/
a1.sources.r1.batchSize= 50
a1.sources.r1.inputCharset = UTF-8

a1.sources.r1.interceptors =i1 i2
a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
a1.sources.r1.interceptors.i1.fields_separator=\\u0009
a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
a1.sources.r1.interceptors.i1.indexs_separator =\\u002c
a1.sources.r1.interceptors.i1.encrypted_field_index =0

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

#sink
a1.sinks.s1.channel = c1
a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.path =hdfs://mini1:9000/flume/%Y%m%d
a1.sinks.s1.hdfs.filePrefix = event
a1.sinks.s1.hdfs.fileSuffix = .log
a1.sinks.s1.hdfs.rollSize = 10485760
a1.sinks.s1.hdfs.rollInterval =20
a1.sinks.s1.hdfs.rollCount = 0
a1.sinks.s1.hdfs.batchSize = 2
a1.sinks.s1.hdfs.round = true
a1.sinks.s1.hdfs.roundUnit = minute
a1.sinks.s1.hdfs.threadsPoolSize = 25
a1.sinks.s1.hdfs.useLocalTimeStamp = true
a1.sinks.s1.hdfs.minBlockReplicas = 1
a1.sinks.s1.hdfs.fileType =DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.callTimeout = 60000
a1.sinks.s1.hdfs.idleTimeout =60