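We store the logs collected by Flume in ElasticSearch. After the setup had been running for a while, we noticed that each day's index was only created at 8 a.m., 8 hours behind Beijing time. As a result, data from 0:00 to 8:00 was not stored in that day's index but in the previous day's index. At first we thought the data had been lost. A full-text search showed that it had simply been written to the previous day's index. After confirming that the system clock was correct, we narrowed the problem down to how Flume itself builds the index name, and went to the source code.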
The code Flume uses to create the ElasticSearch index request is as follows:
```java
if (indexRequestBuilderFactory == null) {
  indexRequestBuilder = client
      .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
      .setSource(serializer.getContentBuilder(event).bytes());
} else {
  indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest(
      client, indexNameBuilder.getIndexPrefix(event), indexType, event);
}
```
Next, look at indexNameBuilder.getIndexName(event), which produces the index name:
```java
@Override
public String getIndexName(Event event) {
  TimestampedEvent timestampedEvent = new TimestampedEvent(event);
  long timestamp = timestampedEvent.getTimestamp();
  return new StringBuilder(indexPrefix).append('-')
      .append(fastDateFormat.format(timestamp)).toString();
}
```
The timestamp comes from TimestampedEvent, whose constructor is:

```java
TimestampedEvent(Event base) {
  setBody(base.getBody());
  Map<String, String> headers = Maps.newHashMap(base.getHeaders());
  String timestampString = headers.get("timestamp");
  if (StringUtils.isBlank(timestampString)) {
    timestampString = headers.get("@timestamp");
  }
  if (StringUtils.isBlank(timestampString)) {
    this.timestamp = DateTimeUtils.currentTimeMillis();
    headers.put("timestamp", String.valueOf(timestamp));
  } else {
    this.timestamp = Long.valueOf(timestampString);
  }
  setHeaders(headers);
}
```
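TimestampedEvent first reads the timestamp from the event headers (the "timestamp" header, falling back to "@timestamp"). If the event carries no timestamp, it uses the current time in milliseconds.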
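To make the lookup order easy to experiment with, here is a dependency-free sketch of the same logic. It is an illustration under assumptions, not Flume's code: a plain `Map` stands in for the event headers, the hypothetical `resolveTimestamp` takes the "current time" as a parameter (Flume calls `DateTimeUtils.currentTimeMillis()`) so the result is deterministic, and it writes the fallback header into the map it is given, whereas TimestampedEvent works on its own copy.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of TimestampedEvent's timestamp lookup, without Flume dependencies. */
public class TimestampResolver {

    /** Blank check roughly equivalent to StringUtils.isBlank (null, empty or whitespace only). */
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /**
     * Returns the event time in epoch millis: "timestamp" header first, then "@timestamp",
     * else nowMillis, which is also recorded in the map under "timestamp".
     */
    static long resolveTimestamp(Map<String, String> headers, long nowMillis) {
        String ts = headers.get("timestamp");
        if (isBlank(ts)) {
            ts = headers.get("@timestamp");
        }
        if (isBlank(ts)) {
            // No usable header: fall back to "now" and remember it, as the original does
            headers.put("timestamp", String.valueOf(nowMillis));
            return nowMillis;
        }
        return Long.parseLong(ts);
    }

    public static void main(String[] args) {
        Map<String, String> withHeader = new HashMap<>();
        withHeader.put("@timestamp", "1420068600000");
        System.out.println(resolveTimestamp(withHeader, 0L)); // 1420068600000

        Map<String, String> noHeader = new HashMap<>();
        System.out.println(resolveTimestamp(noHeader, 42L)); // 42
        System.out.println(noHeader.get("timestamp"));       // 42
    }
}
```

Note that an event that arrives with its own "timestamp" header is indexed by that value, not by the time Flume received it.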
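fastDateFormat formats the timestamp into the date suffix of the index name, e.g. flume-2015-01-01. By default fastDateFormat uses the Etc/UTC time zone, which is 8 hours behind Beijing time: at 8 a.m. Beijing time, the formatted time is only 0:00 of the same day, so everything before 8 a.m. Beijing time still gets the previous day's date.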
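The 8-hour shift can be reproduced by formatting a single instant in both zones. This sketch uses java.time's `DateTimeFormatter` in place of commons-lang's `FastDateFormat` so it runs without dependencies; the `flume` prefix, the helper name `dateSuffix` and the sample timestamp are illustrative assumptions.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Shows how one instant yields different index-date suffixes in Etc/UTC and Asia/Shanghai. */
public class ZoneShiftDemo {

    /** Formats epoch millis as yyyy-MM-dd in the given zone, like the index-name suffix. */
    static String dateSuffix(long epochMillis, String zoneId) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd")
                .withZone(ZoneId.of(zoneId))
                .format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 2015-01-01 07:30 Beijing time (UTC+8) is 2014-12-31 23:30 UTC
        long ts = 1420068600000L;
        System.out.println("flume-" + dateSuffix(ts, "Etc/UTC"));       // flume-2014-12-31
        System.out.println("flume-" + dateSuffix(ts, "Asia/Shanghai")); // flume-2015-01-01
    }
}
```

An event logged at 7:30 on January 1 in Beijing therefore lands in the December 31 index when the suffix is computed in UTC.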
```java
private FastDateFormat fastDateFormat =
    FastDateFormat.getInstance("yyyy-MM-dd", TimeZone.getTimeZone("Etc/UTC"));
```
The time zone used by fastDateFormat can also be set in the configuration file, which is the approach we took:
```properties
a1.sinks.k1.timeZone=Asia/Shanghai
```
When using the ElasticSearch sink, add the line above; this fixes the problem of indices being created 8 hours late.
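As a rough model of what the setting changes, the sketch below is a hypothetical, dependency-free index name builder: the time zone comes from an optional "timeZone" entry and defaults to Etc/UTC, matching the default shown earlier. The class name `ZonedIndexNameBuilder`, the use of a plain `Map` in place of Flume's `Context`, and java.time instead of `FastDateFormat` are all assumptions for illustration; this is not Flume's actual class.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical time-based index name builder with a configurable zone (default Etc/UTC). */
public class ZonedIndexNameBuilder {
    private final String indexPrefix;
    private final DateTimeFormatter formatter;

    ZonedIndexNameBuilder(String indexPrefix, Map<String, String> settings) {
        this.indexPrefix = indexPrefix;
        String zone = settings.get("timeZone");
        if (zone == null || zone.trim().isEmpty()) {
            zone = "Etc/UTC"; // no setting: keep the UTC default, which causes the 8-hour lag
        }
        this.formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of(zone));
    }

    /** Builds "<prefix>-yyyy-MM-dd" for the given epoch millis in the configured zone. */
    String getIndexName(long timestampMillis) {
        return indexPrefix + '-' + formatter.format(Instant.ofEpochMilli(timestampMillis));
    }

    public static void main(String[] args) {
        long ts = 1420068600000L; // 2015-01-01 07:30 in Asia/Shanghai

        Map<String, String> defaults = new HashMap<>();
        System.out.println(new ZonedIndexNameBuilder("flume", defaults).getIndexName(ts)); // flume-2014-12-31

        Map<String, String> shanghai = new HashMap<>();
        shanghai.put("timeZone", "Asia/Shanghai");
        System.out.println(new ZonedIndexNameBuilder("flume", shanghai).getIndexName(ts)); // flume-2015-01-01
    }
}
```

Only the date suffix moves; the stored timestamps themselves are unchanged, so existing UTC-named indices keep their data.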