把日誌灌入到Elasticsearch的好處以及具體實現

通常來說一個高併發高性能的系統,日誌是很是龐大的,隨時可能高達幾個T,一臺服務器的硬盤極有可能裝不下,而Elasticsearch的集羣能夠分佈在不一樣的機器上,而又對整個集羣做爲一個總體,對其大容量的內容進行存儲以及它的最牛掰的能力--檢索.服務器

首先在配置文件中作以下配置併發

elasticsearch:
  clusterName: aubin-cluster
  clusterNodes: 192.168.5.182:9300
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {

   private String clusterName;

   private String clusterNodes;

    /**
     * 使用elasticsearch實現類時才觸發
     *
     * @return
     */
   @Bean
    @ConditionalOnBean(value = EsLogServiceImpl.class)
   public TransportClient getESClient() {
      // 設置集羣名字
      Settings settings = Settings.builder().put("cluster.name", this.clusterName).build();
      TransportClient client = new PreBuiltTransportClient(settings);
      try {
         // 讀取的ip列表是以逗號分隔的
         for (String clusterNode : this.clusterNodes.split(",")) {
            String ip = clusterNode.split(":")[0];
            String port = clusterNode.split(":")[1];
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(ip), Integer.parseInt(port)));
         }
      } catch (UnknownHostException e) {
         e.printStackTrace();
      }

      return client;
   }
}

再定義一個操做接口app

public interface LogService {

   void save(Log log);

   Page<Log> findLogs(Map<String, Object> params);

}

具體實現類爲elasticsearch

@Service
public class EsLogServiceImpl implements LogService, ApplicationContextAware {

   private static final Logger logger = LoggerFactory.getLogger(EsLogServiceImpl.class);

   private static final String INDEX = "index_logs";
   private static final String TYPE = "type_logs";

   @Autowired
   private TransportClient client;

   @Async
   @Override
   public void save(Log log) {
      if (log.getCreateTime() == null) {
         log.setCreateTime(new Date());
      }
      if (log.getFlag() == null) {
         log.setFlag(Boolean.TRUE);
      }
      logger.info("{}", log);

      String string = JSONObject.toJSONString(log);

      IndexRequestBuilder builder = client.prepareIndex(INDEX, TYPE).setSource(string, XContentType.JSON);
      builder.execute();
   }

   @Override
   public Page<Log> findLogs(Map<String, Object> params) {
      SearchRequestBuilder builder = client.prepareSearch().setIndices(INDEX).setTypes(TYPE);
      if (!CollectionUtils.isEmpty(params)) {
         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();

         // 用戶名模糊匹配
         String username = MapUtils.getString(params, "username");
         if (StringUtils.isNoneBlank(username)) {
            queryBuilder.must(QueryBuilders.wildcardQuery("username", "*" + username + "*"));
         }

         // 模塊精確匹配
         String module = MapUtils.getString(params, "module");
         if (StringUtils.isNoneBlank(module)) {
            queryBuilder.must(QueryBuilders.matchQuery("module", module));
         }

         String flag = MapUtils.getString(params, "flag");
         if (StringUtils.isNoneBlank(flag)) {
            Boolean bool = Boolean.FALSE;
            if ("1".equals(flag) || "true".equalsIgnoreCase(flag)) {
               bool = Boolean.TRUE;
            }
            queryBuilder.must(QueryBuilders.matchQuery("flag", bool));
         }

         // 大於等於開始日期,格式yyyy-MM-dd
         String beginTime = MapUtils.getString(params, "beginTime");
         if (StringUtils.isNoneBlank(beginTime)) {
            // 轉化爲0點0分0秒
            Long timestamp = toTimestamp(beginTime + "T00:00:00");
            queryBuilder.must(QueryBuilders.rangeQuery("createTime").from(timestamp));
         }

         // 小於等於結束日期,格式yyyy-MM-dd
         String endTime = MapUtils.getString(params, "endTime");
         if (StringUtils.isNoneBlank(endTime)) {
            // 轉化爲23點59分59秒
            Long timestamp = toTimestamp(endTime + "T23:59:59");
            queryBuilder.must(QueryBuilders.rangeQuery("createTime").to(timestamp));
         }

         if (queryBuilder != null) {
            builder.setPostFilter(queryBuilder);
         }
      }

      builder.addSort("createTime", SortOrder.DESC);

      PageUtil.pageParamConver(params, true);
      Integer start = MapUtils.getInteger(params, PageUtil.START);
      if (start != null) {
         builder.setFrom(start);
      }

      Integer length = MapUtils.getInteger(params, PageUtil.LENGTH);
      if (length != null) {
         builder.setSize(length);
      }

      SearchResponse searchResponse = builder.get();

      SearchHits searchHits = searchResponse.getHits();
      // 總數量
      Long total = searchHits.getTotalHits();

      int size = searchHits.getHits().length;
      List<Log> list = new ArrayList<>(size);
      if (size > 0) {
         searchHits.forEach(hit -> {
            String val = hit.getSourceAsString();
            list.add(JSONObject.parseObject(val, Log.class));
         });
      }

      return new Page<>(total.intValue(), list);
   }

   private Long toTimestamp(String str) {
      LocalDateTime localDateTime = LocalDateTime.parse(str);
      Date date = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());

      return date.getTime();
   }

   private static ApplicationContext applicationContext = null;

   @Override
   public void setApplicationContext(ApplicationContext context) throws BeansException {
      applicationContext = context;
   }

   /**
    * 初始化日誌es索引
    */
   @PostConstruct
   public void initIndex() {
      LogService logService = applicationContext.getBean(LogService.class);
      // 日誌實現是否採用elasticsearch
      boolean flag = (logService instanceof EsLogServiceImpl);
      if (!flag) {
         return;
      }

      try {
         // 判斷索引是否存在
         IndicesExistsResponse indicesExistsResponse = client.admin().indices()
               .exists(new IndicesExistsRequest(INDEX)).get();
         if (indicesExistsResponse.isExists()) {
            return;
         }
      } catch (InterruptedException e) {
         e.printStackTrace();
      } catch (ExecutionException e) {
         e.printStackTrace();
      }

      CreateIndexRequestBuilder requestBuilder = client.admin().indices().prepareCreate(INDEX);

      CreateIndexResponse createIndexResponse = requestBuilder.execute().actionGet();
      if (createIndexResponse.isAcknowledged()) {
         logger.info("索引:{},建立成功", INDEX);
      } else {
         logger.error("索引:{},建立失敗", INDEX);
      }
   }

}
相關文章
相關標籤/搜索