1.鏈接集羣
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
/**
 * Factory for the Elasticsearch high-level REST client used by the examples.
 */
public class InitClient {
    /**
     * Builds a new {@link RestHighLevelClient} pointing at the three nodes
     * {@code htkj101}, {@code htkj102} and {@code htkj103}, each on port 9200
     * over plain HTTP.
     *
     * <p>Every call constructs a fresh client; nothing is cached here.
     *
     * @return a newly constructed client
     */
    public static RestHighLevelClient getClient() {
        HttpHost[] nodes = {
            new HttpHost("htkj101", 9200, "http"),
            new HttpHost("htkj102", 9200, "http"),
            new HttpHost("htkj103", 9200, "http")
        };
        return new RestHighLevelClient(RestClient.builder(nodes));
    }
}
2.CreateIndex
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * Examples of indexing a document into the {@code posts} index with the
 * Elasticsearch high-level REST client. Each {@code createIndexN} method
 * supplies the document source in a different form.
 */
public class CreateIndex {
    /** Runs the four indexing examples in order (document ids 1 to 4). */
    public static void main(String[] ags) {
        createIndex1();
        createIndex2();
        createIndex3();
        createIndex4();
    }

    /**
     * First way to index a document: the source is a JSON string.
     */
    private static void createIndex1() {
        // try-with-resources closes the client; an explicit close() would close it twice.
        try (RestHighLevelClient client = InitClient.getClient()) {
            IndexRequest request = new IndexRequest("posts", "doc", "1");
            String jsonString = "{"
                    + "\"user\":\"kimchy\","
                    + "\"postDate\":\"2013-01-30\","
                    + "\"message\":\"trying out Elasticsearch\""
                    + "}";
            request.source(jsonString, XContentType.JSON);
            synExecution(client, request);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Second way to index a document: the source is a {@link Map}.
     */
    private static void createIndex2() {
        try (RestHighLevelClient client = InitClient.getClient()) {
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("user", "kimchy");
            jsonMap.put("postDate", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            IndexRequest indexRequest = new IndexRequest("posts", "doc", "2")
                    .source(jsonMap);
            synExecution(client, indexRequest);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Third way to index a document: the source is built with an
     * {@link XContentBuilder}.
     */
    private static void createIndex3() {
        try (RestHighLevelClient client = InitClient.getClient()) {
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy");
                builder.timeField("postDate", new Date());
                builder.field("message", "trying out Elasticsearch");
            }
            builder.endObject();
            IndexRequest request = new IndexRequest("posts", "doc", "3")
                    .source(builder);
            synExecution(client, request);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Fourth way to index a document: the source is given as alternating
     * field-name / value arguments.
     */
    private static void createIndex4() {
        try (RestHighLevelClient client = InitClient.getClient()) {
            IndexRequest request = new IndexRequest("posts", "doc", "4")
                    .source("user", "kimchy",
                            "postDate", new Date(),
                            "message", "trying out Elasticsearch");
            synExecution(client, request);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes the request synchronously and reports the response.
     *
     * @param client  client used to send the request
     * @param request index request to execute
     * @throws IOException if {@code client.index} throws it
     */
    private static void synExecution(RestHighLevelClient client, IndexRequest request) throws IOException {
        IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
        IndexResponse(indexResponse);
    }

    /**
     * Executes the request asynchronously; the outcome is delivered to a listener.
     *
     * @param client  client used to send the request
     * @param request index request to execute
     */
    private static void asynExecution(RestHighLevelClient client, IndexRequest request) {
        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                // Called when the execution completed successfully.
                System.out.println("執行成功");
            }

            @Override
            public void onFailure(Exception e) {
                // Called when the execution failed.
                System.out.println("執行失敗");
                // Keep the cause visible; without it the failure cannot be diagnosed.
                e.printStackTrace();
            }
        };
        client.indexAsync(request, RequestOptions.DEFAULT, listener);
    }

    /**
     * Prints a summary of an index response: whether the document was created
     * or updated, its index/type/id/version, and the reason of every shard
     * failure reported in the response.
     *
     * @param indexResponse response returned by an index operation
     */
    public static void IndexResponse(IndexResponse indexResponse) {
        String index = indexResponse.getIndex();
        String type = indexResponse.getType();
        String id = indexResponse.getId();
        long version = indexResponse.getVersion();
        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
            // The document was created for the first time.
            System.out.println("第一次被建立");
        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            // An already existing document was overwritten.
            System.out.println("更新成功");
        }
        System.out.println(" index: " + index + " type: " + type + " id: " + id + " version: " + version);
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            // Fewer shard copies reported success than the total reported for the
            // operation. Hook for handling that case; intentionally no action here.
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                // Reason the operation failed on this shard copy.
                String reason = failure.reason();
                System.out.println(reason);
            }
        }
    }
}
3.Get
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Map;
/**
 * Examples of the Get API of the Elasticsearch high-level REST client.
 */
public class Get {
    /** Fetches document {@code posts/doc/1} and prints it. */
    public static void main(String[] ags) {
        getRequest();
    }

    /**
     * Opens a client, fetches document {@code posts/doc/1} synchronously and
     * prints the result.
     */
    private static void getRequest() {
        // try-with-resources closes the client; an explicit close() would close it twice.
        try (RestHighLevelClient client = InitClient.getClient()) {
            GetRequest getRequest = new GetRequest("posts", "doc", "1");
            synExecution(client, getRequest);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes the request synchronously and reports the response.
     *
     * @param client     client used to send the request
     * @param getRequest get request to execute
     * @throws IOException if {@code client.get} throws it
     */
    private static void synExecution(RestHighLevelClient client, GetRequest getRequest) throws IOException {
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        getResponse(getResponse);
    }

    /**
     * Executes the request asynchronously; the outcome is delivered to a listener.
     *
     * @param client     client used to send the request
     * @param getRequest get request to execute
     */
    private static void asynExecution(RestHighLevelClient client, GetRequest getRequest) {
        ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
            @Override
            public void onResponse(GetResponse getResponse) {
                System.out.println("執行成功時調用");
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("執行失敗時調用");
                // Keep the cause visible; without it the failure cannot be diagnosed.
                e.printStackTrace();
            }
        };
        client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
    }

    /**
     * Prints the document source when the document exists, otherwise prints a
     * "not found" message. The extra accessors are called only to illustrate
     * the available representations of the source.
     *
     * @param getResponse response returned by a get operation
     */
    private static void getResponse(GetResponse getResponse) {
        String index = getResponse.getIndex();
        String type = getResponse.getType();
        String id = getResponse.getId();
        if (getResponse.isExists()) {
            long version = getResponse.getVersion();
            // The document as a String.
            String sourceAsString = getResponse.getSourceAsString();
            // The document as a Map.
            Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
            // The document as a byte array.
            byte[] sourceAsBytes = getResponse.getSourceAsBytes();
            System.out.println("文檔--->" + sourceAsString);
        } else {
            System.out.println("找不到文檔");
        }
    }

    /**
     * Shows how to handle a get against an index that does not exist: the
     * client throws an {@link ElasticsearchException} whose status is
     * {@link RestStatus#NOT_FOUND}.
     */
    private static void failMethod() {
        try (RestHighLevelClient client = InitClient.getClient()) {
            GetRequest request = new GetRequest("does_not_exist", "doc", "1");
            try {
                client.get(request, RequestOptions.DEFAULT);
            } catch (ElasticsearchException e) {
                if (e.status() != RestStatus.NOT_FOUND) {
                    // Only the expected "not found" case is handled here; anything
                    // else is a real error and must not be swallowed.
                    throw e;
                }
                // NOT_FOUND: the index does not exist, which is what this example expects.
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
4.Exists
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
/**
 * Example of the Exists API of the Elasticsearch high-level REST client.
 */
public class Exists {
    /** Checks whether document {@code posts/doc/1} exists and prints the answer. */
    public static void main(String[] ags) {
        existsRequest();
    }

    /**
     * Opens a client and checks synchronously whether document
     * {@code posts/doc/1} exists.
     */
    private static void existsRequest() {
        try (RestHighLevelClient client = InitClient.getClient()) {
            GetRequest getRequest = new GetRequest("posts", "doc", "1");
            // Disable fetching _source.
            getRequest.fetchSourceContext(new FetchSourceContext(false));
            // Disable fetching stored fields.
            getRequest.storedFields("_none_");
            synExecution(client, getRequest);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes the exists check synchronously and prints the result.
     *
     * @param client     client used to send the request
     * @param getRequest request identifying the document to look for
     * @throws IOException if {@code client.exists} throws it
     */
    private static void synExecution(RestHighLevelClient client, GetRequest getRequest) throws IOException {
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        // Report the outcome; previously the result was computed and dropped.
        System.out.println("exists: " + exists);
    }

    /**
     * Executes the exists check asynchronously; the outcome is delivered to a listener.
     *
     * @param client     client used to send the request
     * @param getRequest request identifying the document to look for
     */
    private static void asynExecution(RestHighLevelClient client, GetRequest getRequest) {
        ActionListener<Boolean> listener = new ActionListener<Boolean>() {
            @Override
            public void onResponse(Boolean exists) {
                System.out.println("exists: " + exists);
            }

            @Override
            public void onFailure(Exception e) {
                // Keep the cause visible instead of silently swallowing the failure.
                e.printStackTrace();
            }
        };
        client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
    }
}
5.Update
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * Examples of the Update API of the Elasticsearch high-level REST client.
 * The {@code updateN} methods only show how to build an {@link UpdateRequest};
 * they do not send it.
 */
public class Update {
    /** Entry point; intentionally does nothing. */
    public static void main(String[] args) {
    }

    /** Builds a bare update request for document {@code posts/doc/1}. */
    private static void updateRequest() {
        UpdateRequest request = new UpdateRequest("posts", "doc", "1");
    }

    /** Partial document supplied as a JSON string. */
    private static void update1() {
        UpdateRequest request = new UpdateRequest("posts", "doc", "1");
        String jsonString = "{"
                + "\"updated\":\"2017-01-01\","
                + "\"reason\":\"daily update\""
                + "}";
        request.doc(jsonString, XContentType.JSON);
    }

    /** Partial document supplied as a {@link Map}. */
    private static void update2() {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("updated", new Date());
        jsonMap.put("reason", "daily update");
        UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                .doc(jsonMap);
    }

    /**
     * Partial document built with an {@link XContentBuilder}.
     *
     * @throws IOException if the builder throws it
     */
    private static void update3() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.timeField("updated", new Date());
            builder.field("reason", "daily update");
        }
        builder.endObject();
        UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                .doc(builder);
    }

    /** Partial document supplied as alternating field-name / value arguments. */
    private static void update4() {
        UpdateRequest request = new UpdateRequest("posts", "doc", "1")
                .doc("updated", new Date(),
                        "reason", "daily update");
    }

    /**
     * Attaches an upsert document to the request: content that is inserted as
     * a new document if the target document does not exist yet.
     *
     * @param request update request to add the upsert content to
     */
    private static void upsert(UpdateRequest request) {
        String jsonString = "{\"created\":\"2017-01-01\"}";
        request.upsert(jsonString, XContentType.JSON);
    }

    /**
     * Executes the request synchronously. The response is not inspected here.
     *
     * @param client  client used to send the request
     * @param request update request to execute
     * @throws IOException if {@code client.update} throws it
     */
    private static void synExecution(RestHighLevelClient client, UpdateRequest request) throws IOException {
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
    }

    /**
     * Executes the request asynchronously; the outcome is delivered to a listener.
     *
     * @param client  client used to send the request
     * @param request update request to execute
     */
    private static void asynExecution(RestHighLevelClient client, UpdateRequest request) {
        ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
            @Override
            public void onResponse(UpdateResponse updateResponse) {
            }

            @Override
            public void onFailure(Exception e) {
                // Keep the cause visible instead of silently swallowing the failure.
                e.printStackTrace();
            }
        };
        client.updateAsync(request, RequestOptions.DEFAULT, listener);
    }

    /**
     * Prints a summary of an update response: the kind of result, the
     * document's index/type/id/version, and the reason of every shard failure
     * reported in the response.
     *
     * @param updateResponse response returned by an update operation
     */
    public static void updateResponse(UpdateResponse updateResponse) {
        String index = updateResponse.getIndex();
        String type = updateResponse.getType();
        String id = updateResponse.getId();
        long version = updateResponse.getVersion();
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            // The document was created for the first time (upsert).
            System.out.println("首次建立文檔");
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            // The document was updated.
            System.out.println("update更新成功");
        } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
            // The document was deleted.
            System.out.println("刪除成功");
        } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
            // The update had no effect on the document (no operation was performed).
            System.out.println("沒有執行任何操做");
        }
        System.out.println(" index: " + index + " type: " + type + " id: " + id + " version: " + version);
        ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            // Fewer shard copies reported success than the total reported for the
            // operation. Hook for handling that case; intentionally no action here.
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                // Print the reason, as CreateIndex.IndexResponse does, instead of dropping it.
                String reason = failure.reason();
                System.out.println(reason);
            }
        }
    }
}
6.Delete
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
/**
 * Examples of the Delete API of the Elasticsearch high-level REST client.
 */
public class Delete {
    /** Deletes document {@code posts/doc/2} and prints the outcome. */
    public static void main(String[] ags) {
        deleteRequest();
    }

    /**
     * Opens a client and deletes document {@code posts/doc/2} synchronously.
     */
    private static void deleteRequest() {
        // try-with-resources closes the client; an explicit close() would close it twice.
        try (RestHighLevelClient client = InitClient.getClient()) {
            DeleteRequest request = new DeleteRequest("posts", "doc", "2");
            synExecution(client, request);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes the request synchronously and reports the response.
     *
     * @param client  client used to send the request
     * @param request delete request to execute
     * @throws IOException if {@code client.delete} throws it
     */
    private static void synExecution(RestHighLevelClient client, DeleteRequest request) throws IOException {
        DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
        deleteResponse(deleteResponse);
    }

    /**
     * Executes the request asynchronously; the outcome is delivered to a listener.
     *
     * @param client  client used to send the request
     * @param request delete request to execute
     */
    private static void asynExecution(RestHighLevelClient client, DeleteRequest request) {
        ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
            @Override
            public void onResponse(DeleteResponse deleteResponse) {
            }

            @Override
            public void onFailure(Exception e) {
                // Keep the cause visible instead of silently swallowing the failure.
                e.printStackTrace();
            }
        };
        client.deleteAsync(request, RequestOptions.DEFAULT, listener);
    }

    /**
     * Prints the index/type/id/version of the deleted document and the reason
     * of every shard failure reported in the response.
     *
     * @param deleteResponse response returned by a delete operation
     * @throws IOException never thrown by this body; the clause is kept because
     *                     existing callers already catch it
     */
    public static void deleteResponse(DeleteResponse deleteResponse) throws IOException {
        String index = deleteResponse.getIndex();
        String type = deleteResponse.getType();
        String id = deleteResponse.getId();
        long version = deleteResponse.getVersion();
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        System.out.println("刪除的是" + " index: " + index + " type: " + type + " id: " + id + " version: " + version);
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            // Fewer shard copies reported success than the total reported for the
            // operation. Hook for handling that case; intentionally no action here.
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                // Print the reason, as CreateIndex.IndexResponse does, instead of dropping it.
                String reason = failure.reason();
                System.out.println(reason);
            }
        }
    }

    /**
     * Shows how a delete of a missing document is reported: the response
     * result is {@link DocWriteResponse.Result#NOT_FOUND}.
     *
     * @param client client used to send the request
     * @throws IOException if {@code client.delete} throws it
     */
    private static void deleteFail(RestHighLevelClient client) throws IOException {
        DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
        DeleteResponse deleteResponse = client.delete(
                request, RequestOptions.DEFAULT);
        if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            System.out.println("沒有找到");
        }
    }
}
7.Bulk(批量操作)
import com.htkj.elasticsearch.InitClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
 * Examples of the Bulk API and of {@link BulkProcessor} from the
 * Elasticsearch high-level REST client.
 */
public class Bulk {
    private static final Logger logger = LogManager.getRootLogger();

    /** Runs the {@link BulkProcessor} example in {@link #test()}. */
    public static void main(String[] args) {
        test();
    }

    /** Builds a bulk request made of three index operations (not sent). */
    private static void bulkRequest() {
        // Create the BulkRequest.
        BulkRequest request = new BulkRequest();
        // Add IndexRequests. The Bulk API only supports documents encoded as
        // JSON or SMILE; any other format results in an error.
        request.add(new IndexRequest("posts", "doc", "1")
                .source(XContentType.JSON, "field", "foo"));
        request.add(new IndexRequest("posts", "doc", "2")
                .source(XContentType.JSON, "field", "bar"));
        request.add(new IndexRequest("posts", "doc", "3")
                .source(XContentType.JSON, "field", "baz"));
    }

    /** Builds a bulk request that mixes delete, update and index operations (not sent). */
    private static void bulkOtherRequest() {
        // Different operation types can be added to the same BulkRequest.
        BulkRequest request = new BulkRequest();
        // Add a DeleteRequest.
        request.add(new DeleteRequest("posts", "doc", "3"));
        // Add an UpdateRequest.
        request.add(new UpdateRequest("posts", "doc", "2")
                .doc(XContentType.JSON, "other", "test"));
        // Add an IndexRequest.
        request.add(new IndexRequest("posts", "doc", "4")
                .source(XContentType.JSON, "field", "baz"));
    }

    /** Shows the optional arguments of a bulk request (not sent). */
    private static void argsOptional() {
        BulkRequest request = new BulkRequest();
        // Timeout of 2 minutes, given as a TimeValue.
        request.timeout(TimeValue.timeValueMinutes(2));
        // Timeout of 2 minutes, given as a String.
        request.timeout("2m");
        // Refresh policy, given as a WriteRequest.RefreshPolicy instance.
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        // Refresh policy, given as a String.
        request.setRefreshPolicy("wait_for");
    }

    /**
     * Executes the bulk request synchronously. The response is not inspected here.
     *
     * @param client  client used to send the request
     * @param request bulk request to execute
     * @throws IOException if {@code client.bulk} throws it
     */
    private static void synExecution(RestHighLevelClient client, BulkRequest request) throws IOException {
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    }

    /**
     * Executes the bulk request asynchronously; the outcome is delivered to a listener.
     *
     * @param client  client used to send the request
     * @param request bulk request to execute
     */
    private static void asynExecution(RestHighLevelClient client, BulkRequest request) {
        ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkResponse) {
            }

            @Override
            public void onFailure(Exception e) {
                // Keep the cause visible instead of silently swallowing the failure.
                logger.error("Failed to execute bulk", e);
            }
        };
        client.bulkAsync(request, RequestOptions.DEFAULT, listener);
    }

    /**
     * Walks over every item of a bulk response and hands the item's response
     * to the matching reporting method of {@code CreateIndex}, {@code Update}
     * or {@code Delete}. Failed items are logged and skipped.
     *
     * @param bulkResponse response returned by a bulk operation
     */
    private static void bulkResonse(BulkResponse bulkResponse) {
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                // A failed item has a failure instead of a response; passing its
                // (null) response on would end in a NullPointerException.
                logger.warn("Bulk item [{}] failed: {}",
                        bulkItemResponse.getItemId(), bulkItemResponse.getFailureMessage());
                continue;
            }
            // IndexResponse, UpdateResponse and DeleteResponse are all DocWriteResponse instances.
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();
            switch (bulkItemResponse.getOpType()) {
                case INDEX:
                case CREATE:
                    CreateIndex.IndexResponse((IndexResponse) itemResponse);
                    break;
                case UPDATE:
                    Update.updateResponse((UpdateResponse) itemResponse);
                    break;
                case DELETE:
                    try {
                        Delete.deleteResponse((DeleteResponse) itemResponse);
                    } catch (IOException e) {
                        logger.error("Failed to report delete response", e);
                    }
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Logs the failure of every failed item in the bulk response.
     *
     * @param bulkResponse response returned by a bulk operation
     */
    private static void bulkResponseFail(BulkResponse bulkResponse) {
        if (bulkResponse.hasFailures()) {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                // Did this operation fail?
                if (bulkItemResponse.isFailed()) {
                    // Retrieve the failure of the failed operation.
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    logger.warn("Bulk item [{}] failed: {}",
                            bulkItemResponse.getItemId(), failure.getMessage());
                }
            }
        }
    }

    /**
     * Walk-through of {@link BulkProcessor}, a utility that lets index, update
     * and delete operations be added to a processor which executes them in bulk.
     *
     * <p>A BulkProcessor needs:
     * <ul>
     *   <li>a {@link RestHighLevelClient}, used to execute the BulkRequest and
     *       retrieve the BulkResponse;</li>
     *   <li>a {@link BulkProcessor.Listener}, called before and after every
     *       BulkRequest execution, or when a BulkRequest fails.</li>
     * </ul>
     * {@code BulkProcessor.builder} is then used to build the processor.
     *
     * @param client client used to execute the bulk requests
     * @throws InterruptedException if interrupted while waiting for the processor to close
     */
    private static void bulkProcessor(RestHighLevelClient client) throws InterruptedException {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            /** Called before each execution of a BulkRequest. */
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Gives access to the number of operations about to be executed.
                int numberOfActions = request.numberOfActions();
                logger.debug("Executing bulk [{}] with {} requests",
                        executionId, numberOfActions);
            }

            /** Called after each execution of a BulkRequest. */
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // Allows checking whether the BulkResponse contains errors.
                if (response.hasFailures()) {
                    logger.warn("Bulk [{}] executed with failures", executionId);
                } else {
                    logger.debug("Bulk [{}] completed in {} milliseconds",
                            executionId, response.getTook().getMillis());
                }
            }

            /** Called when a BulkRequest failed. */
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Failed to execute bulk", failure);
            }
        };
        // RestHighLevelClient.bulkAsync() is used to execute the BulkRequest in the background.
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
        // A BulkProcessor is created by calling build() on a BulkProcessor.Builder.
        BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
        // BulkProcessor.Builder offers methods to configure how requests are executed.
        BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
        // Flush a new bulk request once this many actions were added (default 1000, -1 disables it).
        builder.setBulkActions(500);
        // Flush a new bulk request once the added actions reach this size (default 5Mb, -1 disables it).
        builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
        // Number of concurrent requests allowed (default 1, 0 allows only a single request to execute).
        builder.setConcurrentRequests(0);
        // Flush any pending BulkRequest when this interval passes (not set by default).
        builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
        // Constant back-off policy: wait 1 second, retry at most 3 times.
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        // The same configuration written as one fluent chain.
        BulkProcessor bulkProcessor2 = BulkProcessor.builder(bulkConsumer, listener)
                .setBulkActions(500)
                .setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB))
                .setConcurrentRequests(0)
                .setFlushInterval(TimeValue.timeValueSeconds(10L))
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                .build();
        try {
            // Once the BulkProcessor exists, requests can be added to it.
            IndexRequest one = new IndexRequest("posts", "doc", "1")
                    .source(XContentType.JSON, "title",
                            "In which order are my Elasticsearch queries executed?");
            IndexRequest two = new IndexRequest("posts", "doc", "2")
                    .source(XContentType.JSON, "title",
                            "Current status and upcoming changes in Elasticsearch");
            IndexRequest three = new IndexRequest("posts", "doc", "3")
                    .source(XContentType.JSON, "title",
                            "The Future of Federated Search in Elasticsearch");
            bulkProcessor2.add(one);
            bulkProcessor2.add(two);
            bulkProcessor2.add(three);
            // After all requests were added, every processor instance must be closed.
            // Way 1: awaitClose() waits until all requests were processed or the given
            // time elapsed. It is called on bulkProcessor2 because that is the
            // processor the requests were added to.
            boolean terminated = bulkProcessor2.awaitClose(30L, TimeUnit.SECONDS);
            if (!terminated) {
                logger.warn("Bulk processor did not finish within {} seconds", 30L);
            }
        } finally {
            // Way 2: close() closes the BulkProcessor immediately. Used for the
            // first processor, which never received any request.
            bulkProcessor.close();
        }
    }

    /**
     * Uses a {@link BulkProcessor} to run a delete, an update and an index
     * operation against the {@code posts} index and reports every item of the
     * response. An index-only variant is shown in
     * {@link #bulkProcessor(RestHighLevelClient)}.
     */
    private static void test() {
        // try-with-resources closes the client; an explicit close() would close it twice.
        try (RestHighLevelClient client = InitClient.getClient()) {
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    int numberOfActions = request.numberOfActions();
                    logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // Report every item of the response.
                    bulkResonse(response);
                    if (response.hasFailures()) {
                        logger.warn("Bulk [{}] executed with failures", executionId);
                    } else {
                        logger.debug("Bulk [{}] completed in {} milliseconds",
                                executionId, response.getTook().getMillis());
                    }
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    logger.error("Failed to execute bulk", failure);
                }
            };
            BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
                    .setBulkActions(500)
                    .setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB))
                    .setConcurrentRequests(0)
                    .setFlushInterval(TimeValue.timeValueSeconds(10L))
                    .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                    .build();
            DeleteRequest one2 = new DeleteRequest("posts", "doc", "3");
            UpdateRequest two2 = new UpdateRequest("posts", "doc", "2")
                    .doc(XContentType.JSON, "other", "test");
            IndexRequest three2 = new IndexRequest("posts", "doc", "4")
                    .source(XContentType.JSON, "field", "baz");
            bulkProcessor.add(one2);
            bulkProcessor.add(two2);
            bulkProcessor.add(three2);
            // Wait for the pending requests to be processed before the client is closed.
            boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
            if (!terminated) {
                logger.warn("Bulk processor did not finish within {} seconds", 30L);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for the bulk processor to close", e);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}