flume創建ElasticSearch索引時間的問題

flume收集到的日誌,採用ElasticSearch做爲存儲,運行一段時間後,發現天天早點八點纔會建立索引文件,比北京時間晚了8個小時,致使0點到8點這段時間的數據並無存儲到今天的索引文件中,而是存儲在前一天的索引中,當時覺得數據丟失了,全文檢索後發現數據被存在了前一天的索引文件中。首先肯定系統的時間是準備的,基本定位到應該是flume自身建立索引的時候除了問題,查看源代碼。java

flume建立ElasticSearch索引文件的代碼以下app

if (indexRequestBuilderFactory == null) {
      indexRequestBuilder = client
          .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
          .setSource(serializer.getContentBuilder(event).bytes());
} else {
      indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest(
          client, indexNameBuilder.getIndexPrefix(event), indexType, event);
}

下面看indexNameBuilder.getIndexName(event)獲取索引ide

@Override
  public String getIndexName(Event event) {
    TimestampedEvent timestampedEvent = new TimestampedEvent(event);
    long timestamp = timestampedEvent.getTimestamp();
    return new StringBuilder(indexPrefix).append('-')
      .append(fastDateFormat.format(timestamp)).toString();
  }
TimestampedEvent(Event base) {
    setBody(base.getBody());
    Map<String, String> headers = Maps.newHashMap(base.getHeaders());
    String timestampString = headers.get("timestamp");
    if (StringUtils.isBlank(timestampString)) {
      timestampString = headers.get("@timestamp");
    }
    if (StringUtils.isBlank(timestampString)) {
      this.timestamp = DateTimeUtils.currentTimeMillis();
      headers.put("timestamp", String.valueOf(timestamp ));
    } else {
      this.timestamp = Long.valueOf(timestampString);
    }
    setHeaders(headers);
  }

TimestampedEvent會先獲取事件中的timestamp,若是事件中沒有timestamp,就取當前的毫秒時間。ui

fastDateFormat會格式化timestamp,也就是生成索引後面的日期,如flume-2015-01-01,fastDateFormat默認會採用Etc/UTC時區,也就是會比北京時間晚8小時,北京時間早上8點,獲取到的就是0點。this

private FastDateFormat fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd",
      TimeZone.getTimeZone("Etc/UTC"));

另外能夠經過配置文件來配置fastDateFormat的時區,咱們採用便可spa

a1.sinks.k1.timeZone=Asia/Shanghai

使用ElasticSearch sink的時候,要加上上面這句話,這樣就能夠解決建立索引晚8小時的問題了。日誌

相關文章
相關標籤/搜索