使用Java操做Elasticsearch(Elasticsearch的java api使用)

一、Elasticsearch是基於Lucene開發的一個分佈式全文檢索框架,向Elasticsearch中存儲和從Elasticsearch中查詢,格式是json。java

索引index,至關於數據庫中的database。數據庫

類型type至關於數據庫中的table。apache

主鍵id至關於數據庫中記錄的主鍵,是惟一的。json

向Elasticsearch中存儲數據,其實就是向es中的index下面的type中存儲json類型的數據。設計模式

二、Elasticsearch是RestFul風格的api,經過http的請求形式(注意,參數是url拼接仍是請求的json形式哦),發送請求,對Elasticsearch進行操做。
查詢,請求方式應該是get。刪除,請求方式應該是delete。添加,請求方式應該是put/post。修改,請求方式應該是put/post。
RESTFul接口url的格式:http://ip:port/<index>/<type>/<[id]>。其中index、type是必須提供的。id是能夠選擇的,不提供es會自動生成,index、type將信息進行分層,利於管理。
api

三、如何使用java鏈接Elasticsearch。因爲使用的是maven項目,pom.xml的依賴以下所示:restful

 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 4     http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.bie</groupId>
 7     <artifactId>elasticsearch-hello</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9 
10     <properties>
11         <maven.compiler.source>1.8</maven.compiler.source>
12         <maven.compiler.target>1.8</maven.compiler.target>
13         <encoding>UTF-8</encoding>
14     </properties>
15 
16     <dependencies>
17         <!-- elasticsearch的客戶端 -->
18         <dependency>
19             <groupId>org.elasticsearch.client</groupId>
20             <artifactId>transport</artifactId>
21             <version>5.4.3</version>
22         </dependency>
23         <!-- elasticsearch依賴2.x的log4j -->
24         <dependency>
25             <groupId>org.apache.logging.log4j</groupId>
26             <artifactId>log4j-api</artifactId>
27             <version>2.8.2</version>
28         </dependency>
29         <dependency>
30             <groupId>org.apache.logging.log4j</groupId>
31             <artifactId>log4j-core</artifactId>
32             <version>2.8.2</version>
33         </dependency>
34         <!-- junit單元測試 -->
35         <dependency>
36             <groupId>junit</groupId>
37             <artifactId>junit</artifactId>
38             <version>4.12</version>
39         </dependency>
40     </dependencies>
41 
42 
43 </project>

使用查詢的方式,先簡單測試一下是否連通es集羣,和對比查詢的數據是否一致。app

 1 package com.bie.elasticsearch;
 2 
 3 import java.net.InetAddress;
 4 
 5 import org.elasticsearch.action.get.GetResponse;
 6 import org.elasticsearch.client.transport.TransportClient;
 7 import org.elasticsearch.common.settings.Settings;
 8 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 9 import org.elasticsearch.transport.client.PreBuiltTransportClient;
10 
11 /**
12  * 
13  * @author biehl
14  *
15  */
16 public class HelloElasticsearch {
17 
18     public static void main(String[] args) {
19         try {
20             // 設置集羣名稱biehl01,Settings設置es的集羣名稱,使用的設計模式,鏈式設計模式、build設計模式。
21             Settings settings = Settings.builder().put("cluster.name", "biehl01").build();
22             // 讀取es集羣中的數據,建立client。
23             @SuppressWarnings("resource")
24             TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
25                     // 用java訪問ES用的端口是9300。es的9200是restful的請求端口號
26                     // 因爲我使用的是僞集羣,因此就配置了一臺機器,若是是集羣方式,將競選主節點的加進來便可。
27                     // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
28                     // 9300),
29                     // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
30                     // 9300),
31                     new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
32             // 搜索數據(.actionGet()方法是同步的,沒有返回就等待)
33             // 方式是先去索引裏面查詢出索引數據,再去文檔裏面查詢出數據。
34             GetResponse response = client.prepareGet("news", "fulltext", "2").execute().actionGet();
35             // 輸出結果
36             System.out.println(response);
37             // 關閉client
38             client.close();
39         } catch (Exception e) {
40             e.printStackTrace();
41         }
42 
43     }
44 
45 }

查詢的結果以下所示:框架

四、如何使用java api建立索引Index、類型Type、以及指定字段,是否建立索引,是否存儲,是否即分詞,又創建索引(analyzed)、是否建索引不分詞(not_analyzed)等等。異步

  1 package com.bie.elasticsearch;
  2 
  3 import java.io.IOException;
  4 import java.net.InetAddress;
  5 import java.util.HashMap;
  6 
  7 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
  8 import org.elasticsearch.client.AdminClient;
  9 import org.elasticsearch.client.IndicesAdminClient;
 10 import org.elasticsearch.client.transport.TransportClient;
 11 import org.elasticsearch.common.settings.Settings;
 12 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 13 import org.elasticsearch.common.xcontent.XContentBuilder;
 14 import org.elasticsearch.common.xcontent.XContentFactory;
 15 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 16 import org.junit.Before;
 17 import org.junit.Test;
 18 
 19 /**
 20  * 
 21  * @author biehl
 22  *
 23  */
 24 public class AdminAPI {
 25 
 26     private TransportClient client = null;
 27 
 28     // 在全部的測試方法以前執行
 29     @SuppressWarnings("resource")
 30     @Before
 31     public void init() throws Exception {
 32         // 設置集羣名稱biehl01
 33         Settings settings = Settings.builder().put("cluster.name", "biehl01")
 34                 // 自動感知的功能(能夠經過當前指定的節點獲取全部es節點的信息)
 35                 .put("client.transport.sniff", true).build();
 36         // 建立client
 37         client = new PreBuiltTransportClient(settings).addTransportAddresses(
 38                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 39                 // 9300),
 40                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 41                 // 9300),
 42                 // 建議指定2個及其以上的節點。
 43                 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
 44     }
 45 
 46     /**
 47      * 
 48      * AdminClient建立索引,並配置一些參數,用來指定一些映射關係等等
 49      * 
 50      * 這裏建立一個索引Index,而且指定分區、副本的數量
 51      * 
 52      */
 53     @Test
 54     public void createIndexWithSettings() {
 55         // 獲取Admin的API
 56         AdminClient admin = client.admin();
 57         // 使用Admin API對索引進行操做
 58         IndicesAdminClient indices = admin.indices();
 59         // 準備建立索引
 60         indices.prepareCreate("food")
 61                 // 配置索引參數
 62                 .setSettings(
 63                         // 參數配置器
 64                         Settings.builder()// 指定索引分區的數量。shards分區
 65                                 .put("index.number_of_shards", 5)
 66                                 // 指定索引副本的數量(注意:不包括自己,若是設置數據存儲副本爲1,實際上數據存儲了2份)
 67                                 // replicas副本
 68                                 .put("index.number_of_replicas", 1))
 69                 // 真正執行
 70                 .get();
 71     }
 72 
 73     /**
 74      * 你能夠經過dynamic設置來控制這一行爲,它可以接受如下的選項: true:默認值。
 75      * 
 76      * 動態添加字段 false:忽略新字段
 77      * 
 78      * strict:若是碰到陌生字段,拋出異常
 79      * 
 80      * 給索引添加mapping信息(給表添加schema信息)
 81      * 
 82      * @throws IOException
 83      */
 84     @Test
 85     public void elasticsearchSettingsMappings() throws IOException {
 86         // 1:settings
 87         HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
 88         // shards分區的數量4
 89         settings_map.put("number_of_shards", 4);
 90         // 副本的數量1
 91         settings_map.put("number_of_replicas", 1);
 92 
 93         // 2:mappings(映射、schema)
 94         // field("dynamic", "true")含義是動態字段
 95         XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("dynamic", "true")
 96                 // 設置type中的屬性
 97                 .startObject("properties")
 98                 // id屬性
 99                 .startObject("id")
100                 // 類型是integer
101                 .field("type", "integer")
102                 // 不分詞,可是建索引
103                 .field("index", "not_analyzed")
104                 // 在文檔中存儲
105                 .field("store", "yes").endObject()
106                 // name屬性
107                 .startObject("name")
108                 // string類型
109                 .field("type", "string")
110                 // 在文檔中存儲
111                 .field("store", "yes")
112                 // 創建索引
113                 .field("index", "analyzed")
114                 // 使用ik_smart進行分詞
115                 .field("analyzer", "ik_smart").endObject().endObject().endObject();
116 
117         CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("computer");
118         // 管理索引(user_info)而後關聯type(user)
119         prepareCreate.setSettings(settings_map).addMapping("xiaomi", builder).get();
120     }
121 
122     /**
123      * index這個屬性,no表明不建索引
124      * 
125      * not_analyzed,建索引不分詞
126      * 
127      * analyzed 即分詞,又創建索引
128      * 
129      * expected [no],[not_analyzed] or [analyzed]。便可以選擇三者任意一個值
130      * 
131      * @throws IOException
132      */
133 
134     @Test
135     public void elasticsearchSettingsPlayerMappings() throws IOException {
136         // 1:settings
137         HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
138         // 分區的數量4
139         settings_map.put("number_of_shards", 4);
140         // 副本的數量1
141         settings_map.put("number_of_replicas", 1);
142 
143         // 2:mappings
144         XContentBuilder builder = XContentFactory.jsonBuilder().startObject()//
145                 .field("dynamic", "true").startObject("properties")
146                 // 在文檔中存儲、
147                 .startObject("id").field("type", "integer").field("store", "yes").endObject()
148                 // 不分詞,可是建索引、
149                 .startObject("name").field("type", "string").field("index", "not_analyzed").endObject()
150                 //
151                 .startObject("age").field("type", "integer").endObject()
152                 //
153                 .startObject("salary").field("type", "integer").endObject()
154                 // 不分詞,可是建索引、
155                 .startObject("team").field("type", "string").field("index", "not_analyzed").endObject()
156                 // 不分詞,可是建索引、
157                 .startObject("position").field("type", "string").field("index", "not_analyzed").endObject()
158                 // 即分詞,又創建索引、
159                 .startObject("description").field("type", "string").field("store", "no").field("index", "analyzed")
160                 .field("analyzer", "ik_smart").endObject()
161                 // 即分詞,又創建索引、在文檔中存儲、
162                 .startObject("addr").field("type", "string").field("store", "yes").field("index", "analyzed")
163                 .field("analyzer", "ik_smart").endObject()
164 
165                 .endObject()
166 
167                 .endObject();
168 
169         CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player");
170         prepareCreate.setSettings(settings_map).addMapping("basketball", builder).get();
171     }
172 }

五、使用java api操做Elasticsearch的增刪改查以及複雜查詢(聚合查詢,能夠進行分組統計數量,分組統計最大值,分組統計平均值,等等統計)。

  1 package com.bie.elasticsearch;
  2 
  3 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
  4 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
  5 
  6 import java.io.IOException;
  7 import java.net.InetAddress;
  8 import java.util.Date;
  9 import java.util.Iterator;
 10 import java.util.Map;
 11 import java.util.Set;
 12 
 13 import org.elasticsearch.action.ActionListener;
 14 import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
 15 import org.elasticsearch.action.delete.DeleteResponse;
 16 import org.elasticsearch.action.get.GetResponse;
 17 import org.elasticsearch.action.get.MultiGetItemResponse;
 18 import org.elasticsearch.action.get.MultiGetResponse;
 19 import org.elasticsearch.action.index.IndexResponse;
 20 import org.elasticsearch.action.search.SearchRequestBuilder;
 21 import org.elasticsearch.action.search.SearchResponse;
 22 import org.elasticsearch.action.update.UpdateRequest;
 23 import org.elasticsearch.action.update.UpdateResponse;
 24 import org.elasticsearch.client.transport.TransportClient;
 25 import org.elasticsearch.common.settings.Settings;
 26 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 27 import org.elasticsearch.index.query.QueryBuilder;
 28 import org.elasticsearch.index.query.QueryBuilders;
 29 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 30 import org.elasticsearch.search.aggregations.Aggregation;
 31 import org.elasticsearch.search.aggregations.AggregationBuilders;
 32 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 33 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 34 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 35 import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
 36 import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
 37 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
 38 import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
 39 import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
 40 import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
 41 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 42 import org.junit.Before;
 43 import org.junit.Test;
 44 
 45 /**
 46  * 
 47  * @author biehl
 48  *
 49  */
 50 public class ElasticsearchCRUD {
 51 
 52     private TransportClient client = null;
 53 
 54     @SuppressWarnings("resource")
 55     @Before
 56     public void init() throws Exception {
 57         // 設置集羣名稱biehl01
 58         Settings settings = Settings.builder().put("cluster.name", "biehl01")
 59                 // 自動感知的功能(能夠經過當前指定的節點獲取全部es節點的信息)
 60                 .put("client.transport.sniff", true).build();
 61         // 建立client
 62         client = new PreBuiltTransportClient(settings).addTransportAddresses(
 63                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 64                 // 9300),
 65                 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 66                 // 9300),
 67                 // 建議指定2個及其以上的節點。
 68                 new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
 69     }
 70 
 71     /**
 72      * 建立一個Index索引、Type類型、以及id。
 73      * 
 74      * 而後插入類型裏面的數據。
 75      * 
 76      * @throws IOException
 77      */
 78     @Test
 79     public void elasticsearchCreate() throws IOException {
 80         IndexResponse response = client.prepareIndex("people", "student", "3")
 81                 .setSource(jsonBuilder().startObject().field("username", "王五五").field("sex", "")
 82                         .field("birthday", new Date()).field("age", 21).field("message", "trying out Elasticsearch")
 83                         .endObject())
 84                 .get();
 85         System.out.println(response.toString());
 86     }
 87 
 88     /**
 89      * 查找一條索引Index裏面的類型Type裏面的id的全部信息
 90      * 
 91      * @throws IOException
 92      */
 93     @Test
 94     public void elasticsearchGet() throws IOException {
 95         GetResponse response = client.prepareGet("people", "student", "1").get();
 96         System.out.println(response.getSourceAsString());
 97     }
 98 
 99     /**
100      * 查找多條
101      * 
102      * 索引Index裏面的類型Type裏面的多個id的全部信息
103      * 
104      * @throws IOException
105      */
106     @Test
107     public void elasticsearchMultiGet() throws IOException {
108         // 查詢出多個索引Index多個類型Type的多個id的全部信息
109         MultiGetResponse multiGetItemResponses = client.prepareMultiGet().add("people", "student", "1")
110                 .add("people", "student", "2", "3").add("people", "teacher", "1").add("news", "fulltext", "1").get();
111         // 將查詢出的結果遍歷輸出
112         for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
113             // 將每個查詢出的結果遍歷輸出
114             GetResponse response = itemResponse.getResponse();
115             // 判斷若是存在就進行遍歷輸出
116             if (response.isExists()) {
117                 String json = response.getSourceAsString();
118                 System.out.println(json);
119             }
120         }
121     }
122 
123     /**
124      * 修改指定的索引Index裏面的類型Type的id的信息
125      * 
126      * @throws Exception
127      */
128     @Test
129     public void elasticsearchUpdate() throws Exception {
130         // 建立一個更新的請求對象
131         UpdateRequest updateRequest = new UpdateRequest();
132         // 指定索引Index
133         updateRequest.index("people");
134         // 指定類型Type
135         updateRequest.type("student");
136         // 指定id的值
137         updateRequest.id("3");
138         // 設置修改的字段信息
139         updateRequest.doc(jsonBuilder().startObject().field("username", "王五五").endObject());
140         // 開始進行修改,而且返回響應信息
141         UpdateResponse updateResponse = client.update(updateRequest).get();
142         // 打印輸出響應的信息
143         System.out.println(updateResponse.toString());
144     }
145 
146     /**
147      * 刪除指定的索引Index裏面的類型Type的id的信息
148      */
149     @Test
150     public void elasticsearchDelete() {
151         // 指定刪除的id信息,而且給出響應結果
152         // prepareDelete(String index, String type, String id);
153         DeleteResponse response = client.prepareDelete("people", "student", "4").get();
154         // 打印輸出的響應信息
155         System.out.println(response);
156     }
157 
158     /**
159      * 根據查詢條件進行刪除數據
160      * 
161      * 
162      */
163     @Test
164     public void elasticsearchDeleteByQuery() {
165         BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
166                 // 指定查詢條件,matchQuery是name的值text裏面包括了這個內容就進行刪除。默認使用標準分詞器。
167                 .filter(QueryBuilders.matchQuery("username", "王五五"))
168                 // 指定索引名稱
169                 .source("people").get();
170         // 獲取到刪除的個數
171         long deleted = response.getDeleted();
172         // 打印輸出刪除的個數
173         System.out.println(deleted);
174     }
175 
176     /**
177      * 異步刪除
178      * 
179      * 監聽,若是真正刪除之後進行回調,打印輸出刪除確認的消息。
180      */
181     @Test
182     public void elasticsearchDeleteByQueryAsync() {
183         DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(QueryBuilders.matchQuery("sex", ""))
184                 .source("people").execute(new ActionListener<BulkByScrollResponse>() {
185 
186                     // 刪除之後的方法回調
187                     @Override
188                     public void onResponse(BulkByScrollResponse response) {
189                         // 返回刪除的個數
190                         long deleted = response.getDeleted();
191                         System.out.println("數據刪除完畢!");
192                         // 打印刪除的個數
193                         System.out.println("數據刪除的個數: " + deleted);
194                     }
195 
196                     @Override
197                     public void onFailure(Exception e) {
198                         // 失敗打印異常信息
199                         e.printStackTrace();
200                     }
201                 });
202 
203         // 先打印輸出,正常執行完畢。再執行異步監聽刪除數據。
204         try {
205             System.out.println("異步刪除操做!");
206             // 休眠10秒鐘,避免主線程裏面結束,子線程沒法進行結果輸出
207             Thread.sleep(10000);
208         } catch (Exception e) {
209             e.printStackTrace();
210         }
211     }
212 
213     /**
214      * 
215      * 按照範圍進行查找。
216      * 
217      */
218     @Test
219     public void elasticsearchRange() {
220         // includeLower(true).includeUpper(false)含義是包含前面,不包含後面的
221         // [21, 24)
222         QueryBuilder qb = rangeQuery("age").from(21).to(24).includeLower(true).includeUpper(false);
223         // 將查詢條件傳遞進去,並將查詢結果進行返回。
224         SearchResponse response = client.prepareSearch("people").setQuery(qb).get();
225         System.out.println(response);
226     }
227 
228     /**
229      * 
230      * 向指定索引index裏面的類型Type的id的信息
231      * 
232      * @throws IOException
233      */
234     @Test
235     public void elasticsearchAddPlayer() throws IOException {
236         //
237         IndexResponse response = client.prepareIndex("player", "basketball", "4")
238 
239                 .setSource(jsonBuilder().startObject()
240 
241                         .field("name", "安其拉")
242 
243                         .field("age", 28)
244 
245                         .field("salary", 99000)
246 
247                         .field("team", "啦啦隊 team")
248 
249                         .field("position", "打中鋒")
250 
251                         .field("description", "跪族藍孩")
252 
253                         .endObject())
254                 .get();
255 
256         System.out.println(response);
257     }
258 
259     /**
260      * 
261      *
262      * select team, count(*) as team_count from player group by team;
263      * 
264      * team_counts是別名稱。
265      */
266     @Test
267     public void elasticsearchAgg1() {
268         // 指定索引和type
269         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
270         // 按team分組而後聚合,可是並無指定聚合函數。
271         // team_count是別名稱
272         TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_count").field("team");
273         // 添加聚合器
274         builder.addAggregation(teamAgg);
275         // 觸發
276         SearchResponse response = builder.execute().actionGet();
277         // System.out.println(response);
278         // 將返回的結果放入到一個map中
279         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
280         // 遍歷打印輸出
281         Set<String> keys = aggMap.keySet();
282         for (String key : keys) {
283             System.out.println("key: " + key);
284         }
285 
286         System.out.println("");
287 
288         // //取出聚合屬性
289         StringTerms terms = (StringTerms) aggMap.get("team_count");
290 
291         // //依次迭代出分組聚合數據
292         for (Terms.Bucket bucket : terms.getBuckets()) {
293             // 分組的名字
294             String team = (String) bucket.getKey();
295             // count,分組後一個組有多少數據
296             long count = bucket.getDocCount();
297             System.out.println(team + ": " + count);
298         }
299 
300         System.out.println("");
301 
302         // 使用Iterator進行遍歷迭代
303         Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
304         while (teamBucketIt.hasNext()) {
305             Terms.Bucket bucket = teamBucketIt.next();
306             // 獲取到分組後每組的組名稱
307             String team = (String) bucket.getKey();
308             // 獲取到分組後的每組數量
309             long count = bucket.getDocCount();
310             // 打印輸出
311             System.out.println(team + ": " + count);
312         }
313     }
314 
315     /**
316      * 
317      * select
318      * 
319      * team, position, count(*) as pos_count
320      * 
321      * from
322      * 
323      * player
324      * 
325      * group by
326      * 
327      * team,position;
328      * 
329      * 
330      */
331     @Test
332     public void elasticsearchAgg2() {
333         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
334         // 指定別名和分組的字段
335         TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
336         TermsAggregationBuilder posAgg = AggregationBuilders.terms("pos_count").field("position");
337         // 添加兩個聚合構建器。先按照team分組,再按照position分組。
338         builder.addAggregation(teamAgg.subAggregation(posAgg));
339         // 執行查詢
340         SearchResponse response = builder.execute().actionGet();
341         // 將查詢結果放入map中
342         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
343         // 根據屬性名到map中查找
344         StringTerms teams = (StringTerms) aggMap.get("team_name");
345         // 循環查找結果
346         for (Terms.Bucket teamBucket : teams.getBuckets()) {
347             // 先按球隊進行分組
348             String team = (String) teamBucket.getKey();
349             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
350             StringTerms positions = (StringTerms) subAggMap.get("pos_count");
351             // 由於一個球隊有不少位置,那麼還要依次拿出位置信息
352             for (Terms.Bucket posBucket : positions.getBuckets()) {
353                 // 拿到位置的名字
354                 String pos = (String) posBucket.getKey();
355                 // 拿出該位置的數量
356                 long docCount = posBucket.getDocCount();
357                 // 打印球隊,位置,人數
358                 System.out.println(team + ": " + pos + ": " + docCount);
359             }
360         }
361 
362     }
363 
364     /**
365      * select team, max(age) as max_age from player group by team;
366      */
367     @Test
368     public void elasticsearchAgg3() {
369         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
370         // 指定安球隊進行分組
371         TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
372         // 指定分組求最大值
373         MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
374         // 分組後求最大值
375         builder.addAggregation(teamAgg.subAggregation(maxAgg));
376         // 查詢
377         SearchResponse response = builder.execute().actionGet();
378         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
379         // 根據team屬性,獲取map中的內容
380         StringTerms teams = (StringTerms) aggMap.get("team_name");
381         for (Terms.Bucket teamBucket : teams.getBuckets()) {
382             // 分組的屬性名
383             String team = (String) teamBucket.getKey();
384             // 在將聚合後取最大值的內容取出來放到map中
385             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
386             // 取分組後的最大值
387             InternalMax ages = (InternalMax) subAggMap.get("max_age");
388             // 獲取到年齡的值
389             double max = ages.getValue();
390             // 打印輸出值
391             System.out.println(team + ": " + max);
392         }
393     }
394 
395     /**
396      * select team, avg(age) as avg_age, sum(salary) as total_salary from player
397      * group by team;
398      */
399     @Test
400     public void elasticsearchAgg4() {
401         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
402         // 指定分組字段
403         TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
404         // 指定聚合函數是求平均數據
405         AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
406         // 指定另一個聚合函數是求和
407         SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
408         // 分組的聚合器關聯了兩個聚合函數
409         builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
410         // 查詢
411         SearchResponse response = builder.execute().actionGet();
412         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
413         // 按分組的名字取出數據
414         StringTerms teams = (StringTerms) aggMap.get("team_name");
415         for (Terms.Bucket teamBucket : teams.getBuckets()) {
416             // 獲取球隊名字
417             String team = (String) teamBucket.getKey();
418             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
419             // 根據別名取出平均年齡
420             InternalAvg avgAge = (InternalAvg) subAggMap.get("avg_age");
421             // 根據別名取出薪水總和
422             InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
423             double avgAgeValue = avgAge.getValue();
424             double totalSalaryValue = totalSalary.getValue();
425             System.out.println(team + ": " + avgAgeValue + ": " + totalSalaryValue);
426         }
427     }
428 
429     /**
430      * select team, sum(salary) as total_salary from player group by team order by
431      * total_salary desc;
432      */
433     @Test
434     public void elasticsearchAgg5() {
435         SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
436         // 按team進行分組,而後指定排序規則
437         TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team")
438                 .order(Terms.Order.aggregation("total_salary ", true));
439         // 指定一個聚合函數是求和
440         SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
441         // 添加兩個聚合構建器。先按照team分組,再按照salary求和。
442         builder.addAggregation(termsAgg.subAggregation(sumAgg));
443         // 查詢
444         SearchResponse response = builder.execute().actionGet();
445         // 將查詢結果放入map中
446         Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
447         // 從查詢結果中獲取到team_name的信息
448         StringTerms teams = (StringTerms) aggMap.get("team_name");
449         // 開始遍歷獲取到的信息
450         for (Terms.Bucket teamBucket : teams.getBuckets()) {
451             // 獲取到key的值
452             String team = (String) teamBucket.getKey();
453             // 獲取到求和的值
454             Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
455             // 獲取到求和的值的信息
456             InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
457             // 獲取到求和的值
458             double totalSalaryValue = totalSalary.getValue();
459             // 打印輸出信息
460             System.out.println(team + " " + totalSalaryValue);
461         }
462     }
463 
464 }

執行效果,本身能夠分別進行測試。因爲測試都寫了說明,這裏就不一一進行測試打印效果了。請自行練習使用便可。

做者:別先生

博客園:https://www.cnblogs.com/biehongli/

若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。

相關文章
相關標籤/搜索