Bulk Request併發
BulkRequest能夠在一塊兒從請求執行批量添加、更新和刪除,至少須要添加一個操做異步
BulkRequest request = new BulkRequest(); //建立BulkRequest request.add(new IndexRequest("posts", "doc", "1") //添加操做 .source(XContentType.JSON,"field", "foo")); request.add(new IndexRequest("posts", "doc", "2") //添加操做 .source(XContentType.JSON,"field", "bar")); request.add(new IndexRequest("posts", "doc", "3") //添加操做 .source(XContentType.JSON,"field", "baz"));
注意:每次只支持一種encoded,不然會報錯ide
能夠在同一個BulkRequest中添加不一樣類型操做post
BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "doc", "3")); request.add(new UpdateRequest("posts", "doc", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts", "doc", "4") .source(XContentType.JSON,"field", "baz"));
可選參數ui
超時時間設置spa
request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");
刷新策略線程
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
設置副本shard活躍驗證,執行index、update、delete操做前必須有多少個副本shard活躍debug
request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL);
調用方式code
同步對象
BulkResponse bulkResponse = client.bulk(request);
異步
client.bulkAsync(request, new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { //成功 } @Override public void onFailure(Exception e) { //失敗 } });
響應對象
響應對象包括操做信息,而且能夠便利每個結果
for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { //index操做 IndexResponse indexResponse = (IndexResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { //update操做 UpdateResponse updateResponse = (UpdateResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { //delete操做 DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
BulkResponce提供方法快速查看操做是否失敗
if (bulkResponse.hasFailures()) { //todo }
BulkProcessor
RestHighLevelClient:執行BulkRequest而且返回BulkResponse
BulkProcessor.Listener:在bulk請求先後執行,而且能夠處理失敗狀況
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //bulk請求前執行 } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { //bulk請求後執行 } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { //失敗後執行 } }; BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build(); //BulkProcessor經過 BulkProcessor.Builder build()方法構建, RestHighLevelClient.bulkAsync() 用來執行bulk請求
BulkProcessor.Builder提供方法使BulkProcessor調整請求參數
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener); builder.setBulkActions(500); //按照數量批量處理(默認1000,-1禁用) builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); //按照大小批量處理 builder.setConcurrentRequests(0); //併發處理線程 builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //設置flush索引週期 builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //回退策略,等待1秒並重試3次, BackoffPolicy.noBackoff() BackoffPolicy.constantBackoff() BackoffPolicy.exponentialBackoff() 查看更多選項
添加請求
IndexRequest one = new IndexRequest("posts", "doc", "1"). source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts", "doc", "2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts", "doc", "3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
BulkProcessor經過 BulkProcessor.Listener 監控請求, BulkProcessor.Listener 提供方法接受BulkRequest和BulkResponse
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); //在每一個execution前執行,能夠獲知每次執行多少個操做 logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { //在每一個execution後執行,能夠獲知是否包含錯誤 logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); //發生錯誤時執行 } };
批量請求執行後須要關閉BulkProcessor。兩種關閉方式選其一
awaitClose(),全部請求被處理後或者等待時間結束後關閉,返回ture代表全部請求已經完成,false說明等待時間結束後請求並未執行結束
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
close(),當即關閉BulkProcessor
bulkProcessor.close();
關閉processor以前,全部已經被添加的請求會被提交執行,而且不能再向其中添加請求