Elasticsearch Java High Level REST Client(Bulk API)

Bulk API

Java High Level REST Client提供了 Bulk處理器來幫助處理批量請求。

Bulk請求

BulkRequest可使用一個請求執行多個索引、更新和/或刪除操做。服務器

它須要在批量請求中添加至少一個操做:併發

BulkRequest request = new BulkRequest(); 
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));
  • 建立BulkRequest
  • IndexRequest添加到Bulk請求。
Bulk API只支持 JSONSMILE編碼的文檔,提供任何其餘格式的文檔都會致使錯誤。

不一樣的操做類型能夠添加到同一個BulkRequest異步

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));
  • BulkRequest添加DeleteRequest
  • BulkRequest添加UpdateRequest
  • 使用JSON格式添加IndexRequest

可選參數

能夠選擇提供如下參數:ide

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");
  • 做爲TimeValue等待bulk請求執行的超時。
  • 做爲String等待bulk請求執行的超時。
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");
  • 做爲WriteRequest.RefreshPolicy實例的刷新策略。
  • 做爲String的刷新策略。
request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);
  • 設置在繼續執行索引/更新/刪除操做以前必須活動的碎片副本的數量。
  • 做爲ActiveShardCount提供的碎片副本的數量:能夠是ActiveShardCount.ALLActiveShardCount.ONEActiveShardCount.DEFAULT(默認)。
request.pipeline("pipelineId");
  • 全局pipelineId用於全部子請求,除非在子請求上重寫。
request.routing("routingId");
  • 全局routingId用於全部子請求,除非在子請求上重寫。
BulkRequest defaulted = new BulkRequest("posts");
  • 在全部子請求上使用全局索引的bulk請求,除非在子請求上重寫,這個參數是@Nullable,並只能在建立BulkRequest時設置。

同步執行

當以如下方式執行BulkRequest時,客戶端等待BulkResponse返回,而後繼續執行代碼:post

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

在高級別REST客戶端中解析REST響應失敗、請求超時或相似的狀況,其中沒有來自服務器的響應的狀況下,同步調用可能引起IOExceptionui

在服務器返回4xx5xx錯誤代碼的狀況下,高級別客戶端嘗試解析響應體錯誤細節,而後拋出一個通用的ElasticsearchException並將原始的ResponseException做爲一個被抑制的異常添加到它。編碼

異步執行

還能夠以異步方式執行BulkRequest,以便客戶端能夠直接返回,用戶須要指定如何經過將請求和偵聽器傳遞給異步塊方法來處理響應或潛在故障:debug

client.bulkAsync(request, RequestOptions.DEFAULT, listener);
  • 要執行的BulkRequest和在執行完成時使用的ActionListener

異步方法不會阻塞並當即返回,一旦執行完成,ActionListener將使用onResponse方法(若是執行成功)被調用,或者使用onFailure方法(若是執行失敗)被調用,失敗狀況和預期的異常與同步執行狀況相同。code

一個典型的bulk監聽器是這樣的:索引

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};
  • onResponse當執行成功完成時調用。
  • onFailure當整個BulkRequest失敗時調用。

Bulk響應

返回的BulkResponse包含執行操做的信息,容許對每一個結果進行以下迭代:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    
    case CREATE:
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}
  • 遍歷全部操做的結果。
  • 檢索操做的響應(成功與否),能夠是IndexResponseUpdateResponseDeleteResponse,它們均可以看做DocWriteResponse實例。
  • 處理索引操做的響應。
  • 處理更新操做的響應。
  • 處理刪除操做的響應。

Bulk響應提供了一種方法來快速檢查一個或多個操做是否失敗:

if (bulkResponse.hasFailures()) { 

}
  • 若是至少有一個操做失敗,此方法將返回true

在這種狀況下,須要對全部的操做結果進行迭代,以檢查操做是否失敗,若是失敗,則檢索相應的失敗:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure(); 
    }
}
  • 指示給定操做是否失敗。
  • 檢索失敗操做的失敗。

Bulk處理器

BulkProcessor提供了一個實用程序類,容許索引/更新/刪除操做在添加處處理器時透明地執行,從而簡化了Bulk API的使用。

爲了執行請求,BulkProcessor須要如下組件:

RestHighLevelClient

  • 此客戶端用於執行BulkRequest並檢索BulkResponse

BulkProcessor.Listener

  • 在每次執行BulkRequest以前和以後,或者當一個BulkRequest失敗時,都會調用這個偵聽器。

而後BulkProcessor.builder方法能夠用來構建一個新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();
  • 建立BulkProcessor.Listener
  • beforeBulk方法在每次執行BulkRequest以前調用。
  • afterBulk方法在每次執行BulkRequest以後調用。
  • failure參數的afterBulk方法在BulkRequest失敗時調用。
  • 經過從BulkProcessor.builder調用build()方法建立BulkProcessorRestHighLevelClient.bulkAsync()方法將用於在後臺執行BulkRequest

BulkProcessor.Builder提供了一些方法來配置BulkProcessor應該如何處理請求執行:

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
builder.setBulkActions(500); 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0); 
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
  • 根據當前添加的操做數量設置刷新新bulk請求的時間(默認爲1000,使用-1禁用它)。
  • 根據當前添加的操做大小設置刷新新bulk請求的時間(默認爲5Mb,使用-1禁用)。
  • 設置容許執行的併發請求數量(默認爲1,使用0只容許執行單個請求)。
  • 設置刷新間隔,若是間隔經過,則刷新任何掛起的BulkRequest(默認爲未設置)。
  • 設置一個常量後退策略,該策略最初等待1秒並重試最多3次,有關更多選項,請參見BackoffPolicy.noBackoff()BackoffPolicy.constantBackoff()BackoffPolicy.exponentialBackoff()

一旦建立了BulkProcessor,就能夠向它添加請求:

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

請求將由BulkProcessor執行,它負責爲每一個bulk請求調用BulkProcessor.Listener

監聽器提供訪問BulkRequestBulkResponse的方法:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        logger.error("Failed to execute bulk", failure); 
    }
};
  • beforeBulk在執行BulkRequest的每次執行以前調用,這個方法容許知道將要在BulkRequest中執行的操做的數量。
  • afterBulk在每次執行BulkRequest以後調用,這個方法容許知道BulkResponse是否包含錯誤。
  • 若是BulkRequest失敗,則調用帶failure參數的afterBulk方法,該方法容許知道失敗。

將全部請求添加到BulkProcessor以後,須要使用兩種可用的關閉方法之一關閉它的實例。

awaitClose()方法能夠用來等待,直到全部的請求都被處理完畢或者指定的等待時間過去:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
  • 若是全部bulk請求都已完成,則該方法返回true,若是在全部bulk請求完成以前的等待時間已通過去,則返回false

close()方法可用於當即關閉BulkProcessor

這兩種方法都在關閉處理器以前刷新添加處處理器的請求,而且禁止向處理器添加任何新請求。

相關文章
相關標籤/搜索