APDPlat充分利用Compass的OSEM和ORM integration特性,提供了簡單易用且功能強大的內置搜索特性。java
APDPlat的內置搜索,在設計簡潔優雅的同時,還具有了強大的實時搜索能力,用戶只需用註解的方式在模型中指定須要搜索哪些字段(還可在模型之間進行關聯搜索)就得到了搜索能力,而不用編寫任何代碼。平臺自動處理索引維護、查詢解析、結果高亮等支撐功能。node
然而APDPlat的內置搜索只能在單機上面使用,不支持分佈式,只能用於中小規模的場景。爲了支持大規模的分佈式搜索和實時分析,APDPlat選用Compass的進化版ElasticSearch (Compass和ElasticSearch的關係)。git
ElasticSearch提供了Java Client API,可是因爲該API依賴於Lucene的org.apache.lucene.util包中的幾個類,以至於沒法和APDPlat集成,緣由是APDPlat中Compass依賴的Lucene的版本和ElasticSearch依賴的版本衝突。github
從這裏能夠得知,ElasticSearch的Java Client API若是徹底移除對Lucene的依賴,僅僅做爲用戶和ElasticSearch集羣之間通訊的接口,使用起來就會更方便。數據庫
所以,APDPlat只能採用ElasticSearch的RESTful API。apache
接下來咱們看一個APDPlat和ElasticSearch集成的例子:json
APDPlat提供了可擴展的日誌處理接口,用戶可編寫本身的插件並在配置文件中指定啓用哪些插件,日誌處理接口以下:服務器
/** * 日誌處理接口: * 可將日誌存入獨立日誌數據庫(非業務數據庫) * 可將日誌傳遞到activemq\rabbitmq\zeromq等消息隊列 * 可將日誌傳遞到kafka\flume\chukwa\scribe等日誌聚合系統 * 可將日誌傳遞到elasticsearch\solr等搜索服務器 * @author 楊尚川 */ public interface LogHandler { public <T extends Model> void handle(List<T> list); }
將日誌傳遞到ElasticSearch搜索服務器的實現使用了幾個配置信息,這些配置信息默認存放在config.properties中,以下所示:app
#elasticsearch服務器配置 elasticsearch.host=localhost elasticsearch.port=9200 elasticsearch.log.index.name=apdplat_for_log
由於LogHandler接口中定義的參數List<T> list爲泛型,只知道T是Model的子類,而不知道具體是哪個類,因此咱們使用反射的機制來獲取具體對象類型:elasticsearch
String simpleName = model.getClass().getSimpleName(); LOG.debug((j++)+"、simpleName: 【"+simpleName+"】"); json.append("{\"index\":{\"_index\":\"") .append(INDEX_NAME) .append("\",\"_type\":\"") .append(simpleName) .append("\"}}") .append("\n"); json.append("{");
同時,咱們利用反射的方式獲取對象的字段以及相應的值,並正確處理類型問題:
Field[] fields = model.getClass().getDeclaredFields(); int len = fields.length; for(int i = 0; i < len; i++){ Field field = fields[i]; String name = field.getName(); field.setAccessible(true); Object value = field.get(model); //當心空指針異常,LogHandler線程會悄無聲息地退出! if(value == null){ LOG.debug("忽略空字段:"+name); continue; } if(i>0){ json.append(","); } String valueClass=value.getClass().getSimpleName(); LOG.debug("name: "+name+" type: "+valueClass); if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){ //提交給ES的日期時間值要爲"2014-01-31T13:53:54"這樣的形式 value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T"); } String prefix = "\""; String suffix = "\""; //提交給ES的數字和布爾值不要加雙引號 if("Float".equals(valueClass) || "Double".equals(valueClass) || "Long".equals(valueClass) || "Integer".equals(valueClass) || "Short".equals(valueClass) || "Boolean".equals(valueClass)){ prefix=""; suffix=""; } json.append("\"") .append(name) .append("\":") .append(prefix) .append(value) .append(suffix); } json.append("}\n");
構造完要提交的JSON數據以後,向服務器發送HTTP PUT請求:
HttpURLConnection conn = (HttpURLConnection) URL.openConnection(); conn.setRequestMethod("PUT"); conn.setDoOutput(true); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8")); writer.write(json.toString()); writer.flush(); StringBuilder result = new StringBuilder(); try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) { String line = reader.readLine(); while(line != null){ result.append(line); line = reader.readLine(); } }
服務器會以JSON數據格式返回處理結果,咱們使用Jackson解析返回的JSON字符串:
JsonNode node = MAPPER.readTree(resultStr); for(JsonNode item : node.get("items")){ JsonNode createJsonNode = item.get("create"); JsonNode okJsonNode = createJsonNode.get("ok"); if(okJsonNode != null){ boolean r = okJsonNode.getBooleanValue(); if(r){ success++; } }else{ JsonNode errorJsonNode = createJsonNode.get("error"); if(errorJsonNode != null){ String errorMessage = errorJsonNode.getTextValue(); LOG.error("索引失敗:"+errorMessage); } } }
下面是ElasticSearchLogHandler完整的實現:
/** * * 日誌處理實現: * 將日誌保存到ElasticSearch中 * 進行高性能實時搜索和分析 * 支持大規模分佈式搜索 * * @author 楊尚川 */ @Service public class ElasticSearchLogHandler implements LogHandler{ private static final APDPlatLogger LOG = new APDPlatLogger(ElasticSearchLogHandler.class); private static final String INDEX_NAME = PropertyHolder.getProperty("elasticsearch.log.index.name"); private static final String HOST = PropertyHolder.getProperty("elasticsearch.host"); private static final String PORT = PropertyHolder.getProperty("elasticsearch.port"); private static final ObjectMapper MAPPER = new ObjectMapper(); private static URL URL; private int success; public ElasticSearchLogHandler(){ LOG.info("elasticsearch.log.index.name: "+INDEX_NAME); LOG.info("elasticsearch.host: "+HOST); LOG.info("elasticsearch.port: "+PORT); try { URL = new URL("http://"+HOST+":"+PORT+"/_bulk"); } catch (MalformedURLException ex) { LOG.error("構造URL失敗",ex); } } /** * 批量索引 * 批量提交 * * @param <T> 泛型參數 * @param list 批量模型 */ public <T extends Model> void index(List<T> list){ success = 0; StringBuilder json = new StringBuilder(); int j = 1; //構造批量索引請求 for(T model : list){ try{ String simpleName = model.getClass().getSimpleName(); LOG.debug((j++)+"、simpleName: 【"+simpleName+"】"); json.append("{\"index\":{\"_index\":\"") .append(INDEX_NAME) .append("\",\"_type\":\"") .append(simpleName) .append("\"}}") .append("\n"); json.append("{"); Field[] fields = model.getClass().getDeclaredFields(); int len = fields.length; for(int i = 0; i < len; i++){ Field field = fields[i]; String name = field.getName(); field.setAccessible(true); Object value = field.get(model); //當心空指針異常,LogHandler線程會悄無聲息地退出! if(value == null){ LOG.debug("忽略空字段:"+name); continue; } if(i>0){ json.append(","); } String valueClass=value.getClass().getSimpleName(); LOG.debug("name: "+name+" type: "+valueClass); if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){ //提交給ES的日期時間值要爲"2014-01-31T13:53:54"這樣的形式 value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T"); } String prefix = "\""; String suffix = "\""; //提交給ES的數字和布爾值不要加雙引號 if("Float".equals(valueClass) || "Double".equals(valueClass) || "Long".equals(valueClass) || "Integer".equals(valueClass) || "Short".equals(valueClass) || "Boolean".equals(valueClass)){ prefix=""; suffix=""; } json.append("\"") .append(name) .append("\":") .append(prefix) .append(value) .append(suffix); } json.append("}\n"); }catch(SecurityException | IllegalArgumentException | IllegalAccessException e){ LOG.error("構造索引請求失敗【"+model.getMetaData()+"】\n"+model, e); } } //批量提交索引 try{ LOG.debug("提交JSON數據:\n"+json.toString()); HttpURLConnection conn = (HttpURLConnection) URL.openConnection(); conn.setRequestMethod("PUT"); conn.setDoOutput(true); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8")); writer.write(json.toString()); writer.flush(); StringBuilder result = new StringBuilder(); try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) { String line = reader.readLine(); while(line != null){ result.append(line); line = reader.readLine(); } } String resultStr = result.toString(); LOG.debug(resultStr); //使用Jackson解析返回的JSON JsonNode node = MAPPER.readTree(resultStr); for(JsonNode item : node.get("items")){ JsonNode createJsonNode = item.get("create"); JsonNode okJsonNode = createJsonNode.get("ok"); if(okJsonNode != null){ boolean r = okJsonNode.getBooleanValue(); if(r){ success++; } }else{ JsonNode errorJsonNode = createJsonNode.get("error"); if(errorJsonNode != null){ String errorMessage = errorJsonNode.getTextValue(); LOG.error("索引失敗:"+errorMessage); } } } }catch(IOException e){ LOG.error("批量提交索引失敗", e); } } @Override public <T extends Model> void handle(List<T> list) { LOG.info("開始將 "+list.size()+" 個日誌對象索引到ElasticSearch服務器"); long start = System.currentTimeMillis(); index(list); long cost = System.currentTimeMillis() - start; if(success != list.size()){ LOG.info("索引失敗: "+(list.size()-success)+" 個"); } if(success > 0){ LOG.info("索引成功: "+success+" 個"); } LOG.info("耗時:"+ConvertUtils.getTimeDes(cost)); } }
最後咱們在配置文件config.local.properties中指定log.handlers的值爲ElasticSearchLogHandler類的Spring bean name elasticSearchLogHandler,由於ElasticSearchLogHandler類加了Spring的@Service註解:
log.handlers=elasticSearchLogHandler