Elasitcsearch High Level Rest Client學習筆記(二) 基礎API

一、index APIhtml

IndexRequest request = new IndexRequest(
        "posts", //index
        "doc",   //type 類型,我對類型的理解有點相似於數據庫中的表   index相似於數據庫中的database
        "1");    //Document Id
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON);  //source能夠有多種形式下面介紹

 

source能夠以map的形式提供,查看官方文檔介紹map形式提供的source會自動轉換成json格式,初步觀察源代碼,寫的還挺複雜,簡單過了一遍其實沒太懂,大概意思是map->XContentBuilder,XContentBuilder經過內置工具生成json數據庫

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap);

 

source也能夠以XContentBuilder形式提供,經過 Elasticsearch built-in helpers生產恆jsonjson

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.field("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);

 

source也能夠以Object key-value形式提供,依然轉換成jsonapi

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch");

 

其餘可選參數數組

設置路由,提及這個方法要介紹一下路由的概念,elasticsearch 路由機制緩存

request.routing("routing");

 

parent,es的parent-child結構,簡單點說是一對多的關係,es多對多狀況要拆分紅一對多;限制是parent和children必須在同一個shard當中,當添加文檔時指定了parent後,就不會用默認的本文檔id分配路由,而是採用父文檔的路由值,保證父文檔和子文檔處於同一個shard當中。須要注意的是,查詢時須要指定路由,不然查詢會出錯。當大於等於三級時,須要指定最頂層父節點路由,以讓文檔存儲在用一個shard中。less

request.parent("parent");

 

設置超時時間,有兩種形式, TimeValue 或者字符串異步

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s");

 

設置刷新策略, WriteRequest.RefreshPolicy 或者字符串elasticsearch

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

 

版本號,好像是文檔版本,相似於樂觀鎖的東西吧,沒驗證,驗證後更新ide

request.version(2);

 

操做類型,當設置create時,而且指定index、type和id存在,不會更新文檔,會拋出異常

request.opType(DocWriteRequest.OpType.CREATE); 
request.opType("create");

 

同步調用方式,沒什麼可說的

IndexResponse indexResponse = client.index(request);

異步調用方式,須要傳入一個監聽接口,通知結果或者接受異常

client.indexAsync(request, new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        //成功後執行代碼,響應結果以參數傳入
    }

    @Override
    public void onFailure(Exception e) {
       //失敗時執行代碼,異常以參數傳入
    }
});

響應結果

響應結果官方示例代碼以下

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    //文檔不存在,建立後處理代碼
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    、、//
/////文檔被更新
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    //成功shard數量小於總數
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); 
        //處理可能存在的失敗
    }
}

異常處理

文檔發生版本衝突

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        //版本衝突處理代碼
    }
}

設置opType=create,而且指定index、type和id存在,不會更新文檔,會拋出異常

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
         //處理代碼
    }
}

二、get api

GetRequest getRequest = new GetRequest(
        "posts", //index
        "doc",  //type
        "1");     //document id

可變參數

禁用獲取源數據(source),默認開啓。先要理解什麼是源數據,_source字段,保存存儲是的json body

request.fetchSourceContext(new FetchSourceContext(false));

 

設置檢索返回字段和檢索排除字段,同時試了一下即包含又排除的字段,排除有限,既包含又排斥的座位排斥字段

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);

 

設置返回指定存儲字段(我是這樣理解的,不過我測試失敗了,設置了存儲字段以後,仍是不能像例子同樣獲取值)

request.storedFields("message"); 
GetResponse getResponse = client.get(request);
String message = getResponse.getField("message").getValue();

 

設置路由,parent-child查詢時須要指定路由

request.routing("routing");

 

設置查詢父節點,在get查詢時,不理解爲何須要設置parent,我測試了一下發現沒什麼用,無論設置的parent是否是正確的parent,都能查詢出結果,這個應該是一個通用方法吧,在get中可能沒有用到

request.parent("parent");

 

elasticsearch可使用preference參數來指定分片查詢的優先級,即咱們能夠經過該參數來控制搜索時的索引數據分片。如不設置該參數:在全部有效的主分片以及副本間輪詢

沒太理解,因此沒測試

request.preference("preference");

 

設置實時查詢開啓/關閉,默認爲true。

經過查詢資料,real-time是es的一種方式,也是新文檔索引模型,跟這個api好像關係不大,這個api不是很理解,我本地集羣數據比較少,true/false沒發現太大區別;es經過fsync把數據寫入到磁盤中,fsync十分消耗資源,es的實時查詢瓶頸在硬盤讀寫,es利用文件系統緩存來加快實時查詢速度。具體資料:

大牛博客:https://www.jianshu.com/p/94ce44d6a802

官方文檔:https://www.elastic.co/guide/en/elasticsearch/guide/current/near-real-time.html

request.realtime(false);

 

設置查詢前執行刷新,默認false,每一個shard默認1秒鐘一次refresh

request.refresh(true);

 

版本號,很少解釋

request.version(2);

 

調用方式

同步

GetResponse getResponse = client.get(getRequest);

異步

client.getAsync(request, new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse getResponse) {
        //成功
    }

    @Override
    public void onFailure(Exception e) {
        //失敗
    }
});

響應對象

GetResponse查詢請求文檔及其源數據以及最終存儲字段

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString();        //字符串形式
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); //Map形式
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();          //字節數組形式
} else {
    //處理找不到文檔代碼。注意返回的是404狀態而不是異常,
}

異常處理

若是須要捕獲異常,須要try catch代碼塊包裹,ElasticsearchException是runtime exception

GetRequest request = new GetRequest("does_not_exist", "doc", "1");
try {
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        //處理文檔不存在狀況
    }
}

指定版本號,版本號衝突狀況

try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        //版本號衝突
    }
}

三、delete api

若是存在parent的type,須要指定路由(route)

異步調用測試失敗,按照官方文檔,無論成功仍是失敗均沒捕獲到斷點

DeleteRequest request = new DeleteRequest(
        "posts",    //index
        "doc",     //type
        "1");      //id

調用方式

同步

DeleteResponse deleteResponse = client.delete(request);

異步

client.deleteAsync(request, new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        //成功
    }

    @Override
    public void onFailure(Exception e) {
        //失敗
    }
});

響應對象

返回操做信息,以下

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    //成功shard數量小於總數
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); //處理潛在的錯誤
    }
}

刪除文檔不存在狀況

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    //刪除文檔不存在
}

異常處理

版本號衝突

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        //版本衝突
    }
}

四、update api

UpdateRequest request = new UpdateRequest(
        "posts", //index
        "doc",  //type
        "1");    //document id

painless script暫時不寫,沒搞明白語法

源數據格式(其實都是轉換成json格式)

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); //json格式
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); //map格式
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder); //XContentBuilder
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); //object key-pairs

upserts(save or update)

當文檔不存在時,以新文檔插入

String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

可變參數

設置路由,路由什麼意思,前一篇文章介紹過

request.routing("routing");

 

設置父節點

request.parent("parent");

 

兩種形式的超時設置

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s");

 

設置刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

 

衝突嘗試次數

request.retryOnConflict(3);

 

是否返回源數據,默認false

request.fetchSource(true);

 

指定包含字段或者排除字段,重疊時排除字段優先

String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes));
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes));

 

數據版本號

request.version(2);

 

noop檢測。個人理解是返回狀態值

request.detectNoop(false);

調用方式

同步

UpdateResponse updateResponse = client.update(request);

異步

client.updateAsync(request, new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        //成功
    }

    @Override
    public void onFailure(Exception e) {
        //失敗
    }
});

響應對象

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    (upserts)首次建立對象
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    //文檔被更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    //文檔被刪除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    //沒有
}

設置返回源數據,回去返回的源數據

GetResult result = updateResponse.getGetResult(); //以GetResult格式返回更新後的文檔
if (result.isExists()) {
    String sourceAsString = result.sourceAsString(); //字符串形式返回更新後文檔源數據
    Map<String, Object> sourceAsMap = result.sourceAsMap(); //map形式
    byte[] sourceAsBytes = result.source(); //字節形式
} else {
    //默認狀況下,源數據在響應對象中返回
}

shard異常

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
     //成功數量小於總數
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();  //處理可能的異常
    }
}

版本號衝突狀況

pdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        //處理版本號衝突狀況
    }
}
相關文章
相關標籤/搜索