{
"took": 53,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"failed": 0
},
"hits": {
"total": 6,
"max_score": 1.0,
"hits": [{
"_index": "db",
"_type": "users",
"_id": "AVOiyjHmzUObmc5euUGS",
"_score": 1.0,
"_source": {
"user": "james",
"lastvisited": "2016/01/20 02:03:11",
"browser": "chrome",
"offercode": "JB20"
}
}, {
"_index": "db",
"_type": "users",
"_id": "AVOiyjIQzUObmc5euUGT",
"_score": 1.0,
"_source": {
"user": "james",
"lastvisited": "2016/01/20 03:04:15",
"browser": "firefox",
"offercode": "JB20,JB50"
}
}, {
"_index": "db",
"_type": "users",
"_id": "AVOiyjIlzUObmc5euUGU",
"_score": 1.0,
"_source": {
"user": "james",
"lastvisited": "2016/01/21 00:15:21",
"browser": "chrome",
"offercode": "JB20,JB50,JB100"
}
}, {
"_index": "db",
"_type": "users",
"_id": "AVOiyjJKzUObmc5euUGW",
"_score": 1.0,
"_source": {
"user": "peter",
"lastvisited": "2016/01/20 02:32:22",
"browser": "chrome",
"offercode": "JB20,JB50,JB100"
}
}, {
"_index": "db",
"_type": "users",
"_id": "AVOiy4jhzUObmc5euUGX",
"_score": 1.0,
"_source": {
"user": "james",
"lastvisited": "2016/01/19 02:03:11",
"browser": "chrome",
"offercode": ""
}
}, {
"_index": "db",
"_type": "users",
"_id": "AVOiyjI2zUObmc5euUGV",
"_score": 1.0,
"_source": {
"user": "adams",
"lastvisited": "2016/01/20 00:12:11",
"browser": "chrome",
"offercode": "JB10"
}
}]
}
}
我想根据用户的上次访问时间过滤出文档,并获取单个用户的最新访问文档,然后根据报价代码对所有过滤后的文档进行分组。
我通过执行tophits聚合来获取用户最近访问的文档。但是,我无法使用offercode对tophits聚合的结果进行分组。
ES查询以获取用户的最新文档
curl -XGET localhost:9200/account/users/_search?pretty -d'{
"size": "0",
"query": {
"bool": {
"must": {
"range": {
"lastvisited": {
"gte": "2016/01/19",
"lte": "2016/01/21"
}
}
}
}
},
"aggs": {
"lastvisited_users": {
"terms": {
"field": "user"
}
,
"aggs": {
"top_user_hits": {
"top_hits": {
"sort": [
{
"lastvisited": {
"order": "desc"
}
}
],
"_source": {
"include": [
"user","offercode","lastvisited"
]
},
"size": 1
}
}
}
}
}}'
ES输出
{
"took" : 4,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 6,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"lastvisited_users" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [ {
"key" : "james",
"doc_count" : 3,
"top_user_hits" : {
"hits" : {
"total" : 3,
"max_score" : null,
"hits" : [ {
"_index" : "accounts",
"_type" : "users",
"_id" : "AVOtexIEz1WBU8vnnZ2d",
"_score" : null,
"_source" : {
"lastvisited" : "2016/01/20 03:04:15",
"offercode" : "JB20,JB50",
"user" : "james"
},
"sort" : [ 1453259055000 ]
} ]
}
}
}, {
"key" : "adams",
"doc_count" : 1,
"top_user_hits" : {
"hits" : {
"total" : 1,
"max_score" : null,
"hits" : [ {
"_index" : "accounts",
"_type" : "users",
"_id" : "AVOtexJMz1WBU8vnnZ2h",
"_score" : null,
"_source" : {
"lastvisited" : "2016/01/20 00:12:11",
"offercode" : "JB10",
"user" : "adams"
},
"sort" : [ 1453248731000 ]
} ]
}
}
}, {
"key" : "adamsnew",
"doc_count" : 1,
"top_user_hits" : {
"hits" : {
"total" : 1,
"max_score" : null,
"hits" : [ {
"_index" : "accounts",
"_type" : "users",
"_id" : "AVOtexJhz1WBU8vnnZ2i",
"_score" : null,
"_source" : {
"lastvisited" : "2016/01/20 00:12:11",
"offercode" : "JB1010,aka10",
"user" : "adamsnew"
},
"sort" : [ 1453248731000 ]
} ]
}
}
}, {
"key" : "peter",
"doc_count" : 1,
"top_user_hits" : {
"hits" : {
"total" : 1,
"max_score" : null,
"hits" : [ {
"_index" : "accounts",
"_type" : "users",
"_id" : "AVOtexIoz1WBU8vnnZ2f",
"_score" : null,
"_source" : {
"lastvisited" : "2016/01/20 02:32:22",
"offercode" : "JB20,JB50,JB100",
"user" : "peter"
},
"sort" : [ 1453257142000 ]
} ]
}
}
} ]
}
}
}
现在,我想汇总tophits汇总的结果。
预期输出
{
"offercode_grouped": {
"JB20": 1,
"JB10": 1,
"JB20,JB50": 1,
"JB20,JB50,JB100": 2,
"":1
}
}
我尝试使用管道聚合,但是我不知道如何对tophits聚合的结果进行分组。
最佳答案
希望我能正确理解您的问题。我想我发现了一些棘手的“解决方案”。
它是 function_score query
, sampler aggregation
和 terms aggregation
的组合。
建立新索引
curl -s -XPUT "http://127.0.0.1:9200/stackoverflow" -d'
{
"mappings": {
"document": {
"properties": {
"name": {
"type": "string",
"index": "not_analyzed"
},
"lastvisited": {
"type": "date",
"format": "YYYY/MM/dd HH:mm:ss"
},
"browser": {
"type": "string",
"index": "not_analyzed"
},
"offercode": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}'
索引文件
curl -s -XPUT "http://127.0.0.1:9200/stackoverflow/document/1?routing=james" -d'
{
"user": "james",
"lastvisited": "2016/01/20 02:03:11",
"browser": "chrome",
"offercode": "JB20"
}'
curl -s -XPUT "http://127.0.0.1:9200/stackoverflow/document/2?routing=james" -d'
{
"user": "james",
"lastvisited": "2016/01/20 03:04:15",
"browser": "firefox",
"offercode": "JB20,JB50"
}'
curl -s -XPUT "http://127.0.0.1:9200/stackoverflow/document/3?routing=james" -d'
{
"user": "james",
"lastvisited": "2016/01/21 00:15:21",
"browser": "chrome",
"offercode": "JB20,JB50,JB100"
}'
curl -s -XPUT "http://127.0.0.1:9200/stackoverflow/document/4?routing=peter" -d'
{
"user": "peter",
"lastvisited": "2016/01/20 02:32:22",
"browser": "chrome",
"offercode": "JB20,JB50,JB100"
}'
curl -s -XPUT "http://127.0.0.1:9200/stackoverflow/document/5?routing=james" -d'
{
"user": "james",
"lastvisited": "2016/01/19 02:03:11",
"browser": "chrome",
"offercode": ""
}'
curl -s -XPUT "http://127.0.0.1:9200/stackoverflow/document/6?routing=adams" -d'
{
"user": "adams",
"lastvisited": "2016/01/20 00:12:11",
"browser": "chrome",
"offercode": "JB10"
}'
获取汇总
curl -XPOST "http://127.0.0.1:9200/stackoverflow/_search" -d'
{
"query": {
"function_score": {
"boost_mode": "replace", // we need to replace document score with the result of the functions
"query": {
"bool": {
"filter": [
{
"range": { // get documents within the date range
"lastvisited": {
"gte": "2016/01/19 00:00:00",
"lte": "2016/01/21 23:59:59"
}
}
}
]
}
},
"functions": [
{
"linear": {
"lastvisited": {
"origin": "2016/01/21 23:59:59", // same as lastvisited lte filter
"scale": "2d" // set the scale - please, see elasticsearch docs for more info https://www.elastic.co/guide/en/elasticsearch/reference/2.3/query-dsl-function-score-query.html#function-decay
}
}
}
]
}
},
"aggs": {
"user": {
"sampler": { // get top scored document per user
"field": "user",
"max_docs_per_value": 1
},
"aggs": {
"offers": { // aggregate user documents per `offercode`
"terms": {
"field": "offercode"
}
}
}
}
},
"size": 0
}'
响应
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 6,
"max_score": 0,
"hits": []
},
"aggregations": {
"user": {
"doc_count": 3,
"offers": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "JB20,JB50,JB100",
"doc_count": 2
},
{
"key": "JB10",
"doc_count": 1
}
]
}
}
}
}
除非每个索引只有一个分片,否则在对数据编制索引时需要指定
routing
。这是因为sampler
聚合是根据每个片段计算的。因此,我们需要确保特定用户的所有数据都位于同一分片中-以获得每位用户得分最高的一个文档。Sampler
聚合按分数返回文档。这就是为什么我们需要修改文档的分数。 function_score query
在这里可以提供帮助。使用field_value_factor
,分数只是上次访问的时间戳-因此,访问越近,得分就越高。更新:对于
field_value_factor
,_score
的准确性可能存在问题。有关更多信息,请参见issue https://github.com/elastic/elasticsearch/issues/11872。这就是为什么decay
函数用作问题中建议的clintongormley的原因。因为decay function对origin
都适用。这意味着比origin
大1天和比1天小1天的文档可以接收相同的_score
。这就是为什么我们需要过滤出较新的文档(请参阅查询中的范围过滤器)。注意:我仅使用示例中显示的数据尝试了此查询,因此需要更大的数据集来测试查询。但是我认为它应该起作用...