Java High Level REST Client提供了 Bulk處理器來幫助處理批量請求。
BulkRequest
可使用一個請求執行多個索引、更新和/或刪除操做。服務器
它須要在批量請求中添加至少一個操做:併發
BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts").id("1") .source(XContentType.JSON,"field", "foo")); request.add(new IndexRequest("posts").id("2") .source(XContentType.JSON,"field", "bar")); request.add(new IndexRequest("posts").id("3") .source(XContentType.JSON,"field", "baz"));
BulkRequest
。IndexRequest
添加到Bulk
請求。Bulk API只支持 JSON或 SMILE編碼的文檔,提供任何其餘格式的文檔都會致使錯誤。
不一樣的操做類型能夠添加到同一個BulkRequest
:異步
BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "3")); request.add(new UpdateRequest("posts", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts").id("4") .source(XContentType.JSON,"field", "baz"));
BulkRequest
添加DeleteRequest
。BulkRequest
添加UpdateRequest
。IndexRequest
。能夠選擇提供如下參數:ide
request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");
TimeValue
等待bulk請求執行的超時。String
等待bulk請求執行的超時。request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
WriteRequest.RefreshPolicy
實例的刷新策略。String
的刷新策略。request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL);
ActiveShardCount
提供的碎片副本的數量:能夠是ActiveShardCount.ALL
、ActiveShardCount.ONE
、ActiveShardCount.DEFAULT
(默認)。request.pipeline("pipelineId");
pipelineId
用於全部子請求,除非在子請求上重寫。request.routing("routingId");
routingId
用於全部子請求,除非在子請求上重寫。BulkRequest defaulted = new BulkRequest("posts");
@Nullable
,並只能在建立BulkRequest
時設置。當以如下方式執行BulkRequest
時,客戶端等待BulkResponse
返回,而後繼續執行代碼:post
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
在高級別REST客戶端中解析REST響應失敗、請求超時或相似的狀況,其中沒有來自服務器的響應的狀況下,同步調用可能引起IOException
。ui
在服務器返回4xx
或5xx
錯誤代碼的狀況下,高級別客戶端嘗試解析響應體錯誤細節,而後拋出一個通用的ElasticsearchException
並將原始的ResponseException
做爲一個被抑制的異常添加到它。編碼
還能夠以異步方式執行BulkRequest
,以便客戶端能夠直接返回,用戶須要指定如何經過將請求和偵聽器傳遞給異步塊方法來處理響應或潛在故障:debug
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
BulkRequest
和在執行完成時使用的ActionListener
。異步方法不會阻塞並當即返回,一旦執行完成,ActionListener
將使用onResponse
方法(若是執行成功)被調用,或者使用onFailure
方法(若是執行失敗)被調用,失敗狀況和預期的異常與同步執行狀況相同。code
一個典型的bulk監聽器是這樣的:索引
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } };
onResponse
當執行成功完成時調用。onFailure
當整個BulkRequest
失敗時調用。返回的BulkResponse
包含執行操做的信息,容許對每一個結果進行以下迭代:
for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); switch (bulkItemResponse.getOpType()) { case INDEX: case CREATE: IndexResponse indexResponse = (IndexResponse) itemResponse; break; case UPDATE: UpdateResponse updateResponse = (UpdateResponse) itemResponse; break; case DELETE: DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
IndexResponse
、UpdateResponse
或DeleteResponse
,它們均可以看做DocWriteResponse
實例。Bulk響應提供了一種方法來快速檢查一個或多個操做是否失敗:
if (bulkResponse.hasFailures()) { }
true
。在這種狀況下,須要對全部的操做結果進行迭代,以檢查操做是否失敗,若是失敗,則檢索相應的失敗:
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); } }
BulkProcessor
提供了一個實用程序類,容許索引/更新/刪除操做在添加處處理器時透明地執行,從而簡化了Bulk API的使用。
爲了執行請求,BulkProcessor
須要如下組件:
RestHighLevelClient
BulkRequest
並檢索BulkResponse
。BulkProcessor.Listener
BulkRequest
以前和以後,或者當一個BulkRequest
失敗時,都會調用這個偵聽器。而後BulkProcessor.builder
方法能夠用來構建一個新的BulkProcessor
:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { } }; BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener).build();
BulkProcessor.Listener
。beforeBulk
方法在每次執行BulkRequest
以前調用。afterBulk
方法在每次執行BulkRequest
以後調用。failure
參數的afterBulk
方法在BulkRequest
失敗時調用。BulkProcessor.builder
調用build()
方法建立BulkProcessor
,RestHighLevelClient.bulkAsync()
方法將用於在後臺執行BulkRequest
。BulkProcessor.Builder
提供了一些方法來配置BulkProcessor
應該如何處理請求執行:
BulkProcessor.Builder builder = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); builder.setBulkActions(500); builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); builder.setConcurrentRequests(0); builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); builder.setBackoffPolicy(BackoffPolicy .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
1000
,使用-1
禁用它)。5Mb
,使用-1
禁用)。1
,使用0
只容許執行單個請求)。BulkRequest
(默認爲未設置)。BackoffPolicy.noBackoff()
、BackoffPolicy.constantBackoff()
和BackoffPolicy.exponentialBackoff()
。一旦建立了BulkProcessor
,就能夠向它添加請求:
IndexRequest one = new IndexRequest("posts").id("1") .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts").id("2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts").id("3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
請求將由BulkProcessor
執行,它負責爲每一個bulk請求調用BulkProcessor.Listener
。
監聽器提供訪問BulkRequest
和BulkResponse
的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); } };
beforeBulk
在執行BulkRequest
的每次執行以前調用,這個方法容許知道將要在BulkRequest
中執行的操做的數量。afterBulk
在每次執行BulkRequest
以後調用,這個方法容許知道BulkResponse
是否包含錯誤。BulkRequest
失敗,則調用帶failure
參數的afterBulk
方法,該方法容許知道失敗。將全部請求添加到BulkProcessor
以後,須要使用兩種可用的關閉方法之一關閉它的實例。
awaitClose()
方法能夠用來等待,直到全部的請求都被處理完畢或者指定的等待時間過去:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
true
,若是在全部bulk請求完成以前的等待時間已通過去,則返回false
。close()
方法可用於當即關閉BulkProcessor
:
這兩種方法都在關閉處理器以前刷新添加處處理器的請求,而且禁止向處理器添加任何新請求。