APDPlat拓展搜索之集成ElasticSearch

APDPlat充分利用Compass的OSEMORM integration特性,提供了簡單易用功能強大內置搜索特性。java

 

APDPlat的內置搜索,在設計簡潔優雅的同時,還具有了強大的實時搜索能力,用戶只需用註解的方式在模型中指定須要搜索哪些字段(還可在模型之間進行關聯搜索)就得到了搜索能力,而不用編寫任何代碼。平臺自動處理索引維護、查詢解析、結果高亮等支撐功能。node

 

然而APDPlat的內置搜索只能在單機上面使用,不支持分佈式,只能用於中小規模的場景。爲了支持大規模的分佈式搜索和實時分析,APDPlat選用Compass的進化版ElasticSearch Compass和ElasticSearch的關係)。git

 

ElasticSearch提供了Java Client API,可是因爲該API依賴於Lucene的org.apache.lucene.util包中的幾個類,以至於沒法和APDPlat集成,緣由是APDPlat中Compass依賴的Lucene的版本和ElasticSearch依賴的版本衝突。github

 

從這裏能夠得知,ElasticSearch的Java Client API若是徹底移除對Lucene的依賴,僅僅做爲用戶和ElasticSearch集羣之間通訊的接口,使用起來就會更方便。數據庫

 

所以,APDPlat只能採用ElasticSearch的RESTful APIapache

 

接下來咱們看一個APDPlat和ElasticSearch集成的例子:json

 

APDPlat提供了可擴展的日誌處理接口,用戶可編寫本身的插件並在配置文件中指定啓用哪些插件,日誌處理接口以下:服務器

 

/**
 * 日誌處理接口:
 * 可將日誌存入獨立日誌數據庫(非業務數據庫)
 * 可將日誌傳遞到activemq\rabbitmq\zeromq等消息隊列
 * 可將日誌傳遞到kafka\flume\chukwa\scribe等日誌聚合系統
 * 可將日誌傳遞到elasticsearch\solr等搜索服務器
 * @author 楊尚川
 */
public interface LogHandler {
    public <T extends Model> void handle(List<T> list);
}

 

將日誌傳遞到ElasticSearch搜索服務器的實現使用了幾個配置信息,這些配置信息默認存放在config.properties中,以下所示:app

 

#elasticsearch服務器配置
elasticsearch.host=localhost
elasticsearch.port=9200
elasticsearch.log.index.name=apdplat_for_log

 

由於LogHandler接口中定義的參數List<T> list爲泛型,只知道T是Model的子類,而不知道具體是哪個類,因此咱們使用反射的機制來獲取具體對象類型:elasticsearch

 

String simpleName = model.getClass().getSimpleName();
LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");
json.append("{\"index\":{\"_index\":\"")
  .append(INDEX_NAME)
  .append("\",\"_type\":\"")
  .append(simpleName)
  .append("\"}}")
  .append("\n");
json.append("{");

 

同時,咱們利用反射的方式獲取對象的字段以及相應的值,並正確處理類型問題:

 

Field[] fields = model.getClass().getDeclaredFields();
int len = fields.length;
for(int i = 0; i < len; i++){
	Field field = fields[i];
	String name = field.getName();
	field.setAccessible(true);
	Object value = field.get(model);
	//當心空指針異常,LogHandler線程會悄無聲息地退出!
	if(value == null){
		LOG.debug("忽略空字段:"+name);
		continue;
	}
	if(i>0){
		json.append(",");
	}
	String valueClass=value.getClass().getSimpleName();
	LOG.debug("name: "+name+"   type: "+valueClass);
	if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){
		//提交給ES的日期時間值要爲"2014-01-31T13:53:54"這樣的形式
		value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T");
	}
	String prefix = "\"";
	String suffix = "\"";
	//提交給ES的數字和布爾值不要加雙引號
	if("Float".equals(valueClass)
			|| "Double".equals(valueClass) 
			|| "Long".equals(valueClass) 
			|| "Integer".equals(valueClass)
			|| "Short".equals(valueClass)
			|| "Boolean".equals(valueClass)){
		prefix="";
		suffix="";
	}
	json.append("\"")
			.append(name)
			.append("\":")
			.append(prefix)
			.append(value)
			.append(suffix);
}
json.append("}\n");

 

構造完要提交的JSON數據以後,向服務器發送HTTP PUT請求:

 

HttpURLConnection conn = (HttpURLConnection) URL.openConnection();
conn.setRequestMethod("PUT");
conn.setDoOutput(true);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8"));    
writer.write(json.toString());
writer.flush();
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) {
	String line = reader.readLine();
	while(line != null){
		result.append(line);
		line = reader.readLine();
	}
}

 

服務器會以JSON數據格式返回處理結果,咱們使用Jackson解析返回的JSON字符串:

 

JsonNode node = MAPPER.readTree(resultStr);
for(JsonNode item : node.get("items")){
	JsonNode createJsonNode = item.get("create");
	JsonNode okJsonNode = createJsonNode.get("ok");
	if(okJsonNode != null){
		boolean r = okJsonNode.getBooleanValue();
		if(r){
			success++;
		}
	}else{
		JsonNode errorJsonNode = createJsonNode.get("error");
		if(errorJsonNode != null){
			String errorMessage = errorJsonNode.getTextValue();
			LOG.error("索引失敗:"+errorMessage);
		}
	}
}

 

下面是ElasticSearchLogHandler完整的實現:

 

/**
 * 
 * 日誌處理實現:
 * 將日誌保存到ElasticSearch中
 * 進行高性能實時搜索和分析
 * 支持大規模分佈式搜索
 * 
 * @author 楊尚川
 */
@Service
public class ElasticSearchLogHandler implements LogHandler{
    private static final APDPlatLogger LOG = new APDPlatLogger(ElasticSearchLogHandler.class);
    
    private static final String INDEX_NAME = PropertyHolder.getProperty("elasticsearch.log.index.name");
    private static final String HOST = PropertyHolder.getProperty("elasticsearch.host");
    private static final String PORT = PropertyHolder.getProperty("elasticsearch.port");
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static URL URL;
    
    private int success;
   
    public ElasticSearchLogHandler(){
        LOG.info("elasticsearch.log.index.name: "+INDEX_NAME);
        LOG.info("elasticsearch.host: "+HOST);
        LOG.info("elasticsearch.port: "+PORT);
        try {
            URL = new URL("http://"+HOST+":"+PORT+"/_bulk");
        } catch (MalformedURLException ex) {
            LOG.error("構造URL失敗",ex);
        }
    }
    
    /**
     * 批量索引
     * 批量提交
     * 
     * @param <T> 泛型參數
     * @param list 批量模型
     */
    public <T extends Model> void index(List<T> list){
        success = 0;
        StringBuilder json = new StringBuilder();
        int j = 1;
        //構造批量索引請求
        for(T model : list){
            try{
                String simpleName = model.getClass().getSimpleName();
                LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");
                json.append("{\"index\":{\"_index\":\"")
                        .append(INDEX_NAME)
                        .append("\",\"_type\":\"")
                        .append(simpleName)
                        .append("\"}}")
                        .append("\n");
                json.append("{");
                Field[] fields = model.getClass().getDeclaredFields();
                int len = fields.length;
                for(int i = 0; i < len; i++){
                    Field field = fields[i];
                    String name = field.getName();
                    field.setAccessible(true);
                    Object value = field.get(model);
                    //當心空指針異常,LogHandler線程會悄無聲息地退出!
                    if(value == null){
                        LOG.debug("忽略空字段:"+name);
                        continue;
                    }
                    if(i>0){
                        json.append(",");
                    }
                    String valueClass=value.getClass().getSimpleName();
                    LOG.debug("name: "+name+"   type: "+valueClass);
                    if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){
                        //提交給ES的日期時間值要爲"2014-01-31T13:53:54"這樣的形式
                        value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T");
                    }
                    String prefix = "\"";
                    String suffix = "\"";
                    //提交給ES的數字和布爾值不要加雙引號
                    if("Float".equals(valueClass)
                            || "Double".equals(valueClass) 
                            || "Long".equals(valueClass) 
                            || "Integer".equals(valueClass)
                            || "Short".equals(valueClass)
                            || "Boolean".equals(valueClass)){
                        prefix="";
                        suffix="";
                    }
                    json.append("\"")
                            .append(name)
                            .append("\":")
                            .append(prefix)
                            .append(value)
                            .append(suffix);
                }
                json.append("}\n");
            }catch(SecurityException | IllegalArgumentException | IllegalAccessException e){
                LOG.error("構造索引請求失敗【"+model.getMetaData()+"】\n"+model, e);
            }
        }
        //批量提交索引
        try{
            LOG.debug("提交JSON數據:\n"+json.toString());
            HttpURLConnection conn = (HttpURLConnection) URL.openConnection();
            conn.setRequestMethod("PUT");
            conn.setDoOutput(true);
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8"));    
            writer.write(json.toString());
            writer.flush();
            StringBuilder result = new StringBuilder();
            try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) {
                String line = reader.readLine();
                while(line != null){
                    result.append(line);
                    line = reader.readLine();
                }
            }
            String resultStr = result.toString();
            LOG.debug(resultStr);          
            //使用Jackson解析返回的JSON
            JsonNode node = MAPPER.readTree(resultStr);
            for(JsonNode item : node.get("items")){
                JsonNode createJsonNode = item.get("create");
                JsonNode okJsonNode = createJsonNode.get("ok");
                if(okJsonNode != null){
                    boolean r = okJsonNode.getBooleanValue();
                    if(r){
                        success++;
                    }
                }else{
                    JsonNode errorJsonNode = createJsonNode.get("error");
                    if(errorJsonNode != null){
                        String errorMessage = errorJsonNode.getTextValue();
                        LOG.error("索引失敗:"+errorMessage);
                    }
                }
            }
        }catch(IOException e){
            LOG.error("批量提交索引失敗", e);
        }
    }
    
    @Override
    public <T extends Model> void handle(List<T> list) {
        LOG.info("開始將 "+list.size()+" 個日誌對象索引到ElasticSearch服務器");
        long start = System.currentTimeMillis();
        index(list);
        long cost = System.currentTimeMillis() - start;
        
        if(success != list.size()){
            LOG.info("索引失敗: "+(list.size()-success)+" 個");            
        }
        if(success > 0){
            LOG.info("索引成功: "+success+" 個");
        }
        LOG.info("耗時:"+ConvertUtils.getTimeDes(cost));
    }
}

 

最後咱們在配置文件config.local.properties中指定log.handlers的值爲ElasticSearchLogHandler類的Spring bean name elasticSearchLogHandler,由於ElasticSearchLogHandler類加了Spring的@Service註解:

 

log.handlers=elasticSearchLogHandler

 

 

APDPlat託管在Github

相關文章
相關標籤/搜索