Elasitcsearch High Level Rest Client學習筆記(三)批量api

Bulk Request併發

BulkRequest能夠在一塊兒從請求執行批量添加、更新和刪除,至少須要添加一個操做異步

BulkRequest request = new BulkRequest(); //建立BulkRequest
request.add(new IndexRequest("posts", "doc", "1")  //添加操做
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")  //添加操做
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")  //添加操做
        .source(XContentType.JSON,"field", "baz"));

注意:每次只支持一種encoded,不然會報錯ide

能夠在同一個BulkRequest中添加不一樣類型操做post

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可選參數ui

超時時間設置spa

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");

 

刷新策略線程

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

 

設置副本shard活躍驗證,執行index、update、delete操做前必須有多少個副本shard活躍debug

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);

調用方式code

同步對象

BulkResponse bulkResponse = client.bulk(request);

異步

client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        //成功
    }

    @Override
    public void onFailure(Exception e) {
        //失敗
    }
});

響應對象

響應對象包括操做信息,而且能夠便利每個結果

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {     //index操做
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {     //update操做
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {     //delete操做
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

BulkResponce提供方法快速查看操做是否失敗

if (bulkResponse.hasFailures()) { 
    //todo
}

BulkProcessor

RestHighLevelClient:執行BulkRequest而且返回BulkResponse

BulkProcessor.Listener:在bulk請求先後執行,而且能夠處理失敗狀況

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        //bulk請求前執行
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        //bulk請求後執行
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        //失敗後執行
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build();  //BulkProcessor經過 BulkProcessor.Builder build()方法構建, RestHighLevelClient.bulkAsync() 用來執行bulk請求

BulkProcessor.Builder提供方法使BulkProcessor調整請求參數

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); //按照數量批量處理(默認1000,-1禁用) 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); //按照大小批量處理
builder.setConcurrentRequests(0); //併發處理線程
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //設置flush索引週期
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //回退策略,等待1秒並重試3次, BackoffPolicy.noBackoff()  BackoffPolicy.constantBackoff()  BackoffPolicy.exponentialBackoff()  查看更多選項

添加請求

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

BulkProcessor經過 BulkProcessor.Listener 監控請求, BulkProcessor.Listener 提供方法接受BulkRequest和BulkResponse

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); //在每一個execution前執行,能夠獲知每次執行多少個操做
        logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {  //在每一個execution後執行,能夠獲知是否包含錯誤
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); //發生錯誤時執行
    }
};

批量請求執行後須要關閉BulkProcessor。兩種關閉方式選其一

awaitClose(),全部請求被處理後或者等待時間結束後關閉,返回ture代表全部請求已經完成,false說明等待時間結束後請求並未執行結束

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

close(),當即關閉BulkProcessor

bulkProcessor.close();

關閉processor以前,全部已經被添加的請求會被提交執行,而且不能再向其中添加請求

相關文章
相關標籤/搜索