攔截器做用:攔截器是簡單的插件式組件,設置在source和channel之間。source接收到的事件,在寫入channel以前,攔截器均可以進行轉換或者刪除這些事件。每一個攔截器只處理同一個source接收到的事件。能夠自定義攔截器。git
flume修改時間戳的插件見 https://github.com/haebin/flume-timestamp-interceptorgithub
有一個缺陷是,DateUtils.parseDate(timestamp, dateFormat)裏面的dateFormat不支持unix時間戳,只能本身手動添加了apache
原來是:app
- String timestamp = get(index, data);
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- headers.put(TIMESTAMP, Long.toString(now));
修改後ui
- String timestamp = get(index, data);
- if (dateFormat[0].equals("tsecond")){
- now = Long.parseLong(timestamp)*1000;
- }
- else if(dateFormat[0].equals("tmillisecond")){
- now = Long.parseLong(timestamp);
- }
- else if(dateFormat[0].equals("tnanosecond")){
- now = Long.parseLong(timestamp)/1000000;
- }
- else {
- now = DateUtils.parseDate(timestamp, dateFormat).getTime();
- }
- headers.put(TIMESTAMP, Long.toString(now));
flume配置:spa
- kafka_sn_hive.sources.s1.interceptors = timestamp
- kafka_sn_hive.sources.s1.interceptors.timestamp.type = org.apache.flume.interceptor.EventTimestampInterceptor$Builder
- kafka_sn_hive.sources.s1.interceptors.timestamp.preserveExisting = false
- kafka_sn_hive.sources.s1.interceptors.timestamp.delimiter = ,
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateIndex = 4
- kafka_sn_hive.sources.s1.interceptors.timestamp.dateFormat = tsecond
表示按逗號作分隔符的第四個(從0開始)字段是一個秒單位的時間戳。插件
在flume裏面,時間戳是毫秒級別的,因此要判斷這個字段是秒仍是毫秒納秒unix
見http://lisux.me/lishuai/?p=867orm