- Flink SQL 對于批處理(Batch)和流處理(streaming)模式的SQL,都支持 Top-N 查詢。
- Top-N 查詢可以根據指定列排序后獲得前 N 個最小或最大值。并且該結果集還可用于進一步分析。
- Flink 使用 OVER 窗口子句和過濾條件的組合來表達一個 Top-N 查詢。借助 OVER 窗口的 PARTITION BY 子句能力,Flink 也能支持分組 Top-N。
1、SQL 語法標準:
SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name)
WHERE rownum <= N [AND conditions]
2、參數說明:
- ROW_NUMBER():根據分區數據的排序,為每一行分配一個唯一且連續的序號,從 1 開始。目前,只支持 ROW_NUMBER 作為 OVER 窗口函數。未來會支持 RANK() 和 DENSE_RANK()。
- PARTITION BY col1[, col2…]:指定分區字段。每個分區都會有一個 Top-N 的結果。
- ORDER BY col1 [asc|desc][, col2 [asc|desc]…]: 指定排序列。 每個列的排序類型(ASC/DESC)可以不同。
- WHERE rownum <= N: Flink 需要 rownum <= N 才能識別此查詢是 Top-N 查詢。 N 表示將要保留 N 個最大或最小數據。
- [AND conditions]: 可以在 WHERE 子句中添加其他條件,但是這些其他條件和 rownum <= N 需要使用 AND 結合。
3、注意事項:
輸出的位置必須支持更新,比如mysql、clickhouse等關系型數據庫,print等
不可以輸出到文件系統,否則會報錯誤:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.sink_order' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[dataType], orderBy=[id ASC], select=[date, source_length])