
Elasticsearch: aggregation

I originally figured aggregations were no big deal and never looked closely. Then I recently had to use them, took a close look, and... boom. Distributed aggregation really is orders of magnitude harder than the non-distributed kind...

  1. multiple aggregation vs. sub-aggregation
  2. bucket aggs - bucketing the data
    1. terms agg
      1. distributed aggregation
      2. ordering
      3. runtime field
      4. value handling
    2. rare agg
    3. composite agg
      1. pagination
    4. range agg
    5. filter agg
    6. histogram agg
    7. date histogram agg
    8. children agg - group by parent, then by child
    9. parent agg - group by child, then by parent
  3. metric aggs
    1. sum/avg/stats/percentile agg
    2. value_count agg
    3. cardinality agg
  4. pipeline aggs - post-processing
    1. derivative agg
    2. avg_bucket agg
    3. bucket_sort agg
    4. bucket selector agg
    5. derivative agg
  5. Java aggregation
  6. a few scenarios
    1. metrics over the top hits of each group
  7. closing thoughts

es aggregations come in three flavors:

  • bucket agg: like the GROUP BY part of a MySQL aggregation, its job is to partition data into groups, assigning each document to a bucket. A bucket agg groups; it does not compute metrics. Once the data is bucketed, a sub bucket agg can bucket it further, or a sub metric agg can compute metrics per bucket;
  • metric agg: like the value-computing part of a MySQL aggregation, e.g. sum or avg. It therefore supports no sub-aggregations (its output is already metric values; there's nothing left to bucket);
  • pipeline agg: operates on the buckets produced by other aggregations rather than on raw documents. Its purpose is post-processing aggregation results, hence the name pipeline agg — like an es pipeline, it performs "after the fact" operations on the data.

multiple aggregation vs. sub-aggregation

  • Aggregating by (A, B) is sub-aggregation: B is a sub-aggregation of A;
  • Aggregating by A and, separately, by B is multiple aggregation: one aggregation request carrying two aggregations, equivalent to writing them as two separate requests.

For example, in the query below, ab is a sub-aggregation of a, while b sits parallel to a:

GET <index>/_search
{
  "aggs": {
    "a": {
      "terms": {
        "field": "media.isVisible",
        "size": 10
      },
      "aggs": {
        "ab": {
          "terms": {
            "field": "media.isLiveStreaming",
            "size": 10
          }
        }
      }
    },
    "b": {
      "terms": {
        "field": "media.isLiveStreaming",
        "size": 10
      }
    }
  }
}

The result is clear-cut:

{
  "took" : 1884,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "a" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 1,
          "key_as_string" : "true",
          "doc_count" : 61096525,
          "ab" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : 0,
                "key_as_string" : "false",
                "doc_count" : 51635194
              },
              {
                "key" : 1,
                "key_as_string" : "true",
                "doc_count" : 9461331
              }
            ]
          }
        },
        {
          "key" : 0,
          "key_as_string" : "false",
          "doc_count" : 19169058,
          "ab" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
              {
                "key" : 1,
                "key_as_string" : "true",
                "doc_count" : 432454
              }
            ]
          }
        }
      ]
    },
    "b" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 0,
          "key_as_string" : "false",
          "doc_count" : 51635194
        },
        {
          "key" : 1,
          "key_as_string" : "true",
          "doc_count" : 9893785
        }
      ]
    }
  }
}

a and b are independent aggregations. a with its sub-aggregation ab aggregates first by A, then by B.

bucket aggs - bucketing the data

A bucket agg is the group by operation: it partitions documents into buckets along one or more dimensions, and every document lands in its corresponding bucket. All the different bucket aggs exist to produce buckets.

Just as a single search in elasticsearch returns at most index.max_result_window = 10000 documents (from + size), a single aggregation response returns at most search.max_buckets = 65536 buckets, likewise controlled by a size parameter (in a different spot: the size inside the specific agg, not the top-level search size). To walk all documents you need scroll or search_after; likewise, to walk all buckets you need the composite agg:

Elasticsearch: traversing an index

terms agg

terms is plain group by on a field's values, probably the most common kind of group by. Just remember that a single request's size is capped at search.max_buckets, and that buckets come back sorted by per-bucket document count, descending.

distributed aggregation

The problem with distributed aggregation: for search, if you want the top 10 by score, each shard returns its top 10, and merging them and taking the top 10 again is correct, because a doc lives on exactly one shard and docs on other shards don't affect it. Aggregation is different: a bucket that belongs in the global top 10 may hold few documents on some shard and miss that shard's local top 10. So taking each shard's top buckets and merging can drop documents. To mitigate this, es fetches more than size top buckets from each shard for a terms aggregation: shard_size = size * 1.5 + 10. If you ask for the top 10, each shard actually returns 25 buckets.

Of course this only mitigates the problem; buckets can still be missed. You can raise shard_size for better accuracy, at the cost of more data transfer and more memory on the coordinating node (naturally — the final merge happens on the coordinating node):

{
  "size": 0,
  "aggs": {
    "users": {
      "terms": {
        "field": "age",
        "size": 1,
        "shard_size": 10
      }
    }
  }
}

es recommends increasing shard_size rather than size: "It is much cheaper to increase the shard_size than to increase the size"

So just how inaccurate can it get? Two cases to reason through:

  1. If a top bucket A came back from every shard, its count is exact;
  2. If a top bucket A did not come back from some shard, then the count of the last (smallest) bucket that shard did return is the most that could have been missed for A (the very next bucket might have been A ┓( ´∀` )┏). In the worst case, every shard is in this boundary situation for A, so the sum of the smallest returned bucket across all shards is the maximum possible error.

Keep one example in mind: finding the most common surname in a company. If you "take the top 1 from each site (shard), merge, and pick the overall top 1", two things go wrong:

  1. the true top 1 may not be among any site's top 1: it could be merely top 2 at every site yet top 1 overall;
  2. even if it is, its reported total is likely undercounted: at some site it wasn't top 1, and that site's share never got counted.

A terms agg response carries two related numbers:

  • doc_count_error_upper_bound: the maximum possible error under the worst case described above;
  • sum_other_doc_count: the total document count minus the documents in the returned buckets. If it is greater than 0, some buckets were left out of the final tally — which is the normal case... it could only be 0 if every shard shipped all of its data to the coordinating node, which is practically never feasible;

For example, in a system with 6 shards, suppose each url can appear at most twice, and both occurrences always land on the same shard. Then:

GET url_info/_search
{
  "size": 0, 
  "aggs": {
    "by_url": {
      "terms": {
        "field": "url",
        "size": 3
      }
    }
  }
}

The response:

{
  "took" : 2764,
  "timed_out" : false,
  "_shards" : {
    "total" : 6,
    "successful" : 6,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_url" : {
      "doc_count_error_upper_bound" : 12,
      "sum_other_doc_count" : 12649621,
      "buckets" : [
        {
          "key" : "../82",
          "doc_count" : 2
        },
        {
          "key" : "../gmt.php",
          "doc_count" : 2
        },
        {
          "key" : "./redirect.php",
          "doc_count" : 2
        }
      ]
    }
  }
}

Each shard aggregates by url and all of its buckets have count 2, so the smallest bucket is also 2, and doc_count_error_upper_bound is 6 × 2 = 12.

Adding the show_term_doc_count_error parameter to the query makes every returned bucket report its own maximum possible error, doc_count_error_upper_bound:

GET url_info/_search
{
  "size": 0, 
  "aggs": {
    "by_url": {
      "terms": {
        "field": "url",
        "size": 3,
        "show_term_doc_count_error": true
      }
    }
  }
}

Since each bucket shown here appeared on exactly one shard and on none of the others, each bucket's maximum error is 5 × 2 = 10:

{
  "took" : 2437,
  "timed_out" : false,
  "_shards" : {
    "total" : 6,
    "successful" : 6,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_url" : {
      "doc_count_error_upper_bound" : 12,
      "sum_other_doc_count" : 12649621,
      "buckets" : [
        {
          "key" : "../82",
          "doc_count" : 2,
          "doc_count_error_upper_bound" : 10
        },
        {
          "key" : "../gmt.php",
          "doc_count" : 2,
          "doc_count_error_upper_bound" : 10
        },
        {
          "key" : "./redirect.php",
          "doc_count" : 2,
          "doc_count_error_upper_bound" : 10
        }
      ]
    }
  }
}

Clearly, if a bucket's own maximum possible error is 0 (every shard returned it, so the final tally cannot have missed any of it), its count is exact.

  • https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket-terms-aggregation.html#search-aggregations-bucket-terms-aggregation-shard-size
  • this one is a good read: https://www.leevii.com/2020/01/es-terms-aggregation-statistics-inaccuracy.html

Sure enough, distribution turns a simple problem into a complicated one... a distributed system cannot see the global data. With shard=1 none of these problems exist :D

Two more properties relate to distributed aggregation and can be used for filtering (a sketch follows this list):

  • shard_min_doc_count: default 0; a bucket must reach this count on a shard before that shard returns it;
  • min_doc_count: default 1; after summing the buckets that cleared shard_min_doc_count, a bucket must still reach this count to appear in the result;
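
A minimal sketch of both knobs on the url_info example from above (the thresholds are arbitrary): a url is only reported by a shard if that shard holds at least 2 occurrences, and only returned globally with at least 10 in total.

GET url_info/_search
{
  "size": 0,
  "aggs": {
    "by_url": {
      "terms": {
        "field": "url",
        "size": 3,
        "shard_min_doc_count": 2,
        "min_doc_count": 10
      }
    }
  }
}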

ordering

Although terms can be thought of as plain group by, it's still an aggregation, and an aggregation has to compute something (otherwise what is it aggregating?), so by default it counts the documents in each bucket: doc_count, the equivalent of MySQL's count.

By default a terms agg orders buckets by per-bucket document count, descending. The order property can flip it to ascending, "order": { "_count": "asc" }, but es advises against that: it's inaccurate, for the same distributed reasons as above, and much more inaccurate than descending order. A small count stops being small as soon as one shard reports it large, whereas a large count stays large even if one shard reports it small. So descending aggregation has error, but usually an acceptable amount; ascending aggregation can easily be off by far too much.

If you really want ascending order:

  • use rare_terms: https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket-rare-terms-aggregation.html
  • just say no to asc: https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket-terms-aggregation.html#_ordering_by_count_ascending

Sorting by a bucket's value: doc counts inaccurate

Sorting by the bucket key's order, however, works fine, ascending or descending: not only is the ordering exact, the doc counts are exact too.

Sorting by the bucket key: doc counts accurate

The sort criterion determines how buckets are picked from each shard!

For example, ordering by the field's lexicographic order:

GET url_info/_search
{
  "size": 0, 
  "aggs": {
    "by_url": {
      "terms": {
        "field": "url",
        "size": 3,
        "show_term_doc_count_error": true,
        "order": {
          "_key": "asc"
        }
      }
    }
  }
}

Buckets are now picked by the extreme key values (lexicographically ascending here). If another shard's top buckets lack a given key, that shard simply has no documents with that key at all (unlike count-based picking, where a shard may hold the bucket yet fail to return it because other buckets there have more docs).

The result bears this out:

{
  "took" : 2554,
  "timed_out" : false,
  "_shards" : {
    "total" : 6,
    "successful" : 6,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_url" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 12649622,
      "buckets" : [
        {
          "key" : "",
          "doc_count" : 1,
          "doc_count_error_upper_bound" : 0
        },
        {
          "key" : ".",
          "doc_count" : 2,
          "doc_count_error_upper_bound" : 0
        },
        {
          "key" : "../146",
          "doc_count" : 2,
          "doc_count_error_upper_bound" : 0
        }
      ]
    }
  }
}

doc_count_error_upper_bound is 0 everywhere, so the ordering really is exact.

  • _count: sort by the bucket's aggregated doc count;
  • _key: sort by the field's natural order;

Sorting by a sub-aggregation is really sorting by a "bucket value" — the value just happens to be computed by the sub-aggregation — so the resulting doc counts are likewise inaccurate.

However, with these two sub-aggregation orderings, the order itself is exact:

  1. the sub-aggregation is a max, sorted descending;
  2. the sub-aggregation is a min, sorted ascending;

For example:

GET witake_media/_search
{
  "size": 0,
  "aggs": {
    "by_user": {
      "terms": {
        "field": "userId",
        "size": 3,
        "order": {
          "by_time": "desc"
        }
      },
      "aggs": {
        "by_time": {
          "max": {
            "field": "timestamp"
          }
        }
      }
    }
  }
}

Each bucket gets its maximum timestamp, and the buckets are ordered by that timestamp, descending.

The buckets' doc_count is not accurate, and this time doc_count_error_upper_bound isn't even computed (it comes back as -1):

{
  "took" : 12473,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_user" : {
      "doc_count_error_upper_bound" : -1,
      "sum_other_doc_count" : 139259578,
      "buckets" : [
        {
          "key" : 3848752,
          "doc_count" : 43,
          "doc_count_error_upper_bound" : -1,
          "by_time" : {
            "value" : 1.661244437E12,
            "value_as_string" : "1661244437000"
          }
        },
        {
          "key" : 3849226,
          "doc_count" : 95,
          "doc_count_error_upper_bound" : -1,
          "by_time" : {
            "value" : 1.661244232E12,
            "value_as_string" : "1661244232000"
          }
        },
        {
          "key" : 3847758,
          "doc_count" : 53,
          "doc_count_error_upper_bound" : -1,
          "by_time" : {
            "value" : 1.661244081E12,
            "value_as_string" : "1661244081000"
          }
        }
      ]
    }
  }
}

If the sub-aggregation ordering is not one of those two forms, not only are the doc counts wrong — even the ordering itself can be wrong.

  • https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket-terms-aggregation.html#_ordering_by_a_sub_aggregation

Pipeline aggs cannot be used for sorting: Pipeline aggregations are run during the reduce phase after all other aggregations have already completed. For this reason, they cannot be used for ordering.

Sub-aggregations can nest indefinitely, and you can sort by the value of the innermost one; the order path just has to use the > symbol:

GET /_search
{
  "aggs": {
    "countries": {
      "terms": {
        "field": "artist.country",
        "order": { "rock>playback_stats.avg": "desc" }
      },
      "aggs": {
        "rock": {
          "filter": { "term": { "genre": "rock" } },
          "aggs": {
            "playback_stats": { "stats": { "field": "play_count" } }
          }
        }
      }
    }
  }
}

es uses the following path syntax:

  • AGG_SEPARATOR = '>' ;
  • METRIC_SEPARATOR = '.' ;
  • AGG_NAME = <the name of the aggregation> ;
  • METRIC = <the name of the metric (in case of multi-value metrics aggregation)> ;
  • PATH = <AGG_NAME> [ <AGG_SEPARATOR>, <AGG_NAME> ]* [ <METRIC_SEPARATOR>, <METRIC> ] ;

runtime field

If the field you want to aggregate on doesn't exist, you can define a runtime field and aggregate on that dynamic field instead.

Naturally this is slower than aggregating an indexed field. If the field matters and you want speed, add it to the index as a regular field.
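
A minimal sketch, reusing the stored_kol index from the examples below (the big/small threshold is made up): define a keyword runtime field with a Painless script in runtime_mappings, then run a terms agg on it.

GET stored_kol/_search
{
  "size": 0,
  "runtime_mappings": {
    "fan_level": {
      "type": "keyword",
      "script": {
        "source": "emit(doc['fan_num'].value > 100000 ? 'big' : 'small')"
      }
    }
  },
  "aggs": {
    "by_level": {
      "terms": {
        "field": "fan_level"
      }
    }
  }
}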

value handling

Handling values during the aggregation (a small sketch follows the links):

  • filtering values: https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket-terms-aggregation.html#_filtering_values_4
  • missing values: https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket-terms-aggregation.html#_missing_value_5
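
A minimal sketch of both on the url_info example (the regexes and fallback value are made up): include/exclude filter bucket keys by regex, and missing buckets documents that lack the field under a fallback value.

GET url_info/_search
{
  "size": 0,
  "aggs": {
    "by_url": {
      "terms": {
        "field": "url",
        "include": ".*\\.php",
        "exclude": "\\./redirect\\.php",
        "missing": "N/A"
      }
    }
  }
}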

A terms agg alone is worth half a day of study... distributed aggregation really is... ill-suited to aggregation...

rare agg

  • https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket-rare-terms-aggregation.html

I'll dig in when I actually need it... this one will take a while too...
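
Still, for reference, a minimal sketch on the url_info example: rare_terms is the sanctioned replacement for "ascending by count" — here, urls that appear at most twice.

GET url_info/_search
{
  "size": 0,
  "aggs": {
    "rare_urls": {
      "rare_terms": {
        "field": "url",
        "max_doc_count": 2
      }
    }
  }
}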

composite agg

As the name implies, the composite agg supports compound aggregation: it combines multiple aggregation sources into "composite buckets". It's also what you need for paging through all buckets. Its properties:

  • aggregates by multiple keys
  • can only sort by the keys' natural order
  • supports search_after-style pagination

For example, aggregate all media by kol id and platform:

GET witake_media/_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "size": 3, 
        "sources": [
          {
            "by_user": {
              "terms": {
                "field": "userId"
              }
            }
          },
          {
            "by_platform": {
              "terms": {
                "field": "platform"
              }
            }
          }
        ]
      }
    }
  }
}

It returns buckets keyed by the composite key <userId, platform>:

{
  "took" : 392,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_buckets" : {
      "after_key" : {
        "by_user" : 105030,
        "by_platform" : "YouTube"
      },
      "buckets" : [
        {
          "key" : {
            "by_user" : 10532,
            "by_platform" : "YouTube"
          },
          "doc_count" : 1438
        },
        {
          "key" : {
            "by_user" : 105028,
            "by_platform" : "YouTube"
          },
          "doc_count" : 174
        },
        {
          "key" : {
            "by_user" : 105030,
            "by_platform" : "YouTube"
          },
          "doc_count" : 502
        }
      ]
    }
  }
}

Buckets are ordered by the natural order of the composite key: compare the first component, then the second.

sources can list multiple keys, and not only terms: histogram, date histogram, and geotile grid are also allowed, and they can be mixed, as sketched below.
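
A minimal sketch of mixing source types on the same index: a date_histogram source combined with a terms source.

GET witake_media/_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "size": 3,
        "sources": [
          {
            "by_month": {
              "date_histogram": {
                "field": "timestamp",
                "calendar_interval": "1M"
              }
            }
          },
          {
            "by_platform": {
              "terms": {
                "field": "platform"
              }
            }
          }
        ]
      }
    }
  }
}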

Although the terms here looks similar to the terms agg, it does not support the terms agg's full parameter set — ordering by doc count, for instance, is not available:

Although similar, the terms value source doesn’t support the same set of parameters as the terms aggregation.

pagination

Just like search_after in search: copy the after_key from the previous response into the after parameter to fetch the next page:

GET witake_media/_search
{
  "size": 0,
  "aggs": {
    "my_buckets": {
      "composite": {
        "size": 3,
        "sources": [
          {
            "by_user": {
              "terms": {
                "field": "userId"
              }
            }
          },
          {
            "by_platform": {
              "terms": {
                "field": "platform"
              }
            }
          }
        ],
        "after": {
          "by_user": 105030,
          "by_platform": "YouTube"
        }
      }
    }
  }
}

range agg

Define numeric ranges; each range is a bucket, and each document lands in the bucket whose range contains its numeric field value.

GET sales/_search
{
  "aggs": {
    "price_ranges": {
      "range": {
        "field": "price",
        "ranges": [
          { "to": 100.0 },
          { "from": 100.0, "to": 200.0 },
          { "from": 200.0 }
        ]
      }
    }
  }
}

filter agg

Narrows the document set being aggregated; the filter itself is written just like a query. Every agg nested inside the filter sees only the filtered documents.

The terms agg can also do simple value filtering with include/exclude, but that's far weaker than a filter agg.

POST /sales/_search?size=0&filter_path=aggregations
{
  "aggs": {
    "avg_price": { "avg": { "field": "price" } },
    "t_shirts": {
      "filter": { "term": { "type": "t-shirt" } },
      "aggs": {
        "avg_price": { "avg": { "field": "price" } }
      }
    }
  }
}

This computes one agg over everything, then a filtered agg alongside it over the filtered subset. To filter every agg at once, put a query at the top of the search request and aggregate after it.

One question remains: why does it count as a bucket agg? Per the docs, it puts all matching documents into a single bucket, so it qualifies.

histogram agg

A histogram! We need to define:

  • the bar width (the interval): interval
  • the bar offset: offset

This partitions the axis into intervals, and a document falls into the bucket for the interval containing its value. Determining a document's bucket is pure arithmetic:

bucket_key = Math.floor((value - offset) / interval) * interval + offset

Assuming non-negative data:

  • interval=10 gives intervals [0, 10), [10, 20), [20, 30)...
  • interval=10, offset=4 gives intervals [-6, 4), [4, 14), [14, 24)...

Because some values fall between 0 and 4, the first bar is [-6, 4).

Why do I keep thinking histogram means stats...

Other definable properties:

  • min_doc_count: minimum documents an interval needs in order to be shown. Default 0, i.e. every interval shows up, with doc_count=0 when empty;
  • missing: if a document lacks the field, give it this default value and bucket it accordingly. Without a default, such documents are simply skipped.

kols with [0, 5000] fans, one bucket per 1000 with offset=50 (6 groups), computing each group's average fan count:

GET stored_kol/_search
{
  "query": {
    "range": {
      "fan_num": {
        "gte": 0,
        "lte": 5000
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "by_fans": {
      "histogram": {
        "field": "fan_num",
        "interval": 1000,
        "offset": 50,
        "min_doc_count": 100,
        "missing": 0
      },
      "aggs": {
        "avg": {
          "avg": {
            "field": "fan_num"
          }
        }
      }
    }
  }
}

The result:

{
  "took" : 71,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_fans" : {
      "buckets" : [
        {
          "key" : -950.0,
          "doc_count" : 33865,
          "avg" : {
            "value" : 2.1592499630887345
          }
        },
        {
          "key" : 50.0,
          "doc_count" : 2020,
          "avg" : {
            "value" : 487.53712871287127
          }
        },
        {
          "key" : 1050.0,
          "doc_count" : 1431,
          "avg" : {
            "value" : 1516.522711390636
          }
        },
        {
          "key" : 2050.0,
          "doc_count" : 1398,
          "avg" : {
            "value" : 2514.0400572246067
          }
        },
        {
          "key" : 3050.0,
          "doc_count" : 1109,
          "avg" : {
            "value" : 3534.672678088368
          }
        },
        {
          "key" : 4050.0,
          "doc_count" : 900,
          "avg" : {
            "value" : 4499.082222222222
          }
        }
      ]
    }
  }
}

The first group is obviously the kols with 0-50 fans; most presumably show 0 or 1, dragging the average down to 2. The other groups look normal, with averages sitting near each interval's midpoint.

A percentiles agg over that range shows the fan distribution:

GET stored_kol/_search
{
  "query": {
    "range": {
      "fan_num": {
        "gte": 0,
        "lte": 50
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "by_fans": {
      "percentiles": {
        "field": "fan_num"
      }
    }
  }
}

Pretty bleak:

{
  "took" : 342,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_fans" : {
      "values" : {
        "1.0" : 1.0,
        "5.0" : 2.0,
        "25.0" : 2.0,
        "50.0" : 2.0,
        "75.0" : 2.0,
        "95.0" : 2.0,
        "99.0" : 4.622222222222869
      }
    }
  }
}

date histogram agg

A date histogram buckets by date: it's simply a histogram whose x-axis is a date.

This one is used constantly, since data is forever being aggregated by day or by month.

Media published over the last four months:

GET witake_media/_search
{
  "query": {
    "range": {
      "timestamp": {
        "gte": "2022-05-01",
        "lte": "2022-08-31", 
        "format": "yyyy-MM-dd",
        "time_zone": "+08:00"
      }
    }
  },
  "size": 0, 
  "aggs": {
    "by_month": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "month",
        "time_zone": "+08:00" 
      }
    }
  }
}

Four monthly buckets:

{
  "took" : 744,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_month" : {
      "buckets" : [
        {
          "key_as_string" : "1651334400000",
          "key" : 1651334400000,
          "doc_count" : 18024891
        },
        {
          "key_as_string" : "1654012800000",
          "key" : 1654012800000,
          "doc_count" : 16496623
        },
        {
          "key_as_string" : "1656604800000",
          "key" : 1656604800000,
          "doc_count" : 16715213
        },
        {
          "key_as_string" : "1659283200000",
          "key" : 1659283200000,
          "doc_count" : 12135539
        }
      ]
    }
  }
}

If you don't specify a time zone, the aggregation buckets in UTC, and midnight May 1 in UTC+8 is 16:00 at the end of April in UTC, so five months would come out: April would appear as well.

Other properties:

  • calendar_interval: calendar-aware: "Calendar-aware intervals understand that daylight savings changes the length of specific days, months have different amounts of days, and leap seconds can be tacked onto a particular year";
  • fixed_interval: ignores the human calendar, pure fixed-length time intervals: "fixed intervals are a fixed number of SI units and never deviate, regardless of where they fall on the calendar";
  • time_zone: defaults to UTC when unspecified;
  • offset: e.g. +6h — when bucketing by day, a "day" runs six to six rather than midnight to midnight, as sketched below;
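
A minimal sketch of offset on the same index (values purely illustrative): each daily bucket runs 06:00 to 06:00 local time.

GET witake_media/_search
{
  "size": 0,
  "aggs": {
    "by_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day",
        "offset": "+6h",
        "time_zone": "+08:00"
      }
    }
  }
}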

Aggregations really are useful — the shape of the data jumps right out.

children agg - group by parent, then by child

For parent-child documents: it aggregates the child documents under the parent documents' buckets.

For example, the parent document is a question with a tags field; the child is an answer with an author field. Bucketing child documents under the parents' tags gives you, for each top tag, the top answer authors. Say you want to know who answers the most under the spring-data-elasticsearch tag — one look and it's P.J.Meisch.

Under the top 10 question tags, each tag's top answer authors:

POST <index>/_search?size=0
{
  "aggs": {
    "top-tags": {
      "terms": {
        "field": "tags",
        "size": 10
      },
      "aggs": {
        "to-answers": {
          "children": {
            "type" : "answer" 
          },
          "aggs": {
            "top-names": {
              "terms": {
                "field": "owner.display_name",
                "size": 10
              }
            }
          }
        }
      }
    }
  }
}

A sample response, aggregating the top authors under each top tag:

{
  "took": 25,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped" : 0,
    "failed": 0
  },
  "hits": {
    "total" : {
      "value": 3,
      "relation": "eq"
    },
    "max_score": null,
    "hits": []
  },
  "aggregations": {
    "top-tags": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "file-transfer",
          "doc_count": 1, 
          "to-answers": {
            "doc_count": 2, 
            "top-names": {
              "doc_count_error_upper_bound": 0,
              "sum_other_doc_count": 0,
              "buckets": [
                {
                  "key": "Sam",
                  "doc_count": 1
                },
                {
                  "key": "Troll",
                  "doc_count": 1
                }
              ]
            }
          }
        },
        {
          "key": "windows-server-2003",
          "doc_count": 1, 
          "to-answers": {
            "doc_count": 2, 
            "top-names": {
              "doc_count_error_upper_bound": 0,
              "sum_other_doc_count": 0,
              "buckets": [
                {
                  "key": "Sam",
                  "doc_count": 1
                },
                {
                  "key": "Troll",
                  "doc_count": 1
                }
              ]
            }
          }
        },
        {
          "key": "windows-server-2008",
          "doc_count": 1, 
          "to-answers": {
            "doc_count": 2, 
            "top-names": {
              "doc_count_error_upper_bound": 0,
              "sum_other_doc_count": 0,
              "buckets": [
                {
                  "key": "Sam",
                  "doc_count": 1
                },
                {
                  "key": "Troll",
                  "doc_count": 1
                }
              ]
            }
          }
        }
      ]
    }
  }
}

Another scenario: aggregate by a kol's country, then aggregate media information under it, to inspect aggregate media stats per kol country.

parent agg - group by child, then by parent

The reverse direction: find the top question tags under a top answer author. For example, find the top tag among the questions P.J.Meisch has answered — one look and it's spring-data-elasticsearch.
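
A minimal sketch, mirroring the children agg example above (same field names; "answer" is the child relation type): bucket answers by author first, then climb to the parent questions and bucket their tags.

POST <index>/_search?size=0
{
  "aggs": {
    "top-names": {
      "terms": {
        "field": "owner.display_name",
        "size": 10
      },
      "aggs": {
        "to-questions": {
          "parent": {
            "type": "answer"
          },
          "aggs": {
            "top-tags": {
              "terms": {
                "field": "tags",
                "size": 10
              }
            }
          }
        }
      }
    }
  }
}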

metric aggs

Once a bucket agg has formed the groups, each group's document count is computed by default. On top of that you can run sum, avg, stats, percentile, cardinality, and so on over other fields.

metric aggs are much simpler to use than bucket aggs.

sum/avg/stats/percentile agg

They all look alike, so here they are in one request:

GET stored_kol/_search
{
  "size": 0, 
  "aggs": {
    "all_fans": {
      "sum": {
        "field": "fan_num"
      }
    },
    "avg_fans": {
      "avg": {
        "field": "fan_num"
      }
    },
    "stat_fans": {
      "stats": {
        "field": "fan_num"
      }
    },
    "percentile_fans": {
      "percentiles": {
        "field": "fan_num"
      }
    }
  }
}

The result:

{
  "took" : 232,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "avg_fans" : {
      "value" : 440638.40846507257
    },
    "percentile_fans" : {
      "values" : {
        "1.0" : 2.0,
        "5.0" : 2.0,
        "25.0" : 18502.685834450735,
        "50.0" : 58744.01277738653,
        "75.0" : 220575.06404652845,
        "95.0" : 1511191.5571863481,
        "99.0" : 5635877.402374003
      }
    },
    "stat_fans" : {
      "count" : 546977,
      "min" : -15.0,
      "max" : 2.1345657688E10,
      "avg" : 440638.40846507257,
      "sum" : 2.41019074747E11
    },
    "all_fans" : {
      "value" : 2.41019074747E11
    }
  }
}

In this world the median really never catches up with the mean ┓( ´∀` )┏

value_count agg

Counts the number of values — for a single-valued field present on every document, effectively the total document count.

GET stored_kol/_search
{
  "size": 0, 
  "aggs": {
    "v_c": {
      "value_count": {
        "field": "fan_num"
      }
    }
  }
}

So it returns the same number as _count; either works for counting:

GET stored_kol/_count

cardinality agg

In theory cardinality is the deduplicated version of value_count; in practice it only returns an approximation:

GET stored_kol/_search
{
  "size": 0, 
  "aggs": {
    "v_c": {
      "value_count": {
        "field": "user_id"
      }
    },
    "card": {
      "cardinality": {
        "field": "user_id"
      }
    }
  }
}

Even though the underlying values should give the same number here, the two come out different (an error of 0.24%):

{
  "took" : 16,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "v_c" : {
      "value" : 546977
    },
    "card" : {
      "value" : 548292
    }
  }
}

When a set's cardinality is too high, throwing everything into a hash set and returning its size costs too much memory, so es's cardinality uses the HyperLogLog++ algorithm, trading precision for memory:

  1. at low cardinality it's very precise;
  2. the precision is configurable and determines memory use — which is fixed, independent of data volume, and depends only on the configured precision.

Calculating the exact cardinality of a multiset requires an amount of memory proportional to the cardinality, which is impractical for very large data sets. Probabilistic cardinality estimators, such as the HyperLogLog algorithm, use significantly less memory than this, at the cost of obtaining only an approximation of the cardinality. The HyperLogLog algorithm is able to estimate cardinalities of > 10^9 with a typical accuracy (standard error) of 2%, using 1.5 kB of memory. HyperLogLog is an extension of the earlier LogLog algorithm, itself deriving from the 1984 Flajolet–Martin algorithm.

Estimating cardinalities beyond 10^9 with just 1.5 kB of memory...

With a precision threshold of c, es's implementation uses 8c bytes of memory.

Even at a precision of just 100, the error stays under 6% no matter how large the set's cardinality — not bad at all.

  • precision_threshold: defaults to 3000 (24KB), maxes out at 40000, i.e. 320,000 bytes of memory, roughly 320KB (sketch below).
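
A minimal sketch of the knob on the earlier query:

GET stored_kol/_search
{
  "size": 0,
  "aggs": {
    "card": {
      "cardinality": {
        "field": "user_id",
        "precision_threshold": 40000
      }
    }
  }
}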

reddit computes a post's unique views in real time using exactly this hyperloglog approach.

pipeline aggs - post-processing

Two things to grasp about pipeline aggs:

  1. they operate on the buckets other aggregations produce, not on raw documents;
  2. there's a logical ordering: the other aggs produce buckets first, and the pipeline agg post-processes them.

pipeline aggs come in two kinds:

  • parent: "A family of pipeline aggregations that is provided with the output of its parent aggregation and is able to compute new buckets or new aggregations to add to existing buckets"
    • it operates on the buckets its parent aggregation produced, and its output lives inside the current bucket;
  • sibling: "Pipeline aggregations that are provided with the output of a sibling aggregation and are able to compute a new aggregation which will be at the same level as the sibling aggregation"
    • it operates on a sibling aggregation's output and produces a single result that sits at the same level as that sibling;

That's abstract; two examples make it concrete.

derivative agg

The first example is a parent pipeline agg.

Month-over-month change in sales (a derivative):

POST /sales/_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": {
            "field": "price"
          }
        },
        "sales_deriv": {
          "derivative": {
            "buckets_path": "sales" 
          }
        }
      }
    }
  }
}

A parent pipeline agg operates on its parent's buckets, so like a sub-aggregation it's written at the inner level. Since it sits at the same level as the sub-aggregation, buckets_path can just use the sub-aggregation's name.

The result:

{
   "took": 11,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               } 
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               },
               "sales_deriv": {
                  "value": -490.0 
               }
            },
            {
               "key_as_string": "2015/03/01 00:00:00",
               "key": 1425168000000,
               "doc_count": 2, 
               "sales": {
                  "value": 375.0
               },
               "sales_deriv": {
                  "value": 315.0
               }
            }
         ]
      }
   }
}

The result, like a sub-aggregation's, lives inside the bucket.

You can even take the second derivative:

POST /sales/_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": {
            "field": "price"
          }
        },
        "sales_deriv": {
          "derivative": {
            "buckets_path": "sales"
          }
        },
        "sales_2nd_deriv": {
          "derivative": {
            "buckets_path": "sales_deriv" 
          }
        }
      }
    }
  }
}

The output again lands inside the bucket, just like a sub-aggregation's:

{
   "took": 50,
   "timed_out": false,
   "_shards": ...,
   "hits": ...,
   "aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2015/01/01 00:00:00",
               "key": 1420070400000,
               "doc_count": 3,
               "sales": {
                  "value": 550.0
               } 
            },
            {
               "key_as_string": "2015/02/01 00:00:00",
               "key": 1422748800000,
               "doc_count": 2,
               "sales": {
                  "value": 60.0
               },
               "sales_deriv": {
                  "value": -490.0
               } 
            },
            {
               "key_as_string": "2015/03/01 00:00:00",
               "key": 1425168000000,
               "doc_count": 2,
               "sales": {
                  "value": 375.0
               },
               "sales_deriv": {
                  "value": 315.0
               },
               "sales_2nd_deriv": {
                  "value": 805.0
               }
            }
         ]
      }
   }
}

avg_bucket agg

The second example is a sibling pipeline agg.

Average monthly sales.

Not the average of all sales: sum each month first, then average the monthly sums.

POST _search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": {
            "field": "price"
          }
        }
      }
    },
    "avg_monthly_sales": {
      "avg_bucket": {
        "buckets_path": "sales_per_month>sales",
        "gap_policy": "skip",
        "format": "#,##0.00;(#,##0.00)"
      }        
    }
  }
}

A sibling pipeline agg operates on sibling buckets, so it is not nested inside them: it sits one level out, at the same level as the bucket agg and one level above the sub-aggregation. Since it isn't at the sub-aggregation's level, buckets_path needs a hierarchical path to reach the sub-aggregation's name.

The result:

{
  "took": 11,
  "timed_out": false,
  "_shards": ...,
  "hits": ...,
  "aggregations": {
    "sales_per_month": {
      "buckets": [
        {
          "key_as_string": "2015/01/01 00:00:00",
          "key": 1420070400000,
          "doc_count": 3,
          "sales": {
            "value": 550.0
          }
        },
        {
          "key_as_string": "2015/02/01 00:00:00",
          "key": 1422748800000,
          "doc_count": 2,
          "sales": {
            "value": 60.0
          }
        },
        {
          "key_as_string": "2015/03/01 00:00:00",
          "key": 1425168000000,
          "doc_count": 2,
          "sales": {
            "value": 375.0
          }
        }
      ]
    },
    "avg_monthly_sales": {
      "value": 328.33333333333333,
      "value_as_string": "328.33"
    }
  }
}

The result sits outside the sub-aggregation — not inside any bucket but at the bucket agg's level, as if one extra bucket had appeared.

With those two examples down, everything clicks. On to some other common pipeline aggs —

bucket_sort agg

The bucket_sort agg sorts the buckets produced by its parent agg, so it's a parent pipeline agg. One caveat: a pipeline agg does not influence how the original agg selects buckets, because the non-pipeline aggs run first and it only post-processes. It (re)sorts the buckets the parent has already produced. If the parent is a terms agg with size=10, bucket_sort merely reorders those 10 buckets — it does not sort over all buckets.

The bucket_sort aggregation, like all pipeline aggregations, is executed after all other non-pipeline aggregations. This means the sorting only applies to whatever buckets are already returned from the parent aggregation. For example, if the parent aggregation is terms and its size is set to 10, the bucket_sort will only sort over those 10 returned term buckets.

For instance: bucket media by kol (size=1000, yielding 1000 buckets), sort by max likes then by average comments, and return the top 5 buckets:

GET witake_media/_search
{
  "size": 0,
  "aggs": {
    "by_kol": {
      "terms": {
        "field": "userId",
        "size": 1000
      },
      "aggs": {
        "max_likes": {
          "max": {
            "field": "likes",
            "missing": 0
          }
        },
        "mean_comment": {
          "avg": {
            "field": "comment",
            "missing": 0
          }
        },
        "sort_by_likes_then_comment": {
          "bucket_sort": {
            "sort": [
              {
                "max_likes": {
                  "order": "desc"
                }
              },
              {
                "mean_comment": {
                  "order": "desc"
                }
              }
            ],
            "from": 0,
            "size": 5
          }
        }
      }
    }
  }
}

The result:

{
  "took" : 12653,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_kol" : {
      "doc_count_error_upper_bound" : 43265,
      "sum_other_doc_count" : 897934248,
      "buckets" : [
        {
          "key" : 242272,
          "doc_count" : 23350,
          "max_likes" : {
            "value" : 4040338.0
          },
          "mean_comment" : {
            "value" : 121.99952890792291
          }
        },
        {
          "key" : 253858,
          "doc_count" : 16482,
          "max_likes" : {
            "value" : 2890153.0
          },
          "mean_comment" : {
            "value" : 80.16157019779153
          }
        },
        {
          "key" : 207286,
          "doc_count" : 16827,
          "max_likes" : {
            "value" : 1977516.0
          },
          "mean_comment" : {
            "value" : 155.56040886670232
          }
        },
        {
          "key" : 645094,
          "doc_count" : 15296,
          "max_likes" : {
            "value" : 1789505.0
          },
          "mean_comment" : {
            "value" : 10.627549686192468
          }
        },
        {
          "key" : 347188,
          "doc_count" : 14209,
          "max_likes" : {
            "value" : 989547.0
          },
          "mean_comment" : {
            "value" : 44.23182489971145
          }
        }
      ]
    }
  }
}

Looks right, doesn't it? But is it? No! bucket_sort sorted the 1000 fetched buckets by max likes. So the question becomes: where did those 1000 buckets come from? Could the kol who owns the single most-liked video be missing from them? Absolutely!

Where did the 1000 buckets come from? By the terms agg's default ordering, they are the 1000 kols with the most media: with terms, the buckets are taken by _count desc, i.e. descending media count. That has nothing to do with max_likes, so these are not the 1000 buckets with the highest max_likes. We grabbed 1000 more-or-less arbitrary buckets and then sorted them by max likes — the kol with the most-liked video may well not be among them at all.

The world's number one never even showed up, yet here you all are holding your little duel on Mount Hua...

So sorting with bucket_sort is not necessarily correct: it can only sort the buckets that are already there. If those aren't all the buckets, bucket_sort's work is wasted. Remember: it is post-processing only!

For a real sort, you need the terms agg's order parameter:

GET witake_media/_search
{
  "size": 0,
  "aggs": {
    "by_user": {
      "terms": {
        "field": "userId",
        "size": 5,
        "order": {
          "max_likes": "desc"
        }
      },
      "aggs": {
        "max_likes": {
          "max": {
            "field": "likes"
          }
        }
      }
    }
  }
}

The genuine top 5 kols by likes:

{
  "took" : 10442,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_user" : {
      "doc_count_error_upper_bound" : -1,
      "sum_other_doc_count" : 139259610,
      "buckets" : [
        {
          "key" : 2654112,
          "doc_count" : 15,
          "max_likes" : {
            "value" : 3.04E7
          }
        },
        {
          "key" : 594674,
          "doc_count" : 2,
          "max_likes" : {
            "value" : 2.83E7
          }
        },
        {
          "key" : 596610,
          "doc_count" : 6,
          "max_likes" : {
            "value" : 2.59E7
          }
        },
        {
          "key" : 598052,
          "doc_count" : 76,
          "max_likes" : {
            "value" : 2.5E7
          }
        },
        {
          "key" : 4034590,
          "doc_count" : 60,
          "max_likes" : {
            "value" : 2.44E7
          }
        }
      ]
    }
  }
}

bucket selector agg

bucket_selector resembles bucket_sort: it also post-processes the buckets its parent agg produced. Its processing is filtering, which makes it the equivalent of HAVING after a MySQL GROUP BY.

For example, bucket by kol, compute average video views, and keep only the kols averaging more than 100,000 views:

GET witake_media/_search
{
  "size": 0,
  "query": {
    "range": {
      "timestamp": {
        "gte": "now-30d"
      }
    }
  }, 
  "aggs": {
    "by_kol": {
      "terms": {
        "field": "userId",
        "size": 1000
      },
      "aggs": {
        "avg_view": {
          "avg": {
            "field": "view"
          }
        },
        "kol_bucket_filter": {
          "bucket_selector": {
            "buckets_path": {
              "avgViews": "avg_view"
            },
            "script": "params.avgViews > 100000"
          }
        }
      }
    }
  }
}

derivative agg

  • https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-pipeline-derivative-aggregation.html
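
This one already appeared above as the parent pipeline example. One extra knob worth a sketch on the same sales data: a unit turns the derivative into a rate, and the response then carries a normalized_value alongside value.

POST /sales/_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": { "field": "price" }
        },
        "sales_deriv": {
          "derivative": {
            "buckets_path": "sales",
            "unit": "day"
          }
        }
      }
    }
  }
}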

Java aggregation

Writing aggregation expressions with es's Java API is comparatively involved:

  • https://elasticsearchjava-api.readthedocs.io/en/latest/aggregation.html
  • https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html

Strong recommendation: have chatgpt generate the Java code from the JSON query, then fine-tune it.

a few scenarios

metrics over the top hits of each group

Among the metrics aggregations, the top hits aggregation returns each group's top hits after bucketing, but you cannot nest a further sub-aggregation to compute metrics over them: it is already a metric aggregation, not a bucket aggregation, so no sub-aggregations are allowed:

  • https://github.com/elastic/elasticsearch/issues/16537

Still, plain top hits is quite useful on its own:

GET witake_media/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "userId": [
              "264188",
              "250934"
            ]
          }
        }
      ]
    }
  },
  "aggs": {
    "by_userId": {
      "terms": {
        "field": "userId"
      },
      "aggs": {
        "top3": {
          "top_hits": {
            "_source": ["view"], 
            "size": 3,
            "sort": [
              {
                "timestamp": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}

Each group shows its own top hits:

{
  "took" : 20,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 83,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_userId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 250934,
          "doc_count" : 70,
          "top3" : {
            "hits" : {
              "total" : {
                "value" : 70,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "witake_media_v5",
                  "_type" : "_doc",
                  "_id" : "250934-WAPKPDdu8uA",
                  "_score" : null,
                  "_routing" : "250934",
                  "_source" : {
                    "view" : 49
                  },
                  "sort" : [
                    1662060997000
                  ]
                },
                {
                  "_index" : "witake_media_v5",
                  "_type" : "_doc",
                  "_id" : "250934-dlBcPZ57yzM",
                  "_score" : null,
                  "_routing" : "250934",
                  "_source" : {
                    "view" : 26
                  },
                  "sort" : [
                    1657888253000
                  ]
                },
                {
                  "_index" : "witake_media_v5",
                  "_type" : "_doc",
                  "_id" : "250934-K27AVFiJi0Y",
                  "_score" : null,
                  "_routing" : "250934",
                  "_source" : {
                    "view" : 208
                  },
                  "sort" : [
                    1657869004000
                  ]
                }
              ]
            }
          }
        },
        {
          "key" : 264188,
          "doc_count" : 13,
          "top3" : {
            "hits" : {
              "total" : {
                "value" : 13,
                "relation" : "eq"
              },
              "max_score" : null,
              "hits" : [
                {
                  "_index" : "witake_media_v5",
                  "_type" : "_doc",
                  "_id" : "264188-164BvSzRuxg",
                  "_score" : null,
                  "_routing" : "264188",
                  "_source" : {
                    "view" : 21
                  },
                  "sort" : [
                    1658178252000
                  ]
                },
                {
                  "_index" : "witake_media_v5",
                  "_type" : "_doc",
                  "_id" : "264188-4eOTDSVal6c",
                  "_score" : null,
                  "_routing" : "264188",
                  "_source" : {
                    "view" : 14
                  },
                  "sort" : [
                    1651920297000
                  ]
                },
                {
                  "_index" : "witake_media_v5",
                  "_type" : "_doc",
                  "_id" : "264188-ZpJRIZROH8A",
                  "_score" : null,
                  "_routing" : "264188",
                  "_source" : {
                    "view" : 6
                  },
                  "sort" : [
                    1651919311000
                  ]
                }
              ]
            }
          }
        }
      ]
    }
  }
}

To go further and compute metrics over the top hits, you're left doing it yourself on the client side.

But there is one sneaky trick —

There's a pipeline aggregation called the average bucket aggregation. As a pipeline aggregation, its normal job is to process other aggs' bucket results.

For instance, per-group means are easy with terms:

GET witake_media/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "userId": [
              "264188",
              "250934"
            ]
          }
        }
      ]
    }
  },
  "aggs": {
    "by_user": {
      "terms": {
        "field": "userId"
      },
      "aggs": {
        "avg_view": {
          "avg": {
            "field": "view"
          }
        }
      }
    }
  }
}

The two per-group means:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 83,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 250934,
          "doc_count" : 70,
          "avg_view" : {
            "value" : 22631.957142857143
          }
        },
        {
          "key" : 264188,
          "doc_count" : 13,
          "avg_view" : {
            "value" : 52704.61538461538
          }
        }
      ]
    }
  }
}

An average bucket agg can then average those two groups once more:

GET witake_media/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "userId": [
              "264188",
              "250934"
            ]
          }
        }
      ]
    }
  },
  "aggs": {
    "by_user": {
      "terms": {
        "field": "userId"
      },
      "aggs": {
        "avg_view": {
          "avg": {
            "field": "view"
          }
        }
      }
    },
    "avg_view_of_user": {
      "avg_bucket": {
        "buckets_path": "by_user>avg_view"
      }
    }
  }
}

What comes out is the mean of the per-group means:

{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 83,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 250934,
          "doc_count" : 70,
          "avg_view" : {
            "value" : 22631.957142857143
          }
        },
        {
          "key" : 264188,
          "doc_count" : 13,
          "avg_view" : {
            "value" : 52704.61538461538
          }
        }
      ]
    },
    "avg_view_of_user" : {
      "value" : 37668.28626373626
    }
  }
}

Granted, "mean of means" sounds convoluted, so switch the query around: the mean of each group's maximum:

GET witake_media/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "userId": [
              "264188",
              "250934"
            ]
          }
        }
      ]
    }
  },
  "aggs": {
    "by_user": {
      "terms": {
        "field": "userId"
      },
      "aggs": {
        "max_view": {
          "max": {
            "field": "view"
          }
        }
      }
    },
    "avg_view_of_user_max_view": {
      "avg_bucket": {
        "buckets_path": "by_user>max_view"
      }
    }
  }
}

It's a sibling pipeline aggregation: it processes an aggregation at its own level. It gathers up the results of its sibling aggregation's sub-aggregation and averages them. In plain words, it averages its brother's kids.

And since there's one that averages, there are also ones that take the max over the brother's kids, the min, a sum over all the little rascals, and full stats (a sketch follows).
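
For instance, a minimal sketch of max_bucket on the earlier monthly sales data — the month with the highest sales:

POST /sales/_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "sales": {
          "sum": { "field": "price" }
        }
      }
    },
    "best_month": {
      "max_bucket": {
        "buckets_path": "sales_per_month>sales"
      }
    }
  }
}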

Being a sibling pipeline agg, it is placed at the same level as the buckets it targets.

bucket_sort is easier to understand now by contrast: it's a parent pipeline agg, meaning it works on its own parent's per-bucket results and reorders them, so it goes one level below the buckets it targets. After it runs, the buckets at that level still exist. Once a sibling pipeline agg runs, on the other hand, its sibling's sub-aggregation results have been boiled down into its single output.

Now for the sneaky trick:

  1. group by user: all of a user's media form one group;
  2. sub-aggregate on some attribute that is unique per document, so that each sub-bucket holds exactly one hit — e.g. bucket by timestamp, sort by key descending, and cap the size to get the 3 most recent hits;
  3. each user's top 3 now form their own one-doc sub-buckets (each user has 3 kids), so place an avg bucket agg next to the timestamp sub-agg (by_ts is its sibling); it then averages its sibling's children, i.e. each user's top-3 hits;

GET witake_media/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "userId": [
              "264188",
              "250934"
            ]
          }
        }
      ]
    }
  },
  "aggs": {
    "by_user": {
      "terms": {
        "field": "userId"
      },
      "aggs": {
        "by_ts": {
          "terms": {
            "field": "timestamp",
            "size": 3,
            "order": {
              "_key": "desc"
            }
          },
          "aggs": {
            "avg_view": {
              "avg": {
                "field": "view"
              }
            }
          }
        },
        "avg_view_of_top3_hits": {
          "avg_bucket": {
            "buckets_path": "by_ts>avg_view"
          }
        }
      }
    }
  }
}

The result:

{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 15,
    "successful" : 15,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 83,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "by_user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 250934,
          "doc_count" : 70,
          "by_ts" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 67,
            "buckets" : [
              {
                "key" : 1662060997000,
                "key_as_string" : "1662060997000",
                "doc_count" : 1,
                "avg_view" : {
                  "value" : 49.0
                }
              },
              {
                "key" : 1657888253000,
                "key_as_string" : "1657888253000",
                "doc_count" : 1,
                "avg_view" : {
                  "value" : 26.0
                }
              },
              {
                "key" : 1657869004000,
                "key_as_string" : "1657869004000",
                "doc_count" : 1,
                "avg_view" : {
                  "value" : 208.0
                }
              }
            ]
          },
          "avg_view_of_top3_hits" : {
            "value" : 94.33333333333333
          }
        },
        {
          "key" : 264188,
          "doc_count" : 13,
          "by_ts" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 10,
            "buckets" : [
              {
                "key" : 1658178252000,
                "key_as_string" : "1658178252000",
                "doc_count" : 1,
                "avg_view" : {
                  "value" : 21.0
                }
              },
              {
                "key" : 1651920297000,
                "key_as_string" : "1651920297000",
                "doc_count" : 1,
                "avg_view" : {
                  "value" : 14.0
                }
              },
              {
                "key" : 1651919311000,
                "key_as_string" : "1651919311000",
                "doc_count" : 1,
                "avg_view" : {
                  "value" : 6.0
                }
              }
            ]
          },
          "avg_view_of_top3_hits" : {
            "value" : 13.666666666666666
          }
        }
      ]
    }
  }
}

That's a pretty contorted line of thought... and if the sub-aggregation fails to put each media in its own group, the numbers break: the top-3 sub-buckets then stand for more than 3 docs, and the final avg is no longer the top 3's avg.

Either way, this scenario is a nice exercise for understanding pipeline aggs.

Two things to keep in mind about avg bucket agg:

  1. placement: remember it's a sibling agg and the position follows;
  2. to reach its sibling's children, buckets_path has to descend a level, mirroring the sub-aggregation hierarchy, written with >. Why not .? Presumably because elasticsearch field names can themselves contain ., which could clash;

closing thoughts

Aggregation in a distributed system comes with plenty of problems and some limits that can't be crossed. And I thought aggregations were no big deal... I was young...

This post is licensed under CC BY 4.0 by the author.