在主節點的 elasticsearch.yml 中配置:
# Settings for the master node's elasticsearch.yml.
# Cluster name; the child node configuration in this guide uses the same value.
cluster.name: ES_books
# Name of this node inside the cluster.
node.name: master
# Flags this node as the master node described in this guide.
node.master: true
複製代碼
在子節點中配置:
# Settings for a child node's elasticsearch.yml.
# Cluster name; must be identical to the master node's cluster.name.
cluster.name: ES_books
# Name of this node inside the cluster.
node.name: slave1
# Host list used to locate the master node.
discovery.zen.ping.unicast.hosts: ["sanq1.com.cn"]
複製代碼
重啓ES便可。
安裝IK分詞器: 插件的版本須要和ES的版本一致, 而且要將IK插件放到 plugins 文件夾中。
# Download the IK analyzer plugin release 6.8.2, the same version as the Elasticsearch release used in this guide.
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.8.2/elasticsearch-analysis-ik-6.8.2.zip
複製代碼
X_Util項目下Config模塊
在ES 6.8.2 中, ES官方推薦使用高級API來操做ES, 咱們來具體看下。
聲明: 配置基於 xml 的方式。
這裏採用HttpHost方式來進行連接, 爲了方便集羣配置, 咱們作以下的操做:
public class HostAndPort {
private String host;
private int port; //這裏的端口是http訪問端口 默認9200
public HostAndPort(String host, int port) {
this.host = host;
this.port = port;
}
//setting, getting
}
複製代碼
/**
 * Spring factory bean that creates one {@link RestHighLevelClient} from a set of
 * node addresses, exposes it as a singleton and closes it when the bean is destroyed.
 */
public class EsCluster implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

    /** Addresses of the Elasticsearch nodes to connect to. */
    private Set<HostAndPort> sets;

    /** Client built in {@link #afterPropertiesSet()} and closed in {@link #destroy()}. */
    private RestHighLevelClient client;

    public EsCluster() {
    }

    /**
     * @param sets addresses of the Elasticsearch nodes to connect to
     */
    public EsCluster(Set<HostAndPort> sets) {
        this.sets = sets;
    }

    public Set<HostAndPort> getSets() {
        return sets;
    }

    public void setSets(Set<HostAndPort> sets) {
        this.sets = sets;
    }

    /** Closes the client if one was created. */
    @Override
    public void destroy() throws Exception {
        if (client != null) {
            client.close();
        }
    }

    /** @return the client built during initialisation */
    @Override
    public RestHighLevelClient getObject() throws Exception {
        return client;
    }

    /**
     * Reports the type of the object this factory produces, i.e. the client and not
     * the factory itself, so that by-type lookups of {@code RestHighLevelClient} resolve.
     */
    @Override
    public Class<?> getObjectType() {
        return RestHighLevelClient.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        initClient();
    }

    /**
     * Builds the client from the configured addresses, using plain HTTP.
     *
     * @throws IllegalStateException if no address has been configured
     */
    private void initClient() {
        if (sets == null || sets.isEmpty()) {
            throw new IllegalStateException("EsCluster requires at least one HostAndPort");
        }
        HttpHost[] hosts = sets.stream()
                .map(node -> new HttpHost(node.getHost(), node.getPort(), "http"))
                .toArray(HttpHost[]::new);
        client = new RestHighLevelClient(RestClient.builder(hosts));
    }
}
複製代碼
<!-- Declares the EsCluster bean under the id "restClient" and names destroy() as its destroy method. -->
<bean id="restClient" class="com.sanq.product.utils.es.factory.EsCluster" destroy-method="destroy">
<!-- Constructor argument 0: the set of node addresses. -->
<constructor-arg index="0">
<set>
<!-- One HostAndPort entry per node: argument 0 is the host, argument 1 the port (passed as int). -->
<bean class="com.sanq.product.utils.es.config.HostAndPort">
<constructor-arg index="0" value="192.168.87.134"/>
<constructor-arg index="1" value="9200" type="int"/>
</bean>
</set>
</constructor-arg>
</bean>
複製代碼
獲取客戶端
// Client used by the methods below; the field name equals the bean id "restClient" from the XML configuration.
@Resource
private RestHighLevelClient restClient;
複製代碼
驗證索引是否建立
/**
 * Tells whether the given index exists.
 *
 * @param index name of the index to look up
 * @return the result of the index-exists request
 * @throws Exception if the request fails
 */
@Override
public boolean check(String index) throws Exception {
    return restClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
}
複製代碼
建立索引
/**
 * Creates an index with 3 shards and 2 replicas and applies the given mapping.
 *
 * @param index   name of the index to create
 * @param mapping mapping definition as a JSON string
 * @return {@code true} only when both the request and the shards are acknowledged
 * @throws Exception if the request fails
 */
@Override
public boolean createIndex(String index, String mapping) throws Exception {
    Settings.Builder indexSettings = Settings.builder()
            .put("index.number_of_shards", 3)    // number of shards
            .put("index.number_of_replicas", 2); // number of replicas

    CreateIndexRequest request = new CreateIndexRequest(index);
    request.settings(indexSettings);
    request.mapping(mapping, XContentType.JSON);

    CreateIndexResponse response = restClient.indices().create(request, RequestOptions.DEFAULT);
    if (!response.isAcknowledged()) {
        return false;
    }
    return response.isShardsAcknowledged();
}
複製代碼
經過ID獲取詳情 關於泛型請查看完整代碼
/**
 * Loads one document by id and converts its source to the entity type.
 *
 * @param index index to read from
 * @param type  document type
 * @param id    document id
 * @return the converted entity, or {@code null} when the document does not exist
 *         or has no source
 * @throws Exception if the request fails
 */
@Override
public T findById(String index, String type, String id) throws Exception {
    GetRequest getRequest = new GetRequest(index, type, id);
    GetResponse getResponse = restClient.get(getRequest, RequestOptions.DEFAULT);
    // A missing document has no source; do not hand null to the JSON converter.
    String source = getResponse.isExists() ? getResponse.getSourceAsString() : null;
    if (source == null) {
        return null;
    }
    return JsonUtil.json2Obj(source, getGenericClass());
}
複製代碼
保存數據到ES
/**
 * Indexes one entity as a new document.
 *
 * <p>The entity must expose an {@code id} property; its value becomes the document id.
 * The request uses op type CREATE.
 *
 * @param index  target index
 * @param type   document type
 * @param entity entity to store
 * @return the id reported by the index response
 * @throws IllegalArgumentException if the entity has no {@code id} value
 * @throws Exception if the conversion or the request fails
 */
@Override
public String save(String index, String type, T entity) throws Exception {
    Map<String, Object> map = bean2Map(entity); // entity converted to a property map
    Object id = map.get("id");
    if (id == null) {
        throw new IllegalArgumentException("entity must provide a non-null \"id\" to be used as the document id");
    }
    IndexRequest indexRequest = new IndexRequest(index, type, id.toString())
            .source(map)
            .opType(DocWriteRequest.OpType.CREATE);
    IndexResponse indexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT);
    return indexResponse.getId();
}
複製代碼
批量保存
/**
 * Indexes a list of entities with a single bulk request.
 *
 * <p>Every entity must expose an {@code id} property; its value becomes the document id.
 * A conversion problem aborts the whole call instead of silently dropping the entity.
 *
 * @param index      target index
 * @param type       document type
 * @param entityList entities to store; {@code null} or empty means there is nothing to do
 * @return {@code true} when nothing had to be saved or no bulk item failed
 * @throws IllegalArgumentException if an entity has no {@code id} value
 * @throws Exception if a conversion or the request fails
 */
@Override
public boolean saveList(String index, String type, List<T> entityList) throws Exception {
    // A bulk request without actions is rejected, so short-circuit empty input.
    if (entityList == null || entityList.isEmpty()) {
        return true;
    }
    BulkRequest bulkRequest = new BulkRequest();
    for (T entity : entityList) {
        Map<String, Object> map = bean2Map(entity);
        Object id = map.get("id");
        if (id == null) {
            // String.valueOf(null) would index the document under the literal id "null".
            throw new IllegalArgumentException("entity must provide a non-null \"id\" to be used as the document id");
        }
        bulkRequest.add(new IndexRequest(index, type, id.toString())
                .source(JsonUtil.obj2Json(map), XContentType.JSON));
    }
    BulkResponse bulk = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    // The overall bulk status does not reflect per-item failures; hasFailures() does.
    return !bulk.hasFailures();
}
複製代碼
修改數據
/**
 * Applies a partial update to the document whose id equals the entity's {@code id} property.
 *
 * @param index  target index
 * @param type   document type
 * @param entity entity carrying the id and the fields to write
 * @return {@code true} when the update response status is OK
 * @throws Exception if the conversion or the request fails
 */
@Override
public boolean update(String index, String type, T entity) throws Exception {
    Map<String, Object> fields = bean2Map(entity);
    String documentId = fields.get("id").toString();
    UpdateRequest updateRequest = new UpdateRequest(index, type, documentId).doc(fields);
    UpdateResponse response = restClient.update(updateRequest, RequestOptions.DEFAULT);
    return RestStatus.OK.getStatus() == response.status().getStatus();
}
複製代碼
刪除
/**
 * Deletes one document by id.
 *
 * @param index target index
 * @param type  document type
 * @param id    document id
 * @return {@code true} when the delete response status is OK
 * @throws Exception if the request fails
 */
@Override
public boolean delete(String index, String type, String id) throws Exception {
    DeleteResponse response = restClient.delete(new DeleteRequest(index, type, id), RequestOptions.DEFAULT);
    int okCode = RestStatus.OK.getStatus();
    return response.status().getStatus() == okCode;
}
複製代碼
批量刪除
/**
 * Deletes several documents with a single bulk request.
 *
 * @param index target index
 * @param type  document type
 * @param ids   ids of the documents to delete; {@code null} or empty means there is nothing to do
 * @return {@code true} when nothing had to be deleted or no bulk item failed
 * @throws Exception if the request fails
 */
@Override
public boolean deleteList(String index, String type, List<String> ids) throws Exception {
    // A bulk request without actions is rejected, so short-circuit empty input.
    if (ids == null || ids.isEmpty()) {
        return true;
    }
    BulkRequest bulkRequest = new BulkRequest();
    for (String id : ids) {
        bulkRequest.add(new DeleteRequest(index, type, id));
    }
    BulkResponse bulk = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    // The overall bulk status does not reflect per-item failures; hasFailures() does.
    return !bulk.hasFailures();
}
複製代碼
刪除索引
/**
 * Deletes a whole index.
 *
 * @param index name of the index to delete
 * @return {@code true} when the delete request is acknowledged
 * @throws Exception if the request fails
 */
@Override
public boolean deleteIndex(String index) throws Exception {
    DeleteIndexRequest request = new DeleteIndexRequest(index);
    AcknowledgedResponse response = restClient.indices().delete(request, RequestOptions.DEFAULT);
    return response.isAcknowledged();
}
複製代碼
查詢數據
@Override
public SearchPager<T> findListByPager(String index, String type, T entity, SearchPagination pagination //默認展現條數, scrollId ) throws Exception {
if (!StringUtil.isEmpty(pagination.getScrollId())) {
return getScrollPager(pagination);
}
Map<String, Object> map = bean2Map(entity);
SearchRequest searchRequest = new SearchRequest(index).types(type);
//構造搜索條件
SearchSourceBuilder sourceBuilder = getSearchRequest(map);
//查詢條數
sourceBuilder.size(pagination.getPageSize());
//排序 能夠提取出來指定排序
sourceBuilder.sort("id", SortOrder.ASC);
searchRequest.source(sourceBuilder);
//這裏是指定scrollId的過時時間
searchRequest.scroll(TimeValue.timeValueMillis(5L));
SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
pagination.setScrollId(searchResponse.getScrollId());
List<T> data = getListData(searchResponse.getHits());
return new SearchPager<T>(pagination, data);
}
/**
 * Converts search hits to a list: each hit becomes its source map when no generic
 * class is available, otherwise its JSON source converted to the generic class.
 */
private List<T> getListData(SearchHits hits) {
    List<T> rows = new ArrayList<>();
    SearchHit[] found = hits.getHits();
    for (int i = 0; i < found.length; i++) {
        SearchHit current = found[i];
        if (getGenericClass() != null) {
            rows.add(JsonUtil.json2Obj(current.getSourceAsString(), getGenericClass()));
            continue;
        }
        rows.add((T) current.getSourceAsMap());
    }
    return rows;
}
/**
 * Fetches the next page of an existing scroll.
 *
 * <p>The scroll context is kept alive while pages are being read and is released only
 * once an empty page shows that the scroll is exhausted. Clearing the previous scroll
 * id after every page would release the context the new id still depends on. After an
 * empty page the scroll id in {@code pagination} must not be used again.
 *
 * @param pagination pagination state carrying the scroll id of the previous page
 * @return the pagination state, updated with the latest scroll id, and the page data
 * @throws Exception if the request fails
 */
private SearchPager<T> getScrollPager(SearchPagination pagination) throws Exception {
    SearchScrollRequest scrollRequest = new SearchScrollRequest(pagination.getScrollId());
    // Renew the keep-alive; 5 milliseconds expired before the next page could be requested.
    scrollRequest.scroll(TimeValue.timeValueMinutes(5L));
    SearchResponse searchScrollResponse = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);

    // Always continue with the most recently returned scroll id.
    String latestScrollId = searchScrollResponse.getScrollId();
    pagination.setScrollId(latestScrollId);

    List<T> data = getListData(searchScrollResponse.getHits());
    if (data.isEmpty()) {
        // No more hits: release the server-side context instead of waiting for the timeout.
        clearScrollId(latestScrollId);
    }
    return new SearchPager<T>(pagination, data);
}
/**
 * Sends a clear-scroll request for the given scroll id.
 *
 * @param scrollId scroll id to clear
 * @throws Exception if the request fails
 */
private void clearScrollId(String scrollId) throws Exception {
    ClearScrollRequest clearRequest = new ClearScrollRequest();
    clearRequest.addScrollId(scrollId);
    restClient.clearScroll(clearRequest, RequestOptions.DEFAULT);
}
/**
 * Builds a bool query in which every non-null map entry is a mandatory clause.
 *
 * <p>Integer, Long, Float, Double and Boolean values become term queries; every other
 * value becomes a match query. Entries with a {@code null} value are skipped because
 * they carry no condition and the query builders reject null values.
 *
 * @param map property name to value, as produced from the search entity
 * @return a source builder holding the bool query
 */
private SearchSourceBuilder getSearchRequest(Map<String, Object> map) {
    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
    for (Map.Entry<String, Object> entry : map.entrySet()) {
        Object value = entry.getValue();
        if (value == null) {
            continue;
        }
        boolean exactValue = value instanceof Integer
                || value instanceof Long
                || value instanceof Float
                || value instanceof Double
                || value instanceof Boolean;
        if (exactValue) {
            boolQueryBuilder.must(QueryBuilders.termQuery(entry.getKey(), value));
        } else {
            boolQueryBuilder.must(QueryBuilders.matchQuery(entry.getKey(), value));
        }
    }
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(boolQueryBuilder);
    return sourceBuilder;
}
複製代碼
這裏的分頁方式只適用於上一頁下一頁, 不適合跳頁。
關於聚合查詢 尚未想到怎樣作成通用的方式來使用,由於聚合查詢能夠嵌套的一層又一層 這裏給出一個例子。
{
"size": 0,
"aggs": {
"ip_card": {
"cardinality": {
"field": "ip"
}
}
}
}
複製代碼
// Implementation: a cardinality aggregation named "ip_card" over the "ip" field.
CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders.cardinality("ip_card").field("ip");
SearchResponse searchResponse = getSearchResponse(index, cardinalityAggregationBuilder);
if (searchResponse.status().getStatus() == RestStatus.OK.getStatus()) {
StringUtil.toInteger(searchResponse.getHits().getTotalHits()); // total count
StringUtil.toInteger(((ParsedCardinality) searchResponse.getAggregations().get("ip_card")).getValue()); // distinct count
}
/**
 * Runs a search that carries only the given aggregation and asks for zero hits.
 *
 * @param index              index to search
 * @param aggregationBuilder aggregation to execute
 * @return the search response
 * @throws Exception if the request fails
 */
private SearchResponse getSearchResponse(String index, AggregationBuilder aggregationBuilder) throws Exception {
    SearchSourceBuilder source = new SearchSourceBuilder();
    source.aggregation(aggregationBuilder);
    source.size(0);

    SearchRequest request = new SearchRequest(index);
    request.types(EsIndexs.Indexs._DOC);
    request.source(source);
    return super.getClient().search(request, RequestOptions.DEFAULT);
}
複製代碼
{
"size": 0,
"aggs": {
"prov_terms": {
"terms": {
"field": "provName"
},
"aggs": {
"city_terms":{
"terms": {
"field": "cityName"
},
"aggs": {
"ip_card": {
"cardinality": {
"field": "ip"
}
}
}
}
}
}
}
}
複製代碼
//實現
/**
 * Builds a per-city report: for every province bucket and every city bucket inside it,
 * one row with the province name, city name, the city's document count and the number
 * of distinct IPs.
 *
 * <p>Any failure is printed and results in an empty list; the method never returns
 * {@code null}.
 *
 * @param index index to aggregate over; it is created first when it does not exist
 * @return one row per (province, city) bucket, or an empty list
 */
@Override
public List<AreaReportVo> getAreaReport(String index) {
    final String ipCard = "ip_card";
    final String provTerms = "prov_terms";
    final String cityTerms = "city_terms";
    try {
        if (!super.check(index)) {
            this.createIndex();
        }
        // province terms -> city terms -> distinct IP count
        TermsAggregationBuilder termsAggregationBuilder =
                AggregationBuilders.terms(provTerms).field("provName")
                        .subAggregation(
                                AggregationBuilders.terms(cityTerms).field("cityName")
                                        .subAggregation(
                                                AggregationBuilders.cardinality(ipCard).field("ip")
                                        )
                        );
        SearchResponse searchResponse = getSearchResponse(index, termsAggregationBuilder);
        if (searchResponse.status().getStatus() != RestStatus.OK.getStatus()) {
            return Collections.emptyList();
        }

        // The list was previously left null, so the first add() always failed.
        List<AreaReportVo> areaReportVoList = new java.util.ArrayList<>();
        List<? extends Terms.Bucket> provinceBuckets =
                ((ParsedStringTerms) searchResponse.getAggregations().get(provTerms)).getBuckets();
        for (Terms.Bucket provinceBucket : provinceBuckets) {
            String provName = provinceBucket.getKeyAsString(); // province
            List<? extends Terms.Bucket> cityBuckets =
                    ((ParsedStringTerms) provinceBucket.getAggregations().get(cityTerms)).getBuckets();
            for (Terms.Bucket cityBucket : cityBuckets) {
                AreaReportVo areaReportVo = new AreaReportVo();
                areaReportVo.setProvName(provName);
                areaReportVo.setCityName(cityBucket.getKeyAsString()); // city
                // Count of the city bucket itself, not the province bucket at the city's index.
                areaReportVo.setViews(StringUtil.toInteger(cityBucket.getDocCount()));
                areaReportVo.setIpViews(StringUtil.toInteger(
                        ((ParsedCardinality) cityBucket.getAggregations().get(ipCard)).getValue()));
                areaReportVoList.add(areaReportVo);
            }
        }
        return areaReportVoList;
    } catch (Exception e) {
        // Best effort, as before: report the problem and fall through to an empty result.
        e.printStackTrace();
    }
    return Collections.emptyList();
}
複製代碼
到這裏就算是完成了通用方法的封裝, 你們能夠親自體驗下。