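The previous 33 posts explained Elasticsearch entirely through RESTful requests issued from Kibana, and no Java or Python client code has been published so far. REST is the most direct way to use and understand the core features of ES, but in real projects requests are issued and results are processed from Java or Python. With the core features understood, the Java API is straightforward to use; for APIs not covered here, consult the documentation.
This post walks through some examples of Elasticsearch client API development, mainly in Java, covering the most commonly used core APIs.
Taking a Maven project as an example, add the project dependencies: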
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>6.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.3.1</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.12.1</version>
    </dependency>
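    // Connect to a single node over the transport port (9300).
    // Since 6.0 the address class is TransportAddress; InetSocketTransportAddress no longer exists.
    Settings settings = Settings.builder()
            .put("cluster.name", "elasticsearch")
            .build();
    TransportClient client = new PreBuiltTransportClient(settings)
            .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));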
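If the cluster has many nodes, specifying the IP and port of every node is impractical. The client can instead discover the cluster nodes automatically (sniffing), as follows: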
    // Setting client.transport.sniff to true enables automatic discovery of cluster nodes
    Settings settings = Settings.builder()
            .put("client.transport.sniff", true)
            .put("cluster.name", "elasticsearch")
            .build();
    // Only one node needs to be specified
    TransportClient client = new PreBuiltTransportClient(settings);
    client.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
The basic CRUD code below can serve as an introductory demo:
    /**
     * Create an employee record (index one document).
     */
    private static void createEmployee(TransportClient client) throws Exception {
        IndexResponse response = client.prepareIndex("company", "employee", "1")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("name", "jack")
                        .field("age", 27)
                        .field("position", "technique")
                        .field("country", "china")
                        .field("join_date", "2017-01-01")
                        .field("salary", 10000)
                        .endObject())
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Fetch an employee record by id.
     */
    private static void getEmployee(TransportClient client) throws Exception {
        GetResponse response = client.prepareGet("company", "employee", "1").get();
        System.out.println(response.getSourceAsString());
    }

    /**
     * Update an employee record (partial update of one field).
     */
    private static void updateEmployee(TransportClient client) throws Exception {
        UpdateResponse response = client.prepareUpdate("company", "employee", "1")
                .setDoc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("position", "technique manager")
                        .endObject())
                .get();
        System.out.println(response.getResult());
    }

    /**
     * Delete an employee record.
     */
    private static void deleteEmployee(TransportClient client) throws Exception {
        DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
        System.out.println(response.getResult());
    }
We previously ran searches through REST; now we implement them in Java. The original REST example is:
    GET /company/employee/_search
    {
      "query": {
        "bool": {
          "must": [
            { "match": { "position": "technique" } }
          ],
          "filter": {
            "range": { "age": { "gte": 30, "lte": 40 } }
          }
        }
      },
      "from": 0,
      "size": 1
    }
It is equivalent to the following Java code:
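    // bool query: "must" carries the match clause, "filter" carries the age range,
    // and from/size match the REST request above
    SearchResponse response = client.prepareSearch("company")
            .setTypes("employee")
            .setQuery(QueryBuilders.boolQuery()
                    .must(QueryBuilders.matchQuery("position", "technique"))
                    .filter(QueryBuilders.rangeQuery("age").gte(30).lte(40)))
            .setFrom(0)
            .setSize(1)
            .get();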
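Aggregation queries are somewhat more involved: both building the request and parsing the response follow the actual structure of the returned data. Take the following query as an example: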
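Requirement: group employees by country, then by join year within each country, and compute the average salary of each group.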
The REST request is as follows:
    GET /company/employee/_search
    {
      "size": 0,
      "aggs": {
        "group_by_country": {
          "terms": { "field": "country" },
          "aggs": {
            "group_by_join_date": {
              "date_histogram": { "field": "join_date", "interval": "year" },
              "aggs": {
                "avg_salary": {
                  "avg": { "field": "salary" }
                }
              }
            }
          }
        }
      }
    }
The same request written in Java is as follows:
    // Aggregation names and fields mirror the REST request:
    // group_by_country -> group_by_join_date -> avg_salary
    SearchResponse searchResponse = client.prepareSearch("company")
            .setTypes("employee")
            .setSize(0)
            .addAggregation(
                    AggregationBuilders.terms("group_by_country").field("country")
                            .subAggregation(AggregationBuilders.dateHistogram("group_by_join_date")
                                    .field("join_date")
                                    .dateHistogramInterval(DateHistogramInterval.YEAR)
                                    .subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))))
            .get();
The response then has to be unpacked layer by layer:
    Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();

    // Outer layer: one bucket per country (Terms is the interface that StringTerms implements)
    Terms groupByCountry = (Terms) aggrMap.get("group_by_country");
    for (Terms.Bucket countryBucket : groupByCountry.getBuckets()) {
        System.out.println(countryBucket.getKey() + "\t" + countryBucket.getDocCount());

        // Middle layer: one bucket per join year within the country
        Histogram groupByJoinDate =
                (Histogram) countryBucket.getAggregations().asMap().get("group_by_join_date");
        for (Histogram.Bucket yearBucket : groupByJoinDate.getBuckets()) {
            System.out.println(yearBucket.getKey() + "\t" + yearBucket.getDocCount());

            // Inner layer: the average salary metric of this country/year group
            Avg avgSalary = (Avg) yearBucket.getAggregations().asMap().get("avg_salary");
            System.out.println(avgSalary.getValue());
        }
    }

    client.close();
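To make the shape of that nested result easier to picture, here is a minimal in-memory sketch of the same grouping done with Java streams. It is only an illustration and does not touch Elasticsearch: the `Employee` class and the sample records are hypothetical, and the nested map simply mirrors the `group_by_country` -> `group_by_join_date` -> `avg_salary` layers.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// In-memory illustration only; Employee and its sample values are hypothetical.
class NestedAggregationSketch {

    static final class Employee {
        final String country;
        final LocalDate joinDate;
        final double salary;

        Employee(String country, String joinDate, double salary) {
            this.country = country;
            this.joinDate = LocalDate.parse(joinDate);
            this.salary = salary;
        }
    }

    // country -> join year -> average salary
    static Map<String, Map<Integer, Double>> avgSalaryByCountryAndYear(List<Employee> employees) {
        return employees.stream().collect(
                Collectors.groupingBy(e -> e.country,
                        Collectors.groupingBy(e -> e.joinDate.getYear(),
                                Collectors.averagingDouble(e -> e.salary))));
    }

    public static void main(String[] args) {
        List<Employee> employees = Arrays.asList(
                new Employee("china", "2017-01-01", 10000),
                new Employee("china", "2017-06-01", 12000),
                new Employee("china", "2016-01-01", 8000),
                new Employee("usa", "2016-03-01", 20000));

        // Walk the result layer by layer, just like the bucket iteration above
        avgSalaryByCountryAndYear(employees).forEach((country, byYear) -> {
            System.out.println(country);
            byYear.forEach((year, avg) -> System.out.println("  " + year + "\t" + avg));
        });
    }
}
```

Each level of the map corresponds to one level of buckets in the response, which is why the parsing code needs one loop per aggregation layer.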
    /** Upsert: update the document if it exists, otherwise index the full document. */
    private static void upsert(TransportClient transport) {
        try {
            // Full document, used only when id 2 does not exist yet
            IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
                    XContentFactory.jsonBuilder().startObject()
                            .field("name", "mysql從入門到刪庫跑路")
                            .field("tags", "mysql")
                            .field("price", 32.8)
                            .endObject());
            // Partial document, applied when id 2 already exists
            UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
                    .doc(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("price", 31.8)
                            .endObject())
                    .upsert(index);
            UpdateResponse response = transport.update(update).get();
            System.out.println(response.getVersion());
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
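The semantics of an update with an upsert document are easy to misread, so here is a small in-memory model of them. It is a sketch, not Elasticsearch code: `UpsertSketch` and its map-backed store are hypothetical. It shows that when the document is missing, the upsert document is stored as-is and the partial document is not applied; the partial document is merged only when the document already exists.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of update-with-upsert semantics; not Elasticsearch code.
class UpsertSketch {

    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    /**
     * If the document exists, merge partialDoc into it; otherwise store upsertDoc as-is.
     * Returns true when a new document was created.
     */
    boolean upsert(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            docs.put(id, new HashMap<>(upsertDoc));
            return true;
        }
        existing.putAll(partialDoc);
        return false;
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        Map<String, Object> full = new HashMap<>();
        full.put("name", "mysql");
        full.put("price", 32.8);
        Map<String, Object> partial = new HashMap<>();
        partial.put("price", 31.8);

        UpsertSketch store = new UpsertSketch();
        store.upsert("2", partial, full);   // first call: document created from the full doc
        System.out.println(store.get("2"));
        store.upsert("2", partial, full);   // second call: partial doc merged into it
        System.out.println(store.get("2"));
    }
}
```

Applied to the method above, this means the first run against an empty index leaves the price at 32.8, and only a second run changes it to 31.8.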
    /** Multi-get: fetch several documents by id in one request. */
    public static void mget(TransportClient transport) {
        MultiGetResponse res = transport.prepareMultiGet()
                .add("book_shop", "books", "1")
                .add("book_shop", "books", "2")
                .get();
        for (MultiGetItemResponse item : res.getResponses()) {
            System.out.println(item.getResponse());
        }
    }
    /** Bulk: send two index operations and one update in a single request. */
    public static void bulk(TransportClient transport) {
        try {
            BulkRequestBuilder bulk = transport.prepareBulk();
            bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
                    XContentFactory.jsonBuilder().startObject()
                            .field("name", "設計模式從入門到拷貝代碼")
                            .field("tags", "設計模式")
                            .field("price", 55.9)
                            .endObject()));
            bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
                    XContentFactory.jsonBuilder().startObject()
                            .field("name", "架構設計從入門到google搜索")
                            .field("tags", "架構設計")
                            .field("price", 68.9)
                            .endObject()));
            bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc(
                    XContentFactory.jsonBuilder().startObject()
                            .field("price", 32.8)
                            .endObject()));
            BulkResponse bulkRes = bulk.get();
            if (bulkRes.hasFailures()) {
                System.out.println("Error...");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
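For readers coming from the REST examples, it may help to see how such a bulk request maps onto the newline-delimited body that the `_bulk` endpoint accepts: one action line, then one source line, each terminated by a newline. The sketch below builds that text with plain string concatenation. `BulkBodySketch` is a hypothetical helper written for this illustration, it does no JSON escaping, and it is not part of the Elasticsearch client.

```java
// Builds the newline-delimited body of a REST _bulk request; illustration only.
class BulkBodySketch {

    private final StringBuilder body = new StringBuilder();

    // Action/metadata line, e.g. {"index":{"_index":"...","_type":"...","_id":"..."}}
    private static String meta(String action, String index, String type, String id) {
        return "{\"" + action + "\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}";
    }

    // An index action is followed by the full document source
    BulkBodySketch index(String index, String type, String id, String sourceJson) {
        body.append(meta("index", index, type, id)).append('\n')
                .append(sourceJson).append('\n');
        return this;
    }

    // An update action is followed by the partial document wrapped in "doc"
    BulkBodySketch update(String index, String type, String id, String partialDocJson) {
        body.append(meta("update", index, type, id)).append('\n')
                .append("{\"doc\":").append(partialDocJson).append("}\n");
        return this;
    }

    String build() {
        return body.toString();
    }

    public static void main(String[] args) {
        String body = new BulkBodySketch()
                .index("book_shop", "books", "3", "{\"price\":55.9}")
                .update("book_shop", "books", "1", "{\"price\":32.8}")
                .build();
        System.out.print(body);
    }
}
```

In the Java client, `BulkRequestBuilder` takes care of this serialization, so the sketch is only meant to connect the two views of the same request.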
    /** Scroll search: read all hits of book_shop in batches. */
    public static void scroll(TransportClient client) {
        SearchResponse bookShop = client.prepareSearch("book_shop")
                .setScroll(new TimeValue(60000))
                .setSize(1)
                .get();
        int batchCnt = 0;
        do {
            // Keep reading with the scroll id until a batch comes back empty
            for (SearchHit hit : bookShop.getHits().getHits()) {
                System.out.println("batchCnt:" + ++batchCnt);
                System.out.println(hit.getSourceAsString());
            }
            bookShop = client.prepareSearchScroll(bookShop.getScrollId())
                    .setScroll(new TimeValue(60000))
                    .execute().actionGet();
        } while (bookShop.getHits().getHits().length != 0);
        // Release the scroll context on the server once finished
        client.prepareClearScroll().addScrollId(bookShop.getScrollId()).get();
    }
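The control flow of a scroll is simply "keep fetching until a batch comes back empty". The sketch below isolates that loop from the client API so it can be run on its own. `ScrollLoopSketch`, `drain`, and `pagesOf` are hypothetical names; `pagesOf` is only a stand-in for a scroll context and hands out fixed-size slices of an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Isolates the "fetch until empty" loop of a scroll; illustration only.
class ScrollLoopSketch {

    // Pull batches until an empty batch signals that everything has been read
    static <T> List<T> drain(Supplier<List<T>> nextBatch) {
        List<T> all = new ArrayList<>();
        List<T> batch = nextBatch.get();
        while (!batch.isEmpty()) {
            all.addAll(batch);
            batch = nextBatch.get();
        }
        return all;
    }

    // Hypothetical stand-in for a scroll context: returns successive slices of the source
    static <T> Supplier<List<T>> pagesOf(List<T> source, int size) {
        int[] offset = {0};
        return () -> {
            int from = Math.min(offset[0], source.size());
            int to = Math.min(from + size, source.size());
            offset[0] = to;
            return new ArrayList<>(source.subList(from, to));
        };
    }

    public static void main(String[] args) {
        List<String> books = Arrays.asList("book-1", "book-2", "book-3");
        // A batch size of 1 mirrors setSize(1) in the scroll example
        System.out.println(drain(pagesOf(books, 1)));
    }
}
```

With a real client, each call to `nextBatch` corresponds to one `prepareSearchScroll` round trip, so a larger batch size than 1 is usually the practical choice.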
    /** Search template: run the stored template page_query_by_tags with parameters. */
    public static void searchTemplates(TransportClient client) {
        Map<String, Object> params = new HashMap<>(10);
        params.put("from", 0);
        params.put("size", 10);
        params.put("tags", "java");
        SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
                .setScript("page_query_by_tags")
                .setScriptType(ScriptType.STORED)
                .setScriptParams(params)
                .setRequest(new SearchRequest())
                .get();
        for (SearchHit hit : str.getResponse().getHits().getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }
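Search templates are Mustache templates, and the parameters passed above are substituted into `{{...}}` placeholders on the server. The body of the stored `page_query_by_tags` template is not shown in this post, so the template text in the sketch below is an assumption made up for illustration, and `TemplateRenderSketch.render` is a hypothetical helper that handles only plain `{{key}}` substitution, not the full Mustache syntax.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified {{key}} substitution to illustrate how template parameters are applied.
class TemplateRenderSketch {

    // Replace each {{key}} placeholder with the matching parameter value
    static String render(String template, Map<String, Object> params) {
        String rendered = template;
        for (Map.Entry<String, Object> entry : params.entrySet()) {
            rendered = rendered.replace("{{" + entry.getKey() + "}}",
                    String.valueOf(entry.getValue()));
        }
        return rendered;
    }

    public static void main(String[] args) {
        // Assumed template body; the real stored template may look different
        String template = "{\"from\": {{from}}, \"size\": {{size}}, "
                + "\"query\": {\"match\": {\"tags\": \"{{tags}}\"}}}";

        Map<String, Object> params = new LinkedHashMap<>();
        params.put("from", 0);
        params.put("size", 10);
        params.put("tags", "java");

        System.out.println(render(template, params));
    }
}
```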
    /** Other query types: term, multi_match, common terms, prefix, and a combined bool query. */
    public static void otherSearch(TransportClient client) {
        SearchResponse response1 = client.prepareSearch("book_shop")
                .setQuery(QueryBuilders.termQuery("tags", "java")).get();
        SearchResponse response2 = client.prepareSearch("book_shop")
                .setQuery(QueryBuilders.multiMatchQuery("32.8", "price", "tags")).get();
        SearchResponse response3 = client.prepareSearch("book_shop")
                .setQuery(QueryBuilders.commonTermsQuery("name", "入門")).get();
        SearchResponse response4 = client.prepareSearch("book_shop")
                .setQuery(QueryBuilders.prefixQuery("name", "java")).get();
        System.out.println(response1.getHits().getHits()[0].getSourceAsString());
        System.out.println(response2.getHits().getHits()[0].getSourceAsString());
        System.out.println(response3.getHits().getHits()[0].getSourceAsString());
        System.out.println(response4.getHits().getHits()[0].getSourceAsString());

        // Combine multiple conditions in one bool query
        SearchResponse response5 = client.prepareSearch("book_shop")
                .setQuery(QueryBuilders.boolQuery()
                        .must(QueryBuilders.termQuery("tags", "java"))
                        .mustNot(QueryBuilders.matchQuery("name", "跑路"))
                        .should(QueryBuilders.matchQuery("name", "入門"))
                        .filter(QueryBuilders.rangeQuery("price").gte(23).lte(55)))
                .get();
        System.out.println(response5.getHits().getHits()[0].getSourceAsString());
    }
    /** Geo queries: bounding box, polygon, and distance; only the distance query is executed. */
    public static void geo(TransportClient client) {
        // Bounding box: top, left, bottom, right
        GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location")
                .setCorners(23, 112, 21, 114);

        // Polygon defined by three points
        List<GeoPoint> points = new ArrayList<>();
        points.add(new GeoPoint(23, 115));
        points.add(new GeoPoint(25, 113));
        points.add(new GeoPoint(21, 112));
        GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location", points);

        // Everything within 500 meters of the given point
        GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location")
                .point(22.523375, 113.911231)
                .distance(500, DistanceUnit.METERS);

        SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }
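To give a feel for what the distance query filters on, the sketch below computes a great-circle distance with the haversine formula and checks it against a radius. This is an approximation chosen for illustration, assuming a spherical Earth with a mean radius of 6,371 km; it is not a claim about how Elasticsearch computes distances internally, and `GeoDistanceSketch` is a hypothetical class.

```java
// Haversine distance check; an approximation used for illustration only.
class GeoDistanceSketch {

    // Mean Earth radius in meters (spherical approximation, an assumption of this sketch)
    private static final double EARTH_RADIUS_METERS = 6_371_000.0;

    // Great-circle distance in meters between two lat/lon points given in degrees
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    // True when the point lies within the given radius of the center
    static boolean withinDistance(double centerLat, double centerLon,
                                  double lat, double lon, double meters) {
        return haversineMeters(centerLat, centerLon, lat, lon) <= meters;
    }

    public static void main(String[] args) {
        // Same center point and radius as the distance query above
        double centerLat = 22.523375;
        double centerLon = 113.911231;
        // 0.001 degrees of latitude is roughly 111 meters; 0.01 degrees is roughly 1.1 km
        System.out.println(withinDistance(centerLat, centerLon, 22.524375, 113.911231, 500));
        System.out.println(withinDistance(centerLat, centerLon, 22.533375, 113.911231, 500));
    }
}
```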
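The demos above only need a quick skim. If you are already developing an ES-based project, rely on the official API documentation (https://www.elastic.co/guide/...), which contains detailed API descriptions and usage demos.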