Elasticsearch Java High Level REST Client(Update By Query API)

Update By Query API

Update By Query請求

UpdateByQueryRequest可用於更新索引中的文檔。less

它須要在其上執行更新的現有索引(或一組索引)。異步

最簡單的UpdateByQueryRequest形式以下所示:ide

UpdateByQueryRequest request = new UpdateByQueryRequest("source1", "source2");
  • 在一組索引上建立UpdateByQueryRequest

默認狀況下,版本衝突會停止UpdateByQueryRequest進程,但你能夠經過在請求體中設置它爲proceed來計數它們。oop

request.setConflicts("proceed");
  • 設置proceed當版本衝突。

你能夠經過添加查詢來限制文檔。ui

request.setQuery(new TermQueryBuilder("user", "kimchy"));
  • 僅複製將字段user設置爲kimchy的文檔。

也能夠經過設置大小來限制處理文檔的數量。code

request.setSize(10);
  • 只複製10條文檔。

默認狀況下,UpdateByQueryRequest使用1000批次,你可使用setBatchSize更改批量大小。索引

request.setBatchSize(100);
  • 使用100個文檔批次。

按查詢更新還能夠經過指定管道來使用攝取功能。進程

request.setPipeline("my_pipeline");

UpdateByQueryRequest還支持修改文檔的script,如下示例說明了這一點。ip

request.setScript(
    new Script(
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap()));
  • setScript使用戶爲kimchy的全部文檔上的likes字段遞增。

UpdateByQueryRequest還有助於使用sliced-scroll自動並行化到_uid上的切片,使用setSlices指定要使用的切片數。路由

request.setSlices(2);
  • 設置要使用的切片數。

UpdateByQueryRequest使用scroll參數來控制它保持「搜索上下文」活動的時間。

request.setScroll(TimeValue.timeValueMinutes(10));
  • 設置滾動時間。

若是提供路由,則路由將複製到滾動查詢,從而將進程限制爲與該路由值匹配的碎片。

request.setRouting("=cat");
  • 設置路由。

可選參數

除上述選項外,還能夠選擇提供如下參數:

request.setTimeout(TimeValue.timeValueMinutes(2));
  • 等待查詢請求更新執行做爲TimeValue的超時時間。
request.setRefresh(true);
  • 經過調用查詢更新後刷新索引。
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
  • 設置索引選項。

同步執行

BulkByScrollResponse bulkResponse = client.updateByQuery(request, RequestOptions.DEFAULT);

異步執行

經過查詢更新請求異步執行須要將UpdateByQueryRequest實例和ActionListener實例傳遞給異步方法:

client.updateByQueryAsync(request, RequestOptions.DEFAULT, listener);
  • 要執行的UpdateByQueryRequest和執行完成時要使用的ActionListener

異步方法不會阻塞並當即返回,完成後,若是執行成功完成,則使用onResponse方法回調ActionListener,若是失敗則使用onFailure方法。

BulkByScrollResponse的典型監聽器以下所示:

ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};
  • onResponse — 執行成功完成時調用,響應做爲參數提供,幷包含已執行的每一個操做的單個結果列表,請注意,一個或多個操做可能已失敗,而其餘操做已成功執行。
  • onFailure — 在整個UpdateByQueryRequest失敗時調用,在這種狀況下,引起異常做爲參數提供,而且沒有執行任何操做。

Update By Query響應

返回的BulkByScrollResponse包含有關已執行操做的信息,並容許迭代每一個結果,以下所示:

TimeValue timeTaken = bulkResponse.getTook(); 
boolean timedOut = bulkResponse.isTimedOut(); 
long totalDocs = bulkResponse.getTotal(); 
long updatedDocs = bulkResponse.getUpdated(); 
long deletedDocs = bulkResponse.getDeleted(); 
long batches = bulkResponse.getBatches(); 
long noops = bulkResponse.getNoops(); 
long versionConflicts = bulkResponse.getVersionConflicts(); 
long bulkRetries = bulkResponse.getBulkRetries(); 
long searchRetries = bulkResponse.getSearchRetries(); 
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); 
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); 
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); 
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures();
  • 獲取總耗時。
  • 檢查請求是否超時。
  • 獲取處理的文檔總數。
  • 已更新的文檔數。
  • 已刪除的文檔數。
  • 已執行的批次數。
  • 跳過的文檔數。
  • 版本衝突數。
  • 請求必須重試批量索引操做的次數。
  • 請求必須重試搜索操做的次數。
  • 此請求限制的總時間不包括當前正在休眠的當前節流時間。
  • 任何當前節流閥休眠的剩餘延遲或若是不休眠則爲0。
  • 搜索階段的失敗。
  • 批量索引操做期間的失敗。
相關文章
相關標籤/搜索