ElasticSearch 的 聚合(Aggregations)-javaAPI示例

Elasticsearch权威指南 java API deme文档地址: https://es.xiaoleilu.com/

 

Elasticsearch有一个功能叫做 聚合(aggregations) ,它允许你在数据上生成复杂的分析统计。它很像SQL中的 GROUP BY 但是功能更强大。

Aggregations种类分为:

  • Metrics, Metrics 是简单的对过滤出来的数据集进行avg,max等操作,是一个单一的数值。
  • Bucket, Bucket 你则可以理解为将过滤出来的数据集按条件分成多个小数据集,然后Metrics会分别作用在这些小数据集上。

聚合概念

和查询DSL一样,聚合(Aggregations)也拥有一种可组合(Composable)的语法:独立的功能单元可以被混合在一起来满足你的需求。这意味着需要学习的基本概念虽然不多,但是它们的组合方式是几近无穷的。

为了掌握聚合,你只需要了解两个主要概念:
Buckets(桶)
满足某个条件的文档集合。
Metrics(指标)
为某个桶中的文档计算得到的统计信息。

就是这样!每个聚合只是简单地由一个或者多个桶,零个或者多个指标组合而成。可以将它粗略地转换为SQL:

[java] view plain copy

 在CODE上查看代码片派生到我的代码片

  1. SELECT COUNT(color)   
  2. FROM table  
  3. GROUP BY color  

以上的COUNT(color)就相当于一个指标。GROUP BY color则相当于一个桶。
桶和SQL中的组(Grouping)拥有相似的概念,而指标则与COUNT(),SUM(),MAX()等相似。

让我们仔细看看这些概念。

一个桶就是满足特定条件的一个文档集合:

  • 一名员工要么属于男性桶,或者女性桶。
  • 城市Albany属于New York州这个桶。
  • 日期2014-10-28属于十月份这个桶。

随着聚合被执行,每份文档中的值会被计算来决定它们是否匹配了桶的条件。如果匹配成功,那么该文档会被置入该桶中,同时聚合会继续执行。
桶也能够嵌套在其它桶中,能让你完成层次或者条件划分这些需求。比如,Cincinnati可以被放置在Ohio州这个桶中,而整个Ohio州则能够被放置在美国这个桶中。

ES中有很多类型的桶,让你可以将文档通过多种方式进行划分(按小时,按最流行的词条,按年龄区间,按地理位置,以及更多)。但是从根本上,它们都根据相同的原理运作:按照条件对文档进行划分。

 

指标(Metrics)

 

桶能够让我们对文档进行有意义的划分,但是最终我们还是需要对每个桶中的文档进行某种指标计算。分桶是达到最终目的的手段:提供了对文档进行划分的方法,从而让你能够计算需要的指标。

多数指标仅仅是简单的数学运算(比如,min,mean,max以及sum),它们使用文档中的值进行计算。在实际应用中,指标能够让你计算例如平均薪资,最高出售价格,或者百分之95的查询延迟。

 

 

将两者结合起来

 

一个聚合就是一些桶和指标的组合。一个聚合可以只有一个桶,或者一个指标,或者每样一个。在桶中甚至可以有多个嵌套的桶。比如,我们可以将文档按照其所属国家进行分桶,然后对每个桶计算其平均薪资(一个指标)。

因为桶是可以嵌套的,我们能够实现一个更加复杂的聚合操作:

  1. 将文档按照国家进行分桶。(桶)
  2. 然后将每个国家的桶再按照性别分桶。(桶)
  3. 然后将每个性别的桶按照年龄区间进行分桶。(桶)
  4. 最后,为每个年龄区间计算平均薪资。(指标)

此时,就能够得到每个<国家,性别,年龄>组合的平均薪资信息了。它可以通过一个请求,一次数据遍历来完成

javaAPI

 

案例1

现有索引数据:

index:school
type:student
---------------------------------------------------
{"grade":"1", "class":"1", "name":"xiao 1"}
{"grade":"1", "class":"1", "name":"xiao 2"}
{"grade":"1", "class":"2", "name":"xiao 3"}
{"grade":"1", "class":"2", "name":"xiao 4"}
{"grade":"1", "class":"2", "name":"xiao 5"}

 

Java分组统计年级和班级学生个数,如SQL: SELECT grade,class,count(1) FROM student GROUP BY grade,class;

[java] view plain copy

 在CODE上查看代码片派生到我的代码片

  1. package test;  
  2.   
  3. import java.util.Iterator;  
  4. import java.util.Map;  
  5.   
  6. import org.elasticsearch.action.search.SearchRequestBuilder;  
  7. import org.elasticsearch.action.search.SearchResponse;  
  8. import org.elasticsearch.action.search.SearchType;  
  9. import org.elasticsearch.search.aggregations.Aggregation;  
  10. import org.elasticsearch.search.aggregations.AggregationBuilders;  
  11. import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;  
  12. import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;  
  13. import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;  
  14. import org.junit.Test;  
  15.   
  16. import utils.NesUtils;  
  17.   
  18. public class TestAggregation  
  19. {  
  20.     @Test  
  21.     public void testAggregation()  
  22.     {  
  23.         SearchRequestBuilder srb = NesUtils.getSearcher("school");  
  24.         srb.setTypes("student");  
  25.         srb.setSearchType(SearchType.COUNT);  
  26.           
  27.         TermsBuilder gradeTermsBuilder = AggregationBuilders.terms("gradeAgg").field("grade");  
  28.         TermsBuilder classTermsBuilder = AggregationBuilders.terms("classAgg").field("class");  
  29.           
  30.         gradeTermsBuilder.subAggregation(classTermsBuilder);  
  31.           
  32.         srb.addAggregation(gradeTermsBuilder);  
  33.           
  34.         SearchResponse sr = srb.execute().actionGet();  
  35.           
  36.         Map<String, Aggregation> aggMap = sr.getAggregations().asMap();  
  37.           
  38.         StringTerms gradeTerms = (StringTerms) aggMap.get("gradeAgg");  
  39.           
  40.         Iterator<Bucket> gradeBucketIt = gradeTerms.getBuckets().iterator();  
  41.           
  42.         while(gradeBucketIt.hasNext())  
  43.         {  
  44.             Bucket gradeBucket = gradeBucketIt.next();  
  45.             System.out.println(gradeBucket.getKey() + "年级有" + gradeBucket.getDocCount() +"个学生。");  
  46.               
  47.             StringTerms classTerms = (StringTerms) gradeBucket.getAggregations().asMap().get("classAgg");  
  48.             Iterator<Bucket> classBucketIt = classTerms.getBuckets().iterator();  
  49.               
  50.             while(classBucketIt.hasNext())  
  51.             {  
  52.                 Bucket classBucket = classBucketIt.next();  
  53.                 System.out.println(gradeBucket.getKey() + "年级" +classBucket.getKey() + "班有" + classBucket.getDocCount() +"个学生。");  
  54.             }  
  55.             System.out.println();  
  56.         }  
  57.           
  58.     }  
  59. }  
运行完成输出结果
---------------------------------------------------
1年级有5个学生。
1年级2班有3个学生。

1年级1班有2个学生

 

 

实现一个SQL: SELECT sum(field) from table group by field2

使用:AggregationBuilders.sum("name").field("field");

 

[java] view plain copy

 在CODE上查看代码片派生到我的代码片

  1. public static void searchTest() throws IOException {  
  2.         TermsBuilder companyNameAgg = AggregationBuilders.terms("companyName").field("companyName").size(10);  
  3.         SumBuilder companyNameAggSum = AggregationBuilders.sum("companyNameSum").field("cvcount");  
  4.         companyNameAgg.subAggregation(companyNameAggSum);//把sum聚合器放入到Term聚合器中,相当于先group by在sum  
  5.         SearchRequestBuilder searchBuilder = ElasticClientFactory.getClient().prepareSearch(indexname).
  6. setTypes(typeName).addAggregation(companyNameAgg);  
  7.         SearchResponse searchResponse = searchBuilder.execute().actionGet();  
  8.         Terms terms = searchResponse.getAggregations().get("companyName");  
  9.         List<Terms.Bucket> buckets = terms.getBuckets();  
  10.         List<String> list = Lists.newArrayList();  
  11.         for (Terms.Bucket bucket : buckets) {  
  12.             InternalSum internalSum = bucket.getAggregations().get("companyNameSum");//注意从bucket而不是searchResponse  
  13.             System.out.println(bucket.getKeyAsString() + "\t" + bucket.getDocCount() + "\t"+internalSum.getValue());  
  14.         }  
  15.         System.out.println("done");  
  16.     }  
  17.  
 
 

 

案例2

 
 
  1. PUT /company

  2. {

  3. "mappings": {

  4. "employee": {

  5. "properties": {

  6. "age": {

  7. "type": "long"

  8. },

  9. "country": {

  10. "type": "text",

  11. "fields": {

  12. "keyword": {

  13. "type": "keyword",

  14. "ignore_above": 256

  15. }

  16. },

  17. "fielddata": true

  18. },

  19. "join_date": {

  20. "type": "date"

  21. },

  22. "name": {

  23. "type": "text",

  24. "fields": {

  25. "keyword": {

  26. "type": "keyword",

  27. "ignore_above": 256

  28. }

  29. }

  30. },

  31. "position": {

  32. "type": "text",

  33. "fields": {

  34. "keyword": {

  35. "type": "keyword",

  36. "ignore_above": 256

  37. }

  38. }

  39. },

  40. "salary": {

  41. "type": "long"

  42. }

  43. }

  44. }

  45. }

  46. }

  47.  
  48. GET /company/employee/_search

  49. {

  50. "size": 0,

  51. "aggs": {

  52. "group_by_country": {

  53. "terms": {

  54. "field": "country"

  55. },

  56. "aggs": {

  57. "group_by_join_date": {

  58. "date_histogram": {

  59. "field": "join_date",

  60. "interval": "year"

  61. },

  62. "aggs": {

  63. "avg_salary": {

  64. "avg": {

  65. "field": "salary"

  66. }

  67. }

  68. }

  69. }

  70. }

  71. }

  72. }

  73. }

 

 
  1. public class EmployeeAggrApp {

  2.  
  3. @SuppressWarnings({ "unchecked", "resource" })

  4. public static void main(String[] args) throws Exception {

  5. Settings settings = Settings.builder()

  6. .put("cluster.name", "elasticsearch")

  7. .build();

  8.  
  9. TransportClient client = new PreBuiltTransportClient(settings)

  10. .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

  11.  
  12. SearchResponse searchResponse = client.prepareSearch("company")

  13. .addAggregation(AggregationBuilders.terms("group_by_country").field("country")

  14. .subAggregation(AggregationBuilders

  15. .dateHistogram("group_by_join_date")

  16. .field("join_date")

  17. .dateHistogramInterval(DateHistogramInterval.YEAR)

  18. .subAggregation(AggregationBuilders.avg("avg_salary").field("salary")))

  19. )

  20. .execute().actionGet();

  21.  
  22. Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();

  23.  
  24. StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");

  25. Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();

  26. while(groupByCountryBucketIterator.hasNext()) {

  27. Bucket groupByCountryBucket = groupByCountryBucketIterator.next();

  28. System.out.println(groupByCountryBucket.getKey() + ":" + groupByCountryBucket.getDocCount());

  29.  
  30. Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");

  31. Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();

  32. while(groupByJoinDateBucketIterator.hasNext()) {

  33. org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();

  34. System.out.println(groupByJoinDateBucket.getKey() + ":" +groupByJoinDateBucket.getDocCount());

  35.  
  36. Avg avg = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");

  37. System.out.println(avg.getValue());

  38. }

  39. }

  40.  
  41. client.close();

  42. }

  43.  
  44. }

 

 

ElasticSearch AggregationBuilders java api常用聚会查询

以球员信息为例,player索引的player type包含5个字段,姓名,年龄,薪水,球队,场上位置。
index的mapping为:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

"mappings": {

    "player": {

        "properties": {

            "name": {

                "index""not_analyzed",

                "type""string"

            },

            "age": {

                "type""integer"

            },

            "salary": {

                "type""integer"

            },

            "team": {

                "index""not_analyzed",

                "type""string"

            },

            "position": {

                "index""not_analyzed",

                "type""string"

            }

        },

        "_all": {

            "enabled"false

        }

    }

}

  

索引中的全部数据:
 

微信截图_20160920171030.png

 
首先,初始化Builder:

1

SearchRequestBuilder sbuilder = client.prepareSearch("player").setTypes("player");

  

接下来举例说明各种聚合操作的实现方法,因为在es的api中,多字段上的聚合操作需要用到子聚合(subAggregation),初学者可能找不到方法(网上资料比较少,笔者在这个问题上折腾了两天,最后度了源码才彻底搞清楚T_T),后边会特意说明多字段聚合的实现方法。另外,聚合后的排序也会单独说明。

  • group by/count

例如要计算每个球队的球员数,如果使用SQL语句,应表达如下:

select team, count(*) as player_count from player group by team;

ES的java api:

1

2

3

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");

sbuilder.addAggregation(teamAgg);

SearchResponse response = sbuilder.execute().actionGet();

  

 

  • group by多个field

例如要计算每个球队每个位置的球员数,如果使用SQL语句,应表达如下:

select team, position, count(*) as pos_count from player group by team, position;

ES的java api:

1

2

3

4

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");

TermsBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");

sbuilder.addAggregation(teamAgg.subAggregation(posAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

 

  • max/min/sum/avg

例如要计算每个球队年龄最大/最小/总/平均的球员年龄,如果使用SQL语句,应表达如下:

select team, max(age) as max_age from player group by team;

ES的java api:

1

2

3

4

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");

MaxBuilder ageAgg= AggregationBuilders.max("max_age").field("age");

sbuilder.addAggregation(teamAgg.subAggregation(ageAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

 

  • 对多个field求max/min/sum/avg

例如要计算每个球队球员的平均年龄,同时又要计算总年薪,如果使用SQL语句,应表达如下:

select team, avg(age)as avg_age, sum(salary) as total_salary from player group by team;

ES的java api:

1

2

3

4

5

6

TermsBuilder teamAgg= AggregationBuilders.terms("team");

AvgBuilder ageAgg= AggregationBuilders.avg("avg_age").field("age");

SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");

sbuilder.addAggregation(teamAgg.subAggregation(ageAgg).subAggregation(salaryAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

  • 聚合后对Aggregation结果排序

例如要计算每个球队总年薪,并按照总年薪倒序排列,如果使用SQL语句,应表达如下:

select team, sum(salary) as total_salary from player group by team order by total_salary desc;

ES的java api:

1

2

3

4

TermsBuilder teamAgg= AggregationBuilders.terms("team").order(Order.aggregation("total_salary "false);

SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");

sbuilder.addAggregation(teamAgg.subAggregation(salaryAgg));

SearchResponse response = sbuilder.execute().actionGet();

  

需要特别注意的是,排序是在TermAggregation处执行的,Order.aggregation函数的第一个参数是aggregation的名字,第二个参数是boolean型,true表示正序,false表示倒序。 

  • Aggregation结果条数的问题

默认情况下,search执行后,仅返回10条聚合结果,如果想反悔更多的结果,需要在构建TermsBuilder 时指定size:

TermsBuilder teamAgg= AggregationBuilders.terms("team").size(15);

 

  • Aggregation结果的解析/输出

得到response后:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

Map<String, Aggregation> aggMap = response.getAggregations().asMap();

StringTerms teamAgg= (StringTerms) aggMap.get("keywordAgg");

Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();

while (teamBucketIt .hasNext()) {

Bucket buck = teamBucketIt .next();

//球队名

String team = buck.getKey();

//记录数

long count = buck.getDocCount();

//得到所有子聚合

Map subaggmap = buck.getAggregations().asMap();

//avg值获取方法

double avg_age= ((InternalAvg) subaggmap.get("avg_age")).getValue();

//sum值获取方法

double total_salary = ((InternalSum) subaggmap.get("total_salary")).getValue();

//...

//max/min以此类推

}

  

 

  • 总结

综上,聚合操作主要是调用了SearchRequestBuilder的addAggregation方法,通常是传入一个TermsBuilder,子聚合调用TermsBuilder的subAggregation方法,可以添加的子聚合有TermsBuilder、SumBuilder、AvgBuilder、MaxBuilder、MinBuilder等常见的聚合操作。
 
从实现上来讲,SearchRequestBuilder在内部保持了一个私有的 SearchSourceBuilder实例, SearchSourceBuilder内部包含一个List<AbstractAggregationBuilder>,每次调用addAggregation时会调用 SearchSourceBuilder实例,添加一个AggregationBuilder。
同样的,TermsBuilder也在内部保持了一个List<AbstractAggregationBuilder>,调用addAggregation方法(来自父类addAggregation)时会添加一个AggregationBuilder。有兴趣的读者也可以阅读源码的实现。
 

1、 _index元数据解析

  • 代表这个document存放在哪个index中
  • 类似的数据放在一个索引,非类似的数据放不同索引。例如:product index(包含了所有的商品),sales index(包含了所有的商品销售数据),inventory index(包含了所有库存相关的数据)。如果你把比如product,sales,human resource(employee),全都放在一个大的index里面,比如说company index,不合适的。
  • index中包含了很多类似的document:类似是什么意思,其实指的就是说,这些document的fields很大一部分是相同的,你说你放了3个document,每个document的fields都完全不一样,这就不是类似了,就不太适合放到一个index里面去了。
  • 索引名称必须是小写的,不能用下划线开头,不能包含逗号:product,website,blog

     

     

    为什么类似的数据放在一个索引,非类似的数据放不同索引

2、 _type元数据解析

  • 代表document属于index中的哪个类别(type)
  • 一个索引通常会划分为多个type,逻辑上对index中有些许不同的几类数据进行分类:因为一批相同的数据,可能有很多相同的fields,但是还是可能会有一些轻微的不同,可能会有少数fields是不一样的,举个例子,就比如说,商品,可能划分为电子商品,生鲜商品,日化商品,等等。
  • type名称可以是大写或者小写,但是同时不能用下划线开头,不能包含逗号

3、 _id元数据解析

  • 代表document的唯一标识,id与index和type一起,可以唯一标识和定位一个document
  • 我们可以手动指定document的id(put /index/type/id),也可以不指定,由es自动为我们创建一个id

4、document id的手动指定与自动生成两种方式解析

1. 手动指定document id
(1)根据应用情况来说,是否满足手动指定document id的前提:

  • 一般来说,是从某些其他的系统中,导入一些数据到es时,会采取这种方式,就是使用系统中已有数据的唯一标识,作为es中document的id。

举个例子,比如说,我们现在在开发一个电商网站,做搜索功能,或者是OA系统,做员工检索功能。这个时候,数据首先会在网站系统或者IT系统内部的数据库中,会先有一份,此时就肯定会有一个数据库的primary key(自增长,UUID,或者是业务编号)。如果将数据导入到es中,此时就比较适合采用数据在数据库中已有的primary key。

  • 如果说,我们是在做一个系统,这个系统主要的数据存储就是es一种,也就是说,数据产生出来以后,可能就没有id,直接就放es一个存储,那么这个时候,可能就不太适合说手动指定document id的形式了,因为你也不知道id应该是什么,此时可以采取下面要讲解的让es自动生成id的方式。

 

转载于:https://my.oschina.net/tantexian/blog/3055762

THE END
< <上一篇
下一篇>>