一、index APIhtml
IndexRequest request = new IndexRequest( "posts", //index "doc", //type 類型,我對類型的理解有點相似於數據庫中的表 index相似於數據庫中的database "1"); //Document Id String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; request.source(jsonString, XContentType.JSON); //source能夠有多種形式下面介紹
source能夠以map的形式提供,查看官方文檔介紹map形式提供的source會自動轉換成json格式,初步觀察源代碼,寫的還挺複雜,簡單過了一遍其實沒太懂,大概意思是map->XContentBuilder,XContentBuilder經過內置工具生成json數據庫
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "kimchy"); jsonMap.put("postDate", new Date()); jsonMap.put("message", "trying out Elasticsearch"); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(jsonMap);
source也能夠以XContentBuilder形式提供,經過 Elasticsearch built-in helpers生產恆jsonjson
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("user", "kimchy"); builder.field("postDate", new Date()); builder.field("message", "trying out Elasticsearch"); } builder.endObject(); IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source(builder);
source也能夠以Object key-value形式提供,依然轉換成jsonapi
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1") .source("user", "kimchy", "postDate", new Date(), "message", "trying out Elasticsearch");
其餘可選參數數組
設置路由,提及這個方法要介紹一下路由的概念,elasticsearch 路由機制緩存
request.routing("routing");
parent,es的parent-child結構,簡單點說是一對多的關係,es多對多狀況要拆分紅一對多;限制是parent和children必須在同一個shard當中,當添加文檔時指定了parent後,就不會用默認的本文檔id分配路由,而是採用父文檔的路由值,保證父文檔和子文檔處於同一個shard當中。須要注意的是,查詢時須要指定路由,不然查詢會出錯。當大於等於三級時,須要指定最頂層父節點路由,以讓文檔存儲在用一個shard中。less
request.parent("parent");
設置超時時間,有兩種形式, TimeValue 或者字符串異步
request.timeout(TimeValue.timeValueSeconds(1)); request.timeout("1s");
設置刷新策略, WriteRequest.RefreshPolicy 或者字符串elasticsearch
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
版本號,好像是文檔版本,相似於樂觀鎖的東西吧,沒驗證,驗證後更新ide
request.version(2);
操做類型,當設置create時,而且指定index、type和id存在,不會更新文檔,會拋出異常
request.opType(DocWriteRequest.OpType.CREATE); request.opType("create");
同步調用方式,沒什麼可說的
IndexResponse indexResponse = client.index(request);
異步調用方式,須要傳入一個監聽接口,通知結果或者接受異常
client.indexAsync(request, new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { //成功後執行代碼,響應結果以參數傳入 } @Override public void onFailure(Exception e) { //失敗時執行代碼,異常以參數傳入 } });
響應結果
響應結果官方示例代碼以下
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
//文檔不存在,建立後處理代碼
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
、、//
/////文檔被更新
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
//成功shard數量小於總數
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
//處理可能存在的失敗
}
}
異常處理
文檔發生版本衝突
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .version(1); try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { //版本衝突處理代碼 } }
設置opType=create,而且指定index、type和id存在,不會更新文檔,會拋出異常
IndexRequest request = new IndexRequest("posts", "doc", "1") .source("field", "value") .opType(DocWriteRequest.OpType.CREATE); try { IndexResponse response = client.index(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { //處理代碼 } }
二、get api
GetRequest getRequest = new GetRequest( "posts", //index "doc", //type "1"); //document id
可變參數
禁用獲取源數據(source),默認開啓。先要理解什麼是源數據,_source字段,保存存儲是的json body
request.fetchSourceContext(new FetchSourceContext(false));
設置檢索返回字段和檢索排除字段,同時試了一下即包含又排除的字段,排除有限,既包含又排斥的座位排斥字段
String[] includes = new String[]{"message", "*Date"}; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext);
String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"message"}; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext);
設置返回指定存儲字段(我是這樣理解的,不過我測試失敗了,設置了存儲字段以後,仍是不能像例子同樣獲取值)
request.storedFields("message"); GetResponse getResponse = client.get(request); String message = getResponse.getField("message").getValue();
設置路由,parent-child查詢時須要指定路由
request.routing("routing");
設置查詢父節點,在get查詢時,不理解爲何須要設置parent,我測試了一下發現沒什麼用,無論設置的parent是否是正確的parent,都能查詢出結果,這個應該是一個通用方法吧,在get中可能沒有用到
request.parent("parent");
elasticsearch可使用preference參數來指定分片查詢的優先級,即咱們能夠經過該參數來控制搜索時的索引數據分片。如不設置該參數:在全部有效的主分片以及副本間輪詢
沒太理解,因此沒測試
request.preference("preference");
設置實時查詢開啓/關閉,默認爲true。
經過查詢資料,real-time是es的一種方式,也是新文檔索引模型,跟這個api好像關係不大,這個api不是很理解,我本地集羣數據比較少,true/false沒發現太大區別;es經過fsync把數據寫入到磁盤中,fsync十分消耗資源,es的實時查詢瓶頸在硬盤讀寫,es利用文件系統緩存來加快實時查詢速度。具體資料:
大牛博客:https://www.jianshu.com/p/94ce44d6a802
官方文檔:https://www.elastic.co/guide/en/elasticsearch/guide/current/near-real-time.html
request.realtime(false);
設置查詢前執行刷新,默認false,每一個shard默認1秒鐘一次refresh
request.refresh(true);
版本號,很少解釋
request.version(2);
調用方式
同步
GetResponse getResponse = client.get(getRequest);
異步
client.getAsync(request, new ActionListener<GetResponse>() { @Override public void onResponse(GetResponse getResponse) { //成功 } @Override public void onFailure(Exception e) { //失敗 } });
響應對象
GetResponse查詢請求文檔及其源數據以及最終存儲字段
String index = getResponse.getIndex(); String type = getResponse.getType(); String id = getResponse.getId(); if (getResponse.isExists()) { long version = getResponse.getVersion(); String sourceAsString = getResponse.getSourceAsString(); //字符串形式 Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); //Map形式 byte[] sourceAsBytes = getResponse.getSourceAsBytes(); //字節數組形式 } else { //處理找不到文檔代碼。注意返回的是404狀態而不是異常, }
異常處理
若是須要捕獲異常,須要try catch代碼塊包裹,ElasticsearchException是runtime exception
GetRequest request = new GetRequest("does_not_exist", "doc", "1"); try { GetResponse getResponse = client.get(request); } catch (ElasticsearchException e) { if (e.status() == RestStatus.NOT_FOUND) { //處理文檔不存在狀況 } }
指定版本號,版本號衝突狀況
try { GetRequest request = new GetRequest("posts", "doc", "1").version(2); GetResponse getResponse = client.get(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { //版本號衝突 } }
三、delete api
若是存在parent的type,須要指定路由(route)
異步調用測試失敗,按照官方文檔,無論成功仍是失敗均沒捕獲到斷點
DeleteRequest request = new DeleteRequest(
"posts", //index
"doc", //type
"1"); //id
調用方式
同步
DeleteResponse deleteResponse = client.delete(request);
異步
client.deleteAsync(request, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
//成功
}
@Override
public void onFailure(Exception e) {
//失敗
}
});
響應對象
返回操做信息,以下
String index = deleteResponse.getIndex(); String type = deleteResponse.getType(); String id = deleteResponse.getId(); long version = deleteResponse.getVersion(); ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { //成功shard數量小於總數 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); //處理潛在的錯誤 } }
刪除文檔不存在狀況
DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist"); DeleteResponse deleteResponse = client.delete(request); if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { //刪除文檔不存在 }
異常處理
版本號衝突
try { DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2); DeleteResponse deleteResponse = client.delete(request); } catch (ElasticsearchException exception) { if (exception.status() == RestStatus.CONFLICT) { //版本衝突 } }
四、update api
UpdateRequest request = new UpdateRequest( "posts", //index "doc", //type "1"); //document id
painless script暫時不寫,沒搞明白語法
源數據格式(其實都是轉換成json格式)
UpdateRequest request = new UpdateRequest("posts", "doc", "1"); String jsonString = "{" + "\"updated\":\"2017-01-01\"," + "\"reason\":\"daily update\"" + "}"; request.doc(jsonString, XContentType.JSON); //json格式
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("updated", new Date()); jsonMap.put("reason", "daily update"); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(jsonMap); //map格式
XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.field("updated", new Date()); builder.field("reason", "daily update"); } builder.endObject(); UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc(builder); //XContentBuilder
UpdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("updated", new Date(), "reason", "daily update"); //object key-pairs
upserts(save or update)
當文檔不存在時,以新文檔插入
String jsonString = "{\"created\":\"2017-01-01\"}"; request.upsert(jsonString, XContentType.JSON);
可變參數
設置路由,路由什麼意思,前一篇文章介紹過
request.routing("routing");
設置父節點
request.parent("parent");
兩種形式的超時設置
request.timeout(TimeValue.timeValueSeconds(1)); request.timeout("1s");
設置刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
衝突嘗試次數
request.retryOnConflict(3);
是否返回源數據,默認false
request.fetchSource(true);
指定包含字段或者排除字段,重疊時排除字段優先
String[] includes = new String[]{"updated", "r*"}; String[] excludes = Strings.EMPTY_ARRAY; request.fetchSource(new FetchSourceContext(true, includes, excludes));
String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"updated"}; request.fetchSource(new FetchSourceContext(true, includes, excludes));
數據版本號
request.version(2);
noop檢測。個人理解是返回狀態值
request.detectNoop(false);
調用方式
同步
UpdateResponse updateResponse = client.update(request);
異步
client.updateAsync(request, new ActionListener<UpdateResponse>() { @Override public void onResponse(UpdateResponse updateResponse) { //成功 } @Override public void onFailure(Exception e) { //失敗 } });
響應對象
String index = updateResponse.getIndex(); String type = updateResponse.getType(); String id = updateResponse.getId(); long version = updateResponse.getVersion(); if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) { (upserts)首次建立對象 } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { //文檔被更新 } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) { //文檔被刪除 } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) { //沒有 }
設置返回源數據,回去返回的源數據
GetResult result = updateResponse.getGetResult(); //以GetResult格式返回更新後的文檔 if (result.isExists()) { String sourceAsString = result.sourceAsString(); //字符串形式返回更新後文檔源數據 Map<String, Object> sourceAsMap = result.sourceAsMap(); //map形式 byte[] sourceAsBytes = result.source(); //字節形式 } else { //默認狀況下,源數據在響應對象中返回 }
shard異常
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { //成功數量小於總數 } if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); //處理可能的異常 } }
版本號衝突狀況
pdateRequest request = new UpdateRequest("posts", "doc", "1") .doc("field", "value") .version(1); try { UpdateResponse updateResponse = client.update(request); } catch(ElasticsearchException e) { if (e.status() == RestStatus.CONFLICT) { //處理版本號衝突狀況 } }