ES Java Client Document API Notes

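These notes cover working with Elasticsearch data through the Java client. One small detail to watch: the Java API version must correspond to the Elasticsearch version, meaning the first two version components (major and minor) must be equal.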
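
The version rule above can be checked mechanically before connecting. The following is a minimal sketch, assuming both versions are available as plain `major.minor.patch` strings; `VersionCheck` and `sameMajorMinor` are hypothetical names used for illustration and are not part of the Elasticsearch client.

```java
// Hypothetical helper for illustration; not part of the Elasticsearch client.
class VersionCheck {

    /** Returns true when both versions share the same major and minor components. */
    static boolean sameMajorMinor(String clientVersion, String serverVersion) {
        String[] c = clientVersion.split("\\.");
        String[] s = serverVersion.split("\\.");
        if (c.length < 2 || s.length < 2) {
            throw new IllegalArgumentException("expected at least major.minor");
        }
        return c[0].equals(s[0]) && c[1].equals(s[1]);
    }

    public static void main(String[] args) {
        System.out.println(sameMajorMinor("5.5.3", "5.5.1")); // true: both are 5.5.x
        System.out.println(sameMajorMinor("5.5.3", "5.6.0")); // false: minor differs
    }
}
```

A check like this could run at startup and fail fast, rather than letting a mismatch surface later as a serialization error.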

Maven dependencies:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>5.5.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>transport</artifactId>
  <version>5.5.3</version>
</dependency>

API notes:

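  • Client
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public static TransportClient createClient() {
    Settings settings = Settings.builder()
        .put("cluster.name", "my-application")  // name of the cluster to connect to
        .put("client.transport.sniff", true)    // sniff the cluster to discover its nodes
        .build();
    TransportClient client = new PreBuiltTransportClient(settings);
    try {
        client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    } catch (UnknownHostException e) {
        e.printStackTrace();
    }
    return client;
}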

Document APIs:

Operations on a single document

  • Index API 

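First way to use setSource: pass an XContentBuilder directly (jsonBuilder() is statically imported from XContentFactory).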

IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
            .startObject()
            .field("user", "kimchy")
            .field("postDate", "niaho")
            .field("message", "trying out Elasticsearch")
            .endObject()
        ).get();

Second way to use setSource: build the document first, then pass its JSON string together with the content type.

XContentBuilder builder = jsonBuilder()
        .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
        .endObject();

// string() returns the generated JSON text
IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
        .setSource(builder.string(), XContentType.JSON)
        .get();

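Looking at the source code, there are several more overloads (some deprecated ones also exist but are not recommended, so they are omitted here):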

setSource(byte[] source, int offset, int length, XContentType xContentType)

setSource(Object... source)

setSource(XContentType xContentType, Object... source)

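The properties available on the returned IndexResponse:

public Result getResult()       // outcome of the operation, e.g. CREATED or UPDATED
public String getIndex()        // index name
public ShardId getShardId()     // shard id
public String getType()         // document type
public String getId()           // document id
public long getVersion()        // document version; 1 the first time the document is stored
public boolean forcedRefresh()
public void setForcedRefresh(boolean forcedRefresh)
public RestStatus status()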
  • GET API 

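prepareGet takes (index, type, id). Its limitation is that the id must already be known in order to run the lookup.

GetResponse getResponse = client.prepareGet("twitter", "tweet", "2")
    .setOperationThreaded(false) // false: do not execute the operation on a separate thread
    .get();
System.out.println(getResponse.toString());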
  • Delete API

prepareDelete has the same limitation as above: the id must be known.

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
    .setOperationThreaded(false)
    .get();
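  • Delete By Query API

Delete directly by query:

BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) // query selecting the documents to delete
        .source("persons")                                  // index to delete from
        .get();                                             // execute the operation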

long deleted = response.getDeleted();

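When success and failure need their own handling, execute the request with a listener:

// Use a listener when success and failure need special handling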
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))                  
    .source("persons")                                                   
    .execute(new ActionListener<BulkByScrollResponse>() {           
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();                        
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
  • Update API

Using UpdateRequest, variant 1: set index, type, and id one by one.

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

Using UpdateRequest, variant 2: pass index, type, and id to the constructor.

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

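Updating with a script (this one is inline; a script file can be used instead through the FILE script type, in which case the script is referenced by its file name):

client.prepareUpdate("ttl", "doc", "1")
        // the single-argument constructor creates an inline script in the default language
        .setScript(new Script("ctx._source.gender = \"male\""))
        .get();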

Upsert (update the document if it exists, insert it if it does not):

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();
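
To make the upsert behavior concrete, here is a small in-memory sketch of the semantics: when the id is missing, the upsert document is stored as-is; when it exists, the partial document is merged into it. This is a simplified model that does not call Elasticsearch. The class `UpsertModel`, its methods, and the `"female"` value in the second call are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of upsert semantics; hypothetical, does not talk to Elasticsearch.
class UpsertModel {
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    /**
     * If the id exists, merge the partial doc into the stored document;
     * otherwise store a copy of the upsert document unchanged.
     */
    void update(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            store.put(id, new HashMap<>(upsertDoc));
        } else {
            existing.putAll(partialDoc);
        }
    }

    Map<String, Object> get(String id) {
        return store.get(id);
    }

    public static void main(String[] args) {
        UpsertModel index = new UpsertModel();

        Map<String, Object> upsertDoc = new HashMap<>();
        upsertDoc.put("name", "Joe Smith");
        upsertDoc.put("gender", "male");

        Map<String, Object> partialDoc = new HashMap<>();
        partialDoc.put("gender", "female"); // illustrative value to make the merge visible

        index.update("1", partialDoc, upsertDoc); // id missing: upsert document is inserted
        System.out.println(index.get("1").get("gender")); // male

        index.update("1", partialDoc, upsertDoc); // id present: partial document is merged
        System.out.println(index.get("1").get("gender")); // female
        System.out.println(index.get("1").get("name"));   // Joe Smith
    }
}
```

Note that on the first call the partial document is not applied at all; only the upsert document is stored.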

Batch operations on multiple documents

  • Multi Get API

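Multi get can fetch documents across indices:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           // get by a single id
    .add("twitter", "tweet", "2", "3", "4") // or by a list of ids for the same index and type
    .add("another", "type", "foo")          // documents from another index work too
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { // iterate over the results
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      // check that the document exists
        String json = response.getSourceAsString(); // read the _source field
    }
}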
  • Bulk API

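The bulk API supports both indexing and deleting multiple documents in one request:

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests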
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

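Indexing and deleting together, by adding requests to a BulkProcessor (built as shown under Using Bulk Processor):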

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
  • Using Bulk Processor

A listener handles the outcome of each bulk execution:

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
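        .setBulkActions(10000)                               // flush after every 10000 requests
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))  // or once 5 MB have accumulated
        .setFlushInterval(TimeValue.timeValueSeconds(5))     // or every 5 seconds regardless of count
        .setConcurrentRequests(1)                            // 1 concurrent request; 0 means synchronous
        .setBackoffPolicy(                                   // start at 100 ms, grow exponentially, up to 3 retries
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))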
        .build();
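
The count and size thresholds configured above can be pictured with a small buffer model. This is a simplified sketch for intuition only, not the library's implementation: it leaves out the time-based flush interval, concurrency, and retries. `BulkBufferModel` and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of count- and size-based flushing; not BulkProcessor itself.
class BulkBufferModel {
    private final int maxActions;
    private final long maxBytes;
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    BulkBufferModel(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers one request and flushes as soon as either threshold is reached. */
    void add(String request) {
        pending.add(request);
        pendingBytes += request.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Stands in for sending the batch; here it only counts the flush and clears the buffer. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushCount++;
        pending.clear();
        pendingBytes = 0;
    }

    int getFlushCount() {
        return flushCount;
    }

    int getPendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BulkBufferModel buffer = new BulkBufferModel(3, 1024); // flush every 3 requests or 1 KB
        for (int i = 0; i < 7; i++) {
            buffer.add("doc-" + i);
        }
        // 7 small requests with a threshold of 3: two flushes, one request left pending
        System.out.println(buffer.getFlushCount() + " flushes, " + buffer.getPendingCount() + " pending");
    }
}
```

The request left pending at the end is the case the flush interval covers in the real processor: without a time-based flush, a partially filled batch would wait until the next threshold is hit or the processor is closed.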