Elasticsearch Java API 6.2(文檔API)

文檔API

本節描述如下CRUD API:html

單文檔的API

  • Index API
  • Get API
  • Delete API
  • Update API

多文檔API

  • Multi Get API
  • Bulk API
  • Reindex API
  • Update By Query API
  • Delete By Query API
注意
全部 CRUD API都是單索引 API,索引參數接受單個索引名,或指向單個索引的別名

Index API

index API容許將類型化的JSON文檔索引到特定的索引中,並使其可搜索。json

生成JSON文檔

生成JSON文檔有幾種不一樣的方法:數組

  • 手動的(也就是你本身)使用原生byte[]或做爲String
  • 使用一個Map,該Map將自動轉換爲它的JSON等效項
  • 使用第三方庫對bean(如Jackson)進行序列化
  • 使用內置的助手XContentFactory.jsonBuilder()

在內部,每一個類型被轉換爲byte[](像String被轉換爲byte[]),所以,若是對象已經以這種形式存在,那麼就使用它,jsonBuilder是高度優化的JSON生成器,它直接構造一個byte[]併發

本身動手

這裏沒有什麼困難,可是請注意,您必須根據日期格式對日期進行編碼。app

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

使用Map

Map是一個鍵:值對集合,它表示一個JSON結構:less

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");

bean序列化

可使用Jacksonbean序列化爲JSON,請將Jackson Databind添加到您的項目中,而後,您可使用ObjectMapper來序列化您的bean:異步

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);

使用Elasticsearch助手

Elasticsearch提供了內置的助手來生成JSON內容。elasticsearch

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意,您還可使用startArray(String)endArray()方法添加數組,順便說一下,field方法接受許多對象類型,您能夠直接傳遞數字、日期甚至其餘XContentBuilder對象。ide

若是須要查看生成的JSON內容,可使用string()方法。oop

String json = builder.string();

索引文檔

下面的示例將JSON文檔索引爲一個名爲twitter的索引,其類型爲tweet, id值爲1:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

注意,您還能夠將文檔索引爲JSON字符串,而且不須要提供ID:

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json, XContentType.JSON)
        .get();

IndexResponse對象會給你一個響應:

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

有關索引操做的更多信息,請查看REST索引文檔

Get API

get API容許根據索引的id從索引中獲取類型化的JSON文檔,下面的示例從一個名爲twitter的索引中獲取JSON文檔,該索引的類型名爲tweet, id值爲1:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

有關get操做的更多信息,請查看REST get文檔。

Delete API

delete API容許基於id從特定索引中刪除類型化的JSON文檔,下面的示例從名爲twitter的索引中刪除JSON文檔,該索引的類型名爲tweet, id值爲1:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

Delete By Query API

經過查詢刪除的API能夠根據查詢結果刪除給定的一組文檔:

BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male")) 
    .source("persons")                                  
    .get();                                             
long deleted = response.getDeleted();
  • QueryBuilders.matchQuery("gender", "male")(查詢)
  • source("persons") (索引)
  • get()(執行操做)
  • response.getDeleted()(被刪除的文檔數)

因爲這是一個長時間運行的操做,若是您但願異步執行,能夠調用execute而不是get,並提供以下監聽器:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))     
    .source("persons")                                      
    .execute(new ActionListener<BulkByScrollResponse>() {   
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();           
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

您能夠建立一個UpdateRequest並將其發送給客戶端:

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

也可使用prepareUpdate()方法:

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  , ScriptService.ScriptType.INLINE, null, null))
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()               
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();
  • Script()(你的腳本,它也能夠是本地存儲的腳本名)
  • setDoc()(將合併到現有的文檔)

注意,您不能同時提供腳本和doc

使用腳本更新

update API容許基於提供的腳本更新文檔:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

經過合併文檔更新

update API還支持傳遞一個部分文檔合併到現有文檔中(簡單的遞歸合併,內部合併對象,取代核心的「鍵/值」和數組),例如:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

也有對Upsert的支持,若是文檔不存在,則使用upsert元素的內容索引新的doc:

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();

若是文檔不存在,將添加indexRequest中的文檔。

若是文件index/type/1已經存在,咱們將在此操做後得到以下文件:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}
  • "gender": "male"(此字段由更新請求添加)

若是不存在,咱們將有一份新文件:

{
    "name" : "Joe Smith",
    "gender": "male"
}

Multi Get API

multi get API容許根據文檔的indextypeid獲取文檔列表:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           
    .add("twitter", "tweet", "2", "3", "4") 
    .add("another", "type", "foo")          
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      
        String json = response.getSourceAsString(); 
    }
}
  • add("twitter", "tweet", "1")(經過單一id)
  • add("twitter", "tweet", "2", "3", "4")(或以相同index/type的id列表)
  • add("another", "type", "foo")(你也能夠從另外一個索引中獲得)
  • MultiGetItemResponse itemResponse : multiGetItemResponses(迭代結果集)
  • response.isExists()(您能夠檢查文檔是否存在)
  • response.getSourceAsString()(訪問_source字段)

有關multi get操做的更多信息,請查看剩餘的multi get文檔

Bulk API

bulk API容許在一個請求中索引和刪除多個文檔,這裏有一個示例用法:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

使用Bulk處理器

BulkProcessor類提供了一個簡單的接口,能夠根據請求的數量或大小自動刷新bulk操做,或者在給定的時間以後。

要使用它,首先建立一個BulkProcessor實例:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
        .build();
  • beforeBulk()

    • 此方法在執行bulk以前被調用,例如,您能夠經過request.numberOfActions()查看numberOfActions
  • afterBulk(...BulkResponse response)

    • 此方法在執行bulk以後被調用,例如,您能夠經過response.hasFailures()檢查是否存在失敗請求
  • afterBulk(...Throwable failure)

    • bulk失敗並引起一個可拋出對象時,將調用此方法
  • setBulkActions(10000)

    • 咱們但願每10,000個請求就執行一次bulk
  • setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))

    • 咱們但願每5MB就flush一次
  • setFlushInterval(TimeValue.timeValueSeconds(5))

    • 不管請求的數量是多少,咱們都但願每5秒flush一次
  • setConcurrentRequests(1)

    • 設置併發請求的數量,值爲0意味着只容許執行一個請求,在積累新的bulk請求時,容許執行一個值爲1的併發請求
  • setBackoffPolicy()

    • 設置一個自定義的備份策略,該策略最初將等待100ms,以指數形式增長並重試三次,當一個或多個bulk項目請求以EsRejectedExecutionException失敗時,將嘗試重試,該異常代表用於處理請求的計算資源太少,要禁用backoff,請傳遞BackoffPolicy.noBackoff()

默認狀況下,BulkProcessor:

  • bulkActions設置爲1000
  • bulkSize設置爲5mb
  • 不設置flushInterval
  • concurrentrequest設置爲1,這意味着flush操做的異步執行
  • backoffPolicy設置爲一個指數備份,8次重試,啓動延時爲50ms,總等待時間約爲5.1秒

添加請求

而後您能夠簡單地將您的請求添加到BulkProcessor:

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

關閉Bulk Processor

當全部的文檔都被加載到BulkProcessor,可使用awaitCloseclose方法進行關閉:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

若是經過設置flushInterval來調度其餘計劃的flush,這兩種方法都將flush全部剩餘的文檔,並禁用全部其餘計劃flush。若是併發請求被啓用,那麼awaitClose方法等待指定的超時以完成全部bulk請求,而後返回true,若是在全部bulk請求完成以前指定的等待時間已通過去,則返回falseclose方法不等待任何剩餘的批量請求完成並當即退出。

在測試中使用Bulk Processor

若是您正在使用Elasticsearch運行測試,而且正在使用BulkProcessor來填充數據集,那麼您最好將併發請求的數量設置爲0,以便以同步方式執行批量的flush操做:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

Update By Query API

updateByQuery最簡單的用法是在不更改源的狀況下更新索引中的每一個文檔,這種用法容許獲取一個新屬性或另外一個在線映射更改。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

updateByQuery API的調用從獲取索引快照開始,索引使用內部版本控制找到任何文檔。

注意
當一個文檔在快照的時間和索引請求過程之間發生變化時,會發生版本衝突。

當版本匹配時,updateByQuery更新文檔並增長版本號。

全部更新和查詢失敗都會致使updateByQuery停止,這些故障能夠從BulkByScrollResponse#getIndexingFailures方法中得到,任何成功的更新仍然存在,而且不會回滾,當第一次失敗致使停止時,響應包含由失敗的bulk請求生成的全部失敗。

爲了防止版本衝突致使updateByQuery停止,請設置abortOnVersionConflict(false),第一個示例之因此這樣作,是由於它試圖獲取在線映射更改,而版本衝突意味着在相同時間開始updateByQuery和試圖更新文檔的衝突文檔。這很好,由於該更新將獲取在線映射更新。

UpdateByQueryRequestBuilder API支持過濾更新的文檔,限制要更新的文檔總數,並使用腳本更新文檔:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder還容許直接訪問用於選擇文檔的查詢,您可使用此訪問來更改默認的滾動大小,或者以其餘方式修改對匹配文檔的請求。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您還能夠將大小與排序相結合以限制文檔的更新:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了更改文檔的_source字段外,還可使用腳本更改操做,相似於Update API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == 'absolutely) {"
            + "  ctx.op='noop'"
            + "} else if (ctx._source.awesome == 'lame') {"
            + "  ctx.op='delete'"
            + "} else {"
            + "ctx._source.awesome = 'absolutely'}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

Update API中,能夠設置ctx.op的值來更改執行的操做:

  • noop

    • 若是您的腳本沒有作任何更改,設置ctx.op = "noop"updateByQuery操做將從更新中省略該文檔,這種行爲增長了響應主體中的noop計數器。
  • delete

    • 若是您的腳本決定必須刪除該文檔,設置ctx.op = "delete",刪除將在響應主體中已刪除的計數器中報告。

ctx.op設置爲任何其餘值都會產生錯誤,在ctx中設置任何其餘字段都會產生錯誤。

這個API不容許您移動它所接觸的文檔,只是修改它們的源,這是故意的!咱們沒有規定要把文件從原來的位置移走。

您也能夠同時對多個索引和類型執行這些操做,相似於search API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

若是提供路由值,則進程將路由值複製到滾動查詢,將進程限制爲與路由值匹配的碎片:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery也能夠經過指定這樣的pipeline來使用ingest節點:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();

使用Task API

您可使用Task API獲取全部正在運行的update-by-query請求的狀態:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

使用上面所示的TaskId,您能夠直接查找任務:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

使用Cancel Task API

任何查詢更新均可以使用Task Cancel API取消:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

使用list tasks API查找taskId的值。

取消請求一般是一個很是快速的過程,但可能要花費幾秒鐘的時間,task status API繼續列出任務,直到取消完成。

Rethrottling

在正在運行的更新中,使用_rethrottle API更改requests_per_second的值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

使用list tasks API查找taskId的值。

updateByQuery API同樣,requests_per_second的值能夠是任何正值的浮點值來設置節流的級別,或者Float.POSITIVE_INFINITY禁用節流。requests_per_second值加快進程當即生效,減慢查詢的requests_per_second值在完成當前批處理後生效,以防止滾動超時。

Reindex API

詳情見reindex API

BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
    .destination("target_index")
    .filter(QueryBuilders.matchQuery("category", "xzy")) 
    .get();

還能夠提供查詢來篩選應該從源索引到目標索引的哪些文檔。

相關文章
相關標籤/搜索