APDPlat makes full use of Compass's OSEM and ORM integration features to provide built-in search that is simple to use yet powerful.
APDPlat's built-in search is simple and elegant in design while offering powerful real-time search: users only need annotations in their model classes to specify which fields should be searchable (related models can also be searched through associations), and they gain search capability without writing any code. The platform automatically handles supporting concerns such as index maintenance, query parsing, and result highlighting.
However, APDPlat's built-in search only works on a single machine and does not support distribution, so it is only suitable for small and medium-scale scenarios. To support large-scale distributed search and real-time analytics, besides choosing Compass's successor ElasticSearch (see "APDPlat extended search: integrating ElasticSearch"), APDPlat has another option: Solr.
Solr provides a Java client API (SolrJ) that we can use to interact with a Solr server. First we add the SolrJ dependency to pom.xml:
```xml
<dependency>
    <groupId>org.apache.solr</groupId>
    <artifactId>solr-solrj</artifactId>
    <version>${solrj.version}</version>
</dependency>
```
Next, let's look at an example of integrating APDPlat with Solr.
APDPlat provides an extensible log-handling interface; users can write their own plugins and specify in the configuration file which plugins to enable. The log-handling interface is as follows:
```java
/**
 * Log-handling interface:
 * can store logs in a dedicated log database (separate from the business database)
 * can forward logs to message queues such as ActiveMQ, RabbitMQ, or ZeroMQ
 * can forward logs to log-aggregation systems such as Kafka, Flume, Chukwa, or Scribe
 * can forward logs to search servers such as ElasticSearch or Solr
 * @author 楊尚川
 */
public interface LogHandler {
    public <T extends Model> void handle(List<T> list);
}
```
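To see the plugin contract in isolation, here is a minimal sketch of an implementation that just echoes each log entry to the console. The `Model` class and `LogHandler` interface below are simplified stand-ins for APDPlat's real classes, which are not reproduced here; a real plugin would forward the batch to a database, queue, or search server instead of printing.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for APDPlat's Model base class (illustration only)
class Model {
    private final String message;
    Model(String message){ this.message = message; }
    @Override public String toString(){ return message; }
}

// Simplified stand-in for APDPlat's LogHandler interface
interface LogHandler {
    <T extends Model> void handle(List<T> list);
}

// A trivial plugin: echoes each log entry instead of persisting it
class ConsoleLogHandler implements LogHandler {
    @Override
    public <T extends Model> void handle(List<T> list){
        for(T model : list){
            System.out.println("LOG> " + model);
        }
    }
}

public class Demo {
    public static void main(String[] args){
        new ConsoleLogHandler().handle(Arrays.asList(new Model("user admin logged in")));
    }
}
```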
To have the Solr server index log data, we first construct an HttpSolrServer instance, then build a list of SolrInputDocument objects from the log objects to be indexed, then call HttpSolrServer's add and commit methods to submit the SolrInputDocument list to the Solr server for indexing, and finally parse the response to determine whether the operation succeeded.
Constructing an HttpSolrServer instance requires several configuration settings. These are stored by default in config.properties and can be overridden in config.local.properties, as shown below:
```properties
#Solr server configuration
solr.host=192.168.0.100
solr.port=8983
solr.core=apdplat_for_log
solr.max.retries=1
solr.connection.timeout=5
solr.allow.compression=true
solr.socket.read.timeout=3000
solr.max.connections.per.host=100
solr.max.total.connections=300
```
When configuring the Solr server, the core (here apdplat_for_log) should be configured as schemaless; otherwise we would have to declare every field of the log objects one by one, which would be far too cumbersome. Because the apdplat_for_log core is schemaless, objects of all kinds of unknown types can be indexed into the same core. When indexing we add a type field whose value is the object's class name, so that different kinds of objects can be distinguished at search time.
Let's see how to construct the HttpSolrServer:
```java
private static final String host = PropertyHolder.getProperty("solr.host");
private static final String port = PropertyHolder.getProperty("solr.port");
private static final String core = PropertyHolder.getProperty("solr.core");
private static final int maxRetries = PropertyHolder.getIntProperty("solr.max.retries");
private static final int connectionTimeout = PropertyHolder.getIntProperty("solr.connection.timeout");
private static final boolean allowCompression = PropertyHolder.getBooleanProperty("solr.allow.compression");
private static final int socketReadTimeout = PropertyHolder.getIntProperty("solr.socket.read.timeout");
private static final int maxConnectionsPerHost = PropertyHolder.getIntProperty("solr.max.connections.per.host");
private static final int maxTotalConnections = PropertyHolder.getIntProperty("solr.max.total.connections");
private static SolrServer solrServer;

public SolrLogHandler(){
    LOG.info("solr.host: "+host);
    LOG.info("solr.port: "+port);
    LOG.info("solr.core: "+core);
    LOG.info("solr.max.retries: "+maxRetries);
    LOG.info("solr.connection.timeout: "+connectionTimeout);
    LOG.info("solr.allow.compression: "+allowCompression);
    LOG.info("solr.socket.read.timeout: "+socketReadTimeout);
    LOG.info("solr.max.connections.per.host: "+maxConnectionsPerHost);
    LOG.info("solr.max.total.connections: "+maxTotalConnections);

    String url = "http://"+host+":"+port+"/solr/"+core+"/";
    LOG.info("Initializing Solr server connection: "+url);
    HttpSolrServer httpSolrServer = new HttpSolrServer(url);
    httpSolrServer.setMaxRetries(maxRetries);
    httpSolrServer.setConnectionTimeout(connectionTimeout);
    httpSolrServer.setAllowCompression(allowCompression);
    httpSolrServer.setSoTimeout(socketReadTimeout);
    httpSolrServer.setDefaultMaxConnectionsPerHost(maxConnectionsPerHost);
    httpSolrServer.setMaxTotalConnections(maxTotalConnections);
    solrServer = httpSolrServer;
}
```
Note the url here:
```java
String url = "http://"+host+":"+port+"/solr/"+core+"/";
```
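With the sample values from the configuration above, the constructed URL points at the specific core rather than the Solr root, so add and commit requests hit the right index. A quick standalone check:

```java
public class SolrUrlDemo {
    public static void main(String[] args){
        // Sample values from config.properties above
        String host = "192.168.0.100";
        String port = "8983";
        String core = "apdplat_for_log";
        // The URL must end with the core name so requests target that core's index
        String url = "http://" + host + ":" + port + "/solr/" + core + "/";
        System.out.println(url);  // http://192.168.0.100:8983/solr/apdplat_for_log/
    }
}
```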
Next we convert the list of log objects into a list of SolrInputDocument:
```java
public <T extends Model> List<SolrInputDocument> getSolrInputDocuments(List<T> list){
    int j = 1;
    //build the batch index request
    List<SolrInputDocument> docs = new ArrayList<>(list.size());
    LOG.info("Start building Solr documents");
    for(T model : list){
        try{
            String simpleName = model.getClass().getSimpleName();
            LOG.debug((j++)+", simpleName: ["+simpleName+"]");
            SolrInputDocument doc = new SolrInputDocument();
            Field[] fields = model.getClass().getDeclaredFields();
            int len = fields.length;
            for(int i = 0; i < len; i++){
                Field field = fields[i];
                String name = field.getName();
                field.setAccessible(true);
                Object value = field.get(model);
                //beware of NullPointerException: the LogHandler thread would exit silently!
                if(value == null){
                    LOG.debug("Skipping empty field: "+name);
                    continue;
                }
                LOG.debug("name: "+name+" value: "+value);
                doc.addField(name, value);
            }
            //log type (class name)
            doc.addField("type", simpleName);
            //add a primary key
            UUID uuid = UUID.randomUUID();
            doc.addField("id", uuid.toString());
            docs.add(doc);
        }catch(IllegalAccessException | IllegalArgumentException | SecurityException e){
            LOG.error("Failed to build index request ["+model.getMetaData()+"]\n"+model, e);
        }
    }
    LOG.info("Finished building Solr documents");
    return docs;
}
```
Here we generate a random primary key with UUID, add a type field whose value is the class name, and use reflection to obtain the log object's field names and values.
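The reflection step can be tried in isolation without a Solr server or the SolrJ classes. In the sketch below a plain Map stands in for SolrInputDocument, and UserLoginLog is a made-up log class used only for illustration; the null-skipping, type field, and UUID primary key mirror getSolrInputDocuments above.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class ReflectionDemo {
    // A made-up log class standing in for an APDPlat model
    static class UserLoginLog {
        private String userName = "admin";
        private String ip = "127.0.0.1";
        private String detail = null; // null fields are skipped, as in getSolrInputDocuments
    }

    static Map<String, Object> toDocument(Object model) throws IllegalAccessException {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (Field field : model.getClass().getDeclaredFields()) {
            field.setAccessible(true);
            Object value = field.get(model);
            if (value == null) {
                continue; // skip null fields so no empty values are indexed
            }
            doc.put(field.getName(), value);
        }
        doc.put("type", model.getClass().getSimpleName()); // distinguishes object types at search time
        doc.put("id", UUID.randomUUID().toString());       // random primary key
        return doc;
    }

    public static void main(String[] args) throws IllegalAccessException {
        System.out.println(toDocument(new UserLoginLog()));
    }
}
```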
Once the document list is ready, we can submit the index request:
```java
solrServer.add(docs);
UpdateResponse updateResponse = solrServer.commit();
```
Then we process the response to determine whether the indexing operation succeeded:
```java
int status = updateResponse.getStatus();
if(status == 0){
    LOG.info("Successfully submitted "+docs.size()+" documents to core: "+core);
}else{
    LOG.info("Index submission failed, status: "+status);
}
```
Below is the complete implementation of SolrLogHandler:
```java
/**
 * Log-handling plugin:
 * saves logs to Solr
 * for high-performance real-time search and analytics
 * supports large-scale distributed search
 *
 * @author 楊尚川
 */
@Service
public class SolrLogHandler implements LogHandler{
    private static final APDPlatLogger LOG = new APDPlatLogger(SolrLogHandler.class);
    private static final String host = PropertyHolder.getProperty("solr.host");
    private static final String port = PropertyHolder.getProperty("solr.port");
    private static final String core = PropertyHolder.getProperty("solr.core");
    private static final int maxRetries = PropertyHolder.getIntProperty("solr.max.retries");
    private static final int connectionTimeout = PropertyHolder.getIntProperty("solr.connection.timeout");
    private static final boolean allowCompression = PropertyHolder.getBooleanProperty("solr.allow.compression");
    private static final int socketReadTimeout = PropertyHolder.getIntProperty("solr.socket.read.timeout");
    private static final int maxConnectionsPerHost = PropertyHolder.getIntProperty("solr.max.connections.per.host");
    private static final int maxTotalConnections = PropertyHolder.getIntProperty("solr.max.total.connections");
    private static SolrServer solrServer;

    public SolrLogHandler(){
        LOG.info("solr.host: "+host);
        LOG.info("solr.port: "+port);
        LOG.info("solr.core: "+core);
        LOG.info("solr.max.retries: "+maxRetries);
        LOG.info("solr.connection.timeout: "+connectionTimeout);
        LOG.info("solr.allow.compression: "+allowCompression);
        LOG.info("solr.socket.read.timeout: "+socketReadTimeout);
        LOG.info("solr.max.connections.per.host: "+maxConnectionsPerHost);
        LOG.info("solr.max.total.connections: "+maxTotalConnections);

        String url = "http://"+host+":"+port+"/solr/"+core+"/";
        LOG.info("Initializing Solr server connection: "+url);
        HttpSolrServer httpSolrServer = new HttpSolrServer(url);
        httpSolrServer.setMaxRetries(maxRetries);
        httpSolrServer.setConnectionTimeout(connectionTimeout);
        httpSolrServer.setAllowCompression(allowCompression);
        httpSolrServer.setSoTimeout(socketReadTimeout);
        httpSolrServer.setDefaultMaxConnectionsPerHost(maxConnectionsPerHost);
        httpSolrServer.setMaxTotalConnections(maxTotalConnections);
        solrServer = httpSolrServer;
    }

    @Override
    public <T extends Model> void handle(List<T> list) {
        LOG.info("Start indexing "+list.size()+" log objects to the Solr server");
        long start = System.currentTimeMillis();
        index(list);
        long cost = System.currentTimeMillis() - start;
        LOG.info("Time elapsed: "+ConvertUtils.getTimeDes(cost));
    }

    /**
     * Index and commit in batches
     *
     * @param <T> generic type parameter
     * @param list batch of model objects
     */
    public <T extends Model> void index(List<T> list){
        List<SolrInputDocument> docs = getSolrInputDocuments(list);
        //submit the index batch
        try{
            LOG.info("Start submitting the batch of index documents");
            solrServer.add(docs);
            UpdateResponse updateResponse = solrServer.commit();
            int status = updateResponse.getStatus();
            if(status == 0){
                LOG.info("Successfully submitted "+docs.size()+" documents to core: "+core);
            }else{
                LOG.info("Index submission failed, status: "+status);
            }
            LOG.info("ResponseHeader:\n"+updateResponse.getResponseHeader().toString());
            LOG.info("Response:\n"+updateResponse.getResponse().toString());
            //help free memory sooner
            docs.clear();
        }catch(IOException | SolrServerException e){
            LOG.error("Batch index submission failed", e);
        }
    }

    /**
     * Convert a list of objects into a list of Solr documents
     * @param <T> object type
     * @param list object list
     * @return list of Solr documents
     */
    public <T extends Model> List<SolrInputDocument> getSolrInputDocuments(List<T> list){
        int j = 1;
        //build the batch index request
        List<SolrInputDocument> docs = new ArrayList<>(list.size());
        LOG.info("Start building Solr documents");
        for(T model : list){
            try{
                String simpleName = model.getClass().getSimpleName();
                LOG.debug((j++)+", simpleName: ["+simpleName+"]");
                SolrInputDocument doc = new SolrInputDocument();
                Field[] fields = model.getClass().getDeclaredFields();
                int len = fields.length;
                for(int i = 0; i < len; i++){
                    Field field = fields[i];
                    String name = field.getName();
                    field.setAccessible(true);
                    Object value = field.get(model);
                    //beware of NullPointerException: the LogHandler thread would exit silently!
                    if(value == null){
                        LOG.debug("Skipping empty field: "+name);
                        continue;
                    }
                    LOG.debug("name: "+name+" value: "+value);
                    doc.addField(name, value);
                }
                //log type (class name)
                doc.addField("type", simpleName);
                //add a primary key
                UUID uuid = UUID.randomUUID();
                doc.addField("id", uuid.toString());
                docs.add(doc);
            }catch(IllegalAccessException | IllegalArgumentException | SecurityException e){
                LOG.error("Failed to build index request ["+model.getMetaData()+"]\n"+model, e);
            }
        }
        LOG.info("Finished building Solr documents");
        return docs;
    }
}
```
Finally, in the configuration file config.local.properties we set log.handlers to the Spring bean name of the SolrLogHandler class, which is solrLogHandler because the class is annotated with Spring's @Service (the default bean name is the class name with its first letter lowercased):
```properties
log.handlers=solrLogHandler
```
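APDPlat's actual plugin-loading code is not shown here, but the general idea can be sketched as follows, under the assumption that each comma-separated name in log.handlers is looked up as a bean name; a plain Map stands in for the Spring application context, and the classes are simplified stand-ins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HandlerLookupDemo {
    interface LogHandler { /* simplified stand-in */ }
    static class SolrLogHandler implements LogHandler { }

    static List<LogHandler> resolve(String logHandlers, Map<String, LogHandler> context){
        // Each comma-separated entry is treated as a Spring bean name
        List<LogHandler> active = new ArrayList<>();
        for (String name : logHandlers.split(",")) {
            LogHandler handler = context.get(name.trim());
            if (handler != null) {
                active.add(handler);
            }
        }
        return active;
    }

    public static void main(String[] args){
        // A Map stands in for the Spring application context
        Map<String, LogHandler> context = new HashMap<>();
        context.put("solrLogHandler", new SolrLogHandler());

        // Value of log.handlers from config.local.properties
        List<LogHandler> active = resolve("solrLogHandler", context);
        System.out.println("Enabled log handlers: " + active.size());
    }
}
```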