ES系列十5、ES經常使用Java Client API

1、簡介

1.先看ES的架構圖

2、ES支持的客戶端鏈接方式

1.REST API

  http請求,例如,瀏覽器請求get方法;利用Postman等工具發起REST請求;java 發起httpClient請求等。html

2.Transport 鏈接

  socket鏈接,用官方提供的TransPort客戶端,底層是netty。java

注意:ES的發展規劃中在7.0版本開始將廢棄 TransportClient,8.0版本中將徹底移除 TransportClient,取而代之的是High Level REST Client。node

3. ES提供了多種編程語言客戶端

  

官網能夠了解詳情:git

https://www.elastic.co/guide/en/elasticsearch/client/index.htmlgithub

3、Java REST Client介紹

1. ES提供了兩個JAVA REST client 版本

Java Low Level REST Client: 低級別的REST客戶端,經過http與集羣交互,用戶需本身編組請求JSON串,及解析響應JSON串。兼容全部ES版本
Java High Level REST Client: 高級別的REST客戶端,基於低級別的REST客戶端,增長了編組請求JSON串、解析響應JSON串等相關api。使用的版本須要保持和ES服務端的版本一致,不然會有版本問題。spring

官方推薦使用高級版,低級版須要本身準確記住api數據庫

2. Java Low Level REST Client 說明

特色,maven 引入、使用介紹: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.htmlapache

API doc :https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.4/index.html.編程

3. Java High Level REST Client 說明

從6.0.0開始加入的,目的是以java面向對象的方式來進行請求、響應處理。
每一個API 支持 同步/異步 兩種方式,同步方法直接返回一個結果對象。異步的方法以async爲後綴,經過listener參數來通知結果
高級java REST 客戶端依賴Elasticsearch core projectjson

兼容性說明:

依賴 java1.8 和 Elasticsearch core project
請使用與服務端ES版本一致的客戶端版本

4. Java High Level REST Client  maven 集成

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.2.4</version>
</dependency>

5.將log4j2.xml編譯到classes路徑下

<?xml version="1.0" encoding="UTF-8"?>
<configuration status="OFF">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </appenders>
    <loggers>
        <root level="info">
            <appender-ref ref="Console"/>
        </root>
    </loggers>
</configuration>

 

6. Java High Level REST Client  初始化

RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")));

給定集羣的多個節點地址,將客戶端負載均衡地向這個節點地址集發請求

Client 再也不使用了,記得關閉它:

client.close();

 API及用法示例,請參考:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.3/java-rest-high-create-index.html

4、Java High Level REST Client  使用示例

準備(須要配置log4j2的maven配置和log的配置文件,不然運行demo控制檯會報錯):

編寫示例以前首先在maven工程裏面引入和ES服務端版本同樣的Java客戶端

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.3.1</version>
</dependency>
<!--日誌-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>

給定集羣的多個節點地址,將客戶端負載均衡地向這個節點地址集發請求:

package com.es.demo;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class InitClient {

    public static RestHighLevelClient getClient(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("localhost", 9200, "http")));
        return client;
    }

}

1.建立索引

package com.es.demo;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

public class CreateIndexDemo {

    public static void main(String ags[]){
        try(RestHighLevelClient client = InitClient.getClient()){

            // 1.建立索引名
            CreateIndexRequest request = new CreateIndexRequest("book8");

            // 2.索引setting配置
            request.settings(Settings.builder().put("index.number_of_shards",5)
                    .put("index.number_of_replicas", 2) // 副本數
                    .put("analysis.analyzer.default.tokenizer","standard")
            );

            // 3.設置索引的mapping
            request.mapping("_doc",
                    "  {\n" +
                            "    \"_doc\": {\n" +
                            "      \"properties\": {\n" +
                            "        \"message\": {\n" +
                            "          \"type\": \"text\"\n" +
                            "        }\n" +
                            "      }\n" +
                            "    }\n" +
                            "  }",
                    XContentType.JSON);

            // 設置索引別名
            request.alias(new Alias("lab1"));

            // 5.發送請求
            // 5.1同步方式
            CreateIndexResponse response = client.indices().create(request);

            // 處理響應
            boolean acknowledged = response.isAcknowledged();
            boolean shardsAcknowledged = response.isShardsAcknowledged();

            System.out.println("請求結果---------------");
            System.out.println("acknowledged:"+acknowledged);
            System.out.println("shardsAcknowledged:"+shardsAcknowledged);

            // 5.2 異步方式發送請求
           /* ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {

                @Override
                public void onResponse(CreateIndexResponse createIndexResponse) {
                    boolean acknowledged = createIndexResponse.isAcknowledged();
                    boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
                    System.out.println("請求結果---------------");
                    System.out.println("acknowledged:"+acknowledged);
                    System.out.println("shardsAcknowledged:"+shardsAcknowledged);
                }

                @Override
                public void onFailure(Exception e) {
                    e.printStackTrace();
                }
            };

            client.indices().createAsync(request, listener);*/

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

結果:

請求結果---------------
acknowledged:true
shardsAcknowledged:true

構建json官方一共給出四中方式:

package com.es.demo;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.HashMap;
import java.util.Map;

public class CreateIndexDemo {

    public static void main(String ags[]){
        try(RestHighLevelClient client = InitClient.getClient()){

            // 1.建立索引名
            CreateIndexRequest request = new CreateIndexRequest("book13");

            // 2.索引setting配置
            /*request.settings(Settings.builder().put("index.number_of_shards",5)
                    .put("index.number_of_replicas", 2) // 副本數
                    .put("analysis.analyzer.default.tokenizer","standard")
            );*/

            // 3.設置索引的mapping
            // 3.1方式1、直接給出json串
            /*      request.mapping("_doc",
                    "  {\n" +
                            "    \"_doc\": {\n" +
                            "      \"properties\": {\n" +
                            "        \"message\": {\n" +
                            "          \"type\": \"text\"\n" +
                            "        }\n" +
                            "      }\n" +
                            "    }\n" +
                            "  }",
                    XContentType.JSON);*/

            // 3.2方式2、給出封裝成Map

            /*   Map<String, Object> jsonMap = new HashMap<>();
            Map<String, Object> message = new HashMap<>();
            message.put("type", "text");
            Map<String, Object> properties = new HashMap<>();
            properties.put("message", message);
            Map<String, Object> _doc = new HashMap<>();
            _doc.put("properties", properties);
            jsonMap.put("_doc", _doc);
            request.mapping("_doc", jsonMap);*/

            // 3.3方式3、使用XContentBuilder
           /* XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.startObject("_doc");
                {
                    builder.startObject("properties");
                    {
                        builder.startObject("message");
                        {
                            builder.field("type", "text");
                        }
                        builder.endObject();
                        builder.startObject("message1");
                        {
                            builder.field("type", "text");
                        }
                        builder.endObject();
                    }
                    builder.endObject();
                }
                builder.endObject();
            }
            builder.endObject();
            request.mapping("_doc", builder);*/

            // 3.4方式4、使用XContentBuilder
            request.source("{\n" +
                    "    \"settings\" : {\n" +
                    "        \"number_of_shards\" : 1,\n" +
                    "        \"number_of_replicas\" : 0\n" +
                    "    },\n" +
                    "    \"mappings\" : {\n" +
                    "        \"_doc\" : {\n" +
                    "            \"properties\" : {\n" +
                    "                \"message\" : { \"type\" : \"text\" },\n" +
                    "                \"message1\" : { \"type\" : \"text\" }\n" +
                    "            }\n" +
                    "        }\n" +
                    "    },\n" +
                    "    \"aliases\" : {\n" +
                    "        \"lab2\" : {}\n" +
                    "    }\n" +
                    "}", XContentType.JSON);


            // 設置索引別名
            //request.alias(new Alias("lab1"));

            // 5.發送請求
            // 5.1同步方式
            CreateIndexResponse response = client.indices().create(request);

            // 處理響應
            boolean acknowledged = response.isAcknowledged();
            boolean shardsAcknowledged = response.isShardsAcknowledged();

            System.out.println("請求結果---------------");
            System.out.println("acknowledged:"+acknowledged);
            System.out.println("shardsAcknowledged:"+shardsAcknowledged);

            // 5.2 異步方式發送請求
           /* ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {

                @Override
                public void onResponse(CreateIndexResponse createIndexResponse) {
                    boolean acknowledged = createIndexResponse.isAcknowledged();
                    boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
                    System.out.println("請求結果---------------");
                    System.out.println("acknowledged:"+acknowledged);
                    System.out.println("shardsAcknowledged:"+shardsAcknowledged);
                }

                @Override
                public void onFailure(Exception e) {
                    e.printStackTrace();
                }
            };

            client.indices().createAsync(request, listener);*/

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

更多用法參考官方:java索引API

2. index  document

索引文檔,即往索引裏面放入文檔數據.相似於數據庫裏面向表裏面插入一行數據,一行數據就是一個文檔

package com.es.demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

public class IndexDocumentDemo {
    private static Logger logger = LogManager.getRootLogger();


    public static  void main(String args[]){
        try(RestHighLevelClient client = InitClient.getClient()){
            // 一、建立索引請求
            IndexRequest request = new IndexRequest(
                    "mess",   //索引
                    "_doc",     // mapping type
                    "1");     //文檔id

            // 二、準備文檔數據
            // 方式一:直接給JSON串
            String jsonString = "{" +
                    "\"user\":\"kimchy\"," +
                    "\"postDate\":\"2013-01-30\"," +
                    "\"message\":\"trying out Elasticsearch\"" +
                    "}";
            request.source(jsonString, XContentType.JSON);

            // 方式二:以map對象來表示文檔
            /*
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("user", "kimchy");
            jsonMap.put("postDate", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            request.source(jsonMap);
            */

            // 方式三:用XContentBuilder來構建文檔
            /*
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy");
                builder.field("postDate", new Date());
                builder.field("message", "trying out Elasticsearch");
            }
            builder.endObject();
            request.source(builder);
            */

            // 方式四:直接用key-value對給出
            /*
            request.source("user", "kimchy",
                            "postDate", new Date(),
                            "message", "trying out Elasticsearch");
            */

            //三、其餘的一些可選設置
            /*
            request.routing("routing");  //設置routing值
            request.timeout(TimeValue.timeValueSeconds(1));  //設置主分片等待時長
            request.setRefreshPolicy("wait_for");  //設置重刷新策略
            request.version(2);  //設置版本號
            request.opType(DocWriteRequest.OpType.CREATE);  //操做類別
            */

            //四、發送請求
            IndexResponse indexResponse = null;
            try {
                // 同步方式
                indexResponse = client.index(request);
            } catch(ElasticsearchException e) {
                // 捕獲,並處理異常
                //判斷是否版本衝突、create但文檔已存在衝突
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("衝突了,請在此寫衝突處理邏輯!\n" + e.getDetailedMessage());
                }

                logger.error("索引異常", e);
            }

            //五、處理響應
            if(indexResponse != null) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("新增文檔成功!");
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("修改文檔成功!");
                }
                // 分片處理信息
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {

                }
                // 若是有分片副本失敗,能夠得到失敗緣由信息
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason();
                        System.out.println("副本失敗緣由:" + reason);
                    }
                }
            }


            //異步方式發送索引請求
            /*ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {

                }

                @Override
                public void onFailure(Exception e) {

                }
            };
            client.indexAsync(request, listener);
            */

        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

結果:

新增文檔成功!

官方文檔:API

3. get  document

package com.es.demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Map;

public class GetDocumentDemo {
    private static Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitClient.getClient();) {
            // 一、建立獲取文檔請求
            GetRequest request = new GetRequest(
                    "book13",   //索引
                    "_doc",     // mapping type
                    "1");     //文檔id

            // 二、可選的設置
            //request.routing("routing");
            //request.version(2);

            //request.fetchSourceContext(new FetchSourceContext(false)); //是否獲取_source字段
            //選擇返回的字段
            String[] includes = new String[]{"message", "*Date","user"};
            String[] excludes = Strings.EMPTY_ARRAY;
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);


            // 取stored字段
            /*request.storedFields("message");
            GetResponse getResponse = client.get(request);
            String message = getResponse.getField("message").getValue();*/


            //三、發送請求
            GetResponse getResponse = null;
            try {
                // 同步請求
                getResponse = client.get(request);
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    logger.error("沒有找到該id的文檔" );
                }
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("獲取時版本衝突了,請在此寫衝突處理邏輯!" );
                }
                logger.error("獲取文檔異常", e);
            }

            //四、處理響應
            if(getResponse != null) {
                String index = getResponse.getIndex();
                String type = getResponse.getType();
                String id = getResponse.getId();
                if (getResponse.isExists()) { // 文檔存在
                    long version = getResponse.getVersion();
                    String sourceAsString = getResponse.getSourceAsString(); //結果取成 String
                    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();  // 結果取成Map
                    byte[] sourceAsBytes = getResponse.getSourceAsBytes();    //結果取成字節數組

                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info(sourceAsString);

                } else {
                    logger.error("沒有找到該id的文檔" );
                }
            }


            //異步方式發送獲取文檔請求
            /*
            ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                @Override
                public void onResponse(GetResponse getResponse) {

                }

                @Override
                public void onFailure(Exception e) {

                }
            };
            client.getAsync(request, listener);
            */

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

結果:

20:15:40.943 [main] INFO   - index:book13  type:_doc  id:1
20:15:40.943 [main] INFO   - {"postDate":"2013-01-30","message":"trying out Elasticsearch","user":"kimchy"}

 4. Bulk 

 批量索引文檔,即批量往索引裏面放入文檔數據.相似於數據庫裏面批量向表裏面插入多行數據,一行數據就是一個文檔

 BulkDemo.java

package com.es.demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Date;

public class BulkDemo {
    private static Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitClient.getClient();) {

            // 一、建立批量操做請求參數
            BulkRequest request = new BulkRequest();
            request.add(new IndexRequest("book13", "_doc", "1")
                    .source(XContentType.JSON,"postDate", new Date()));
            request.add(new IndexRequest("book13", "_doc", "2")
                    .source(XContentType.JSON,"user", "liming"));
            request.add(new IndexRequest("book13", "_doc", "3")
                    .source(XContentType.JSON,"message", "add a doc"));

            /*
            request.add(new DeleteRequest("mess", "_doc", "3"));
            request.add(new UpdateRequest("mess", "_doc", "2")
                    .doc(XContentType.JSON,"other", "test"));
            request.add(new IndexRequest("mess", "_doc", "4")
                    .source(XContentType.JSON,"field", "baz"));
            */

            // 二、可選的設置
            /*
            request.timeout("2m");
            request.setRefreshPolicy("wait_for");
            request.waitForActiveShards(2);
            */


            //三、發送請求

            // 同步請求
            BulkResponse bulkResponse = client.bulk(request);


            //四、處理響應
            if(bulkResponse != null) {
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        //TODO 新增成功的處理
                        logger.info("新增成功");

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        //TODO 修改爲功的處理
                        logger.info("修改爲功");

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        //TODO 刪除成功的處理
                        logger.info("刪除成功");
                    }
                }
            }


            //異步方式發送批量操做請求
            /*
            ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkResponse) {

                }

                @Override
                public void onFailure(Exception e) {

                }
            };
            client.bulkAsync(request, listener);
            */

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

結果:

20:25:41.726 [main] INFO   - 新增成功
20:25:41.730 [main] INFO   - 新增成功
20:25:41.730 [main] INFO   - 新增成功

重複運行屢次,並無返回修改爲功,也是新增成功,這多是個bug,實際上第一次之後運行都是修改操做了。

package com.es.demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Date;

public class BulkDemo {
    private static Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitClient.getClient();) {

            // 一、建立批量操做請求參數
            BulkRequest request = new BulkRequest();
            request.add(new IndexRequest("book13", "_doc", "1")
                    .source(XContentType.JSON,"postDate", new Date()));
            request.add(new IndexRequest("book13", "_doc", "2")
                    .source(XContentType.JSON,"user", "liming"));
            request.add(new IndexRequest("book13", "_doc", "3")
                    .source(XContentType.JSON,"message", "add a doc"));

            /*
            request.add(new DeleteRequest("mess", "_doc", "3"));
            request.add(new UpdateRequest("mess", "_doc", "2")
                    .doc(XContentType.JSON,"other", "test"));
            request.add(new IndexRequest("mess", "_doc", "4")
                    .source(XContentType.JSON,"field", "baz"));
            */

            // 二、可選的設置
            /*
            request.timeout("2m");
            request.setRefreshPolicy("wait_for");
            request.waitForActiveShards(2);
            */


            //三、發送請求

            // 同步請求
            BulkResponse bulkResponse = client.bulk(request);


            //四、處理響應
            if(bulkResponse != null) {
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        //TODO 新增成功的處理
                        logger.info("新增成功,{}",indexResponse.toString());

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        //TODO 修改爲功的處理
                        logger.info("修改爲功,{}",updateResponse.toString());

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        //TODO 刪除成功的處理
                        logger.info("刪除成功,{}",deleteResponse.toString());
                    }
                }
            }


            //異步方式發送批量操做請求
            /*
            ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkResponse) {

                }

                @Override
                public void onFailure(Exception e) {

                }
            };
            client.bulkAsync(request, listener);
            */

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

結果:

20:31:44.095 [main] INFO   - 新增成功,IndexResponse[index=book13,type=_doc,id=1,version=6,result=updated,seqNo=13,primaryTerm=1,shards={"total":1,"successful":1,"failed":0}]
20:31:44.099 [main] INFO   - 新增成功,IndexResponse[index=book13,type=_doc,id=2,version=5,result=updated,seqNo=14,primaryTerm=1,shards={"total":1,"successful":1,"failed":0}]
20:31:44.099 [main] INFO   - 新增成功,IndexResponse[index=book13,type=_doc,id=3,version=5,result=updated,seqNo=15,primaryTerm=1,shards={"total":1,"successful":1,"failed":0}]

postMan對比查看結果:

{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 3,
        "max_score": 1,
        "hits": [
            {
                "_index": "book13",
                "_type": "_doc",
                "_id": "1",
                "_score": 1,
                "_source": {
                    "postDate": "2018-09-09T12:25:41.302Z"
                }
            },
            {
                "_index": "book13",
                "_type": "_doc",
                "_id": "2",
                "_score": 1,
                "_source": {
                    "user": "liming"
                }
            },
            {
                "_index": "book13",
                "_type": "_doc",
                "_id": "3",
                "_score": 1,
                "_source": {
                    "message": "add a doc"
                }
            }
        ]
    }
}

5. search

 搜索數據

 SearchDemo.java

package com.es.demo;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

/**
 *
 * @Description: 搜索數據
 * @author lgs
 * @date 2018年6月23日
 *
 */
public class SearchDemo {

    private static Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitClient.getClient();) {

            // 一、建立search請求
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("book13");
            searchRequest.types("_doc");

            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            //構造QueryBuilder
            /*QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                    .fuzziness(Fuzziness.AUTO)
                    .prefixLength(3)
                    .maxExpansions(10);
            sourceBuilder.query(matchQueryBuilder);*/

            sourceBuilder.query(QueryBuilders.termQuery("user", "liming"));
            sourceBuilder.from(0);
            sourceBuilder.size(10);
            sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

            //是否返回_source字段
            //sourceBuilder.fetchSource(false);

            //設置返回哪些字段
            /*String[] includeFields = new String[] {"title", "user", "innerObject.*"};
            String[] excludeFields = new String[] {"_type"};
            sourceBuilder.fetchSource(includeFields, excludeFields);*/

            //指定排序
            //sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
            //sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));

            // 設置返回 profile
            //sourceBuilder.profile(true);

            //將請求體加入到請求中
            searchRequest.source(sourceBuilder);

            // 可選的設置
            //searchRequest.routing("routing");

            // 高亮設置
            /*
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            HighlightBuilder.Field highlightTitle =
                    new HighlightBuilder.Field("title");
            highlightTitle.highlighterType("unified");
            highlightBuilder.field(highlightTitle);
            HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
            highlightBuilder.field(highlightUser);
            sourceBuilder.highlighter(highlightBuilder);*/


            //加入聚合
            /*TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
                    .field("company.keyword");
            aggregation.subAggregation(AggregationBuilders.avg("average_age")
                    .field("age"));
            sourceBuilder.aggregation(aggregation);*/

            //作查詢建議
            /*SuggestionBuilder termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy");
                SuggestBuilder suggestBuilder = new SuggestBuilder();
                suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);*/

            //三、發送請求
            SearchResponse searchResponse = client.search(searchRequest);


            //四、處理響應
            //搜索結果狀態信息
            RestStatus status = searchResponse.status();
            TimeValue took = searchResponse.getTook();
            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
            boolean timedOut = searchResponse.isTimedOut();

            //分片搜索狀況
            int totalShards = searchResponse.getTotalShards();
            int successfulShards = searchResponse.getSuccessfulShards();
            int failedShards = searchResponse.getFailedShards();
            for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                // failures should be handled here
            }

            //處理搜索命中文檔結果
            SearchHits hits = searchResponse.getHits();

            long totalHits = hits.getTotalHits();
            float maxScore = hits.getMaxScore();

            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                // do something with the SearchHit

                String index = hit.getIndex();
                String type = hit.getType();
                String id = hit.getId();
                float score = hit.getScore();

                //取_source字段值
                String sourceAsString = hit.getSourceAsString(); //取成json串
                Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map對象
                //從map中取字段值
                /*
                String documentTitle = (String) sourceAsMap.get("title");
                List<Object> users = (List<Object>) sourceAsMap.get("user");
                Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                */
                logger.info("index:" + index + "  type:" + type + "  id:" + id);
                logger.info(sourceAsString);

                //取高亮結果
                /*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                HighlightField highlight = highlightFields.get("title");
                Text[] fragments = highlight.fragments();
                String fragmentString = fragments[0].string();*/
            }

            // 獲取聚合結果
            /*
            Aggregations aggregations = searchResponse.getAggregations();
            Terms byCompanyAggregation = aggregations.get("by_company");
            Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
            Avg averageAge = elasticBucket.getAggregations().get("average_age");
            double avg = averageAge.getValue();
            */

            // 獲取建議結果
            /*Suggest suggest = searchResponse.getSuggest();
            TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
            for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
                for (TermSuggestion.Entry.Option option : entry) {
                    String suggestText = option.getText().string();
                }
            }
            */

            //異步方式發送獲查詢請求
            /*
            ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
                @Override
                public void onResponse(SearchResponse getResponse) {
                    //結果獲取
                }

                @Override
                public void onFailure(Exception e) {
                    //失敗處理
                }
            };
            client.searchAsync(searchRequest, listener);
            */

        } catch (IOException e) {
            logger.error(e);
        }
    }
}

結果:

21:05:50.762 [main] INFO   - index:book13  type:_doc  id:2
21:05:50.766 [main] INFO   - {"user":"liming"}

6. highlight 高亮

HighlightDemo.java

package com.es.demo;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;

import java.io.IOException;
import java.util.Map;

public class HighlightDemo {

    private static Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitClient.getClient();) {

            // 一、建立search請求
            SearchRequest searchRequest = new SearchRequest("book1");

            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            //構造QueryBuilder
            QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("name", "test");
            sourceBuilder.query(matchQueryBuilder);

            //分頁設置
            /*sourceBuilder.from(0);
            sourceBuilder.size(5); ;*/


            // 高亮設置
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.requireFieldMatch(false).field("name").field("age")
                    .preTags("<strong>").postTags("</strong>");
            //不一樣字段可有不一樣設置,如不一樣標籤
            /*HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
            highlightTitle.preTags("<strong>").postTags("</strong>");
            highlightBuilder.field(highlightTitle);
            HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
            highlightContent.preTags("<b>").postTags("</b>");
            highlightBuilder.field(highlightContent).requireFieldMatch(false);*/

            sourceBuilder.highlighter(highlightBuilder);

            searchRequest.source(sourceBuilder);

            //三、發送請求
            SearchResponse searchResponse = client.search(searchRequest);


            //四、處理響應
            if (RestStatus.OK.equals(searchResponse.status())) {
                //處理搜索命中文檔結果
                SearchHits hits = searchResponse.getHits();
                long totalHits = hits.getTotalHits();

                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {
                    String index = hit.getIndex();
                    String type = hit.getType();
                    String id = hit.getId();
                    float score = hit.getScore();

                    //取_source字段值
                    //String sourceAsString = hit.getSourceAsString(); //取成json串
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map對象
                    //從map中取字段值
                    /*String title = (String) sourceAsMap.get("title");
                    String content  = (String) sourceAsMap.get("content"); */
                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info("sourceMap : " + sourceAsMap);
                    //取高亮結果
                    Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                    HighlightField highlight = highlightFields.get("name");
                    if (highlight != null) {
                        Text[] fragments = highlight.fragments();  //多值的字段會有多個值
                        if (fragments != null) {
                            String fragmentString = fragments[0].string();
                            logger.info("title highlight : " + fragmentString);
                            //可用高亮字符串替換上面sourceAsMap中的對應字段返回到上一級調用
                            //sourceAsMap.put("title", fragmentString);
                        }
                    }

                    highlight = highlightFields.get("age");
                    if (highlight != null) {
                        Text[] fragments = highlight.fragments();  //多值的字段會有多個值
                        if (fragments != null) {
                            String fragmentString = fragments[0].string();
                            logger.info("content highlight : " + fragmentString);
                            //可用高亮字符串替換上面sourceAsMap中的對應字段返回到上一級調用
                            //sourceAsMap.put("content", fragmentString);
                        }
                    }
                }
            }

        } catch (IOException e) {
            logger.error(e);
        }
    }
}

結果:

21:13:29.702 [main] INFO   - index:book1  type:english  id:5oVDQ2UBRzBxBrDgtIl0
21:13:29.706 [main] INFO   - sourceMap : {name=test goog my money, addr=中國, class=dsfdsf, age=12}
21:13:29.706 [main] INFO   - title highlight : <strong>test</strong> goog my money
21:13:29.706 [main] INFO   - index:book1  type:english  id:6IUkUmUBRzBxBrDgFok2
21:13:29.710 [main] INFO   - sourceMap : {name=test goog my money, addr=中國, class=dsfdsf, age=[14, 54, 45, 34]}
21:13:29.710 [main] INFO   - title highlight : <strong>test</strong> goog my money
21:13:29.710 [main] INFO   - index:book1  type:english  id:32
21:13:29.710 [main] INFO   - sourceMap : {name=test, age=1}
21:13:29.710 [main] INFO   - title highlight : <strong>test</strong>
21:13:29.710 [main] INFO   - index:book1  type:english  id:33
21:13:29.710 [main] INFO   - sourceMap : {name=test, age=1}
21:13:29.710 [main] INFO   - title highlight : <strong>test</strong>
21:13:29.710 [main] INFO   - index:book1  type:english  id:54UiUmUBRzBxBrDgfIl9
21:13:29.710 [main] INFO   - sourceMap : {name=test goog my money, addr=中國, class=dsfdsf, age=[11, 13, 14]}
21:13:29.710 [main] INFO   - title highlight : <strong>test</strong> goog my money

7. suggest 查詢建議

SuggestDemo.java(本demo只有單詞糾錯和前綴自動補全)

package com.es.demo;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestion;

/**
 *
 * @Description: 查詢建議
 * @author lgs
 * @date 2018年6月23日
 *
 */
public class SuggestDemo {

    private static Logger logger = LogManager.getRootLogger();

    //詞項建議拼寫檢查,檢查用戶的拼寫是否錯誤,若是有錯給用戶推薦正確的詞,appel->apple
    public static void termSuggest() {
        try (RestHighLevelClient client = InitClient.getClient();) {

            // 一、建立search請求
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("book1");

            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            sourceBuilder.size(0);

            //作查詢建議
            //詞項建議
            SuggestionBuilder termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("name").text("text");
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);

            searchRequest.source(sourceBuilder);

            //三、發送請求
            SearchResponse searchResponse = client.search(searchRequest);


            //四、處理響應
            //搜索結果狀態信息
            if(RestStatus.OK.equals(searchResponse.status())) {
                // 獲取建議結果
                Suggest suggest = searchResponse.getSuggest();
                TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
                for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
                    logger.info("text: " + entry.getText().string());
                    for (TermSuggestion.Entry.Option option : entry) {
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }
            /*
              "suggest": {
                "my-suggestion": [
                  {
                    "text": "tring",
                    "offset": 0,
                    "length": 5,
                    "options": [
                      {
                        "text": "trying",
                        "score": 0.8,
                        "freq": 1
                      }
                    ]
                  },
                  {
                    "text": "out",
                    "offset": 6,
                    "length": 3,
                    "options": []
                  },
                  {
                    "text": "elasticsearch",
                    "offset": 10,
                    "length": 13,
                    "options": []
                  }
                ]
              }*/

        } catch (IOException e) {
            logger.error(e);
        }
    }

    //自動補全,根據用戶的輸入聯想到可能的詞或者短語
    public static void completionSuggester() {
        try (RestHighLevelClient client = InitClient.getClient();) {

            // 一、建立search請求
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("book5");

            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            sourceBuilder.size(0);

            //作查詢建議
            //自動補全
            /*POST music/_search?pretty
                    {
                        "suggest": {
                            "song-suggest" : {
                                "prefix" : "lucene s",
                                "completion" : {
                                    "field" : "suggest" ,
                                    "skip_duplicates": true
                                }
                            }
                        }
                    }*/

            SuggestionBuilder termSuggestionBuilder =
                    SuggestBuilders.completionSuggestion("suggest").prefix("tes")
                            .skipDuplicates(true);
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("song-suggest", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);

            searchRequest.source(sourceBuilder);

            //三、發送請求
            SearchResponse searchResponse = client.search(searchRequest);


            //四、處理響應
            //搜索結果狀態信息
            if(RestStatus.OK.equals(searchResponse.status())) {
                // 獲取建議結果
                Suggest suggest = searchResponse.getSuggest();
                CompletionSuggestion termSuggestion = suggest.getSuggestion("song-suggest");
                for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) {
                    logger.info("text: " + entry.getText().string());
                    for (CompletionSuggestion.Entry.Option option : entry) {
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }

        } catch (IOException e) {
            logger.error(e);
        }
    }

    public static void main(String[] args) {
        termSuggest();

        logger.info("--------------------------------------");

        completionSuggester();
    }
}

結果:

21:24:40.416 [main] INFO   - text: text
21:24:40.420 [main] INFO   -    suggest option : test
21:24:40.420 [main] INFO   -    suggest option : term
21:24:40.420 [main] INFO   - --------------------------------------
21:24:40.624 [main] INFO   - text: tes
21:24:40.624 [main] INFO   -    suggest option : test english
21:24:40.624 [main] INFO   -    suggest option : test my book1

 8. aggregation 聚合分析

 AggregationDemo.java

package com.es.demo;

import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;


public class AggregationDemo {

    private static Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitClient.getClient();) {

            // 一、建立search請求
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("book1");

            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            sourceBuilder.size(0);

            //加入聚合
            //字段值項分組聚合
            TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
                    .field("age").order(BucketOrder.aggregation("average_balance", true));
            //計算每組的平均balance指標
            aggregation.subAggregation(AggregationBuilders.avg("average_balance")
                    .field("age"));
            sourceBuilder.aggregation(aggregation);

            searchRequest.source(sourceBuilder);

            //三、發送請求
            SearchResponse searchResponse = client.search(searchRequest);

            //四、處理響應
            //搜索結果狀態信息
            if(RestStatus.OK.equals(searchResponse.status())) {
                // 獲取聚合結果
                Aggregations aggregations = searchResponse.getAggregations();
                Terms byAgeAggregation = aggregations.get("by_age");
                logger.info("aggregation by_age 結果");
                logger.info("docCountError: " + byAgeAggregation.getDocCountError());
                logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
                logger.info("------------------------------------");
                for(Bucket buck : byAgeAggregation.getBuckets()) {
                    logger.info("key: " + buck.getKeyAsNumber());
                    logger.info("docCount: " + buck.getDocCount());
                    logger.info("docCountError: " + buck.getDocCountError());
                    //取子聚合
                    Avg averageBalance = buck.getAggregations().get("average_balance");

                    logger.info("average_balance: " + averageBalance.getValue());
                    logger.info("------------------------------------");
                }
                //直接用key 來去分組
                /*Bucket elasticBucket = byCompanyAggregation.getBucketByKey("24");
                Avg averageAge = elasticBucket.getAggregations().get("average_age");
                double avg = averageAge.getValue();*/

            }

        } catch (IOException e) {
            logger.error(e);
        }
    }
}

結果:

22:58:24.681 [main] INFO   - aggregation by_age 結果
22:58:24.685 [main] INFO   - docCountError: 0
22:58:24.685 [main] INFO   - sumOfOtherDocCounts: 1
22:58:24.685 [main] INFO   - ------------------------------------
22:58:24.685 [main] INFO   - key: 1
22:58:24.685 [main] INFO   - docCount: 11
22:58:24.685 [main] INFO   - docCountError: 0
22:58:24.685 [main] INFO   - average_balance: 1.0
22:58:24.685 [main] INFO   - ------------------------------------
22:58:24.685 [main] INFO   - key: 12
22:58:24.685 [main] INFO   - docCount: 16
22:58:24.685 [main] INFO   - docCountError: 0
22:58:24.685 [main] INFO   - average_balance: 12.0
22:58:24.685 [main] INFO   - ------------------------------------
22:58:24.685 [main] INFO   - key: 11
22:58:24.685 [main] INFO   - docCount: 1
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 12.666666666666666
22:58:24.689 [main] INFO   - ------------------------------------
22:58:24.689 [main] INFO   - key: 13
22:58:24.689 [main] INFO   - docCount: 2
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 12.75
22:58:24.689 [main] INFO   - ------------------------------------
22:58:24.689 [main] INFO   - key: 16
22:58:24.689 [main] INFO   - docCount: 1
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 16.0
22:58:24.689 [main] INFO   - ------------------------------------
22:58:24.689 [main] INFO   - key: 21
22:58:24.689 [main] INFO   - docCount: 1
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 21.0
22:58:24.689 [main] INFO   - ------------------------------------
22:58:24.689 [main] INFO   - key: 14
22:58:24.689 [main] INFO   - docCount: 2
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 26.428571428571427
22:58:24.689 [main] INFO   - ------------------------------------
22:58:24.689 [main] INFO   - key: 33
22:58:24.689 [main] INFO   - docCount: 1
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 33.0
22:58:24.689 [main] INFO   - ------------------------------------
22:58:24.689 [main] INFO   - key: 34
22:58:24.689 [main] INFO   - docCount: 1
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 36.75
22:58:24.689 [main] INFO   - ------------------------------------
22:58:24.689 [main] INFO   - key: 45
22:58:24.689 [main] INFO   - docCount: 1
22:58:24.689 [main] INFO   - docCountError: 0
22:58:24.689 [main] INFO   - average_balance: 36.75
22:58:24.689 [main] INFO   - ------------------------------------

9. 官網資料

各類查詢對應的QueryBuilder:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-query-builders.html

各類聚合對應的AggregationBuilder:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html

10.源碼

https://github.com/Star-Lordxing/ES-java-client-api

5、Java Client

1. Java Client 說明

java client 使用 TransportClient,各類操做本質上都是異步的(能夠用 listener,或返回 Future )。
注意:ES的發展規劃中在7.0版本開始將廢棄 TransportClient,8.0版本中將徹底移除 TransportClient,取而代之的是High Level REST Client。
High Level REST Client 中的操做API和java client 大可能是同樣的,除了鏈接方式InitClient代碼不同

2. 官方學習連接

https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

3. 兼容性說明

請使用與服務端ES版本一致的客戶端版本

4. Java Client maven 集成

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.3.1</version>
        </dependency>
        <!--日誌-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>

5.將log4j2.xml編譯到classes路徑下

<?xml version="1.0" encoding="UTF-8"?>
<configuration status="OFF">
    <appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </appenders>
    <loggers>
        <root level="info">
            <appender-ref ref="Console"/>
        </root>
    </loggers>
</configuration>

六.Transport API使用示例

1.建立鏈接InitClient.java

package com.es.demo;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class InitClient {


    private static TransportClient client;

    public static TransportClient getClient() throws UnknownHostException {

        if(client == null) {
            //client = new PreBuiltTransportClient(Settings.EMPTY)

            // 鏈接集羣的設置
            Settings settings = Settings.builder()
                    .put("cluster.name", "my-application") //若是集羣的名字不是默認的elasticsearch,需指定
                    .put("client.transport.sniff", false) //自動嗅探
                    .build();
            client = new PreBuiltTransportClient(settings)
                    //.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("start.com"), 9300));

            //可用鏈接設置參數說明
            /*
            cluster.name
                指定集羣的名字,若是集羣的名字不是默認的elasticsearch,需指定。
            client.transport.sniff
                設置爲true,將自動嗅探整個集羣,自動加入集羣的節點到鏈接列表中。
            client.transport.ignore_cluster_name
                Set to true to ignore cluster name validation of connected nodes. (since 0.19.4)
            client.transport.ping_timeout
                The time to wait for a ping response from a node. Defaults to 5s.
            client.transport.nodes_sampler_interval
                How often to sample / ping the nodes listed and connected. Defaults to 5s.
            */

        }
        return client;
    }


}

client.transport.sniff:false //自動嗅探 ,我本機單節點設置爲true會報錯。

 

1. Create index 建立索引

 

CreateIndexDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

public class CreateIndexDemo {

    public static void main(String[] args) {
        //這裏和RESTful風格不一樣
        try (TransportClient client = InitDemo.getClient();) {

            // 一、建立 建立索引request
            CreateIndexRequest request = new CreateIndexRequest("mess");

            // 二、設置索引的settings
            request.settings(Settings.builder().put("index.number_of_shards", 3) // 分片數
                    .put("index.number_of_replicas", 2) // 副本數
                    .put("analysis.analyzer.default.tokenizer", "ik_smart") // 默認分詞器
            );

            // 三、設置索引的mappings
            request.mapping("_doc",
                    "  {\n" +
                    "    \"_doc\": {\n" +
                    "      \"properties\": {\n" +
                    "        \"message\": {\n" +
                    "          \"type\": \"text\"\n" +
                    "        }\n" +
                    "      }\n" +
                    "    }\n" +
                    "  }",
                    XContentType.JSON);

            // 四、 設置索引的別名
            request.alias(new Alias("mmm"));

            // 五、 發送請求 這裏和RESTful風格不一樣
            CreateIndexResponse createIndexResponse = client.admin().indices()
                    .create(request).get();

            // 六、處理響應
            boolean acknowledged = createIndexResponse.isAcknowledged();
            boolean shardsAcknowledged = createIndexResponse
                    .isShardsAcknowledged();
            System.out.println("acknowledged = " + acknowledged);
            System.out.println("shardsAcknowledged = " + shardsAcknowledged);

            // listener方式發送請求
            /*ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(
                        CreateIndexResponse createIndexResponse) {
                    // 六、處理響應
                    boolean acknowledged = createIndexResponse.isAcknowledged();
                    boolean shardsAcknowledged = createIndexResponse
                            .isShardsAcknowledged();
                    System.out.println("acknowledged = " + acknowledged);
                    System.out.println(
                            "shardsAcknowledged = " + shardsAcknowledged);
                }

                @Override
                public void onFailure(Exception e) {
                    System.out.println("建立索引異常:" + e.getMessage());
                }
            };
            client.admin().indices().create(request, listener);
            */
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

2. index document

索引文檔,即往索引裏面放入文檔數據.相似於數據庫裏面向表裏面插入一行數據,一行數據就是一個文檔
IndexDocumentDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

public class IndexDocumentDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        //這裏和RESTful風格不一樣
        try (TransportClient client = InitDemo.getClient();) {
            // 一、建立索引請求
            IndexRequest request = new IndexRequest(
                    "mess",   //索引
                    "_doc",     // mapping type
                    "11");     //文檔id  
            
            // 二、準備文檔數據
            // 方式一:直接給JSON串
            String jsonString = "{" +
                    "\"user\":\"kimchy\"," +
                    "\"postDate\":\"2013-01-30\"," +
                    "\"message\":\"trying out Elasticsearch\"" +
                    "}";
            request.source(jsonString, XContentType.JSON); 
            
            // 方式二:以map對象來表示文檔
            /*
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("user", "kimchy");
            jsonMap.put("postDate", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            request.source(jsonMap); 
            */
            
            // 方式三:用XContentBuilder來構建文檔
            /*
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy");
                builder.field("postDate", new Date());
                builder.field("message", "trying out Elasticsearch");
            }
            builder.endObject();
            request.source(builder); 
            */
            
            // 方式四:直接用key-value對給出
            /*
            request.source("user", "kimchy",
                            "postDate", new Date(),
                            "message", "trying out Elasticsearch");
            */
            
            //三、其餘的一些可選設置
            /*
            request.routing("routing");  //設置routing值
            request.timeout(TimeValue.timeValueSeconds(1));  //設置主分片等待時長
            request.setRefreshPolicy("wait_for");  //設置重刷新策略
            request.version(2);  //設置版本號
            request.opType(DocWriteRequest.OpType.CREATE);  //操做類別  
            */
            
            //四、發送請求
            IndexResponse indexResponse = null;
            try {
                //方式一: 用client.index 方法,返回是 ActionFuture<IndexResponse>,再調用get獲取響應結果
                indexResponse = client.index(request).get();
                
                //方式二:client提供了一個 prepareIndex方法,內部爲咱們建立IndexRequest
                /*IndexResponse indexResponse = client.prepareIndex("mess","_doc","11")
                        .setSource(jsonString, XContentType.JSON)
                        .get();*/
                
                //方式三:request + listener
                //client.index(request, listener);    
                
            } catch(ElasticsearchException e) {
                // 捕獲,並處理異常
                //判斷是否版本衝突、create但文檔已存在衝突
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("衝突了,請在此寫衝突處理邏輯!\n" + e.getDetailedMessage());
                }
                
                logger.error("索引異常", e);
            }catch (InterruptedException | ExecutionException e) {
                logger.error("索引異常", e);
            }
            
            
            
            
            //五、處理響應
            if(indexResponse != null) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("新增文檔成功,處理邏輯代碼寫到這裏。");
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("修改文檔成功,處理邏輯代碼寫到這裏。");
                }
                // 分片處理信息
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    
                }
                // 若是有分片副本失敗,能夠得到失敗緣由信息
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason(); 
                        System.out.println("副本失敗緣由:" + reason);
                    }
                }
            }
            
            
            //listener 方式
            /*ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                    
                }

                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.index(request, listener);
            */
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

 

3. get document 

 

獲取文檔數據
GetDocumentDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

public class GetDocumentDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        //這裏和RESTful風格不一樣
        try (TransportClient client = InitDemo.getClient();) {
            // 一、建立獲取文檔請求
            GetRequest request = new GetRequest(
                    "mess",   //索引
                    "_doc",     // mapping type
                    "11");     //文檔id  
            
            // 二、可選的設置
            //request.routing("routing");
            //request.version(2);
            
            //request.fetchSourceContext(new FetchSourceContext(false)); //是否獲取_source字段
            //選擇返回的字段
            String[] includes = new String[]{"message", "*Date"};
            String[] excludes = Strings.EMPTY_ARRAY;
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext); 
            
            //也可寫成這樣
            /*String[] includes = Strings.EMPTY_ARRAY;
            String[] excludes = new String[]{"message"};
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);*/
            
            
            // 取stored字段
            /*request.storedFields("message"); 
            GetResponse getResponse = client.get(request);
            String message = getResponse.getField("message").getValue();*/
            
            
            //三、發送請求        
            GetResponse getResponse = null;
            try {
                getResponse = client.get(request).get();
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    logger.error("沒有找到該id的文檔" );
                }
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("獲取時版本衝突了,請在此寫衝突處理邏輯!" );
                }
                logger.error("獲取文檔異常", e);
            }catch (InterruptedException | ExecutionException e) {
                logger.error("索引異常", e);
            }
            
            //四、處理響應
            if(getResponse != null) {
                String index = getResponse.getIndex();
                String type = getResponse.getType();
                String id = getResponse.getId();
                if (getResponse.isExists()) { // 文檔存在
                    long version = getResponse.getVersion();
                    String sourceAsString = getResponse.getSourceAsString(); //結果取成 String       
                    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();  // 結果取成Map
                    byte[] sourceAsBytes = getResponse.getSourceAsBytes();    //結果取成字節數組
                    
                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info(sourceAsString);
                    
                } else {
                    logger.error("沒有找到該id的文檔" );
                }
            }
            
            
            //異步方式發送獲取文檔請求
            /*
            ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                @Override
                public void onResponse(GetResponse getResponse) {
                    
                }
            
                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.getAsync(request, listener);
            */
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

 

4. Bulk

 

批量索引文檔,即批量往索引裏面放入文檔數據.相似於數據庫裏面批量向表裏面插入多行數據,一行數據就是一個文檔
BulkDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentType;

public class BulkDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        //這裏和RESTful風格不一樣
        try (TransportClient client = InitDemo.getClient();) {
            
            // 一、建立批量操做請求
            BulkRequest request = new BulkRequest(); 
            request.add(new IndexRequest("mess", "_doc", "1")  
                    .source(XContentType.JSON,"field", "foo"));
            request.add(new IndexRequest("mess", "_doc", "2")  
                    .source(XContentType.JSON,"field", "bar"));
            request.add(new IndexRequest("mess", "_doc", "3")  
                    .source(XContentType.JSON,"field", "baz"));
            
            /*
            request.add(new DeleteRequest("mess", "_doc", "3")); 
            request.add(new UpdateRequest("mess", "_doc", "2") 
                    .doc(XContentType.JSON,"other", "test"));
            request.add(new IndexRequest("mess", "_doc", "4")  
                    .source(XContentType.JSON,"field", "baz"));
            */
            
            // 二、可選的設置
            /*
            request.timeout("2m");
            request.setRefreshPolicy("wait_for");  
            request.waitForActiveShards(2);
            */
            
            
            //三、發送請求        
        
            // 同步請求
            BulkResponse bulkResponse = client.bulk(request).get();
            
            
            //四、處理響應
            if(bulkResponse != null) {
                for (BulkItemResponse bulkItemResponse : bulkResponse) { 
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        //TODO 新增成功的處理

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                       //TODO 修改爲功的處理

                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        //TODO 刪除成功的處理
                    }
                }
            }
            
            
            //異步方式發送批量操做請求
            /*
            ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkResponse) {
                    
                }
            
                @Override
                public void onFailure(Exception e) {
                    
                }
            };
            client.bulkAsync(request, listener);
            */
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

5. search

搜索數據
SearchDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class SearchDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            
            // 一、建立search請求
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank"); 
            searchRequest.types("_doc");
            
            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
            
            //構造QueryBuilder
            /*QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                    .fuzziness(Fuzziness.AUTO)
                    .prefixLength(3)
                    .maxExpansions(10);
            sourceBuilder.query(matchQueryBuilder);*/
            
            sourceBuilder.query(QueryBuilders.termQuery("age", 24)); 
            sourceBuilder.from(0); 
            sourceBuilder.size(10); 
            sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); 
            
            //是否返回_source字段
            //sourceBuilder.fetchSource(false);
            
            //設置返回哪些字段
            /*String[] includeFields = new String[] {"title", "user", "innerObject.*"};
            String[] excludeFields = new String[] {"_type"};
            sourceBuilder.fetchSource(includeFields, excludeFields);*/
            
            //指定排序
            //sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); 
            //sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));
            
            // 設置返回 profile 
            //sourceBuilder.profile(true);
            
            //將請求體加入到請求中
            searchRequest.source(sourceBuilder);
            
            // 可選的設置
            //searchRequest.routing("routing");
            
            // 高亮設置
            /*
            HighlightBuilder highlightBuilder = new HighlightBuilder(); 
            HighlightBuilder.Field highlightTitle =
                    new HighlightBuilder.Field("title"); 
            highlightTitle.highlighterType("unified");  
            highlightBuilder.field(highlightTitle);  
            HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
            highlightBuilder.field(highlightUser);
            sourceBuilder.highlighter(highlightBuilder);*/
            
            
            //加入聚合
            /*TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
                    .field("company.keyword");
            aggregation.subAggregation(AggregationBuilders.avg("average_age")
                    .field("age"));
            sourceBuilder.aggregation(aggregation);*/
            
            //作查詢建議
            /*SuggestionBuilder termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy"); 
                SuggestBuilder suggestBuilder = new SuggestBuilder();
                suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder); 
            sourceBuilder.suggest(suggestBuilder);*/
            
            //三、發送請求        
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            //四、處理響應
            //搜索結果狀態信息
            RestStatus status = searchResponse.status();
            TimeValue took = searchResponse.getTook();
            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
            boolean timedOut = searchResponse.isTimedOut();
            
            //分片搜索狀況
            int totalShards = searchResponse.getTotalShards();
            int successfulShards = searchResponse.getSuccessfulShards();
            int failedShards = searchResponse.getFailedShards();
            for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                // failures should be handled here
            }
            
            //處理搜索命中文檔結果
            SearchHits hits = searchResponse.getHits();
            
            long totalHits = hits.getTotalHits();
            float maxScore = hits.getMaxScore();
            
            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                // do something with the SearchHit
                
                String index = hit.getIndex();
                String type = hit.getType();
                String id = hit.getId();
                float score = hit.getScore();
                
                //取_source字段值
                String sourceAsString = hit.getSourceAsString(); //取成json串
                Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map對象
                //從map中取字段值
                /*
                String documentTitle = (String) sourceAsMap.get("title"); 
                List<Object> users = (List<Object>) sourceAsMap.get("user");
                Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                */
                logger.info("index:" + index + "  type:" + type + "  id:" + id);
                logger.info(sourceAsString);
                
                //取高亮結果
                /*Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                HighlightField highlight = highlightFields.get("title"); 
                Text[] fragments = highlight.fragments();  
                String fragmentString = fragments[0].string();*/
            }
            
            // 獲取聚合結果
            /*
            Aggregations aggregations = searchResponse.getAggregations();
            Terms byCompanyAggregation = aggregations.get("by_company"); 
            Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic"); 
            Avg averageAge = elasticBucket.getAggregations().get("average_age"); 
            double avg = averageAge.getValue();
            */
            
            // 獲取建議結果
            /*Suggest suggest = searchResponse.getSuggest(); 
            TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user"); 
            for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { 
                for (TermSuggestion.Entry.Option option : entry) { 
                    String suggestText = option.getText().string();
                }
            }
            */
            
            //異步方式發送獲查詢請求
            /*
            ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
                @Override
                public void onResponse(SearchResponse getResponse) {
                    //結果獲取
                }
            
                @Override
                public void onFailure(Exception e) {
                    //失敗處理
                }
            };
            client.searchAsync(searchRequest, listener); 
            */
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }
}
複製代碼

6. highlight 高亮

HighlightDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;

public class HighlightDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            
            // 一、建立search請求
            SearchRequest searchRequest = new SearchRequest("hl_test"); 
            
            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
            
            //構造QueryBuilder
            QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "lucene solr");
            sourceBuilder.query(matchQueryBuilder);
            
            //分頁設置
            /*sourceBuilder.from(0); 
            sourceBuilder.size(5); ;*/ 
            
                    
            // 高亮設置
            HighlightBuilder highlightBuilder = new HighlightBuilder(); 
            highlightBuilder.requireFieldMatch(false).field("title").field("content")
                .preTags("<strong>").postTags("</strong>");
            //不一樣字段可有不一樣設置,如不一樣標籤
            /*HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title"); 
            highlightTitle.preTags("<strong>").postTags("</strong>");
            highlightBuilder.field(highlightTitle);  
            HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
            highlightContent.preTags("<b>").postTags("</b>");
            highlightBuilder.field(highlightContent).requireFieldMatch(false);*/
            
            sourceBuilder.highlighter(highlightBuilder);
            
            searchRequest.source(sourceBuilder);
            
            //三、發送請求        
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            //四、處理響應
            if(RestStatus.OK.equals(searchResponse.status())) {
                //處理搜索命中文檔結果
                SearchHits hits = searchResponse.getHits();
                long totalHits = hits.getTotalHits();
                
                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {        
                    String index = hit.getIndex();
                    String type = hit.getType();
                    String id = hit.getId();
                    float score = hit.getScore();
                    
                    //取_source字段值
                    //String sourceAsString = hit.getSourceAsString(); //取成json串
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // 取成map對象
                    //從map中取字段值
                    /*String title = (String) sourceAsMap.get("title"); 
                    String content  = (String) sourceAsMap.get("content"); */
                    logger.info("index:" + index + "  type:" + type + "  id:" + id);
                    logger.info("sourceMap : " +  sourceAsMap);
                    //取高亮結果
                    Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                    HighlightField highlight = highlightFields.get("title"); 
                    if(highlight != null) {
                        Text[] fragments = highlight.fragments();  //多值的字段會有多個值
                        if(fragments != null) {
                            String fragmentString = fragments[0].string();
                            logger.info("title highlight : " +  fragmentString);
                            //可用高亮字符串替換上面sourceAsMap中的對應字段返回到上一級調用
                            //sourceAsMap.put("title", fragmentString);
                        }
                    }
                    
                    highlight = highlightFields.get("content"); 
                    if(highlight != null) {
                        Text[] fragments = highlight.fragments();  //多值的字段會有多個值
                        if(fragments != null) {
                            String fragmentString = fragments[0].string();
                            logger.info("content highlight : " +  fragmentString);
                            //可用高亮字符串替換上面sourceAsMap中的對應字段返回到上一級調用
                            //sourceAsMap.put("content", fragmentString);
                        }
                    }
                }
            }
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }
}
複製代碼

7. suggest 查詢建議

SuggestDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestion;

public class SuggestDemo {
    
    private static Logger logger = LogManager.getRootLogger();  
    
    //拼寫檢查
    public static void termSuggest(TransportClient client) {
            
        // 一、建立search請求
        //SearchRequest searchRequest = new SearchRequest();
        SearchRequest searchRequest = new SearchRequest("mess"); 
        
        // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
         
        sourceBuilder.size(0); 
        
        //作查詢建議        
        //詞項建議
        SuggestionBuilder termSuggestionBuilder =
                SuggestBuilders.termSuggestion("user").text("kmichy"); 
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);     
        sourceBuilder.suggest(suggestBuilder);
        
        searchRequest.source(sourceBuilder);    

        try{
            //三、發送請求        
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            //四、處理響應
            //搜索結果狀態信息
            if(RestStatus.OK.equals(searchResponse.status())) {
                // 獲取建議結果
                Suggest suggest = searchResponse.getSuggest(); 
                TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user"); 
                for (TermSuggestion.Entry entry : termSuggestion.getEntries()) { 
                    logger.info("text: " + entry.getText().string());
                    for (TermSuggestion.Entry.Option option : entry) { 
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }

        } catch (InterruptedException | ExecutionException e) {
            logger.error(e);
        }
            /*
              "suggest": {
                "my-suggestion": [
                  {
                    "text": "tring",
                    "offset": 0,
                    "length": 5,
                    "options": [
                      {
                        "text": "trying",
                        "score": 0.8,
                        "freq": 1
                      }
                    ]
                  },
                  {
                    "text": "out",
                    "offset": 6,
                    "length": 3,
                    "options": []
                  },
                  {
                    "text": "elasticsearch",
                    "offset": 10,
                    "length": 13,
                    "options": []
                  }
                ]
              }*/

    }
    //自動補全
    public static void completionSuggester(TransportClient client) {
                
        // 一、建立search請求
        //SearchRequest searchRequest = new SearchRequest();
        SearchRequest searchRequest = new SearchRequest("music"); 
        
        // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
         
        sourceBuilder.size(0); 
        
        //作查詢建議        
        //自動補全
        /*POST music/_search?pretty
                {
                    "suggest": {
                        "song-suggest" : {
                            "prefix" : "lucene s", 
                            "completion" : { 
                                "field" : "suggest" ,
                                "skip_duplicates": true
                            }
                        }
                    }
                }*/

        SuggestionBuilder termSuggestionBuilder =
                SuggestBuilders.completionSuggestion("suggest").prefix("lucene s")
                .skipDuplicates(true); 
        SuggestBuilder suggestBuilder = new SuggestBuilder();
        suggestBuilder.addSuggestion("song-suggest", termSuggestionBuilder);     
        sourceBuilder.suggest(suggestBuilder);
        
        searchRequest.source(sourceBuilder);    
            
        try {
            //三、發送請求        
            SearchResponse searchResponse = client.search(searchRequest).get();
            
            
            //四、處理響應
            //搜索結果狀態信息
            if(RestStatus.OK.equals(searchResponse.status())) {
                // 獲取建議結果
                Suggest suggest = searchResponse.getSuggest(); 
                CompletionSuggestion termSuggestion = suggest.getSuggestion("song-suggest"); 
                for (CompletionSuggestion.Entry entry : termSuggestion.getEntries()) { 
                    logger.info("text: " + entry.getText().string());
                    for (CompletionSuggestion.Entry.Option option : entry) { 
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }

        } catch (InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            termSuggest(client);
            
            logger.info("--------------------------------------");
            
            completionSuggester(client);
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
複製代碼

 

8. aggregation 聚合分析

 

AggregationDemo.java

 

複製代碼
package com.study.es_java_client;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class AggregationDemo {
    
    private static Logger logger = LogManager.getRootLogger();  

    public static void main(String[] args) {
        try (TransportClient client = InitDemo.getClient();) {
            
            // 一、建立search請求
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank"); 
            
            // 二、用SearchSourceBuilder來構造查詢請求體 ,請仔細查看它的方法,構造各類查詢的方法都在這。
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 
             
            sourceBuilder.size(0); 

            //加入聚合
            //字段值項分組聚合
            TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
                    .field("age").order(BucketOrder.aggregation("average_balance", true));
            //計算每組的平均balance指標
            aggregation.subAggregation(AggregationBuilders.avg("average_balance")
                    .field("balance"));
            sourceBuilder.aggregation(aggregation);
            
            searchRequest.source(sourceBuilder);
            
            //三、發送請求        
            SearchResponse searchResponse = client.search(searchRequest).get();
                
            //四、處理響應
            //搜索結果狀態信息
            if(RestStatus.OK.equals(searchResponse.status())) {
                // 獲取聚合結果
                Aggregations aggregations = searchResponse.getAggregations();
                Terms byAgeAggregation = aggregations.get("by_age"); 
                logger.info("aggregation by_age 結果");
                logger.info("docCountError: " + byAgeAggregation.getDocCountError());
                logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
                logger.info("------------------------------------");
                for(Bucket buck : byAgeAggregation.getBuckets()) {
                    logger.info("key: " + buck.getKeyAsNumber());
                    logger.info("docCount: " + buck.getDocCount());
                    //logger.info("docCountError: " + buck.getDocCountError());
                    //取子聚合
                    Avg averageBalance = buck.getAggregations().get("average_balance"); 

                    logger.info("average_balance: " + averageBalance.getValue());
                    logger.info("------------------------------------");
                }
                //直接用key 來去分組
                /*Bucket elasticBucket = byCompanyAggregation.getBucketByKey("24"); 
                Avg averageAge = elasticBucket.getAggregations().get("average_age"); 
                double avg = averageAge.getValue();*/
                
            }
            
        } catch (IOException | InterruptedException | ExecutionException e) {
            logger.error(e);
        }
    }
}
複製代碼

 

9. 官網文檔

 

Document API 文檔操做API:

 

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-docs.html

 

Search API:

 

https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-search.html

10.源代碼獲取地址

https://github.com/Star-Lordxing/ES-java-client-api

 

7、集成Spring

1.集成spring參考官方文檔

官網連接:

https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/

代碼庫:

https://github.com/spring-projects/spring-data-elasticsearch

2.集成spring boot

參考博客:https://blog.csdn.net/yejingtao703/article/details/78414874

spring 最新集成包只有ES 5.5,推薦使用ES提供原生Client包

 

參考

Elasticsearch API

https://www.cnblogs.com/leeSmall/p/9218779.html

相關文章
相關標籤/搜索