聚合用于分析查詢結果集的統計指標,我們以觀看日志分析為例,介紹各種常用的ElasticSearch聚合操作。
目錄:
- 查詢用戶觀看視頻數和觀看時長
- 聚合分頁器
- 查詢視頻uv
- 單個視頻uv
- 批量查詢視頻uv
- Having查詢
- 根據 count 進行過濾
- 根據其它指標進行過濾
首先展示一下我們要分析的文檔結構:
{"video_id": 1289643545120062253, // 視頻id"video_uid": 3931482202390368051, // 視頻發布者id"uid": 47381776787453866, // 觀看用戶id"time": 1533891263224, // 時間發生時間"watch_duration": 30 // 觀看時長
}
每個文檔記錄了一個觀看事件,我們通過聚合分析用戶的觀看行為。
ElasticSearch引入了兩個相關概念:
- 桶(Buckets): 滿足特定條件的文檔的集合
- 指標(Metrics): 桶中文檔的統計值,如特定字段的平均值
查詢用戶觀看視頻數和觀看時長
首先用sql語句描述這個查詢:
SELECT uid, count(*) as view_count
FROM view_log
WHERE time >= #{since} AND time <= #{to}
GROUP BY uid;
ES 查詢:
GET /view_log/_search
{"size" : 0,"query": {"range": {"time": {"gte": 0, // since"lte": 0 // to}}},"aggs": {"agg": { // agg為聚合的名稱"terms": { // 聚合的條件為 uid 相同"field": "uid"}}}
}
response:
{"took": 10,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 100000,"max_score": 0,"hits": []},"aggregations": {"agg": {"buckets": [{"key": 21836334489858688,"doc_count": 4026},{"key": 31489302390368051,"doc_count": 2717}]}
}
result.aggregations.agg.buckets列表中包含了查詢的結果。
因為我們按照terms:uid進行聚合,每個bucket為uid相同的文檔集合,key字段即為uid。
doc_count 字段表明bucket中文檔的數目即sql語句中的count(*) as view_count
。
我們可以為查詢添加額外的統計指標, sql描述:
SELECT uid, count(*) as view_count, avg(watch_duration) as avg_duration
FROM view_log
WHERE time >= #{since} AND time <= #{to}
GROUP BY uid;
ES 查詢:
GET /view_log/_search
{"size" : 0,"query": {"range": {"time": {"gte": 0, // since"lte": 0 // to}}},"aggs": {"agg": { // agg為聚合的名稱"terms": { // 聚合的條件為 uid 相同"field": "uid"},"aggs": { // 添加統計指標(Metrics)"avg_duration": { "avg": { // 統計 watch_duration 的平均值"field": "watch_duration" }}}}}
}
response:
{"took": 10,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 100000,"max_score": 0,"hits": []},"aggregations": {"agg": {"buckets": [{"key": 21836334489858688,"doc_count": 4026,"avg_duration": {"value": 12778.882352941177}},{"key": 31489302390368051,"doc_count": 2717,"avg_duration": {"value": 2652.5714285714284}}]}
}
avg_duration.value 表示 watch_duration 的平均值即該用戶的平均觀看時長。
聚合分頁器
在實際應用中用戶的數量非常驚人, 不可能通過一次查詢得到全部結果因此我們需要分頁器分批取回:
GET /view_log/_search
{"size" : 0,"query": {"range": {"time": {"gte": 0, // since"lte": 0 // to}}},"aggs": {"agg": { "terms": { "field": "uid","size": 10000, // bucket 的最大個數"include": { // 將聚合結果分為10頁,序號為[0,9], 取第一頁"partition": 0,"num_partitions": 10 }},"aggs": { "avg_duration": { "avg": { "field": "watch_duration" }}}}}
}
上述查詢與上節的查詢幾乎完全相同,只是在aggs.agg.terms字段中添加了include字段進行分頁。
查詢視頻uv
單個視頻uv
uv是指觀看一個視頻的用戶數(unique visit),與此相對沒有按照用戶去重的觀看數稱為pv(page visit)。
用SQL語句來描述:
SELECT video_id, count(*) as pv, count(distinct uid) as uv
FROM view_log
WHERE video_id = #{video_id};
ElasticSearch可以方便的進行count(distinct)查詢:
GET /view_log/_search
{"aggs": {"uv": {"cardinality": {"field": "uid"}}}
}
response:
{"took": 255,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 17579,"max_score": 0,"hits": []},"aggregations": {"uv": {"value": 11}}
}
批量查詢視頻uv
ElasticSearch也可以批量查詢count(distinct), 先用SQL進行描述:
SELECT video_id, count(*) as pv, count(distinct uid) as uv
FROM view_log
GROUP BY video_id;
查詢:
GET /view_log/_search
{"size": 0,"aggs": {"video": {"terms": {"field": "video_id"},"aggs": {"uv": {"cardinality": {"field": "uid"}}}}}
}
response:
{"took": 313,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 16940,"max_score": 0,"hits": []},"aggregations": {"video": {"buckets": [{"key": 25417499722062, // 視頻id"doc_count": 427, // 視頻觀看次數 pv"uv": {"value": 124 // 觀看視頻的用戶數 uv}},{"key": 72446898144,"doc_count": 744,"uv": {"value":233}}]}}
}
Having查詢
SQL可以使用HAVING語句根據聚合結果進行過濾,ElasticSearch可以使用pipeline aggregations達到此效果不過語法較為繁瑣。
根據 count 進行過濾
使用SQL查詢觀看超過200次的視頻:
SELECT video_id, count(*) as view_count
FROM view_log
GROUP BY video_id
HAVING count(*) > 200;
GET /view_log/_search
{"size": 0,"aggs": {"view_count": {"terms": {"field": "video_id"},"aggs": {"having": {"bucket_selector": {"buckets_path": { // 選擇 view_count 聚合的 doc_count 進行過濾"view_count": "_count"},"script": {"source": "params.view_count > 200"}}}}}}
}
response:
{"took": 83,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 775,"max_score": 0,"hits": []},"aggregations": {"view_count": {"buckets": [{"key": 35025417499764062,"doc_count": 529},{"key": 19913672446898144,"doc_count": 759}]}}
}
ElasticSearch實現類似HAVING查詢的關鍵在于使用bucket_selector選擇聚合結果進行過濾。
根據其它指標進行過濾
接下來我們嘗試查詢平均觀看時長大于5分鐘的視頻, 用SQL描述該查詢:
SELECT video_id FROM view_log
GROUP BY video_id
HAVING avg(watch_duration) > 300;
GET /view_log/_search
{"size": 0,"aggs": {"video": {"terms": {"field": "video_id"},"aggs": {"avg_duration": {"avg": {"field": "watch_duration"} },"avg_duration_filter": {"bucket_selector": {"buckets_path": {"avg_duration": "avg_duration"},"script": {"source": "params.avg_duration > 200"}} }}}}
}
response:
{"took": 137,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 255,"max_score": 0,"hits": []},"aggregations": {"video": {"buckets": [{"key": 5417499764062,"doc_count": 91576,"avg_duration": {"value": 103}},{"key": 19913672446898144,"doc_count": 15771,"avg_duration": {"value": 197}}]}}
}