承接上文,使用Java High Level REST Client操做elasticsearchhtml
高級客戶端提供了批量處理器以協助批量請求java
BulkRequest能夠在一次請求中執行多個索引,更新或者刪除操做。一次請求至少須要一個操做。併發
//建立BulkRequest實例 BulkRequest request = new BulkRequest(); //使用IndexRequest添加三個文檔,不清楚用法能夠參考Index API request.add(new IndexRequest("posts", "doc", "1") .source(XContentType.JSON,"field", "foo")); request.add(new IndexRequest("posts", "doc", "2") .source(XContentType.JSON,"field", "bar")); request.add(new IndexRequest("posts", "doc", "3") .source(XContentType.JSON,"field", "baz"));
Bulk API僅支持以JSON或SMILE編碼的文檔。 提供任何其餘格式的文檔將致使錯誤。異步
同一個BulkRequest能夠添加不一樣類型的操做。elasticsearch
// 添加 DeleteRequest到BulkRequest,不清楚用法能夠參考Delete API request.add(new DeleteRequest("posts", "doc", "3")); // 添加 UpdateRequest到BulkRequest,不清楚用法能夠參考Update API request.add(new UpdateRequest("posts", "doc", "2") .doc(XContentType.JSON, "other", "test")); // 添加 一個使用SMILE格式的IndexRequest request.add(new IndexRequest("posts", "doc", "4") .source(XContentType.SMILE, "field", "baz"));
可選參數ide
//設置超時,等待批處理被執行的超時時間(使用TimeValue形式) request.timeout(TimeValue.timeValueMinutes(2)); //設置超時,等待批處理被執行的超時時間(字符串形式) request.timeout("2m");
//刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy實例方式 request.setRefreshPolicy("wait_for");//字符串方式
//設置在執行索引/更新/刪除操做以前必須處於活動狀態的分片副本數。 request.waitForActiveShards(2); //使用ActiveShardCount方式來提供分片副本數:能夠是ActiveShardCount.ALL,ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默認) request.waitForActiveShards(ActiveShardCount.ALL);
同步執行工具
BulkResponse bulkResponse = client.bulk(request);
異步執行post
批量請求的異步執行須要將BulkRequest實例和ActionListener實例傳遞給異步方法:ui
//當BulkRequest執行完成時,ActionListener會被調用 client.bulkAsync(request, listener);
異步方法不會阻塞並會當即返回。完成後,若是執行成功完成,則使用onResponse方法回調ActionListener,若是失敗則使用onFailure方法。
BulkResponse 的典型監聽器以下所示:編碼
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { //執行成功完成時調用。 response做爲參數提供,幷包含已執行的每一個操做的單個結果列表。 請注意,一個或多個操做可能已失敗,然而其餘操做已成功執行。 } @Override public void onFailure(Exception e) { //在整個BulkRequest失敗時調用。 在這種狀況下,exception做爲參數提供,而且沒有執行任何操做。 } };
返回的BulkResponse包含有關已執行操做的信息,並容許迭代每一個結果,以下所示:
//遍歷全部操做結果 for (BulkItemResponse bulkItemResponse : bulkResponse) { //獲取操做的響應,能夠是IndexResponse,UpdateResponse或DeleteResponse, // 它們均可以被視爲DocWriteResponse實例 DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { //處理index操做 IndexResponse indexResponse = (IndexResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { //處理update操做 UpdateResponse updateResponse = (UpdateResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { //處理delete操做 DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
批量響應提供了用於快速檢查一個或多個操做是否失敗的方法:
if (bulkResponse.hasFailures()) { //該方法只要有一個操做失敗都會返回true }
若是想要查看操做失敗的緣由,則須要遍歷全部操做結果:
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) {//判斷當前操做是否失敗 //獲取失敗對象,拿到了failure對象,想怎麼玩都行 BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); } }
BulkProcessor經過提供一個工具類來簡化Bulk API的使用,容許索引/更新/刪除操做在添加處處理器後透明地執行。
爲了執行請求,BulkProcessor須要如下組件:
RestHighLevelClient
此客戶端用於執行BulkRequest並獲取BulkResponse
BulkProcessor.Listener
在每次BulkRequest執行以前和以後或BulkRequest失敗時調用此監聽器
而後BulkProcessor.builder方法可用於構建新的BulkProcessor:
//建立BulkProcessor.Listener BulkProcessor.Listener listener1 = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //在每次執行BulkRequest以前調用此方法 } @Override public void afterBulk(long executionId, BulkRequest request,BulkResponse response) { //在每次執行BulkRequest以後調用此方法 } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { //執行BulkRequest失敗時調用此方法 } }; //經過從BulkProcessor.Builder調用build()方法來建立BulkProcessor。 //RestHighLevelClient.bulkAsync()方法將用來執行BulkRequest。 BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener1).build();
BulkProcessor.Builder提供了配置BulkProcessor應如何處理請求執行的方法:
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener1); //設置什麼時候刷新新的批量請求,根據當前已添加的操做數量(默認爲1000,使用-1禁用它) builder.setBulkActions(500);//操做數爲500時就刷新請求 //設置什麼時候刷新新的批量請求,根據當前已添加的操做大小(默認爲5Mb,使用-1禁用它) builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));//操做大小爲1M時就刷新請求 //設置容許執行的併發請求數(默認爲1,使用0只容許執行單個請求) builder.setConcurrentRequests(0);//不併發執行 //設置刷新間隔時間,若是超過了間隔時間,則隨便刷新一個BulkRequest掛起(默認爲未設置) builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //設置一個最初等待1秒,最多重試3次的常量退避策略。 // 有關更多選項,請參閱BackoffPolicy.noBackoff(),BackoffPolicy.constantBackoff()和BackoffPolicy.exponentialBackoff()。 builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
建立BulkProcessor後,就能夠向其添加請求了:
IndexRequest one = new IndexRequest("posts", "doc", "1"). source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts", "doc", "2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts", "doc", "3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
這些請求將由BulkProcessor執行,BulkProcessor負責爲每一個批量請求調用BulkProcessor.Listener。
偵聽器提供訪問BulkRequest和BulkResponse的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //在每次執行BulkRequest以前調用,經過此方法能夠獲取將在BulkRequest中執行的操做數 int numberOfActions = request.numberOfActions(); logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { //在每次執行BulkRequest後調用,經過此方法能夠獲取BulkResponse是否包含錯誤 if (response.hasFailures()) { logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { //若是BulkRequest失敗,經過調用此方法能夠獲取失敗 logger.error("Failed to execute bulk", failure); } };
將全部請求添加到BulkProcessor後,須要使用兩種可用的關閉方法之一關閉其實例。
awaitClose()方法可用於等待全部請求都已處理或過了指定的等待時間:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
若是全部批量請求都已完成,則該方法返回true;若是在全部批量請求完成以前等待時間已過,則返回false
close()方法可用於當即關閉BulkProcessor:
bulkProcessor.close();
兩種方法在關閉處理器以前會刷新已添加處處理器的請求,而且還會禁止將任何新請求添加處處理器。
官方文檔:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html#_optional_arguments_4