1.首先將es中全部的操作封裝成為一個EsSearchManager,並且使用單例模式,提供一個實例供外部調用。
EsSearchManager esSearchManager = EsSearchManager.getInstance();
getInstance()的返回值是這個類的實例,構造函數中也會完成對client的創建。
/**
 * Returns the shared {@code EsSearchManager}, creating it on first use.
 *
 * <p>The field is checked again after the class lock has been acquired, so threads that
 * race past the first null check cannot each construct their own manager.
 * NOTE(review): the {@code esSearchManager} field is declared outside this snippet; it
 * should be {@code volatile} for the unsynchronized first read to be safe - confirm.
 *
 * @return the single manager instance
 */
public static EsSearchManager getInstance() {
    if (null == esSearchManager) {
        synchronized (EsSearchManager.class) {
            if (null == esSearchManager) {
                esSearchManager = new EsSearchManager();
            }
        }
    }
    return esSearchManager;
}

/** Private constructor: instances are only handed out by {@link #getInstance()}. */
private EsSearchManager() {
    getClient();
}

/**
 * Returns the client, initialising it first when it has not been set yet.
 *
 * <p>An initialisation failure is logged together with its stack trace and is not
 * propagated, so the returned value may still be {@code null}.
 *
 * @return the client, or {@code null} if initialisation failed
 */
private Client getClient() {
    try {
        if (client == null) {
            init();
        }
    } catch (Exception e) {
        // Pass the exception itself so the stack trace is not lost.
        LOG.error(e.getMessage(), e);
    }
    return client;
}

/**
 * Obtains the client from {@code EsClient}.
 *
 * @throws Exception if {@code EsClient.getClient()} fails
 */
private void init() throws Exception {
    client = EsClient.getClient();
}
2.獲得了client就可以創建索引、存入數據等一系列的索引數據操作了。
根據索引和類型創建索引。
/** * 根據索引和類型建立索引 * @param indexName * @param type * @return * @throws Exception */ public Boolean buildIndex(String indexName,String type) throws Exception { IndicesExistsResponse response = getClient().admin().indices() .prepareExists(indexName,type).execute().actionGet(); Boolean flag = true; ResourceBundle rb = ResourceBundle.getBundle("commons"); String replicas = rb.getString("replicas"); String shards = rb.getString("shards"); String refreshInterval = rb.getString("refreshInterval"); if (!response.isExists()) { //須要將配置放置到配置文件中 Settings settings = Settings.settingsBuilder() .put("number_of_replicas", Integer.parseInt(replicas)) .put("number_of_shards", Integer.parseInt(shards)) .put("index.translog.flush_threshold_ops", 10000000) .put("refresh_interval", refreshInterval) .put("index.codec", "best_compression").build(); CreateIndexResponse createIndxeResponse = getClient().admin().indices() .prepareCreate(indexName).setSettings(settings).addMapping(type).execute() .actionGet(); flag = createIndxeResponse.isAcknowledged(); LOG.info("返回值" + flag); } return flag; }
/**
 * Indexes a single document.
 *
 * @param indexName target index
 * @param type      target type
 * @param json      document source as a JSON string
 * @throws Exception if the index request fails
 */
public void buildDocument(String indexName, String type, String json) throws Exception {
    getClient().prepareIndex(indexName, type).setSource(json).execute().actionGet();
}

/**
 * Indexes every map in {@code list} as one document using a single bulk request.
 *
 * <p>A {@code null} or empty list is a no-op. Per-item failures reported by the bulk
 * response are logged and are not thrown.
 *
 * @param indexName target index
 * @param type      target type
 * @param list      documents to index, one map per document
 * @throws Exception if building a document source or executing the bulk request fails
 */
public void buildList2Documents(String indexName, String type,
        List<Map<String, Object>> list) throws Exception {
    if (list == null || list.isEmpty()) {
        // A bulk request that contains no actions is rejected by request validation,
        // so there is nothing useful to send.
        return;
    }

    BulkRequestBuilder bulkRequest = getClient().prepareBulk();
    for (Map<String, Object> map : list) {
        bulkRequest.add(getClient().prepareIndex(indexName, type)
                .setSource(this.generateJson(map)));
    }

    BulkResponse bulkIndexResponse = bulkRequest.execute().actionGet();
    if (bulkIndexResponse.hasFailures()) {
        LOG.error(bulkIndexResponse.buildFailureMessage());
    }
}
如何判斷索引是否存在、如何刪除索引?下面就是判斷和刪除索引的方法。
/**
 * Tells whether an index with the given name exists.
 *
 * @param indexName name of the index to look up
 * @return {@code true} when the index exists
 * @throws Exception if the existence request fails
 */
public Boolean existsIndex(String indexName) throws Exception {
    IndicesExistsResponse existsResponse = getClient()
            .admin()
            .indices()
            .prepareExists(indexName)
            .execute()
            .actionGet();
    return existsResponse.isExists();
}

/**
 * Deletes the index with the given name if it exists.
 *
 * @param indexName name of the index to delete
 * @return {@code true} when the index does not exist, otherwise whether the delete
 *         request was acknowledged
 * @throws Exception if the existence check or the delete request fails
 */
public Boolean deleteIndex(String indexName) throws Exception {
    // Nothing to remove: report success, exactly as for an acknowledged delete.
    if (!existsIndex(indexName)) {
        return true;
    }
    DeleteIndexResponse deleteResponse = getClient()
            .admin()
            .indices()
            .prepareDelete(indexName)
            .execute()
            .actionGet();
    return deleteResponse.isAcknowledged();
}
根據docId刪除某一條索引記錄。
** * 根據索引Id刪除文檔 * @param indexName * @param type * @param docId * @throws NumberFormatException * @throws UnknownHostException */ public void deleteDocument(String indexName, String type, String docId) throws Exception { getClient().prepareDelete(indexName, type, docId).execute().actionGet(); }
3.上面給出了一些索引的基本操作,下面來講講es中如何根據索引去查詢數據。
``` /** 下面這個方法就是構建全文檢索,根據關鍵詞全文檢索 * 全文檢索查詢,多條件類型查詢 */ public PageEntity<JSONObject> queryFulltext(List<String> keywords, List<String> indexs, List<String> types, List<String> fieldNames, List<String> allColumns, int pagenum, int pagesize) throws Exception { BoolQueryBuilder qb = buildFullText(keywords, types, fieldNames); if (qb == null) { LOG.info("queryFull Text == null"); return null; } LOG.info("Fulltext begin"); long begin = System.currentTimeMillis(); PageEntity<JSONObject> result = execute(qb, fieldNames,allColumns, indexs, types, pagenum, pagesize); long end = System.currentTimeMillis(); LOG.info("query Fulltext end cost:[{}]ms", end - begin); return result; } /** * 構造多字段 全文搜索查詢條件 * [@param]keywords * [@param types * [@param] fieldNames * [@return * @throws Exception */ private BoolQueryBuilder buildFullText(List<String> keywords, List<String> types, List<String> fieldNames) throws Exception { BoolQueryBuilder qb = null; if (keywords != null && keywords.size() > 0 && fieldNames != null && fieldNames.size() > 0 && types != null && types.size() > 0) { qb = QueryBuilders.boolQuery(); for (String keyword : keywords) { QueryBuilder mustCondition = QueryBuilders.multiMatchQuery( keyword, fieldNames.toArray(new String[0])); qb.must(mustCondition); } } return qb; }
上面的方法實現的是全文檢索,只要某個字段或者某幾個字段實現了分詞,就可以實現類似數據庫中的模糊匹配查詢。下面介紹下term查詢,它只有在某些特殊字段完全匹配時才能查詢到,這些字段往往都是不需要分詞的。
/**
 * Term query: the keywords are not analyzed at query time, which suits values such as
 * person or place names that must match exactly.
 *
 * @param keywords       terms to look up
 * @param indexs         indices to search
 * @param types          types to search
 * @param fieldnames     fields the terms are matched against
 * @param dateFieldnames date fields passed on to {@code buildTermQuery} together with
 *                       {@code startTime} and {@code endTime}
 * @param allColumns     columns to return; when {@code null} or empty,
 *                       {@code fieldnames} is used instead
 * @param startTime      passed on to {@code buildTermQuery}
 * @param endTime        passed on to {@code buildTermQuery}
 * @param pagenum        1-based page number
 * @param pagesize       number of hits per page
 * @return the requested page; an empty page when no query could be built
 * @throws Exception if building the query or the search fails
 */
public PageEntity<JSONObject> queryWithTerm(List<String> keywords, List<String> indexs,
        List<String> types, List<String> fieldnames, List<String> dateFieldnames,
        List<String> allColumns, Long startTime, Long endTime,
        int pagenum, int pagesize) throws Exception {
    BoolQueryBuilder qb = buildTermQuery(keywords, types, fieldnames, dateFieldnames,
            startTime, endTime);
    if (qb == null) {
        LOG.info("queryTerm() QueryBuilder == null");
        return new PageEntity<JSONObject>();
    }

    LOG.info("query begin");
    long begin = System.currentTimeMillis();
    PageEntity<JSONObject> result =
            execute(qb, fieldnames, allColumns, indexs, types, pagenum, pagesize);
    long end = System.currentTimeMillis();
    LOG.info("query end cost:[{}]ms", end - begin);
    return result;
}

/**
 * Executes a query and converts the hits of the requested page into JSON objects.
 *
 * @param qb         query to run
 * @param fieldnames columns copied from each hit when {@code allColumns} is
 *                   {@code null} or empty
 * @param allColumns columns copied from each hit; takes precedence over
 *                   {@code fieldnames} when non-empty
 * @param indexs     indices to search
 * @param types      types to search
 * @param pagenum    1-based page number
 * @param pagesize   number of hits per page
 * @return the page with its contents, page size, page number and total hit count set
 * @throws Exception if the search request fails (the failure is logged and rethrown)
 */
private PageEntity<JSONObject> execute(QueryBuilder qb, List<String> fieldnames,
        List<String> allColumns, List<String> indexs, List<String> types,
        int pagenum, int pagesize) throws Exception {
    PageEntity<JSONObject> page = new PageEntity<JSONObject>();
    String[] typeArry = types.toArray(new String[0]);
    String[] indexArry = indexs.toArray(new String[0]);
    int startnum = (pagenum - 1) * pagesize;

    SearchResponse response = null;
    try {
        response = getClient().prepareSearch(indexArry).setTypes(typeArry)
                .setQuery(qb).setFrom(startnum).setSize(pagesize)
                .execute().actionGet();
    } catch (Exception e) {
        LOG.error("query error", e);
        throw e;
    }
    if (response == null) {
        return page;
    }

    // Read the total once, behind the null check, so the guard protects every use.
    SearchHits hits = response.getHits();
    long total = (hits == null) ? 0L : hits.totalHits();
    LOG.info("execute hit:" + total);

    // The column choice does not depend on the hit, so decide it once up front.
    List<String> columns =
            (allColumns != null && allColumns.size() > 0) ? allColumns : fieldnames;
    if (columns == null) {
        columns = new ArrayList<String>();
    }

    List<JSONObject> resultString = new ArrayList<JSONObject>();
    if (total > 0) {
        for (SearchHit hit : hits) {
            JSONObject obj = new JSONObject();
            for (String str : columns) {
                obj.put(str, hit.getSource().get(str));
            }
            resultString.add(obj);
        }
    }

    page.setContents(resultString);
    page.setPageSize(pagesize);
    page.setCurrentPageNo(pagenum);
    page.setTotalCount(total);
    return page;
}
4.下面的例子代碼演示如何調用上述代碼操作es。
創建testindex中的testtypes類型的索引(ES的索引名必須全部小寫):
EsSearchManager esSearchManager = EsSearchManager.getInstance();
// Elasticsearch index names must be lowercase, and the same index/type pair has to be
// used for creating, writing and searching.
esSearchManager.buildIndex("testindex", "testtypes");

// Index one sample document.
List<Map<String, Object>> list = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
map.put("fieldA", 100);
map.put("fieldB", 22);
map.put("fieldC", "hoge");
map.put("fieldD", "huga");
list.add(map);
esSearchManager.buildList2Documents("testindex", "testtypes", list);

// Search arguments shared by both queries below.
List<String> keywords = new ArrayList<>();
keywords.add("huga");
List<String> types = new ArrayList<>();
types.add("testtypes");
List<String> indexs = new ArrayList<>();
indexs.add("testindex");
List<String> fieldNames = new ArrayList<>();
fieldNames.add("fieldD");

// Exact, non-analyzed lookup on one field.
PageEntity<JSONObject> pg = esSearchManager.queryWithTerm(keywords, indexs,
        types, fieldNames, null, null, null, null, 1, 10);

// Full-text search on the same field; the lists are reused instead of being declared a
// second time, which would not compile in the same scope.
PageEntity<JSONObject> pg1 = esSearchManager.queryFulltext(keywords,
        indexs, types, fieldNames,
        null, 1, 10);
以上總結了部分es基本操作API和調用demo,詳細的代碼請查看github地址 https://github.com/winstonelei/BigDataTools ,其中包括了一些大數據組件的基本操作,包含了hbase、hadoop、es、hive等。