【大數據】Flink SQL 語法篇(九):Window TopN、Deduplication

Flink SQL 語法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 語法篇(一):CREATE
  • Flink SQL 語法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 語法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 語法篇(四):Group 聚合、Over 聚合
  • Flink SQL 語法篇(五):Regular Join、Interval Join
  • Flink SQL 語法篇(六):Temporal Join
  • Flink SQL 語法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 語法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 語法篇(九):Window TopN、Deduplication
  • Flink SQL 語法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您覺得這篇文章有用 ?? 的話,請給博主一個一鍵三連 🚀🚀🚀 吧 (點贊 🧡、關注 💛、收藏 💚)!!!您的支持 💖💖💖 將激勵 🔥 博主輸出更多優質內容!!!

Flink SQL 語法篇(九):Window TopN、Deduplication

  • 1.Window TopN
  • 2.Deduplication
    • 2.1 案例 1(事件時間)
    • 2.2 案例 2(處理時間)

1.Window TopN

Window TopN 定義(支持 Streaming):Window TopN 是一種特殊的 TopN,它的返回結果是每一個窗口內的 N 個最小值或者最大值。

應用場景:小伙伴萌會問了,我有了 TopN 為啥還需要 Window TopN 呢?還記得上一篇博客介紹 TopN 說道的 TopN 時會出現中間結果,從而出現回撤數據的嘛?Window TopN 不會出現回撤數據,因為 Window TopN 實現是在窗口結束時輸出最終結果,不會產生中間結果。而且注意,因為是窗口上面的操作,Window TopN 在窗口結束時,會自動把 State 給清除。

SQL 語法標準:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) -- windowing TVF
WHERE rownum <= N [AND conditions]

實際案例:取當前這一分鐘的搜索關鍵詞下的搜索熱度前 10 名的詞條數據。

-- 輸入表字段:
-- 字段名         備注
-- key              搜索關鍵詞
-- name             搜索熱度名稱
-- search_cnt       熱搜消費熱度(比如 3000)
-- timestamp        消費詞條時間戳CREATE TABLE source_table (name BIGINT NOT NULL,search_cnt BIGINT NOT NULL,key BIGINT NOT NULL,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time
) WITH (...
);-- 輸出表字段:
-- 字段名         備注
-- key              搜索關鍵詞
-- name             搜索熱度名稱
-- search_cnt       熱搜消費熱度(比如 3000)
-- window_start     窗口開始時間戳
-- window_end       窗口結束時間戳CREATE TABLE sink_table (key BIGINT,name BIGINT,search_cnt BIGINT,window_start TIMESTAMP(3),window_end TIMESTAMP(3)
) WITH (...
);-- 處理 sql:INSERT INTO sink_table
SELECT key, name, search_cnt, window_start, window_end
FROM (SELECT key, name, search_cnt, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, keyORDER BY search_cnt desc) AS rownumFROM (SELECT window_start, window_end, key, name, max(search_cnt) as search_cnt-- window tvf 寫法FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES))GROUP BY window_start, window_end, key, name)
)
WHERE rownum <= 100

輸出結果:

+I[關鍵詞1, 詞條1, 8670, 2021-1-28T22:34, 2021-1-28T22:35]
+I[關鍵詞1, 詞條2, 6928, 2021-1-28T22:34, 2021-1-28T22:35]
+I[關鍵詞1, 詞條3, 1735, 2021-1-28T22:34, 2021-1-28T22:35]
+I[關鍵詞1, 詞條4, 7287, 2021-1-28T22:34, 2021-1-28T22:35]
...

SQL 語義:

  • 數據源:數據源即最新的詞條下面的搜索詞的搜索熱度數據,消費到 Kafka 中數據后,將數據按照窗口聚合的 Key 通過 Hash 分發策略發送到下游窗口聚合算子。
  • 窗口聚合算子:進行窗口聚合計算,隨著時間的推進,將窗口聚合結果計算完成發往下游窗口排序算子。
  • 窗口排序算子:這個算子其實也是一個窗口算子,只不過這個窗口算子為每個 Key 維護了一個 TopN 的榜單數據,接受到上游發送的窗口結果數據進行排序,隨著時間的推進,窗口的結束,將排序的結果輸出到下游數據匯算子。
  • 數據匯:接收到上游的數據之后,然后輸出到外部存儲引擎中。

2.Deduplication

Deduplication 定義(支持 Batch / Streaming):Deduplication 其實就是去重,也即上文介紹到的 TopN 中 row_number = 1 的場景,但是這里有一點不一樣在于其 排序字段 一定是 時間屬性列,不能是其他非時間屬性的普通列。在 row_number = 1 時,如果排序字段是普通列 Planner 會翻譯成 TopN 算子,如果是時間屬性列 Planner 會翻譯成 Deduplication,這兩者最終的執行算子是不一樣的,Deduplication 相比 TopN 算子專門做了對應的優化,性能會有很大提升。

應用場景:比如上游數據發重了,或者計算 DAU 明細數據等場景,都可以使用 Deduplication 語法去做去重。

SQL 語法標準:

SELECT [column_list]
FROM (SELECT [column_list],ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]ORDER BY time_attr [asc|desc]) AS rownumFROM table_name)
WHERE rownum = 1
  • ROW_NUMBER():標識當前數據的排序值。
  • PARTITION BY col1[, col2...]:標識分區字段,代表按照這個 col 字段作為分區粒度對數據進行排序。
  • ORDER BY time_attr [asc|desc]:標識排序規則,必須為時間戳列,當前 Flink SQL 支持處理時間、事件時間,ASC 代表保留第一行,DESC 代表保留最后一行。
  • WHERE rownum = 1:這個子句是一定需要的,而且必須為 rownum = 1

2.1 案例 1(事件時間)

某一游戲用戶等級的場景,每一個用戶都有一個用戶等級,需要求出當前用戶等級在 星星?,月亮🌙,太陽🌞 的用戶數分別有多少。

-- 數據源:當每一個用戶的等級初始化及后續變化的時候的數據,即用戶等級變化明細數據。
CREATE TABLE source_table (user_id BIGINT COMMENT '用戶 id',level STRING COMMENT '用戶等級',row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件時間戳',WATERMARK FOR row_time AS row_time
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.level.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '1000000'
);-- 數據匯:輸出即每一個等級的用戶數
CREATE TABLE sink_table (level STRING COMMENT '等級',uv BIGINT COMMENT '當前等級用戶數',row_time timestamp(3) COMMENT '時間戳'
) WITH ('connector' = 'print'
);-- 處理邏輯:
INSERT INTO sink_table
select level, count(1) as uv, max(row_time) as row_time
from (SELECTuser_id,level,row_time,row_number() over(partition by user_id order by row_time) as rnFROM source_table
)
where rn = 1
group by level

輸出結果:

+I[等級 1, 6928, 2021-1-28T22:34]
-I[等級 1, 6928, 2021-1-28T22:34]
+I[等級 1, 8670, 2021-1-28T22:34]
-I[等級 1, 8670, 2021-1-28T22:34]
+I[等級 1, 77287, 2021-1-28T22:34]
...

可以看到其有回撤數據。

其對應的 SQL 語義如下:

  • 數據源:消費到 Kafka 中數據后,將數據按照 partition by 的 Key 通過 Hash 分發策略發送到下游去重算子。
  • Deduplication 去重算子:接受到上游數據之后,根據 order by 中的條件判斷當前的這條數據和之前數據時間戳大小,以上面案例來說,如果當前數據時間戳大于之前數據時間戳,則撤回之前向下游發的中間結果,然后將最新的結果發向下游(發送策略也為 Hash,具體的 Hash 策略為按照 group by 中 Key 進行發送),如果當前數據時間戳小于之前數據時間戳,則不做操作。此算子產出的結果就是每一個用戶的對應的最新等級信息。
  • Group by 聚合算子:接受到上游數據之后,根據 Group by 聚合粒度對數據進行聚合計算結果(每一個等級的用戶數),發往下游數據匯算子。
  • 數據匯:接收到上游的數據之后,然后輸出到外部存儲引擎中。

2.2 案例 2(處理時間)

最原始的日志是明細數據,需要我們根據用戶 id 篩選出這個用戶當天的第一條數據,發往下游,下游可以據此計算分各種維度的 DAU。

-- 數據源:原始日志明細數據
CREATE TABLE source_table (user_id BIGINT COMMENT '用戶 id',name STRING COMMENT '用戶姓名',server_timestamp BIGINT COMMENT '用戶訪問時間戳',proctime AS PROCTIME()
) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.name.length' = '1','fields.user_id.min' = '1','fields.user_id.max' = '10','fields.server_timestamp.min' = '1','fields.server_timestamp.max' = '100000'
);-- 數據匯:根據 user_id 去重的第一條數據
CREATE TABLE sink_table (user_id BIGINT,name STRING,server_timestamp BIGINT
) WITH ('connector' = 'print'
);-- 處理邏輯:
INSERT INTO sink_table
select user_id,name,server_timestamp
from (SELECTuser_id,name,server_timestamp,row_number() over(partition by user_id order by proctime) as rnFROM source_table
)
where rn = 1

輸出結果:

+I[1, 用戶 1, 2021-1-28T22:34]
+I[2, 用戶 2, 2021-1-28T22:34]
+I[3, 用戶 3, 2021-1-28T22:34]
...

可以看到這個處理邏輯是沒有回撤數據的。其對應的 SQL 語義如下:

  • 數據源:消費到 Kafka 中數據后,將數據按照 partition by 的 Key 通過 Hash 分發策略發送到下游去重算子。
  • Deduplication 去重算子:處理時間語義下,如果是當前 Key 的第一條數據,則直接發往下游,如果判斷(根據 State 中是否存儲過該 Key)不是第一條,則直接丟棄。
  • 數據匯:接收到上游的數據之后,然后輸出到外部存儲引擎中。

? 在 Deduplication 關于是否會出現回撤流,博主總結如下:

  • Order by 事件時間 DESC:會出現回撤流,因為當前 Key 下 可能會有 比當前事件時間還大的數據。
  • Order by 事件時間 ASC:會出現回撤流,因為當前 Key 下 可能會有 比當前事件時間還小的數據。
  • Order by 處理時間 DESC:會出現回撤流,因為當前 Key 下 可能會有 比當前處理時間還大的數據。
  • Order by 處理時間 ASC:不會出現回撤流,因為當前 Key 下 不可能會有 比當前處理時間還小的數據。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/716308.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/716308.shtml
英文地址,請注明出處:http://en.pswp.cn/news/716308.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

COM - get VARIANT value - .vt = (VT_BSTR | VT_ARRAY)

文章目錄 COM - get VARIANT value - .vt (VT_BSTR | VT_ARRAY)概述筆記END COM - get VARIANT value - .vt (VT_BSTR | VT_ARRAY) 概述 取到一個VARIANT值, .vt 0x2008, 查了一下, 0x2008 (VT_BSTR | VT_ARRAY) 查了資料, 這個vt 0x2008是BSTR的數組. 看看咋取值? 網上…

3.2 log |416. 分割等和子集,1049.最后一塊石頭的重量II,494.目標和

416. 分割等和子集 class Solution { public:bool canPartition(vector<int>& nums) {vector<int> dp(10001,0);int sumaccumulate(nums.begin(),nums.end(),0);if(sum%2) return false;int targetsum/2;for(int i0;i<nums.size();i){for(int jtarget;j>…

項目管理:高效推動項目成功的關鍵

項目管理&#xff1a;高效推動項目成功的關鍵 在當今競爭激烈的商業環境中&#xff0c;項目管理已成為企業實現目標和取得成功的關鍵因素。有效的項目管理不僅能夠確保項目按時完成&#xff0c;還能在預算范圍內達到預期的質量標準。本文將探討項目管理的重要性、關鍵環節以及…

Maven安裝并配置本地倉庫

一、安裝Maven 1.下載鏈接 Maven官網下載鏈接 Binary是可執行版本&#xff0c;已經編譯好可以直接使用。 Source是源代碼版本&#xff0c;需要自己編譯成可執行軟件才可使用。 tar.gz和zip兩種壓縮格式,其實這兩個壓縮文件里面包含的內容是同樣的,只是壓縮格式不同 tar.gz格…

Stable Video文本生成視頻公測地址——Scaling Latent Video Diffusion Models to Large Datasets

近期&#xff0c;Stability AI發布了首個開放視頻模型——"Stable Video"&#xff0c;該創新工具能夠將文本和圖像輸入轉化為生動的場景&#xff0c;將概念轉換成動態影像&#xff0c;生成出電影級別的作品&#xff0c;旨在滿足廣泛的視頻應用需求&#xff0c;包括媒…

STM32 DMA入門指導

什么是DMA DMA&#xff0c;全稱直接存儲器訪問&#xff08;Direct Memory Access&#xff09;&#xff0c;是一種允許硬件子系統直接讀寫系統內存的技術&#xff0c;無需中央處理單元&#xff08;CPU&#xff09;的介入。下面是DMA的工作原理概述&#xff1a; 數據傳輸觸發&am…

解決Java并發問題的常見思路

寫在文章開頭 近期對一些比較老的項目進行代碼走查&#xff0c;碰到一些極端的并發編程惡習&#xff0c;所以筆者就基于此文演示這類問題以及面對并發編程時我們應該需要了解一些常見套路。 Hi&#xff0c;我是sharkChili&#xff0c;是個不斷在硬核技術上作死的java coder&am…

基于 Amazon EKS 的 Stable Diffusion ComfyUI 部署方案

01 背景介紹 Stable Diffusion 作為當下最流行的開源 AI 圖像生成模型在游戲行業有著廣泛的應用實踐&#xff0c;無論是 ToC 面向玩家的游戲社區場景&#xff0c;還是 ToB 面向游戲工作室的美術制作場景&#xff0c;都可以發揮很大的價值&#xff0c;如何更好地使用 Stable Dif…

scanf和cin的利弊

scanf和cin的利弊&#xff1a; scanf: 利&#xff1a;耗時短&#xff0c;寫法方便輸入固定格式&#xff0c;比如scanf(“%*d%d”,&a)&#xff0c;可以直接忽略第一個輸入&#xff0c;不用創建新對象&#xff0c;再比如scanf(“%1d”,&x[i])&#xff0c;輸入3214&#x…

卡牌——二分

卡牌 題目分析 想一下前面題的特點&#xff0c;是不是都出現了“最大邊長”&#xff0c;“最小的數”這種字眼&#xff0c;那么這里出現了“最多能湊出多少套牌”&#xff0c;我們可以考慮用二分。接下來我們要看一下他是否符合二段性&#xff0c;二分的關鍵在于二段性。 第…

續Java的執行語句、方法--學習JavaEE的day07

day07 一、特殊的流程控制語句 break(day06) continue 1.理解&#xff1a; 作用于循環中&#xff0c;表示跳過循環體剩余的部分&#xff0c;進入到下一次循環 做實驗&#xff1a; while(true){ System.out.println(“111”); System.out.println(“222”); if(true){ conti…

編譯鏈接實戰(25)gcc ASAN、MSAN檢測內存越界、泄露、使用未初始化內存等內存相關錯誤

文章目錄 1 ASAN1.1 介紹1.2 原理編譯時插樁模塊運行時庫2 檢測示例2.1 內存越界2.2 內存泄露內存泄露檢測原理作用域外訪問2.3 使用已經釋放的內存2.4 將漏洞信息輸出文件3 MSAN1 ASAN 1.1 介紹 -fsanitize=address是一個編譯器選項,用于啟用AddressSanitizer(地址

基于SpringBoot的教師考勤管理系統(贈源碼)

作者主頁&#xff1a;易學蔚來-技術互助文末獲取源碼 簡介&#xff1a;Java領域優質創作者 Java項目、簡歷模板、學習資料、面試題庫 教師考勤管理系統是基于JavaVueSpringBootMySQL實現的&#xff0c;包含了管理員、學生、教師三類用戶。該系統實現了班級管理、課程安排、考勤…

基于springboot的足球俱樂部管理系統的設計與實現

** &#x1f345;點贊收藏關注 → 私信領取本源代碼、數據庫&#x1f345; 本人在Java畢業設計領域有多年的經驗&#xff0c;陸續會更新更多優質的Java實戰項目希望你能有所收獲&#xff0c;少走一些彎路。&#x1f345;關注我不迷路&#x1f345;** 一 、設計說明 1.1 課題…

2024.3.3每日一題

LeetCode 用隊列實現棧 題目鏈接&#xff1a;225. 用隊列實現棧 - 力扣&#xff08;LeetCode&#xff09; 題目描述 請你僅使用兩個隊列實現一個后入先出&#xff08;LIFO&#xff09;的棧&#xff0c;并支持普通棧的全部四種操作&#xff08;push、top、pop 和 empty&…

如何取消ChatGPT 4.0的自動續費和會員訂閱(chatgpt4.0自動續費嗎)

如何取消ChatGPT 4.0的自動續費和會員訂閱 ChatGPT 4.0自動續費是否存在 是的&#xff0c;ChatGPT 4.0 Plus會員服務存在自動續費功能。 ChatGPT 4.0 Plus會員服務自動續費 ChatGPT Plus會員服務的自動續費機制用戶在購買ChatGPT 4.0 Plus會員服務后&#xff0c;系統會自動…

npm ERR! code ERESOLVE

1、問題概述&#xff1f; 執行npm install命令的時候報錯如下&#xff1a; tangxiaochuntangxiaochundeMacBook-Pro stf % npm install npm ERR! code ERESOLVE npm ERR! ERESOLVE unable to resolve dependency tree npm ERR! npm ERR! While resol…

LeetCode102.二叉樹的層序遍歷

題目 給你二叉樹的根節點 root &#xff0c;返回其節點值的 層序遍歷 。 &#xff08;即逐層地&#xff0c;從左到右訪問所有節點&#xff09;。 示例 輸入&#xff1a;root [3,9,20,null,null,15,7] 輸出&#xff1a;[[3],[9,20],[15,7]]輸入&#xff1a;root [1] 輸出&am…

SpringCloud-MQ消息隊列

一、消息隊列介紹 MQ (MessageQueue) &#xff0c;中文是消息隊列&#xff0c;字面來看就是存放消息的隊列。也就是事件驅動架構中的Broker。消息隊列是一種基于生產者-消費者模型的通信方式&#xff0c;通過在消息隊列中存放和傳遞消息&#xff0c;實現了不同組件、服務或系統…

2024全新手機軟件下載應用排行、平臺和最新發布網站,采用響應式織夢模板

這是一款簡潔藍色的手機軟件下載應用排行、平臺和最新發布網站&#xff0c;采用響應式織夢模板。 主要包括主頁、APP列表頁、APP詳情介紹頁、新聞資訊列表、新聞詳情頁、關于我們等模塊頁面。 地 址 &#xff1a; runruncode.com/php/19703.html 軟件程序演示圖&#xff1a;…