flume攔截器

攔截器做用:攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件,在寫入channel以前,攔截器均可以進行轉換或者刪除這些事件。每一個攔截器只處理同一個source接收到的事件。能夠自定義攔截器。git

flume修改時間戳的插件見 https://github.com/haebin/flume-timestamp-interceptorgithub

 

有一個缺陷是,DateUtils.parseDate(timestamp, dateFormat)裏面的dateFormat不支持unix時間戳,只能本身手動添加了apache

原來是:app

  1. String timestamp = get(index, data);
  2. now = DateUtils.parseDate(timestamp, dateFormat).getTime();
  3. headers.put(TIMESTAMP, Long.toString(now));

修改後ui

  1. String timestamp = get(index, data);
  2. if (dateFormat[0].equals("tsecond")){
  3. now = Long.parseLong(timestamp)*1000;
  4. }
  5. else if(dateFormat[0].equals("tmillisecond")){
  6. now = Long.parseLong(timestamp);
  7. }
  8. else if(dateFormat[0].equals("tnanosecond")){
  9. now = Long.parseLong(timestamp)/1000000;
  10. }
  11. else {
  12. now = DateUtils.parseDate(timestamp, dateFormat).getTime();
  13. }
  14. headers.put(TIMESTAMP, Long.toString(now));

flume配置:spa

  1. kafka_sn_hive.sources.s1.interceptors = timestamp
  2. kafka_sn_hive.sources.s1.interceptors.timestamp.type = org.apache.flume.interceptor.EventTimestampInterceptor$Builder
  3. kafka_sn_hive.sources.s1.interceptors.timestamp.preserveExisting = false
  4. kafka_sn_hive.sources.s1.interceptors.timestamp.delimiter = ,
  5. kafka_sn_hive.sources.s1.interceptors.timestamp.dateIndex = 4
  6. kafka_sn_hive.sources.s1.interceptors.timestamp.dateFormat = tsecond

表示按逗號作分隔符的第四個(從0開始)字段是一個秒單位的時間戳。插件

在flume裏面,時間戳是毫秒級別的,因此要判斷這個字段是秒仍是毫秒納秒unix

 

見http://lisux.me/lishuai/?p=867orm

相關文章
相關標籤/搜索