1. Elasticsearch is a distributed full-text search framework built on Lucene; data is stored in and queried from Elasticsearch in JSON format.
An index corresponds to a database in a relational database.
A type corresponds to a table.
The primary key id corresponds to a record's primary key, and it is unique.
Storing data in Elasticsearch really means storing JSON documents under a type, which lives under an index.
2. Elasticsearch exposes a RESTful API: you operate on it by sending HTTP requests (pay attention to whether the parameters belong in the URL or in a JSON request body).
Queries use GET; deletes use DELETE; creates use PUT/POST; updates use PUT/POST.
The RESTful URL format is http://ip:port/<index>/<type>/<[id]>, where index and type are required and id is optional (if you omit it, ES generates one automatically). index and type organize the data in layers, which makes it easier to manage.
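The URL scheme above is easy to assemble by hand. A minimal sketch (a hypothetical helper, not part of any Elasticsearch client) that builds such a URL from an index, a type and an optional id:

```java
public class EsUrlBuilder {

    // Builds http://ip:port/<index>/<type>[/<id>]; index and type are required,
    // id is optional (ES generates one when a document is indexed without it).
    public static String build(String host, int port, String index, String type, String id) {
        StringBuilder url = new StringBuilder("http://").append(host).append(':').append(port)
                .append('/').append(index).append('/').append(type);
        if (id != null && !id.isEmpty()) {
            url.append('/').append(id);
        }
        return url.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("192.168.110.133", 9200, "news", "fulltext", "2"));
        // prints http://192.168.110.133:9200/news/fulltext/2
    }
}
```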
3. How to connect to Elasticsearch from Java. Since this is a Maven project, the pom.xml dependencies are as follows:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bie</groupId>
    <artifactId>elasticsearch-hello</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- Elasticsearch transport client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.4.3</version>
        </dependency>
        <!-- Elasticsearch depends on log4j 2.x -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- JUnit for unit tests -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
</project>
```
Start with a simple query to verify that the ES cluster is reachable and that the data returned matches what is stored.
```java
package com.bie.elasticsearch;

import java.net.InetAddress;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

/**
 * @author biehl
 */
public class HelloElasticsearch {

    public static void main(String[] args) {
        try {
            // Set the cluster name to biehl01; Settings uses the builder (fluent) pattern.
            Settings settings = Settings.builder().put("cluster.name", "biehl01").build();
            // Create the client used to read data from the ES cluster.
            @SuppressWarnings("resource")
            TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
                    // Java clients talk to ES on port 9300; port 9200 is the RESTful HTTP port.
                    // This is a pseudo-cluster with a single node; in a real cluster,
                    // add the master-eligible nodes here as well.
                    new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
            // Fetch a document (.actionGet() is synchronous: it blocks until the response arrives).
            // The lookup goes through the index first, then retrieves the document.
            GetResponse response = client.prepareGet("news", "fulltext", "2").execute().actionGet();
            // Print the result
            System.out.println(response);
            // Close the client
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
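As the comment notes, `.execute().actionGet()` is synchronous-over-asynchronous: `execute()` returns immediately and `actionGet()` blocks until the response arrives. The same pattern can be sketched with plain `java.util.concurrent`, no cluster required (the response body below is made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SyncOverAsync {

    // execute() analogue: submit the "request" and get a Future back at once;
    // actionGet() analogue: Future.get() blocks until the result is ready.
    public static String fetchSync() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(100); // pretend this is the network round trip
                return "{\"found\":true}"; // made-up response body
            });
            return future.get(); // block, like actionGet()
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchSync());
    }
}
```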
The query result is as follows:
4. How to use the Java API to create an index, a type, and their fields, specifying per field whether it is indexed, whether it is stored, whether it is both analyzed and indexed (analyzed), or indexed without analysis (not_analyzed), and so on.
```java
package com.bie.elasticsearch;

import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

/**
 * @author biehl
 */
public class AdminAPI {

    private TransportClient client = null;

    // Runs before every test method
    @SuppressWarnings("resource")
    @Before
    public void init() throws Exception {
        // Set the cluster name to biehl01
        Settings settings = Settings.builder().put("cluster.name", "biehl01")
                // Sniffing: discover all ES nodes through the node(s) specified here
                .put("client.transport.sniff", true).build();
        // Create the client; listing two or more nodes is recommended.
        client = new PreBuiltTransportClient(settings).addTransportAddresses(
                new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
    }

    /**
     * Create an index with AdminClient and configure some parameters,
     * here the number of shards and replicas.
     */
    @Test
    public void createIndexWithSettings() {
        // Get the Admin API
        AdminClient admin = client.admin();
        // Use the Admin API to operate on indices
        IndicesAdminClient indices = admin.indices();
        // Prepare to create the index
        indices.prepareCreate("food")
                // Configure index settings
                .setSettings(Settings.builder()
                        // number of shards
                        .put("index.number_of_shards", 5)
                        // number of replicas (note: this excludes the primary;
                        // with 1 replica the data is stored twice in total)
                        .put("index.number_of_replicas", 1))
                // Execute
                .get();
    }

    /**
     * The dynamic setting controls how unknown fields are handled:
     * true (default) - add new fields dynamically;
     * false - ignore unknown fields;
     * strict - throw an exception on unknown fields.
     *
     * Adds mapping information to the index (like adding a schema to a table).
     */
    @Test
    public void elasticsearchSettingsMappings() throws IOException {
        // 1: settings
        HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
        // 4 shards
        settings_map.put("number_of_shards", 4);
        // 1 replica
        settings_map.put("number_of_replicas", 1);

        // 2: mappings (schema); field("dynamic", "true") enables dynamic fields
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("dynamic", "true")
                // properties of the type
                .startObject("properties")
                // id field
                .startObject("id")
                // integer type
                .field("type", "integer")
                // indexed but not analyzed
                .field("index", "not_analyzed")
                // stored in the document
                .field("store", "yes").endObject()
                // name field
                .startObject("name")
                // string type
                .field("type", "string")
                // stored in the document
                .field("store", "yes")
                // analyzed and indexed
                .field("index", "analyzed")
                // analyzed with ik_smart
                .field("analyzer", "ik_smart").endObject().endObject().endObject();

        CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("computer");
        // Create the index (computer) and attach the type (xiaomi) mapping
        prepareCreate.setSettings(settings_map).addMapping("xiaomi", builder).get();
    }

    /**
     * The index property accepts one of three values:
     * no - not indexed;
     * not_analyzed - indexed but not analyzed;
     * analyzed - analyzed and indexed.
     */
    @Test
    public void elasticsearchSettingsPlayerMappings() throws IOException {
        // 1: settings
        HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
        // 4 shards
        settings_map.put("number_of_shards", 4);
        // 1 replica
        settings_map.put("number_of_replicas", 1);

        // 2: mappings
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("dynamic", "true").startObject("properties")
                // stored in the document
                .startObject("id").field("type", "integer").field("store", "yes").endObject()
                // indexed but not analyzed
                .startObject("name").field("type", "string").field("index", "not_analyzed").endObject()
                .startObject("age").field("type", "integer").endObject()
                .startObject("salary").field("type", "integer").endObject()
                // indexed but not analyzed
                .startObject("team").field("type", "string").field("index", "not_analyzed").endObject()
                // indexed but not analyzed
                .startObject("position").field("type", "string").field("index", "not_analyzed").endObject()
                // analyzed and indexed, not stored
                .startObject("description").field("type", "string").field("store", "no").field("index", "analyzed")
                .field("analyzer", "ik_smart").endObject()
                // analyzed and indexed, stored in the document
                .startObject("addr").field("type", "string").field("store", "yes").field("index", "analyzed")
                .field("analyzer", "ik_smart").endObject()
                .endObject()
                .endObject();

        CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player");
        prepareCreate.setSettings(settings_map).addMapping("basketball", builder).get();
    }
}
```
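For orientation, the `XContentBuilder` chain in `elasticsearchSettingsPlayerMappings` assembles a mapping equivalent to roughly the following JSON (a hand-written sketch; key order may differ):

```json
{
  "dynamic": "true",
  "properties": {
    "id":          { "type": "integer", "store": "yes" },
    "name":        { "type": "string", "index": "not_analyzed" },
    "age":         { "type": "integer" },
    "salary":      { "type": "integer" },
    "team":        { "type": "string", "index": "not_analyzed" },
    "position":    { "type": "string", "index": "not_analyzed" },
    "description": { "type": "string", "store": "no", "index": "analyzed", "analyzer": "ik_smart" },
    "addr":        { "type": "string", "store": "yes", "index": "analyzed", "analyzer": "ik_smart" }
  }
}
```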
5. Using the Java API for create, read, update and delete operations on Elasticsearch, plus more complex queries (aggregations, which can compute per-group counts, per-group maxima, per-group averages, and other statistics).
```java
package com.bie.elasticsearch;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

/**
 * @author biehl
 */
public class ElasticsearchCRUD {

    private TransportClient client = null;

    @SuppressWarnings("resource")
    @Before
    public void init() throws Exception {
        // Set the cluster name to biehl01
        Settings settings = Settings.builder().put("cluster.name", "biehl01")
                // Sniffing: discover all ES nodes through the node(s) specified here
                .put("client.transport.sniff", true).build();
        // Create the client; listing two or more nodes is recommended.
        client = new PreBuiltTransportClient(settings).addTransportAddresses(
                new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), 9300));
    }

    /**
     * Create an index, a type and an id, then insert a document under that type.
     */
    @Test
    public void elasticsearchCreate() throws IOException {
        IndexResponse response = client.prepareIndex("people", "student", "3")
                .setSource(jsonBuilder().startObject().field("username", "王五五").field("sex", "男")
                        .field("birthday", new Date()).field("age", 21).field("message", "trying out Elasticsearch")
                        .endObject())
                .get();
        System.out.println(response.toString());
    }

    /**
     * Fetch a single document by index, type and id.
     */
    @Test
    public void elasticsearchGet() throws IOException {
        GetResponse response = client.prepareGet("people", "student", "1").get();
        System.out.println(response.getSourceAsString());
    }

    /**
     * Fetch several documents: multiple ids, possibly across indices and types.
     */
    @Test
    public void elasticsearchMultiGet() throws IOException {
        // Query multiple ids across multiple indices and types
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet().add("people", "student", "1")
                .add("people", "student", "2", "3").add("people", "teacher", "1").add("news", "fulltext", "1").get();
        // Iterate over the results
        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            GetResponse response = itemResponse.getResponse();
            // Print only the documents that exist
            if (response.isExists()) {
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }
    }

    /**
     * Update the document with the given index, type and id.
     */
    @Test
    public void elasticsearchUpdate() throws Exception {
        // Build an update request
        UpdateRequest updateRequest = new UpdateRequest();
        // index, type and id of the target document
        updateRequest.index("people");
        updateRequest.type("student");
        updateRequest.id("3");
        // Fields to modify
        updateRequest.doc(jsonBuilder().startObject().field("username", "王五五").endObject());
        // Execute the update and print the response
        UpdateResponse updateResponse = client.update(updateRequest).get();
        System.out.println(updateResponse.toString());
    }

    /**
     * Delete the document with the given index, type and id.
     */
    @Test
    public void elasticsearchDelete() {
        // prepareDelete(String index, String type, String id);
        DeleteResponse response = client.prepareDelete("people", "student", "4").get();
        System.out.println(response);
    }

    /**
     * Delete documents matching a query.
     */
    @Test
    public void elasticsearchDeleteByQuery() {
        BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                // matchQuery deletes documents whose username field matches the given
                // text (analyzed with the standard analyzer by default).
                .filter(QueryBuilders.matchQuery("username", "王五五"))
                // index name
                .source("people").get();
        // Number of deleted documents
        long deleted = response.getDeleted();
        System.out.println(deleted);
    }

    /**
     * Asynchronous delete-by-query: a listener is called back once the
     * deletion has actually happened and prints a confirmation.
     */
    @Test
    public void elasticsearchDeleteByQueryAsync() {
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(QueryBuilders.matchQuery("sex", "男"))
                .source("people").execute(new ActionListener<BulkByScrollResponse>() {

                    // Called back after the deletion completes
                    @Override
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println("Deletion finished!");
                        System.out.println("Documents deleted: " + deleted);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // Print the exception on failure
                        e.printStackTrace();
                    }
                });

        // This prints first; the listener fires later, asynchronously.
        try {
            System.out.println("Asynchronous delete submitted!");
            // Sleep 10 seconds so the main thread does not exit before
            // the listener has a chance to print its output.
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Range query.
     */
    @Test
    public void elasticsearchRange() {
        // includeLower(true).includeUpper(false) means the lower bound is
        // inclusive and the upper bound exclusive: [21, 24)
        QueryBuilder qb = rangeQuery("age").from(21).to(24).includeLower(true).includeUpper(false);
        // Run the query and print the result
        SearchResponse response = client.prepareSearch("people").setQuery(qb).get();
        System.out.println(response);
    }

    /**
     * Insert a document into the player index, basketball type.
     */
    @Test
    public void elasticsearchAddPlayer() throws IOException {
        IndexResponse response = client.prepareIndex("player", "basketball", "4")
                .setSource(jsonBuilder().startObject()
                        .field("name", "安其拉")
                        .field("age", 28)
                        .field("salary", 99000)
                        .field("team", "啦啦隊 team")
                        .field("position", "打中鋒")
                        .field("description", "跪族藍孩")
                        .endObject())
                .get();
        System.out.println(response);
    }

    /**
     * select team, count(*) as team_count from player group by team;
     *
     * team_count is the aggregation name (alias).
     */
    @Test
    public void elasticsearchAgg1() {
        // Target index and type
        SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
        // Group by team; no metric aggregation, so we get bucket counts.
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_count").field("team");
        // Add the aggregation
        builder.addAggregation(teamAgg);
        // Execute
        SearchResponse response = builder.execute().actionGet();
        // Put the returned aggregations into a map
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        // Print the keys
        Set<String> keys = aggMap.keySet();
        for (String key : keys) {
            System.out.println("key: " + key);
        }

        System.out.println("");

        // Extract the terms aggregation
        StringTerms terms = (StringTerms) aggMap.get("team_count");

        // Iterate over the buckets
        for (Terms.Bucket bucket : terms.getBuckets()) {
            // bucket key: the team name
            String team = (String) bucket.getKey();
            // document count of the bucket
            long count = bucket.getDocCount();
            System.out.println(team + ": " + count);
        }

        System.out.println("");

        // The same iteration using an explicit Iterator
        Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
        while (teamBucketIt.hasNext()) {
            Terms.Bucket bucket = teamBucketIt.next();
            // bucket key: the team name
            String team = (String) bucket.getKey();
            // document count of the bucket
            long count = bucket.getDocCount();
            System.out.println(team + ": " + count);
        }
    }

    /**
     * select team, position, count(*) as pos_count from player group by team, position;
     */
    @Test
    public void elasticsearchAgg2() {
        SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
        // Aggregation names and grouping fields
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
        TermsAggregationBuilder posAgg = AggregationBuilders.terms("pos_count").field("position");
        // Nest the two aggregations: group by team first, then by position.
        builder.addAggregation(teamAgg.subAggregation(posAgg));
        // Execute the query
        SearchResponse response = builder.execute().actionGet();
        // Put the result into a map
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        // Look up the outer aggregation by its name
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            // Outer bucket: the team
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            StringTerms positions = (StringTerms) subAggMap.get("pos_count");
            // A team has several positions, so walk the inner buckets too
            for (Terms.Bucket posBucket : positions.getBuckets()) {
                // position name
                String pos = (String) posBucket.getKey();
                // document count for that position
                long docCount = posBucket.getDocCount();
                // Print team, position, count
                System.out.println(team + ": " + pos + ": " + docCount);
            }
        }
    }

    /**
     * select team, max(age) as max_age from player group by team;
     */
    @Test
    public void elasticsearchAgg3() {
        SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
        // Group by team
        TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
        // Per-group maximum
        MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
        // Compute the maximum within each group
        builder.addAggregation(teamAgg.subAggregation(maxAgg));
        // Execute
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        // Look up the team aggregation in the map
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            // team name of the bucket
            String team = (String) teamBucket.getKey();
            // The per-group maximum lives in the bucket's sub-aggregations
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            // Extract the per-group maximum
            InternalMax ages = (InternalMax) subAggMap.get("max_age");
            // Get the age value
            double max = ages.getValue();
            System.out.println(team + ": " + max);
        }
    }

    /**
     * select team, avg(age) as avg_age, sum(salary) as total_salary from player
     * group by team;
     */
    @Test
    public void elasticsearchAgg4() {
        SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
        // Grouping field
        TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
        // One metric aggregation: average
        AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
        // Another metric aggregation: sum
        SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
        // Attach both metric aggregations to the grouping aggregation
        builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
        // Execute
        SearchResponse response = builder.execute().actionGet();
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        // Look up the grouping aggregation by name
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            // team name
            String team = (String) teamBucket.getKey();
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            // Average age, looked up by its alias
            InternalAvg avgAge = (InternalAvg) subAggMap.get("avg_age");
            // Salary sum, looked up by its alias
            InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
            double avgAgeValue = avgAge.getValue();
            double totalSalaryValue = totalSalary.getValue();
            System.out.println(team + ": " + avgAgeValue + ": " + totalSalaryValue);
        }
    }

    /**
     * select team, sum(salary) as total_salary from player group by team
     * order by total_salary desc;
     */
    @Test
    public void elasticsearchAgg5() {
        SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
        // Group by team and order the buckets by the total_salary sub-aggregation
        // (true = ascending; use false for the descending order in the SQL above).
        TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team")
                .order(Terms.Order.aggregation("total_salary", true));
        // The sum aggregation
        SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
        // Group by team first, then sum salary within each group.
        builder.addAggregation(termsAgg.subAggregation(sumAgg));
        // Execute
        SearchResponse response = builder.execute().actionGet();
        // Put the result into a map
        Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
        // Get the team_name aggregation from the result
        StringTerms teams = (StringTerms) aggMap.get("team_name");
        // Walk the buckets
        for (Terms.Bucket teamBucket : teams.getBuckets()) {
            // bucket key
            String team = (String) teamBucket.getKey();
            // The sum lives in the bucket's sub-aggregations
            Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
            // Extract the sum aggregation
            InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
            // Get the summed value
            double totalSalaryValue = totalSalary.getValue();
            // Print it
            System.out.println(team + " " + totalSalaryValue);
        }
    }
}
```
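The terms aggregation in `elasticsearchAgg1` is essentially a group-by count. What its buckets contain can be sketched in plain Java, no cluster needed (the team names below are made-up sample data):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TermsAggSketch {

    // What AggregationBuilders.terms("team_count").field("team") computes:
    // one bucket per distinct team value, holding that bucket's document count.
    public static Map<String, Long> countByTeam(List<String> teamOfEachDoc) {
        Map<String, Long> buckets = new TreeMap<>();
        for (String team : teamOfEachDoc) {
            buckets.merge(team, 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Made-up sample: the team field of six player documents
        List<String> teams = Arrays.asList("cav", "cav", "war", "war", "war", "tim");
        for (Map.Entry<String, Long> bucket : countByTeam(teams).entrySet()) {
            // bucket key and doc count, like bucket.getKey() / bucket.getDocCount()
            System.out.println(bucket.getKey() + ": " + bucket.getValue());
        }
    }
}
```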
You can run and verify each test yourself. Since every test method is documented above, I will not show the printed output one by one; please practice with them on your own.
Author: 別先生
Blog: https://www.cnblogs.com/biehongli/