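Flume's writes to HDFS happen in the HDFSEventSink.process method, and the target path is built by BucketPath.
Reading its source code (reference: http://caiguangguang.blog.51cto.com/1652935/1619539) shows that
%{} variable substitution can do the job: just extract the time field from the event (the nginx log's local time) and pass it into hdfs.path.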
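To make the %{} idea concrete, here is a minimal sketch of header substitution. It is not Flume's BucketPath code, only a simplified illustration; the class name HeaderPathSketch, the method expand, and the choice to replace a missing header with an empty string are all assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; this is not the Flume implementation.
public class HeaderPathSketch {
    // Matches placeholders of the form %{headerName}.
    private static final Pattern HEADER_TAG = Pattern.compile("%\\{([^}]+)\\}");

    // Replaces every %{name} in the template with the value of that header.
    public static String expand(String template, Map<String, String> headers) {
        Matcher m = HEADER_TAG.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = headers.get(m.group(1));
            // Assumption of this sketch: a missing header becomes an empty string.
            String replacement = (value == null) ? "" : value;
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("eventdate", "20150409");
        headers.put("eventhour", "12");
        System.out.println(
            expand("/data/flume/mobile-ubt-all/%{eventdate}/%{eventhour}", headers));
    }
}
```

The point of the sketch is that the path depends only on the event's headers, so whatever the source puts into eventdate and eventhour decides the directory the event lands in.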
The concrete implementation is as follows:
1. In the process method of KafkaSource, add:
dt = KafkaSourceUtil.getDateMessage(new String(kafkaMessage));
hour = KafkaSourceUtil.getHourMessage(new String(kafkaMessage));
headers.put("eventdate", dt);
headers.put("eventhour", hour);
log.debug("source get one event header info");
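This adds two headers that record the day and the hour of the log entry.
2. The methods in KafkaSourceUtil
Because our message body is JSON, the Java json-lib package is used here. For example, to extract the day from a message: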
public static String getDateMessage(String message) {
    String dt = null;
    // The Kafka message body is JSON; the raw nginx log line is in its "message" field.
    JSONObject json = JSONObject.fromObject(message);
    String[] splitMessage = json.getString("message").split("\t");
    // The fourth tab-separated field holds the nginx local time.
    String logTime = splitMessage[3].trim();
    log.debug("in getDateMessage logTime is: " + logTime);
    String format = "[dd/MMM/yyyy:HH:mm:ss Z]";
    SimpleDateFormat rawDateFormat = new SimpleDateFormat(format, Locale.ENGLISH);
    SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyyMMdd");
    try {
        Date date = rawDateFormat.parse(logTime);
        dt = dateFormat2.format(date);
        log.debug("in getDateMessage dt is: " + dt);
    } catch (Exception ex) {
        // Any parse failure falls back to the literal bucket name "empty".
        dt = "empty";
    }
    return dt;
}
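The post does not show getHourMessage, so the following is only a sketch of how the hour could be derived from the same time field. It swaps SimpleDateFormat for java.time, takes the already extracted time string instead of the JSON message, and uses the hypothetical names HourHeaderSketch and hourOf; the sample timestamp is made up. It keeps the "empty" fallback used by getDateMessage.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;

// Hypothetical sketch; the author's actual getHourMessage is not shown in the post.
public class HourHeaderSketch {
    // Square brackets are quoted because DateTimeFormatter treats bare [ ] as optional sections.
    private static final DateTimeFormatter NGINX_TIME =
        DateTimeFormatter.ofPattern("'['dd/MMM/uuuu:HH:mm:ss Z']'", Locale.ENGLISH);
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    // Returns the two-digit hour of the log time, or "empty" if it cannot be parsed.
    public static String hourOf(String logTime) {
        if (logTime == null) {
            return "empty";
        }
        try {
            OffsetDateTime t = OffsetDateTime.parse(logTime.trim(), NGINX_TIME);
            // The hour is taken in the log's own UTC offset, not the JVM default zone.
            return HOUR.format(t);
        } catch (DateTimeParseException ex) {
            return "empty";
        }
    }

    public static void main(String[] args) {
        // Made-up sample value in the nginx local-time format.
        System.out.println(hourOf("[09/Apr/2015:12:05:31 +0800]"));
    }
}
```

One difference worth noting: a SimpleDateFormat created without an explicit time zone formats in the JVM's default zone, while this sketch keeps the offset written in the log line, so the two approaches can give different hours when the Flume host and the nginx host run in different zones.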
3. Reference the headers in hdfs.path
agent-server4.sinks.hdfs-sink2.type = hdfs
agent-server4.sinks.hdfs-sink2.hdfs.path = hdfs://xxx:8020/data/flume/mobile-ubt-all/%{eventdate}/%{eventhour}
The resulting log:
flume-server4.log.3:09 Apr 2015 15:18:49,966 DEBUG [hdfs-hdfs-sink1-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter$2.call:276) - Rolling file (hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp): Roll scheduled after 60 sec elapsed.
flume-server4.log.3:09 Apr 2015 15:18:49,969 INFO [hdfs-hdfs-sink1-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:363) - Closing hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp
flume-server4.log.3:09 Apr 2015 15:18:49,990 INFO [hdfs-hdfs-sink1-call-runner-2] (org.apache.flume.sink.hdfs.BucketWriter$8.call:629) - Renaming hdfs://xxx:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866.tmp to hdfs://192.168.101.6:8020/data/flume/mobile-ubt-all/20150409/12/192.168.101.52-04-01-.1428563869866