本節描述如下CRUD API:html
注意
全部 CRUD API都是單索引 API,索引參數接受單個索引名,或指向單個索引的別名
index API容許將類型化的JSON文檔索引到特定的索引中,並使其可搜索。json
生成JSON文檔有幾種不一樣的方法:數組
byte[]
或做爲String
Map
,該Map
將自動轉換爲它的JSON等效項XContentFactory.jsonBuilder()
在內部,每一個類型被轉換爲byte[]
(像String
被轉換爲byte[]
),所以,若是對象已經以這種形式存在,那麼就使用它,jsonBuilder是高度優化的JSON生成器,它直接構造一個byte[]
。併發
這裏沒有什麼困難,可是請注意,您必須根據日期格式對日期進行編碼。app
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";
Map是一個鍵:值對集合,它表示一個JSON結構:less
Map<String, Object> json = new HashMap<String, Object>(); json.put("user","kimchy"); json.put("postDate",new Date()); json.put("message","trying out Elasticsearch");
可使用Jackson將bean序列化爲JSON,請將Jackson Databind添加到您的項目中,而後,您可使用ObjectMapper
來序列化您的bean:異步
import com.fasterxml.jackson.databind.*; // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse // generate json byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
Elasticsearch提供了內置的助手來生成JSON內容。elasticsearch
import static org.elasticsearch.common.xcontent.XContentFactory.*; XContentBuilder builder = jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject()
注意,您還可使用startArray(String)
和endArray()
方法添加數組,順便說一下,field
方法接受許多對象類型,您能夠直接傳遞數字、日期甚至其餘XContentBuilder
對象。ide
若是須要查看生成的JSON內容,可使用string()
方法。oop
String json = builder.string();
下面的示例將JSON文檔索引爲一個名爲twitter的索引,其類型爲tweet, id值爲1:
import static org.elasticsearch.common.xcontent.XContentFactory.*; IndexResponse response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) .get();
注意,您還能夠將文檔索引爲JSON字符串,而且不須要提供ID:
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json, XContentType.JSON) .get();
IndexResponse
對象會給你一個響應:
// Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); // status has stored current instance statement. RestStatus status = response.status();
有關索引操做的更多信息,請查看REST索引文檔
get API容許根據索引的id從索引中獲取類型化的JSON文檔,下面的示例從一個名爲twitter的索引中獲取JSON文檔,該索引的類型名爲tweet, id值爲1:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
有關get操做的更多信息,請查看REST get文檔。
delete API容許基於id從特定索引中刪除類型化的JSON文檔,下面的示例從名爲twitter的索引中刪除JSON文檔,該索引的類型名爲tweet, id值爲1:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
經過查詢刪除的API能夠根據查詢結果刪除給定的一組文檔:
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .get(); long deleted = response.getDeleted();
QueryBuilders.matchQuery("gender", "male")
(查詢)source("persons")
(索引)get()
(執行操做)response.getDeleted()
(被刪除的文檔數)因爲這是一個長時間運行的操做,若是您但願異步執行,能夠調用execute
而不是get
,並提供以下監聽器:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .execute(new ActionListener<BulkByScrollResponse>() { @Override public void onResponse(BulkByScrollResponse response) { long deleted = response.getDeleted(); } @Override public void onFailure(Exception e) { // Handle the exception } });
您能夠建立一個UpdateRequest
並將其發送給客戶端:
UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index"); updateRequest.type("type"); updateRequest.id("1"); updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
也可使用prepareUpdate()
方法:
client.prepareUpdate("ttl", "doc", "1") .setScript(new Script("ctx._source.gender = \"male\"" , ScriptService.ScriptType.INLINE, null, null)) .get(); client.prepareUpdate("ttl", "doc", "1") .setDoc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .get();
Script()
(你的腳本,它也能夠是本地存儲的腳本名)setDoc()
(將合併到現有的文檔)注意,您不能同時提供腳本和doc
update API容許基於提供的腳本更新文檔:
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1") .script(new Script("ctx._source.gender = \"male\"")); client.update(updateRequest).get();
update API還支持傳遞一個部分文檔合併到現有文檔中(簡單的遞歸合併,內部合併對象,取代核心的「鍵/值」和數組),例如:
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
也有對Upsert的支持,若是文檔不存在,則使用upsert元素的內容索引新的doc:
IndexRequest indexRequest = new IndexRequest("index", "type", "1") .source(jsonBuilder() .startObject() .field("name", "Joe Smith") .field("gender", "male") .endObject()); UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .upsert(indexRequest); client.update(updateRequest).get();
若是文檔不存在,將添加indexRequest中的文檔。
若是文件index/type/1已經存在,咱們將在此操做後得到以下文件:
{ "name" : "Joe Dalton", "gender": "male" }
"gender": "male"
(此字段由更新請求添加)若是不存在,咱們將有一份新文件:
{ "name" : "Joe Smith", "gender": "male" }
multi get API容許根據文檔的index、type和id獲取文檔列表:
MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("another", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); } }
add("twitter", "tweet", "1")
(經過單一id)add("twitter", "tweet", "2", "3", "4")
(或以相同index/type的id列表)add("another", "type", "foo")
(你也能夠從另外一個索引中獲得)MultiGetItemResponse itemResponse : multiGetItemResponses
(迭代結果集)response.isExists()
(您能夠檢查文檔是否存在)response.getSourceAsString()
(訪問_source字段)有關multi get操做的更多信息,請查看剩餘的multi get文檔
bulk API容許在一個請求中索引和刪除多個文檔,這裏有一個示例用法:
import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item }
BulkProcessor
類提供了一個簡單的接口,能夠根據請求的數量或大小自動刷新bulk操做,或者在給定的時間以後。
要使用它,首先建立一個BulkProcessor
實例:
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build();
beforeBulk()
request.numberOfActions()
查看numberOfActions afterBulk(...BulkResponse response)
response.hasFailures()
檢查是否存在失敗請求afterBulk(...Throwable failure)
setBulkActions(10000)
setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
setFlushInterval(TimeValue.timeValueSeconds(5))
setConcurrentRequests(1)
setBackoffPolicy()
EsRejectedExecutionException
失敗時,將嘗試重試,該異常代表用於處理請求的計算資源太少,要禁用backoff,請傳遞BackoffPolicy.noBackoff()
默認狀況下,BulkProcessor
:
而後您能夠簡單地將您的請求添加到BulkProcessor
:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */)); bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
當全部的文檔都被加載到BulkProcessor
,可使用awaitClose
或close
方法進行關閉:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
若是經過設置flushInterval來調度其餘計劃的flush,這兩種方法都將flush全部剩餘的文檔,並禁用全部其餘計劃flush。若是併發請求被啓用,那麼awaitClose
方法等待指定的超時以完成全部bulk請求,而後返回true
,若是在全部bulk請求完成以前指定的等待時間已通過去,則返回false
,close
方法不等待任何剩餘的批量請求完成並當即退出。
若是您正在使用Elasticsearch運行測試,而且正在使用BulkProcessor來填充數據集,那麼您最好將併發請求的數量設置爲0,以便以同步方式執行批量的flush操做:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ }) .setBulkActions(10000) .setConcurrentRequests(0) .build(); // Add your requests bulkProcessor.add(/* Your requests */); // Flush any remaining requests bulkProcessor.flush(); // Or close the bulkProcessor if you don't need it anymore bulkProcessor.close(); // Refresh your indices client.admin().indices().prepareRefresh().get(); // Now you can start searching! client.prepareSearch().get();
updateByQuery最簡單的用法是在不更改源的狀況下更新索引中的每一個文檔,這種用法容許獲取一個新屬性或另外一個在線映射更改。
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index").abortOnVersionConflict(false); BulkByScrollResponse response = updateByQuery.get();
對updateByQuery API的調用從獲取索引快照開始,索引使用內部版本控制找到任何文檔。
注意
當一個文檔在快照的時間和索引請求過程之間發生變化時,會發生版本衝突。
當版本匹配時,updateByQuery更新文檔並增長版本號。
全部更新和查詢失敗都會致使updateByQuery停止,這些故障能夠從BulkByScrollResponse#getIndexingFailures
方法中得到,任何成功的更新仍然存在,而且不會回滾,當第一次失敗致使停止時,響應包含由失敗的bulk請求生成的全部失敗。
爲了防止版本衝突致使updateByQuery停止,請設置abortOnVersionConflict(false)
,第一個示例之因此這樣作,是由於它試圖獲取在線映射更改,而版本衝突意味着在相同時間開始updateByQuery和試圖更新文檔的衝突文檔。這很好,由於該更新將獲取在線映射更新。
UpdateByQueryRequestBuilder API支持過濾更新的文檔,限制要更新的文檔總數,並使用腳本更新文檔:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .filter(QueryBuilders.termQuery("level", "awesome")) .size(1000) .script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap())); BulkByScrollResponse response = updateByQuery.get();
UpdateByQueryRequestBuilder還容許直接訪問用於選擇文檔的查詢,您可使用此訪問來更改默認的滾動大小,或者以其餘方式修改對匹配文檔的請求。
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .source().setSize(500); BulkByScrollResponse response = updateByQuery.get();
您還能夠將大小與排序相結合以限制文檔的更新:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index").size(100) .source().addSort("cat", SortOrder.DESC); BulkByScrollResponse response = updateByQuery.get();
除了更改文檔的_source
字段外,還可使用腳本更改操做,相似於Update API:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .script(new Script( ScriptType.INLINE, "if (ctx._source.awesome == 'absolutely) {" + " ctx.op='noop'" + "} else if (ctx._source.awesome == 'lame') {" + " ctx.op='delete'" + "} else {" + "ctx._source.awesome = 'absolutely'}", "painless", Collections.emptyMap())); BulkByScrollResponse response = updateByQuery.get();
在Update API中,能夠設置ctx.op
的值來更改執行的操做:
noop
ctx.op = "noop"
,updateByQuery操做將從更新中省略該文檔,這種行爲增長了響應主體中的noop計數器。delete
ctx.op = "delete"
,刪除將在響應主體中已刪除的計數器中報告。將ctx.op
設置爲任何其餘值都會產生錯誤,在ctx
中設置任何其餘字段都會產生錯誤。
這個API不容許您移動它所接觸的文檔,只是修改它們的源,這是故意的!咱們沒有規定要把文件從原來的位置移走。
您也能夠同時對多個索引和類型執行這些操做,相似於search API:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("foo", "bar").source().setTypes("a", "b"); BulkByScrollResponse response = updateByQuery.get();
若是提供路由值,則進程將路由值複製到滾動查詢,將進程限制爲與路由值匹配的碎片:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source().setRouting("cat"); BulkByScrollResponse response = updateByQuery.get();
updateByQuery也能夠經過指定這樣的pipeline來使用ingest節點:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.setPipeline("hurray"); BulkByScrollResponse response = updateByQuery.get();
您可使用Task API獲取全部正在運行的update-by-query請求的狀態:
ListTasksResponse tasksList = client.admin().cluster().prepareListTasks() .setActions(UpdateByQueryAction.NAME).setDetailed(true).get(); for (TaskInfo info: tasksList.getTasks()) { TaskId taskId = info.getTaskId(); BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus(); // do stuff }
使用上面所示的TaskId,您能夠直接查找任務:
GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();
使用Cancel Task API
任何查詢更新均可以使用Task Cancel API取消:
// Cancel all update-by-query requests client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks(); // Cancel a specific update-by-query request client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();
使用list tasks API查找taskId的值。
取消請求一般是一個很是快速的過程,但可能要花費幾秒鐘的時間,task status API繼續列出任務,直到取消完成。
在正在運行的更新中,使用_rethrottle API更改requests_per_second
的值:
RethrottleAction.INSTANCE.newRequestBuilder(client) .setTaskId(taskId) .setRequestsPerSecond(2.0f) .get();
使用list tasks API查找taskId的值。
與updateByQuery API同樣,requests_per_second
的值能夠是任何正值的浮點值來設置節流的級別,或者Float.POSITIVE_INFINITY
禁用節流。requests_per_second
值加快進程當即生效,減慢查詢的requests_per_second
值在完成當前批處理後生效,以防止滾動超時。
詳情見reindex API
BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client) .destination("target_index") .filter(QueryBuilders.matchQuery("category", "xzy")) .get();
還能夠提供查詢來篩選應該從源索引到目標索引的哪些文檔。