ElasticSearch 的 聚合(Aggregations)-javaAPI示例

Elasticsearch權威指南 java API deme文檔地址: https://es.xiaoleilu.com/html

 

Elasticsearch有一個功能叫作 聚合(aggregations) ,它容許你在數據上生成複雜的分析統計。它很像SQL中的 GROUP BY 可是功能更強大。java

Aggregations種類分爲:web

  • Metrics, Metrics 是簡單的對過濾出來的數據集進行avg,max等操做,是一個單一的數值。
  • Bucket, Bucket 你則能夠理解爲將過濾出來的數據集按條件分紅多個小數據集,而後Metrics會分別做用在這些小數據集上。

聚合概念

和查詢DSL同樣,聚合(Aggregations)也擁有一種可組合(Composable)的語法:獨立的功能單元能夠被混合在一塊兒來知足你的需求。這意味着須要學習的基本概念雖然很少,可是它們的組合方式是幾近無窮的。數據庫

爲了掌握聚合,你只須要了解兩個主要概念:
Buckets(桶)
知足某個條件的文檔集合。
Metrics(指標)
爲某個桶中的文檔計算獲得的統計信息。api

就是這樣!每一個聚合只是簡單地由一個或者多個桶,零個或者多個指標組合而成。能夠將它粗略地轉換爲SQL:微信

[java] view plain copyapp

 在CODE上查看代碼片派生到個人代碼片

  1. SELECT COUNT(color)   
  2. FROM table  
  3. GROUP BY color  

以上的COUNT(color)就至關於一個指標。GROUP BY color則至關於一個桶。
桶和SQL中的組(Grouping)擁有類似的概念,而指標則與COUNT(),SUM(),MAX()等類似。elasticsearch

讓咱們仔細看看這些概念。函數

一個桶就是知足特定條件的一個文檔集合:學習

  • 一名員工要麼屬於男性桶,或者女性桶。
  • 城市Albany屬於New York州這個桶。
  • 日期2014-10-28屬於十月份這個桶。

隨着聚合被執行,每份文檔中的值會被計算來決定它們是否匹配了桶的條件。若是匹配成功,那麼該文檔會被置入該桶中,同時聚合會繼續執行。
桶也可以嵌套在其它桶中,能讓你完成層次或者條件劃分這些需求。好比,Cincinnati能夠被放置在Ohio州這個桶中,而整個Ohio州則可以被放置在美國這個桶中。

ES中有不少類型的桶,讓你能夠將文檔經過多種方式進行劃分(按小時,按最流行的詞條,按年齡區間,按地理位置,以及更多)。可是從根本上,它們都根據相同的原理運做:按照條件對文檔進行劃分。

 

指標(Metrics)

 

桶可以讓咱們對文檔進行有意義的劃分,可是最終咱們仍是須要對每一個桶中的文檔進行某種指標計算。分桶是達到最終目的的手段:提供了對文檔進行劃分的方法,從而讓你可以計算須要的指標。

多數指標僅僅是簡單的數學運算(好比,min,mean,max以及sum),它們使用文檔中的值進行計算。在實際應用中,指標可以讓你計算例如平均薪資,最高出售價格,或者百分之95的查詢延遲。

 

 

將二者結合起來

 

一個聚合就是一些桶和指標的組合。一個聚合能夠只有一個桶,或者一個指標,或者每樣一個。在桶中甚至能夠有多個嵌套的桶。好比,咱們能夠將文檔按照其所屬國家進行分桶,而後對每一個桶計算其平均薪資(一個指標)。

由於桶是能夠嵌套的,咱們可以實現一個更加複雜的聚合操做:

  1. 將文檔按照國家進行分桶。(桶)
  2. 而後將每一個國家的桶再按照性別分桶。(桶)
  3. 而後將每一個性別的桶按照年齡區間進行分桶。(桶)
  4. 最後,爲每一個年齡區間計算平均薪資。(指標)

此時,就可以獲得每一個<國家,性別,年齡>組合的平均薪資信息了。它能夠經過一個請求,一次數據遍從來完成

javaAPI

 

案例1

現有索引數據:

index:school
type:student
---------------------------------------------------
{"grade":"1", "class":"1", "name":"xiao 1"}
{"grade":"1", "class":"1", "name":"xiao 2"}
{"grade":"1", "class":"2", "name":"xiao 3"}
{"grade":"1", "class":"2", "name":"xiao 4"}
{"grade":"1", "class":"2", "name":"xiao 5"}

 

Java分組統計年級和班級學生個數,如SQL: SELECT grade,class,count(1) FROM student GROUP BY grade,class;

[java] view plain copy

 在CODE上查看代碼片派生到個人代碼片

  1. package test;  
  2.   
  3. import java.util.Iterator;  
  4. import java.util.Map;  
  5.   
  6. import org.elasticsearch.action.search.SearchRequestBuilder;  
  7. import org.elasticsearch.action.search.SearchResponse;  
  8. import org.elasticsearch.action.search.SearchType;  
  9. import org.elasticsearch.search.aggregations.Aggregation;  
  10. import org.elasticsearch.search.aggregations.AggregationBuilders;  
  11. import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;  
  12. import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;  
  13. import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;  
  14. import org.junit.Test;  
  15.   
  16. import utils.NesUtils;  
  17.   
  18. public class TestAggregation  
  19. {  
  20.     @Test  
  21.     public void testAggregation()  
  22.     {  
  23.         SearchRequestBuilder srb = NesUtils.getSearcher("school");  
  24.         srb.setTypes("student");  
  25.         srb.setSearchType(SearchType.COUNT);  
  26.           
  27.         TermsBuilder gradeTermsBuilder = AggregationBuilders.terms("gradeAgg").field("grade");  
  28.         TermsBuilder classTermsBuilder = AggregationBuilders.terms("classAgg").field("class");  
  29.           
  30.         gradeTermsBuilder.subAggregation(classTermsBuilder);  
  31.           
  32.         srb.addAggregation(gradeTermsBuilder);  
  33.           
  34.         SearchResponse sr = srb.execute().actionGet();  
  35.           
  36.         Map<String, Aggregation> aggMap = sr.getAggregations().asMap();  
  37.           
  38.         StringTerms gradeTerms = (StringTerms) aggMap.get("gradeAgg");  
  39.           
  40.         Iterator<Bucket> gradeBucketIt = gradeTerms.getBuckets().iterator();  
  41.           
  42.         while(gradeBucketIt.hasNext())  
  43.         {  
  44.             Bucket gradeBucket = gradeBucketIt.next();  
  45.             System.out.println(gradeBucket.getKey() + "年級有" + gradeBucket.getDocCount() +"個學生。");  
  46.               
  47.             StringTerms classTerms = (StringTerms) gradeBucket.getAggregations().asMap().get("classAgg");  
  48.             Iterator<Bucket> classBucketIt = classTerms.getBuckets().iterator();  
  49.               
  50.             while(classBucketIt.hasNext())  
  51.             {  
  52.                 Bucket classBucket = classBucketIt.next();  
  53.                 System.out.println(gradeBucket.getKey() + "年級" +classBucket.getKey() + "班有" + classBucket.getDocCount() +"個學生。");  
  54.             }  
  55.             System.out.println();  
  56.         }  
  57.           
  58.     }  
  59. }  
運行完成輸出結果
---------------------------------------------------
1年級有5個學生。
1年級2班有3個學生。

1年級1班有2個學生

 

 

實現一個SQL: SELECT sum(field) from table group by field2

使用:AggregationBuilders.sum("name").field("field");

 

[java] view plain copy

 在CODE上查看代碼片派生到個人代碼片

  1. public static void searchTest() throws IOException {  
  2.         TermsBuilder companyNameAgg = AggregationBuilders.terms("companyName").field("companyName").size(10);  
  3.         SumBuilder companyNameAggSum = AggregationBuilders.sum("companyNameSum").field("cvcount");  
  4.         companyNameAgg.subAggregation(companyNameAggSum);//把sum聚合器放入到Term聚合器中,至關於先group by在sum  
  5.         SearchRequestBuilder searchBuilder = ElasticClientFactory.getClient().prepareSearch(indexname).
  6. setTypes(typeName).addAggregation(companyNameAgg);  
  7.         SearchResponse searchResponse = searchBuilder.execute().actionGet();  
  8.         Terms terms = searchResponse.getAggregations().get("companyName");  
  9.         List<Terms.Bucket> buckets = terms.getBuckets();  
  10.         List<String> list = Lists.newArrayList();  
  11.         for (Terms.Bucket bucket : buckets) {  
  12.             InternalSum internalSum = bucket.getAggregations().get("companyNameSum");//注意從bucket而不是searchResponse  
  13.             System.out.println(bucket.getKeyAsString() + "\t" + bucket.getDocCount() + "\t"+internalSum.getValue());  
  14.         }  
  15.         System.out.println("done");  
  16.     }  
  17.  
 
 

 

案例2

 
 
  1. PUT /company

  2. {

  3. "mappings": {

  4. "employee": {

  5. "properties": {

  6. "age": {

  7. "type": "long"

  8. },

  9. "country": {

  10. "type": "text",

  11. "fields": {

  12. "keyword": {

  13. "type": "keyword",

  14. "ignore_above": 256

  15. }

  16. },

  17. "fielddata": true

  18. },

  19. "join_date": {

  20. "type": "date"

  21. },

  22. "name": {

  23. "type": "text",

  24. "fields": {

  25. "keyword": {

  26. "type": "keyword",

  27. "ignore_above": 256

  28. }

  29. }

  30. },

  31. "position": {

  32. "type": "text",

  33. "fields": {

  34. "keyword": {

  35. "type": "keyword",

  36. "ignore_above": 256

  37. }

  38. }

  39. },

  40. "salary": {

  41. "type": "long"

  42. }

  43. }

  44. }

  45. }

  46. }

  47.  
  48. GET /company/employee/_search

  49. {

  50. "size": 0,

  51. "aggs": {

  52. "group_by_country": {

  53. "terms": {

  54. "field": "country"

  55. },

  56. "aggs": {

  57. "group_by_join_date": {

  58. "date_histogram": {

  59. "field": "join_date",

  60. "interval": "year"

  61. },

  62. "aggs": {

  63. "avg_salary": {

  64. "avg": {

  65. "field": "salary"

  66. }

  67. }

  68. }

  69. }

  70. }

  71. }

  72. }

  73. }

 

 
  1. public class EmployeeAggrApp {

  2.  
  3. @SuppressWarnings({ "unchecked", "resource" })

  4. public static void main(String[] args) throws Exception {

  5. Settings settings = Settings.builder()

  6. .put("cluster.name", "elasticsearch")

  7. .build();

  8.  
  9. TransportClient client = new PreBuiltTransportClient(settings)

  10. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

  11.  
  12. SearchResponse searchResponse = client.prepareSearch("company")

  13. .addAggregation(AggregationBuilders.terms("group_by_country").field("country")

  14. .subAggregation(AggregationBuilders

  15. .dateHistogram("group_by_join_date")

  16. .field("join_date")

  17. .dateHistogramInterval(DateHistogramInterval.YEAR)

  18. .subAggregation(AggregationBuilders.avg("avg_salary").field("salary")))

  19. )

  20. .execute().actionGet();

  21.  
  22. Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();

  23.  
  24. StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");

  25. Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();

  26. while(groupByCountryBucketIterator.hasNext()) {

  27. Bucket groupByCountryBucket = groupByCountryBucketIterator.next();

  28. System.out.println(groupByCountryBucket.getKey() + ":" + groupByCountryBucket.getDocCount());

  29.  
  30. Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");

  31. Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();

  32. while(groupByJoinDateBucketIterator.hasNext()) {

  33. org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();

  34. System.out.println(groupByJoinDateBucket.getKey() + ":" +groupByJoinDateBucket.getDocCount());

  35.  
  36. Avg avg = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");

  37. System.out.println(avg.getValue());

  38. }

  39. }

  40.  
  41. client.close();

  42. }

  43.  
  44. }

 

 

ElasticSearch AggregationBuilders java api經常使用聚會查詢

以球員信息爲例,player索引的player type包含5個字段,姓名,年齡,薪水,球隊,場上位置。
index的mapping爲:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

"mappings": {

    "player": {

        "properties": {

            "name": {

                "index""not_analyzed",

                "type""string"

            },

            "age": {

                "type""integer"

            },

            "salary": {

                "type""integer"

            },

            "team": {

                "index""not_analyzed",

                "type""string"

            },

            "position": {

                "index""not_analyzed",

                "type""string"

            }

        },

        "_all": {

            "enabled"false

        }

    }

}

  


索引中的所有數據:
 

微信截圖_20160920171030.png


 
首先,初始化Builder:

1

SearchRequestBuilder sbuilder = client.prepareSearch("player").setTypes("player");

  

接下來舉例說明各類聚合操做的實現方法,由於在es的api中,多字段上的聚合操做須要用到子聚合(subAggregation),初學者可能找不到方法(網上資料比較少,筆者在這個問題上折騰了兩天,最後度了源碼才完全搞清楚T_T),後邊會特地說明多字段聚合的實現方法。另外,聚合後的排序也會單獨說明。

  • group by/count

例如要計算每一個球隊的球員數,若是使用SQL語句,應表達以下:

select team, count(*) as player_count from player group by team;

ES的java api:

1

2

3

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");

sbuilder.addAggregation(teamAgg);

SearchResponse response = sbuilder.execute().actionGet();

  

 

  • group by多個field

例如要計算每一個球隊每一個位置的球員數,若是使用SQL語句,應表達以下:

select team, position, count(*) as pos_count from player group by team, position;

ES的java api:

1

2

3

4

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");

TermsBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");

sbuilder.addAggregation(teamAgg.subAggregation(posAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

 

  • max/min/sum/avg

例如要計算每一個球隊年齡最大/最小/總/平均的球員年齡,若是使用SQL語句,應表達以下:

select team, max(age) as max_age from player group by team;

ES的java api:

1

2

3

4

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");

MaxBuilder ageAgg= AggregationBuilders.max("max_age").field("age");

sbuilder.addAggregation(teamAgg.subAggregation(ageAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

 

  • 對多個field求max/min/sum/avg

例如要計算每一個球隊球員的平均年齡,同時又要計算總年薪,若是使用SQL語句,應表達以下:

select team, avg(age)as avg_age, sum(salary) as total_salary from player group by team;

ES的java api:

1

2

3

4

5

6

TermsBuilder teamAgg= AggregationBuilders.terms("team");

AvgBuilder ageAgg= AggregationBuilders.avg("avg_age").field("age");

SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");

sbuilder.addAggregation(teamAgg.subAggregation(ageAgg).subAggregation(salaryAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

  • 聚合後對Aggregation結果排序

例如要計算每一個球隊總年薪,並按照總年薪倒序排列,若是使用SQL語句,應表達以下:

select team, sum(salary) as total_salary from player group by team order by total_salary desc;

ES的java api:

1

2

3

4

TermsBuilder teamAgg= AggregationBuilders.terms("team").order(Order.aggregation("total_salary "false);

SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");

sbuilder.addAggregation(teamAgg.subAggregation(salaryAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

須要特別注意的是,排序是在TermAggregation處執行的,Order.aggregation函數的第一個參數是aggregation的名字,第二個參數是boolean型,true表示正序,false表示倒序。 

  • Aggregation結果條數的問題

默認狀況下,search執行後,僅返回10條聚合結果,若是想反悔更多的結果,須要在構建TermsBuilder 時指定size:

TermsBuilder teamAgg= AggregationBuilders.terms("team").size(15);

 

  • Aggregation結果的解析/輸出

獲得response後:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

Map<String, Aggregation> aggMap = response.getAggregations().asMap();

StringTerms teamAgg= (StringTerms) aggMap.get("keywordAgg");

Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();

while (teamBucketIt .hasNext()) {

Bucket buck = teamBucketIt .next();

//球隊名

String team = buck.getKey();

//記錄數

long count = buck.getDocCount();

//獲得全部子聚合

Map subaggmap = buck.getAggregations().asMap();

//avg值獲取方法

double avg_age= ((InternalAvg) subaggmap.get("avg_age")).getValue();

//sum值獲取方法

double total_salary = ((InternalSum) subaggmap.get("total_salary")).getValue();

//...

//max/min以此類推

}

  

 

  • 總結

綜上,聚合操做主要是調用了SearchRequestBuilder的addAggregation方法,一般是傳入一個TermsBuilder,子聚合調用TermsBuilder的subAggregation方法,能夠添加的子聚合有TermsBuilder、SumBuilder、AvgBuilder、MaxBuilder、MinBuilder等常見的聚合操做。
 
從實現上來說,SearchRequestBuilder在內部保持了一個私有的 SearchSourceBuilder實例, SearchSourceBuilder內部包含一個List<AbstractAggregationBuilder>,每次調用addAggregation時會調用 SearchSourceBuilder實例,添加一個AggregationBuilder。
一樣的,TermsBuilder也在內部保持了一個List<AbstractAggregationBuilder>,調用addAggregation方法(來自父類addAggregation)時會添加一個AggregationBuilder。有興趣的讀者也能夠閱讀源碼的實現。
 

一、 _index元數據解析

  • 表明這個document存放在哪一個index中
  • 相似的數據放在一個索引,非相似的數據放不一樣索引。例如:product index(包含了全部的商品),sales index(包含了全部的商品銷售數據),inventory index(包含了全部庫存相關的數據)。若是你把好比product,sales,human resource(employee),全都放在一個大的index裏面,好比說company index,不合適的。
  • index中包含了不少相似的document:相似是什麼意思,其實指的就是說,這些document的fields很大一部分是相同的,你說你放了3個document,每一個document的fields都徹底不同,這就不是相似了,就不太適合放到一個index裏面去了。
  • 索引名稱必須是小寫的,不能用下劃線開頭,不能包含逗號:product,website,blog

     

     

    爲何相似的數據放在一個索引,非相似的數據放不一樣索引

二、 _type元數據解析

  • 表明document屬於index中的哪一個類別(type)
  • 一個索引一般會劃分爲多個type,邏輯上對index中有些許不一樣的幾類數據進行分類:由於一批相同的數據,可能有不少相同的fields,可是仍是可能會有一些輕微的不一樣,可能會有少數fields是不同的,舉個例子,就好比說,商品,可能劃分爲電子商品,生鮮商品,日化商品,等等。
  • type名稱能夠是大寫或者小寫,可是同時不能用下劃線開頭,不能包含逗號

三、 _id元數據解析

  • 表明document的惟一標識,id與index和type一塊兒,能夠惟一標識和定位一個document
  • 咱們能夠手動指定document的id(put /index/type/id),也能夠不指定,由es自動爲咱們建立一個id

四、document id的手動指定與自動生成兩種方式解析

1. 手動指定document id
(1)根據應用狀況來講,是否知足手動指定document id的前提:

  • 通常來講,是從某些其餘的系統中,導入一些數據到es時,會採起這種方式,就是使用系統中已有數據的惟一標識,做爲es中document的id。

舉個例子,好比說,咱們如今在開發一個電商網站,作搜索功能,或者是OA系統,作員工檢索功能。這個時候,數據首先會在網站系統或者IT系統內部的數據庫中,會先有一份,此時就確定會有一個數據庫的primary key(自增加,UUID,或者是業務編號)。若是將數據導入到es中,此時就比較適合採用數據在數據庫中已有的primary key。

  • 若是說,咱們是在作一個系統,這個系統主要的數據存儲就是es一種,也就是說,數據產生出來之後,可能就沒有id,直接就放es一個存儲,那麼這個時候,可能就不太適合說手動指定document id的形式了,由於你也不知道id應該是什麼,此時能夠採起下面要講解的讓es自動生成id的方式。
相關文章
相關標籤/搜索