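ES Java Client Document API Notes
These notes cover working with ES data through the Java client. (One small detail: the Java API version must match the ES version; that is, the first two parts of the version number, major and minor, must be equal.)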
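The version rule above can be checked mechanically before a client is created. The following is a minimal sketch using only the JDK; the class name `VersionCheck` and its methods are hypothetical helpers for illustration, not part of the Elasticsearch client.

```java
// Sketch: verify that client and server versions share the same major.minor pair.
// VersionCheck is a hypothetical helper, not an Elasticsearch class.
final class VersionCheck {

    /** Returns "major.minor" for a version string such as "5.5.3". */
    static String majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected at least major.minor: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** True when both versions have the same major and minor numbers. */
    static boolean isCompatible(String clientVersion, String serverVersion) {
        return majorMinor(clientVersion).equals(majorMinor(serverVersion));
    }

    public static void main(String[] args) {
        System.out.println(isCompatible("5.5.3", "5.5.1")); // true
        System.out.println(isCompatible("5.5.3", "5.6.0")); // false
    }
}
```

In practice the server version would come from the cluster itself; here both values are passed in as plain strings to keep the sketch self-contained.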
Maven dependencies:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.5.3</version>
</dependency>
Creating the client, following the API documentation:
TransportClient client = null;
Settings settings = Settings.builder()
        .put("cluster.name", "my-application") // name of the cluster to connect to
        .put("client.transport.sniff", true)   // sniff the cluster to discover the state of its nodes
        .build();
client = new PreBuiltTransportClient(settings);
try {
    client.addTransportAddress(
            new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
} catch (UnknownHostException e) {
    e.printStackTrace();
}
return client;
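The snippet above registers a single node address. When several seed nodes are kept in configuration, a small parser can turn a "host:port" list into addresses first. This is a sketch under assumptions: the comma-separated format and the `TransportAddresses` class are hypothetical, and only JDK classes are used.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Sketch: parse "host:port,host:port" into unresolved socket addresses.
// TransportAddresses and the input format are assumptions for illustration.
final class TransportAddresses {

    static List<InetSocketAddress> parse(String csv) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : csv.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("127.0.0.1:9300, es-node-2:9300")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

Each parsed entry could then be registered in the same way as the single address above, resolving the host with `InetAddress.getByName(address.getHostString())` and passing `address.getPort()`.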
Document APIs:
Operations on a single document
setSource, first usage
IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", "niaho")
                    .field("message", "trying out Elasticsearch")
                .endObject()
        ).get();
setSource, second usage
XContentBuilder builder = jsonBuilder()
        .startObject()
            .field("user", "kimchy")
            .field("postDate", new Date())
            .field("message", "trying out Elasticsearch")
        .endObject();
IndexResponse response = client.prepareIndex("twitter", "tweet", "2")
        // builder.string() returns the generated JSON in the 5.x client; toString() does not
        .setSource(builder.string(), XContentType.JSON)
        .get();
Looking at the source code, there are several more overloads (deprecated ones are not listed here):
setSource(byte[] source, int offset, int length, XContentType xContentType)
setSource(Object... source)
setSource(XContentType xContentType, Object... source)
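The `Object...` overloads take alternating field names and values, so the argument count has to be even. The sketch below imitates that pairing with plain JDK types to show how such arguments become a document; `KeyValueSource` is a hypothetical stand-in and not the client's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: turn alternating name/value varargs into a document map.
// KeyValueSource is a hypothetical illustration, not Elasticsearch code.
final class KeyValueSource {

    static Map<String, Object> toDocument(Object... source) {
        if (source.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "Expected an even number of arguments but got " + source.length);
        }
        Map<String, Object> document = new LinkedHashMap<>();
        for (int i = 0; i < source.length; i += 2) {
            // even positions are field names, odd positions are their values
            document.put(source[i].toString(), source[i + 1]);
        }
        return document;
    }

    public static void main(String[] args) {
        System.out.println(toDocument("user", "kimchy", "message", "trying out Elasticsearch"));
    }
}
```

With the real client, the equivalent call would presumably look like `setSource("user", "kimchy", "message", "trying out Elasticsearch")`.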
Properties available on the IndexResponse return value
Result getResult()              // outcome of the operation
public String getIndex()        // index name
public ShardId getShardId()     // shard id
public String getType()
public String getId()
public long getVersion()        // the version is 1 the first time the document is stored
public boolean forcedRefresh()
public void setForcedRefresh(boolean forcedRefresh)
public RestStatus status()
prepareGet usage (index, type, id). The limitation is that you must already know the document id in order to query.
GetResponse getResponse = client.prepareGet("twitter", "tweet", "2")
        .setOperationThreaded(false) // whether to run the operation on a separate thread
        .get();
System.out.println(getResponse.toString());
prepareDelete usage; the same limitation applies.
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
Delete by query
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male"))
        .source("persons")
        .get();
long deleted = response.getDeleted();
When success and failure need their own handling
// Use a listener when success and failure need special handling
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male"))
        .source("persons")
        .execute(new ActionListener<BulkByScrollResponse>() {
            @Override
            public void onResponse(BulkByScrollResponse response) {
                long deleted = response.getDeleted();
            }
            @Override
            public void onFailure(Exception e) {
                // Handle the exception
            }
        });
Using UpdateRequest, approach 1
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();
Using UpdateRequest, approach 2
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject());
client.update(updateRequest).get();
Updating with a script (ScriptService.ScriptType.FILE can also be used, in which case the script is referenced by its file name)
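// Note: later 5.x releases appear to declare this constructor as
// Script(ScriptType type, String lang, String idOrCode, Map<String, Object> params);
// check the Javadoc for the version you are using.
client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\"",
                ScriptService.ScriptType.INLINE, null, null))
        .get();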
Upsert (update the document if it exists, insert it otherwise)
IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
                .startObject()
                    .field("name", "Joe Smith")
                    .field("gender", "male")
                .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
                .startObject()
                    .field("gender", "male")
                .endObject())
        .upsert(indexRequest);
client.update(updateRequest).get();
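The upsert rule can be illustrated without a cluster. Below is a minimal in-memory sketch, assuming a plain map as the document store: a missing id stores the upsert document as-is, and an existing id has the partial document merged into it. `UpsertSketch` is hypothetical, and its merge is shallow (top-level fields only), which is a simplification.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: in-memory model of upsert semantics (insert if absent, merge if present).
// UpsertSketch is a hypothetical illustration, not Elasticsearch code.
final class UpsertSketch {

    private final Map<String, Map<String, Object>> store = new HashMap<>();

    /** Returns true when a new document was created, false when an existing one was updated. */
    boolean upsert(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            // document is missing: store the upsert document unchanged
            store.put(id, new HashMap<>(upsertDoc));
            return true;
        }
        // document exists: apply only the partial update (shallow merge)
        existing.putAll(partialDoc);
        return false;
    }

    Map<String, Object> get(String id) {
        return store.get(id);
    }

    public static void main(String[] args) {
        UpsertSketch sketch = new UpsertSketch();
        Map<String, Object> full = new HashMap<>();
        full.put("name", "Joe Smith");
        full.put("gender", "male");
        Map<String, Object> partial = new HashMap<>();
        partial.put("gender", "male");
        System.out.println(sketch.upsert("1", partial, full)); // true: created
        System.out.println(sketch.upsert("1", partial, full)); // false: updated
    }
}
```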
Batch operations on multiple documents
Multi-get can fetch documents across indices
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("twitter", "tweet", "1")
        .add("twitter", "tweet", "2", "3", "4")
        .add("another", "type", "foo")
        .get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {
        String json = response.getSourceAsString();
    }
}
Bulk API
Both indexing and deleting documents can be done in bulk.
// bulkRequest was not declared in the original snippet; prepareBulk() creates the builder
BulkRequestBuilder bulkRequest = client.prepareBulk();
// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "trying out Elasticsearch")
                .endObject()
        )
);
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                .startObject()
                    .field("user", "kimchy")
                    .field("postDate", new Date())
                    .field("message", "another post")
                .endObject()
        )
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
Indexing and deleting in the same batch
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
Handling the results of bulk processing
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { ... }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... }
        })
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
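To make the thresholds in this configuration concrete, here is a minimal sketch of the two size-based flush triggers (10000 actions or 5 MB) and of a retry schedule starting at 100 ms with 3 retries. Everything in it is an assumption for illustration: `BulkPolicySketch` is hypothetical, the 5-second flush interval is not modelled, and the delays use simple doubling, which is not necessarily the formula the client's exponential backoff applies.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: size-based flush decision and a doubling retry schedule.
// BulkPolicySketch is a hypothetical illustration, not Elasticsearch code.
final class BulkPolicySketch {

    static final int MAX_ACTIONS = 10_000;          // mirrors setBulkActions(10000)
    static final long MAX_BYTES = 5L * 1024 * 1024; // mirrors a 5 MB bulk size

    /** True when either the action count or the byte size threshold is reached. */
    static boolean shouldFlush(int pendingActions, long pendingBytes) {
        return pendingActions >= MAX_ACTIONS || pendingBytes >= MAX_BYTES;
    }

    /** Delays before each retry, doubling from the initial value (an assumed schedule). */
    static List<Long> doublingDelaysMillis(long initialMillis, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delay);
            delay *= 2;
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(9_999, 1024));      // false
        System.out.println(shouldFlush(10_000, 1024));     // true
        System.out.println(doublingDelaysMillis(100, 3));  // [100, 200, 400]
    }
}
```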