Elasticsearch6.8.3文檔類API

1.鏈接集羣

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * Factory for the {@link RestHighLevelClient} used by the example classes.
 */
public class InitClient {
    /**
     * Builds a new high-level REST client pointing at the three cluster nodes
     * {@code htkj101}, {@code htkj102} and {@code htkj103} (port 9200, plain HTTP).
     *
     * <p>Every call constructs a fresh client; the caller owns it and should close it.
     *
     * @return a newly created client
     */
    public static RestHighLevelClient getClient() {
        HttpHost[] nodes = {
                new HttpHost("htkj101", 9200, "http"),
                new HttpHost("htkj102", 9200, "http"),
                new HttpHost("htkj103", 9200, "http")
        };
        return new RestHighLevelClient(RestClient.builder(nodes));
    }
}

2.CreateIndex

import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class CreateIndex {
    public static void main(String ags[]){
        createIndex1();
        createIndex2();
        createIndex3();
        createIndex4();
    }
    /**
    * 第一種建立Index方法
    * */
    private static void createIndex1(){
        try(RestHighLevelClient client = InitClient.getClient()){
            IndexRequest request = new IndexRequest("posts","doc","1");
            String jsonString="{" +
                    "\"user\":\"kimchy\"," +
                    "\"postDate\":\"2013-01-30\"," +
                    "\"message\":\"trying out Elasticsearch\"" +
                    "}";
            request.source(jsonString, XContentType.JSON);
            synExecution(client,request);
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     *  第二種建立Index方法
     * */
    private static void createIndex2(){
        try(RestHighLevelClient client = InitClient.getClient()){
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("user", "kimchy");
            jsonMap.put("postDate", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            IndexRequest indexRequest = new IndexRequest("posts", "doc", "2")
                    .source(jsonMap);
            synExecution(client,indexRequest);
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 第三種建立Index方法
     * */
    private static void createIndex3() {
        try(RestHighLevelClient client = InitClient.getClient()){
            XContentBuilder builder= XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy");
                builder.timeField("postDate", new Date());
                builder.field("message", "trying out Elasticsearch");
            }
            builder.endObject();
            IndexRequest request = new IndexRequest("posts", "doc", "3")
                    .source(builder);
            synExecution(client,request);
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 第四種建立Index方法
     * */
    private static void createIndex4(){
        try(RestHighLevelClient client = InitClient.getClient()){
            IndexRequest request=new IndexRequest("posts","doc","4")
                    .source("user", "kimchy",
                            "postDate", new Date(),
                            "message", "trying out Elasticsearch");
            synExecution(client,request);
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void synExecution(RestHighLevelClient client,IndexRequest request) throws IOException {
        IndexResponse indexResponse=client.index(request, RequestOptions.DEFAULT);
        IndexResponse(indexResponse);
    }
    private static void asynExecution(RestHighLevelClient client,IndexRequest request){
        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                   //當執行成功時 調用這裏
                    System.out.println("執行成功");
                }
                @Override
                public void onFailure(Exception e) {
                    //失敗時調用這個
                    System.out.println("執行失敗");
                }
            };
        client.indexAsync(request, RequestOptions.DEFAULT, listener);
    }
    public static void IndexResponse(IndexResponse indexResponse){
        String index = indexResponse.getIndex();
        String type = indexResponse.getType();
        String id = indexResponse.getId();
        long version = indexResponse.getVersion();
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
            //首次建立文檔的狀況
            System.out.println("第一次被建立");
        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            //已存在的文檔被重寫
            System.out.println("更新成功");
        }
        System.out.println(" index: "+index+" type: "+type+" id: "+id+" version: "+version);
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            //處理成功分片數量少於總分片數量的狀況  ---沒懂是什麼意思
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure :
                    shardInfo.getFailures()) {
                String reason = failure.reason(); //出錯的狀況
                System.out.println(reason);
            }
        }
    }
}

3.Get

import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Map;

/**
 * Examples for the Get API: fetching a document synchronously or asynchronously,
 * reading the response, and handling a missing index.
 */
public class Get {
    /**
     * Runs the synchronous get example.
     *
     * @param args unused
     */
    public static void main(String[] args){
        getRequest();
    }

    /**
     * Fetches document {@code posts/doc/1} and prints it.
     */
    private static void getRequest(){
        // try-with-resources closes the client; an additional explicit close() is not needed.
        try(RestHighLevelClient client = InitClient.getClient()){
            GetRequest getRequest = new GetRequest("posts", "doc", "1");
            synExecution(client,getRequest);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes the request synchronously and reports the response.
     *
     * @param client     client used to send the request
     * @param getRequest request to execute
     * @throws IOException if the call to the cluster fails
     */
    private static void synExecution(RestHighLevelClient client,GetRequest getRequest ) throws IOException {
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        getResponse(getResponse);
    }

    /**
     * Executes the request asynchronously; the listener is notified on completion.
     *
     * @param client     client used to send the request
     * @param getRequest request to execute
     */
    private static void asynExecution(RestHighLevelClient client,GetRequest getRequest){
        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse getResponse) {
                System.out.println("執行成功時調用");
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("執行失敗時調用");
                // Keep the cause visible instead of dropping it.
                e.printStackTrace();
            }
        };
        client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
    }

    /**
     * Prints the document source if the document exists, otherwise a not-found message.
     *
     * @param getResponse response to report
     */
    private static void getResponse(GetResponse getResponse){
        // Coordinates of the requested document; kept to illustrate the accessors.
        String index = getResponse.getIndex();
        String type = getResponse.getType();
        String id = getResponse.getId();
        if (getResponse.isExists()) {
            long version = getResponse.getVersion();
            // The document as a String.
            String sourceAsString = getResponse.getSourceAsString();
            // The document as a Map.
            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
            // The document as a byte array.
            byte[] sourceAsBytes = getResponse.getSourceAsBytes();
            System.out.println("文檔--->"+sourceAsString);
        } else {
            System.out.println("找不到文檔");
        }
    }

    /**
     * Shows how to handle a get against an index that does not exist: the client
     * throws an {@link ElasticsearchException} whose status is
     * {@link RestStatus#NOT_FOUND}. Any other status is rethrown instead of being
     * silently discarded.
     */
    private static void failMethod(){
        try(RestHighLevelClient client = InitClient.getClient()){
            GetRequest request = new GetRequest("does_not_exist", "doc", "1");
            try {
                GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
                getResponse(getResponse);
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    System.out.println("找不到索引");
                } else {
                    // Not the case this example handles; do not hide it.
                    throw e;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

4.Exists

import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;

/**
 * Examples for the Exists API: checking whether a document exists without
 * fetching its source or stored fields.
 */
public class Exists {
    /**
     * Runs the synchronous exists example.
     *
     * @param args unused
     */
    public static void main(String[] args){
         existsRequest();
    }

    /**
     * Checks whether document {@code posts/doc/1} exists.
     */
    private static void existsRequest(){
        try(RestHighLevelClient client = InitClient.getClient()){
            GetRequest getRequest = new GetRequest("posts", "doc", "1");
            // Disable fetching _source.
            getRequest.fetchSourceContext(new FetchSourceContext(false));
            // Disable fetching stored fields.
            getRequest.storedFields("_none_");
            synExecution(client,getRequest);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * Executes the exists check synchronously and prints the result.
     *
     * @param client     client used to send the request
     * @param getRequest request identifying the document
     * @throws IOException if the call to the cluster fails
     */
    private static void synExecution(RestHighLevelClient client,GetRequest getRequest) throws IOException {
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        // Report the outcome; previously the value was computed and discarded.
        System.out.println("exists: " + exists);
    }

    /**
     * Executes the exists check asynchronously; the listener is notified on completion.
     *
     * @param client     client used to send the request
     * @param getRequest request identifying the document
     */
    private static void asynExecution(RestHighLevelClient client,GetRequest getRequest){
        ActionListener<Boolean> listener = new ActionListener<Boolean>() {
            @Override
            public void onResponse(Boolean exists) {
                System.out.println("exists: " + exists);
            }

            @Override
            public void onFailure(Exception e) {
                // Keep the cause visible instead of dropping it.
                e.printStackTrace();
            }
        };
        client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
    }

}

5.Update

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Examples for the Update API: four ways of supplying a partial document, an
 * upsert, synchronous/asynchronous execution, and reading the response.
 */
public class Update {
    /**
     * Entry point; no example is wired in here.
     *
     * @param args unused
     */
    public static void main(String[] args) {

    }

    /**
     * Creates an update request for document {@code posts/doc/1}.
     * The request is only constructed, not executed.
     */
    private static void updateRequest(){
        UpdateRequest request = new UpdateRequest("posts", "doc", "1");
    }

    /**
     * Partial document given as a JSON string. The request is only constructed.
     */
    private static void update1(){
        UpdateRequest request = new UpdateRequest("posts", "doc", "1");
        String jsonString = "{" +
                "\"updated\":\"2017-01-01\"," +
                "\"reason\":\"daily update\"" +
                "}";
        request.doc(jsonString, XContentType.JSON);
    }

    /**
     * Partial document given as a {@link Map}. The request is only constructed.
     */
    private static void update2(){
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("updated", new Date());
        jsonMap.put("reason", "daily update");
        UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                .doc(jsonMap);
    }

    /**
     * Partial document assembled with an {@link XContentBuilder}. The request is
     * only constructed.
     *
     * @throws IOException if building the content fails
     */
    private static void update3() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.timeField("updated", new Date());
            builder.field("reason", "daily update");
        }
        builder.endObject();
        UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                .doc(builder);
    }

    /**
     * Partial document given as alternating key/value arguments. The request is
     * only constructed.
     */
    private static void update4(){
        UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                .doc("updated", new Date(),
                        "reason", "daily update");
    }

    /**
     * If the document does not exist yet, the upsert content defined here is
     * inserted as a new document.
     *
     * @param request update request to attach the upsert content to
     */
    private static void upsert(UpdateRequest request){
        String jsonString = "{\"created\":\"2017-01-01\"}";
        request.upsert(jsonString, XContentType.JSON);
    }

    /**
     * Executes the request synchronously and reports the response.
     *
     * @param client  client used to send the request
     * @param request update request to execute
     * @throws IOException if the call to the cluster fails
     */
    private static void synExecution(RestHighLevelClient client,UpdateRequest request) throws IOException {
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
        // Report the outcome, as the index and delete examples do.
        updateResponse(updateResponse);
    }

    /**
     * Executes the request asynchronously; the listener is notified on completion.
     *
     * @param client  client used to send the request
     * @param request update request to execute
     */
    private static void asynExecution(RestHighLevelClient client,UpdateRequest request){
        ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
            @Override
            public void onResponse(UpdateResponse updateResponse) {
                updateResponse(updateResponse);
            }

            @Override
            public void onFailure(Exception e) {
                // Keep the cause visible instead of dropping it.
                e.printStackTrace();
            }
        };
        client.updateAsync(request, RequestOptions.DEFAULT, listener);
    }

    /**
     * Prints a summary of an update response: the kind of result, the document
     * coordinates and version, and the reason of every shard failure.
     *
     * @param updateResponse response to report
     */
    public static void updateResponse(UpdateResponse updateResponse){
        String index = updateResponse.getIndex();
        String type = updateResponse.getType();
        String id = updateResponse.getId();
        long version = updateResponse.getVersion();
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            // The document was created for the first time (upsert).
            System.out.println("首次建立文檔");

        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            // The document was updated.
            System.out.println("update更新成功");

        } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
            // The document was deleted.
            System.out.println("刪除成功");
        } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
            // The document was not affected by the update; no operation was performed.
            System.out.println("沒有執行任何操做");
        }
        System.out.println(" index: "+index+" type: "+type+" id: "+id+" version: "+version);
        ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            // Hook: fewer shards reported success than the total number of shards.
            // Intentionally left without handling in this example.
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure :
                    shardInfo.getFailures()) {
                // Print the reason rather than computing and discarding it.
                String reason = failure.reason();
                System.out.println(reason);
            }
        }
    }
}

6.Delete

import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;

/**
 * Examples for the Delete API: deleting a document synchronously or
 * asynchronously and reading the response.
 */
public class Delete {
    /**
     * Runs the synchronous delete example.
     *
     * @param args unused
     */
    public static void main(String[] args){
      deleteRequest();
    }

    /**
     * Deletes document {@code posts/doc/2} and reports the outcome.
     */
    private static void deleteRequest(){
        // try-with-resources closes the client; an additional explicit close() is not needed.
        try(RestHighLevelClient client = InitClient.getClient()){
            DeleteRequest request = new DeleteRequest("posts", "doc", "2");
            synExecution(client,request);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes the request synchronously and reports the response.
     *
     * @param client  client used to send the request
     * @param request delete request to execute
     * @throws IOException if the call to the cluster fails
     */
    private static void synExecution(RestHighLevelClient client, DeleteRequest request) throws IOException {
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
        deleteResponse(deleteResponse);
    }

    /**
     * Executes the request asynchronously; the listener is notified on completion.
     *
     * @param client  client used to send the request
     * @param request delete request to execute
     */
    private static void asynExecution(RestHighLevelClient client, DeleteRequest request){
        ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
                try {
                    deleteResponse(deleteResponse);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onFailure(Exception e) {
                // Keep the cause visible instead of dropping it.
                e.printStackTrace();
            }
        };
        client.deleteAsync(request, RequestOptions.DEFAULT, listener);
    }

    /**
     * Prints a summary of a delete response. A NOT_FOUND result is reported as
     * such instead of being announced as a deletion; the reason of every shard
     * failure is printed.
     *
     * @param deleteResponse response to report
     * @throws IOException declared for compatibility with existing callers that catch it
     */
    public static void deleteResponse(DeleteResponse deleteResponse) throws IOException {
        String index = deleteResponse.getIndex();
        String type = deleteResponse.getType();
        String id = deleteResponse.getId();
        long version = deleteResponse.getVersion();
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            // Nothing was deleted because the document does not exist.
            System.out.println("沒有找到");
        } else {
            System.out.println("刪除的是"+" index: "+index+" type: "+type+" id: "+id+" version: "+version);
        }
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            // Hook: fewer shards reported success than the total number of shards.
            // Intentionally left without handling in this example.
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure :
                    shardInfo.getFailures()) {
                // Print the reason rather than computing and discarding it.
                String reason = failure.reason();
                System.out.println(reason);
            }
        }
    }

    /**
     * Shows how to detect that the document to delete was not found.
     *
     * @param client client used to send the request
     * @throws IOException if the call to the cluster fails
     */
    private static void deleteFail(RestHighLevelClient client) throws IOException {
        DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
        DeleteResponse deleteResponse = client.delete(
                request, RequestOptions.DEFAULT);
        if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            System.out.println("沒有找到");
        }
    }
}

7.Bulk(批量操作)

import com.htkj.elasticsearch.InitClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * Examples for the Bulk API and {@link BulkProcessor}: building bulk requests,
 * executing them, inspecting per-item results, and batching operations through
 * a processor.
 */
public class Bulk {
    /**
     * Runs the bulk-processor example.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        test();
    }

    private static final Logger logger = LogManager.getRootLogger();

    /**
     * Builds a bulk request with three index operations. The request is only
     * constructed, not executed.
     */
    private static void bulkRequest(){
        // Create the BulkRequest.
        BulkRequest request = new BulkRequest();
        // Add IndexRequests.
        // The bulk API only supports JSON or SMILE encoded documents; other formats cause an error.
        request.add(new IndexRequest("posts", "doc", "1")
                .source(XContentType.JSON,"field", "foo"));
        request.add(new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON,"field", "bar"));
        request.add(new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON,"field", "baz"));
    }

    /**
     * Builds a bulk request mixing delete, update and index operations. The
     * request is only constructed, not executed.
     */
    private static void bulkOtherRequest(){
        // Different operation types can be added to the same BulkRequest.
        BulkRequest request = new BulkRequest();
        // Add a DeleteRequest.
        request.add(new DeleteRequest("posts", "doc", "3"));
        // Add an UpdateRequest.
        request.add(new UpdateRequest("posts", "doc", "2")
                .doc(XContentType.JSON,"other", "test"));
        // Add an IndexRequest.
        request.add(new IndexRequest("posts", "doc", "4")
                .source(XContentType.JSON,"field", "baz"));
    }

    /**
     * Shows the optional arguments of a bulk request, each in two equivalent forms.
     */
    private static void argsOptional(){
        BulkRequest request = new BulkRequest();
        // Timeout of 2 minutes, given as a TimeValue.
        request.timeout(TimeValue.timeValueMinutes(2));
        // Timeout of 2 minutes, given as a String.
        request.timeout("2m");
        // Refresh policy given as a WriteRequest.RefreshPolicy constant.
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        // Refresh policy given as a String.
        request.setRefreshPolicy("wait_for");
    }

    /**
     * Executes the bulk request synchronously and reports every item result.
     *
     * @param client  client used to send the request
     * @param request bulk request to execute
     * @throws IOException if the call to the cluster fails
     */
    private static void synExecution(RestHighLevelClient client,BulkRequest request) throws IOException {
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
        bulkResonse(bulkResponse);
    }

    /**
     * Executes the bulk request asynchronously; the listener is notified on completion.
     *
     * @param client  client used to send the request
     * @param request bulk request to execute
     */
    private static void asynExecution(RestHighLevelClient client,BulkRequest request){
        ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkResponse) {
                bulkResonse(bulkResponse);
            }

            @Override
            public void onFailure(Exception e) {
                logger.error("Failed to execute bulk", e);
            }
        };
        client.bulkAsync(request, RequestOptions.DEFAULT, listener);
    }

    /**
     * Iterates over all item results of a bulk response and dispatches each
     * successful one to the matching reporter. Failed items are logged and
     * skipped so that no null response is passed to the reporters.
     *
     * @param bulkResponse response to report
     */
    private static void bulkResonse(BulkResponse bulkResponse) {
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                // A failed item carries a failure, not a response to dispatch.
                logger.error("Bulk item [{}] failed: {}",
                        bulkItemResponse.getItemId(), bulkItemResponse.getFailureMessage());
                continue;
            }
            // IndexResponse, UpdateResponse and DeleteResponse are all DocWriteResponse instances.
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();
            switch (bulkItemResponse.getOpType()) {
                case INDEX:
                case CREATE:
                    CreateIndex.IndexResponse((IndexResponse) itemResponse);
                    break;
                case UPDATE:
                    Update.updateResponse((UpdateResponse) itemResponse);
                    break;
                case DELETE:
                    try {
                        Delete.deleteResponse((DeleteResponse) itemResponse);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Logs every failed operation of a bulk response.
     *
     * @param bulkResponse response to inspect
     */
    private static void bulkResponseFail(BulkResponse bulkResponse){
        if (bulkResponse.hasFailures()) {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                // Did this operation fail?
                if (bulkItemResponse.isFailed()) {
                    // Retrieve the failure of the failed operation.
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    logger.error("Bulk item [{}] failed: {}",
                            bulkItemResponse.getItemId(), failure.getMessage());
                }
            }
        }
    }

    /**
     * BulkProcessor is a utility that lets index, update and delete operations
     * be added to a processor and executed in batches. To execute requests it needs:
     * <ul>
     *   <li>a RestHighLevelClient, used to execute the BulkRequest and retrieve the BulkResponse;</li>
     *   <li>a BulkProcessor.Listener, called before and after every BulkRequest
     *       execution or when a BulkRequest fails.</li>
     * </ul>
     * {@code BulkProcessor.builder} is then used to build a new BulkProcessor.
     *
     * @param client client used to execute the bulk requests
     * @throws InterruptedException if interrupted while waiting for the processor to close
     */
    private static void bulkProcessor(RestHighLevelClient client) throws InterruptedException {
        // Create the BulkProcessor.Listener.
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            /**
             * Called before every BulkRequest execution.
             */
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Tells how many operations are about to be executed in this BulkRequest.
                int numberOfActions = request.numberOfActions();
                logger.debug("Executing bulk [{}] with {} requests",
                        executionId, numberOfActions);
            }
            /**
             * Called after every BulkRequest execution.
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Tells whether the BulkResponse contains errors.
                if (response.hasFailures()) {
                    logger.warn("Bulk [{}] executed with failures", executionId);
                } else {
                    logger.debug("Bulk [{}] completed in {} milliseconds",
                            executionId, response.getTook().getMillis());
                }
            }
            /**
             * Called when a BulkRequest failed.
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Failed to execute bulk", failure);
            }
        };
        // RestHighLevelClient.bulkAsync() is used to execute the BulkRequest in the background.
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
        // A BulkProcessor is created by calling build() on a BulkProcessor.Builder.
        BulkProcessor bulkProcessor   = BulkProcessor.builder(bulkConsumer, listener).build();
        // BulkProcessor.Builder offers methods to configure how the processor executes requests:
        BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
        // Flush a new bulk request once this many operations were added (default 1000, -1 disables it).
        builder.setBulkActions(500);
        // Flush a new bulk request once the added operations reach this size (default 5Mb, -1 disables it).
        builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
        // Number of concurrent requests allowed (default 1, 0 allows only a single request to execute).
        builder.setConcurrentRequests(0);
        // Flush any pending BulkRequest when this interval passes (not set by default).
        builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
        // Backoff policy that initially waits 1 second and retries up to 3 times.
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        // The same configuration written as one fluent chain:
        BulkProcessor bulkProcessor2   = BulkProcessor.builder(bulkConsumer, listener)
                .setBulkActions(500)
                .setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB))
                .setConcurrentRequests(0)
                .setFlushInterval(TimeValue.timeValueSeconds(10L))
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                .build();
        // Once the BulkProcessor is created, requests can be added to it.
        IndexRequest one = new IndexRequest("posts", "doc", "1").
                source(XContentType.JSON, "title",
                        "In which order are my Elasticsearch queries executed?");
        IndexRequest two = new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON, "title",
                        "Current status and upcoming changes in Elasticsearch");
        IndexRequest three = new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON, "title",
                        "The Future of Federated Search in Elasticsearch");
        bulkProcessor2.add(one);
        bulkProcessor2.add(two);
        bulkProcessor2.add(three);
        // After all requests were added, the processor instances have to be closed.
        // First way: awaitClose() waits until all requests were processed or the
        // given time elapsed. It is called on the processor that holds the requests.
        boolean terminated = bulkProcessor2.awaitClose(30L, TimeUnit.SECONDS);
        if (!terminated) {
            logger.warn("Bulk processor did not finish within 30 seconds");
        }
        // Second way: close() closes the BulkProcessor immediately. Used here for
        // the default-configured processor, which holds no requests.
        bulkProcessor.close();

    }

    /**
     * Uses a BulkProcessor to execute a delete, an update and an index operation
     * in one batch and reports every item result.
     */
    private static void test(){
        // try-with-resources closes the client; an additional explicit close() is not needed.
        try(RestHighLevelClient client = InitClient.getClient()){
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            // Listener
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {

                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    int numberOfActions = request.numberOfActions();
                    logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // Report every item of the response.
                    bulkResonse(response);
                    if (response.hasFailures()) {
                        logger.warn("Bulk [{}] executed with failures", executionId);
                    } else {
                        logger.debug("Bulk [{}] completed in {} milliseconds",
                                executionId, response.getTook().getMillis());
                    }
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    logger.error("Failed to execute bulk", failure);
                }
            };
            // Build the processor.
            BulkProcessor bulkProcessor   = BulkProcessor.builder(bulkConsumer, listener)
                    .setBulkActions(500)
                    .setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB))
                    .setConcurrentRequests(0)
                    .setFlushInterval(TimeValue.timeValueSeconds(10L))
                    .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                    .build();
            // Requests
            DeleteRequest one2 = new DeleteRequest("posts", "doc", "3");

            UpdateRequest two2 = new UpdateRequest("posts", "doc", "2")
                    .doc(XContentType.JSON, "other", "test");
            IndexRequest three2 = new IndexRequest("posts", "doc", "4")
                    .source(XContentType.JSON, "field", "baz");
            // Add them to the processor.
            bulkProcessor.add(one2);
            bulkProcessor.add(two2);
            bulkProcessor.add(three2);
            // Wait for the pending requests, then close.
            boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
            if (!terminated) {
                logger.warn("Bulk processor did not finish within 30 seconds");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索