Elasticsearch高級API-通用方法封裝

補充點

補充集羣安裝

在主節點elasticsearch.yml中配置html

cluster.name: ES_books
 node.name: master
 node.master: true
複製代碼

子節點中配置java

cluster.name: ES_books                                 集羣名稱, 和主節點一致
 node.name: slave1
 discovery.zen.ping.unicast.hosts: ["sanq1.com.cn"]     找到主節點
複製代碼

重啓ES便可node

補充安裝IK插件

安裝IK分詞器插件的版本須要和ES的版本一致,而且將IK插件放到文件夾pluginsgit

wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.8.2/elasticsearch-analysis-ik-6.8.2.zip
複製代碼

完整封裝代碼查看

X_Util項目下Config模塊github

正式開始

在ES6.8.2 中ES官方推薦使用高級API來操做ES, 咱們來具體看下shell

聲明:json

配置基於xml的方式app

連接ES

這裏採用HttpHost方式來進行連接, 爲了方便集羣配置, 咱們作以下的操做elasticsearch

  1. 聲明實體, 配置地址和端口
public class HostAndPort {
    private String host;
    private int port;       //這裏的端口是http訪問端口 默認9200

    public HostAndPort(String host, int port) {
        this.host = host;
        this.port = port;
    }
    
    //setting, getting
}
複製代碼
  1. 正式配置工具
public class EsCluster implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

    //ES地址
    private Set<HostAndPort> sets;
    
    private RestHighLevelClient client;

    public EsCluster() {
    }

    public EsCluster(Set<HostAndPort> sets) {
        this.sets = sets;
    }

    public Set<HostAndPort> getSets() {
        return sets;
    }

    public void setSets(Set<HostAndPort> sets) {
        this.sets = sets;
    }


    @Override
    public void destroy() throws Exception {
        if (this.client != null)
            client.close();
    }

    @Override
    public RestHighLevelClient getObject() throws Exception {
        return this.client;
    }

    @Override
    public Class<?> getObjectType() {
        return EsCluster.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        initClient();
    }

    // 這裏是實際的連接過程
    private void initClient() {
        List<HttpHost> httpHostList = new ArrayList<>(sets.size());

        sets.stream().forEach(s -> {
            httpHostList.add(new HttpHost(s.getHost(), s.getPort(), "http"));
        });
        client = new RestHighLevelClient(RestClient.builder(httpHostList.toArray(new HttpHost[httpHostList.size()])));
    }
}
複製代碼
  1. 連接樣例:
<bean id="restClient" class="com.sanq.product.utils.es.factory.EsCluster" destroy-method="destroy">
    <constructor-arg index="0">
        <set>
            <bean class="com.sanq.product.utils.es.config.HostAndPort">
                <constructor-arg index="0" value="192.168.87.134"/>
                <constructor-arg index="1" value="9200" type="int"/>
            </bean>
        </set>
    </constructor-arg>
</bean>
複製代碼

方法說明

獲取客戶端ide

@Resource
private RestHighLevelClient restClient;
複製代碼

驗證索引是否建立

@Override
public boolean check(String index) throws Exception {
    GetIndexRequest getIndexRequest = new GetIndexRequest(index);
    return restClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
}
複製代碼

建立索引

/** * 建立索引 * * @param index 索引名 * @param mapping 文檔信息 * @return */
@Override
public boolean createIndex(String index, String mapping) throws Exception {
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(index)
            .settings(Settings.builder()
                    .put("index.number_of_shards", 3)       //分片數
                    .put("index.number_of_replicas", 2))    //備份數
            .mapping(mapping, XContentType.JSON);

    CreateIndexResponse createIndexResponse = restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    return createIndexResponse.isAcknowledged() && createIndexResponse.isShardsAcknowledged();
}
複製代碼

經過ID獲取詳情 關於泛型請查看完整代碼

@Override
public T findById(String index, String type, String id) throws Exception {
    GetRequest getRequest = new GetRequest(index, type, id);
    GetResponse getResponse = restClient.get(getRequest, RequestOptions.DEFAULT);
    return JsonUtil.json2Obj(getResponse.getSourceAsString(), getGenericClass());
}
複製代碼

保存數據到ES

//entity中必須包含id字段, 給ES指定ID
@Override
public String save(String index, String type, T entity) throws Exception {

    Map<String, Object> map = bean2Map(entity); //將entity轉換成Map

    IndexRequest indexRequest = new IndexRequest(index, type, map.get("id").toString()).source(map).opType(DocWriteRequest.OpType.CREATE);
    IndexResponse indexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT);

    return indexResponse.getId();
}
複製代碼

批量保存

@Override
public boolean saveList(String index, String type, List<T> entityList) throws Exception {
    BulkRequest bulkRequest = new BulkRequest();

    entityList.stream().forEach(entity -> {
        try {
            Map<String, Object> map = bean2Map(entity);

            String id = String.valueOf(map.get("id"));

            bulkRequest.add(new IndexRequest(index, type, id).source(JsonUtil.obj2Json(map), XContentType.JSON));
        } catch (Exception e) {
            e.printStackTrace();
        }
    });

    BulkResponse bulk = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    return bulk.status().getStatus() == RestStatus.OK.getStatus();
}
複製代碼

修改數據

@Override
public boolean update(String index, String type, T entity) throws Exception {
    Map<String, Object> map = bean2Map(entity);

    UpdateRequest request = new UpdateRequest(index, type, map.get("id").toString());
    request.doc(map);

    UpdateResponse update = restClient.update(request, RequestOptions.DEFAULT);
    return update.status().getStatus() == RestStatus.OK.getStatus();
}
複製代碼

刪除

@Override
public boolean delete(String index, String type, String id) throws Exception {
    DeleteRequest deleteRequest = new DeleteRequest(index, type, id);

    DeleteResponse delete = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
    return delete.status().getStatus() == RestStatus.OK.getStatus();
}
複製代碼

批量刪除

@Override
public boolean deleteList(String index, String type, List<String> ids) throws Exception {
    BulkRequest bulkRequest = new BulkRequest();

    ids.stream().forEach(id -> {
        bulkRequest.add(new DeleteRequest(index, type, id));
    });

    BulkResponse bulk = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    return bulk.status().getStatus() == RestStatus.OK.getStatus();
}
複製代碼

刪除索引

@Override
public boolean deleteIndex(String index) throws Exception {
    AcknowledgedResponse delete = restClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);

    return delete.isAcknowledged();
}
複製代碼

查詢數據

@Override
public SearchPager<T> findListByPager(String index, String type, T entity, SearchPagination pagination //默認展現條數, scrollId ) throws Exception {
    if (!StringUtil.isEmpty(pagination.getScrollId())) {
        return getScrollPager(pagination);
    }

    Map<String, Object> map = bean2Map(entity);

    SearchRequest searchRequest = new SearchRequest(index).types(type);

    //構造搜索條件
    SearchSourceBuilder sourceBuilder = getSearchRequest(map);

    //查詢條數
    sourceBuilder.size(pagination.getPageSize());
    //排序 能夠提取出來指定排序
    sourceBuilder.sort("id", SortOrder.ASC);

    searchRequest.source(sourceBuilder);
    
    //這裏是指定scrollId的過時時間
    searchRequest.scroll(TimeValue.timeValueMillis(5L));

    SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
    pagination.setScrollId(searchResponse.getScrollId());

    List<T> data = getListData(searchResponse.getHits());

    return new SearchPager<T>(pagination, data);
}

/**將查詢出來的數據 封裝成List*/
private List<T> getListData(SearchHits hits) {
    List<T> data = new ArrayList<>();
    for (SearchHit hit : hits.getHits()) {
        if (getGenericClass() == null) data.add((T) hit.getSourceAsMap());
        else data.add(JsonUtil.json2Obj(hit.getSourceAsString(), getGenericClass()));
    }
    return data;
}

/**經過scrollId獲取分頁數據*/
private SearchPager<T> getScrollPager(SearchPagination pagination) throws Exception {
    
    SearchScrollRequest scrollRequest = new SearchScrollRequest(pagination.getScrollId());
    scrollRequest.scroll(TimeValue.timeValueMillis(5L));
    SearchResponse searchScrollResponse = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
        
    // 生成了新的scrollId, 這裏將舊的scrollId進行刪除
    clearScrollId(pagination.getScrollId());

    //設置新的scrollId
    pagination.setScrollId(searchScrollResponse.getScrollId());

    List<T> data = getListData(searchScrollResponse.getHits());

    return new SearchPager<T>(pagination, data);
}

/** * 清除以前的scrollId * @param scrollId */
private void clearScrollId(String scrollId) throws Exception {
    ClearScrollRequest request = new ClearScrollRequest();
    request.addScrollId(scrollId);
    restClient.clearScroll(request, RequestOptions.DEFAULT);
}

private SearchSourceBuilder getSearchRequest(Map<String, Object> map) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

    map.entrySet().stream().forEach(entry -> {
        Object value = entry.getValue();
        if (value instanceof Integer ||
                value instanceof Long ||
                value instanceof Float ||
                value instanceof Double ||
                value instanceof Boolean
        ) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(entry.getKey(), value));
        } else
            boolQueryBuilder.must().add(QueryBuilders.matchQuery(entry.getKey(), value));
    });

    sourceBuilder.query(boolQueryBuilder);

    return sourceBuilder;
}
複製代碼

這裏的分頁方式只適用於上一頁下一頁, 不適合跳頁。

聚合查詢

關於聚合查詢 尚未想到怎樣作成通用的方式來使用,由於聚合查詢能夠嵌套的一層又一層 這裏給出一個例子。

  1. 經過IP去重查詢
{
    "size": 0,
    "aggs": {
       "ip_card": {
    		"cardinality": {
    			"field": "ip"
    		}
    	}
    }
}
複製代碼
//實現
CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders.cardinality("ip_card").field("ip");

SearchResponse searchResponse = getSearchResponse(index, cardinalityAggregationBuilder);
if (searchResponse.status().getStatus() == RestStatus.OK.getStatus()) {
    StringUtil.toInteger(searchResponse.getHits().getTotalHits());  //總數
    StringUtil.toInteger(((ParsedCardinality) searchResponse.getAggregations().get("ip_card")).getValue());    //去重數
}

/**查詢條件*/
private SearchResponse getSearchResponse(String index, AggregationBuilder aggregationBuilder) throws Exception {
    SearchRequest searchRequest = new SearchRequest(index).types(EsIndexs.Indexs._DOC);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().aggregation(aggregationBuilder);
    searchSourceBuilder.size(0);

    searchRequest.source(searchSourceBuilder);

    return super.getClient().search(searchRequest, RequestOptions.DEFAULT);
}
複製代碼
  1. 根據省份,城市IP去重
{
	"size": 0,
	"aggs": {
		"prov_terms": {
			"terms": {
				"field": "provName"
			},
			"aggs": {
				"city_terms":{
					"terms": {
						"field": "cityName"
					},
					"aggs": {
						"ip_card": {
							"cardinality": {
								"field": "ip"
							}
						}
					}
				}
			}
		}
	}
}
複製代碼
//實現
@Override
public List<AreaReportVo> getAreaReport(String index) {
    String ipCard = "ip_card",
            provTerms = "prov_terms",
            cityTerms = "city_terms";
    try {
        if (!super.check(index)) {
            this.createIndex();
        }

        TermsAggregationBuilder termsAggregationBuilder =
                AggregationBuilders.terms(provTerms).field("provName")
                        .subAggregation(
                                AggregationBuilders.terms(cityTerms).field("cityName")
                                        .subAggregation(
                                                AggregationBuilders.cardinality(ipCard).field("ip")
                                        )
                        );

        //和上面是同樣的
        SearchResponse searchResponse = getSearchResponse(index, termsAggregationBuilder);

        List<AreaReportVo> areaReportVoList = null;
        if (searchResponse.status().getStatus() == RestStatus.OK.getStatus()) {
            List<? extends Terms.Bucket> buckets = ((ParsedStringTerms) searchResponse.getAggregations().get(provTerms)).getBuckets(); 

            AreaReportVo areaReportVo;
            for (int i = 0, size = buckets.size(); i < size; i++) {
                String provName = buckets.get(i).getKeyAsString();  //省份

                List<? extends Terms.Bucket> cityBuckets = ((ParsedStringTerms) buckets.get(i).getAggregations().get(cityTerms)).getBuckets();
                for (int j = 0, j_size = cityBuckets.size(); j < j_size; j++) {
                    areaReportVo = new AreaReportVo();

                    areaReportVo.setProvName(provName);
                    areaReportVo.setCityName(cityBuckets.get(j).getKeyAsString());          //城市
                    areaReportVo.setViews(StringUtil.toInteger(buckets.get(j).getDocCount()));
                    areaReportVo.setIpViews(StringUtil.toInteger(((ParsedCardinality) cityBuckets.get(j).getAggregations().get(ipCard)).getValue()));

                    areaReportVoList.add(areaReportVo);
                }
            }
        }
        return areaReportVoList;

    } catch (Exception e) {
        e.printStackTrace();
    }
    return Collections.emptyList();
}
複製代碼

到這裏就算是完成了通用方法的封裝, 你們能夠親自體驗下。

文檔

  1. 官方文檔
  2. 封裝完整代碼, 該項目持續更新
相關文章
相關標籤/搜索