目錄html
業餘時間搞 python 爬蟲爬取數據,完善個人小程序;工做時間仍是要努力完成領導分配的任務,作個人 Java 老本行的。java
這不,如今就有個需求,集團要將 elasticsearch 版本從 2.2 升級到 6.3, 因爲以前作項目使用 spring data es
來完成 es 數據的增刪改查,如今一下升級到這麼高的版本,遇到各類 API 不兼容的問題。而且 spring data es
因爲總體框架 spring
等版本的限制,也不能使用了。python
無奈之下,只能使用 elasticsearch 提供的 java reset client API 來完成以前的操做。工欲善其事,必先利其器。要使用 API,第一步就是要完整,熟練的理解各個 API 的用途,限制。在學習 API 的過程當中,我將 API 的文檔統一整理了一番,方便本身使用時查詢,也但願能對用到這部分的同窗提供方便。spring
注意,本 API 指南只針對 elasticsearch 6.3 版本。json
Rest client 分紅兩部分:小程序
官方文檔連接地址api
High Client 基於 Low Client, 主要目的是暴露一些 API,這些 API 能夠接受請求對象爲參數,返回響應對象,而對請求和響應細節的處理都是由 client 自動完成的。併發
每一個 API 在調用時均可以是同步或者異步的。同步和異步 API 的區別是什麼呢?框架
async
後綴,須要有一個 listener
做爲參數,等這個請求返回結果或者發生錯誤時,這個 listener
就會被調用只有英文版異步
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.3.2</version> </dependency>
org.elasticsearch.client:elasticsearch-rest-client
org.elasticsearch:elasticsearch
RestHighLevelClient
實例依賴 REST low-level client builder
RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("localhost", 9200, "http"), new HttpHost("localhost", 9201, "http")));
High-level client 會依賴 Low-level client 來執行請求, low-level client 則會維護一個請求的線程鏈接池,由於當 high-level 請求處理結束時,應該 close 掉這個鏈接,使 low-level client 能儘快釋放資源。
client.close();
High level rest 客戶端支持下面的 文檔(Document) API
IndexRequest request = new IndexRequest( "posts", // 索引 Index "doc", // Type "1"); // 文檔 Document Id String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; request.source(jsonString, XContentType.JSON); // 文檔源格式爲 json string
document source 能夠是下面的格式
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "kimchy"); jsonMap.put("postDate", new Date()); jsonMap.put("message", "trying out Elasticsearch"); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(jsonMap); // 會自動將 Map 轉換爲 JSON 格式
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("user", "kimchy"); builder.timeField("postDate", new Date()); builder.field("message", "trying out Elasticsearch"); } builder.endObject(); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(builder);
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source("user", "kimchy", "postDate", new Date(), "message", "trying out Elasticsearch");
IndexResponse indexResponse = client.index(request);
前面已經講過,異步執行函數須要添加 listener
, 而對於 index 而言,這個 listener
的類型就是 ActionListener
client.indexAsync(request, listener);
異步方法執行後會馬上返回,在索引操做執行完成後,ActionListener
就會被回調:
onResponse
函數onFailure
函數ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { } @Override public void onFailure(Exception e) { } };
不論是同步回調仍是異步回調,若是調用成功,都會返回 IndexRespose
對象。 這個對象中包含什麼信息呢?看下面代碼
String index = indexResponse.getIndex(); String type = indexResponse.getType(); String id = indexResponse.getId(); long version = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { // 文檔第一次建立 } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { // 文檔以前已存在,當前是重寫 } ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // 成功的分片數量少於總分片數量 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); // 處理潛在的失敗信息 } }
在索引時有版本衝突的話,會拋出 ElasticsearchException
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .version(1); // 這裏是文檔版本號 try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { // 衝突了 } }
若是將 opType
設置爲 create
, 並且若是索引的文檔與已存在的文檔在 index, type 和 id 上均相同,也會拋出衝突異常。
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { } }
每一個 GET 請求都必須需傳入下面 3 個參數
GetRequest getRequest = new GetRequest( "posts", "doc", "1");
下面的參數都是可選的, 裏面的選項並不完整,如要獲取完整的屬性,請參考 官方文檔
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
String[] includes = new String[]{"message", "*Date"}; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext);
String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"message"}; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext);
request.realtime(false);
request.version(2);
request.versionType(VersionType.EXTERNAL);
GetResponse getResponse = client.get(getRequest);
此部分與 index 類似, 只有一點不一樣, 返回類型爲 GetResponse
代碼部分略
返回的 GetResponse
對象包含要請求的文檔數據(包含元數據和字段)
String index = getResponse.getIndex(); String type = getResponse.getType(); String id = getResponse.getId(); if (getResponse.isExists()) { long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString(); // string 形式 Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 字節形式 } else { // 沒有發現請求的文檔 }
在請求中若是包含特定的文檔版本,若是與已存在的文檔版本不匹配, 就會出現衝突
try { GetRequest request = new GetRequest("posts", "doc", "1").version(2); GetResponse getResponse = client.get(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { // 版本衝突 } }
若是文檔存在 Exists API 返回 true
, 不然返回 fasle
。
GetRequest
用法和 Get API 差很少,兩個對象的可選參數是相同的。因爲 exists()
方法只返回 true
或者 false
, 建議將獲取 _source
以及任何存儲字段的值關閉,儘可能使請求輕量級。
GetRequest getRequest = new GetRequest( "posts", // Index "doc", // Type "1"); // Document id getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段 getRequest.storedFields("_none_"); // 禁止存儲任何字段
boolean exists = client.exists(getRequest);
異步請求與 Index API 類似,此處不贅述,只粘貼代碼。如需詳細瞭解,請參閱官方地址
ActionListener<Boolean> listener = new ActionListener<Boolean>() { @Override public void onResponse(Boolean exists) { } @Override public void onFailure(Exception e) { } }; client.existsAsync(getRequest, listener);
DeleteRequest
必須傳入下面參數
DeleteRequest request = new DeleteRequest( "posts", // index "doc", // doc "1"); // document id
超時時間
request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");
刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
版本
request.version(2);
版本類型
request.versionType(VersionType.EXTERNAL);
DeleteResponse deleteResponse = client.delete(request);
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() { @Override public void onResponse(DeleteResponse deleteResponse) { } @Override public void onFailure(Exception e) { } }; client.deleteAsync(request, listener);
DeleteResponse
能夠檢索執行操做的信息,如代碼所示
String index = deleteResponse.getIndex(); String type = deleteResponse.getType(); String id = deleteResponse.getId(); long version = deleteResponse.getVersion(); ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // 成功分片數目小於總分片 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); // 處理潛在失敗 } }
也能夠來檢查文檔是否存在
DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist"); DeleteResponse deleteResponse = client.delete(request); if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { // 文檔不存在 }
版本衝突時也會拋出 `ElasticsearchException
try { DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2); DeleteResponse deleteResponse = client.delete(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { // 版本衝突 } }
UpdateRequest
的必需參數以下
UpdateRequest request = new UpdateRequest( "posts", // Index "doc", // 類型 "1"); // 文檔 Id
在更新部分文檔時,已存在文檔與部分文檔會合並。
部分文檔能夠有如下形式
JSON 格式
UpdateRequest request = new UpdateRequest("posts", "doc", "1"); String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}"; request.doc(jsonString, XContentType.JSON);
Map
格式
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("updated", new Date()); jsonMap.put("reason", "daily update"); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(jsonMap);
XContentBuilder
對象
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.timeField("updated", new Date()); builder.field("reason", "daily update"); } builder.endObject(); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(builder);
Object
key-pairs
UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("updated", new Date(), "reason", "daily update");
若是文檔不存在,可使用 upserts
方法將文檔以新文檔的方式建立。
UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("updated", new Date(), "reason", "daily update");
upserts
方法支持的文檔格式與 update
方法相同。
超時時間
request.timeout(TimeValue.timeValueSeconds(1)); request.timeout("1s");
刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
衝突後重試次數
request.retryOnConflict(3);
獲取數據源,默認是開啓的
request.fetchSource(true);
包括特定字段
String[] includes = new String[]{"updated", "r*"}; String[] excludes = Strings.EMPTY_ARRAY; request.fetchSource(new FetchSourceContext(true, includes, excludes));
排除特定字段
String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"updated"}; request.fetchSource(new FetchSourceContext(true, includes, excludes));
指定版本
request.version(2);
禁用 noop detection
request.scriptedUpsert(true);
設置若是更新的文檔不存在,就必需要建立一個
request.docAsUpsert(true);
UpdateResponse updateResponse = client.update(request);
此處只貼代碼,官方地址
ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() { @Override public void onResponse(UpdateResponse updateResponse) { } @Override public void onFailure(Exception e) { } }; client.updateAsync(request, listener);
String index = updateResponse.getIndex(); String type = updateResponse.getType(); String id = updateResponse.getId(); long version = updateResponse.getVersion(); if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { // 文檔已建立 } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { // 文檔已更新 } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) { // 文檔已刪除 } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) { // 文檔不受更新的影響 }
若是在 UpdateRequest
中使能了獲取源數據,響應中則包含了更新後的源文檔信息。
GetResult result = updateResponse.getGetResult(); if (result.isExists()) { String sourceAsString = result.sourceAsString(); // 將獲取的文檔以 string 格式輸出 Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式輸出 byte[] sourceAsBytes = result.source(); // 字節形式 } else { // 默認狀況下,不會返回文檔源數據 }
也能夠檢測是否分片失敗
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { // 成功的分片數量小於總分片數量 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); // 獲得分片失敗的緣由 } }
若是在執行 UpdateRequest
時,文檔不存在,響應中會包含 404
狀態碼,並且會拋出 ElasticsearchException
。
UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist") .doc("field", "value"); try { UpdateResponse updateResponse = client.update(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { // 處理文檔不存在的狀況 } }
若是版本衝突,也會拋出 ElasticsearchException
UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("field", "value") .version(1); try { UpdateResponse updateResponse = client.update(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { // 處理版本衝突的狀況 } }
使用 BulkRequest
能夠在一次請求中執行多個索引,更新和刪除的操做。
BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts", "doc", "1") .source(XContentType.JSON,"field", "foo")); // 將第一個 IndexRequest 添加到批量請求中 request.add(new IndexRequest("posts", "doc", "2") .source(XContentType.JSON,"field", "bar")); // 第二個 request.add(new IndexRequest("posts", "doc", "3") .source(XContentType.JSON,"field", "baz")); // 第三個
在同一個 BulkRequest
也能夠添加不一樣的操做類型
BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "doc", "3")); request.add(new UpdateRequest("posts", "doc", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts", "doc", "4") .source(XContentType.JSON,"field", "baz"));
超時時間
request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");
刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
設置在批量操做前必須有幾個分片處於激活狀態
request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL); // 所有分片都處於激活狀態 request.waitForActiveShards(ActiveShardCount.DEFAULT); // 默認 request.waitForActiveShards(ActiveShardCount.ONE); // 一個
BulkResponse bulkResponse = client.bulk(request);
與 GETAPI 等請求相似,只貼代碼。
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } }; client.bulkAsync(request, listener);
BulkResponse
中包含執行操做後的信息,並容許對每一個操做結果迭代。
for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍歷全部的操做結果 DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 獲取操做結果的響應,能夠是 IndexResponse, UpdateResponse or DeleteResponse, 它們均可以慚怍是 DocWriteResponse 實例 if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操做後的響應結果 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操做後的響應結果 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操做後的響應結果 } }
此外,批量響應還有一個很是便捷的方法來檢測是否有一個或多個操做失敗
if (bulkResponse.hasFailures()) { // 表示至少有一個操做失敗 }
在這種狀況下,咱們要遍歷全部的操做結果,檢查是不是失敗的操做,並獲取對應的失敗信息
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { // 檢測給定的操做是否失敗 BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 獲取失敗信息 } }
BulkProcessor
是爲了簡化 Bulk API 的操做提供的一個工具類,要執行操做,就須要下面組件
RestHighLevelClient
用來執行 BulkRequest
並獲取 BulkResponse`BulkProcessor.Listener
對 BulkRequest
執行先後以及失敗時監聽BulkProcessor.builder
方法用來構建一個新的BulkProcessor
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { // 在每一個 BulkRequest 執行前調用 } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { // 在每一個 BulkRequest 執行後調用 } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { // 失敗時調用 } }; BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build(); // 構建 BulkProcessor, RestHighLevelClient.bulkAsync() 用來執行 BulkRequest
BulkProcessor.Builder
提供了多個方法來配置 BulkProcessor
如何來處理請求的執行。
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener); builder.setBulkActions(500); // 指定多少操做時,就會刷新一次 builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); builder.setConcurrentRequests(0); // 指定多大容量,就會刷新一次 builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 容許併發執行的數量 builder.setBackoffPolicy(BackoffPolicy .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
BulkProcessor
建立後,各類請求就能夠添加進去:
IndexRequest one = new IndexRequest("posts", "doc", "1"). source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts", "doc", "2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts", "doc", "3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
BulkProcessor
執行時,會對每一個 bulk request調用 BulkProcessor.Listener
, listener 提供了下面方法來訪問 BulkRequest
和 BulkResponse
:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); // 在執行前獲取操做的數量 logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { // 執行後查看響應中是否包含失敗的操做 logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); // 請求失敗時打印信息 } };
請求添加到 BulkProcessor
, 它的實例可使用下面兩種方法關閉請求。
awaitClose()
在請求返回後或等待必定時間關閉boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
close()
馬上關閉bulkProcessor.close();
兩個方法都會在關閉前對處理器中的請求進行刷新,並避免新的請求添加進去。
multiGet
API 能夠在單個 http 交互中並行的執行多個 get
請求。
MutiGetRequest
實例化時參數爲空,實例化後能夠經過添加 MultiGetRequest.Item
來配置獲取的信息
MultiGetRequest request = new MultiGetRequest(); request.add(new MultiGetRequest.Item( "index", // 索引 "type", // 類型 "example_id")); // 文檔 id request.add(new MultiGetRequest.Item("index", "type", "another_id")); // 添加另一個條目
multiGet
支持的參數與 Get API 支持的可選參數是相同的,能夠在 Item 上設置它們。
構建 MultiGetRequest
後能夠同步的方式執行multiGet
MultiGetResponse response = client.multiGet(request);
和上面的異步執行同樣,也是使用 listener 機制。
ActionListener<MultiGetResponse> listener = new ActionListener<MultiGetResponse>() { @Override public void onResponse(MultiGetResponse response) { } @Override public void onFailure(Exception e) { } }; client.multiGetAsync(request, listener);
MultiGetResponse
中getResponse
方法包含的 MultiGetItemResponse
順序與請求時的相同。
MultiGetItemResponse
,若是執行成功,就會返回 GetResponse
對象,失敗則返回
MultiGetResponse.Failure
MultiGetItemResponse firstItem = response.getResponses()[0]; assertNull(firstItem.getFailure()); // 執行成功,則返回 null GetResponse firstGet = firstItem.getResponse(); // 返回 GetResponse 對象 String index = firstItem.getIndex(); String type = firstItem.getType(); String id = firstItem.getId(); if (firstGet.isExists()) { long version = firstGet.getVersion(); String sourceAsString = firstGet.getSourceAsString(); // string 格式 Map<String, Object> sourceAsMap = firstGet.getSourceAsMap(); // Map byte[] sourceAsBytes = firstGet.getSourceAsBytes(); // bytes } else { // 沒有發現文檔 // 儘管響應中會返回 404 狀態碼,也會返回一個有效的 GetResponse // 這是可使用 isExists 方法來判斷 }
若是子請求中對應的 index 不存在,返回的 getFailure
方法中會包含 exception:
assertNull(missingIndexItem.getResponse()); // 獲取的響應爲空 Exception e = missingIndexItem.getFailure().getFailure(); // 獲取 exception ElasticsearchException ee = (ElasticsearchException) e; // TODO status is broken! fix in a followup // assertEquals(RestStatus.NOT_FOUND, ee.status()); assertThat(e.getMessage(), containsString("reason=no such index"));
對版本衝突時的處理,官方說明地址
MultiGetRequest request = new MultiGetRequest(); request.add(new MultiGetRequest.Item("index", "type", "example_id") .version(1000L)); MultiGetResponse response = client.multiGet(request); MultiGetItemResponse item = response.getResponses()[0]; assertNull(item.getResponse()); Exception e = item.getFailure().getFailure(); ElasticsearchException ee = (ElasticsearchException) e; // TODO status is broken! fix in a followup // assertEquals(RestStatus.CONFLICT, ee.status()); assertThat(e.getMessage(), containsString("version conflict, current version [1] is " + "different than the one provided [1000]"));
本文只包含 Java High level Rest Client 的 起步,和文檔 API 部分,下篇文章中會包含查詢 API,敬請期待。